package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.BlockingQueue;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest.class */
public class SourceExternalCheckpointTriggerTest {
    private static OneShotLatch ready = new OneShotLatch();
    private static MultiShotLatch sync = new MultiShotLatch();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceExternalCheckpointTriggerTest$ExternalCheckpointsSource.class */
    private static class ExternalCheckpointsSource implements ParallelSourceFunction<Long>, ExternallyInducedSource<Long, Object> {
        private final long numEvents;
        private final long checkpointFrequency;
        private ExternallyInducedSource.CheckpointTrigger trigger;

        ExternalCheckpointsSource(long j, long j2) {
            this.numEvents = j;
            this.checkpointFrequency = j2;
        }

        /* JADX WARN: Type inference failed for: r0v14, types: [org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource$CheckpointTrigger, long] */
        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            SourceExternalCheckpointTriggerTest.ready.trigger();
            long j = 1;
            long j2 = 1;
            while (true) {
                long j3 = j2;
                if (j3 > this.numEvents) {
                    return;
                }
                SourceExternalCheckpointTriggerTest.sync.await();
                sourceContext.collect(Long.valueOf(j3));
                if (j3 % this.checkpointFrequency == 0) {
                    ?? r0 = this.trigger;
                    j++;
                    r0.triggerCheckpoint((long) r0);
                }
                j2 = j3 + 1;
            }
        }

        public void cancel() {
        }

        public void setCheckpointTrigger(ExternallyInducedSource.CheckpointTrigger checkpointTrigger) {
            this.trigger = checkpointTrigger;
        }

        public MasterTriggerRestoreHook<Object> createMasterTriggerRestoreHook() {
            throw new UnsupportedOperationException("not implemented");
        }
    }

    @Before
    public void resetLatches() {
        ready = new OneShotLatch();
        sync = new MultiShotLatch();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Test
    public void testCheckpointsTriggeredBySource() throws Exception {
        StreamTaskTestHarness streamTaskTestHarness = new StreamTaskTestHarness(SourceStreamTask::new, BasicTypeInfo.LONG_TYPE_INFO);
        streamTaskTestHarness.setupOutputForSingletonOperatorChain();
        streamTaskTestHarness.getExecutionConfig().setLatencyTrackingInterval(-1L);
        ExternalCheckpointsSource externalCheckpointsSource = new ExternalCheckpointsSource(10L, 3L);
        StreamConfig streamConfig = streamTaskTestHarness.getStreamConfig();
        streamConfig.setStreamOperator(new StreamSource(externalCheckpointsSource));
        streamConfig.setOperatorID(new OperatorID());
        streamTaskTestHarness.invoke();
        StreamTask mo151getTask = streamTaskTestHarness.mo151getTask();
        ready.await();
        Assert.assertTrue(((Boolean) mo151getTask.triggerCheckpointAsync(new CheckpointMetaData(32L, 829L), CheckpointOptions.forCheckpointWithDefaultLocation()).get()).booleanValue());
        sync.trigger();
        verifyNextElement(streamTaskTestHarness.getOutput(), 1L);
        sync.trigger();
        verifyNextElement(streamTaskTestHarness.getOutput(), 2L);
        sync.trigger();
        verifyNextElement(streamTaskTestHarness.getOutput(), 3L);
        verifyCheckpointBarrier(streamTaskTestHarness.getOutput(), 1L);
        sync.trigger();
        verifyNextElement(streamTaskTestHarness.getOutput(), 4L);
        Assert.assertTrue(((Boolean) mo151getTask.triggerCheckpointAsync(new CheckpointMetaData(34L, 900L), CheckpointOptions.forCheckpointWithDefaultLocation()).get()).booleanValue());
        sync.trigger();
        verifyNextElement(streamTaskTestHarness.getOutput(), 5L);
        sync.trigger();
        verifyNextElement(streamTaskTestHarness.getOutput(), 6L);
        verifyCheckpointBarrier(streamTaskTestHarness.getOutput(), 2L);
        long j = 3;
        for (long j2 = 7; j2 <= 10; j2++) {
            sync.trigger();
            verifyNextElement(streamTaskTestHarness.getOutput(), j2);
            if (j2 % 3 == 0) {
                streamTaskTestHarness.getOutput();
                long j3 = j;
                j = j3 + 1;
                verifyCheckpointBarrier(this, j3);
            }
        }
    }

    private void verifyNextElement(BlockingQueue<Object> blockingQueue, long j) throws InterruptedException {
        Object take = blockingQueue.take();
        Assert.assertTrue("next element is not an event", take instanceof StreamRecord);
        Assert.assertEquals("wrong event", j, ((Long) ((StreamRecord) take).getValue()).longValue());
    }

    private void verifyCheckpointBarrier(BlockingQueue<Object> blockingQueue, long j) throws InterruptedException {
        Object take = blockingQueue.take();
        Assert.assertTrue("next element is not a checkpoint barrier", take instanceof CheckpointBarrier);
        Assert.assertEquals("wrong checkpoint id", j, ((CheckpointBarrier) take).getId());
    }
}
