package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.TestActorRef;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.TestLogger;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase.class */
public class JobManagerSubmittedJobGraphsRecoveryITCase extends TestLogger {
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
    private static final File FileStateBackendBasePath;

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerSubmittedJobGraphsRecoveryITCase$RecordingTestClient.class */
    private static class RecordingTestClient extends UntypedActor {
        private final Queue<Object> messages = new ConcurrentLinkedQueue();
        private CountDownLatch jobResultLatch = new CountDownLatch(1);

        private RecordingTestClient() {
        }

        public void onReceive(Object obj) throws Exception {
            if (obj instanceof JobManagerMessages.LeaderSessionMessage) {
                obj = ((JobManagerMessages.LeaderSessionMessage) obj).message();
            }
            this.messages.add(obj);
            if ((obj instanceof JobManagerMessages.JobResultFailure) || (obj instanceof JobManagerMessages.JobResultSuccess)) {
                this.jobResultLatch.countDown();
            }
        }

        public Queue<Object> getMessages() {
            return this.messages;
        }

        public void awaitJobResult(long j) throws InterruptedException {
            this.jobResultLatch.await(j, TimeUnit.MILLISECONDS);
        }
    }

    @AfterClass
    public static void tearDown() throws Exception {
        ZooKeeper.shutdown();
        if (FileStateBackendBasePath != null) {
            FileUtils.deleteDirectory(FileStateBackendBasePath);
        }
    }

    @Before
    public void cleanUp() throws Exception {
        if (FileStateBackendBasePath != null) {
            FileUtils.cleanDirectory(FileStateBackendBasePath);
        }
        ZooKeeper.deleteAll();
    }

    @Test
    public void testJobManagerCleanUp() throws Exception {
        Configuration createZooKeeperRecoveryModeConfig = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
        createZooKeeperRecoveryModeConfig.setInteger("local.number-jobmanager", 1);
        createZooKeeperRecoveryModeConfig.setInteger("local.number-taskmanager", 1);
        TestingCluster testingCluster = new TestingCluster(createZooKeeperRecoveryModeConfig, false, false);
        try {
            Deadline fromNow = TestTimeOut.fromNow();
            testingCluster.start(true);
            JobGraph createBlockingJobGraph = createBlockingJobGraph();
            ActorGateway leaderGateway = testingCluster.getLeaderGateway(fromNow.timeLeft());
            leaderGateway.tell(new JobManagerMessages.SubmitJob(createBlockingJobGraph, ListeningBehaviour.DETACHED));
            JobManagerActorTestUtils.waitForJobStatus(createBlockingJobGraph.getJobID(), JobStatus.RUNNING, leaderGateway, fromNow.timeLeft());
            testingCluster.shutdown();
            verifyCleanRecoveryState(createZooKeeperRecoveryModeConfig);
        } catch (Throwable th) {
            testingCluster.shutdown();
            throw th;
        }
    }

    @Test
    public void testSubmitJobToNonLeader() throws Exception {
        Configuration createZooKeeperRecoveryModeConfig = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
        createZooKeeperRecoveryModeConfig.setInteger("local.number-jobmanager", 2);
        createZooKeeperRecoveryModeConfig.setInteger("local.number-taskmanager", 1);
        TestingCluster testingCluster = new TestingCluster(createZooKeeperRecoveryModeConfig, false, false);
        try {
            Deadline fromNow = TestTimeOut.fromNow();
            testingCluster.start(true);
            JobGraph createBlockingJobGraph = createBlockingJobGraph();
            List jobManagersAsJava = testingCluster.getJobManagersAsJava();
            ActorGateway leaderGateway = testingCluster.getLeaderGateway(fromNow.timeLeft());
            AkkaActorGateway akkaActorGateway = ((ActorRef) jobManagersAsJava.get(0)).equals(leaderGateway.actor()) ? new AkkaActorGateway((ActorRef) jobManagersAsJava.get(1), (UUID) null) : new AkkaActorGateway((ActorRef) jobManagersAsJava.get(0), (UUID) null);
            akkaActorGateway.tell(new JobManagerMessages.SubmitJob(createBlockingJobGraph, ListeningBehaviour.DETACHED));
            JobManagerActorTestUtils.waitForJobStatus(createBlockingJobGraph.getJobID(), JobStatus.RUNNING, leaderGateway, fromNow.timeLeft());
            this.log.info("Wait that the non-leader removes the submitted job.");
            boolean z = false;
            while (!z && fromNow.hasTimeLeft()) {
                JobManagerMessages.CurrentJobStatus requestJobStatus = JobManagerActorTestUtils.requestJobStatus(createBlockingJobGraph.getJobID(), akkaActorGateway, fromNow.timeLeft());
                if (requestJobStatus instanceof JobManagerMessages.JobNotFound) {
                    z = true;
                } else {
                    this.log.info(requestJobStatus.status().toString());
                    Thread.sleep(100L);
                }
            }
            if (!z) {
                Assert.fail("Non-leading JM was still holding reference to the job graph.");
            }
            verifyCleanRecoveryState(createZooKeeperRecoveryModeConfig);
        } finally {
            testingCluster.shutdown();
        }
    }

