package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.scheduler.ExecutionDeployer;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.class */
public class DefaultExecutionDeployer implements ExecutionDeployer {
    private final Logger log;
    private final ExecutionSlotAllocator executionSlotAllocator;
    private final ExecutionOperations executionOperations;
    private final ExecutionVertexVersioner executionVertexVersioner;
    private final Time partitionRegistrationTimeout;
    private final BiConsumer<ExecutionVertexID, AllocationID> allocationReservationFunc;
    private final ComponentMainThreadExecutor mainThreadExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionDeployer$ExecutionDeploymentHandle.class */
    public static class ExecutionDeploymentHandle {
        private final Execution execution;
        private final ExecutionSlotAssignment executionSlotAssignment;
        private final ExecutionVertexVersion requiredVertexVersion;

        ExecutionDeploymentHandle(Execution execution, ExecutionSlotAssignment executionSlotAssignment, ExecutionVertexVersion executionVertexVersion) {
            this.execution = (Execution) Preconditions.checkNotNull(execution);
            this.executionSlotAssignment = (ExecutionSlotAssignment) Preconditions.checkNotNull(executionSlotAssignment);
            this.requiredVertexVersion = (ExecutionVertexVersion) Preconditions.checkNotNull(executionVertexVersion);
        }

        Execution getExecution() {
            return this.execution;
        }

        ExecutionAttemptID getExecutionAttemptId() {
            return this.execution.getAttemptId();
        }

        CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
            return this.executionSlotAssignment.getLogicalSlotFuture();
        }

        ExecutionVertexVersion getRequiredVertexVersion() {
            return this.requiredVertexVersion;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionDeployer$Factory.class */
    public static class Factory implements ExecutionDeployer.Factory {
        @Override // org.apache.flink.runtime.scheduler.ExecutionDeployer.Factory
        public DefaultExecutionDeployer createInstance(Logger logger, ExecutionSlotAllocator executionSlotAllocator, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, Time time, BiConsumer<ExecutionVertexID, AllocationID> biConsumer, ComponentMainThreadExecutor componentMainThreadExecutor) {
            return new DefaultExecutionDeployer(logger, executionSlotAllocator, executionOperations, executionVertexVersioner, time, biConsumer, componentMainThreadExecutor);
        }

        @Override // org.apache.flink.runtime.scheduler.ExecutionDeployer.Factory
        public /* bridge */ /* synthetic */ ExecutionDeployer createInstance(Logger logger, ExecutionSlotAllocator executionSlotAllocator, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, Time time, BiConsumer biConsumer, ComponentMainThreadExecutor componentMainThreadExecutor) {
            return createInstance(logger, executionSlotAllocator, executionOperations, executionVertexVersioner, time, (BiConsumer<ExecutionVertexID, AllocationID>) biConsumer, componentMainThreadExecutor);
        }
    }

    private DefaultExecutionDeployer(Logger logger, ExecutionSlotAllocator executionSlotAllocator, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, Time time, BiConsumer<ExecutionVertexID, AllocationID> biConsumer, ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.log = (Logger) Preconditions.checkNotNull(logger);
        this.executionSlotAllocator = (ExecutionSlotAllocator) Preconditions.checkNotNull(executionSlotAllocator);
        this.executionOperations = (ExecutionOperations) Preconditions.checkNotNull(executionOperations);
        this.executionVertexVersioner = (ExecutionVertexVersioner) Preconditions.checkNotNull(executionVertexVersioner);
        this.partitionRegistrationTimeout = (Time) Preconditions.checkNotNull(time);
        this.allocationReservationFunc = (BiConsumer) Preconditions.checkNotNull(biConsumer);
        this.mainThreadExecutor = (ComponentMainThreadExecutor) Preconditions.checkNotNull(componentMainThreadExecutor);
    }

    @Override // org.apache.flink.runtime.scheduler.ExecutionDeployer
    public void allocateSlotsAndDeploy(List<Execution> list, Map<ExecutionVertexID, ExecutionVertexVersion> map) {
        validateExecutionStates(list);
        transitionToScheduled(list);
        waitForAllSlotsAndDeploy(createDeploymentHandles(list, map, allocateSlotsFor(list)));
    }

    private void validateExecutionStates(Collection<Execution> collection) {
        collection.forEach(execution -> {
            Preconditions.checkState(execution.getState() == ExecutionState.CREATED, "Expected execution %s to be in CREATED state, was: %s", new Object[]{execution.getAttemptId(), execution.getState()});
        });
    }

    private void transitionToScheduled(List<Execution> list) {
        list.forEach(execution -> {
            execution.transitionState(ExecutionState.SCHEDULED);
        });
    }

    private List<ExecutionSlotAssignment> allocateSlotsFor(List<Execution> list) {
        return this.executionSlotAllocator.allocateSlotsFor((List) list.stream().map((v0) -> {
            return v0.getAttemptId();
        }).collect(Collectors.toList()));
    }

    private List<ExecutionDeploymentHandle> createDeploymentHandles(List<Execution> list, Map<ExecutionVertexID, ExecutionVertexVersion> map, List<ExecutionSlotAssignment> list2) {
        ArrayList arrayList = new ArrayList(list.size());
        for (int i = 0; i < list.size(); i++) {
            Execution execution = list.get(i);
            ExecutionSlotAssignment executionSlotAssignment = list2.get(i);
            Preconditions.checkState(execution.getAttemptId().equals(executionSlotAssignment.getExecutionAttemptId()));
            arrayList.add(new ExecutionDeploymentHandle(execution, executionSlotAssignment, map.get(execution.getVertex().getID())));
        }
        return arrayList;
    }

    private void waitForAllSlotsAndDeploy(List<ExecutionDeploymentHandle> list) {
        FutureUtils.assertNoException(assignAllResourcesAndRegisterProducedPartitions(list).handle((BiFunction<? super Void, Throwable, ? extends U>) deployAll(list)));
    }

    private CompletableFuture<Void> assignAllResourcesAndRegisterProducedPartitions(List<ExecutionDeploymentHandle> list) {
        ArrayList arrayList = new ArrayList();
        for (ExecutionDeploymentHandle executionDeploymentHandle : list) {
            arrayList.add(executionDeploymentHandle.getLogicalSlotFuture().handle((BiFunction<? super LogicalSlot, Throwable, ? extends U>) assignResource(executionDeploymentHandle)).thenCompose((Function<? super U, ? extends CompletionStage<U>>) registerProducedPartitions(executionDeploymentHandle)).handle((r6, th) -> {
                if (th == null) {
                    return null;
                }
                handleTaskDeploymentFailure(executionDeploymentHandle.getExecution(), th);
                return null;
            }));
        }
        return FutureUtils.waitForAll(arrayList);
    }

    private BiFunction<Void, Throwable, Void> deployAll(List<ExecutionDeploymentHandle> list) {
        return (r6, th) -> {
            propagateIfNonNull(th);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                ExecutionDeploymentHandle executionDeploymentHandle = (ExecutionDeploymentHandle) it.next();
                CompletableFuture<LogicalSlot> logicalSlotFuture = executionDeploymentHandle.getLogicalSlotFuture();
                Preconditions.checkState(logicalSlotFuture.isDone());
                FutureUtils.assertNoException(logicalSlotFuture.handle((BiFunction<? super LogicalSlot, Throwable, ? extends U>) deployOrHandleError(executionDeploymentHandle)));
            }
            return null;
        };
    }

