package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.class */
public class TaskDeploymentDescriptorFactoryTest extends TestLogger {

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    private static final int PARALLELISM = 4;

    @Test
    public void testCacheShuffleDescriptorAsNonOffloaded() throws Exception {
        testCacheShuffleDescriptor(new TestingBlobWriter(Integer.MAX_VALUE));
    }

    @Test
    public void testCacheShuffleDescriptorAsOffloaded() throws Exception {
        testCacheShuffleDescriptor(new TestingBlobWriter(0));
    }

    private void testCacheShuffleDescriptor(TestingBlobWriter testingBlobWriter) throws Exception {
        JobID jobID = new JobID();
        ExecutionJobVertex executionJobVertex = setupExecutionGraphAndGetVertex(jobID, testingBlobWriter);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        createTaskDeploymentDescriptor(executionVertex);
        ShuffleDescriptor[] deserializeShuffleDescriptors = deserializeShuffleDescriptors(((IntermediateResult) executionJobVertex.getInputs().get(0)).getCachedShuffleDescriptors(executionVertex.getConsumedPartitionGroup(0)), jobID, testingBlobWriter);
        Assert.assertEquals(executionVertex.getConsumedPartitionGroup(0).size(), deserializeShuffleDescriptors.length);
        int i = 0;
        Iterator it = executionVertex.getConsumedPartitionGroup(0).iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            Assert.assertEquals((IntermediateResultPartitionID) it.next(), deserializeShuffleDescriptors[i2].getResultPartitionID().getPartitionId());
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testGetOffloadedShuffleDescriptorBeforeLoading() throws Exception {
        ((InputGateDeploymentDescriptor) createTaskDeploymentDescriptor(setupExecutionGraphAndGetVertex(new JobID(), new TestingBlobWriter(0)).getTaskVertices()[0]).getInputGates().get(0)).getShuffleDescriptors();
    }

    private ExecutionJobVertex setupExecutionGraphAndGetVertex(JobID jobID, BlobWriter blobWriter) throws JobException, JobExecutionException {
        JobVertex createJobVertex = createJobVertex("v1", 4);
        JobVertex createJobVertex2 = createJobVertex("v2", 4);
        createJobVertex2.connectNewDataSetAsInput(createJobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        return createExecutionGraph(jobID, new ArrayList(Arrays.asList(createJobVertex, createJobVertex2)), blobWriter).getJobVertex(createJobVertex2.getID());
    }

    private static JobVertex createJobVertex(String str, int i) {
        JobVertex jobVertex = new JobVertex(str);
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        return jobVertex;
    }

    private static ExecutionGraph createExecutionGraph(JobID jobID, List<JobVertex> list, BlobWriter blobWriter) throws JobException, JobExecutionException {
        return TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphBuilder.newBatchJobGraphBuilder().setJobId(jobID).addJobVertices(list).build()).setBlobWriter(blobWriter).build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
    }

    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex executionVertex) throws IOException, ClusterDatasetCorruptedException {
        return TaskDeploymentDescriptorFactory.fromExecution(executionVertex.getCurrentExecutionAttempt()).createDeploymentDescriptor(new AllocationID(), (JobManagerTaskRestore) null, Collections.emptyList());
    }

