package org.apache.flink.runtime.jobmaster;

import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.SerializedThrowable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.class */
public class JobIntermediateDatasetReuseTest {
    private static final Logger LOG = LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class);

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest$Receiver.class */
    public static class Receiver extends AbstractInvokable {
        public Receiver(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            int indexInSubtaskGroup = getIndexInSubtaskGroup();
            RecordReader recordReader = new RecordReader(getEnvironment().getInputGate(0), IntValue.class, getEnvironment().getTaskManagerInfo().getTmpDirectories());
            for (int i = indexInSubtaskGroup; i < indexInSubtaskGroup + 100; i++) {
                int value = recordReader.next().getValue();
                JobIntermediateDatasetReuseTest.LOG.debug("Receiver({}) received {}", Integer.valueOf(indexInSubtaskGroup), Integer.valueOf(value));
                Assertions.assertThat(value).isEqualTo(i);
            }
            Assertions.assertThat(recordReader.next()).isNull();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest$Sender.class */
    public static class Sender extends AbstractInvokable {
        public Sender(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            int indexInSubtaskGroup = getIndexInSubtaskGroup();
            RecordWriter build = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
            for (int i = indexInSubtaskGroup; i < indexInSubtaskGroup + 100; i++) {
                try {
                    build.emit(new IntValue(i));
                    JobIntermediateDatasetReuseTest.LOG.debug("Sender({}) emit {}", Integer.valueOf(indexInSubtaskGroup), Integer.valueOf(i));
                } catch (Throwable th) {
                    build.close();
                    throw th;
                }
            }
            build.flushAll();
            build.close();
        }
    }

    @Test
    public void testClusterPartitionReuse() throws Exception {
        internalTestClusterPartitionReuse(1, 1, jobResult -> {
            Assertions.assertThat(jobResult.isSuccess()).isTrue();
        });
    }

    @Test
    public void testClusterPartitionReuseMultipleParallelism() throws Exception {
        internalTestClusterPartitionReuse(64, 64, jobResult -> {
            Assertions.assertThat(jobResult.isSuccess()).isTrue();
        });
    }

    @Test
    public void testClusterPartitionReuseWithMoreConsumerParallelismThrowException() throws Exception {
        internalTestClusterPartitionReuse(1, 2, jobResult -> {
            Assertions.assertThat(jobResult.isSuccess()).isFalse();
            Assertions.assertThat(getClusterDatasetCorruptedException(jobResult)).isNotNull();
        });
    }

    @Test
    public void testClusterPartitionReuseWithLessConsumerParallelismThrowException() throws Exception {
        internalTestClusterPartitionReuse(2, 1, jobResult -> {
            Assertions.assertThat(jobResult.isSuccess()).isFalse();
            Assertions.assertThat(getClusterDatasetCorruptedException(jobResult)).isNotNull();
        });
    }

    private void internalTestClusterPartitionReuse(int i, int i2, Consumer<JobResult> consumer) throws Exception {
        TestingMiniCluster build = TestingMiniCluster.newBuilder(TestingMiniClusterConfiguration.newBuilder().build()).build();
        Throwable th = null;
        try {
            try {
                build.start();
                IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
                JobGraph createFirstJobGraph = createFirstJobGraph(i, intermediateDataSetID);
                build.submitJob(createFirstJobGraph).get();
                Assertions.assertThat(((JobResult) build.requestJobResult(createFirstJobGraph.getJobID()).get()).isSuccess()).isTrue();
                JobGraph createSecondJobGraph = createSecondJobGraph(i2, intermediateDataSetID);
                build.submitJob(createSecondJobGraph).get();
                consumer.accept((JobResult) build.requestJobResult(createSecondJobGraph.getJobID()).get());
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testClusterPartitionReuseWithTMFail() throws Exception {
        TestingMiniCluster build = TestingMiniCluster.newBuilder(TestingMiniClusterConfiguration.newBuilder().build()).build();
        Throwable th = null;
        try {
            try {
                build.start();
                IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
                JobGraph createFirstJobGraph = createFirstJobGraph(1, intermediateDataSetID);
                build.submitJob(createFirstJobGraph).get();
                Assertions.assertThat(((JobResult) build.requestJobResult(createFirstJobGraph.getJobID()).get()).isSuccess()).isTrue();
                build.terminateTaskManager(0);
                build.startTaskManager();
                JobGraph createSecondJobGraph = createSecondJobGraph(1, intermediateDataSetID);
                ExecutionConfig executionConfig = new ExecutionConfig();
                executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1024, 1000L));
                createSecondJobGraph.setExecutionConfig(executionConfig);
                build.submitJob(createSecondJobGraph).get();
                JobResult jobResult = (JobResult) build.requestJobResult(createSecondJobGraph.getJobID()).get();
                Assertions.assertThat(jobResult.isSuccess()).isFalse();
                ClusterDatasetCorruptedException clusterDatasetCorruptedException = getClusterDatasetCorruptedException(jobResult);
                Assertions.assertThat(clusterDatasetCorruptedException).isNotNull();
                Assertions.assertThat((Comparable) clusterDatasetCorruptedException.getCorruptedClusterDatasetIds().get(0)).isEqualTo(intermediateDataSetID);
                createFirstJobGraph.setJobID(new JobID());
                build.submitJob(createFirstJobGraph).get();
                Assertions.assertThat(((JobResult) build.requestJobResult(createFirstJobGraph.getJobID()).get()).isSuccess()).isTrue();
                createSecondJobGraph.setJobID(new JobID());
                build.submitJob(createSecondJobGraph).get();
                Assertions.assertThat(((JobResult) build.requestJobResult(createSecondJobGraph.getJobID()).get()).isSuccess()).isTrue();
                if (build != null) {
                    if (0 == 0) {
                        build.close();
                        return;
                    }
                    try {
                        build.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (th != null) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    private ClusterDatasetCorruptedException getClusterDatasetCorruptedException(JobResult jobResult) {
        Assertions.assertThat(jobResult.getSerializedThrowable().isPresent()).isTrue();
        Throwable deserializeError = ((SerializedThrowable) jobResult.getSerializedThrowable().get()).deserializeError(Thread.currentThread().getContextClassLoader());
        while (true) {
            Throwable th = deserializeError;
            if (th == null) {
                return null;
            }
            if (th instanceof ClusterDatasetCorruptedException) {
                return (ClusterDatasetCorruptedException) th;
            }
            deserializeError = th.getCause();
        }
    }

    private JobGraph createSecondJobGraph(int i, IntermediateDataSetID intermediateDataSetID) {
        JobVertex jobVertex = new JobVertex("Receiver 2", (JobVertexID) null);
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(Receiver.class);
        jobVertex.addIntermediateDataSetIdToConsume(intermediateDataSetID);
        return new JobGraph((JobID) null, "Second Job", new JobVertex[]{jobVertex});
    }

    private JobGraph createFirstJobGraph(int i, IntermediateDataSetID intermediateDataSetID) {
        JobVertex jobVertex = new JobVertex("Sender");
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(Sender.class);
        JobVertex jobVertex2 = new JobVertex("Receiver");
        jobVertex2.setParallelism(i);
        jobVertex2.setInvokableClass(Receiver.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING_PERSISTENT, intermediateDataSetID, false);
        return new JobGraph((JobID) null, "First Job", new JobVertex[]{jobVertex, jobVertex2});
    }
}
