package org.apache.flink.streaming.runtime.tasks;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.shaded.curator5.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.sort.SortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInputFactory;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.InputProcessorUtil;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.class */
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {

    @Nullable
    private CheckpointBarrierHandler checkpointBarrierHandler;
    private final WatermarkGauge inputWatermarkGauge;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OneInputStreamTask$StreamTaskNetworkOutput.class */
    public static class StreamTaskNetworkOutput<IN> implements PushingAsyncDataInput.DataOutput<IN> {
        private final Input<IN> operator;
        private final WatermarkGauge watermarkGauge;
        private final Counter numRecordsIn;

        private StreamTaskNetworkOutput(Input<IN> input, WatermarkGauge watermarkGauge, Counter counter) {
            this.operator = (Input) Preconditions.checkNotNull(input);
            this.watermarkGauge = (WatermarkGauge) Preconditions.checkNotNull(watermarkGauge);
            this.numRecordsIn = (Counter) Preconditions.checkNotNull(counter);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecord(StreamRecord<IN> streamRecord) throws Exception {
            this.numRecordsIn.inc();
            this.operator.setKeyContextElement(streamRecord);
            this.operator.processElement(streamRecord);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermark(Watermark watermark) throws Exception {
            this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            this.operator.processWatermark(watermark);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
            this.operator.processWatermarkStatus(watermarkStatus);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            this.operator.processLatencyMarker(latencyMarker);
        }
    }

    public OneInputStreamTask(Environment environment) throws Exception {
        super(environment);
        this.inputWatermarkGauge = new WatermarkGauge();
    }

    @VisibleForTesting
    public OneInputStreamTask(Environment environment, @Nullable TimerService timerService) throws Exception {
        super(environment, timerService);
        this.inputWatermarkGauge = new WatermarkGauge();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();
        if (configuration.getNumberOfNetworkInputs() > 0) {
            CheckpointedInputGate createCheckpointedInputGate = createCheckpointedInputGate();
            Counter counter = setupNumRecordsInCounter(this.mainOperator);
            PushingAsyncDataInput.DataOutput<IN> createDataOutput = createDataOutput(counter);
            StreamTaskInput<IN> createTaskInput = createTaskInput(createCheckpointedInputGate);
            if (StreamConfig.requiresSorting(configuration.getInputs(getUserCodeClassLoader())[0])) {
                Preconditions.checkState(!configuration.isCheckpointingEnabled(), "Checkpointing is not allowed with sorted inputs.");
                createTaskInput = wrapWithSorted(createTaskInput);
            }
            getEnvironment().getMetricGroup().getIOMetricGroup().reuseRecordsInputCounter(counter);
            this.inputProcessor = new StreamOneInputProcessor(createTaskInput, createDataOutput, this.operatorChain);
        }
        ((OneInputStreamOperator) this.mainOperator).getMetricGroup().gauge("currentInputWatermark", this.inputWatermarkGauge);
        TaskMetricGroup metricGroup = getEnvironment().getMetricGroup();
        WatermarkGauge watermarkGauge = this.inputWatermarkGauge;
        watermarkGauge.getClass();
        metricGroup.gauge("currentInputWatermark", watermarkGauge::m122getValue);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
        return Optional.ofNullable(this.checkpointBarrierHandler);
    }

    private StreamTaskInput<IN> wrapWithSorted(StreamTaskInput<IN> streamTaskInput) {
        ClassLoader userCodeClassLoader = getUserCodeClassLoader();
        return new SortingDataInput(streamTaskInput, this.configuration.getTypeSerializerIn(streamTaskInput.getInputIndex(), userCodeClassLoader), this.configuration.getStateKeySerializer(userCodeClassLoader), this.configuration.getStatePartitioner(streamTaskInput.getInputIndex(), userCodeClassLoader), getEnvironment().getMemoryManager(), getEnvironment().getIOManager(), getExecutionConfig().isObjectReuseEnabled(), this.configuration.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.OPERATOR, getEnvironment().getTaskConfiguration(), userCodeClassLoader), getEnvironment().getTaskManagerInfo().getConfiguration(), this, getExecutionConfig());
    }

    private CheckpointedInputGate createCheckpointedInputGate() {
        IndexedInputGate[] allInputGates = getEnvironment().getAllInputGates();
        this.checkpointBarrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(this, this.configuration, getCheckpointCoordinator(), getTaskNameWithSubtaskAndId(), new List[]{Arrays.asList(allInputGates)}, Collections.emptyList(), this.mainMailboxExecutor, this.systemTimerService);
        return (CheckpointedInputGate) Iterables.getOnlyElement(Arrays.asList(InputProcessorUtil.createCheckpointedMultipleInputGate(this.mainMailboxExecutor, new List[]{Arrays.asList(allInputGates)}, getEnvironment().getMetricGroup().getIOMetricGroup(), this.checkpointBarrierHandler, this.configuration)));
    }

    private PushingAsyncDataInput.DataOutput<IN> createDataOutput(Counter counter) {
        return new StreamTaskNetworkOutput(this.operatorChain.getFinishedOnRestoreInputOrDefault((Input) this.mainOperator), this.inputWatermarkGauge, counter);
    }

    private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate checkpointedInputGate) {
        return StreamTaskNetworkInputFactory.create(checkpointedInputGate, this.configuration.getTypeSerializerIn1(getUserCodeClassLoader()), getEnvironment().getIOManager(), new StatusWatermarkValve(checkpointedInputGate.getNumberOfInputChannels()), 0, getEnvironment().getTaskStateManager().getInputRescalingDescriptor(), num -> {
            return this.configuration.getInPhysicalEdges(getUserCodeClassLoader()).get(num.intValue()).getPartitioner();
        }, getEnvironment().getTaskInfo());
    }
}
