package org.apache.beam.runners.flink.translation.wrappers.streaming;

import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.class */
public class DoFnOperator<InputT, FnOutputT, OutputT> extends AbstractStreamOperator<OutputT> implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>, TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT>, KeyGroupCheckpointedOperator, Triggerable<Object, TimerInternals.TimerData> {
    protected DoFn<InputT, FnOutputT> doFn;
    protected final SerializedPipelineOptions serializedOptions;
    protected final TupleTag<FnOutputT> mainOutputTag;
    protected final List<TupleTag<?>> sideOutputTags;
    protected final Collection<PCollectionView<?>> sideInputs;
    protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
    protected final WindowingStrategy<?, ?> windowingStrategy;
    protected final OutputManagerFactory<OutputT> outputManagerFactory;
    protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
    protected transient SideInputHandler sideInputHandler;
    protected transient SideInputReader sideInputReader;
    protected transient DoFnRunners.OutputManager outputManager;
    private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
    protected transient long currentInputWatermark;
    protected transient long currentOutputWatermark;
    private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
    protected transient FlinkStateInternals<?> stateInternals;
    private Coder<WindowedValue<InputT>> inputCoder;
    private final Coder<?> keyCoder;
    private final TimerInternals.TimerDataCoder timerCoder;
    protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
    protected transient DoFnOperator<InputT, FnOutputT, OutputT>.FlinkTimerInternals timerInternals;
    private transient StateInternals<?> pushbackStateInternals;
    private transient Optional<Long> pushedBackWatermark;

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$DefaultOutputManagerFactory.class */
    public static class DefaultOutputManagerFactory<OutputT> implements OutputManagerFactory<OutputT> {
        @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.OutputManagerFactory
        public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) {
            return new DoFnRunners.OutputManager() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.DefaultOutputManagerFactory.1
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    output.collect(new StreamRecord(windowedValue));
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$FlinkTimerInternals.class */
    public class FlinkTimerInternals implements TimerInternals {
        private FlinkTimerInternals() {
        }

        public void setTimer(StateNamespace stateNamespace, String str, Instant instant, TimeDomain timeDomain) {
            setTimer(TimerInternals.TimerData.of(str, stateNamespace, instant, timeDomain));
        }

        @Deprecated
        public void setTimer(TimerInternals.TimerData timerData) {
            long millis = timerData.getTimestamp().getMillis();
            if (timerData.getDomain().equals(TimeDomain.EVENT_TIME)) {
                DoFnOperator.this.timerService.registerEventTimeTimer(timerData, millis);
            } else {
                if (!timerData.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
                    throw new UnsupportedOperationException("Unsupported time domain: " + timerData.getDomain());
                }
                DoFnOperator.this.timerService.registerProcessingTimeTimer(timerData, millis);
            }
        }

        @Deprecated
        public void deleteTimer(StateNamespace stateNamespace, String str) {
            throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
        }

        public void deleteTimer(StateNamespace stateNamespace, String str, TimeDomain timeDomain) {
            throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
        }

        @Deprecated
        public void deleteTimer(TimerInternals.TimerData timerData) {
            long millis = timerData.getTimestamp().getMillis();
            if (timerData.getDomain().equals(TimeDomain.EVENT_TIME)) {
                DoFnOperator.this.timerService.deleteEventTimeTimer(timerData, millis);
            } else {
                if (!timerData.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
                    throw new UnsupportedOperationException("Unsupported time domain: " + timerData.getDomain());
                }
                DoFnOperator.this.timerService.deleteProcessingTimeTimer(timerData, millis);
            }
        }

        public Instant currentProcessingTime() {
            return new Instant(DoFnOperator.this.timerService.currentProcessingTime());
        }

        @Nullable
        public Instant currentSynchronizedProcessingTime() {
            return new Instant(DoFnOperator.this.timerService.currentProcessingTime());
        }

        public Instant currentInputWatermarkTime() {
            return new Instant(Math.min(DoFnOperator.this.currentInputWatermark, DoFnOperator.this.getPushbackWatermarkHold()));
        }

        @Nullable
        public Instant currentOutputWatermarkTime() {
            return new Instant(DoFnOperator.this.currentOutputWatermark);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$MultiOutputOutputManagerFactory.class */
    public static class MultiOutputOutputManagerFactory implements OutputManagerFactory<RawUnionValue> {
        Map<TupleTag<?>, Integer> mapping;

        public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> map) {
            this.mapping = map;
        }

        @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.OutputManagerFactory
        public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) {
            return new DoFnRunners.OutputManager() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory.1
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    output.collect(new StreamRecord(new RawUnionValue(MultiOutputOutputManagerFactory.this.mapping.get(tupleTag).intValue(), windowedValue)));
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$OutputManagerFactory.class */
    public interface OutputManagerFactory<OutputT> extends Serializable {
        DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output);
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$StateInternalsStateCleaner.class */
    public static class StateInternalsStateCleaner<W extends BoundedWindow> implements StatefulDoFnRunner.StateCleaner<W> {
        private final DoFn<?, ?> fn;
        private final DoFnSignature signature;
        private final StateInternals<?> stateInternals;
        private final Coder<W> windowCoder;

        public StateInternalsStateCleaner(DoFn<?, ?> doFn, StateInternals<?> stateInternals, Coder<W> coder) {
            this.fn = doFn;
            this.signature = DoFnSignatures.getSignature(doFn.getClass());
            this.stateInternals = stateInternals;
            this.windowCoder = coder;
        }

        public void clearForWindow(W w) {
            for (Map.Entry entry : this.signature.stateDeclarations().entrySet()) {
                try {
                    this.stateInternals.state(StateNamespaces.window(this.windowCoder, w), StateTags.tagForSpec((String) entry.getKey(), (StateSpec) ((DoFnSignature.StateDeclaration) entry.getValue()).field().get(this.fn))).clear();
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$StepContext.class */
    public class StepContext implements ExecutionContext.StepContext {
        protected StepContext() {
        }

        public String getStepName() {
            return null;
        }

        public String getTransformName() {
            return null;
        }

        public void noteOutput(WindowedValue<?> windowedValue) {
        }

        public void noteSideOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
        }

        public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tupleTag, Iterable<WindowedValue<T>> iterable, Coder<Iterable<WindowedValue<T>>> coder, W w, Coder<W> coder2) throws IOException {
            throw new UnsupportedOperationException("Writing side-input data is not supported.");
        }

        public StateInternals<?> stateInternals() {
            return DoFnOperator.this.stateInternals;
        }

        public TimerInternals timerInternals() {
            return DoFnOperator.this.timerInternals;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$TimeInternalsCleanupTimer.class */
    public static class TimeInternalsCleanupTimer implements StatefulDoFnRunner.CleanupTimer {
        public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
        private final TimerInternals timerInternals;
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final Coder<BoundedWindow> windowCoder;

        public TimeInternalsCleanupTimer(TimerInternals timerInternals, WindowingStrategy<?, ?> windowingStrategy) {
            this.windowingStrategy = windowingStrategy;
            this.windowCoder = windowingStrategy.getWindowFn().windowCoder();
            this.timerInternals = timerInternals;
        }

        public Instant currentInputWatermarkTime() {
            return this.timerInternals.currentInputWatermarkTime();
        }

        public void setForWindow(BoundedWindow boundedWindow) {
            this.timerInternals.setTimer(StateNamespaces.window(this.windowCoder, boundedWindow), GC_TIMER_ID, boundedWindow.maxTimestamp().plus(this.windowingStrategy.getAllowedLateness()).plus(1L), TimeDomain.EVENT_TIME);
        }

        public boolean isForWindow(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain) {
            return timeDomain.equals(TimeDomain.EVENT_TIME) && GC_TIMER_ID.equals(str) && boundedWindow.maxTimestamp().plus(this.windowingStrategy.getAllowedLateness()).plus(1L).equals(instant);
        }
    }

    public DoFnOperator(DoFn<InputT, FnOutputT> doFn, Coder<WindowedValue<InputT>> coder, TupleTag<FnOutputT> tupleTag, List<TupleTag<?>> list, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map, Collection<PCollectionView<?>> collection, PipelineOptions pipelineOptions, Coder<?> coder2) {
        this.doFn = doFn;
        this.inputCoder = coder;
        this.mainOutputTag = tupleTag;
        this.sideOutputTags = list;
        this.sideInputTagMapping = map;
        this.sideInputs = collection;
        this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
        this.windowingStrategy = windowingStrategy;
        this.outputManagerFactory = outputManagerFactory;
        setChainingStrategy(ChainingStrategy.ALWAYS);
        this.keyCoder = coder2;
        this.timerCoder = TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
    }

    private ExecutionContext.StepContext createStepContext() {
        return new StepContext();
    }

    protected DoFn<InputT, FnOutputT> getDoFn() {
        return this.doFn;
    }

    public void open() throws Exception {
        super.open();
        this.currentInputWatermark = Long.MIN_VALUE;
        this.currentOutputWatermark = Long.MIN_VALUE;
        AggregatorFactory aggregatorFactory = new AggregatorFactory() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.1
            public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(Class<?> cls, ExecutionContext.StepContext stepContext, String str, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                SerializableFnAggregatorWrapper serializableFnAggregatorWrapper = (SerializableFnAggregatorWrapper) DoFnOperator.this.getRuntimeContext().getAccumulator(str);
                if (serializableFnAggregatorWrapper == null) {
                    serializableFnAggregatorWrapper = new SerializableFnAggregatorWrapper(combineFn);
                    DoFnOperator.this.getRuntimeContext().addAccumulator(str, serializableFnAggregatorWrapper);
                }
                return serializableFnAggregatorWrapper;
            }
        };
        this.sideInputReader = NullSideInputReader.of(this.sideInputs);
        if (!this.sideInputs.isEmpty()) {
            this.pushedBackTag = StateTags.bag("pushed-back-values", this.inputCoder);
            this.sideInputHandler = new SideInputHandler(this.sideInputs, new FlinkBroadcastStateInternals(getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend()));
            this.sideInputReader = this.sideInputHandler;
            if (this.pushbackStateInternals == null) {
                if (this.keyCoder != null) {
                    this.pushbackStateInternals = new FlinkKeyGroupStateInternals(this.keyCoder, getKeyedStateBackend());
                } else {
                    this.pushbackStateInternals = new FlinkSplitStateInternals(getOperatorStateBackend());
                }
            }
            this.pushedBackWatermark = Optional.absent();
        }
        this.outputManager = this.outputManagerFactory.create(this.output);
        if (this.keyCoder != null) {
            this.stateInternals = new FlinkStateInternals<>(getKeyedStateBackend(), this.keyCoder);
            this.timerService = getInternalTimerService("beam-timer", new CoderTypeSerializer(this.timerCoder), this);
            this.timerInternals = new FlinkTimerInternals();
        }
        this.doFn = getDoFn();
        this.doFnInvoker = DoFnInvokers.invokerFor(this.doFn);
        this.doFnInvoker.invokeSetup();
        ExecutionContext.StepContext createStepContext = createStepContext();
        DoFnRunner simpleRunner = DoFnRunners.simpleRunner(this.serializedOptions.getPipelineOptions(), this.doFn, this.sideInputReader, this.outputManager, this.mainOutputTag, this.sideOutputTags, createStepContext, aggregatorFactory, this.windowingStrategy);
        if (this.doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {
            simpleRunner = DoFnRunners.lateDataDroppingRunner(simpleRunner, createStepContext, this.windowingStrategy, this.doFn.getDroppedDueToLatenessAggregator());
        } else if (this.keyCoder != null) {
            simpleRunner = DoFnRunners.defaultStatefulDoFnRunner(this.doFn, simpleRunner, createStepContext, aggregatorFactory, this.windowingStrategy, new TimeInternalsCleanupTimer(createStepContext.timerInternals(), this.windowingStrategy), new StateInternalsStateCleaner(this.doFn, createStepContext.stateInternals(), this.windowingStrategy.getWindowFn().windowCoder()));
        }
        this.pushbackDoFnRunner = PushbackSideInputDoFnRunner.create(simpleRunner, this.sideInputs, this.sideInputHandler);
    }

    public void close() throws Exception {
        super.close();
        this.doFnInvoker.invokeTeardown();
    }

    protected final long getPushbackWatermarkHold() {
        if (this.sideInputs.isEmpty()) {
            return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
        }
        try {
            checkInitPushedBackWatermark();
            return ((Long) this.pushedBackWatermark.get()).longValue();
        } catch (Exception e) {
            throw new RuntimeException("Error retrieving pushed back watermark state.", e);
        }
    }

    private void checkInitPushedBackWatermark() {
        if (this.pushedBackWatermark.isPresent()) {
            return;
        }
        BagState state = this.pushbackStateInternals.state(StateNamespaces.global(), this.pushedBackTag);
        long millis = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
        Iterator it = ((Iterable) state.read()).iterator();
        while (it.hasNext()) {
            millis = Math.min(millis, ((WindowedValue) it.next()).getTimestamp().getMillis());
        }
        setPushedBackWatermark(millis);
    }

    public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
        this.pushbackDoFnRunner.startBundle();
        this.pushbackDoFnRunner.processElement((WindowedValue) streamRecord.getValue());
        this.pushbackDoFnRunner.finishBundle();
    }

    private void setPushedBackWatermark(long j) {
        this.pushedBackWatermark = Optional.fromNullable(Long.valueOf(j));
    }

    public final void processElement1(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
        this.pushbackDoFnRunner.startBundle();
        Iterable<WindowedValue> processElementInReadyWindows = this.pushbackDoFnRunner.processElementInReadyWindows((WindowedValue) streamRecord.getValue());
        BagState state = this.pushbackStateInternals.state(StateNamespaces.global(), this.pushedBackTag);
        checkInitPushedBackWatermark();
        long longValue = ((Long) this.pushedBackWatermark.get()).longValue();
        for (WindowedValue windowedValue : processElementInReadyWindows) {
            longValue = Math.min(longValue, windowedValue.getTimestamp().getMillis());
            state.add(windowedValue);
        }
        setPushedBackWatermark(longValue);
        this.pushbackDoFnRunner.finishBundle();
    }

    public final void processElement2(StreamRecord<RawUnionValue> streamRecord) throws Exception {
        this.pushbackDoFnRunner.startBundle();
        WindowedValue windowedValue = (WindowedValue) ((RawUnionValue) streamRecord.getValue()).getValue();
        this.sideInputHandler.addSideInputValue(this.sideInputTagMapping.get(Integer.valueOf(((RawUnionValue) streamRecord.getValue()).getUnionTag())), windowedValue);
        BagState state = this.pushbackStateInternals.state(StateNamespaces.global(), this.pushedBackTag);
        ArrayList<WindowedValue> arrayList = new ArrayList();
        Iterable<WindowedValue> iterable = (Iterable) state.read();
        if (iterable != null) {
            for (WindowedValue windowedValue2 : iterable) {
                setKeyContextElement1(new StreamRecord(windowedValue2));
                Iterables.addAll(arrayList, this.pushbackDoFnRunner.processElementInReadyWindows(windowedValue2));
            }
        }
        state.clear();
        long millis = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
        for (WindowedValue windowedValue3 : arrayList) {
            millis = Math.min(millis, windowedValue3.getTimestamp().getMillis());
            state.add(windowedValue3);
        }
        setPushedBackWatermark(millis);
        this.pushbackDoFnRunner.finishBundle();
        processWatermark1(new Watermark(this.currentInputWatermark));
    }

    public void processWatermark(Watermark watermark) throws Exception {
        processWatermark1(watermark);
    }

    public void processWatermark1(Watermark watermark) throws Exception {
        if (this.keyCoder == null) {
            this.currentInputWatermark = watermark.getTimestamp();
            long min = Math.min(getPushbackWatermarkHold(), this.currentInputWatermark);
            if (min > this.currentOutputWatermark) {
                this.currentOutputWatermark = min;
                this.output.emitWatermark(new Watermark(this.currentOutputWatermark));
                return;
            }
            return;
        }
        this.pushbackDoFnRunner.startBundle();
        this.currentInputWatermark = watermark.getTimestamp();
        this.timerService.advanceWatermark(Math.min(getPushbackWatermarkHold(), watermark.getTimestamp()));
        long min2 = Math.min(this.currentInputWatermark, Math.min(this.stateInternals.watermarkHold().getMillis(), getPushbackWatermarkHold()));
        if (min2 > this.currentOutputWatermark) {
            this.currentOutputWatermark = min2;
            this.output.emitWatermark(new Watermark(this.currentOutputWatermark));
        }
        this.pushbackDoFnRunner.finishBundle();
    }

    public void processWatermark2(Watermark watermark) throws Exception {
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        if (getKeyedStateBackend() != null) {
            try {
                KeyedStateCheckpointOutputStream rawKeyedOperatorStateOutput = stateSnapshotContext.getRawKeyedOperatorStateOutput();
                try {
                    try {
                        Iterator it = rawKeyedOperatorStateOutput.getKeyGroupList().iterator();
                        while (it.hasNext()) {
                            int intValue = ((Integer) it.next()).intValue();
                            rawKeyedOperatorStateOutput.startNewKeyGroup(intValue);
                            DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(rawKeyedOperatorStateOutput);
                            snapshotKeyGroupState(intValue, dataOutputViewStreamWrapper);
                            if (this.keyCoder != null) {
                                this.timerService.snapshotTimersForKeyGroup(dataOutputViewStreamWrapper, intValue);
                            }
                        }
                        try {
                            rawKeyedOperatorStateOutput.close();
                        } catch (Exception e) {
                            LOG.warn("Could not close raw keyed operator state stream for {}. This might have prevented deleting some state data.", getOperatorName(), e);
                        }
                    } catch (Exception e2) {
                        throw new Exception("Could not write timer service of " + getOperatorName() + " to checkpoint state stream.", e2);
                    }
                } catch (Throwable th) {
                    try {
                        rawKeyedOperatorStateOutput.close();
                    } catch (Exception e3) {
                        LOG.warn("Could not close raw keyed operator state stream for {}. This might have prevented deleting some state data.", getOperatorName(), e3);
                    }
                    throw th;
                }
            } catch (Exception e4) {
                throw new Exception("Could not open raw keyed operator state stream for " + getOperatorName() + '.', e4);
            }
        }
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator
    public void snapshotKeyGroupState(int i, DataOutputStream dataOutputStream) throws Exception {
        if (this.sideInputs.isEmpty() || this.keyCoder == null) {
            return;
        }
        ((FlinkKeyGroupStateInternals) this.pushbackStateInternals).snapshotKeyGroupState(i, dataOutputStream);
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        if (getKeyedStateBackend() != null) {
            int numberOfKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
            KeyGroupsList keyGroupRange = getKeyedStateBackend().getKeyGroupRange();
            for (KeyGroupStatePartitionStreamProvider keyGroupStatePartitionStreamProvider : stateInitializationContext.getRawKeyedStateInputs()) {
                DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(keyGroupStatePartitionStreamProvider.getStream());
                int keyGroupId = keyGroupStatePartitionStreamProvider.getKeyGroupId();
                Preconditions.checkArgument(keyGroupRange.contains(keyGroupId), "Key Group " + keyGroupId + " does not belong to the local range.");
                restoreKeyGroupState(keyGroupId, dataInputViewStreamWrapper);
                if (this.keyCoder != null) {
                    if (this.timerService == null) {
                        this.timerService = new HeapInternalTimerService<>(numberOfKeyGroups, keyGroupRange, this, getRuntimeContext().getProcessingTimeService());
                    }
                    this.timerService.restoreTimersForKeyGroup(dataInputViewStreamWrapper, keyGroupId, getUserCodeClassloader());
                }
            }
        }
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupRestoringOperator
    public void restoreKeyGroupState(int i, DataInputStream dataInputStream) throws Exception {
        if (this.sideInputs.isEmpty() || this.keyCoder == null) {
            return;
        }
        if (this.pushbackStateInternals == null) {
            this.pushbackStateInternals = new FlinkKeyGroupStateInternals(this.keyCoder, getKeyedStateBackend());
        }
        ((FlinkKeyGroupStateInternals) this.pushbackStateInternals).restoreKeyGroupState(i, dataInputStream, getUserCodeClassloader());
    }

    public void onEventTime(InternalTimer<Object, TimerInternals.TimerData> internalTimer) throws Exception {
        fireTimer(internalTimer);
    }

    public void onProcessingTime(InternalTimer<Object, TimerInternals.TimerData> internalTimer) throws Exception {
        fireTimer(internalTimer);
    }

    public void fireTimer(InternalTimer<?, TimerInternals.TimerData> internalTimer) {
        TimerInternals.TimerData timerData = (TimerInternals.TimerData) internalTimer.getNamespace();
        StateNamespaces.WindowNamespace namespace = timerData.getNamespace();
        Preconditions.checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
        this.pushbackDoFnRunner.onTimer(timerData.getTimerId(), namespace.getWindow(), timerData.getTimestamp(), timerData.getDomain());
    }
}
