package org.apache.beam.runners.flink.translation.wrappers.streaming;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.class */
public class WindowDoFnOperator<K, InputT, OutputT> extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> implements Triggerable {
    private final Coder<K> keyCoder;
    private final TimerInternals.TimerDataCoder timerCoder;
    private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimers;
    private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimersQueue;
    private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimersQueue;
    private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimers;
    private transient Multiset<Long> processingTimeTimerTimestamps;
    private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
    private transient FlinkStateInternals<K> stateInternals;
    private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator$StepContext.class */
    protected class StepContext extends DoFnOperator.StepContext {
        protected StepContext() {
            super(WindowDoFnOperator.this);
        }

        @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.StepContext
        public TimerInternals timerInternals() {
            return new TimerInternals() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.StepContext.1
                public void setTimer(StateNamespace stateNamespace, String str, Instant instant, TimeDomain timeDomain) {
                    throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
                }

                @Deprecated
                public void setTimer(TimerInternals.TimerData timerData) {
                    if (timerData.getDomain().equals(TimeDomain.EVENT_TIME)) {
                        WindowDoFnOperator.this.registerEventTimeTimer(timerData);
                    } else {
                        if (!timerData.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
                            throw new UnsupportedOperationException("Unsupported time domain: " + timerData.getDomain());
                        }
                        WindowDoFnOperator.this.registerProcessingTimeTimer(timerData);
                    }
                }

                public void deleteTimer(StateNamespace stateNamespace, String str, TimeDomain timeDomain) {
                    throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
                }

                @Deprecated
                public void deleteTimer(StateNamespace stateNamespace, String str) {
                    throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported.");
                }

                @Deprecated
                public void deleteTimer(TimerInternals.TimerData timerData) {
                    if (timerData.getDomain().equals(TimeDomain.EVENT_TIME)) {
                        WindowDoFnOperator.this.deleteEventTimeTimer(timerData);
                    } else {
                        if (!timerData.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
                            throw new UnsupportedOperationException("Unsupported time domain: " + timerData.getDomain());
                        }
                        WindowDoFnOperator.this.deleteProcessingTimeTimer(timerData);
                    }
                }

                public Instant currentProcessingTime() {
                    return new Instant(WindowDoFnOperator.this.getCurrentProcessingTime());
                }

                @Nullable
                public Instant currentSynchronizedProcessingTime() {
                    return new Instant(WindowDoFnOperator.this.getCurrentProcessingTime());
                }

                public Instant currentInputWatermarkTime() {
                    return new Instant(Math.min(WindowDoFnOperator.this.currentInputWatermark, WindowDoFnOperator.this.getPushbackWatermarkHold()));
                }

                @Nullable
                public Instant currentOutputWatermarkTime() {
                    return new Instant(WindowDoFnOperator.this.currentOutputWatermark);
                }
            };
        }
    }

    public WindowDoFnOperator(SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn, TypeInformation<WindowedValue<KeyedWorkItem<K, InputT>>> typeInformation, TupleTag<KV<K, OutputT>> tupleTag, List<TupleTag<?>> list, DoFnOperator.OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map, Collection<PCollectionView<?>> collection, PipelineOptions pipelineOptions, Coder<K> coder) {
        super((OldDoFn) null, typeInformation, tupleTag, list, outputManagerFactory, windowingStrategy, map, collection, pipelineOptions);
        this.systemReduceFn = systemReduceFn;
        this.keyCoder = coder;
        this.timerCoder = TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getOldDoFn() {
        return GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, new StateInternalsFactory<K>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.1
            public StateInternals<K> stateInternalsForKey(K k) {
                return WindowDoFnOperator.this.stateInternals;
            }
        }, this.systemReduceFn);
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    public void open() throws Exception {
        if (this.watermarkTimers == null) {
            this.watermarkTimers = new HashSet();
            this.watermarkTimersQueue = new PriorityQueue(10, new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.2
                @Override // java.util.Comparator
                public int compare(Tuple2<ByteBuffer, TimerInternals.TimerData> tuple2, Tuple2<ByteBuffer, TimerInternals.TimerData> tuple22) {
                    return ((TimerInternals.TimerData) tuple2.f1).compareTo((TimerInternals.TimerData) tuple22.f1);
                }
            });
        }
        if (this.processingTimeTimers == null) {
            this.processingTimeTimers = new HashSet();
            this.processingTimeTimerTimestamps = HashMultiset.create();
            this.processingTimeTimersQueue = new PriorityQueue(10, new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.3
                @Override // java.util.Comparator
                public int compare(Tuple2<ByteBuffer, TimerInternals.TimerData> tuple2, Tuple2<ByteBuffer, TimerInternals.TimerData> tuple22) {
                    return ((TimerInternals.TimerData) tuple2.f1).compareTo((TimerInternals.TimerData) tuple22.f1);
                }
            });
        }
        this.processingTimeTimerFutures = new HashMap();
        this.stateInternals = new FlinkStateInternals<>(getStateBackend(), this.keyCoder);
        super.open();
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    protected ExecutionContext.StepContext createStepContext() {
        return new StepContext();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerEventTimeTimer(TimerInternals.TimerData timerData) {
        Tuple2<ByteBuffer, TimerInternals.TimerData> tuple2 = new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timerData);
        if (this.watermarkTimers.add(tuple2)) {
            this.watermarkTimersQueue.add(tuple2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deleteEventTimeTimer(TimerInternals.TimerData timerData) {
        Tuple2 tuple2 = new Tuple2((ByteBuffer) getStateBackend().getCurrentKey(), timerData);
        if (this.watermarkTimers.remove(tuple2)) {
            this.watermarkTimersQueue.remove(tuple2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerProcessingTimeTimer(TimerInternals.TimerData timerData) {
        Tuple2<ByteBuffer, TimerInternals.TimerData> tuple2 = new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timerData);
        if (this.processingTimeTimers.add(tuple2)) {
            this.processingTimeTimersQueue.add(tuple2);
            if (this.processingTimeTimerTimestamps.add(Long.valueOf(timerData.getTimestamp().getMillis()), 1) == 0) {
                this.processingTimeTimerFutures.put(Long.valueOf(timerData.getTimestamp().getMillis()), registerTimer(timerData.getTimestamp().getMillis(), this));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deleteProcessingTimeTimer(TimerInternals.TimerData timerData) {
        ScheduledFuture<?> remove;
        Tuple2 tuple2 = new Tuple2((ByteBuffer) getStateBackend().getCurrentKey(), timerData);
        if (this.processingTimeTimers.remove(tuple2)) {
            this.processingTimeTimersQueue.remove(tuple2);
            if (this.processingTimeTimerTimestamps.remove(Long.valueOf(timerData.getTimestamp().getMillis()), 1) != 1 || (remove = this.processingTimeTimerFutures.remove(Long.valueOf(timerData.getTimestamp().getMillis()))) == null || remove.isDone()) {
                return;
            }
            remove.cancel(false);
        }
    }

    public void trigger(long j) throws Exception {
        boolean z;
        this.processingTimeTimerFutures.remove(Long.valueOf(j));
        this.processingTimeTimerTimestamps.setCount(Long.valueOf(j), 0);
        do {
            Tuple2<ByteBuffer, TimerInternals.TimerData> peek = this.processingTimeTimersQueue.peek();
            if (peek == null || ((TimerInternals.TimerData) peek.f1).getTimestamp().getMillis() > j) {
                z = false;
            } else {
                z = true;
                this.processingTimeTimersQueue.remove();
                this.processingTimeTimers.remove(peek);
                setKeyContext(peek.f0);
                this.pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(KeyedWorkItems.timersWorkItem(this.stateInternals.getKey(), Collections.singletonList(peek.f1))));
            }
        } while (z);
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    public void processWatermark(Watermark watermark) throws Exception {
        processWatermark1(watermark);
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    public void processWatermark1(Watermark watermark) throws Exception {
        boolean z;
        this.pushbackDoFnRunner.startBundle();
        this.currentInputWatermark = watermark.getTimestamp();
        long min = Math.min(getPushbackWatermarkHold(), watermark.getTimestamp());
        do {
            Tuple2<ByteBuffer, TimerInternals.TimerData> peek = this.watermarkTimersQueue.peek();
            if (peek == null || ((TimerInternals.TimerData) peek.f1).getTimestamp().getMillis() >= min) {
                z = false;
            } else {
                z = true;
                this.watermarkTimersQueue.remove();
                this.watermarkTimers.remove(peek);
                setKeyContext(peek.f0);
                this.pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(KeyedWorkItems.timersWorkItem(this.stateInternals.getKey(), Collections.singletonList(peek.f1))));
            }
        } while (z);
        long min2 = Math.min(this.currentInputWatermark, Math.min(this.stateInternals.watermarkHold().getMillis(), getPushbackWatermarkHold()));
        if (min2 > this.currentOutputWatermark) {
            this.currentOutputWatermark = min2;
            this.output.emitWatermark(new Watermark(this.currentOutputWatermark));
        }
        this.pushbackDoFnRunner.finishBundle();
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        StreamTaskState snapshotOperatorState = super.snapshotOperatorState(j, j2);
        AbstractStateBackend.CheckpointStateOutputView createCheckpointStateOutputView = getStateBackend().createCheckpointStateOutputView(j, j2);
        snapshotTimers(createCheckpointStateOutputView);
        snapshotOperatorState.setOperatorState(createCheckpointStateOutputView.closeAndGetHandle());
        return snapshotOperatorState;
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    public void restoreState(StreamTaskState streamTaskState) throws Exception {
        super.restoreState(streamTaskState);
        restoreTimers(new DataInputViewWrapper((DataInputView) streamTaskState.getOperatorState().getState(getUserCodeClassloader())));
    }

    private void restoreTimers(InputStream inputStream) throws IOException {
        DataInputStream dataInputStream = new DataInputStream(inputStream);
        int readInt = dataInputStream.readInt();
        this.watermarkTimers = new HashSet(readInt);
        this.watermarkTimersQueue = new PriorityQueue(Math.max(readInt, 1), new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.4
            @Override // java.util.Comparator
            public int compare(Tuple2<ByteBuffer, TimerInternals.TimerData> tuple2, Tuple2<ByteBuffer, TimerInternals.TimerData> tuple22) {
                return ((TimerInternals.TimerData) tuple2.f1).compareTo((TimerInternals.TimerData) tuple22.f1);
            }
        });
        for (int i = 0; i < readInt; i++) {
            byte[] bArr = new byte[dataInputStream.readInt()];
            dataInputStream.readFully(bArr);
            Tuple2<ByteBuffer, TimerInternals.TimerData> tuple2 = new Tuple2<>(ByteBuffer.wrap(bArr), this.timerCoder.decode(dataInputStream, Coder.Context.NESTED));
            if (this.watermarkTimers.add(tuple2)) {
                this.watermarkTimersQueue.add(tuple2);
            }
        }
        int readInt2 = dataInputStream.readInt();
        this.processingTimeTimers = new HashSet(readInt2);
        this.processingTimeTimersQueue = new PriorityQueue(Math.max(readInt2, 1), new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator.5
            @Override // java.util.Comparator
            public int compare(Tuple2<ByteBuffer, TimerInternals.TimerData> tuple22, Tuple2<ByteBuffer, TimerInternals.TimerData> tuple23) {
                return ((TimerInternals.TimerData) tuple22.f1).compareTo((TimerInternals.TimerData) tuple23.f1);
            }
        });
        this.processingTimeTimerTimestamps = HashMultiset.create();
        this.processingTimeTimerFutures = new HashMap();
        for (int i2 = 0; i2 < readInt2; i2++) {
            byte[] bArr2 = new byte[dataInputStream.readInt()];
            dataInputStream.readFully(bArr2);
            TimerInternals.TimerData decode = this.timerCoder.decode(dataInputStream, Coder.Context.NESTED);
            Tuple2<ByteBuffer, TimerInternals.TimerData> tuple22 = new Tuple2<>(ByteBuffer.wrap(bArr2), decode);
            if (this.processingTimeTimers.add(tuple22)) {
                this.processingTimeTimersQueue.add(tuple22);
                if (this.processingTimeTimerTimestamps.add(Long.valueOf(decode.getTimestamp().getMillis()), 1) == 0) {
                    this.processingTimeTimerFutures.put(Long.valueOf(decode.getTimestamp().getMillis()), registerTimer(decode.getTimestamp().getMillis(), this));
                }
            }
        }
    }

    private void snapshotTimers(OutputStream outputStream) throws IOException {
        DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
        dataOutputStream.writeInt(this.watermarkTimersQueue.size());
        for (Tuple2<ByteBuffer, TimerInternals.TimerData> tuple2 : this.watermarkTimersQueue) {
            dataOutputStream.writeInt(((ByteBuffer) tuple2.f0).limit());
            dataOutputStream.write(((ByteBuffer) tuple2.f0).array(), 0, ((ByteBuffer) tuple2.f0).limit());
            this.timerCoder.encode((TimerInternals.TimerData) tuple2.f1, dataOutputStream, Coder.Context.NESTED);
        }
        dataOutputStream.writeInt(this.processingTimeTimersQueue.size());
        for (Tuple2<ByteBuffer, TimerInternals.TimerData> tuple22 : this.processingTimeTimersQueue) {
            dataOutputStream.writeInt(((ByteBuffer) tuple22.f0).limit());
            dataOutputStream.write(((ByteBuffer) tuple22.f0).array(), 0, ((ByteBuffer) tuple22.f0).limit());
            this.timerCoder.encode((TimerInternals.TimerData) tuple22.f1, dataOutputStream, Coder.Context.NESTED);
        }
    }
}
