/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.TestUtils;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.shuffle.ShuffleTestUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link DefaultExecutionGraphFactory} covering the restore-from-savepoint path when
 * the savepoint holds state for an operator id that was not assigned to the job's only vertex.
 */
public class DefaultExecutionGraphFactoryTest extends TestLogger {
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Id given to the savepoint written by the tests and expected back after a restore. */
    private static final long SAVEPOINT_ID = 42L;

    /**
     * With {@code allowNonRestoredState == false}, creating the execution graph must fail with an
     * exception whose message mentions the failed rollback.
     */
    @Test
    public void testRestoringModifiedJobFromSavepointFails() throws Exception {
        JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(false, SAVEPOINT_ID);
        ExecutionGraphFactory executionGraphFactory = createExecutionGraphFactory();

        try {
            restoreExecutionGraph(
                    executionGraphFactory,
                    jobGraphWithNewOperator,
                    new StandaloneCompletedCheckpointStore(1));
            // fail() throws an AssertionError (an Error), so the catch below does not swallow it.
            Assert.fail("Expected ExecutionGraph creation to fail because of non restored state.");
        } catch (Exception e) {
            MatcherAssert.assertThat(
                    e, FlinkMatchers.containsMessage("Failed to rollback to checkpoint/savepoint"));
        }
    }

    /**
     * With {@code allowNonRestoredState == true}, creating the execution graph must succeed and
     * the checkpoint store must afterwards report the savepoint as its latest checkpoint.
     */
    @Test
    public void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds()
            throws Exception {
        JobGraph jobGraphWithNewOperator = createJobGraphWithSavepoint(true, SAVEPOINT_ID);
        ExecutionGraphFactory executionGraphFactory = createExecutionGraphFactory();
        StandaloneCompletedCheckpointStore completedCheckpointStore =
                new StandaloneCompletedCheckpointStore(1);

        restoreExecutionGraph(
                executionGraphFactory, jobGraphWithNewOperator, completedCheckpointStore);

        CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint();
        MatcherAssert.assertThat(savepoint, Matchers.notNullValue());
        MatcherAssert.assertThat(savepoint.getCheckpointID(), Matchers.is(SAVEPOINT_ID));
    }

    /**
     * Calls {@code createAndRestoreExecutionGraph} on the given factory with the fixed set of
     * collaborators shared by all tests in this class; the returned graph is discarded.
     *
     * @param executionGraphFactory factory under test
     * @param jobGraph job graph to build the execution graph from
     * @param completedCheckpointStore store handed to the factory for the restore
     * @throws Exception whatever the factory throws while creating or restoring the graph
     */
    private void restoreExecutionGraph(
            ExecutionGraphFactory executionGraphFactory,
            JobGraph jobGraph,
            CompletedCheckpointStore completedCheckpointStore)
            throws Exception {
        executionGraphFactory.createAndRestoreExecutionGraph(
                jobGraph,
                completedCheckpointStore,
                new CheckpointsCleaner(),
                new StandaloneCheckpointIDCounter(),
                TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN,
                0L,
                new DefaultVertexAttemptNumberStore(),
                SchedulerBase.computeVertexParallelismStore(jobGraph),
                // State transitions are irrelevant to these tests, so the listener does nothing.
                (execution, previousState, newState) -> {},
                this.log);
    }

    /**
     * Builds a {@link DefaultExecutionGraphFactory} from default/no-op test collaborators.
     *
     * @return a freshly constructed factory
     */
    @Nonnull
    private ExecutionGraphFactory createExecutionGraphFactory() {
        return new DefaultExecutionGraphFactory(
                new Configuration(),
                ClassLoader.getSystemClassLoader(),
                new DefaultExecutionDeploymentTracker(),
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                Time.milliseconds(0L),
                UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(),
                VoidBlobWriter.getInstance(),
                ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER,
                NoOpJobMasterPartitionTracker.INSTANCE);
    }

    /**
     * Writes a savepoint file for a freshly generated {@link OperatorID} and returns a
     * checkpointing-enabled job graph that restores from it. The graph's single vertex
     * ("New operator") is created without reference to that operator id.
     *
     * @param allowNonRestoredState forwarded to {@code SavepointRestoreSettings.forPath}
     * @param savepointId id passed to the savepoint-creating test utility
     * @return job graph with one vertex of parallelism 1 and the savepoint restore settings
     * @throws IOException if the temporary savepoint file cannot be created or written
     */
    @Nonnull
    private JobGraph createJobGraphWithSavepoint(boolean allowNonRestoredState, long savepointId)
            throws IOException {
        OperatorID operatorID = new OperatorID();
        File savepointFile =
                TestUtils.createSavepointWithOperatorState(
                        TEMPORARY_FOLDER.newFile(), savepointId, operatorID);
        SavepointRestoreSettings savepointRestoreSettings =
                SavepointRestoreSettings.forPath(
                        savepointFile.getAbsolutePath(), allowNonRestoredState);

        JobVertex jobVertex = new JobVertex("New operator");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(1);

        return TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
                savepointRestoreSettings, jobVertex);
    }
}

