/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.deployment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.TestingBlobWriter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.SubpartitionIndexRange;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.util.CompressedSerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

public class TaskDeploymentDescriptorFactoryTest
extends TestLogger {
    private static final int PARALLELISM = 4;

    @Test
    public void testCacheShuffleDescriptorAsNonOffloaded() throws Exception {
        this.testCacheShuffleDescriptor(new TestingBlobWriter(Integer.MAX_VALUE));
    }

    @Test
    public void testCacheShuffleDescriptorAsOffloaded() throws Exception {
        this.testCacheShuffleDescriptor(new TestingBlobWriter(0));
    }

    private void testCacheShuffleDescriptor(TestingBlobWriter blobWriter) throws Exception {
        JobID jobId = new JobID();
        ExecutionJobVertex ejv = this.setupExecutionGraphAndGetVertex(jobId, blobWriter);
        ExecutionVertex ev21 = ejv.getTaskVertices()[0];
        TaskDeploymentDescriptorFactoryTest.createTaskDeploymentDescriptor(ev21);
        IntermediateResult consumedResult = (IntermediateResult)ejv.getInputs().get(0);
        TaskDeploymentDescriptor.MaybeOffloaded maybeOffloaded = consumedResult.getCachedShuffleDescriptors(ev21.getConsumedPartitionGroup(0));
        ShuffleDescriptor[] cachedShuffleDescriptors = TaskDeploymentDescriptorFactoryTest.deserializeShuffleDescriptors((TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]>)maybeOffloaded, jobId, blobWriter);
        Assert.assertEquals((long)ev21.getConsumedPartitionGroup(0).size(), (long)cachedShuffleDescriptors.length);
        int idx = 0;
        for (IntermediateResultPartitionID consumedPartitionId : ev21.getConsumedPartitionGroup(0)) {
            Assert.assertEquals((Object)consumedPartitionId, (Object)cachedShuffleDescriptors[idx++].getResultPartitionID().getPartitionId());
        }
    }

    @Test(expected=IllegalStateException.class)
    public void testGetOffloadedShuffleDescriptorBeforeLoading() throws Exception {
        TestingBlobWriter blobWriter = new TestingBlobWriter(0);
        JobID jobId = new JobID();
        ExecutionJobVertex ejv = this.setupExecutionGraphAndGetVertex(jobId, blobWriter);
        ExecutionVertex ev21 = ejv.getTaskVertices()[0];
        TaskDeploymentDescriptor tdd = TaskDeploymentDescriptorFactoryTest.createTaskDeploymentDescriptor(ev21);
        ((InputGateDeploymentDescriptor)tdd.getInputGates().get(0)).getShuffleDescriptors();
    }

    private ExecutionJobVertex setupExecutionGraphAndGetVertex(JobID jobId, BlobWriter blobWriter) throws JobException, JobExecutionException {
        JobVertex v1 = TaskDeploymentDescriptorFactoryTest.createJobVertex("v1", 4);
        JobVertex v2 = TaskDeploymentDescriptorFactoryTest.createJobVertex("v2", 4);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2));
        ExecutionGraph executionGraph = TaskDeploymentDescriptorFactoryTest.createExecutionGraph(jobId, ordered, blobWriter);
        return executionGraph.getJobVertex(v2.getID());
    }

    private static JobVertex createJobVertex(String vertexName, int parallelism) {
        JobVertex jobVertex = new JobVertex(vertexName);
        jobVertex.setParallelism(parallelism);
        jobVertex.setInvokableClass(AbstractInvokable.class);
        return jobVertex;
    }

    private static ExecutionGraph createExecutionGraph(JobID jobId, List<JobVertex> jobVertices, BlobWriter blobWriter) throws JobException, JobExecutionException {
        JobGraph jobGraph = JobGraphBuilder.newBatchJobGraphBuilder().setJobId(jobId).addJobVertices(jobVertices).build();
        return TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setBlobWriter(blobWriter).build();
    }

    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev) throws IOException {
        return TaskDeploymentDescriptorFactory.fromExecutionVertex((ExecutionVertex)ev, (int)0).createDeploymentDescriptor(new AllocationID(), null, Collections.emptyList());
    }

    public static ShuffleDescriptor[] deserializeShuffleDescriptors(TaskDeploymentDescriptor.MaybeOffloaded<ShuffleDescriptor[]> maybeOffloaded, JobID jobId, TestingBlobWriter blobWriter) throws IOException, ClassNotFoundException {
        if (maybeOffloaded instanceof TaskDeploymentDescriptor.NonOffloaded) {
            return (ShuffleDescriptor[])((TaskDeploymentDescriptor.NonOffloaded)maybeOffloaded).serializedValue.deserializeValue(ClassLoader.getSystemClassLoader());
        }
        CompressedSerializedValue compressedSerializedValue = CompressedSerializedValue.fromBytes((byte[])blobWriter.getBlob(jobId, ((TaskDeploymentDescriptor.Offloaded)maybeOffloaded).serializedValueKey));
        return (ShuffleDescriptor[])compressedSerializedValue.deserializeValue(ClassLoader.getSystemClassLoader());
    }

    @Test
    public void testComputeConsumedSubpartitionRange3to1() {
        SubpartitionIndexRange range = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(0, 1, 3);
        MatcherAssert.assertThat((Object)range.getStartIndex(), (Matcher)CoreMatchers.is((Object)0));
        MatcherAssert.assertThat((Object)range.getEndIndex(), (Matcher)CoreMatchers.is((Object)2));
    }

    @Test
    public void testComputeConsumedSubpartitionRange3to2() {
        SubpartitionIndexRange range1 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(0, 2, 3);
        MatcherAssert.assertThat((Object)range1.getStartIndex(), (Matcher)CoreMatchers.is((Object)0));
        MatcherAssert.assertThat((Object)range1.getEndIndex(), (Matcher)CoreMatchers.is((Object)0));
        SubpartitionIndexRange range2 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(1, 2, 3);
        MatcherAssert.assertThat((Object)range2.getStartIndex(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)range2.getEndIndex(), (Matcher)CoreMatchers.is((Object)2));
    }

    @Test
    public void testComputeConsumedSubpartitionRange6to4() {
        SubpartitionIndexRange range1 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(0, 4, 6);
        MatcherAssert.assertThat((Object)range1.getStartIndex(), (Matcher)CoreMatchers.is((Object)0));
        MatcherAssert.assertThat((Object)range1.getEndIndex(), (Matcher)CoreMatchers.is((Object)0));
        SubpartitionIndexRange range2 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(1, 4, 6);
        MatcherAssert.assertThat((Object)range2.getStartIndex(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)range2.getEndIndex(), (Matcher)CoreMatchers.is((Object)2));
        SubpartitionIndexRange range3 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(2, 4, 6);
        MatcherAssert.assertThat((Object)range3.getStartIndex(), (Matcher)CoreMatchers.is((Object)3));
        MatcherAssert.assertThat((Object)range3.getEndIndex(), (Matcher)CoreMatchers.is((Object)3));
        SubpartitionIndexRange range4 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(3, 4, 6);
        MatcherAssert.assertThat((Object)range4.getStartIndex(), (Matcher)CoreMatchers.is((Object)4));
        MatcherAssert.assertThat((Object)range4.getEndIndex(), (Matcher)CoreMatchers.is((Object)5));
    }

    @Test
    public void testComputeBroadcastConsumedSubpartitionRange() {
        SubpartitionIndexRange range1 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(0, 3, 1, true, true);
        MatcherAssert.assertThat((Object)range1.getStartIndex(), (Matcher)CoreMatchers.is((Object)0));
        MatcherAssert.assertThat((Object)range1.getEndIndex(), (Matcher)CoreMatchers.is((Object)0));
        SubpartitionIndexRange range2 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(1, 3, 1, true, true);
        MatcherAssert.assertThat((Object)range2.getStartIndex(), (Matcher)CoreMatchers.is((Object)0));
        MatcherAssert.assertThat((Object)range2.getEndIndex(), (Matcher)CoreMatchers.is((Object)0));
        SubpartitionIndexRange range3 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(2, 3, 1, true, true);
        MatcherAssert.assertThat((Object)range3.getStartIndex(), (Matcher)CoreMatchers.is((Object)0));
        MatcherAssert.assertThat((Object)range3.getEndIndex(), (Matcher)CoreMatchers.is((Object)0));
    }

    @Test
    public void testComputeConsumedSubpartitionRangeForNonDynamicGraph() {
        SubpartitionIndexRange range1 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(0, 3, 3, false, false);
        MatcherAssert.assertThat((Object)range1.getStartIndex(), (Matcher)CoreMatchers.is((Object)0));
        MatcherAssert.assertThat((Object)range1.getEndIndex(), (Matcher)CoreMatchers.is((Object)0));
        SubpartitionIndexRange range2 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(1, 3, 3, false, false);
        MatcherAssert.assertThat((Object)range2.getStartIndex(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)range2.getEndIndex(), (Matcher)CoreMatchers.is((Object)1));
        SubpartitionIndexRange range3 = TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(2, 3, 3, false, false);
        MatcherAssert.assertThat((Object)range3.getStartIndex(), (Matcher)CoreMatchers.is((Object)2));
        MatcherAssert.assertThat((Object)range3.getEndIndex(), (Matcher)CoreMatchers.is((Object)2));
    }

    private static SubpartitionIndexRange computeConsumedSubpartitionRange(int consumerIndex, int numConsumers, int numSubpartitions) {
        return TaskDeploymentDescriptorFactoryTest.computeConsumedSubpartitionRange(consumerIndex, numConsumers, numSubpartitions, true, false);
    }

    private static SubpartitionIndexRange computeConsumedSubpartitionRange(int consumerIndex, int numConsumers, int numSubpartitions, boolean isDynamicGraph, boolean isBroadcast) {
        return TaskDeploymentDescriptorFactory.computeConsumedSubpartitionRange((int)consumerIndex, (int)numConsumers, (int)numSubpartitions, (boolean)isDynamicGraph, (boolean)isBroadcast);
    }
}

