/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Exercises a {@link CheckpointCoordinator} that is configured with
 * {@link ExternalizedCheckpointSettings}: each completed checkpoint is expected to carry an
 * external pointer plus an on-disk metadata file, and must be loadable again through
 * {@link SavepointLoader}.
 */
public class CheckpointCoordinatorExternalizedCheckpointsTest {
    /** Supplies the directory that is handed to the coordinator as its checkpoint directory. */
    @Rule
    public TemporaryFolder tmp = new TemporaryFolder();

    /**
     * Triggers three checkpoints in sequence, acknowledges each one from both mocked
     * execution attempts, and verifies the latest entry of the checkpoint store after
     * every round.
     *
     * @throws Exception if the coordinator or any of the verification steps fails
     */
    @Test
    public void testTriggerAndConfirmSimpleExternalizedCheckpoint() throws Exception {
        final JobID jid = new JobID();
        final ExternalizedCheckpointSettings settings =
                ExternalizedCheckpointSettings.externalizeCheckpoints(false);
        final File checkpointDir = tmp.newFolder();

        // Two mocked execution vertices, each identified by its own attempt ID.
        final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
        final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
        final ExecutionVertex vertex1 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID1);
        final ExecutionVertex vertex2 = CheckpointCoordinatorTest.mockExecutionVertex(attemptID2);

        // Job-vertex lookup table passed to the savepoint loader during restore verification.
        final Map<JobVertexID, ExecutionJobVertex> jobVertices = new HashMap<>();
        jobVertices.put(vertex1.getJobvertexId(), vertex1.getJobVertex());
        jobVertices.put(vertex2.getJobvertexId(), vertex2.getJobVertex());

        final CheckpointCoordinator coord = new CheckpointCoordinator(
                jid,
                600000L,
                600000L,
                0L,
                Integer.MAX_VALUE,
                settings,
                new ExecutionVertex[] {vertex1, vertex2},
                new ExecutionVertex[] {vertex1, vertex2},
                new ExecutionVertex[] {vertex1, vertex2},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                checkpointDir.getAbsolutePath(),
                Executors.directExecutor());

        // Nothing has been triggered yet.
        Assert.assertEquals(0L, coord.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, coord.getNumberOfRetainedSuccessfulCheckpoints());

        final ExecutionAttemptID[] attempts = {attemptID1, attemptID2};

        // One round per entry; the entry is the offset added to the wall clock to form the
        // trigger timestamp of that round.
        for (long offset : new long[] {0L, 7L, 146L}) {
            final long timestamp = System.currentTimeMillis() + offset;
            coord.triggerCheckpoint(timestamp, false);

            // The ID is read from the first entry of the pending-checkpoint map.
            final long checkpointId =
                    coord.getPendingCheckpoints().entrySet().iterator().next().getKey();

            for (ExecutionAttemptID attempt : attempts) {
                coord.receiveAcknowledgeMessage(
                        new AcknowledgeCheckpoint(jid, attempt, checkpointId));
            }

            final CompletedCheckpoint latest = coord.getCheckpointStore().getLatestCheckpoint();
            verifyExternalizedCheckpoint(latest, jid, checkpointId, timestamp);
            verifyExternalizedCheckpointRestore(latest, jobVertices, vertex1, vertex2);
        }

        coord.shutdown(JobStatus.FINISHED);
    }

    /**
     * Asserts that a completed checkpoint matches the expected identity and that its
     * externalized metadata exists as a file.
     *
     * @param checkpoint the completed checkpoint under inspection
     * @param jid the job ID the checkpoint must report
     * @param checkpointId the checkpoint ID the checkpoint must report
     * @param timestamp the timestamp the checkpoint must report
     */
    private static void verifyExternalizedCheckpoint(CompletedCheckpoint checkpoint, JobID jid, long checkpointId, long timestamp) {
        Assert.assertEquals(jid, checkpoint.getJobId());
        Assert.assertEquals(checkpointId, checkpoint.getCheckpointID());
        Assert.assertEquals(timestamp, checkpoint.getTimestamp());
        Assert.assertNotNull(checkpoint.getExternalPointer());
        Assert.assertNotNull(checkpoint.getExternalizedMetadata());

        // The metadata handle is cast to a file handle; the path it names must exist.
        final FileStateHandle metadataHandle = (FileStateHandle) checkpoint.getExternalizedMetadata();
        final File metadataFile = new File(metadataHandle.getFilePath().getPath());
        Assert.assertTrue(metadataFile.exists());
    }

    /**
     * Loads the checkpoint back from its external pointer and asserts that, for every
     * operator of the given vertices, the loaded operator state equals the state held by
     * the original checkpoint.
     *
     * @param checkpoint the completed checkpoint whose external pointer is loaded
     * @param jobVertices the job vertices passed to the savepoint loader
     * @param vertices the execution vertices whose operator IDs are compared
     * @throws IOException declared for the savepoint loading call
     */
    private static void verifyExternalizedCheckpointRestore(CompletedCheckpoint checkpoint, Map<JobVertexID, ExecutionJobVertex> jobVertices, ExecutionVertex ... vertices) throws IOException {
        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        final CompletedCheckpoint loaded = SavepointLoader.loadAndValidateSavepoint(
                checkpoint.getJobId(),
                jobVertices,
                checkpoint.getExternalPointer(),
                classLoader,
                false);

        for (ExecutionVertex vertex : vertices) {
            for (OperatorID operatorID : vertex.getJobVertex().getOperatorIDs()) {
                Assert.assertEquals(
                        checkpoint.getOperatorStates().get(operatorID),
                        loaded.getOperatorStates().get(operatorID));
            }
        }
    }
}

