package org.apache.flink.streaming.runtime.io;

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.sort.MultiInputSortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.class */
public class StreamMultipleInputProcessorFactory {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory$StreamTaskNetworkOutput.class */
    private static class StreamTaskNetworkOutput<T> implements PushingAsyncDataInput.DataOutput<T> {
        private final Input<T> input;
        private final WatermarkGauge inputWatermarkGauge;
        private final Counter mainOperatorRecordsIn;
        private final Counter networkRecordsIn;

        private StreamTaskNetworkOutput(Input<T> input, WatermarkGauge watermarkGauge, Counter counter, Counter counter2) {
            this.input = (Input) Preconditions.checkNotNull(input);
            this.inputWatermarkGauge = (WatermarkGauge) Preconditions.checkNotNull(watermarkGauge);
            this.mainOperatorRecordsIn = counter;
            this.networkRecordsIn = counter2;
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecord(StreamRecord<T> streamRecord) throws Exception {
            this.input.setKeyContextElement(streamRecord);
            this.input.processElement(streamRecord);
            this.mainOperatorRecordsIn.inc();
            this.networkRecordsIn.inc();
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermark(Watermark watermark) throws Exception {
            this.inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            this.input.processWatermark(watermark);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            this.input.processWatermarkStatus(watermarkStatus);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            this.input.processLatencyMarker(latencyMarker);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory$StreamTaskSourceOutput.class */
    private static class StreamTaskSourceOutput extends SourceOperatorStreamTask.AsyncDataOutputToOutput {
        private final WatermarkGaugeExposingOutput<StreamRecord<?>> chainedOutput;

        public StreamTaskSourceOutput(WatermarkGaugeExposingOutput<StreamRecord<?>> watermarkGaugeExposingOutput, WatermarkGauge watermarkGauge, InternalSourceReaderMetricGroup internalSourceReaderMetricGroup) {
            super(watermarkGaugeExposingOutput, internalSourceReaderMetricGroup, watermarkGauge);
            this.chainedOutput = watermarkGaugeExposingOutput;
        }

        @Override // org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.AsyncDataOutputToOutput, org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
            this.chainedOutput.emitWatermarkStatus(watermarkStatus);
        }
    }

    public static StreamMultipleInputProcessor create(TaskInvokable taskInvokable, CheckpointedInputGate[] checkpointedInputGateArr, StreamConfig.InputConfig[] inputConfigArr, IOManager iOManager, MemoryManager memoryManager, TaskIOMetricGroup taskIOMetricGroup, Counter counter, MultipleInputStreamOperator<?> multipleInputStreamOperator, WatermarkGauge[] watermarkGaugeArr, StreamConfig streamConfig, Configuration configuration, Configuration configuration2, ExecutionConfig executionConfig, ClassLoader classLoader, OperatorChain<?, ?> operatorChain, InflightDataRescalingDescriptor inflightDataRescalingDescriptor, Function<Integer, StreamPartitioner<?>> function, TaskInfo taskInfo) {
        Preconditions.checkNotNull(operatorChain);
        List<Input> inputs = multipleInputStreamOperator.getInputs();
        int size = inputs.size();
        StreamOneInputProcessor[] streamOneInputProcessorArr = new StreamOneInputProcessor[size];
        SimpleCounter simpleCounter = new SimpleCounter();
        taskIOMetricGroup.reuseRecordsInputCounter(simpleCounter);
        Preconditions.checkState(inputConfigArr.length == size, "Number of configured inputs in StreamConfig [%s] doesn't match the main operator's number of inputs [%s]", new Object[]{Integer.valueOf(inputConfigArr.length), Integer.valueOf(size)});
        StreamTaskInput[] streamTaskInputArr = new StreamTaskInput[size];
        for (int i = 0; i < size; i++) {
            StreamConfig.InputConfig inputConfig = inputConfigArr[i];
            if (inputConfig instanceof StreamConfig.NetworkInputConfig) {
                StreamConfig.NetworkInputConfig networkInputConfig = (StreamConfig.NetworkInputConfig) inputConfig;
                streamTaskInputArr[i] = StreamTaskNetworkInputFactory.create(checkpointedInputGateArr[networkInputConfig.getInputGateIndex()], networkInputConfig.getTypeSerializer(), iOManager, new StatusWatermarkValve(checkpointedInputGateArr[networkInputConfig.getInputGateIndex()].getNumberOfInputChannels()), i, inflightDataRescalingDescriptor, function, taskInfo);
            } else {
                if (!(inputConfig instanceof StreamConfig.SourceInputConfig)) {
                    throw new UnsupportedOperationException("Unknown input type: " + inputConfig);
                }
                streamTaskInputArr[i] = operatorChain.getSourceTaskInput((StreamConfig.SourceInputConfig) inputConfig);
            }
        }
        InputSelectable inputSelectable = multipleInputStreamOperator instanceof InputSelectable ? (InputSelectable) multipleInputStreamOperator : null;
        StreamConfig.InputConfig[] inputs2 = streamConfig.getInputs(classLoader);
        if (Arrays.stream(inputs2).anyMatch(StreamConfig::requiresSorting)) {
            if (inputSelectable != null) {
                throw new IllegalStateException("The InputSelectable interface is not supported with sorting inputs");
            }
            MultiInputSortingDataInput.SelectableSortingInputs wrapInputs = MultiInputSortingDataInput.wrapInputs(taskInvokable, (StreamTaskInput[]) IntStream.range(0, size).filter(i2 -> {
                return StreamConfig.requiresSorting(inputs2[i2]);
            }).mapToObj(i3 -> {
                return streamTaskInputArr[i3];
            }).toArray(i4 -> {
                return new StreamTaskInput[i4];
            }), (KeySelector[]) IntStream.range(0, size).filter(i5 -> {
                return StreamConfig.requiresSorting(inputs2[i5]);
            }).mapToObj(i6 -> {
                return streamConfig.getStatePartitioner(i6, classLoader);
            }).toArray(i7 -> {
                return new KeySelector[i7];
            }), (TypeSerializer[]) IntStream.range(0, size).filter(i8 -> {
                return StreamConfig.requiresSorting(inputs2[i8]);
            }).mapToObj(i9 -> {
                return streamConfig.getTypeSerializerIn(i9, classLoader);
            }).toArray(i10 -> {
                return new TypeSerializer[i10];
            }), streamConfig.getStateKeySerializer(classLoader), (StreamTaskInput[]) IntStream.range(0, size).filter(i11 -> {
                return !StreamConfig.requiresSorting(inputs2[i11]);
            }).mapToObj(i12 -> {
                return streamTaskInputArr[i12];
            }).toArray(i13 -> {
                return new StreamTaskInput[i13];
            }), memoryManager, iOManager, executionConfig.isObjectReuseEnabled(), streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.OPERATOR, configuration, classLoader), configuration2, executionConfig);
            StreamTaskInput<?>[] sortedInputs = wrapInputs.getSortedInputs();
            StreamTaskInput<?>[] passThroughInputs = wrapInputs.getPassThroughInputs();
            int i14 = 0;
            int i15 = 0;
            for (int i16 = 0; i16 < streamTaskInputArr.length; i16++) {
                if (StreamConfig.requiresSorting(inputs2[i16])) {
                    streamTaskInputArr[i16] = sortedInputs[i14];
                    i14++;
                } else {
                    streamTaskInputArr[i16] = passThroughInputs[i15];
                    i15++;
                }
            }
            inputSelectable = wrapInputs.getInputSelectable();
        }
        for (int i17 = 0; i17 < size; i17++) {
            StreamConfig.InputConfig inputConfig2 = inputConfigArr[i17];
            if (inputConfig2 instanceof StreamConfig.NetworkInputConfig) {
                streamOneInputProcessorArr[i17] = new StreamOneInputProcessor(streamTaskInputArr[i17], new StreamTaskNetworkOutput(operatorChain.getFinishedOnRestoreInputOrDefault(inputs.get(i17)), watermarkGaugeArr[i17], counter, simpleCounter), operatorChain);
            } else {
                if (!(inputConfig2 instanceof StreamConfig.SourceInputConfig)) {
                    throw new UnsupportedOperationException("Unknown input type: " + inputConfig2);
                }
                OperatorChain.ChainedSource chainedSource = operatorChain.getChainedSource((StreamConfig.SourceInputConfig) inputConfig2);
                streamOneInputProcessorArr[i17] = new StreamOneInputProcessor(streamTaskInputArr[i17], new StreamTaskSourceOutput(chainedSource.getSourceOutput(), watermarkGaugeArr[i17], chainedSource.getSourceTaskInput().getOperator().getSourceMetricGroup()), operatorChain);
            }
        }
        return new StreamMultipleInputProcessor(new MultipleInputSelectionHandler(inputSelectable, size), streamOneInputProcessorArr);
    }
}