    @Test
    public void testClientNonDetachedListeningBehaviour() throws Exception {
        Configuration createZooKeeperRecoveryModeConfig = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
        ActorSystem actorSystem = null;
        JobManagerProcess[] jobManagerProcessArr = new JobManagerProcess[2];
        LeaderRetrievalService leaderRetrievalService = null;
        ActorSystem actorSystem2 = null;
        try {
            try {
                Deadline fromNow = TestTimeOut.fromNow();
                actorSystem = AkkaUtils.createActorSystem(new Configuration(), new Some(new Tuple2("localhost", 0)));
                jobManagerProcessArr[0] = new JobManagerProcess(0, createZooKeeperRecoveryModeConfig);
                jobManagerProcessArr[1] = new JobManagerProcess(1, createZooKeeperRecoveryModeConfig);
                jobManagerProcessArr[0].createAndStart();
                jobManagerProcessArr[1].createAndStart();
                TestingListener testingListener = new TestingListener();
                leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(createZooKeeperRecoveryModeConfig);
                leaderRetrievalService.start(testingListener);
                actorSystem2 = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
                TaskManager.startTaskManagerComponentsAndActor(createZooKeeperRecoveryModeConfig, actorSystem2, "localhost", Option.empty(), Option.empty(), false, TaskManager.class);
                TestActorRef create = TestActorRef.create(actorSystem, Props.create(RecordingTestClient.class, new Object[0]));
                JobGraph createBlockingJobGraph = createBlockingJobGraph();
                testingListener.waitForNewLeader(fromNow.timeLeft().toMillis());
                String address = testingListener.getAddress();
                UUID leaderSessionID = testingListener.getLeaderSessionID();
                AkkaActorGateway akkaActorGateway = new AkkaActorGateway(create, leaderSessionID);
                AkkaActorGateway akkaActorGateway2 = new AkkaActorGateway(AkkaUtils.getActorRef(address, actorSystem, fromNow.timeLeft()), leaderSessionID);
                akkaActorGateway2.tell(new JobManagerMessages.SubmitJob(createBlockingJobGraph, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), akkaActorGateway);
                JobManagerActorTestUtils.waitForJobStatus(createBlockingJobGraph.getJobID(), JobStatus.RUNNING, akkaActorGateway2, fromNow.timeLeft());
                (jobManagerProcessArr[0].getJobManagerAkkaURL().equals(testingListener.getAddress()) ? jobManagerProcessArr[0] : jobManagerProcessArr[1]).destroy();
                testingListener.waitForNewLeader(fromNow.timeLeft().toMillis());
                AkkaActorGateway akkaActorGateway3 = new AkkaActorGateway(AkkaUtils.getActorRef(testingListener.getAddress(), actorSystem, fromNow.timeLeft()), testingListener.getLeaderSessionID());
                JobManagerActorTestUtils.waitForJobStatus(createBlockingJobGraph.getJobID(), JobStatus.RUNNING, akkaActorGateway3, fromNow.timeLeft());
                akkaActorGateway3.tell(new JobManagerMessages.CancelJob(createBlockingJobGraph.getJobID()));
                create.underlyingActor().awaitJobResult(fromNow.timeLeft().toMillis());
                int i = 0;
                Iterator<Object> it = create.underlyingActor().getMessages().iterator();
                while (it.hasNext()) {
                    if (it.next() instanceof JobManagerMessages.JobSubmitSuccess) {
                        i++;
                    }
                }
                Assert.assertEquals(2L, i);
                if (jobManagerProcessArr[0] != null) {
                    jobManagerProcessArr[0].destroy();
                }
                if (jobManagerProcessArr[1] != null) {
                    jobManagerProcessArr[1].destroy();
                }
                if (leaderRetrievalService != null) {
                    leaderRetrievalService.stop();
                }
                if (actorSystem2 != null) {
                    actorSystem2.shutdown();
                }
                if (actorSystem != null) {
                    actorSystem.shutdown();
                }
            } catch (Throwable th) {
                if (jobManagerProcessArr[0] != null) {
                    jobManagerProcessArr[0].printProcessLog();
                }
                if (jobManagerProcessArr[1] != null) {
                    jobManagerProcessArr[1].printProcessLog();
                }
                th.printStackTrace();
                if (jobManagerProcessArr[0] != null) {
                    jobManagerProcessArr[0].destroy();
                }
                if (jobManagerProcessArr[1] != null) {
                    jobManagerProcessArr[1].destroy();
                }
                if (leaderRetrievalService != null) {
                    leaderRetrievalService.stop();
                }
                if (actorSystem2 != null) {
                    actorSystem2.shutdown();
                }
                if (actorSystem != null) {
                    actorSystem.shutdown();
                }
            }
        } catch (Throwable th2) {
            if (jobManagerProcessArr[0] != null) {
                jobManagerProcessArr[0].destroy();
            }
            if (jobManagerProcessArr[1] != null) {
                jobManagerProcessArr[1].destroy();
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            if (actorSystem2 != null) {
                actorSystem2.shutdown();
            }
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            throw th2;
        }
    }

    private static JobGraph createBlockingJobGraph() {
        JobGraph jobGraph = new JobGraph("Blocking program");
        JobVertex jobVertex = new JobVertex("Blocking Vertex");
        jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
        jobGraph.addVertex(jobVertex);
        return jobGraph;
    }

    private static void verifyCleanRecoveryState(Configuration configuration) throws Exception {
        Collection listFiles = FileUtils.listFiles(FileStateBackendBasePath, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        if (!listFiles.isEmpty()) {
            Assert.fail("File state backend is not clean: " + listFiles);
        }
        String string = configuration.getString("recovery.zookeeper.path.jobgraphs", "/jobgraphs");
        Stat stat = (Stat) ZooKeeper.getClient().checkExists().forPath(string);
        if (stat.getCversion() == 0) {
            Assert.fail("ZooKeeper state for '" + string + "' has not been modified during this test. What are you testing?");
        }
        if (stat.getNumChildren() != 0) {
            Assert.fail("ZooKeeper path '" + string + "' is not clean: " + ZooKeeper.getClient().getChildren().forPath(string));
        }
    }

    static {
        try {
            FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
        } catch (IOException e) {
            throw new RuntimeException("Error in test setup. Could not create directory.", e);
        }
    }
}