    private static void propagateIfNonNull(Throwable th) {
        if (th != null) {
            throw new CompletionException(th);
        }
    }

    private BiFunction<LogicalSlot, Throwable, LogicalSlot> assignResource(ExecutionDeploymentHandle executionDeploymentHandle) {
        return (logicalSlot, th) -> {
            ExecutionVertexVersion requiredVertexVersion = executionDeploymentHandle.getRequiredVertexVersion();
            Execution execution = executionDeploymentHandle.getExecution();
            if (execution.getState() != ExecutionState.SCHEDULED || this.executionVertexVersioner.isModified(requiredVertexVersion)) {
                if (th != null) {
                    return null;
                }
                this.log.debug("Refusing to assign slot to execution {} because this deployment was superseded by another deployment", executionDeploymentHandle.getExecutionAttemptId());
                releaseSlotIfPresent(logicalSlot);
                return null;
            }
            if (th != null) {
                throw new CompletionException(maybeWrapWithNoResourceAvailableException(th));
            }
            if (!execution.tryAssignResource(logicalSlot)) {
                throw new IllegalStateException("Could not assign resource " + logicalSlot + " to execution " + execution + '.');
            }
            this.allocationReservationFunc.accept(execution.getAttemptId().getExecutionVertexId(), logicalSlot.getAllocationId());
            return logicalSlot;
        };
    }

    private static void releaseSlotIfPresent(@Nullable LogicalSlot logicalSlot) {
        if (logicalSlot != null) {
            logicalSlot.releaseSlot(null);
        }
    }

    private static Throwable maybeWrapWithNoResourceAvailableException(Throwable th) {
        return ExceptionUtils.stripCompletionException(th) instanceof TimeoutException ? new NoResourceAvailableException("Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.", th) : th;
    }

    private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions(ExecutionDeploymentHandle executionDeploymentHandle) {
        return logicalSlot -> {
            if (logicalSlot == null) {
                return FutureUtils.completedVoidFuture();
            }
            Execution execution = executionDeploymentHandle.getExecution();
            return FutureUtils.orTimeout(execution.registerProducedPartitions(logicalSlot.getTaskManagerLocation()), this.partitionRegistrationTimeout.toMilliseconds(), TimeUnit.MILLISECONDS, this.mainThreadExecutor, String.format("Registering produced partitions for execution %s timed out after %d ms.", execution.getAttemptId(), Long.valueOf(this.partitionRegistrationTimeout.toMilliseconds())));
        };
    }

    private BiFunction<Object, Throwable, Void> deployOrHandleError(ExecutionDeploymentHandle executionDeploymentHandle) {
        return (obj, th) -> {
            ExecutionVertexVersion requiredVertexVersion = executionDeploymentHandle.getRequiredVertexVersion();
            Execution execution = executionDeploymentHandle.getExecution();
            if (execution.getState() != ExecutionState.SCHEDULED || this.executionVertexVersioner.isModified(requiredVertexVersion)) {
                if (th != null) {
                    return null;
                }
                this.log.debug("Refusing to assign slot to execution {} because this deployment was superseded by another deployment", executionDeploymentHandle.getExecutionAttemptId());
                return null;
            }
            if (th == null) {
                deployTaskSafe(execution);
                return null;
            }
            handleTaskDeploymentFailure(execution, th);
            return null;
        };
    }

    private void deployTaskSafe(Execution execution) {
        try {
            this.executionOperations.deploy(execution);
        } catch (Throwable th) {
            handleTaskDeploymentFailure(execution, th);
        }
    }

    private void handleTaskDeploymentFailure(Execution execution, Throwable th) {
        this.executionOperations.markFailed(execution, th);
    }
}
