package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;

/**
 * Unit tests for {@link PendingCheckpoint}.
 *
 * <p>The tests cover whether a pending checkpoint can be subsumed, what ends up in the target
 * directory when it is finalized, completion of its future on abort/finalize, discarding of
 * collected task state on abort, and the callbacks made on {@link PendingCheckpointStats}.
 */
public class PendingCheckpointTest {

    /** The single execution attempt that every pending checkpoint created here waits for. */
    private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();

    /** Template of the tasks that have to acknowledge; copied for each pending checkpoint. */
    private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS =
            new HashMap<ExecutionAttemptID, ExecutionVertex>();

    static {
        ACK_TASKS.put(ATTEMPT_ID, Mockito.mock(ExecutionVertex.class));
    }

    /** Provides fresh directories that are used as checkpoint target directories. */
    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    /**
     * Executor that only records the submitted commands; they run on the caller's thread once
     * {@link #runQueuedCommands()} is invoked.
     */
    private static final class QueueExecutor implements Executor {

        private final Queue<Runnable> queue = new ArrayDeque<Runnable>(4);

        private QueueExecutor() {
        }

        @Override
        public void execute(Runnable runnable) {
            this.queue.add(runnable);
        }

        /**
         * Runs all queued commands in submission order and removes them from the queue.
         *
         * <p>Draining matters because one executor instance is shared across several rounds of
         * a test: without it, every later call would re-run the commands of earlier rounds.
         */
        public void runQueuedCommands() {
            Runnable command;
            while ((command = this.queue.poll()) != null) {
                command.run();
            }
        }
    }

    /**
     * Checks {@code canBeSubsumed()} for two property sets that differ only in the first
     * constructor flag, and that {@code abortSubsumed()} is rejected with an
     * {@link IllegalStateException} when the checkpoint reports it cannot be subsumed.
     */
    @Test
    public void testCanBeSubsumed() throws Exception {
        // NOTE(review): the first flag is presumably "forced" - confirm against CheckpointProperties.
        CheckpointProperties notSubsumable =
                new CheckpointProperties(true, true, false, false, false, false, false);
        PendingCheckpoint pending = createPendingCheckpoint(notSubsumable, "ignored");
        Assert.assertFalse(pending.canBeSubsumed());

        try {
            pending.abortSubsumed();
            Assert.fail("Did not throw expected Exception");
        } catch (IllegalStateException expected) {
            // Expected: this checkpoint must refuse to be subsumed.
        }

        CheckpointProperties subsumable =
                new CheckpointProperties(false, true, false, false, false, false, false);
        Assert.assertTrue(createPendingCheckpoint(subsumable, "ignored").canBeSubsumed());
    }

    /**
     * Checks the number of files in the target directory around {@code finalizeCheckpoint()}:
     * the first checkpoint adds exactly one file on finalization, while the second one (second
     * property flag {@code false}, no target directory) leaves the directory untouched.
     */
    @Test
    public void testPersistExternally() throws Exception {
        File tmp = this.tmpFolder.newFolder();

        // First checkpoint: nothing is written until it is finalized.
        CheckpointProperties persisted =
                new CheckpointProperties(false, true, false, false, false, false, false);
        PendingCheckpoint pending = createPendingCheckpoint(persisted, tmp.getAbsolutePath());
        pending.acknowledgeTask(
                ATTEMPT_ID,
                (SubtaskState) null,
                new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp()));
        Assert.assertEquals(0, tmp.listFiles().length);
        pending.finalizeCheckpoint();
        Assert.assertEquals(1, tmp.listFiles().length);

