package org.apache.flink.runtime.minicluster;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testtasks.WaitingNoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.StringStartsWith;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniClusterITCase.class */
public class MiniClusterITCase extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniClusterITCase$OutOfMemoryInFinalizationJobVertex.class */
    private static class OutOfMemoryInFinalizationJobVertex extends JobVertex {
        private OutOfMemoryInFinalizationJobVertex() {
            super("FailingInFinalization");
        }

        public void finalizeOnMaster(JobVertex.FinalizeOnMasterContext finalizeOnMasterContext) {
            throw new OutOfMemoryError("Java heap space");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniClusterITCase$OutOfMemoryInInitializationVertex.class */
    private static class OutOfMemoryInInitializationVertex extends JobVertex {
        OutOfMemoryInInitializationVertex() {
            super("FailingInInitialization");
        }

        public void initializeOnMaster(JobVertex.InitializeOnMasterContext initializeOnMasterContext) {
            throw new OutOfMemoryError("Java heap space");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniClusterITCase$WaitOnFinalizeJobVertex.class */
    private static class WaitOnFinalizeJobVertex extends JobVertex {
        private static final long serialVersionUID = -1179547322468530299L;
        private static final AtomicBoolean finalizedOnMaster = new AtomicBoolean(false);
        private final long waitingTime;

        WaitOnFinalizeJobVertex(String str, long j) {
            super(str);
            this.waitingTime = j;
        }

        public void finalizeOnMaster(JobVertex.FinalizeOnMasterContext finalizeOnMasterContext) throws Exception {
            Thread.sleep(this.waitingTime);
            finalizedOnMaster.set(true);
        }

        static void resetFinalizedOnMaster() {
            finalizedOnMaster.set(false);
        }
    }

    @Test
    public void runJobWithSingleRpcService() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(3).setNumSlotsPerTaskManager(7).setRpcServiceSharing(RpcServiceSharing.SHARED).build());
        Throwable th = null;
        try {
            try {
                miniCluster.start();
                miniCluster.executeJobBlocking(getSimpleJob(21));
                if (miniCluster != null) {
                    if (0 == 0) {
                        miniCluster.close();
                        return;
                    }
                    try {
                        miniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (miniCluster != null) {
                if (th != null) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void runJobWithMultipleRpcServices() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(3).setNumSlotsPerTaskManager(7).setRpcServiceSharing(RpcServiceSharing.DEDICATED).build());
        Throwable th = null;
        try {
            try {
                miniCluster.start();
                miniCluster.executeJobBlocking(getSimpleJob(21));
                if (miniCluster != null) {
                    if (0 == 0) {
                        miniCluster.close();
                        return;
                    }
                    try {
                        miniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (miniCluster != null) {
                if (th != null) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testHandleStreamingJobsWhenNotEnoughSlot() throws Exception {
        try {
            JobVertex jobVertex = new JobVertex("Test Vertex1");
            jobVertex.setParallelism(1);
            jobVertex.setMaxParallelism(1);
            jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
            JobVertex jobVertex2 = new JobVertex("Test Vertex2");
            jobVertex2.setParallelism(1);
            jobVertex2.setMaxParallelism(1);
            jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            runHandleJobsWhenNotEnoughSlots(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2));
            Assert.fail("Job should fail.");
        } catch (JobExecutionException e) {
            Assert.assertThat(e, FlinkMatchers.containsMessage("Job execution failed"));
            Assert.assertThat(e, FlinkMatchers.containsCause(NoResourceAvailableException.class));
        }
    }

    private void runHandleJobsWhenNotEnoughSlots(JobGraph jobGraph) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 100L);
        configuration.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(100L));
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(1).setConfiguration(configuration).build());
        Throwable th = null;
        try {
            miniCluster.start();
            miniCluster.executeJobBlocking(jobGraph);
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testForwardJob() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(62).build());
        Throwable th = null;
        try {
            miniCluster.start();
            JobVertex jobVertex = new JobVertex("Sender");
            jobVertex.setInvokableClass(TestingAbstractInvokables.Sender.class);
            jobVertex.setParallelism(31);
            JobVertex jobVertex2 = new JobVertex("Receiver");
            jobVertex2.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            jobVertex2.setParallelism(31);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2));
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testBipartiteJob() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(62).build());
        Throwable th = null;
        try {
            miniCluster.start();
            JobVertex jobVertex = new JobVertex("Sender");
            jobVertex.setInvokableClass(TestingAbstractInvokables.Sender.class);
            jobVertex.setParallelism(31);
            JobVertex jobVertex2 = new JobVertex("Receiver");
            jobVertex2.setInvokableClass(Tasks.AgnosticReceiver.class);
            jobVertex2.setParallelism(31);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2));
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testTwoInputJobFailingEdgeMismatch() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(6).build());
        Throwable th = null;
        try {
            miniCluster.start();
            JobVertex jobVertex = new JobVertex("Sender1");
            jobVertex.setInvokableClass(TestingAbstractInvokables.Sender.class);
            jobVertex.setParallelism(1);
            JobVertex jobVertex2 = new JobVertex("Sender2");
            jobVertex2.setInvokableClass(TestingAbstractInvokables.Sender.class);
            jobVertex2.setParallelism(2);
            JobVertex jobVertex3 = new JobVertex("Receiver");
            jobVertex3.setInvokableClass(Tasks.AgnosticTertiaryReceiver.class);
            jobVertex3.setParallelism(3);
            jobVertex3.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            try {
                miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex3, jobVertex2));
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, ArrayIndexOutOfBoundsException.class).isPresent());
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "2").isPresent());
            }
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testTwoInputJob() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(66).build());
        Throwable th = null;
        try {
            try {
                miniCluster.start();
                JobVertex jobVertex = new JobVertex("Sender1");
                jobVertex.setInvokableClass(TestingAbstractInvokables.Sender.class);
                jobVertex.setParallelism(11);
                JobVertex jobVertex2 = new JobVertex("Sender2");
                jobVertex2.setInvokableClass(TestingAbstractInvokables.Sender.class);
                jobVertex2.setParallelism(22);
                JobVertex jobVertex3 = new JobVertex("Receiver");
                jobVertex3.setInvokableClass(Tasks.AgnosticBinaryReceiver.class);
                jobVertex3.setParallelism(33);
                jobVertex3.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
                jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
                miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex3, jobVertex2));
                if (miniCluster != null) {
                    if (0 == 0) {
                        miniCluster.close();
                        return;
                    }
                    try {
                        miniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (miniCluster != null) {
                if (th != null) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSchedulingAllAtOnce() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).build());
        Throwable th = null;
        try {
            try {
                miniCluster.start();
                JobVertex jobVertex = new JobVertex("Sender");
                jobVertex.setInvokableClass(TestingAbstractInvokables.Sender.class);
                jobVertex.setParallelism(11);
                JobVertex jobVertex2 = new JobVertex("Forwarder");
                jobVertex2.setInvokableClass(Tasks.Forwarder.class);
                jobVertex2.setParallelism(11);
                JobVertex jobVertex3 = new JobVertex("Receiver");
                jobVertex3.setInvokableClass(Tasks.AgnosticReceiver.class);
                jobVertex3.setParallelism(11);
                SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
                jobVertex.setSlotSharingGroup(slotSharingGroup);
                jobVertex2.setSlotSharingGroup(slotSharingGroup);
                jobVertex3.setSlotSharingGroup(slotSharingGroup);
                jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
                jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
                miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2, jobVertex3));
                if (miniCluster != null) {
                    if (0 == 0) {
                        miniCluster.close();
                        return;
                    }
                    try {
                        miniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (miniCluster != null) {
                if (th != null) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testJobWithAFailingSenderVertex() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(22).build());
        Throwable th = null;
        try {
            miniCluster.start();
            JobVertex jobVertex = new JobVertex("Sender");
            jobVertex.setInvokableClass(Tasks.ExceptionSender.class);
            jobVertex.setParallelism(11);
            JobVertex jobVertex2 = new JobVertex("Receiver");
            jobVertex2.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            jobVertex2.setParallelism(11);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            try {
                miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2));
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Test exception").isPresent());
            }
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testJobWithAnOccasionallyFailingSenderVertex() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).build());
        Throwable th = null;
        try {
            miniCluster.start();
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            JobVertex jobVertex = new JobVertex("Sender");
            jobVertex.setInvokableClass(SometimesExceptionSender.class);
            jobVertex.setParallelism(11);
            jobVertex.setSlotSharingGroup(slotSharingGroup);
            SometimesExceptionSender.configFailingSenders(11);
            JobVertex jobVertex2 = new JobVertex("Receiver");
            jobVertex2.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            jobVertex2.setParallelism(11);
            jobVertex2.setSlotSharingGroup(slotSharingGroup);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            try {
                miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2));
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Test exception").isPresent());
            }
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testJobWithAFailingReceiverVertex() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(22).build());
        Throwable th = null;
        try {
            miniCluster.start();
            JobVertex jobVertex = new JobVertex("Sender");
            jobVertex.setInvokableClass(TestingAbstractInvokables.Sender.class);
            jobVertex.setParallelism(11);
            JobVertex jobVertex2 = new JobVertex("Receiver");
            jobVertex2.setInvokableClass(Tasks.ExceptionReceiver.class);
            jobVertex2.setParallelism(11);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            try {
                miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2));
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Test exception").isPresent());
            }
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testJobWithAllVerticesFailingDuringInstantiation() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(22).build());
        Throwable th = null;
        try {
            miniCluster.start();
            JobVertex jobVertex = new JobVertex("Sender");
            jobVertex.setInvokableClass(Tasks.InstantiationErrorSender.class);
            jobVertex.setParallelism(11);
            JobVertex jobVertex2 = new JobVertex("Receiver");
            jobVertex2.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            jobVertex2.setParallelism(11);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            try {
                miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2));
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Test exception in constructor").isPresent());
            }
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(11).build());
        Throwable th = null;
        try {
            miniCluster.start();
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            JobVertex jobVertex = new JobVertex("Sender");
            jobVertex.setInvokableClass(SometimesInstantiationErrorSender.class);
            jobVertex.setParallelism(11);
            jobVertex.setSlotSharingGroup(slotSharingGroup);
            SometimesInstantiationErrorSender.configFailingSenders(11);
            JobVertex jobVertex2 = new JobVertex("Receiver");
            jobVertex2.setInvokableClass(TestingAbstractInvokables.Receiver.class);
            jobVertex2.setParallelism(11);
            jobVertex2.setSlotSharingGroup(slotSharingGroup);
            jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            try {
                miniCluster.executeJobBlocking(JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2));
                Assert.fail("Job should fail.");
            } catch (JobExecutionException e) {
                Assert.assertTrue(ExceptionUtils.findThrowable(e, Exception.class).isPresent());
                Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Test exception in constructor").isPresent());
            }
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(22).build());
        Throwable th = null;
        try {
            miniCluster.start();
            JobVertex jobVertex = new JobVertex("Source");
            jobVertex.setInvokableClass(WaitingNoOpInvokable.class);
            jobVertex.setParallelism(11);
            WaitOnFinalizeJobVertex.resetFinalizedOnMaster();
            WaitOnFinalizeJobVertex waitOnFinalizeJobVertex = new WaitOnFinalizeJobVertex("Sink", 20L);
            waitOnFinalizeJobVertex.setInvokableClass(NoOpInvokable.class);
            waitOnFinalizeJobVertex.setParallelism(11);
            waitOnFinalizeJobVertex.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex, waitOnFinalizeJobVertex);
            ((JobResult) miniCluster.submitJob(streamingJobGraph).thenCompose(jobSubmissionResult -> {
                return miniCluster.requestJobResult(streamingJobGraph.getJobID());
            }).get()).toJobExecutionResult(getClass().getClassLoader());
            Assert.assertTrue(WaitOnFinalizeJobVertex.finalizedOnMaster.get());
            if (miniCluster != null) {
                if (0 == 0) {
                    miniCluster.close();
                    return;
                }
                try {
                    miniCluster.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (miniCluster != null) {
                if (0 != 0) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testOutOfMemoryErrorMessageEnrichmentInJobVertexFinalization() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(1).build());
        Throwable th = null;
        try {
            try {
                miniCluster.start();
                OutOfMemoryInFinalizationJobVertex outOfMemoryInFinalizationJobVertex = new OutOfMemoryInFinalizationJobVertex();
                outOfMemoryInFinalizationJobVertex.setInvokableClass(NoOpInvokable.class);
                outOfMemoryInFinalizationJobVertex.setParallelism(1);
                JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(outOfMemoryInFinalizationJobVertex);
                try {
                    ((JobResult) miniCluster.submitJob(streamingJobGraph).thenCompose(jobSubmissionResult -> {
                        return miniCluster.requestJobResult(streamingJobGraph.getJobID());
                    }).get()).toJobExecutionResult(getClass().getClassLoader());
                } catch (JobExecutionException e) {
                    Assert.assertThat(e, FlinkMatchers.containsCause(OutOfMemoryError.class));
                    Assert.assertThat(ExceptionUtils.findThrowable(e, OutOfMemoryError.class).map((v0) -> {
                        return v0.getMessage();
                    }).get(), StringStartsWith.startsWith("Java heap space. A heap space-related out-of-memory error has occurred."));
                }
                if (miniCluster != null) {
                    if (0 == 0) {
                        miniCluster.close();
                        return;
                    }
                    try {
                        miniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (miniCluster != null) {
                if (th != null) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testOutOfMemoryErrorMessageEnrichmentInJobVertexInitialization() throws Exception {
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().withRandomPorts().setNumTaskManagers(1).setNumSlotsPerTaskManager(1).build());
        Throwable th = null;
        try {
            try {
                miniCluster.start();
                OutOfMemoryInInitializationVertex outOfMemoryInInitializationVertex = new OutOfMemoryInInitializationVertex();
                outOfMemoryInInitializationVertex.setInvokableClass(NoOpInvokable.class);
                outOfMemoryInInitializationVertex.setParallelism(1);
                JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(outOfMemoryInInitializationVertex);
                try {
                    miniCluster.submitJob(streamingJobGraph).thenCompose(jobSubmissionResult -> {
                        return miniCluster.requestJobResult(streamingJobGraph.getJobID());
                    }).get();
                } catch (ExecutionException e) {
                    Assert.assertThat(e, FlinkMatchers.containsCause(OutOfMemoryError.class));
                    Assert.assertThat(e, FlinkMatchers.containsMessage("Java heap space. A heap space-related out-of-memory error has occurred."));
                }
                if (miniCluster != null) {
                    if (0 == 0) {
                        miniCluster.close();
                        return;
                    }
                    try {
                        miniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (miniCluster != null) {
                if (th != null) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th4;
        }
    }

    private static JobGraph getSimpleJob(int i) throws IOException {
        JobVertex jobVertex = new JobVertex("Test task");
        jobVertex.setParallelism(i);
        jobVertex.setMaxParallelism(i);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        JobGraph streamingJobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
        streamingJobGraph.setExecutionConfig(executionConfig);
        return streamingJobGraph;
    }
}
