package org.apache.flink.streaming.api.operators.python;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.python.collector.RunnerOutputCollector;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

/**
 * Base class for Python function operators that consume a single input stream.
 *
 * <p>Each input element is wrapped, together with two {@code long} values, into a three-field
 * {@link Row}, serialized, and passed to the Python function runner. Results coming back from the
 * runner are deserialized as {@link Row}s and forwarded either to the main output or to a side
 * output, depending on the output tag id attached to the result.
 *
 * @param <IN> type of the input elements
 * @param <OUT> type of the elements produced by this operator
 */
@Internal
public abstract class AbstractOneInputPythonFunctionOperator<IN, OUT>
        extends AbstractDataStreamPythonFunctionOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {

    private static final long serialVersionUID = 1L;

    /** Type information of the input elements; never {@code null}. */
    private final TypeInformation<IN> inputTypeInfo;

    /** Type information of the rows sent to the runner; created in {@link #open()}. */
    private transient TypeInformation<Row> runnerInputTypeInfo;

    /** Type information of the rows received from the runner; created in {@link #open()}. */
    private transient TypeInformation<Row> runnerOutputTypeInfo;

    /** Serializer for the rows sent to the runner. */
    private transient TypeSerializer<Row> runnerInputTypeSerializer;

    /** Serializer used to deserialize main-output rows received from the runner. */
    private transient TypeSerializer<Row> runnerOutputTypeSerializer;

    /** Reusable input stream that is pointed at each result buffer in turn. */
    private transient ByteArrayInputStreamWithPos bais;

    /** Data view over {@link #bais}. */
    private transient DataInputViewStreamWrapper baisWrapper;

    /** Reusable output stream holding the serialized form of the current input row. */
    protected transient ByteArrayOutputStreamWithPos baos;

    /** Data view over {@link #baos}. */
    protected transient DataOutputViewStreamWrapper baosWrapper;

    /** Builds the row that is sent to the runner for each element. */
    private transient RunnerInputHandler runnerInputHandler;

    /** Collects deserialized result rows into the operator output. */
    private transient RunnerOutputCollector<OUT> runnerOutputCollector;

    /**
     * Builds the three-field row that is handed to the runner for every element.
     *
     * <p>A single {@link Row} instance is reused across calls, so a returned row is only valid
     * until the next call to {@link #buildRunnerInputData(long, long, Object)}.
     */
    private static final class RunnerInputHandler {

        private final Row reusableRunnerInput = new Row(3);

        /**
         * Fills the reusable row with the given values and returns it.
         *
         * <p>NOTE(review): the names {@code timestamp} and {@code watermark} are assumed from
         * the usual Flink convention; the callers are not visible in this file.
         *
         * @param timestamp value stored in field 0
         * @param watermark value stored in field 1
         * @param element value stored in field 2
         * @return the shared row instance, overwritten by the next call
         */
        public Row buildRunnerInputData(long timestamp, long watermark, Object element) {
            reusableRunnerInput.setField(0, timestamp);
            reusableRunnerInput.setField(1, watermark);
            reusableRunnerInput.setField(2, element);
            return reusableRunnerInput;
        }

        /**
         * Returns the row type matching {@link #buildRunnerInputData(long, long, Object)}: two
         * {@code LONG} fields followed by a field of the given element type.
         *
         * @param elementType type information of the third field
         * @return row type information with fields (LONG, LONG, elementType)
         */
        public static TypeInformation<Row> getRunnerInputTypeInfo(TypeInformation<?> elementType) {
            return Types.ROW(Types.LONG, Types.LONG, elementType);
        }
    }

    /**
     * Creates the operator.
     *
     * @param config configuration forwarded to the super class
     * @param pythonFunctionInfo Python function description forwarded to the super class
     * @param inputTypeInfo type information of the input elements; must not be {@code null}
     * @param outputTypeInfo type information of the output elements, forwarded to the super class
     * @throws NullPointerException if {@code inputTypeInfo} is {@code null}
     */
    public AbstractOneInputPythonFunctionOperator(
            Configuration config,
            DataStreamPythonFunctionInfo pythonFunctionInfo,
            TypeInformation<IN> inputTypeInfo,
            TypeInformation<OUT> outputTypeInfo) {
        super(config, pythonFunctionInfo, outputTypeInfo);
        this.inputTypeInfo = Preconditions.checkNotNull(inputTypeInfo);
    }

    /**
     * Initializes the reusable buffers, runner type information, serializers and collector, and
     * then delegates to the super class.
     *
     * @throws Exception if the super class fails to open
     */
    @Override
    public void open() throws Exception {
        bais = new ByteArrayInputStreamWithPos();
        baisWrapper = new DataInputViewStreamWrapper(bais);
        baos = new ByteArrayOutputStreamWithPos();
        baosWrapper = new DataOutputViewStreamWrapper(baos);

        runnerInputTypeInfo = RunnerInputHandler.getRunnerInputTypeInfo(inputTypeInfo);
        runnerOutputTypeInfo = RunnerOutputCollector.getRunnerOutputTypeInfo(getProducedType());

        runnerInputTypeSerializer =
                PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(
                        runnerInputTypeInfo);
        runnerOutputTypeSerializer =
                PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(
                        runnerOutputTypeInfo);

        runnerInputHandler = new RunnerInputHandler();
        runnerOutputCollector = new RunnerOutputCollector<>(new TimestampedCollector<>(output));

        // Kept as the last step, as in the original. NOTE(review): presumably super.open()
        // relies on the state initialized above (e.g. the coder descriptors) - confirm.
        super.open();
    }

    /**
     * Called when the input has ended; finishes the current bundle.
     *
     * @throws Exception if finishing the bundle fails
     */
    @Override
    public void endInput() throws Exception {
        invokeFinishBundle();
    }

    /**
     * Deserializes one result received from the runner and collects it.
     *
     * @param resultTuple the result: {@code f0} is the output tag id (an empty string selects
     *     the main output), {@code f1} is the buffer holding the serialized row and {@code f2}
     *     is the number of bytes of that buffer to read, starting at offset 0
     * @throws Exception if deserialization fails
     */
    @Override
    public void emitResult(Tuple3<String, byte[], Integer> resultTuple) throws Exception {
        final String outputTagId = resultTuple.f0;
        bais.setBuffer(resultTuple.f1, 0, resultTuple.f2);

        if (outputTagId.isEmpty()) {
            runnerOutputCollector.collect(runnerOutputTypeSerializer.deserialize(baisWrapper));
        } else {
            // Cast retained from the original: the declared return type of the side-output
            // serializer lookup is not visible in this file.
            runnerOutputCollector.collect(
                    getOutputTagById(outputTagId),
                    (Row) getSideOutputTypeSerializerById(outputTagId).deserialize(baisWrapper));
        }
    }

    /**
     * Serializes one element together with the two given {@code long} values, passes the bytes
     * to the Python function runner, and then emits any available results.
     *
     * <p>NOTE(review): the names {@code timestamp} and {@code watermark} are assumed from the
     * usual Flink convention; the callers are not visible in this file.
     *
     * @param timestamp first {@code long} field of the runner input row
     * @param watermark second {@code long} field of the runner input row
     * @param element the element to process
     * @throws Exception if serialization or processing fails
     */
    public void processElement(long timestamp, long watermark, Object element) throws Exception {
        final Row runnerInput =
                runnerInputHandler.buildRunnerInputData(timestamp, watermark, element);
        runnerInputTypeSerializer.serialize(runnerInput, baosWrapper);
        pythonFunctionRunner.process(baos.toByteArray());
        // Clear the buffer so the next element is serialized from the start.
        baos.reset();
        elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }

    /**
     * Creates the coder descriptor for the rows sent to the runner.
     *
     * @return a raw-type coder descriptor for the runner input row type, in {@code MULTIPLE} mode
     */
    public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor() {
        return ProtoUtils.createRawTypeCoderInfoDescriptorProto(
                runnerInputTypeInfo, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
    }

    /**
     * Creates the coder descriptor for the rows received from the runner.
     *
     * @return a raw-type coder descriptor for the runner output row type, in {@code MULTIPLE}
     *     mode
     */
    public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor() {
        return ProtoUtils.createRawTypeCoderInfoDescriptorProto(
                runnerOutputTypeInfo, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
    }

    /**
     * Returns the type information of the input elements.
     *
     * @return the input type information passed to the constructor
     */
    public TypeInformation<IN> getInputTypeInfo() {
        return inputTypeInfo;
    }
}
