package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.operators.coordination.TestingCoordinationRequestHandler;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testutils.CancelableInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest.class */
public class TaskExecutorOperatorEventHandlingTest extends TestLogger {

    /**
     * Class-level rule supplying the {@link ScheduledExecutorService} that
     * {@code createExecutorWithRunningTask} passes to the test environment builder.
     */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    /** Metric registry; created in {@code setup()} and closed in {@code teardown()}. */
    private MetricRegistryImpl metricRegistry;
    /** RPC service; created in {@code setup()} and closed in {@code teardown()}. */
    private TestingRpcService rpcService;

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest$CoordinationRequestSendingInvokable.class */
    /**
     * Test invokable that, when invoked, sends a coordination request through the environment's
     * operator coordinator event gateway and then calls {@code waitUntilCancelled()}.
     */
    public static final class CoordinationRequestSendingInvokable extends CancelableInvokable {

        /**
         * @param environment the task environment, handed straight to the superclass
         */
        public CoordinationRequestSendingInvokable(Environment environment) {
            super(environment);
        }

        /**
         * Sends a single request (payload {@code 0L}) addressed to a freshly generated
         * {@link OperatorID}, then waits for cancellation.
         *
         * @throws Exception if serializing or sending the request fails
         */
        @Override // org.apache.flink.runtime.testutils.CancelableInvokable
        protected void doInvoke() throws Exception {
            // Diamond instead of the raw type: lets the compiler check the payload type
            // against the gateway's parameter instead of emitting an unchecked conversion.
            getEnvironment()
                    .getOperatorCoordinatorEventGateway()
                    .sendRequestToCoordinator(
                            new OperatorID(),
                            new SerializedValue<>(new TestingCoordinationRequestHandler.Request(0L)));
            waitUntilCancelled();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest$OperatorEventFailingInvokable.class */
    /**
     * Test invokable that waits for cancellation when invoked and fails every operator event
     * dispatched to it with a {@link FlinkException}.
     */
    public static final class OperatorEventFailingInvokable extends CancelableInvokable {

        /**
         * @param environment the task environment, handed straight to the superclass
         */
        public OperatorEventFailingInvokable(Environment environment) {
            super(environment);
        }

        /**
         * Does no work of its own; only waits until the task is cancelled.
         *
         * @throws InterruptedException if the wait is interrupted
         */
        @Override // org.apache.flink.runtime.testutils.CancelableInvokable
        public void doInvoke() throws InterruptedException {
            waitUntilCancelled();
        }

        /**
         * Always rejects the event.
         *
         * <p>Marked {@code @Override} so that a signature change in the base class breaks
         * compilation instead of silently turning this into an unrelated, never-called method.
         *
         * @param operatorID target operator of the event (ignored)
         * @param serializedValue the serialized event (ignored)
         * @throws FlinkException always, with the message {@code "test exception"}
         */
        @Override
        public void dispatchOperatorEvent(OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) throws FlinkException {
            throw new FlinkException("test exception");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorOperatorEventHandlingTest$OperatorEventSendingInvokable.class */
    /**
     * Test invokable that, when invoked, sends a {@link TestOperatorEvent} through the
     * environment's operator coordinator event gateway and then calls
     * {@code waitUntilCancelled()}.
     */
    public static final class OperatorEventSendingInvokable extends CancelableInvokable {

        /**
         * @param environment the task environment, handed straight to the superclass
         */
        public OperatorEventSendingInvokable(Environment environment) {
            super(environment);
        }

        /**
         * Sends a single operator event addressed to a freshly generated {@link OperatorID},
         * then waits for cancellation.
         *
         * @throws Exception if serializing or sending the event fails
         */
        @Override // org.apache.flink.runtime.testutils.CancelableInvokable
        public void doInvoke() throws Exception {
            // Diamond instead of the raw type: avoids an unchecked conversion at the call site.
            getEnvironment()
                    .getOperatorCoordinatorEventGateway()
                    .sendOperatorEventToCoordinator(
                            new OperatorID(),
                            new SerializedValue<>(new TestOperatorEvent()));
            waitUntilCancelled();
        }
    }

    /**
     * Creates a fresh RPC service and metric registry before each test and starts the
     * registry's query service on that RPC service.
     */
    @Before
    public void setup() {
        rpcService = new TestingRpcService();
        metricRegistry =
                new MetricRegistryImpl(
                        MetricRegistryTestUtils.defaultMetricRegistryConfiguration());

        final ResourceID queryServiceResourceId = new ResourceID("mqs");
        metricRegistry.startQueryService(rpcService, queryServiceResourceId);
    }

    /**
     * Shuts down the RPC service and the metric registry created in {@code setup()}.
     *
     * <p>The metric registry is closed in a {@code finally} block so that a failure while
     * closing the RPC service does not leave the registry open.
     *
     * @throws ExecutionException if one of the asynchronous close operations fails
     * @throws InterruptedException if interrupted while waiting for a close operation
     */
    @After
    public void teardown() throws ExecutionException, InterruptedException {
        try {
            if (rpcService != null) {
                rpcService.closeAsync().get();
            }
        } finally {
            // Runs even when closing the RPC service threw.
            if (metricRegistry != null) {
                metricRegistry.closeAsync().get();
            }
        }
    }

    /**
     * Sends an operator event to a running {@link OperatorEventFailingInvokable} and checks
     * that the returned future completes exceptionally with a {@link FlinkException} within
     * ten seconds, and that the task then reports {@link ExecutionState#FAILED}.
     *
     * @throws Exception if setting up or closing the test environment fails
     */
    @Test
    public void eventHandlingInTaskFailureFailsTask() throws Exception {
        final JobID jobId = new JobID();
        final ExecutionAttemptID attemptId =
                ExecutionGraphTestUtils.createExecutionAttemptId(new JobVertexID(), 3, 0);

        // try-with-resources closes the environment on every path and attaches a close
        // failure as a suppressed exception instead of masking the test failure.
        try (TaskSubmissionTestEnvironment env =
                createExecutorWithRunningTask(
                        jobId, attemptId, OperatorEventFailingInvokable.class)) {
            Assert.assertThat(
                    env.getTaskExecutorGateway()
                            .sendOperatorEventToTask(
                                    attemptId,
                                    new OperatorID(),
                                    new SerializedValue<>(new TestOperatorEvent())),
                    FlinkMatchers.futureWillCompleteExceptionally(
                            FlinkException.class, Duration.ofSeconds(10L)));
            Assert.assertEquals(
                    ExecutionState.FAILED,
                    env.getTaskSlotTable().getTask(attemptId).getExecutionState());
        }
    }

    /**
     * Runs an {@link OperatorEventSendingInvokable} against the job master gateway stub built
     * by {@code createExecutorWithRunningTask}, whose operator event sender throws, and checks
     * that the task ends up in {@link ExecutionState#FAILED}.
     *
     * @throws Exception if setting up or closing the test environment fails
     */
    @Test
    public void eventToCoordinatorDeliveryFailureFailsTask() throws Exception {
        final JobID jobId = new JobID();
        final ExecutionAttemptID attemptId =
                ExecutionGraphTestUtils.createExecutionAttemptId(new JobVertexID(), 3, 0);

        // try-with-resources closes the environment on every path and attaches a close
        // failure as a suppressed exception instead of masking the test failure.
        try (TaskSubmissionTestEnvironment env =
                createExecutorWithRunningTask(
                        jobId, attemptId, OperatorEventSendingInvokable.class)) {
            final Task task = env.getTaskSlotTable().getTask(attemptId);
            // Wait up to ten seconds for the task thread to finish.
            task.getExecutingThread().join(10000L);
            Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        }
    }

    /**
     * Runs a {@link CoordinationRequestSendingInvokable} against the job master gateway stub
     * built by {@code createExecutorWithRunningTask}, whose coordination request function
     * throws, and checks that the task ends up in {@link ExecutionState#FAILED}.
     *
     * @throws Exception if setting up or closing the test environment fails
     */
    @Test
    public void requestToCoordinatorDeliveryFailureFailsTask() throws Exception {
        final JobID jobId = new JobID();
        final ExecutionAttemptID attemptId =
                ExecutionGraphTestUtils.createExecutionAttemptId(new JobVertexID(), 3, 0);

        // try-with-resources closes the environment on every path and attaches a close
        // failure as a suppressed exception instead of masking the test failure.
        try (TaskSubmissionTestEnvironment env =
                createExecutorWithRunningTask(
                        jobId, attemptId, CoordinationRequestSendingInvokable.class)) {
            final Task task = env.getTaskSlotTable().getTask(attemptId);
            // Wait up to ten seconds for the task thread to finish.
            task.getExecutingThread().join(10000L);
            Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        }
    }

    /**
     * Builds a test environment, allocates slot 0 for the job, submits a task running the
     * given invokable and waits until the task is reported as {@link ExecutionState#RUNNING}.
     *
     * <p>The job master gateway used by the environment throws a {@link RuntimeException} from
     * both its operator event sender and its coordination request function.
     *
     * <p>If anything fails after the environment has been built, the environment is closed
     * before the failure is rethrown, so a failed setup does not leak it.
     *
     * @param jobID the job the task belongs to
     * @param executionAttemptID the execution attempt ID of the task to submit
     * @param cls the invokable class the task should run
     * @return the environment with the task running; the caller is responsible for closing it
     * @throws Exception if building the environment, submitting the task or waiting for the
     *     RUNNING state fails
     */
    private TaskSubmissionTestEnvironment createExecutorWithRunningTask(JobID jobID, ExecutionAttemptID executionAttemptID, Class<? extends AbstractInvokable> cls) throws Exception {
        final TaskDeploymentDescriptor deploymentDescriptor =
                createTaskDeploymentDescriptor(jobID, executionAttemptID, cls);
        final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
        final JobMasterId jobMasterId = JobMasterId.generate();

        final TaskSubmissionTestEnvironment env =
                new TaskSubmissionTestEnvironment.Builder(jobID)
                        .setJobMasterId(jobMasterId)
                        .setSlotSize(1)
                        .addTaskManagerActionListener(
                                executionAttemptID, ExecutionState.RUNNING, taskRunningFuture)
                        .setMetricQueryServiceAddress(
                                metricRegistry.getMetricQueryServiceGatewayRpcAddress())
                        .setJobMasterGateway(
                                new TestingJobMasterGatewayBuilder()
                                        .setFencingTokenSupplier(() -> jobMasterId)
                                        .setOperatorEventSender(
                                                (attemptId, operatorId, event) -> {
                                                    throw new RuntimeException();
                                                })
                                        .setDeliverCoordinationRequestFunction(
                                                (operatorId, request) -> {
                                                    throw new RuntimeException();
                                                })
                                        .build())
                        .build(EXECUTOR_RESOURCE.getExecutor());

        try {
            env.getTaskSlotTable()
                    .allocateSlot(
                            0, jobID, deploymentDescriptor.getAllocationId(), Time.seconds(60L));
            env.getTaskExecutorGateway()
                    .submitTask(deploymentDescriptor, env.getJobMasterId(), Time.seconds(10L))
                    .get();
            taskRunningFuture.get();
            return env;
        } catch (Throwable failure) {
            // The caller never receives the environment on this path, so close it here.
            try {
                env.close();
            } catch (Throwable closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Creates a deployment descriptor for a task that runs the given invokable class, by
     * delegating to {@code TaskExecutorSubmissionTest.createTaskDeploymentDescriptor} with a
     * default {@link ExecutionConfig}, empty configurations and empty lists for the remaining
     * arguments.
     *
     * @param jobID the job the task belongs to
     * @param executionAttemptID the execution attempt ID of the task
     * @param cls the invokable class; its name is stored in the descriptor
     * @return the task deployment descriptor
     * @throws IOException if serialization fails while building the descriptor
     */
    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobID jobID, ExecutionAttemptID executionAttemptID, Class<? extends AbstractInvokable> cls) throws IOException {
        return TaskExecutorSubmissionTest.createTaskDeploymentDescriptor(
                jobID,
                "test job",
                executionAttemptID,
                // Diamond instead of the raw type: avoids an unchecked conversion.
                new SerializedValue<>(new ExecutionConfig()),
                "test task",
                64,
                17,
                new Configuration(),
                new Configuration(),
                cls.getName(),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList());
    }
}
