package org.apache.flink.streaming.api.operators.python;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.python.Constants;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/PythonCoProcessOperator.class */
public class PythonCoProcessOperator<IN1, IN2, OUT> extends AbstractTwoInputPythonFunctionOperator<IN1, IN2, OUT> {
    private static final long serialVersionUID = 2;
    private transient long currentWatermark;

    public PythonCoProcessOperator(Configuration configuration, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeInformation<IN1> typeInformation, TypeInformation<IN2> typeInformation2, TypeInformation<OUT> typeInformation3) {
        super(configuration, dataStreamPythonFunctionInfo, typeInformation, typeInformation2, typeInformation3);
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractTwoInputPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractDataStreamPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        super.open();
        this.currentWatermark = Long.MIN_VALUE;
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator
    public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
        return new BeamDataStreamPythonFunctionRunner(getRuntimeContext().getTaskName(), createPythonEnvironmentManager(), Constants.STATELESS_FUNCTION_URN, ProtoUtils.createUserDefinedDataStreamFunctionProtos(getPythonFunctionInfo(), getRuntimeContext(), getInternalParameters(), PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend()), ((Boolean) this.config.get(PythonOptions.PYTHON_METRIC_ENABLED)).booleanValue(), ((Boolean) this.config.get(PythonOptions.PYTHON_PROFILE_ENABLED)).booleanValue(), getSideOutputTags().size() > 0, ((Integer) this.config.get(PythonOptions.STATE_CACHE_SIZE)).intValue(), ((Integer) this.config.get(PythonOptions.MAP_STATE_READ_CACHE_SIZE)).intValue(), ((Integer) this.config.get(PythonOptions.MAP_STATE_WRITE_CACHE_SIZE)).intValue()), getFlinkMetricContainer(), null, getOperatorStateBackend(), null, null, null, getContainingTask().getEnvironment().getMemoryManager(), getOperatorConfig().getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.PYTHON, getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(), getContainingTask().getEnvironment().getUserCodeClassLoader().asClassLoader()), createInputCoderInfoDescriptor(), createOutputCoderInfoDescriptor(), null, createSideOutputCoderDescriptors());
    }

    public void processElement1(StreamRecord<IN1> streamRecord) throws Exception {
        processElement(true, streamRecord.getTimestamp(), this.currentWatermark, streamRecord.getValue());
    }

    public void processElement2(StreamRecord<IN2> streamRecord) throws Exception {
        processElement(false, streamRecord.getTimestamp(), this.currentWatermark, streamRecord.getValue());
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void processWatermark(Watermark watermark) throws Exception {
        super.processWatermark(watermark);
        this.currentWatermark = watermark.getTimestamp();
    }

    @Override // org.apache.flink.streaming.api.operators.python.AbstractDataStreamPythonFunctionOperator
    public <T> AbstractDataStreamPythonFunctionOperator<T> copy(DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeInformation<T> typeInformation) {
        return new PythonCoProcessOperator(this.config, dataStreamPythonFunctionInfo, getLeftInputType(), getRightInputType(), typeInformation);
    }
}
