package org.apache.flink.streaming.api.operators.python;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.apache.flink.streaming.api.utils.ClassLeakCleaner;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.class */
public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStreamOperator<OUT> {
    private static final long serialVersionUID = 1;
    protected final Configuration config;
    protected transient int maxBundleSize;
    protected transient int elementCount;
    private transient long maxBundleTimeMills;
    protected transient long lastFinishBundleTime;
    private transient ScheduledFuture<?> checkFinishBundleTimer;
    protected transient Runnable bundleFinishedCallback;

    public AbstractPythonFunctionOperator(Configuration configuration) {
        this.config = (Configuration) Preconditions.checkNotNull(configuration);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void open() throws Exception {
        try {
            this.maxBundleSize = ((Integer) this.config.get(PythonOptions.MAX_BUNDLE_SIZE)).intValue();
            if (this.maxBundleSize <= 0) {
                this.maxBundleSize = ((Integer) PythonOptions.MAX_BUNDLE_SIZE.defaultValue()).intValue();
                LOG.warn("Invalid value for the maximum bundle size. Using default value of " + this.maxBundleSize + '.');
            } else {
                LOG.info("The maximum bundle size is configured to {}.", Integer.valueOf(this.maxBundleSize));
            }
            this.maxBundleTimeMills = ((Long) this.config.get(PythonOptions.MAX_BUNDLE_TIME_MILLS)).longValue();
            if (this.maxBundleTimeMills <= 0) {
                this.maxBundleTimeMills = ((Long) PythonOptions.MAX_BUNDLE_TIME_MILLS.defaultValue()).longValue();
                LOG.warn("Invalid value for the maximum bundle time. Using default value of " + this.maxBundleTimeMills + '.');
            } else {
                LOG.info("The maximum bundle time is configured to {} milliseconds.", Long.valueOf(this.maxBundleTimeMills));
            }
            this.elementCount = 0;
            this.lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime();
            long max = Math.max(this.maxBundleTimeMills, 1L);
            this.checkFinishBundleTimer = getProcessingTimeService().scheduleAtFixedRate(j -> {
                checkInvokeFinishBundleByTime();
            }, max, max);
        } finally {
            super.open();
        }
    }

    public void finish() throws Exception {
        try {
            invokeFinishBundle();
        } finally {
            super.finish();
        }
    }

    public void close() throws Exception {
        try {
            if (this.checkFinishBundleTimer != null) {
                this.checkFinishBundleTimer.cancel(true);
                this.checkFinishBundleTimer = null;
            }
        } finally {
            super.close();
            try {
                ClassLeakCleaner.cleanUpLeakingClasses(getClass().getClassLoader());
            } catch (Throwable th) {
                LOG.warn("Failed to clean up the leaking objects.", th);
            }
        }
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        try {
            invokeFinishBundle();
        } finally {
            super.prepareSnapshotPreBarrier(j);
        }
    }

    public void processWatermark(Watermark watermark) throws Exception {
        if (getTimeServiceManager().isPresent()) {
            ((InternalTimeServiceManager) getTimeServiceManager().get()).advanceWatermark(watermark);
        }
        if (watermark.getTimestamp() == Long.MAX_VALUE) {
            invokeFinishBundle();
            processElementsOfCurrentKeyIfNeeded(null);
            advanceWatermark(watermark);
            this.output.emitWatermark(watermark);
            return;
        }
        if (isBundleFinished()) {
            this.output.emitWatermark(watermark);
        } else {
            this.bundleFinishedCallback = () -> {
                try {
                    advanceWatermark(watermark);
                    this.output.emitWatermark(watermark);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to process watermark after finished bundle.", e);
                }
            };
        }
    }

    public void setCurrentKey(Object obj) {
        processElementsOfCurrentKeyIfNeeded(obj);
        super.setCurrentKey(obj);
    }

    private void processElementsOfCurrentKeyIfNeeded(Object obj) {
        if (getKeyedStateStore() == null || !PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend()) || Objects.equals(obj, getCurrentKey())) {
            return;
        }
        while (!isBundleFinished()) {
            try {
                invokeFinishBundle();
                fireAllRegisteredTimers(obj);
            } catch (Exception e) {
                throw new WrappingRuntimeException(e);
            }
        }
    }

    private void fireAllRegisteredTimers(Object obj) throws Exception {
        Field declaredField = BatchExecutionKeyedStateBackend.class.getDeclaredField("keySelectionListeners");
        declaredField.setAccessible(true);
        for (KeyedStateBackend.KeySelectionListener keySelectionListener : (List) declaredField.get(getKeyedStateBackend())) {
            if (keySelectionListener instanceof BatchExecutionInternalTimeServiceManager) {
                keySelectionListener.keySelected(obj);
                keySelectionListener.keySelected(getCurrentKey());
            }
        }
    }

    public boolean isBundleFinished() {
        return this.elementCount == 0;
    }

    public Configuration getConfiguration() {
        return this.config;
    }

    protected abstract void invokeFinishBundle() throws Exception;

    protected abstract PythonEnvironmentManager createPythonEnvironmentManager();

    private void advanceWatermark(Watermark watermark) throws Exception {
        if (getTimeServiceManager().isPresent()) {
            InternalTimeServiceManager internalTimeServiceManager = (InternalTimeServiceManager) getTimeServiceManager().get();
            internalTimeServiceManager.advanceWatermark(watermark);
            while (!isBundleFinished()) {
                invokeFinishBundle();
                internalTimeServiceManager.advanceWatermark(watermark);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkInvokeFinishBundleByCount() throws Exception {
        if (this.elementCount >= this.maxBundleSize) {
            invokeFinishBundle();
        }
    }

    private void checkInvokeFinishBundleByTime() throws Exception {
        if (getProcessingTimeService().getCurrentProcessingTime() - this.lastFinishBundleTime >= this.maxBundleTimeMills) {
            invokeFinishBundle();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkMetricContainer getFlinkMetricContainer() {
        if (((Boolean) this.config.get(PythonOptions.PYTHON_METRIC_ENABLED)).booleanValue()) {
            return new FlinkMetricContainer(getRuntimeContext().getMetricGroup());
        }
        return null;
    }
}