    public static ShuffleDescriptor[] deserializeShuffleDescriptors(TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]> maybeOffloaded, JobID jobID, TestingBlobWriter testingBlobWriter) throws IOException, ClassNotFoundException {
        return maybeOffloaded instanceof TaskDeploymentDescriptor.NonOffloaded ? (ShuffleDescriptor[]) ((TaskDeploymentDescriptor.NonOffloaded) maybeOffloaded).serializedValue.deserializeValue(ClassLoader.getSystemClassLoader()) : (ShuffleDescriptor[]) CompressedSerializedValue.fromBytes(testingBlobWriter.getBlob(jobID, ((TaskDeploymentDescriptor.Offloaded) maybeOffloaded).serializedValueKey)).deserializeValue(ClassLoader.getSystemClassLoader());
    }

    @Test
    public void testComputeConsumedSubpartitionRange3to1() {
        SubpartitionIndexRange computeConsumedSubpartitionRange = computeConsumedSubpartitionRange(0, 1, 3);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getStartIndex()), CoreMatchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getEndIndex()), CoreMatchers.is(2));
    }

    @Test
    public void testComputeConsumedSubpartitionRange3to2() {
        SubpartitionIndexRange computeConsumedSubpartitionRange = computeConsumedSubpartitionRange(0, 2, 3);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getStartIndex()), CoreMatchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getEndIndex()), CoreMatchers.is(0));
        SubpartitionIndexRange computeConsumedSubpartitionRange2 = computeConsumedSubpartitionRange(1, 2, 3);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange2.getStartIndex()), CoreMatchers.is(1));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange2.getEndIndex()), CoreMatchers.is(2));
    }

    @Test
    public void testComputeConsumedSubpartitionRange6to4() {
        SubpartitionIndexRange computeConsumedSubpartitionRange = computeConsumedSubpartitionRange(0, 4, 6);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getStartIndex()), CoreMatchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getEndIndex()), CoreMatchers.is(0));
        SubpartitionIndexRange computeConsumedSubpartitionRange2 = computeConsumedSubpartitionRange(1, 4, 6);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange2.getStartIndex()), CoreMatchers.is(1));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange2.getEndIndex()), CoreMatchers.is(2));
        SubpartitionIndexRange computeConsumedSubpartitionRange3 = computeConsumedSubpartitionRange(2, 4, 6);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange3.getStartIndex()), CoreMatchers.is(3));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange3.getEndIndex()), CoreMatchers.is(3));
        SubpartitionIndexRange computeConsumedSubpartitionRange4 = computeConsumedSubpartitionRange(3, 4, 6);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange4.getStartIndex()), CoreMatchers.is(4));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange4.getEndIndex()), CoreMatchers.is(5));
    }

    @Test
    public void testComputeBroadcastConsumedSubpartitionRange() {
        SubpartitionIndexRange computeConsumedSubpartitionRange = computeConsumedSubpartitionRange(0, 3, 1, true, true);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getStartIndex()), CoreMatchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getEndIndex()), CoreMatchers.is(0));
        SubpartitionIndexRange computeConsumedSubpartitionRange2 = computeConsumedSubpartitionRange(1, 3, 1, true, true);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange2.getStartIndex()), CoreMatchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange2.getEndIndex()), CoreMatchers.is(0));
        SubpartitionIndexRange computeConsumedSubpartitionRange3 = computeConsumedSubpartitionRange(2, 3, 1, true, true);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange3.getStartIndex()), CoreMatchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange3.getEndIndex()), CoreMatchers.is(0));
    }

    @Test
    public void testComputeConsumedSubpartitionRangeForNonDynamicGraph() {
        SubpartitionIndexRange computeConsumedSubpartitionRange = computeConsumedSubpartitionRange(0, 3, 3, false, false);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getStartIndex()), CoreMatchers.is(0));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange.getEndIndex()), CoreMatchers.is(0));
        SubpartitionIndexRange computeConsumedSubpartitionRange2 = computeConsumedSubpartitionRange(1, 3, 3, false, false);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange2.getStartIndex()), CoreMatchers.is(1));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange2.getEndIndex()), CoreMatchers.is(1));
        SubpartitionIndexRange computeConsumedSubpartitionRange3 = computeConsumedSubpartitionRange(2, 3, 3, false, false);
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange3.getStartIndex()), CoreMatchers.is(2));
        MatcherAssert.assertThat(Integer.valueOf(computeConsumedSubpartitionRange3.getEndIndex()), CoreMatchers.is(2));
    }

    private static SubpartitionIndexRange computeConsumedSubpartitionRange(int i, int i2, int i3) {
        return computeConsumedSubpartitionRange(i, i2, i3, true, false);
    }

    private static SubpartitionIndexRange computeConsumedSubpartitionRange(int i, int i2, int i3, boolean z, boolean z2) {
        return TaskDeploymentDescriptorFactory.computeConsumedSubpartitionRange(i, i2, i3, z, z2);
    }
}
