package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.japi.pf.FI;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.testkit.CallingThreadDispatcher;
import akka.testkit.JavaTestKit;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.util.InstantiationUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.Int;
import scala.Option;
import scala.PartialFunction;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.class */
public class JobManagerHARecoveryTest {
    private static ActorSystem system;

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest$BlockingInvokable.class */
    /**
     * Invokable whose {@link #invoke()} blocks until {@link #unblock()} has been called.
     *
     * <p>The blocking flag is static, so a single {@code unblock()} call releases every
     * instance, and the flag is never reset by this class.
     */
    public static class BlockingInvokable extends AbstractInvokable {
        /** Guarded by {@link #lock}; {@code true} while tasks should keep blocking. */
        private static boolean blocking = true;
        private static final Object lock = new Object();

        /**
         * Blocks the calling thread until {@link #unblock()} has been called.
         *
         * @throws Exception if the thread is interrupted while waiting
         */
        public void invoke() throws Exception {
            // The flag must be checked while holding the monitor: checking it outside
            // would allow unblock() to flip the flag and notify between the check and
            // wait(), in which case this thread would wait forever (lost wake-up).
            synchronized (lock) {
                while (blocking) {
                    lock.wait();
                }
            }
        }

        /** Releases all threads currently blocked in {@link #invoke()} and all future callers. */
        public static void unblock() {
            synchronized (lock) {
                blocking = false;
                lock.notifyAll();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest$BlockingStatefulInvokable.class */
    /**
     * Blocking invokable that additionally acknowledges checkpoints and records the state
     * it is restored with.
     *
     * <p>Each triggered checkpoint is acknowledged with the serialized checkpoint id as
     * state. On restore, the deserialized value is written to a static array indexed by
     * subtask index so the test can inspect it via {@link #getRecoveredStates()}.
     */
    public static class BlockingStatefulInvokable extends BlockingInvokable implements StatefulTask {
        private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
        private static volatile CountDownLatch completedCheckpointsLatch = new CountDownLatch(1);
        private static volatile long[] recoveredStates = new long[0];
        private int completedCheckpoints = 0;

        /**
         * Reads the restored {@code Long} from the first legacy operator state handle and
         * stores it in the slot of this subtask. Subtasks whose index lies outside the
         * static array are ignored.
         *
         * @param taskStateHandles state handles to restore from
         * @throws Exception if the state stream cannot be opened, read or closed
         */
        public void setInitialState(TaskStateHandles taskStateHandles) throws Exception {
            int subtaskIndex = getIndexInSubtaskGroup();
            if (subtaskIndex < recoveredStates.length) {
                // try-with-resources closes the stream on every path and keeps the primary
                // exception if closing fails as well (the close failure is suppressed).
                try (FSDataInputStream in = taskStateHandles.getLegacyOperatorState().get(0).openInputStream()) {
                    recoveredStates[subtaskIndex] = ((Long) InstantiationUtil.deserializeObject(in, getUserCodeClassLoader())).longValue();
                }
            }
        }

        /**
         * Immediately acknowledges the checkpoint, using the serialized checkpoint id as the
         * only (legacy operator) state.
         *
         * @param checkpointMetaData meta data of the checkpoint to take
         * @return always {@code true}
         * @throws Exception if the checkpoint id cannot be serialized
         */
        public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception {
            long checkpointId = checkpointMetaData.getCheckpointId();

            TestByteStreamStateHandleDeepCompare stateHandle = new TestByteStreamStateHandleDeepCompare(
                    String.valueOf(UUID.randomUUID()),
                    InstantiationUtil.serializeObject(Long.valueOf(checkpointId)));

            // Only the first state slot is populated; the remaining four are null.
            SubtaskState subtaskState = new SubtaskState(
                    new ChainedStateHandle(Collections.singletonList(stateHandle)),
                    (ChainedStateHandle) null,
                    (ChainedStateHandle) null,
                    (KeyGroupsStateHandle) null,
                    (KeyGroupsStateHandle) null);

            getEnvironment().acknowledgeCheckpoint(
                    new CheckpointMetaData(checkpointId, -1L, 0L, 0L, 0L, 0L),
                    subtaskState);
            return true;
        }

        /** Not supported by this test task. */
        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData) throws Exception {
            throw new UnsupportedOperationException("should not be called!");
        }

        /** Not supported by this test task. */
        public void abortCheckpointOnBarrier(long j, Throwable th) {
            throw new UnsupportedOperationException("should not be called!");
        }

        /**
         * Counts completed checkpoints of this instance. The shared latch is counted down on
         * every notification for which the number of previously seen notifications already
         * exceeds {@link #NUM_CHECKPOINTS_TO_COMPLETE}.
         *
         * @param j id of the completed checkpoint (unused)
         */
        public void notifyCheckpointComplete(long j) {
            if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
                completedCheckpointsLatch.countDown();
            }
        }

        /**
         * Resets the static latch and the recovered-state array.
         *
         * @param i latch count and length of the recovered-state array
         */
        public static void initializeStaticHelpers(int i) {
            completedCheckpointsLatch = new CountDownLatch(i);
            recoveredStates = new long[i];
        }

        /**
         * Blocks until the shared latch has reached zero.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        public static void awaitCompletedCheckpoints() throws InterruptedException {
            completedCheckpointsLatch.await();
        }

        /**
         * @return the static array of recovered values, indexed by subtask index (not a copy)
         */
        public static long[] getRecoveredStates() {
            return recoveredStates;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest$MyCheckpointRecoveryFactory.class */
    /**
     * Recovery factory that always hands out the same checkpoint store and the same
     * checkpoint id counter, regardless of the requested job.
     */
    static class MyCheckpointRecoveryFactory implements CheckpointRecoveryFactory {
        /** Store returned by every {@link #createCheckpointStore} call. */
        private final CompletedCheckpointStore store;
        /** Counter returned by every {@link #createCheckpointIDCounter} call. */
        private final CheckpointIDCounter counter;

        /**
         * @param completedCheckpointStore store to return for every job
         * @param checkpointIDCounter counter to return for every job
         */
        public MyCheckpointRecoveryFactory(CompletedCheckpointStore completedCheckpointStore, CheckpointIDCounter checkpointIDCounter) {
            store = completedCheckpointStore;
            counter = checkpointIDCounter;
        }

        /** No-op. */
        public void start() {
            // nothing to start
        }

        /** No-op. */
        public void stop() {
            // nothing to stop
        }

        /** @return the store passed to the constructor; both arguments are ignored */
        public CompletedCheckpointStore createCheckpointStore(JobID jobID, ClassLoader classLoader) throws Exception {
            return store;
        }

        /** @return the counter passed to the constructor; the job id is ignored */
        public CheckpointIDCounter createCheckpointIDCounter(JobID jobID) throws Exception {
            return counter;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest$MyCheckpointStore.class */
    /**
     * In-memory checkpoint store that retains at most one completed checkpoint.
     *
     * <p>When shut down with a status that is not globally terminal, the retained
     * checkpoints are parked in a "suspended" queue and made available again by the next
     * {@link #recover()} call.
     */
    static class MyCheckpointStore implements CompletedCheckpointStore {
        private final ArrayDeque<CompletedCheckpoint> checkpoints = new ArrayDeque<>(2);
        private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);

        MyCheckpointStore() {
        }

        /** Moves all suspended checkpoints back into the active queue. */
        public void recover() throws Exception {
            checkpoints.addAll(suspended);
            suspended.clear();
        }

        /**
         * Adds the checkpoint as the latest one; if this leaves more than one checkpoint,
         * the oldest is removed and subsumed.
         *
         * @param completedCheckpoint checkpoint to add
         */
        public void addCheckpoint(CompletedCheckpoint completedCheckpoint) throws Exception {
            checkpoints.addLast(completedCheckpoint);
            if (checkpoints.size() > 1) {
                checkpoints.removeFirst().subsume();
            }
        }

        /** @return the most recently added checkpoint, or {@code null} if there is none */
        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
            // peekLast() yields null on an empty deque, which is exactly the contract here.
            return checkpoints.peekLast();
        }

        /**
         * Drops everything for globally terminal states; otherwise parks the active
         * checkpoints so that a later {@link #recover()} can restore them.
         *
         * @param jobStatus status the job is shut down with
         */
        public void shutdown(JobStatus jobStatus) throws Exception {
            if (jobStatus.isGloballyTerminalState()) {
                checkpoints.clear();
                suspended.clear();
            } else {
                suspended.addAll(checkpoints);
                checkpoints.clear();
            }
        }

        /** @return a new list holding the active checkpoints, oldest first */
        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            return new ArrayList<>(checkpoints);
        }

        /** @return the number of active (non-suspended) checkpoints */
        public int getNumberOfRetainedCheckpoints() {
            return checkpoints.size();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest$MySubmittedJobGraphStore.class */
    /**
     * In-memory job graph store backed by a plain {@link HashMap}; performs no
     * synchronization of its own.
     */
    static class MySubmittedJobGraphStore implements SubmittedJobGraphStore {
        /** Stored job graphs keyed by job id. */
        Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<>();

        MySubmittedJobGraphStore() {
        }

        /** No-op; the listener is ignored. */
        public void start(SubmittedJobGraphStore.SubmittedJobGraphListener submittedJobGraphListener) throws Exception {
        }

        /** No-op. */
        public void stop() throws Exception {
        }

        /**
         * @param jobID id of the job to look up
         * @return the stored job graph, or {@code null} if none is stored for the id
         */
        public SubmittedJobGraph recoverJobGraph(JobID jobID) throws Exception {
            // Map.get already returns null for unknown keys; no separate containsKey needed.
            return storedJobs.get(jobID);
        }

        /** Stores the job graph under its own job id, replacing any previous entry. */
        public void putJobGraph(SubmittedJobGraph submittedJobGraph) throws Exception {
            storedJobs.put(submittedJobGraph.getJobId(), submittedJobGraph);
        }

        /** Removes the entry for the given job id, if present. */
        public void removeJobGraph(JobID jobID) throws Exception {
            storedJobs.remove(jobID);
        }

        /** @return a live view of the stored job ids (backed by the internal map) */
        public Collection<JobID> getJobIds() throws Exception {
            return storedJobs.keySet();
        }

        /** @return {@code true} if a job graph is stored for the given id */
        boolean contains(JobID jobID) {
            return storedJobs.containsKey(jobID);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest$TestingFailingHAJobManager.class */
    /**
     * JobManager that intercepts {@code RecoverSubmittedJob} messages and only records the
     * id of the job to be recovered; every other message is passed on to the regular
     * {@link JobManager} message handling.
     */
    public static class TestingFailingHAJobManager extends JobManager {
        /** Receives the id of every job for which a recover message arrives. */
        private final Collection<JobID> recoveredJobs;

        /**
         * Forwards all arguments except the last to the {@link JobManager} constructor.
         *
         * @param collection collection that recovered job ids are added to
         */
        public TestingFailingHAJobManager(Configuration configuration, Executor executor, Executor executor2, InstanceManager instanceManager, Scheduler scheduler, BlobLibraryCacheManager blobLibraryCacheManager, ActorRef actorRef, RestartStrategyFactory restartStrategyFactory, FiniteDuration finiteDuration, LeaderElectionService leaderElectionService, SubmittedJobGraphStore submittedJobGraphStore, CheckpointRecoveryFactory checkpointRecoveryFactory, FiniteDuration finiteDuration2, Option<MetricRegistry> option, Collection<JobID> collection) {
            super(configuration, executor, executor2, instanceManager, scheduler, blobLibraryCacheManager, actorRef, restartStrategyFactory, finiteDuration, leaderElectionService, submittedJobGraphStore, checkpointRecoveryFactory, finiteDuration2, option);
            this.recoveredJobs = collection;
        }

        /**
         * @return a partial function that records recover requests and delegates every
         *     other message to the superclass handler
         */
        public PartialFunction<Object, BoxedUnit> handleMessage() {
            // Recover requests are only recorded, never executed.
            FI.UnitApply<JobManagerMessages.RecoverSubmittedJob> recordRecovery =
                    new FI.UnitApply<JobManagerMessages.RecoverSubmittedJob>() {
                        public void apply(JobManagerMessages.RecoverSubmittedJob message) throws Exception {
                            recoveredJobs.add(message.submittedJobGraph().getJobId());
                        }
                    };

            // Anything else is handled exactly as the parent JobManager would.
            FI.UnitApply<Object> delegateToParent =
                    new FI.UnitApply<Object>() {
                        public void apply(Object message) throws Exception {
                            TestingFailingHAJobManager.super.handleMessage().apply(message);
                        }
                    };

            return ReceiveBuilder
                    .match(JobManagerMessages.RecoverSubmittedJob.class, recordRecovery)
                    .matchAny(delegateToParent)
                    .build();
        }
    }

    /** Creates the local actor system shared by all tests of this class. */
    @BeforeClass
    public static void setup() {
        final Configuration actorSystemConfiguration = new Configuration();
        system = AkkaUtils.createLocalActorSystem(actorSystemConfiguration);
    }

    /** Shuts down the actor system created in {@link #setup()}. */
    @AfterClass
    public static void teardown() {
        final ActorSystem sharedSystem = system;
        JavaTestKit.shutdownActorSystem(sharedSystem);
    }

    /**
     * Runs a checkpointing job on a single JobManager, revokes and re-grants leadership,
     * and verifies that
     * <ul>
     *   <li>the job graph is still in the submitted-job-graph store after leadership was lost,</li>
     *   <li>the job reaches RUNNING again once leadership is re-granted,</li>
     *   <li>the job graph is removed from the store after the job has finished, and</li>
     *   <li>every subtask was restored with a checkpoint state of at least 5.</li>
     * </ul>
     */
    @Test
    public void testJobRecoveryWhenLosingLeadership() throws Exception {
        final FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        final FiniteDuration jobRecoveryTimeout = new FiniteDuration(3L, TimeUnit.SECONDS);
        final Deadline deadline = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        final UUID leaderSessionID = UUID.randomUUID();
        final UUID newLeaderSessionID = UUID.randomUUID();
        final int slots = 2;

        final Configuration flinkConfiguration = new Configuration();
        flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
        flinkConfiguration.setInteger("taskmanager.numberOfTaskSlots", slots);

        ActorRef archive = null;
        ActorRef jobManager = null;
        ActorRef taskManager = null;
        ForkJoinPool executor = null;

        try {
            Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
            MySubmittedJobGraphStore submittedJobGraphStore = new MySubmittedJobGraphStore();
            MyCheckpointRecoveryFactory checkpointRecoveryFactory =
                    new MyCheckpointRecoveryFactory(new MyCheckpointStore(), new StandaloneCheckpointIDCounter());
            TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
            TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
            InstanceManager instanceManager = new InstanceManager();
            instanceManager.addInstanceListener(scheduler);

            archive = system.actorOf(Props.create(MemoryArchivist.class, 10));
            executor = new ForkJoinPool();
            jobManager = system.actorOf(Props.create(
                    TestingJobManager.class,
                    flinkConfiguration,
                    executor,
                    executor,
                    instanceManager,
                    scheduler,
                    new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000L),
                    archive,
                    new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100L),
                    timeout,
                    leaderElectionService,
                    submittedJobGraphStore,
                    checkpointRecoveryFactory,
                    jobRecoveryTimeout,
                    Option.apply((Object) null)));
            AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManager, leaderSessionID);

            taskManager = TaskManager.startTaskManagerComponentsAndActor(
                    flinkConfiguration,
                    ResourceID.generate(),
                    system,
                    "localhost",
                    Option.apply("taskmanager"),
                    Option.apply(leaderRetrievalService),
                    true,
                    TestingTaskManager.class);
            AkkaActorGateway taskManagerGateway = new AkkaActorGateway(taskManager, leaderSessionID);

            // Wait until the task manager actor responds before going on.
            Await.ready(taskManagerGateway.ask(TestingMessages.getAlive(), deadline.timeLeft()), deadline.timeLeft());

            JobVertex sourceVertex = new JobVertex("Source");
            sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
            sourceVertex.setParallelism(slots);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceVertex});

            // Kept raw: the element type (the vertex id class) is not imported in this file.
            List vertexIds = Collections.singletonList(sourceVertex.getID());
            jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
                    vertexIds, vertexIds, vertexIds, 100L, 600000L, 0L, 1, ExternalizedCheckpointSettings.none(), true));
            BlockingStatefulInvokable.initializeStaticHelpers(slots);

