/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.NetUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class JobSubmitTest {
    private static final FiniteDuration timeout = new FiniteDuration(60000L, TimeUnit.MILLISECONDS);
    private static ActorSystem jobManagerSystem;
    private static ActorGateway jmGateway;
    private static Configuration jmConfig;
    private static HighAvailabilityServices highAvailabilityServices;

    @BeforeClass
    public static void setupJobManager() {
        jmConfig = new Configuration();
        int port = NetUtils.getAvailablePort();
        jmConfig.setString("jobmanager.rpc.address", "localhost");
        jmConfig.setInteger("jobmanager.rpc.port", port);
        Option listeningAddress = Option.apply((Object)new Tuple2((Object)"localhost", (Object)port));
        jobManagerSystem = AkkaUtils.createActorSystem((Configuration)jmConfig, (Option)listeningAddress);
        highAvailabilityServices = new EmbeddedHaServices((Executor)TestingUtils.defaultExecutor());
        JobManager.startJobManagerActors((Configuration)jmConfig, (ActorSystem)jobManagerSystem, (ScheduledExecutorService)TestingUtils.defaultExecutor(), (Executor)TestingUtils.defaultExecutor(), (HighAvailabilityServices)highAvailabilityServices, JobManager.class, MemoryArchivist.class)._1();
        try {
            LeaderRetrievalService lrs = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
            jmGateway = LeaderRetrievalUtils.retrieveLeaderGateway((LeaderRetrievalService)lrs, (ActorSystem)jobManagerSystem, (FiniteDuration)timeout);
        }
        catch (Exception e) {
            Assert.fail((String)("Could not retrieve the JobManager gateway. " + e.getMessage()));
        }
    }

    @AfterClass
    public static void teardownJobmanager() throws Exception {
        if (jobManagerSystem != null) {
            jobManagerSystem.shutdown();
        }
        if (highAvailabilityServices != null) {
            highAvailabilityServices.closeAndCleanupAllData();
            highAvailabilityServices = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFailureWhenJarBlobsMissing() {
        try {
            BlobKey key2;
            BlobKey key1;
            JobVertex jobVertex = new JobVertex("Test Vertex");
            jobVertex.setInvokableClass(NoOpInvokable.class);
            JobGraph jg = new JobGraph("test job", new JobVertex[]{jobVertex});
            Future future = jmGateway.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
            int blobPort = (Integer)Await.result((Awaitable)future, (Duration)timeout);
            try (BlobClient bc = new BlobClient(new InetSocketAddress("localhost", blobPort), jmConfig);){
                key1 = bc.put(new byte[10]);
                key2 = bc.put(new byte[10]);
                bc.delete(key2);
            }
            jg.addBlob(key1);
            jg.addBlob(key2);
            Future submitFuture = jmGateway.ask((Object)new JobManagerMessages.SubmitJob(jg, ListeningBehaviour.EXECUTION_RESULT), timeout);
            try {
                Await.result((Awaitable)submitFuture, (Duration)timeout);
            }
            catch (JobExecutionException e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof IOException));
            }
            catch (Exception e) {
                Assert.fail((String)"Wrong exception type");
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testFailureWhenInitializeOnMasterFails() {
        try {
            JobVertex jobVertex = new JobVertex("Vertex that fails in initializeOnMaster"){
                private static final long serialVersionUID = -3540303593784587652L;

                public void initializeOnMaster(ClassLoader loader) throws Exception {
                    throw new RuntimeException("test exception");
                }
            };
            jobVertex.setInvokableClass(NoOpInvokable.class);
            JobGraph jg = new JobGraph("test job", new JobVertex[]{jobVertex});
            Future submitFuture = jmGateway.ask((Object)new JobManagerMessages.SubmitJob(jg, ListeningBehaviour.EXECUTION_RESULT), timeout);
            try {
                Await.result((Awaitable)submitFuture, (Duration)timeout);
            }
            catch (JobExecutionException e) {
                Assert.assertTrue((boolean)(e.getCause() instanceof RuntimeException));
            }
            catch (Exception e) {
                Assert.fail((String)"Wrong exception type");
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testAnswerFailureWhenSavepointReadFails() throws Exception {
        JobGraph jg = this.createSimpleJobGraph();
        jg.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)"pathThatReallyDoesNotExist..."));
        Future submitFuture = jmGateway.ask((Object)new JobManagerMessages.SubmitJob(jg, ListeningBehaviour.DETACHED), timeout);
        Object result = Await.result((Awaitable)submitFuture, (Duration)timeout);
        Assert.assertEquals(JobManagerMessages.JobResultFailure.class, result.getClass());
    }

    private JobGraph createSimpleJobGraph() {
        JobVertex jobVertex = new JobVertex("Vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID());
        JobGraph jg = new JobGraph("test job", new JobVertex[]{jobVertex});
        jg.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList, 5000L, 5000L, 0L, 10, ExternalizedCheckpointSettings.none(), null, true));
        return jg;
    }
}

