package org.apache.flink.table.runtime.operators.python.aggregate.arrow;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator;
import org.apache.flink.table.runtime.operators.python.utils.StreamRecordRowDataWrappingCollector;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/arrow/AbstractArrowPythonAggregateFunctionOperator.class */
public abstract class AbstractArrowPythonAggregateFunctionOperator extends AbstractStatelessFunctionOperator<RowData, RowData, RowData> {
    private static final long serialVersionUID = 1;
    private static final String PANDAS_AGGREGATE_FUNCTION_URN = "flink:transform:aggregate_function:arrow:v1";
    protected final PythonFunctionInfo[] pandasAggFunctions;
    private final GeneratedProjection udafInputGeneratedProjection;
    protected transient ArrowSerializer arrowSerializer;
    protected transient StreamRecordRowDataWrappingCollector rowDataWrapper;
    protected transient JoinedRowData reuseJoinedRow;
    protected transient int currentBatchCount;
    private transient Projection<RowData, BinaryRowData> udafInputProjection;

    public AbstractArrowPythonAggregateFunctionOperator(Configuration configuration, PythonFunctionInfo[] pythonFunctionInfoArr, RowType rowType, RowType rowType2, RowType rowType3, GeneratedProjection generatedProjection) {
        super(configuration, rowType, rowType2, rowType3);
        this.pandasAggFunctions = (PythonFunctionInfo[]) Preconditions.checkNotNull(pythonFunctionInfoArr);
        this.udafInputGeneratedProjection = (GeneratedProjection) Preconditions.checkNotNull(generatedProjection);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator, org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.rowDataWrapper = new StreamRecordRowDataWrappingCollector(this.output);
        this.reuseJoinedRow = new JoinedRowData();
        this.udafInputProjection = (Projection) this.udafInputGeneratedProjection.newInstance(Thread.currentThread().getContextClassLoader());
        this.arrowSerializer = new ArrowSerializer(this.udfInputType, this.udfOutputType);
        this.arrowSerializer.open(this.bais, this.baos);
        this.currentBatchCount = 0;
    }

    @Override // org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void close() throws Exception {
        super.close();
        if (this.arrowSerializer != null) {
            this.arrowSerializer.close();
            this.arrowSerializer = null;
        }
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        bufferInput(rowData);
        processElementInternal(rowData);
        emitResults();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public boolean isBundleFinished() {
        return this.elementCount == 0 && this.currentBatchCount == 0;
    }

    @Override // org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator
    public PythonEnv getPythonEnv() {
        return this.pandasAggFunctions[0].getPythonFunction().getPythonEnv();
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public String getFunctionUrn() {
        return PANDAS_AGGREGATE_FUNCTION_URN;
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(RowType rowType) {
        return ProtoUtils.createArrowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor(RowType rowType) {
        return ProtoUtils.createArrowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, false);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public RowData getFunctionInput(RowData rowData) {
        return this.udafInputProjection.apply(rowData);
    }

    @Override // org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator
    public FlinkFnApi.UserDefinedFunctions createUserDefinedFunctionsProto() {
        return ProtoUtils.createUserDefinedFunctionsProto(this.pandasAggFunctions, ((Boolean) this.config.get(PythonOptions.PYTHON_METRIC_ENABLED)).booleanValue(), ((Boolean) this.config.get(PythonOptions.PYTHON_PROFILE_ENABLED)).booleanValue());
    }
}
