package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.class */
/**
 * Tests how the {@link TaskExecutor} ties the user code class loader to a slot: two tasks that
 * are submitted one after the other into the same slot allocation must observe the very same
 * class loader instance.
 */
public class TaskExecutorSlotLifetimeTest extends TestLogger {

    @ClassRule
    public static final TestingRpcServiceResource TESTING_RPC_SERVICE_RESOURCE = new TestingRpcServiceResource();

    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();

    /** Drops class loaders that a previous test run may have left in the shared static queue. */
    @Before
    public void setup() {
        UserClassLoaderExtractingInvokable.clearQueue();
    }

    /**
     * Submits the same task twice into one allocated slot and asserts that both executions ran
     * with the identical user code class loader instance.
     *
     * @throws Exception if the task executor cannot be started or closed, or if the test thread
     *     is interrupted while waiting
     */
    @Test
    public void testUserCodeClassLoaderIsBoundToSlot() throws Exception {
        final Configuration configuration = new Configuration();
        final TestingRpcService rpcService = TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService();

        // Resource manager stub: captures the slot report the task executor sends.
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        final CompletableFuture<SlotReport> slotReportFuture = new CompletableFuture<>();
        resourceManagerGateway.setSendSlotReportFunction(slotReportTuple -> {
            slotReportFuture.complete(slotReportTuple.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });

        // Job master stub: accepts every offered slot and records all task state updates.
        final BlockingQueue<TaskExecutionState> taskExecutionStates = new ArrayBlockingQueue<>(3);
        final OneShotLatch slotsOfferedLatch = new OneShotLatch();
        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
                .setOfferSlotsFunction((resourceId, slotOffers) -> {
                    slotsOfferedLatch.trigger();
                    return CompletableFuture.completedFuture(slotOffers);
                })
                .setUpdateTaskExecutionStateFunction(FunctionUtils.uncheckedFunction(taskExecutionState -> {
                    taskExecutionStates.put(taskExecutionState);
                    return CompletableFuture.completedFuture(Acknowledge.get());
                }))
                .build();

        // NOTE(review): the m321getFencingToken / m185getFencingToken names look like
        // decompiler-generated aliases of getFencingToken() - confirm against the gateway
        // classes before renaming them.
        final SettableLeaderRetrievalService resourceManagerLeaderRetriever = new SettableLeaderRetrievalService(
                resourceManagerGateway.getAddress(),
                resourceManagerGateway.m321getFencingToken().toUUID());
        final SettableLeaderRetrievalService jobMasterLeaderRetriever = new SettableLeaderRetrievalService(
                jobMasterGateway.getAddress(),
                jobMasterGateway.m185getFencingToken().toUUID());
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
                .setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever)
                .setJobMasterLeaderRetrieverFunction(ignoredJobId -> jobMasterLeaderRetriever)
                .build();

        rpcService.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        // try-with-resources closes the task executor exactly once, on success and on failure,
        // and attaches a failure of close() as a suppressed exception where appropriate.
        try (TaskExecutor taskExecutor =
                createTaskExecutor(configuration, rpcService, haServices, new LocalUnresolvedTaskManagerLocation())) {
            taskExecutor.start();

            // The first reported slot is the one that gets allocated for the job.
            final SlotID slotId = slotReportFuture.join().iterator().next().getSlotID();
            final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);

            final JobID jobId = new JobID();
            final AllocationID allocationId = new AllocationID();
            taskExecutorGateway
                    .requestSlot(
                            slotId,
                            jobId,
                            allocationId,
                            ResourceProfile.ZERO,
                            jobMasterGateway.getAddress(),
                            resourceManagerGateway.m321getFencingToken(),
                            RpcUtils.INF_TIMEOUT)
                    .join();

            final TaskDeploymentDescriptor taskDeploymentDescriptor = TaskDeploymentDescriptorBuilder
                    .newBuilder(jobId, UserClassLoaderExtractingInvokable.class)
                    .setAllocationId(allocationId)
                    .build();

            // Do not submit before the slot has been offered to the job master stub.
            slotsOfferedLatch.await();

            taskExecutorGateway
                    .submitTask(taskDeploymentDescriptor, jobMasterGateway.m185getFencingToken(), RpcUtils.INF_TIMEOUT)
                    .join();
            final ClassLoader firstClassLoader = UserClassLoaderExtractingInvokable.take();

            // Drain state updates until the first task reports a terminal state, so that the
            // second submission does not overlap with the first execution.
            while (!taskExecutionStates.take().getExecutionState().isTerminal()) {
                // keep waiting for the terminal state
            }

            taskExecutorGateway
                    .submitTask(taskDeploymentDescriptor, jobMasterGateway.m185getFencingToken(), RpcUtils.INF_TIMEOUT)
                    .join();
            final ClassLoader secondClassLoader = UserClassLoaderExtractingInvokable.take();

            Assert.assertThat(firstClassLoader, CoreMatchers.sameInstance(secondClassLoader));
        }
    }

    /**
     * Builds a {@link TaskExecutor} with a single-slot task slot table that is wired to the given
     * RPC service and high availability services; all remaining collaborators are testing or
     * no-op implementations.
     *
     * @param configuration configuration used for the task manager configuration and blob cache
     * @param testingRpcService RPC service the task executor runs on
     * @param testingHighAvailabilityServices services used to retrieve the leaders
     * @param localUnresolvedTaskManagerLocation location to report for the task executor
     * @return the created, not yet started task executor
     * @throws IOException declared for failures while creating the required services
     */
    private TaskExecutor createTaskExecutor(Configuration configuration, TestingRpcService testingRpcService, TestingHighAvailabilityServices testingHighAvailabilityServices, LocalUnresolvedTaskManagerLocation localUnresolvedTaskManagerLocation) throws IOException {
        final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(
                configuration,
                TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(configuration),
                InetAddress.getLoopbackAddress().getHostAddress());

        return new TaskExecutor(
                testingRpcService,
                taskManagerConfiguration,
                testingHighAvailabilityServices,
                new TaskManagerServicesBuilder()
                        .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1))
                        .setUnresolvedTaskManagerLocation(localUnresolvedTaskManagerLocation)
                        .build(),
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                new TestingHeartbeatServices(),
                UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
                (String) null,
                new BlobCacheService(configuration, new VoidBlobStore(), (InetSocketAddress) null),
                this.testingFatalErrorHandlerResource.getFatalErrorHandler(),
                new TestingTaskExecutorPartitionTracker());
    }

    /**
     * Invokable that publishes the user code class loader it was executed with, so that the test
     * can compare the class loaders of consecutive executions.
     */
    public static final class UserClassLoaderExtractingInvokable extends AbstractInvokable {

        /** Shared hand-over queue; holds at most the class loaders of two executions. */
        private static final BlockingQueue<ClassLoader> userCodeClassLoaders = new ArrayBlockingQueue<>(2);

        public UserClassLoaderExtractingInvokable(Environment environment) {
            super(environment);
        }

        /** Puts the user code class loader of this execution into the shared queue. */
        @Override
        public void invoke() throws Exception {
            userCodeClassLoaders.put(getEnvironment().getUserCodeClassLoader().asClassLoader());
        }

        /** Removes all class loaders recorded so far. */
        public static void clearQueue() {
            userCodeClassLoaders.clear();
        }

        /**
         * Blocks until an execution has published its class loader and returns it.
         *
         * @return the oldest class loader that has not been taken yet
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        private static ClassLoader take() throws InterruptedException {
            return userCodeClassLoaders.take();
        }
    }
}
