package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.class */
public class CheckpointCoordinatorTest {
    private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();

    @Test
    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
        try {
            JobID jobID = new JobID();
            long currentTimeMillis = System.currentTimeMillis();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 600000L, 600000L, new ExecutionVertex[]{(ExecutionVertex) Mockito.mock(ExecutionVertex.class), (ExecutionVertex) Mockito.mock(ExecutionVertex.class)}, new ExecutionVertex[]{mockExecutionVertex(new ExecutionAttemptID()), mockExecutionVertex(new ExecutionAttemptID())}, new ExecutionVertex[0], cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertFalse(checkpointCoordinator.triggerCheckpoint(currentTimeMillis));
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinished() {
        try {
            JobID jobID = new JobID();
            long currentTimeMillis = System.currentTimeMillis();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 600000L, 600000L, new ExecutionVertex[]{mockExecutionVertex(new ExecutionAttemptID()), mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.FINISHED, new ExecutionState[0])}, new ExecutionVertex[]{mockExecutionVertex(new ExecutionAttemptID()), mockExecutionVertex(new ExecutionAttemptID())}, new ExecutionVertex[0], cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertFalse(checkpointCoordinator.triggerCheckpoint(currentTimeMillis));
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
        try {
            JobID jobID = new JobID();
            long currentTimeMillis = System.currentTimeMillis();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 600000L, 600000L, new ExecutionVertex[]{mockExecutionVertex(new ExecutionAttemptID()), mockExecutionVertex(new ExecutionAttemptID())}, new ExecutionVertex[]{(ExecutionVertex) Mockito.mock(ExecutionVertex.class), (ExecutionVertex) Mockito.mock(ExecutionVertex.class)}, new ExecutionVertex[0], cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertFalse(checkpointCoordinator.triggerCheckpoint(currentTimeMillis));
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testTriggerAndConfirmSimpleCheckpoint() {
        try {
            JobID jobID = new JobID();
            long currentTimeMillis = System.currentTimeMillis();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = mockExecutionVertex(executionAttemptID2);
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 600000L, 600000L, new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2}, new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2}, new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2}, cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue(checkpointCoordinator.triggerCheckpoint(currentTimeMillis));
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            long longValue = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().get(Long.valueOf(longValue));
            Assert.assertNotNull(pendingCheckpoint);
            Assert.assertEquals(longValue, pendingCheckpoint.getCheckpointId());
            Assert.assertEquals(currentTimeMillis, pendingCheckpoint.getCheckpointTimestamp());
            Assert.assertEquals(jobID, pendingCheckpoint.getJobId());
            Assert.assertEquals(2L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(0L, pendingCheckpoint.getCollectedStates().size());
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.isFullyAcknowledged());
            TriggerCheckpoint triggerCheckpoint = new TriggerCheckpoint(jobID, executionAttemptID, longValue, currentTimeMillis);
            TriggerCheckpoint triggerCheckpoint2 = new TriggerCheckpoint(jobID, executionAttemptID2, longValue, currentTimeMillis);
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(1))).sendMessageToCurrentExecution((Serializable) Mockito.eq(triggerCheckpoint), (ExecutionAttemptID) Mockito.eq(executionAttemptID));
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex2, Mockito.times(1))).sendMessageToCurrentExecution((Serializable) Mockito.eq(triggerCheckpoint2), (ExecutionAttemptID) Mockito.eq(executionAttemptID2));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue));
            Assert.assertEquals(1L, pendingCheckpoint.getNumberOfAcknowledgedTasks());
            Assert.assertEquals(1L, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.isFullyAcknowledged());
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue));
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            Assert.assertFalse(pendingCheckpoint.isFullyAcknowledged());
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, longValue));
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            NotifyCheckpointComplete notifyCheckpointComplete = new NotifyCheckpointComplete(jobID, executionAttemptID, longValue, currentTimeMillis);
            NotifyCheckpointComplete notifyCheckpointComplete2 = new NotifyCheckpointComplete(jobID, executionAttemptID2, longValue, currentTimeMillis);
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(1))).sendMessageToCurrentExecution((Serializable) Mockito.eq(notifyCheckpointComplete), (ExecutionAttemptID) Mockito.eq(executionAttemptID));
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex2, Mockito.times(1))).sendMessageToCurrentExecution((Serializable) Mockito.eq(notifyCheckpointComplete2), (ExecutionAttemptID) Mockito.eq(executionAttemptID2));
            CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint) checkpointCoordinator.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(jobID, completedCheckpoint.getJobId());
            Assert.assertEquals(currentTimeMillis, completedCheckpoint.getTimestamp());
            Assert.assertEquals(pendingCheckpoint.getCheckpointId(), completedCheckpoint.getCheckpointID());
            Assert.assertTrue(completedCheckpoint.getStates().isEmpty());
            long j = currentTimeMillis + 7;
            checkpointCoordinator.triggerCheckpoint(j);
            long longValue2 = ((Long) ((Map.Entry) checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next()).getKey()).longValue();
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, longValue2));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, longValue2));
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletedCheckpoint completedCheckpoint2 = (CompletedCheckpoint) checkpointCoordinator.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(jobID, completedCheckpoint2.getJobId());
            Assert.assertEquals(j, completedCheckpoint2.getTimestamp());
            Assert.assertEquals(longValue2, completedCheckpoint2.getCheckpointID());
            Assert.assertTrue(completedCheckpoint2.getStates().isEmpty());
            TriggerCheckpoint triggerCheckpoint3 = new TriggerCheckpoint(jobID, executionAttemptID, longValue2, j);
            TriggerCheckpoint triggerCheckpoint4 = new TriggerCheckpoint(jobID, executionAttemptID2, longValue2, j);
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(1))).sendMessageToCurrentExecution((Serializable) Mockito.eq(triggerCheckpoint3), (ExecutionAttemptID) Mockito.eq(executionAttemptID));
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex2, Mockito.times(1))).sendMessageToCurrentExecution((Serializable) Mockito.eq(triggerCheckpoint4), (ExecutionAttemptID) Mockito.eq(executionAttemptID2));
            NotifyCheckpointComplete notifyCheckpointComplete3 = new NotifyCheckpointComplete(jobID, executionAttemptID, longValue2, j);
            NotifyCheckpointComplete notifyCheckpointComplete4 = new NotifyCheckpointComplete(jobID, executionAttemptID2, longValue2, j);
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(1))).sendMessageToCurrentExecution((Serializable) Mockito.eq(notifyCheckpointComplete3), (ExecutionAttemptID) Mockito.eq(executionAttemptID));
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex2, Mockito.times(1))).sendMessageToCurrentExecution((Serializable) Mockito.eq(notifyCheckpointComplete4), (ExecutionAttemptID) Mockito.eq(executionAttemptID2));
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMultipleConcurrentCheckpoints() {
        try {
            JobID jobID = new JobID();
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis + 8617;
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID4 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID5 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID6 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = mockExecutionVertex(executionAttemptID3);
            ExecutionVertex mockExecutionVertex4 = mockExecutionVertex(executionAttemptID4);
            ExecutionVertex mockExecutionVertex5 = mockExecutionVertex(executionAttemptID5);
            ExecutionVertex mockExecutionVertex6 = mockExecutionVertex(executionAttemptID6);
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 600000L, 600000L, new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2}, new ExecutionVertex[]{mockExecutionVertex3, mockExecutionVertex4, mockExecutionVertex5}, new ExecutionVertex[]{mockExecutionVertex6}, cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue(checkpointCoordinator.triggerCheckpoint(currentTimeMillis));
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
            long checkpointId = pendingCheckpoint.getCheckpointId();
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(1))).sendMessageToCurrentExecution(new TriggerCheckpoint(jobID, executionAttemptID, checkpointId, currentTimeMillis), executionAttemptID);
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex2, Mockito.times(1))).sendMessageToCurrentExecution(new TriggerCheckpoint(jobID, executionAttemptID2, checkpointId, currentTimeMillis), executionAttemptID2);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID4, checkpointId));
            Assert.assertTrue(checkpointCoordinator.triggerCheckpoint(j));
            Assert.assertEquals(2L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Iterator it = checkpointCoordinator.getPendingCheckpoints().values().iterator();
            PendingCheckpoint pendingCheckpoint2 = (PendingCheckpoint) it.next();
            PendingCheckpoint pendingCheckpoint3 = pendingCheckpoint == pendingCheckpoint2 ? (PendingCheckpoint) it.next() : pendingCheckpoint2;
            long checkpointId2 = pendingCheckpoint3.getCheckpointId();
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(1))).sendMessageToCurrentExecution(new TriggerCheckpoint(jobID, executionAttemptID, checkpointId2, j), executionAttemptID);
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex2, Mockito.times(1))).sendMessageToCurrentExecution(new TriggerCheckpoint(jobID, executionAttemptID2, checkpointId2, j), executionAttemptID2);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID5, checkpointId));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID3, checkpointId2));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID3, checkpointId));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID4, checkpointId2));
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex6, Mockito.times(1))).sendMessageToCurrentExecution(new NotifyCheckpointComplete(jobID, executionAttemptID6, checkpointId, currentTimeMillis), executionAttemptID6);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID5, checkpointId2));
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(2L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue(pendingCheckpoint3.isDiscarded());
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex6, Mockito.times(1))).sendMessageToCurrentExecution(new NotifyCheckpointComplete(jobID, executionAttemptID6, checkpointId2, j), executionAttemptID6);
            List successfulCheckpoints = checkpointCoordinator.getSuccessfulCheckpoints();
            CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint) successfulCheckpoints.get(0);
            Assert.assertEquals(checkpointId, completedCheckpoint.getCheckpointID());
            Assert.assertEquals(currentTimeMillis, completedCheckpoint.getTimestamp());
            Assert.assertEquals(jobID, completedCheckpoint.getJobId());
            Assert.assertTrue(completedCheckpoint.getStates().isEmpty());
            CompletedCheckpoint completedCheckpoint2 = (CompletedCheckpoint) successfulCheckpoints.get(1);
            Assert.assertEquals(checkpointId2, completedCheckpoint2.getCheckpointID());
            Assert.assertEquals(j, completedCheckpoint2.getTimestamp());
            Assert.assertEquals(jobID, completedCheckpoint2.getJobId());
            Assert.assertTrue(completedCheckpoint2.getStates().isEmpty());
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSuccessfulCheckpointSubsumesUnsuccessful() {
        try {
            JobID jobID = new JobID();
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis + 1552;
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID4 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID5 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID6 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = mockExecutionVertex(executionAttemptID3);
            ExecutionVertex mockExecutionVertex4 = mockExecutionVertex(executionAttemptID4);
            ExecutionVertex mockExecutionVertex5 = mockExecutionVertex(executionAttemptID5);
            ExecutionVertex mockExecutionVertex6 = mockExecutionVertex(executionAttemptID6);
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 600000L, 600000L, new ExecutionVertex[]{mockExecutionVertex, mockExecutionVertex2}, new ExecutionVertex[]{mockExecutionVertex3, mockExecutionVertex4, mockExecutionVertex5}, new ExecutionVertex[]{mockExecutionVertex6}, cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(10, cl), RecoveryMode.STANDALONE);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Assert.assertTrue(checkpointCoordinator.triggerCheckpoint(currentTimeMillis));
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
            long checkpointId = pendingCheckpoint.getCheckpointId();
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(1))).sendMessageToCurrentExecution(new TriggerCheckpoint(jobID, executionAttemptID, checkpointId, currentTimeMillis), executionAttemptID);
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex2, Mockito.times(1))).sendMessageToCurrentExecution(new TriggerCheckpoint(jobID, executionAttemptID2, checkpointId, currentTimeMillis), executionAttemptID2);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID4, checkpointId));
            Assert.assertTrue(checkpointCoordinator.triggerCheckpoint(j));
            Assert.assertEquals(2L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            Iterator it = checkpointCoordinator.getPendingCheckpoints().values().iterator();
            PendingCheckpoint pendingCheckpoint2 = (PendingCheckpoint) it.next();
            PendingCheckpoint pendingCheckpoint3 = pendingCheckpoint == pendingCheckpoint2 ? (PendingCheckpoint) it.next() : pendingCheckpoint2;
            long checkpointId2 = pendingCheckpoint3.getCheckpointId();
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(1))).sendMessageToCurrentExecution(new TriggerCheckpoint(jobID, executionAttemptID, checkpointId2, j), executionAttemptID);
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex2, Mockito.times(1))).sendMessageToCurrentExecution(new TriggerCheckpoint(jobID, executionAttemptID2, checkpointId2, j), executionAttemptID2);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID5, checkpointId2));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID3, checkpointId2));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID3, checkpointId));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID4, checkpointId2));
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            Assert.assertTrue(pendingCheckpoint3.isDiscarded());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            CompletedCheckpoint completedCheckpoint = (CompletedCheckpoint) checkpointCoordinator.getSuccessfulCheckpoints().get(0);
            Assert.assertEquals(checkpointId2, completedCheckpoint.getCheckpointID());
            Assert.assertEquals(j, completedCheckpoint.getTimestamp());
            Assert.assertEquals(jobID, completedCheckpoint.getJobId());
            Assert.assertTrue(completedCheckpoint.getStates().isEmpty());
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex6, Mockito.times(1))).sendMessageToCurrentExecution(new NotifyCheckpointComplete(jobID, executionAttemptID6, checkpointId2, j), executionAttemptID6);
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID5, checkpointId));
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCheckpointTimeoutIsolated() {
        try {
            JobID jobID = new JobID();
            long currentTimeMillis = System.currentTimeMillis();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID4 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = mockExecutionVertex(executionAttemptID3);
            ExecutionVertex mockExecutionVertex4 = mockExecutionVertex(executionAttemptID4);
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 600000L, 200L, new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex2, mockExecutionVertex3}, new ExecutionVertex[]{mockExecutionVertex4}, cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
            Assert.assertTrue(checkpointCoordinator.triggerCheckpoint(currentTimeMillis));
            Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, pendingCheckpoint.getCheckpointId()));
            long currentTimeMillis2 = System.currentTimeMillis() + 5000;
            do {
                Thread.sleep(250L);
                if (pendingCheckpoint.isDiscarded() || checkpointCoordinator.getNumberOfPendingCheckpoints() <= 0) {
                    break;
                }
            } while (System.currentTimeMillis() < currentTimeMillis2);
            Assert.assertTrue("Checkpoint was not canceled by the timeout", pendingCheckpoint.isDiscarded());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex4, Mockito.times(0))).sendMessageToCurrentExecution((Serializable) Mockito.any(NotifyCheckpointComplete.class), (ExecutionAttemptID) Mockito.any(ExecutionAttemptID.class));
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void handleMessagesForNonExistingCheckpoints() {
        try {
            JobID jobID = new JobID();
            long currentTimeMillis = System.currentTimeMillis();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID4 = new ExecutionAttemptID();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 200000L, 200000L, new ExecutionVertex[]{mockExecutionVertex(executionAttemptID)}, new ExecutionVertex[]{mockExecutionVertex(executionAttemptID2), mockExecutionVertex(executionAttemptID3)}, new ExecutionVertex[]{mockExecutionVertex(executionAttemptID4)}, cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
            Assert.assertTrue(checkpointCoordinator.triggerCheckpoint(currentTimeMillis));
            long longValue = ((Long) checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next()).longValue();
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), executionAttemptID2, longValue));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, 1L));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, new ExecutionAttemptID(), longValue));
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testPeriodicTriggering() {
        try {
            JobID jobID = new JobID();
            final long currentTimeMillis = System.currentTimeMillis();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = mockExecutionVertex(executionAttemptID3);
            final AtomicInteger atomicInteger = new AtomicInteger();
            ((ExecutionVertex) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.1
                private long lastId = -1;
                private long lastTs = -1;

                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m16answer(InvocationOnMock invocationOnMock) throws Throwable {
                    TriggerCheckpoint triggerCheckpoint = (TriggerCheckpoint) invocationOnMock.getArguments()[0];
                    long checkpointId = triggerCheckpoint.getCheckpointId();
                    long timestamp = triggerCheckpoint.getTimestamp();
                    Assert.assertTrue(checkpointId > this.lastId);
                    Assert.assertTrue(timestamp >= this.lastTs);
                    Assert.assertTrue(timestamp >= currentTimeMillis);
                    this.lastId = checkpointId;
                    this.lastTs = timestamp;
                    atomicInteger.incrementAndGet();
                    return null;
                }
            }).when(mockExecutionVertex)).sendMessageToCurrentExecution((Serializable) Mockito.any(Serializable.class), (ExecutionAttemptID) Mockito.any(ExecutionAttemptID.class));
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 10L, 200000L, new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex2}, new ExecutionVertex[]{mockExecutionVertex3}, cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
            checkpointCoordinator.startCheckpointScheduler();
            long currentTimeMillis2 = System.currentTimeMillis() + 60000;
            do {
                Thread.sleep(20L);
                if (currentTimeMillis2 <= System.currentTimeMillis()) {
                    break;
                }
            } while (atomicInteger.get() < 5);
            Assert.assertTrue(atomicInteger.get() >= 5);
            checkpointCoordinator.stopCheckpointScheduler();
            int i = atomicInteger.get();
            Thread.sleep(400L);
            Assert.assertTrue(i == atomicInteger.get() || i + 1 == atomicInteger.get());
            atomicInteger.set(0);
            checkpointCoordinator.startCheckpointScheduler();
            long currentTimeMillis3 = System.currentTimeMillis() + 60000;
            do {
                Thread.sleep(20L);
                if (currentTimeMillis3 <= System.currentTimeMillis()) {
                    break;
                }
            } while (atomicInteger.get() < 5);
            Assert.assertTrue(atomicInteger.get() >= 5);
            checkpointCoordinator.stopCheckpointScheduler();
            int i2 = atomicInteger.get();
            Thread.sleep(400L);
            Assert.assertTrue(i2 == atomicInteger.get() || i2 + 1 == atomicInteger.get());
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMaxConcurrentAttempts1() {
        testMaxConcurrentAttemps(1);
    }

    @Test
    public void testMaxConcurrentAttempts2() {
        testMaxConcurrentAttemps(2);
    }

    @Test
    public void testMaxConcurrentAttempts5() {
        testMaxConcurrentAttemps(5);
    }

    private void testMaxConcurrentAttemps(int i) {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = mockExecutionVertex(executionAttemptID3);
            final AtomicInteger atomicInteger = new AtomicInteger();
            ((ExecutionVertex) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.2
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m17answer(InvocationOnMock invocationOnMock) throws Throwable {
                    atomicInteger.incrementAndGet();
                    return null;
                }
            }).when(mockExecutionVertex)).sendMessageToCurrentExecution((Serializable) Mockito.any(Serializable.class), (ExecutionAttemptID) Mockito.any(ExecutionAttemptID.class));
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 10L, 200000L, 0L, i, new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex2}, new ExecutionVertex[]{mockExecutionVertex3}, cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE, new DisabledCheckpointStatsTracker());
            checkpointCoordinator.startCheckpointScheduler();
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis + 60000;
            long j2 = currentTimeMillis + 100;
            while (true) {
                Thread.sleep(20L);
                long currentTimeMillis2 = System.currentTimeMillis();
                if (currentTimeMillis2 < j2 || (atomicInteger.get() < i && currentTimeMillis2 < j)) {
                }
            }
            Assert.assertEquals(i, atomicInteger.get());
            ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(i))).sendMessageToCurrentExecution((Serializable) Mockito.any(TriggerCheckpoint.class), (ExecutionAttemptID) Mockito.eq(executionAttemptID));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, 1L));
            long currentTimeMillis3 = System.currentTimeMillis();
            long j3 = currentTimeMillis3 + 60000;
            do {
                Thread.sleep(20L);
                if (atomicInteger.get() >= i + 1) {
                    break;
                }
            } while (currentTimeMillis3 < j3);
            Assert.assertEquals(i + 1, atomicInteger.get());
            Thread.sleep(200L);
            Assert.assertEquals(i + 1, atomicInteger.get());
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testMaxConcurrentAttempsWithSubsumption() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 10L, 200000L, 0L, 2, new ExecutionVertex[]{mockExecutionVertex(executionAttemptID)}, new ExecutionVertex[]{mockExecutionVertex(executionAttemptID2)}, new ExecutionVertex[]{mockExecutionVertex(new ExecutionAttemptID())}, cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE, new DisabledCheckpointStatsTracker());
            checkpointCoordinator.startCheckpointScheduler();
            long currentTimeMillis = System.currentTimeMillis();
            long j = currentTimeMillis + 60000;
            long j2 = currentTimeMillis + 100;
            while (true) {
                Thread.sleep(20L);
                long currentTimeMillis2 = System.currentTimeMillis();
                if (currentTimeMillis2 < j2 || (checkpointCoordinator.getNumberOfPendingCheckpoints() < 2 && currentTimeMillis2 < j)) {
                }
            }
            Assert.assertEquals(2L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(1L));
            Assert.assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(2L));
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID2, 2L));
            long currentTimeMillis3 = System.currentTimeMillis() + 60000;
            do {
                Thread.sleep(20L);
                if (checkpointCoordinator.getPendingCheckpoints().get(4L) != null) {
                    break;
                }
            } while (System.currentTimeMillis() < currentTimeMillis3);
            Assert.assertEquals(2L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            Assert.assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(3L));
            Assert.assertNotNull(checkpointCoordinator.getPendingCheckpoints().get(4L));
            checkpointCoordinator.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testPeriodicSchedulingWithInactiveTasks() {
        try {
            JobID jobID = new JobID();
            ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID executionAttemptID3 = new ExecutionAttemptID();
            ExecutionVertex mockExecutionVertex = mockExecutionVertex(executionAttemptID);
            ExecutionVertex mockExecutionVertex2 = mockExecutionVertex(executionAttemptID2);
            ExecutionVertex mockExecutionVertex3 = mockExecutionVertex(executionAttemptID3);
            final AtomicReference atomicReference = new AtomicReference(ExecutionState.CREATED);
            Mockito.when(mockExecutionVertex.getCurrentExecutionAttempt().getState()).thenAnswer(new Answer<ExecutionState>() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.3
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public ExecutionState m18answer(InvocationOnMock invocationOnMock) {
                    return (ExecutionState) atomicReference.get();
                }
            });
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, 10L, 200000L, 0L, 2, new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex2}, new ExecutionVertex[]{mockExecutionVertex3}, cl, new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE, new DisabledCheckpointStatsTracker());
            checkpointCoordinator.startCheckpointScheduler();
            Thread.sleep(200L);
            Assert.assertEquals(0L, checkpointCoordinator.getNumberOfPendingCheckpoints());
            atomicReference.set(ExecutionState.RUNNING);
            long currentTimeMillis = System.currentTimeMillis() + 10000;
            do {
                Thread.sleep(20L);
                if (System.currentTimeMillis() >= currentTimeMillis) {
                    break;
                }
            } while (checkpointCoordinator.getNumberOfPendingCheckpoints() == 0);
            Assert.assertTrue(checkpointCoordinator.getNumberOfPendingCheckpoints() > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID executionAttemptID) {
        return mockExecutionVertex(executionAttemptID, ExecutionState.RUNNING, new ExecutionState[0]);
    }

    private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID executionAttemptID, ExecutionState executionState, ExecutionState... executionStateArr) {
        Execution execution = (Execution) Mockito.mock(Execution.class);
        Mockito.when(execution.getAttemptId()).thenReturn(executionAttemptID);
        Mockito.when(execution.getState()).thenReturn(executionState, executionStateArr);
        ExecutionVertex executionVertex = (ExecutionVertex) Mockito.mock(ExecutionVertex.class);
        Mockito.when(executionVertex.getJobvertexId()).thenReturn(new JobVertexID());
        Mockito.when(executionVertex.getCurrentExecutionAttempt()).thenReturn(execution);
        return executionVertex;
    }
}
