package org.apache.flink.streaming.api.operators.python.embedded;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import pemja.core.object.PyIterator;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.class */
public abstract class AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, OUT> extends AbstractEmbeddedDataStreamPythonFunctionOperator<OUT> implements TwoInputStreamOperator<IN1, IN2, OUT>, BoundedMultiInput {
    private static final long serialVersionUID = 1;
    private final TypeInformation<IN1> inputTypeInfo1;
    private final TypeInformation<IN2> inputTypeInfo2;
    private PythonTypeUtils.DataConverter<IN1, Object> inputDataConverter1;
    private PythonTypeUtils.DataConverter<IN2, Object> inputDataConverter2;
    protected transient long timestamp;

    public AbstractTwoInputEmbeddedPythonFunctionOperator(Configuration configuration, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeInformation<IN1> typeInformation, TypeInformation<IN2> typeInformation2, TypeInformation<OUT> typeInformation3) {
        super(configuration, dataStreamPythonFunctionInfo, typeInformation3);
        this.inputTypeInfo1 = (TypeInformation) Preconditions.checkNotNull(typeInformation);
        this.inputTypeInfo2 = (TypeInformation) Preconditions.checkNotNull(typeInformation2);
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.inputDataConverter1 = PythonTypeUtils.TypeInfoToDataConverter.typeInfoDataConverter(this.inputTypeInfo1);
        this.inputDataConverter2 = PythonTypeUtils.TypeInfoToDataConverter.typeInfoDataConverter(this.inputTypeInfo2);
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator
    public void openPythonInterpreter() {
        this.interpreter.set("function_protos", createUserDefinedFunctionsProto().stream().map((v0) -> {
            return v0.toByteArray();
        }).collect(Collectors.toList()));
        this.interpreter.set("input_coder_info1", ProtoUtils.createRawTypeCoderInfoDescriptorProto(getInputTypeInfo1(), FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, false, null).toByteArray());
        this.interpreter.set("input_coder_info2", ProtoUtils.createRawTypeCoderInfoDescriptorProto(getInputTypeInfo2(), FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, false, null).toByteArray());
        this.interpreter.set("output_coder_info", ProtoUtils.createRawTypeCoderInfoDescriptorProto(this.outputTypeInfo, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, false, null).toByteArray());
        this.interpreter.set("runtime_context", getRuntimeContext());
        this.interpreter.set("function_context", getFunctionContext());
        this.interpreter.set("timer_context", getTimerContext());
        this.interpreter.set("side_output_context", this.sideOutputContext);
        this.interpreter.set("keyed_state_backend", getKeyedStateBackend());
        this.interpreter.set("job_parameters", getJobParameters());
        this.interpreter.set("operator_state_backend", getOperatorStateBackend());
        this.interpreter.exec("from pyflink.fn_execution.embedded.operation_utils import create_two_input_user_defined_data_stream_function_from_protos");
        this.interpreter.exec("operation = create_two_input_user_defined_data_stream_function_from_protos(function_protos,input_coder_info1,input_coder_info2,output_coder_info,runtime_context,function_context,timer_context,side_output_context,job_parameters,keyed_state_backend,operator_state_backend)");
        this.interpreter.invokeMethod("operation", "open", new Object[0]);
    }

    public void endInput(int i) throws Exception {
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void close() throws Exception {
        if (this.interpreter != null) {
            this.interpreter.invokeMethod("operation", "close", new Object[0]);
        }
        super.close();
    }

    public void processElement1(StreamRecord<IN1> streamRecord) throws Exception {
        this.collector.setTimestamp(streamRecord);
        this.timestamp = streamRecord.getTimestamp();
        PyIterator pyIterator = (PyIterator) this.interpreter.invokeMethod("operation", "process_element1", this.inputDataConverter1.toExternal(streamRecord.getValue()));
        while (pyIterator.hasNext()) {
            this.collector.collect(this.outputDataConverter.toInternal(pyIterator.next()));
        }
        pyIterator.close();
    }

    public void processElement2(StreamRecord<IN2> streamRecord) throws Exception {
        this.collector.setTimestamp(streamRecord);
        this.timestamp = streamRecord.getTimestamp();
        PyIterator pyIterator = (PyIterator) this.interpreter.invokeMethod("operation", "process_element2", this.inputDataConverter2.toExternal(streamRecord.getValue()));
        while (pyIterator.hasNext()) {
            this.collector.collect(this.outputDataConverter.toInternal(pyIterator.next()));
        }
        pyIterator.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TypeInformation<IN1> getInputTypeInfo1() {
        return this.inputTypeInfo1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TypeInformation<IN2> getInputTypeInfo2() {
        return this.inputTypeInfo2;
    }

    public abstract List<FlinkFnApi.UserDefinedDataStreamFunction> createUserDefinedFunctionsProto();

    public abstract Object getFunctionContext();

    public abstract Object getTimerContext();
}
