/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.SchedulingTopologyListener;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyUtils;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

public class VertexwiseSchedulingStrategy
implements SchedulingStrategy,
SchedulingTopologyListener {
    private final SchedulerOperations schedulerOperations;
    private final SchedulingTopology schedulingTopology;
    private final Set<ExecutionVertexID> newVertices = new HashSet<ExecutionVertexID>();

    public VertexwiseSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
        this.schedulerOperations = (SchedulerOperations)Preconditions.checkNotNull((Object)schedulerOperations);
        this.schedulingTopology = (SchedulingTopology)Preconditions.checkNotNull((Object)schedulingTopology);
        schedulingTopology.registerSchedulingTopologyListener(this);
    }

    @Override
    public void startScheduling() {
        Set<ExecutionVertexID> sourceVertices = IterableUtils.toStream(this.schedulingTopology.getVertices()).filter(vertex -> vertex.getConsumedPartitionGroups().isEmpty()).map(Vertex::getId).collect(Collectors.toSet());
        this.maybeScheduleVertices(sourceVertices);
    }

    @Override
    public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
        this.maybeScheduleVertices(verticesToRestart);
    }

    @Override
    public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {
        if (executionState == ExecutionState.FINISHED) {
            SchedulingExecutionVertex executionVertex = this.schedulingTopology.getVertex(executionVertexId);
            Set<ExecutionVertexID> consumerVertices = IterableUtils.toStream(executionVertex.getProducedResults()).map(SchedulingResultPartition::getConsumerVertexGroup).filter(Optional::isPresent).flatMap(consumerVertexGroup -> IterableUtils.toStream((Iterable)((Iterable)consumerVertexGroup.get()))).collect(Collectors.toSet());
            this.maybeScheduleVertices(consumerVertices);
        }
    }

    @Override
    public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) {
    }

    @Override
    public void notifySchedulingTopologyUpdated(SchedulingTopology schedulingTopology, List<ExecutionVertexID> newExecutionVertices) {
        Preconditions.checkState((schedulingTopology == this.schedulingTopology ? 1 : 0) != 0);
        this.newVertices.addAll(newExecutionVertices);
    }

    private void maybeScheduleVertices(Set<ExecutionVertexID> vertices) {
        Set<ExecutionVertexID> allCandidates;
        HashMap consumableStatusCache = new HashMap();
        if (this.newVertices.isEmpty()) {
            allCandidates = vertices;
        } else {
            allCandidates = new HashSet<ExecutionVertexID>(vertices);
            allCandidates.addAll(this.newVertices);
            this.newVertices.clear();
        }
        Set<ExecutionVertexID> verticesToDeploy = allCandidates.stream().filter(vertexId -> {
            SchedulingExecutionVertex vertex = this.schedulingTopology.getVertex((ExecutionVertexID)vertexId);
            Preconditions.checkState((vertex.getState() == ExecutionState.CREATED ? 1 : 0) != 0);
            return this.areVertexInputsAllConsumable(vertex, consumableStatusCache);
        }).collect(Collectors.toSet());
        this.scheduleVerticesOneByOne(verticesToDeploy);
    }

    private void scheduleVerticesOneByOne(Set<ExecutionVertexID> verticesToDeploy) {
        if (verticesToDeploy.isEmpty()) {
            return;
        }
        List<ExecutionVertexID> sortedVerticesToDeploy = SchedulingStrategyUtils.sortExecutionVerticesInTopologicalOrder(this.schedulingTopology, verticesToDeploy);
        sortedVerticesToDeploy.forEach(id -> this.schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(id)));
    }

    private boolean areVertexInputsAllConsumable(SchedulingExecutionVertex vertex, Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
        for (ConsumedPartitionGroup consumedPartitionGroup : vertex.getConsumedPartitionGroups()) {
            if (consumableStatusCache.computeIfAbsent(consumedPartitionGroup, this::isConsumedPartitionGroupConsumable).booleanValue()) continue;
            return false;
        }
        return true;
    }

    private boolean isConsumedPartitionGroupConsumable(ConsumedPartitionGroup consumedPartitionGroup) {
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            if (this.schedulingTopology.getResultPartition(partitionId).getState() == ResultPartitionState.CONSUMABLE) continue;
            return false;
        }
        return true;
    }

    public static class Factory
    implements SchedulingStrategyFactory {
        @Override
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
            return new VertexwiseSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }
}

