package org.apache.beam.runners.flink.translation.wrappers.streaming;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.class */
public class DoFnOperator<InputT, FnOutputT, OutputT> extends AbstractStreamOperator<OutputT> implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>, TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> {
    protected OldDoFn<InputT, FnOutputT> oldDoFn;
    protected final SerializedPipelineOptions serializedOptions;
    protected final TupleTag<FnOutputT> mainOutputTag;
    protected final List<TupleTag<?>> sideOutputTags;
    protected final Collection<PCollectionView<?>> sideInputs;
    protected final Map<Integer, PCollectionView<?>> sideInputTagMapping;
    protected final WindowingStrategy<?, ?> windowingStrategy;
    protected final OutputManagerFactory<OutputT> outputManagerFactory;
    protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner;
    protected transient SideInputHandler sideInputHandler;
    protected transient long currentInputWatermark;
    protected transient long currentOutputWatermark;
    private transient AbstractStateBackend sideInputStateBackend;
    private final ReducingStateDescriptor<Long> pushedBackWatermarkDescriptor;
    private final ListStateDescriptor<WindowedValue<InputT>> pushedBackDescriptor;
    private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$DefaultOutputManagerFactory.class */
    public static class DefaultOutputManagerFactory<OutputT> implements OutputManagerFactory<OutputT> {
        @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.OutputManagerFactory
        public DoFnRunners.OutputManager create(final Output<StreamRecord<OutputT>> output) {
            return new DoFnRunners.OutputManager() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.DefaultOutputManagerFactory.1
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    output.collect(new StreamRecord(windowedValue));
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$LongMinReducer.class */
    private static class LongMinReducer implements ReduceFunction<Long> {
        private LongMinReducer() {
        }

        public Long reduce(Long l, Long l2) throws Exception {
            return Long.valueOf(Math.min(l.longValue(), l2.longValue()));
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$MultiOutputOutputManagerFactory.class */
    public static class MultiOutputOutputManagerFactory implements OutputManagerFactory<RawUnionValue> {
        Map<TupleTag<?>, Integer> mapping;

        public MultiOutputOutputManagerFactory(Map<TupleTag<?>, Integer> map) {
            this.mapping = map;
        }

        @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.OutputManagerFactory
        public DoFnRunners.OutputManager create(final Output<StreamRecord<RawUnionValue>> output) {
            return new DoFnRunners.OutputManager() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory.1
                public <T> void output(TupleTag<T> tupleTag, WindowedValue<T> windowedValue) {
                    output.collect(new StreamRecord(new RawUnionValue(MultiOutputOutputManagerFactory.this.mapping.get(tupleTag).intValue(), windowedValue)));
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$OutputManagerFactory.class */
    public interface OutputManagerFactory<OutputT> extends Serializable {
        DoFnRunners.OutputManager create(Output<StreamRecord<OutputT>> output);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator$StepContext.class */
    public class StepContext implements ExecutionContext.StepContext {
        /* JADX INFO: Access modifiers changed from: protected */
        public StepContext() {
        }

        public String getStepName() {
            return null;
        }

        public String getTransformName() {
            return null;
        }

        public void noteOutput(WindowedValue<?> windowedValue) {
        }

        public void noteSideOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
        }

        public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tupleTag, Iterable<WindowedValue<T>> iterable, Coder<Iterable<WindowedValue<T>>> coder, W w, Coder<W> coder2) throws IOException {
            throw new UnsupportedOperationException("Writing side-input data is not supported.");
        }

        public StateInternals<?> stateInternals() {
            throw new UnsupportedOperationException("Not supported for regular DoFns.");
        }

        public TimerInternals timerInternals() {
            throw new UnsupportedOperationException("Not supported for regular DoFns.");
        }
    }

    @Deprecated
    public DoFnOperator(OldDoFn<InputT, FnOutputT> oldDoFn, TypeInformation<WindowedValue<InputT>> typeInformation, TupleTag<FnOutputT> tupleTag, List<TupleTag<?>> list, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map, Collection<PCollectionView<?>> collection, PipelineOptions pipelineOptions) {
        this.oldDoFn = oldDoFn;
        this.mainOutputTag = tupleTag;
        this.sideOutputTags = list;
        this.sideInputTagMapping = map;
        this.sideInputs = collection;
        this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
        this.windowingStrategy = windowingStrategy;
        this.outputManagerFactory = outputManagerFactory;
        this.pushedBackWatermarkDescriptor = new ReducingStateDescriptor<>("pushed-back-elements-watermark-hold", new LongMinReducer(), LongSerializer.INSTANCE);
        this.pushedBackDescriptor = new ListStateDescriptor<>("pushed-back-values", typeInformation);
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public DoFnOperator(DoFn<InputT, FnOutputT> doFn, TypeInformation<WindowedValue<InputT>> typeInformation, TupleTag<FnOutputT> tupleTag, List<TupleTag<?>> list, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> map, Collection<PCollectionView<?>> collection, PipelineOptions pipelineOptions) {
        this(DoFnAdapters.toOldDoFn(doFn), typeInformation, tupleTag, list, outputManagerFactory, windowingStrategy, map, collection, pipelineOptions);
    }

    protected ExecutionContext.StepContext createStepContext() {
        return new StepContext();
    }

    protected OldDoFn<InputT, FnOutputT> getOldDoFn() {
        return this.oldDoFn;
    }

    public void open() throws Exception {
        super.open();
        this.oldDoFn = getOldDoFn();
        this.currentInputWatermark = Long.MIN_VALUE;
        this.currentOutputWatermark = this.currentInputWatermark;
        AggregatorFactory aggregatorFactory = new AggregatorFactory() { // from class: org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.1
            public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(Class<?> cls, ExecutionContext.StepContext stepContext, String str, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                SerializableFnAggregatorWrapper serializableFnAggregatorWrapper = (SerializableFnAggregatorWrapper) DoFnOperator.this.getRuntimeContext().getAccumulator(str);
                if (serializableFnAggregatorWrapper == null) {
                    serializableFnAggregatorWrapper = new SerializableFnAggregatorWrapper(combineFn);
                    DoFnOperator.this.getRuntimeContext().addAccumulator(str, serializableFnAggregatorWrapper);
                }
                return serializableFnAggregatorWrapper;
            }
        };
        SideInputHandler of = NullSideInputReader.of(this.sideInputs);
        if (!this.sideInputs.isEmpty()) {
            this.sideInputStateBackend = getContainingTask().createStateBackend(getClass().getSimpleName() + "_" + getRuntimeContext().getIndexOfThisSubtask() + "_sideInput", new GenericTypeInfo(ByteBuffer.class).createSerializer(new ExecutionConfig()));
            Preconditions.checkState(this.sideInputStateBackend != null, "Side input state backend cannot be null");
            if (this.restoredSideInputState != null) {
                this.sideInputStateBackend.injectKeyValueStateSnapshots((HashMap) this.restoredSideInputState);
                this.restoredSideInputState = null;
            }
            this.sideInputStateBackend.setCurrentKey(ByteBuffer.wrap(CoderUtils.encodeToByteArray(VoidCoder.of(), (Object) null)));
            this.sideInputHandler = new SideInputHandler(this.sideInputs, new FlinkStateInternals(this.sideInputStateBackend, VoidCoder.of()));
            of = this.sideInputHandler;
        }
        ExecutionContext.StepContext createStepContext = createStepContext();
        DoFnRunner simpleRunner = DoFnRunners.simpleRunner(this.serializedOptions.getPipelineOptions(), this.oldDoFn, of, this.outputManagerFactory.create(this.output), this.mainOutputTag, this.sideOutputTags, createStepContext, aggregatorFactory, this.windowingStrategy);
        if (this.oldDoFn instanceof GroupAlsoByWindowViaWindowSetDoFn) {
            simpleRunner = DoFnRunners.lateDataDroppingRunner(simpleRunner, createStepContext, this.windowingStrategy, this.oldDoFn.getDroppedDueToLatenessAggregator());
        }
        this.pushbackDoFnRunner = PushbackSideInputDoFnRunner.create(simpleRunner, this.sideInputs, this.sideInputHandler);
        this.oldDoFn.setup();
    }

    public void close() throws Exception {
        super.close();
        this.oldDoFn.teardown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final long getPushbackWatermarkHold() {
        if (this.sideInputs.isEmpty()) {
            return Long.MAX_VALUE;
        }
        try {
            Long l = (Long) this.sideInputStateBackend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, this.pushedBackWatermarkDescriptor).get();
            if (l != null) {
                return l.longValue();
            }
            return Long.MAX_VALUE;
        } catch (Exception e) {
            throw new RuntimeException("Error retrieving pushed back watermark state.", e);
        }
    }

    public final void processElement(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
        this.pushbackDoFnRunner.startBundle();
        this.pushbackDoFnRunner.processElement((WindowedValue) streamRecord.getValue());
        this.pushbackDoFnRunner.finishBundle();
    }

    public final void processElement1(StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
        this.pushbackDoFnRunner.startBundle();
        Iterable<WindowedValue> processElementInReadyWindows = this.pushbackDoFnRunner.processElementInReadyWindows((WindowedValue) streamRecord.getValue());
        ListState partitionedState = this.sideInputStateBackend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, this.pushedBackDescriptor);
        ReducingState partitionedState2 = this.sideInputStateBackend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, this.pushedBackWatermarkDescriptor);
        for (WindowedValue windowedValue : processElementInReadyWindows) {
            partitionedState2.add(Long.valueOf(windowedValue.getTimestamp().getMillis()));
            partitionedState.add(windowedValue);
        }
        this.pushbackDoFnRunner.finishBundle();
    }

    public final void processElement2(StreamRecord<RawUnionValue> streamRecord) throws Exception {
        this.pushbackDoFnRunner.startBundle();
        WindowedValue windowedValue = (WindowedValue) ((RawUnionValue) streamRecord.getValue()).getValue();
        this.sideInputHandler.addSideInputValue(this.sideInputTagMapping.get(Integer.valueOf(((RawUnionValue) streamRecord.getValue()).getUnionTag())), windowedValue);
        ListState partitionedState = this.sideInputStateBackend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, this.pushedBackDescriptor);
        ArrayList<WindowedValue> arrayList = new ArrayList();
        Iterable<WindowedValue> iterable = (Iterable) partitionedState.get();
        if (iterable != null) {
            for (WindowedValue windowedValue2 : iterable) {
                setKeyContextElement1(new StreamRecord(windowedValue2));
                Iterables.addAll(arrayList, this.pushbackDoFnRunner.processElementInReadyWindows(windowedValue2));
            }
        }
        ReducingState partitionedState2 = this.sideInputStateBackend.getPartitionedState((Object) null, VoidSerializer.INSTANCE, this.pushedBackWatermarkDescriptor);
        partitionedState.clear();
        partitionedState2.clear();
        for (WindowedValue windowedValue3 : arrayList) {
            partitionedState2.add(Long.valueOf(windowedValue3.getTimestamp().getMillis()));
            partitionedState.add(windowedValue3);
        }
        this.pushbackDoFnRunner.finishBundle();
        processWatermark1(new Watermark(this.currentInputWatermark));
    }

    public void processWatermark(Watermark watermark) throws Exception {
        processWatermark1(watermark);
    }

    public void processWatermark1(Watermark watermark) throws Exception {
        this.currentInputWatermark = watermark.getTimestamp();
        long min = Math.min(getPushbackWatermarkHold(), this.currentInputWatermark);
        if (min > this.currentOutputWatermark) {
            this.currentOutputWatermark = min;
            this.output.emitWatermark(new Watermark(this.currentOutputWatermark));
        }
    }

    public void processWatermark2(Watermark watermark) throws Exception {
    }

    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        HashMap snapshotPartitionedState;
        StreamTaskState snapshotOperatorState = super.snapshotOperatorState(j, j2);
        if (this.sideInputStateBackend != null && (snapshotPartitionedState = this.sideInputStateBackend.snapshotPartitionedState(j, j2)) != null) {
            snapshotOperatorState.setFunctionState(this.sideInputStateBackend.checkpointStateSerializable(snapshotPartitionedState, j, j2));
        }
        return snapshotOperatorState;
    }

    public void restoreState(StreamTaskState streamTaskState) throws Exception {
        super.restoreState(streamTaskState);
        StateHandle functionState = streamTaskState.getFunctionState();
        if (functionState != null) {
            this.restoredSideInputState = (Map) functionState.getState(getUserCodeClassloader());
        }
    }
}