        // Second checkpoint: the file count must stay at the one file written above.
        CheckpointProperties notPersisted =
                new CheckpointProperties(false, false, true, true, true, true, true);
        PendingCheckpoint pending2 = createPendingCheckpoint(notPersisted, null);
        pending2.acknowledgeTask(
                ATTEMPT_ID,
                (SubtaskState) null,
                new CheckpointMetaData(pending2.getCheckpointId(), pending2.getCheckpointTimestamp()));
        Assert.assertEquals(1, tmp.listFiles().length);
        pending2.finalizeCheckpoint();
        Assert.assertEquals(1, tmp.listFiles().length);
    }

    /**
     * Checks that the completion future is not done initially and is done after each kind of
     * abort and after a successful finalization, and that finalizing without the required
     * acknowledgement throws an {@link IllegalStateException}.
     */
    @Test
    public void testCompletionFuture() throws Exception {
        CheckpointProperties props =
                new CheckpointProperties(false, true, false, false, false, false, false);

        // Abort declined
        PendingCheckpoint pending = createPendingCheckpoint(props, "ignored");
        Future<?> future = pending.getCompletionFuture();
        Assert.assertFalse(future.isDone());
        pending.abortDeclined();
        Assert.assertTrue(future.isDone());

        // Abort expired
        pending = createPendingCheckpoint(props, "ignored");
        future = pending.getCompletionFuture();
        Assert.assertFalse(future.isDone());
        pending.abortExpired();
        Assert.assertTrue(future.isDone());

        // Abort subsumed
        pending = createPendingCheckpoint(props, "ignored");
        future = pending.getCompletionFuture();
        Assert.assertFalse(future.isDone());
        pending.abortSubsumed();
        Assert.assertTrue(future.isDone());

        // Finalize after the task acknowledged
        pending = createPendingCheckpoint(props, this.tmpFolder.newFolder().getAbsolutePath());
        future = pending.getCompletionFuture();
        Assert.assertFalse(future.isDone());
        pending.acknowledgeTask(
                ATTEMPT_ID,
                (SubtaskState) null,
                new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp()));
        pending.finalizeCheckpoint();
        Assert.assertTrue(future.isDone());

        // Finalize without acknowledgement must be rejected
        pending = createPendingCheckpoint(props, "ignored");
        Assert.assertFalse(pending.getCompletionFuture().isDone());
        try {
            pending.finalizeCheckpoint();
            Assert.fail("Did not throw expected Exception");
        } catch (IllegalStateException expected) {
            // Expected: the task has not acknowledged this checkpoint.
        }
    }

    /**
     * Checks that every kind of abort results in exactly one {@code discardState()} call on the
     * task state held by the pending checkpoint, once the commands handed to the executor have
     * been run.
     */
    @Test
    public void testAbortDiscardsState() throws Exception {
        CheckpointProperties props =
                new CheckpointProperties(false, true, false, false, false, false, false);
        TaskState state = Mockito.mock(TaskState.class);
        QueueExecutor executor = new QueueExecutor();
        String targetDirectory = this.tmpFolder.newFolder().getAbsolutePath();

        // Abort declined
        PendingCheckpoint pending = createPendingCheckpoint(props, targetDirectory, executor);
        setTaskState(pending, state);
        pending.abortDeclined();
        executor.runQueuedCommands();
        Mockito.verify(state, Mockito.times(1)).discardState();

        // Abort error
        Mockito.reset(state);
        pending = createPendingCheckpoint(props, targetDirectory, executor);
        setTaskState(pending, state);
        pending.abortError(new Exception("Expected Test Exception"));
        executor.runQueuedCommands();
        Mockito.verify(state, Mockito.times(1)).discardState();

        // Abort expired
        Mockito.reset(state);
        pending = createPendingCheckpoint(props, targetDirectory, executor);
        setTaskState(pending, state);
        pending.abortExpired();
        executor.runQueuedCommands();
        Mockito.verify(state, Mockito.times(1)).discardState();

        // Abort subsumed
        Mockito.reset(state);
        pending = createPendingCheckpoint(props, targetDirectory, executor);
        setTaskState(pending, state);
        pending.abortSubsumed();
        executor.runQueuedCommands();
        Mockito.verify(state, Mockito.times(1)).discardState();
    }

    /**
     * Checks the calls made on a registered {@link PendingCheckpointStats} callback: one subtask
     * report per acknowledgement, one completion report on finalization, and one failure report
     * for each kind of abort.
     */
    @Test
    public void testPendingCheckpointStatsCallbacks() throws Exception {
        {
            // Acknowledge, then complete
            PendingCheckpointStats callback = Mockito.mock(PendingCheckpointStats.class);
            PendingCheckpoint pending =
                    createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
            pending.setStatsCallback(callback);

            pending.acknowledgeTask(
                    ATTEMPT_ID,
                    (SubtaskState) null,
                    new CheckpointMetaData(pending.getCheckpointId(), pending.getCheckpointTimestamp()));
            Mockito.verify(callback, Mockito.times(1))
                    .reportSubtaskStats(Matchers.any(JobVertexID.class), Matchers.any(SubtaskStateStats.class));

            pending.finalizeCheckpoint();
            Mockito.verify(callback, Mockito.times(1)).reportCompletedCheckpoint(Matchers.any(String.class));
        }

        {
            // Fail: subsumed
            PendingCheckpointStats callback = Mockito.mock(PendingCheckpointStats.class);
            PendingCheckpoint pending =
                    createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
            pending.setStatsCallback(callback);

            pending.abortSubsumed();
            Mockito.verify(callback, Mockito.times(1))
                    .reportFailedCheckpoint(Matchers.anyLong(), Matchers.any(Exception.class));
        }

        {
            // Fail: declined
            PendingCheckpointStats callback = Mockito.mock(PendingCheckpointStats.class);
            PendingCheckpoint pending =
                    createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
            pending.setStatsCallback(callback);

            pending.abortDeclined();
            Mockito.verify(callback, Mockito.times(1))
                    .reportFailedCheckpoint(Matchers.anyLong(), Matchers.any(Exception.class));
        }

        {
            // Fail: error
            PendingCheckpointStats callback = Mockito.mock(PendingCheckpointStats.class);
            PendingCheckpoint pending =
                    createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
            pending.setStatsCallback(callback);

            pending.abortError(new Exception("Expected test error"));
            Mockito.verify(callback, Mockito.times(1))
                    .reportFailedCheckpoint(Matchers.anyLong(), Matchers.any(Exception.class));
        }

        {
            // Fail: expired
            PendingCheckpointStats callback = Mockito.mock(PendingCheckpointStats.class);
            PendingCheckpoint pending =
                    createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
            pending.setStatsCallback(callback);

            pending.abortExpired();
            Mockito.verify(callback, Mockito.times(1))
                    .reportFailedCheckpoint(Matchers.anyLong(), Matchers.any(Exception.class));
        }
    }

    /**
     * Creates a pending checkpoint that uses {@code Executors.directExecutor()}.
     *
     * @param checkpointProperties properties of the checkpoint
     * @param str target directory handed to the checkpoint; may be {@code null}
     * @return the new pending checkpoint
     */
    private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties checkpointProperties, String str) {
        return createPendingCheckpoint(checkpointProperties, str, Executors.directExecutor());
    }

    /**
     * Creates a pending checkpoint for a fresh {@link JobID} that waits for {@link #ATTEMPT_ID}.
     *
     * @param checkpointProperties properties of the checkpoint
     * @param str target directory handed to the checkpoint; may be {@code null}
     * @param executor executor handed to the checkpoint
     * @return the new pending checkpoint
     */
    private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties checkpointProperties, String str, Executor executor) {
        // Hand over a copy so that the shared ACK_TASKS template is never modified by a checkpoint.
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks =
                new HashMap<ExecutionAttemptID, ExecutionVertex>(ACK_TASKS);
        return new PendingCheckpoint(new JobID(), 0L, 1L, ackTasks, checkpointProperties, str, executor);
    }

    /**
     * Registers {@code taskState} under a new {@link JobVertexID} in the private
     * {@code taskStates} map of the given pending checkpoint, using reflection.
     *
     * @param pendingCheckpoint checkpoint whose internal map is modified
     * @param taskState state to register
     * @throws NoSuchFieldException if {@link PendingCheckpoint} has no {@code taskStates} field
     * @throws IllegalAccessException if the field cannot be read
     */
    @SuppressWarnings("unchecked")
    static void setTaskState(PendingCheckpoint pendingCheckpoint, TaskState taskState) throws NoSuchFieldException, IllegalAccessException {
        Field taskStatesField = PendingCheckpoint.class.getDeclaredField("taskStates");
        taskStatesField.setAccessible(true);
        // NOTE(review): assumes the field holds a Map<JobVertexID, TaskState> - confirm in PendingCheckpoint.
        Map<JobVertexID, TaskState> taskStates =
                (Map<JobVertexID, TaskState>) taskStatesField.get(pendingCheckpoint);
        taskStates.put(new JobVertexID(), taskState);
    }
}