            // Register both notifications before leadership is announced so neither is missed.
            Future<Object> isLeader = jobManagerGateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(), deadline.timeLeft());
            Future<Object> isConnectedToJobManager = taskManagerGateway.ask(
                    new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager), deadline.timeLeft());
            leaderElectionService.isLeader(leaderSessionID);
            leaderRetrievalService.notifyListener(jobManagerGateway.path(), leaderSessionID);
            Await.ready(isLeader, deadline.timeLeft());
            Await.ready(isConnectedToJobManager, deadline.timeLeft());

            Await.ready(
                    jobManagerGateway.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), deadline.timeLeft()),
                    deadline.timeLeft());
            BlockingStatefulInvokable.awaitCompletedCheckpoints();

            // Revoke leadership: the job must be removed from the JobManager but stay in the store.
            Future<Object> jobRemoved = jobManagerGateway.ask(
                    new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
            leaderElectionService.notLeader();
            Await.ready(jobRemoved, deadline.timeLeft());
            Assert.assertTrue(submittedJobGraphStore.contains(jobGraph.getJobID()));

            // Re-grant leadership under a new session id and wait for the job to run again.
            Future<Object> jobRunning = jobManagerGateway.ask(
                    new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
            leaderElectionService.isLeader(newLeaderSessionID);
            leaderRetrievalService.notifyListener(jobManagerGateway.path(), newLeaderSessionID);
            Await.ready(jobRunning, deadline.timeLeft());

            // Let the tasks finish; afterwards the job graph must be gone from the store.
            Future<Object> jobFinished = jobManagerGateway.ask(
                    new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
            BlockingInvokable.unblock();
            Await.ready(jobFinished, deadline.timeLeft());
            Assert.assertFalse(submittedJobGraphStore.contains(jobGraph.getJobID()));

            for (long recoveredState : BlockingStatefulInvokable.getRecoveredStates()) {
                Assert.assertTrue(
                        "Did not recover checkpoint state correctly, expecting >= 5, but state was " + recoveredState,
                        recoveredState >= 5);
            }
        } finally {
            // Single cleanup path for both success and failure.
            for (ActorRef actor : Arrays.asList(archive, jobManager, taskManager)) {
                if (actor != null) {
                    actor.tell(PoisonPill.getInstance(), ActorRef.noSender());
                }
            }
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    /**
     * Verifies that a failure while recovering one job graph does not prevent the recovery
     * of the remaining ones: the store reports two job ids, recovering the first throws,
     * and only the second is expected to arrive as a recover message at the JobManager.
     */
    @Test
    public void testFailingJobRecovery() throws Exception {
        final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
        final FiniteDuration jobRecoveryTimeout = new FiniteDuration(0L, TimeUnit.SECONDS);
        final Deadline deadline = new FiniteDuration(1L, TimeUnit.MINUTES).fromNow();
        final Configuration flinkConfiguration = new Configuration();
        final UUID leaderSessionID = UUID.randomUUID();
        final JobID failingJobId = new JobID();
        final JobID recoverableJobId = new JobID();
        flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");

        ActorRef jobManager = null;

        try {
            SubmittedJobGraphStore submittedJobGraphStore = Mockito.mock(SubmittedJobGraphStore.class);
            SubmittedJobGraph recoverableJobGraph = Mockito.mock(SubmittedJobGraph.class);
            Mockito.when(recoverableJobGraph.getJobId()).thenReturn(recoverableJobId);
            Mockito.when(submittedJobGraphStore.getJobIds()).thenReturn(Arrays.asList(failingJobId, recoverableJobId));
            Mockito.when(submittedJobGraphStore.recoverJobGraph(Matchers.eq(failingJobId)))
                    .thenThrow(new Exception("Test exception"));
            Mockito.when(submittedJobGraphStore.recoverJobGraph(Matchers.eq(recoverableJobId)))
                    .thenReturn(recoverableJobGraph);

            TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
            List<JobID> recoveredJobs = new ArrayList<>(2);

            // The actor is created on the CallingThreadDispatcher; the assertion below is made
            // directly after isLeader() without any further waiting.
            jobManager = system.actorOf(Props.create(
                    TestingFailingHAJobManager.class,
                    flinkConfiguration,
                    Executors.directExecutor(),
                    Executors.directExecutor(),
                    Mockito.mock(InstanceManager.class),
                    Mockito.mock(Scheduler.class),
                    new BlobLibraryCacheManager(Mockito.mock(BlobService.class), 1048576L),
                    ActorRef.noSender(),
                    new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100L),
                    timeout,
                    leaderElectionService,
                    submittedJobGraphStore,
                    Mockito.mock(CheckpointRecoveryFactory.class),
                    jobRecoveryTimeout,
                    Option.apply((Object) null),
                    recoveredJobs).withDispatcher(CallingThreadDispatcher.Id()));

            // Make sure the actor is up before granting leadership.
            Await.ready(Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis()), deadline.timeLeft());

            leaderElectionService.isLeader(leaderSessionID);

            Assert.assertThat(recoveredJobs, org.hamcrest.Matchers.containsInAnyOrder(recoverableJobId));
        } finally {
            // Single cleanup path for both success and failure.
            TestingUtils.stopActor(jobManager);
        }
    }
}
