package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.base.Optional;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.cache.RemovalListener;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.cache.RemovalNotification;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.class */
public final class ExecutorServiceParallelExecutor implements PipelineExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);

    /** Requested parallelism; sizes the thread pool and is used in {@code start} to bound the number of root splits. */
    private final int targetParallelism;

    /** Thread pool that runs both the monitor and all transform executors. */
    private final ExecutorService executorService;

    private final DirectGraph graph;
    private final RootProviderRegistry rootProviderRegistry;
    private final TransformEvaluatorRegistry registry;
    private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements;
    private final EvaluationContext evaluationContext;

    /** Executor service used for bundles whose input is not keyed. */
    private final TransformExecutorService parallelExecutorService;

    /** Scheduling phase of the executor; starts out {@code QUIESCENT}. */
    private final AtomicReference<ExecutorState> state = new AtomicReference<>(ExecutorState.QUIESCENT);

    /** Incremented when work is handed to an executor service, decremented by the completion callbacks. */
    private final AtomicLong outstandingWork = new AtomicLong();

    /** Externally visible pipeline state; only ever moved away from {@code RUNNING} by {@code shutdownIfNecessary}. */
    private final AtomicReference<PipelineResult.State> pipelineState = new AtomicReference<>(PipelineResult.State.RUNNING);

    /**
     * Executor services for keyed bundles, one per {@link StepAndKey}. Values are weakly held and are shut
     * down by the removal listener when they leave the cache.
     */
    private final LoadingCache<StepAndKey, TransformExecutorService> executorServices = CacheBuilder.newBuilder().weakValues().removalListener(shutdownExecutorServiceListener()).build(serialTransformExecutorServiceCacheLoader());

    /** Updates produced by completed work, drained by the monitor on each pass. */
    private final Queue<ExecutorUpdate> allUpdates = new ConcurrentLinkedQueue<>();

    /** Updates (failures and terminal states) consumed by {@code waitUntilFinish}. */
    private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates = new LinkedBlockingQueue<>();

    /** Completion callback used for work that was not triggered by timers (it carries no timers). */
    private final CompletionCallback defaultCompletionCallback = new TimerIterableCompletionCallback(Collections.<TimerInternals.TimerData>emptyList());

    /** Root bundles that are waiting to be scheduled, keyed by the root transform that consumes them. */
    private final ConcurrentMap<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<DirectRunner.CommittedBundle<?>>> pendingRootBundles = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$ExecutorState.class */
    /**
     * Scheduling phase of the executor, advanced by the monitor on each pass:
     * {@code ACTIVE -> PROCESSING -> QUIESCING -> QUIESCENT}. The last two steps are only taken
     * when the outstanding-work counter is zero at the start of the pass.
     */
    public enum ExecutorState {
        // Set when timers are delivered, when pending root bundles are scheduled, or when a
        // completed result reports produced output types. Bundle updates are scheduled to their
        // consumers when this was the state at the start of a monitor pass.
        ACTIVE,
        // Reached from ACTIVE on the next monitor pass. Bundle updates are only scheduled in this
        // state when no work is outstanding; otherwise they are put back on the update queue.
        PROCESSING,
        // Reached from PROCESSING once no work is outstanding.
        QUIESCING,
        // Initial state, and reached from QUIESCING once no work is outstanding. Pending root
        // bundles are only scheduled while in this state.
        QUIESCENT
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$ExecutorUpdate.class */
    public static abstract class ExecutorUpdate {
        /**
         * Creates an update carrying a committed bundle together with the transforms that should consume it.
         *
         * @param committedBundle the bundle to deliver
         * @param collection the transforms that consume the bundle
         * @return an update with a bundle present and no exception
         */
        public static ExecutorUpdate fromBundle(DirectRunner.CommittedBundle<?> committedBundle, Collection<AppliedPTransform<?, ?, ?>> collection) {
            Optional<? extends DirectRunner.CommittedBundle<?>> bundle = Optional.of(committedBundle);
            Optional<? extends Exception> noFailure = Optional.absent();
            return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(bundle, collection, noFailure);
        }

        /**
         * Creates an update that reports a failure.
         *
         * @param exc the exception to report
         * @return an update with an exception present, no bundle and no consumers
         */
        public static ExecutorUpdate fromException(Exception exc) {
            Optional<? extends DirectRunner.CommittedBundle<?>> noBundle = Optional.absent();
            Collection<AppliedPTransform<?, ?, ?>> noConsumers = Collections.emptyList();
            Optional<? extends Exception> failure = Optional.of(exc);
            return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(noBundle, noConsumers, failure);
        }

        /** Returns the bundle carried by this update, if any. */
        public abstract Optional<? extends DirectRunner.CommittedBundle<?>> getBundle();

        /** Returns the transforms that should consume the bundle; empty for exception updates. */
        public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();

        /** Returns the exception carried by this update, if any. */
        public abstract Optional<? extends Exception> getException();
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$MonitorRunnable.class */
    private class MonitorRunnable implements Runnable {
        private final String runnableName;
        private boolean exceptionThrown;

        private MonitorRunnable() {
            this.runnableName = String.format("%s$%s-monitor", ExecutorServiceParallelExecutor.this.evaluationContext.getPipelineOptions().getAppName(), ExecutorServiceParallelExecutor.class.getSimpleName());
            this.exceptionThrown = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            String name = Thread.currentThread().getName();
            Thread.currentThread().setName(this.runnableName);
            try {
                try {
                    boolean z = ExecutorServiceParallelExecutor.this.outstandingWork.get() == 0;
                    ExecutorState executorState = (ExecutorState) ExecutorServiceParallelExecutor.this.state.get();
                    if (executorState == ExecutorState.ACTIVE) {
                        ExecutorServiceParallelExecutor.this.state.compareAndSet(ExecutorState.ACTIVE, ExecutorState.PROCESSING);
                    } else if (executorState == ExecutorState.PROCESSING && z) {
                        ExecutorServiceParallelExecutor.this.state.compareAndSet(ExecutorState.PROCESSING, ExecutorState.QUIESCING);
                    } else if (executorState == ExecutorState.QUIESCING && z) {
                        ExecutorServiceParallelExecutor.this.state.compareAndSet(ExecutorState.QUIESCING, ExecutorState.QUIESCENT);
                    }
                    fireTimers();
                    ArrayList arrayList = new ArrayList();
                    ExecutorUpdate executorUpdate = (ExecutorUpdate) ExecutorServiceParallelExecutor.this.allUpdates.poll();
                    while (executorUpdate != null) {
                        arrayList.add(executorUpdate);
                        executorUpdate = (ExecutorUpdate) ExecutorServiceParallelExecutor.this.allUpdates.poll();
                    }
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        applyUpdate(z, executorState, (ExecutorUpdate) it.next());
                    }
                    addWorkIfNecessary();
                    if (!shouldShutdown()) {
                        ExecutorServiceParallelExecutor.this.executorService.submit(this);
                    }
                    Thread.currentThread().setName(name);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    ExecutorServiceParallelExecutor.LOG.error("Monitor died due to being interrupted");
                    while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromException(e))) {
                        ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                    }
                    if (!shouldShutdown()) {
                        ExecutorServiceParallelExecutor.this.executorService.submit(this);
                    }
                    Thread.currentThread().setName(name);
                } catch (Exception e2) {
                    ExecutorServiceParallelExecutor.LOG.error("Monitor thread died due to exception", e2);
                    while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromException(e2))) {
                        ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                    }
                    if (!shouldShutdown()) {
                        ExecutorServiceParallelExecutor.this.executorService.submit(this);
                    }
                    Thread.currentThread().setName(name);
                }
            } catch (Throwable th) {
                if (!shouldShutdown()) {
                    ExecutorServiceParallelExecutor.this.executorService.submit(this);
                }
                Thread.currentThread().setName(name);
                throw th;
            }
        }

        private void applyUpdate(boolean z, ExecutorState executorState, ExecutorUpdate executorUpdate) {
            ExecutorServiceParallelExecutor.LOG.debug("Executor Update: {}", executorUpdate);
            if (!executorUpdate.getBundle().isPresent()) {
                if (executorUpdate.getException().isPresent()) {
                    Preconditions.checkState(ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromException(executorUpdate.getException().get())), "VisibleUpdates should always be able to receive an offered update");
                    this.exceptionThrown = true;
                    return;
                }
                return;
            }
            if (ExecutorState.ACTIVE == executorState || (ExecutorState.PROCESSING == executorState && z)) {
                ExecutorServiceParallelExecutor.this.scheduleConsumers(executorUpdate);
            } else {
                ExecutorServiceParallelExecutor.this.allUpdates.offer(executorUpdate);
            }
        }

        private void fireTimers() throws Exception {
            try {
                for (WatermarkManager.FiredTimers firedTimers : ExecutorServiceParallelExecutor.this.evaluationContext.extractFiredTimers()) {
                    Collection<TimerInternals.TimerData> timers = firedTimers.getTimers();
                    ExecutorServiceParallelExecutor.this.scheduleConsumption(firedTimers.getTransform(), ExecutorServiceParallelExecutor.this.evaluationContext.createKeyedBundle(firedTimers.getKey(), ((TaggedPValue) Iterables.getOnlyElement(firedTimers.getTransform().getInputs())).getValue()).add(WindowedValue.valueInGlobalWindow(KeyedWorkItems.timersWorkItem(firedTimers.getKey().getKey(), timers))).commit(ExecutorServiceParallelExecutor.this.evaluationContext.now()), new TimerIterableCompletionCallback(timers));
                    ExecutorServiceParallelExecutor.this.state.set(ExecutorState.ACTIVE);
                }
            } catch (Exception e) {
                ExecutorServiceParallelExecutor.LOG.error("Internal Error while delivering timers", e);
                throw e;
            }
        }

        private boolean shouldShutdown() {
            PipelineResult.State state = PipelineResult.State.UNKNOWN;
            if (this.exceptionThrown) {
                state = PipelineResult.State.FAILED;
            } else if (ExecutorServiceParallelExecutor.this.evaluationContext.isDone()) {
                ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.finished());
                state = PipelineResult.State.DONE;
            }
            ExecutorServiceParallelExecutor.this.shutdownIfNecessary(state);
            return ((PipelineResult.State) ExecutorServiceParallelExecutor.this.pipelineState.get()).isTerminal();
        }

        private void addWorkIfNecessary() {
            if (ExecutorServiceParallelExecutor.this.state.get() == ExecutorState.QUIESCENT) {
                for (Map.Entry entry : ExecutorServiceParallelExecutor.this.pendingRootBundles.entrySet()) {
                    ArrayList arrayList = new ArrayList();
                    while (!((ConcurrentLinkedQueue) entry.getValue()).isEmpty()) {
                        arrayList.add((DirectRunner.CommittedBundle) ((ConcurrentLinkedQueue) entry.getValue()).poll());
                    }
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        ExecutorServiceParallelExecutor.this.scheduleConsumption((AppliedPTransform) entry.getKey(), (DirectRunner.CommittedBundle) it.next(), ExecutorServiceParallelExecutor.this.defaultCompletionCallback);
                        ExecutorServiceParallelExecutor.this.state.set(ExecutorState.ACTIVE);
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$TimerIterableCompletionCallback.class */
    /**
     * Completion callback that commits results together with the timers that triggered the work,
     * queues follow-up updates, and decrements the outstanding-work counter.
     */
    public class TimerIterableCompletionCallback implements CompletionCallback {
        private final Iterable<TimerInternals.TimerData> timers;

        /**
         * @param iterable the timers that were delivered with the work this callback completes
         */
        protected TimerIterableCompletionCallback(Iterable<TimerInternals.TimerData> iterable) {
            this.timers = iterable;
        }

        /**
         * Commits the result through the evaluation context and queues one update per output
         * bundle. Non-empty unprocessed inputs are returned to the pending root bundles when the
         * input bundle has no PCollection, and are otherwise re-queued for the same transform.
         *
         * @param committedBundle the input bundle that was processed
         * @param transformResult the result produced for that bundle
         * @return the committed result returned by the evaluation context
         */
        @Override // org.apache.beam.runners.direct.CompletionCallback
        public final CommittedResult handleResult(DirectRunner.CommittedBundle<?> committedBundle, TransformResult<?> transformResult) {
            CommittedResult committed = evaluationContext.handleResult(committedBundle, this.timers, transformResult);

            for (DirectRunner.CommittedBundle<?> output : committed.getOutputs()) {
                allUpdates.offer(ExecutorUpdate.fromBundle(output, graph.getPrimitiveConsumers(output.getPCollection())));
            }

            DirectRunner.CommittedBundle<?> leftover = committed.getUnprocessedInputs();
            boolean hasLeftover = leftover != null && !Iterables.isEmpty(leftover.getElements());
            if (hasLeftover) {
                boolean isRootBundle = committedBundle.getPCollection() == null;
                if (isRootBundle) {
                    pendingRootBundles.get(transformResult.getTransform()).offer(leftover);
                } else {
                    allUpdates.offer(ExecutorUpdate.fromBundle(leftover, Collections.singleton(committed.getTransform())));
                }
            }

            // Producing output re-activates the executor so the monitor schedules consumers.
            if (!committed.getProducedOutputTypes().isEmpty()) {
                state.set(ExecutorState.ACTIVE);
            }
            outstandingWork.decrementAndGet();
            return committed;
        }

        /** Marks the work as complete when there was nothing to process. */
        @Override // org.apache.beam.runners.direct.CompletionCallback
        public void handleEmpty(AppliedPTransform<?, ?, ?> appliedPTransform) {
            outstandingWork.decrementAndGet();
        }

        /** Queues the failure as an update for the monitor and marks the work as complete. */
        @Override // org.apache.beam.runners.direct.CompletionCallback
        public final void handleException(DirectRunner.CommittedBundle<?> committedBundle, Exception exc) {
            ExecutorUpdate failure = ExecutorUpdate.fromException(exc);
            allUpdates.offer(failure);
            outstandingWork.decrementAndGet();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$VisibleExecutorUpdate.class */
    /**
     * An update surfaced to callers waiting on the pipeline: either an exception, or a new
     * pipeline state (never both, given the factories in this class).
     */
    public static class VisibleExecutorUpdate {
        private final Optional<? extends Exception> exception;

        @Nullable
        private final PipelineResult.State newState;

        /** Creates an update that carries {@code exc} and no new state. */
        public static VisibleExecutorUpdate fromException(Exception exc) {
            PipelineResult.State noState = null;
            return new VisibleExecutorUpdate(noState, exc);
        }

        /** Creates an update announcing the {@code DONE} state. */
        public static VisibleExecutorUpdate finished() {
            Exception noFailure = null;
            return new VisibleExecutorUpdate(PipelineResult.State.DONE, noFailure);
        }

        /** Creates an update announcing the {@code CANCELLED} state. */
        public static VisibleExecutorUpdate cancelled() {
            Exception noFailure = null;
            return new VisibleExecutorUpdate(PipelineResult.State.CANCELLED, noFailure);
        }

        private VisibleExecutorUpdate(PipelineResult.State updatedState, @Nullable Exception failure) {
            this.newState = updatedState;
            this.exception = Optional.fromNullable(failure);
        }

        /** Returns the announced state, or {@code null} for exception updates. */
        public PipelineResult.State getNewState() {
            return this.newState;
        }
    }

    /**
     * Creates an executor backed by a fixed thread pool of {@code i} threads.
     *
     * @param i target parallelism (thread pool size)
     * @param directGraph the pipeline graph used to look up consumers of produced bundles
     * @param rootProviderRegistry supplies the initial inputs for root transforms
     * @param transformEvaluatorRegistry the registry handed to each transform executor
     * @param map model enforcements to apply, keyed by transform class
     * @param evaluationContext the shared evaluation context
     * @return a new, not yet started executor
     */
    public static ExecutorServiceParallelExecutor create(int i, DirectGraph directGraph, RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry transformEvaluatorRegistry, Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> map, EvaluationContext evaluationContext) {
        ExecutorServiceParallelExecutor executor = new ExecutorServiceParallelExecutor(i, directGraph, rootProviderRegistry, transformEvaluatorRegistry, map, evaluationContext);
        return executor;
    }

    private ExecutorServiceParallelExecutor(int i, DirectGraph directGraph, RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry transformEvaluatorRegistry, Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> map, EvaluationContext evaluationContext) {
        this.targetParallelism = i;
        this.executorService = Executors.newFixedThreadPool(i);
        this.graph = directGraph;
        this.rootProviderRegistry = rootProviderRegistry;
        this.registry = transformEvaluatorRegistry;
        this.transformEnforcements = map;
        this.evaluationContext = evaluationContext;
        this.parallelExecutorService = TransformExecutorServices.parallel(this.executorService);
    }

    /**
     * Builds the loader that creates a serial executor service for each {@link StepAndKey}.
     *
     * <p>The loader is created from a field initializer, before the constructor assigns
     * {@code executorService}, so the field must be read inside {@code load} rather than captured here.
     */
    private CacheLoader<StepAndKey, TransformExecutorService> serialTransformExecutorServiceCacheLoader() {
        CacheLoader<StepAndKey, TransformExecutorService> loader = new CacheLoader<StepAndKey, TransformExecutorService>() {
            @Override // org.apache.beam.runners.direct.java.repackaged.com.google.common.cache.CacheLoader
            public TransformExecutorService load(StepAndKey stepAndKey) throws Exception {
                ExecutorService backing = ExecutorServiceParallelExecutor.this.executorService;
                return TransformExecutorServices.serial(backing);
            }
        };
        return loader;
    }

    /**
     * Builds the listener that shuts down an executor service when its entry leaves the cache.
     * Entries whose value is already {@code null} are ignored.
     */
    private RemovalListener<StepAndKey, TransformExecutorService> shutdownExecutorServiceListener() {
        RemovalListener<StepAndKey, TransformExecutorService> listener = new RemovalListener<StepAndKey, TransformExecutorService>() {
            @Override // org.apache.beam.runners.direct.java.repackaged.com.google.common.cache.RemovalListener
            public void onRemoval(RemovalNotification<StepAndKey, TransformExecutorService> removalNotification) {
                TransformExecutorService removed = removalNotification.getValue();
                if (removed == null) {
                    return;
                }
                removed.shutdown();
            }
        };
        return listener;
    }

    /**
     * Collects the initial input bundles of every root transform, initializes the evaluation
     * context with them, and submits the monitor task.
     *
     * @param collection the root transforms of the pipeline
     * @throws UserCodeException wrapping any exception raised while gathering initial inputs
     */
    @Override // org.apache.beam.runners.direct.PipelineExecutor
    public void start(Collection<AppliedPTransform<?, ?, ?>> collection) {
        // Ask for at least three splits per root, even at low parallelism.
        int targetSplits = Math.max(3, this.targetParallelism);
        for (AppliedPTransform<?, ?, ?> root : collection) {
            ConcurrentLinkedQueue<DirectRunner.CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
            try {
                Collection<? extends DirectRunner.CommittedBundle<?>> initialInputs = this.rootProviderRegistry.getInitialInputs(root, targetSplits);
                pending.addAll(initialInputs);
                this.pendingRootBundles.put(root, pending);
            } catch (Exception e) {
                throw UserCodeException.wrap(e);
            }
        }
        this.evaluationContext.initialize(this.pendingRootBundles);
        Runnable monitor = new MonitorRunnable();
        this.executorService.submit(monitor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules {@code committedBundle} for evaluation by {@code appliedPTransform}.
     *
     * @param appliedPTransform the transform that consumes the bundle
     * @param committedBundle the bundle to evaluate
     * @param completionCallback invoked when evaluation completes
     */
    public void scheduleConsumption(AppliedPTransform<?, ?, ?> appliedPTransform, DirectRunner.CommittedBundle<?> committedBundle, CompletionCallback completionCallback) {
        // Delegation to the generic helper lets it bind the bundle's wildcard element type.
        this.evaluateBundle(appliedPTransform, committedBundle, completionCallback);
    }

    /**
     * Creates a transform executor for the bundle and schedules it on the matching executor
     * service: the per-step-and-key service for keyed input, the shared parallel service otherwise.
     *
     * <p>Nothing is scheduled once the pipeline state is terminal. The outstanding-work counter is
     * only incremented for work that is actually scheduled, because only scheduled work can reach
     * a completion callback that decrements it again.
     *
     * @param appliedPTransform the transform that consumes the bundle
     * @param committedBundle the bundle to evaluate
     * @param completionCallback invoked when evaluation completes
     */
    private <T> void evaluateBundle(AppliedPTransform<?, ?, ?> appliedPTransform, DirectRunner.CommittedBundle<T> committedBundle, CompletionCallback completionCallback) {
        TransformExecutorService transformExecutorService;
        if (isKeyed(committedBundle.getPCollection())) {
            transformExecutorService = this.executorServices.getUnchecked(StepAndKey.of(appliedPTransform, committedBundle.getKey()));
        } else {
            transformExecutorService = this.parallelExecutorService;
        }
        // Transforms without registered enforcements get an empty collection.
        Collection<ModelEnforcementFactory> enforcements = MoreObjects.firstNonNull(this.transformEnforcements.get(appliedPTransform.getTransform().getClass()), Collections.<ModelEnforcementFactory>emptyList());
        TransformExecutor<?> executor = TransformExecutor.create(this.evaluationContext, this.registry, enforcements, committedBundle, appliedPTransform, completionCallback, transformExecutorService);
        if (this.pipelineState.get().isTerminal()) {
            return;
        }
        // Count before scheduling so the completion callback can never decrement first.
        this.outstandingWork.incrementAndGet();
        transformExecutorService.schedule(executor);
    }

    /** Returns whether the evaluation context reports {@code pValue} as keyed. */
    private boolean isKeyed(PValue pValue) {
        boolean keyed = this.evaluationContext.isKeyed(pValue);
        return keyed;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules the update's bundle once for each of its consumers, using the default completion callback.
     *
     * @param executorUpdate an update whose bundle is present
     */
    public void scheduleConsumers(ExecutorUpdate executorUpdate) {
        DirectRunner.CommittedBundle<?> bundle = executorUpdate.getBundle().get();
        for (AppliedPTransform<?, ?, ?> consumer : executorUpdate.getConsumers()) {
            scheduleConsumption(consumer, bundle, this.defaultCompletionCallback);
        }
    }

    /**
     * Waits for the pipeline to reach a terminal state, for a reported failure, or for the
     * deadline, polling the visible updates every 25 milliseconds.
     *
     * @param duration how long to wait; {@link Duration#ZERO} is treated as no practical deadline
     * @return the pipeline state at the time of return, which may be non-terminal if the deadline passed
     * @throws Exception the exception carried by a visible update, or an interruption while polling
     */
    @Override // org.apache.beam.runners.direct.PipelineExecutor
    public PipelineResult.State waitUntilFinish(Duration duration) throws Exception {
        final Instant deadline;
        if (duration.equals(Duration.ZERO)) {
            deadline = new Instant(Long.MAX_VALUE);
        } else {
            deadline = Instant.now().plus(duration);
        }

        VisibleExecutorUpdate latest = null;
        while (Instant.now().isBefore(deadline) && (latest == null || isTerminalStateUpdate(latest))) {
            latest = this.visibleUpdates.poll(25L, TimeUnit.MILLISECONDS);
            if (latest == null) {
                // Nothing new arrived; return as soon as the pipeline itself has terminated.
                if (this.pipelineState.get().isTerminal()) {
                    return this.pipelineState.get();
                }
            } else if (latest.exception.isPresent()) {
                Exception failure = latest.exception.get();
                throw failure;
            }
        }
        return this.pipelineState.get();
    }

    /** Returns the current pipeline state. */
    @Override // org.apache.beam.runners.direct.PipelineExecutor
    public PipelineResult.State getPipelineState() {
        PipelineResult.State current = this.pipelineState.get();
        return current;
    }

    /**
     * Returns whether the update announces a terminal pipeline state.
     *
     * <p>Updates without a state (exception updates) are not terminal-state updates. The previous
     * expression dereferenced the state exactly when it was {@code null}.
     *
     * @param visibleExecutorUpdate the update to inspect
     * @return {@code true} if the update carries a non-null, terminal state
     */
    private boolean isTerminalStateUpdate(VisibleExecutorUpdate visibleExecutorUpdate) {
        PipelineResult.State newState = visibleExecutorUpdate.getNewState();
        return newState != null && newState.isTerminal();
    }

    /**
     * Cancels the pipeline: shuts the executor down with the {@code CANCELLED} state and publishes
     * a cancellation update, discarding queued visible updates until the offer succeeds.
     */
    @Override // org.apache.beam.runners.direct.PipelineExecutor
    public void stop() {
        shutdownIfNecessary(PipelineResult.State.CANCELLED);
        boolean published = this.visibleUpdates.offer(VisibleExecutorUpdate.cancelled());
        while (!published) {
            this.visibleUpdates.poll();
            published = this.visibleUpdates.offer(VisibleExecutorUpdate.cancelled());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Shuts everything down when {@code state} is terminal; does nothing otherwise.
     *
     * <p>The pipeline state is only replaced if it is still {@code RUNNING}, so the first terminal
     * state wins. A failure during registry cleanup is published as a visible update instead of
     * being thrown.
     *
     * @param state the state the pipeline is believed to have reached
     */
    public void shutdownIfNecessary(PipelineResult.State state) {
        if (!state.isTerminal()) {
            return;
        }
        LOG.debug("Pipeline has terminated. Shutting down.");
        this.pipelineState.compareAndSet(PipelineResult.State.RUNNING, state);

        // Drop the cached per-key services; cleanUp() runs the pending removal notifications.
        this.executorServices.invalidateAll();
        this.executorServices.cleanUp();

        this.parallelExecutorService.shutdown();
        this.executorService.shutdown();
        try {
            this.registry.cleanup();
        } catch (Exception cleanupFailure) {
            this.visibleUpdates.add(VisibleExecutorUpdate.fromException(cleanupFailure));
        }
    }
}
