package org.apache.flink.streaming.runtime.io;

import java.util.ArrayList;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.sort.MultiInputSortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.class */
public class StreamTwoInputProcessorFactory {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory$FinishedOnRestoreWatermarkBypass.class */
    public static class FinishedOnRestoreWatermarkBypass {
        private final RecordWriterOutput<?>[] streamOutputs;
        private boolean receivedFirstMaxWatermark;
        private boolean receivedSecondMaxWatermark;

        public FinishedOnRestoreWatermarkBypass(RecordWriterOutput<?>[] recordWriterOutputArr) {
            this.streamOutputs = recordWriterOutputArr;
        }

        public void processWatermark1(Watermark watermark) {
            this.receivedFirstMaxWatermark = true;
            checkAndForward(watermark);
        }

        public void processWatermark2(Watermark watermark) {
            this.receivedSecondMaxWatermark = true;
            checkAndForward(watermark);
        }

        private void checkAndForward(Watermark watermark) {
            if (watermark.getTimestamp() != Watermark.MAX_WATERMARK.getTimestamp()) {
                throw new IllegalStateException(String.format("We should not receive any watermarks [%s] other than the MAX_WATERMARK if finished on restore", watermark));
            }
            if (this.receivedFirstMaxWatermark && this.receivedSecondMaxWatermark) {
                for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
                    recordWriterOutput.emitWatermark(watermark);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.class */
    private static class StreamTaskNetworkOutput<T> implements PushingAsyncDataInput.DataOutput<T> {
        private final TwoInputStreamOperator<?, ?, ?> operator;
        private final ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer;
        private final WatermarkGauge inputWatermarkGauge;
        private final int inputIndex;
        private final Counter numRecordsIn;

        @Nullable
        private final FinishedOnRestoreWatermarkBypass watermarkBypass;

        private StreamTaskNetworkOutput(TwoInputStreamOperator<?, ?, ?> twoInputStreamOperator, ThrowingConsumer<StreamRecord<T>, Exception> throwingConsumer, WatermarkGauge watermarkGauge, int i, Counter counter, @Nullable FinishedOnRestoreWatermarkBypass finishedOnRestoreWatermarkBypass) {
            this.operator = (TwoInputStreamOperator) Preconditions.checkNotNull(twoInputStreamOperator);
            this.recordConsumer = (ThrowingConsumer) Preconditions.checkNotNull(throwingConsumer);
            this.inputWatermarkGauge = (WatermarkGauge) Preconditions.checkNotNull(watermarkGauge);
            this.inputIndex = i;
            this.numRecordsIn = counter;
            this.watermarkBypass = finishedOnRestoreWatermarkBypass;
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
            this.numRecordsIn.inc();
            this.recordConsumer.accept(streamRecord);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermark(Watermark watermark) throws Exception {
            this.inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            if (this.inputIndex == 0) {
                if (this.watermarkBypass == null) {
                    this.operator.processWatermark1(watermark);
                    return;
                } else {
                    this.watermarkBypass.processWatermark1(watermark);
                    return;
                }
            }
            if (this.watermarkBypass == null) {
                this.operator.processWatermark2(watermark);
            } else {
                this.watermarkBypass.processWatermark2(watermark);
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            if (this.inputIndex == 0) {
                this.operator.processWatermarkStatus1(watermarkStatus);
            } else {
                this.operator.processWatermarkStatus2(watermarkStatus);
            }
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            if (this.inputIndex == 0) {
                this.operator.processLatencyMarker1(latencyMarker);
            } else {
                this.operator.processLatencyMarker2(latencyMarker);
            }
        }
    }

    public static <IN1, IN2> StreamMultipleInputProcessor create(TaskInvokable taskInvokable, CheckpointedInputGate[] checkpointedInputGateArr, IOManager iOManager, MemoryManager memoryManager, TaskIOMetricGroup taskIOMetricGroup, TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, WatermarkGauge watermarkGauge, WatermarkGauge watermarkGauge2, OperatorChain<?, ?> operatorChain, StreamConfig streamConfig, Configuration configuration, Configuration configuration2, ExecutionConfig executionConfig, ClassLoader classLoader, Counter counter, InflightDataRescalingDescriptor inflightDataRescalingDescriptor, Function<Integer, StreamPartitioner<?>> function, TaskInfo taskInfo, StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecordsChecker) {
        Preconditions.checkNotNull(operatorChain);
        taskIOMetricGroup.reuseRecordsInputCounter(counter);
        TypeSerializer typeSerializerIn = streamConfig.getTypeSerializerIn(0, classLoader);
        StreamTaskInput create = StreamTaskNetworkInputFactory.create(checkpointedInputGateArr[0], typeSerializerIn, iOManager, new StatusWatermarkValve(checkpointedInputGateArr[0].getNumberOfInputChannels()), 0, inflightDataRescalingDescriptor, function, taskInfo, canEmitBatchOfRecordsChecker);
        TypeSerializer typeSerializerIn2 = streamConfig.getTypeSerializerIn(1, classLoader);
        StreamTaskInput create2 = StreamTaskNetworkInputFactory.create(checkpointedInputGateArr[1], typeSerializerIn2, iOManager, new StatusWatermarkValve(checkpointedInputGateArr[1].getNumberOfInputChannels()), 1, inflightDataRescalingDescriptor, function, taskInfo, canEmitBatchOfRecordsChecker);
        InputSelectable inputSelectable = twoInputStreamOperator instanceof InputSelectable ? (InputSelectable) twoInputStreamOperator : null;
        StreamConfig.InputConfig[] inputs = streamConfig.getInputs(classLoader);
        boolean requiresSorting = StreamConfig.requiresSorting(inputs[0]);
        boolean requiresSorting2 = StreamConfig.requiresSorting(inputs[1]);
        if (requiresSorting || requiresSorting2) {
            if (inputSelectable != null) {
                throw new IllegalStateException("The InputSelectable interface is not supported with sorting inputs");
            }
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            if (requiresSorting) {
                arrayList.add(create);
                arrayList2.add(streamConfig.getStatePartitioner(0, classLoader));
            } else {
                arrayList3.add(create);
            }
            if (requiresSorting2) {
                arrayList.add(create2);
                arrayList2.add(streamConfig.getStatePartitioner(1, classLoader));
            } else {
                arrayList3.add(create2);
            }
            MultiInputSortingDataInput.SelectableSortingInputs wrapInputs = MultiInputSortingDataInput.wrapInputs(taskInvokable, (StreamTaskInput[]) arrayList.toArray(new StreamTaskInput[0]), (KeySelector[]) arrayList2.toArray(new KeySelector[0]), new TypeSerializer[]{typeSerializerIn, typeSerializerIn2}, streamConfig.getStateKeySerializer(classLoader), (StreamTaskInput[]) arrayList3.toArray(new StreamTaskInput[0]), memoryManager, iOManager, executionConfig.isObjectReuseEnabled(), streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.OPERATOR, configuration, classLoader), configuration, executionConfig);
            inputSelectable = wrapInputs.getInputSelectable();
            StreamTaskInput<?>[] sortedInputs = wrapInputs.getSortedInputs();
            StreamTaskInput<?>[] passThroughInputs = wrapInputs.getPassThroughInputs();
            create = requiresSorting ? toTypedInput(sortedInputs[0]) : toTypedInput(passThroughInputs[0]);
            create2 = requiresSorting2 ? toTypedInput(sortedInputs[sortedInputs.length - 1]) : toTypedInput(passThroughInputs[passThroughInputs.length - 1]);
        }
        FinishedOnRestoreWatermarkBypass finishedOnRestoreWatermarkBypass = operatorChain.isTaskDeployedAsFinished() ? new FinishedOnRestoreWatermarkBypass(operatorChain.getStreamOutputs()) : null;
        return new StreamMultipleInputProcessor(new MultipleInputSelectionHandler(inputSelectable, 2), new StreamOneInputProcessor[]{new StreamOneInputProcessor(create, new StreamTaskNetworkOutput(twoInputStreamOperator, RecordProcessorUtils.getRecordProcessor1(twoInputStreamOperator), watermarkGauge, 0, counter, finishedOnRestoreWatermarkBypass), operatorChain), new StreamOneInputProcessor(create2, new StreamTaskNetworkOutput(twoInputStreamOperator, RecordProcessorUtils.getRecordProcessor2(twoInputStreamOperator), watermarkGauge2, 1, counter, finishedOnRestoreWatermarkBypass), operatorChain)});
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static <IN1> StreamTaskInput<IN1> toTypedInput(StreamTaskInput<?> streamTaskInput) {
        return streamTaskInput;
    }
}
