/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingManualHighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.Tasks$BlockingOnceReceiver$;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * Checks what happens to a running job when the job manager's leadership is revoked.
 *
 * <p>Each test runs against a fresh {@link TestingCluster} driven by
 * {@link TestingManualHighAvailabilityServices}, so leadership is granted and revoked
 * explicitly from the test code. The cluster is started in {@link #before()} and stopped
 * again in {@link #after()}.
 */
public class LeaderChangeJobRecoveryTest
extends TestLogger {
    /** Upper bound used for every ask / await against the cluster. */
    private static final FiniteDuration TIMEOUT = FiniteDuration.apply(30L, TimeUnit.SECONDS);

    private final int numTMs = 1;
    private final int numSlotsPerTM = 1;
    /** One subtask per available slot (numTMs * numSlotsPerTM). */
    private final int parallelism = this.numTMs * this.numSlotsPerTM;
    private JobID jobId;
    private TestingCluster cluster = null;
    private final JobGraph job = this.createBlockingJob(this.parallelism);
    private TestingManualHighAvailabilityServices highAvailabilityServices;

    /**
     * Builds the configuration, creates the manual high-availability services and starts
     * the testing cluster, then waits until its actors are alive.
     *
     * @throws TimeoutException declared by the original signature; propagated from cluster start-up
     * @throws InterruptedException declared by the original signature; propagated from cluster start-up
     */
    @Before
    public void before() throws TimeoutException, InterruptedException {
        this.jobId = HighAvailabilityServices.DEFAULT_JOB_ID;
        // Set the "blocking" flag on the BlockingOnceReceiver singleton before anything runs.
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);

        Configuration configuration = new Configuration();
        configuration.setInteger("local.number-jobmanager", 1);
        configuration.setInteger("local.number-taskmanager", this.numTMs);
        configuration.setInteger("taskmanager.numberOfTaskSlots", this.numSlotsPerTM);
        // Fixed-delay restart strategy with a very large attempt budget and a short delay.
        configuration.setString("restart-strategy", "fixeddelay");
        configuration.setInteger("restart-strategy.fixed-delay.attempts", 9999);
        configuration.setString("restart-strategy.fixed-delay.delay", "100 milli");

        this.highAvailabilityServices = new TestingManualHighAvailabilityServices();
        this.cluster = new TestingCluster(configuration, this.highAvailabilityServices, true, false);
        this.cluster.start(false);
        this.cluster.waitForActorsToBeAlive();
    }

    /**
     * Stops the cluster started in {@link #before()} so that it does not leak into
     * subsequent tests. Safe to call when set-up failed before the cluster was created.
     */
    @After
    public void after() {
        if (this.cluster != null) {
            this.cluster.stop();
        }
    }

    /**
     * Submits the blocking job, waits until all vertices are running or finished, revokes
     * leadership and then waits (up to 30 seconds) for the execution graph's termination
     * future to complete.
     *
     * @throws Exception if any await times out or the termination future fails
     */
    @Test
    public void testNotRestartedWhenLosingLeadership() throws Exception {
        UUID leaderSessionID = UUID.randomUUID();
        this.highAvailabilityServices.grantLeadership(this.jobId, 0, leaderSessionID);
        this.highAvailabilityServices.notifyRetrievers(this.jobId, 0, leaderSessionID);

        this.cluster.waitForTaskManagersToBeRegistered(TIMEOUT);
        this.cluster.submitJobDetached(this.job);

        ActorGateway jm = this.cluster.getLeaderGateway(TIMEOUT);
        Future<Object> wait = jm.ask(
                new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(this.job.getJobID()),
                TIMEOUT);
        Await.ready(wait, TIMEOUT);

        Future<Object> futureExecutionGraph = jm.ask(
                new TestingJobManagerMessages.RequestExecutionGraph(this.job.getJobID()),
                TIMEOUT);
        TestingJobManagerMessages.ResponseExecutionGraph responseExecutionGraph =
                (TestingJobManagerMessages.ResponseExecutionGraph) Await.result(futureExecutionGraph, TIMEOUT);
        Assert.assertTrue(responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound);
        ExecutionGraph executionGraph = (ExecutionGraph)
                ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();

        this.highAvailabilityServices.revokeLeadership(this.jobId);

        // Throws (and thereby fails the test) if termination is not reached within 30 seconds.
        executionGraph.getTerminationFuture().get(30L, TimeUnit.SECONDS);
    }

    /**
     * Creates a two-vertex job ("sender" -> "receiver") whose receiver uses
     * {@code Tasks.BlockingOnceReceiver}. Both vertices share one slot sharing group and are
     * connected pointwise through a pipelined result partition.
     *
     * @param parallelism parallelism applied to both the sender and the receiver vertex
     * @return the assembled job graph named "Blocking test job"
     */
    public JobGraph createBlockingJob(int parallelism) {
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);

        JobVertex sender = new JobVertex("sender");
        JobVertex receiver = new JobVertex("receiver");
        sender.setInvokableClass(Tasks.Sender.class);
        receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
        sender.setParallelism(parallelism);
        receiver.setParallelism(parallelism);
        receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        sender.setSlotSharingGroup(slotSharingGroup);
        receiver.setSlotSharingGroup(slotSharingGroup);

        return new JobGraph("Blocking test job", new JobVertex[]{sender, receiver});
    }
}

