package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.class */
/**
 * Checks that a consumer task which was deployed before one of its input partitions was
 * available receives a partition update once the producer of that partition finishes.
 *
 * <p>The job graph built for each test has four vertices of parallelism 1:
 *
 * <pre>
 *   v1 --(pipelined)--> v2 --(pipelined)--> v4
 *   v1 --(pipelined)--> v3 --(blocking)---> v4
 * </pre>
 */
public class UpdatePartitionConsumersTest extends TestLogger {

    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorResource();

    /** Maximum time, in milliseconds, to wait on the futures completed by the gateway hooks. */
    private static final long TIMEOUT = 5000;

    private JobGraph jobGraph;
    private JobVertex v1;
    private JobVertex v2;
    private JobVertex v3;
    private JobVertex v4;

    @Before
    public void setUp() {
        buildJobGraphWithBlockingEdgeWithinRegion();
    }

    /**
     * Populates {@link #v1}..{@link #v4} and {@link #jobGraph}: three pipelined edges
     * (v1->v2, v1->v3, v2->v4) plus one blocking edge (v3->v4), wrapped in a batch job graph.
     */
    private void buildJobGraphWithBlockingEdgeWithinRegion() {
        v1 = createVertex("v1");
        v2 = createVertex("v2");
        v3 = createVertex("v3");
        v4 = createVertex("v4");

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        // v4 gets its inputs in this order: first from v2, then from v3.
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        jobGraph = JobGraphTestUtils.batchJobGraph(v1, v2, v3, v4);
    }

    /** Creates a job vertex with the given name, parallelism 1 and {@link AbstractInvokable}. */
    private static JobVertex createVertex(String name) {
        final JobVertex vertex = new JobVertex(name);
        vertex.setInvokableClass(AbstractInvokable.class);
        vertex.setParallelism(1);
        return vertex;
    }

    /**
     * Schedules the job, moves all four tasks to RUNNING, and verifies that:
     *
     * <ol>
     *   <li>the deployment descriptor submitted for v4 carries an {@link UnknownShuffleDescriptor}
     *       in its second input gate, and
     *   <li>after v1 and v3 are reported FINISHED, the task manager gateway receives exactly one
     *       {@link PartitionInfo} for v4's attempt, referring to v3's produced data set and
     *       carrying a {@link NettyShuffleDescriptor}.
     * </ol>
     */
    @Test
    public void testUpdatePartitionConsumers() throws Exception {
        final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();

        final DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                jobGraph,
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_RESOURCE.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                new TestExecutionSlotAllocatorFactory(taskManagerGateway))
                        .build();

        final ExecutionVertex ev1 = scheduler.getExecutionVertex(new ExecutionVertexID(v1.getID(), 0));
        final ExecutionVertex ev2 = scheduler.getExecutionVertex(new ExecutionVertexID(v2.getID(), 0));
        final ExecutionVertex ev3 = scheduler.getExecutionVertex(new ExecutionVertexID(v3.getID(), 0));
        final ExecutionVertex ev4 = scheduler.getExecutionVertex(new ExecutionVertexID(v4.getID(), 0));

        // Capture the deployment descriptor that is submitted for v4's current attempt.
        final CompletableFuture<TaskDeploymentDescriptor> ev4TddFuture = new CompletableFuture<>();
        taskManagerGateway.setSubmitConsumer(
                tdd -> {
                    if (tdd.getExecutionAttemptId()
                            .equals(ev4.getCurrentExecutionAttempt().getAttemptId())) {
                        ev4TddFuture.complete(tdd);
                    }
                });

        scheduler.startScheduling();

        Assert.assertThat(ev1.getExecutionState(), CoreMatchers.is(ExecutionState.DEPLOYING));
        Assert.assertThat(ev2.getExecutionState(), CoreMatchers.is(ExecutionState.DEPLOYING));
        Assert.assertThat(ev3.getExecutionState(), CoreMatchers.is(ExecutionState.DEPLOYING));
        Assert.assertThat(ev4.getExecutionState(), CoreMatchers.is(ExecutionState.DEPLOYING));

        updateState(scheduler, ev1, ExecutionState.INITIALIZING);
        updateState(scheduler, ev1, ExecutionState.RUNNING);
        updateState(scheduler, ev2, ExecutionState.INITIALIZING);
        updateState(scheduler, ev2, ExecutionState.RUNNING);
        updateState(scheduler, ev3, ExecutionState.INITIALIZING);
        updateState(scheduler, ev3, ExecutionState.RUNNING);
        updateState(scheduler, ev4, ExecutionState.INITIALIZING);
        updateState(scheduler, ev4, ExecutionState.RUNNING);

        // Gate index 1 is presumably the input from v3, which was connected to v4 second.
        final InputGateDeploymentDescriptor ev4SecondInputGate =
                ev4TddFuture.get(TIMEOUT, TimeUnit.MILLISECONDS).getInputGates().get(1);
        Assert.assertThat(
                ev4SecondInputGate.getShuffleDescriptors()[0],
                CoreMatchers.instanceOf(UnknownShuffleDescriptor.class));

        // Completed only after every assertion inside the update hook has passed.
        final CompletableFuture<Void> updatePartitionFuture = new CompletableFuture<>();
        taskManagerGateway.setUpdatePartitionsConsumer(
                (attemptId, partitionInfos, timeout) -> {
                    Assert.assertThat(
                            attemptId,
                            CoreMatchers.equalTo(ev4.getCurrentExecutionAttempt().getAttemptId()));

                    final List<PartitionInfo> partitionInfoList =
                            IterableUtils.toStream(partitionInfos).collect(Collectors.toList());
                    Assert.assertThat(partitionInfoList, Matchers.hasSize(1));

                    final PartitionInfo partitionInfo = partitionInfoList.get(0);
                    Assert.assertThat(
                            partitionInfo.getIntermediateDataSetID(),
                            CoreMatchers.equalTo(v3.getProducedDataSets().get(0).getId()));
                    Assert.assertThat(
                            partitionInfo.getShuffleDescriptor(),
                            CoreMatchers.instanceOf(NettyShuffleDescriptor.class));

                    updatePartitionFuture.complete(null);
                });

        updateState(scheduler, ev1, ExecutionState.FINISHED);
        updateState(scheduler, ev3, ExecutionState.FINISHED);

        // Throws a TimeoutException if the update hook never ran to completion.
        updatePartitionFuture.get(TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * Reports the given state for the vertex's current execution attempt to the scheduler.
     *
     * @param schedulerBase scheduler that receives the state update
     * @param executionVertex vertex whose current attempt id is used for the update
     * @param executionState state to report
     */
    private void updateState(SchedulerBase schedulerBase, ExecutionVertex executionVertex, ExecutionState executionState) {
        schedulerBase.updateTaskExecutionState(
                new TaskExecutionState(
                        executionVertex.getCurrentExecutionAttempt().getAttemptId(), executionState));
    }
}
