package org.apache.flink.streaming.api.operators.python.embedded;

import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.types.Row;
import pemja.core.object.PyIterator;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.class */
public class EmbeddedPythonKeyedCoProcessOperator<K, IN1, IN2, OUT> extends AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, OUT> implements Triggerable<K, VoidNamespace> {
    private static final long serialVersionUID = 1;
    private transient TypeInformation<K> keyTypeInfo;
    private transient EmbeddedPythonKeyedCoProcessOperator<K, IN1, IN2, OUT>.ContextImpl context;
    private transient EmbeddedPythonKeyedCoProcessOperator<K, IN1, IN2, OUT>.OnTimerContextImpl onTimerContext;
    private transient PythonTypeUtils.DataConverter<K, Object> keyConverter;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator$ContextImpl.class */
    private class ContextImpl {
        private final TimerService timerService;

        ContextImpl(TimerService timerService) {
            this.timerService = timerService;
        }

        public long timestamp() {
            return EmbeddedPythonKeyedCoProcessOperator.this.timestamp;
        }

        public TimerService timerService() {
            return this.timerService;
        }

        public Object getCurrentKey() {
            return EmbeddedPythonKeyedCoProcessOperator.this.keyConverter.toExternal(((Row) EmbeddedPythonKeyedCoProcessOperator.this.getCurrentKey()).getField(0));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator$OnTimerContextImpl.class */
    public class OnTimerContextImpl {
        private final TimerService timerService;
        private TimeDomain timeDomain;
        private InternalTimer<K, VoidNamespace> timer;

        OnTimerContextImpl(TimerService timerService) {
            this.timerService = timerService;
        }

        public long timestamp() {
            return this.timer.getTimestamp();
        }

        public TimerService timerService() {
            return this.timerService;
        }

        public int timeDomain() {
            return this.timeDomain.ordinal();
        }

        public Object getCurrentKey() {
            return EmbeddedPythonKeyedCoProcessOperator.this.keyConverter.toExternal(((Row) this.timer.getKey()).getField(0));
        }
    }

    public EmbeddedPythonKeyedCoProcessOperator(Configuration configuration, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeInformation<IN1> typeInformation, TypeInformation<IN2> typeInformation2, TypeInformation<OUT> typeInformation3) {
        super(configuration, dataStreamPythonFunctionInfo, typeInformation, typeInformation2, typeInformation3);
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractTwoInputEmbeddedPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.keyTypeInfo = getInputTypeInfo1().getTypeAt(0);
        this.keyConverter = PythonTypeUtils.TypeInfoToDataConverter.typeInfoDataConverter(this.keyTypeInfo);
        SimpleTimerService simpleTimerService = new SimpleTimerService(getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this));
        this.context = new ContextImpl(simpleTimerService);
        this.onTimerContext = new OnTimerContextImpl(simpleTimerService);
        super.open();
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractTwoInputEmbeddedPythonFunctionOperator
    public List<FlinkFnApi.UserDefinedDataStreamFunction> createUserDefinedFunctionsProto() {
        return ProtoUtils.createUserDefinedDataStreamStatefulFunctionProtos(getPythonFunctionInfo(), getRuntimeContext(), getJobParameters(), this.keyTypeInfo, PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend()), ((Boolean) this.config.get(PythonOptions.PYTHON_METRIC_ENABLED)).booleanValue(), ((Boolean) this.config.get(PythonOptions.PYTHON_PROFILE_ENABLED)).booleanValue(), this.hasSideOutput, ((Integer) this.config.get(PythonOptions.STATE_CACHE_SIZE)).intValue(), ((Integer) this.config.get(PythonOptions.MAP_STATE_READ_CACHE_SIZE)).intValue(), ((Integer) this.config.get(PythonOptions.MAP_STATE_WRITE_CACHE_SIZE)).intValue());
    }

    public void onEventTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        this.collector.setAbsoluteTimestamp(internalTimer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, internalTimer);
    }

    public void onProcessingTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        this.collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, internalTimer);
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractTwoInputEmbeddedPythonFunctionOperator
    public Object getFunctionContext() {
        return this.context;
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractTwoInputEmbeddedPythonFunctionOperator
    public Object getTimerContext() {
        return this.onTimerContext;
    }

    @Override // org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator
    public <T> AbstractEmbeddedDataStreamPythonFunctionOperator<T> copy(DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeInformation<T> typeInformation) {
        return new EmbeddedPythonKeyedCoProcessOperator(this.config, dataStreamPythonFunctionInfo, getInputTypeInfo1(), getInputTypeInfo2(), typeInformation);
    }

    private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        ((OnTimerContextImpl) this.onTimerContext).timeDomain = timeDomain;
        ((OnTimerContextImpl) this.onTimerContext).timer = internalTimer;
        PyIterator pyIterator = (PyIterator) this.interpreter.invokeMethod("operation", "on_timer", Long.valueOf(internalTimer.getTimestamp()));
        while (pyIterator.hasNext()) {
            this.collector.collect(this.outputDataConverter.toInternal(pyIterator.next()));
        }
        pyIterator.close();
        ((OnTimerContextImpl) this.onTimerContext).timeDomain = null;
        ((OnTimerContextImpl) this.onTimerContext).timer = null;
    }
}
