/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.direct.AggregatorContainer;
import org.apache.beam.runners.direct.BundleFactory;
import org.apache.beam.runners.direct.Clock;
import org.apache.beam.runners.direct.CloningBundleFactory;
import org.apache.beam.runners.direct.DirectGBKIntoKeyedWorkItemsOverrideFactory;
import org.apache.beam.runners.direct.DirectGraph;
import org.apache.beam.runners.direct.DirectGraphVisitor;
import org.apache.beam.runners.direct.DirectGroupByKeyOverrideFactory;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DisplayDataValidator;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ExecutorServiceParallelExecutor;
import org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory;
import org.apache.beam.runners.direct.ImmutabilityEnforcementFactory;
import org.apache.beam.runners.direct.ImmutableListBundleFactory;
import org.apache.beam.runners.direct.KeyedPValueTrackingVisitor;
import org.apache.beam.runners.direct.ModelEnforcementFactory;
import org.apache.beam.runners.direct.NanosOffsetClock;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.runners.direct.ParDoSingleViaMultiOverrideFactory;
import org.apache.beam.runners.direct.PipelineExecutor;
import org.apache.beam.runners.direct.RootProviderRegistry;
import org.apache.beam.runners.direct.StructuralKey;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory;
import org.apache.beam.runners.direct.TransformEvaluatorRegistry;
import org.apache.beam.runners.direct.ViewEvaluatorFactory;
import org.apache.beam.runners.direct.WriteWithShardingFactory;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Supplier;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableCollection;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class DirectRunner
extends PipelineRunner<DirectPipelineResult> {
    /**
     * Maps a transform class to the factory that {@link #apply} consults for a replacement
     * transform. Never reassigned, hence {@code final}.
     *
     * <p>The explicit type arguments on {@code builder()} are required: without them the builder
     * is inferred as {@code <Object, Object>} and the result is not assignable to this field.
     */
    @SuppressWarnings("rawtypes")
    private static final Map<Class<? extends PTransform>, PTransformOverrideFactory> defaultTransformOverrides =
            ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
                    .put(View.CreatePCollectionView.class, new ViewEvaluatorFactory.ViewOverrideFactory())
                    .put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
                    .put(TestStream.class, new TestStreamEvaluatorFactory.DirectTestStreamFactory())
                    .put(Write.Bound.class, new WriteWithShardingFactory())
                    .put(ParDo.Bound.class, new ParDoSingleViaMultiOverrideFactory())
                    .put(ParDo.BoundMulti.class, new ParDoMultiOverrideFactory())
                    .put(SplittableParDo.GBKIntoKeyedWorkItems.class, new DirectGBKIntoKeyedWorkItemsOverrideFactory())
                    .build();

    /**
     * Transform classes checked by {@code Enforcement.IMMUTABILITY.appliesTo}: a collection is
     * subject to that enforcement when its producing transform's class is in this set.
     */
    @SuppressWarnings("rawtypes")
    private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
            ImmutableSet.<Class<? extends PTransform>>of(
                    Read.Bounded.class, Read.Unbounded.class, ParDo.Bound.class, ParDo.BoundMulti.class);

    /** Options this runner was created with. */
    private final DirectOptions options;

    /** Enforcements derived from {@link #options} at construction time. */
    private final Set<Enforcement> enabledEnforcements;

    /** Source of the clock handed to the evaluation context; replaceable via the setter. */
    private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();

    /**
     * Creates a {@link DirectRunner} from the given options, viewed as {@link DirectOptions}.
     *
     * @param options pipeline options to convert with {@code as(DirectOptions.class)}
     * @return a new runner configured with those options
     */
    public static DirectRunner fromOptions(PipelineOptions options) {
        DirectOptions directOptions = options.as(DirectOptions.class);
        return new DirectRunner(directOptions);
    }

    /**
     * Stores the options and computes the set of enabled enforcements from them.
     *
     * @param options the options this runner executes with
     */
    private DirectRunner(DirectOptions options) {
        this.enabledEnforcements = Enforcement.enabled(options);
        this.options = options;
    }

    /**
     * Returns the options this runner was constructed with.
     *
     * @return the {@link DirectOptions} held by this runner
     */
    public DirectOptions getPipelineOptions() {
        return options;
    }

    /**
     * Returns the supplier currently used to obtain the {@link Clock} for a run.
     *
     * @return the current clock supplier
     */
    Supplier<Clock> getClockSupplier() {
        return clockSupplier;
    }

    /**
     * Replaces the supplier used to obtain the {@link Clock} for subsequent runs.
     *
     * @param supplier the new clock supplier; stored as-is without validation
     */
    void setClockSupplier(Supplier<Clock> supplier) {
        clockSupplier = supplier;
    }

    /**
     * Applies {@code transform} to {@code input}, substituting a replacement transform when an
     * override factory is registered for the transform's class.
     *
     * <p>If no factory is registered, or the factory hands back the very same transform instance,
     * the call is delegated to the superclass implementation.
     *
     * @param transform the transform being applied
     * @param input the input to the transform
     * @return the output produced by the (possibly replaced) transform
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <OutputT extends POutput, InputT extends PInput> OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
        PTransformOverrideFactory factory = defaultTransformOverrides.get(transform.getClass());
        if (factory == null) {
            return (OutputT) super.apply(transform, input);
        }

        PTransform replacement = factory.getReplacementTransform(transform);
        if (replacement == transform) {
            // Identity comparison: the factory declined to replace this transform.
            return (OutputT) super.apply(transform, input);
        }

        // The replacement is applied under the original transform's name.
        return (OutputT) Pipeline.applyTransform(transform.getName(), input, replacement);
    }

    /**
     * Executes the pipeline and returns a handle to its result.
     *
     * <p>The pipeline is traversed twice (once to build the {@link DirectGraph}, once to track
     * keyed values), validated for display data, and then handed to an
     * {@link ExecutorServiceParallelExecutor}. When {@code isBlockOnRun()} is set on the options,
     * this method waits for completion before returning.
     *
     * @param pipeline the pipeline to execute
     * @return the result handle for the started (or, when blocking, finished) pipeline
     * @throws Pipeline.PipelineExecutionException wrapping the cause of a
     *     {@link UserCodeException} raised while blocking on completion
     * @throws RuntimeException for any other failure raised while blocking; non-runtime
     *     throwables are wrapped
     */
    public DirectPipelineResult run(Pipeline pipeline) {
        MetricsEnvironment.setMetricsSupported(true);

        DirectGraphVisitor graphCollector = new DirectGraphVisitor();
        pipeline.traverseTopologically(graphCollector);
        KeyedPValueTrackingVisitor keyedTracker = KeyedPValueTrackingVisitor.create();
        pipeline.traverseTopologically(keyedTracker);
        DisplayDataValidator.validatePipeline(pipeline);
        DirectGraph graph = graphCollector.getGraph();

        // Arguments are materialised in the same order the single-expression form evaluated them.
        DirectOptions pipelineOptions = getPipelineOptions();
        Clock clock = clockSupplier.get();
        BundleFactory bundleFactory = Enforcement.bundleFactoryFor(enabledEnforcements, graph);
        EvaluationContext context = EvaluationContext.create(
                pipelineOptions, clock, bundleFactory, graph, keyedTracker.getKeyedPValues());

        RootProviderRegistry rootProviders = RootProviderRegistry.defaultRegistry(context);
        TransformEvaluatorRegistry evaluators = TransformEvaluatorRegistry.defaultRegistry(context);
        ExecutorServiceParallelExecutor executor = ExecutorServiceParallelExecutor.create(
                options.getTargetParallelism(),
                graph,
                rootProviders,
                evaluators,
                Enforcement.defaultModelEnforcements(enabledEnforcements),
                context);
        executor.start(graph.getRootTransforms());

        Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = pipeline.getAggregatorSteps();
        DirectPipelineResult result = new DirectPipelineResult(executor, context, aggregatorSteps);

        if (!options.isBlockOnRun()) {
            return result;
        }

        try {
            result.waitUntilFinish();
        } catch (UserCodeException userException) {
            // Surface the user's own exception rather than the wrapper.
            throw new Pipeline.PipelineExecutionException(userException.getCause());
        } catch (RuntimeException unchecked) {
            throw unchecked;
        } catch (Throwable other) {
            throw new RuntimeException(other);
        }
        return result;
    }

    /** Default {@link Clock} supplier: every call creates a new {@link NanosOffsetClock}. */
    private static class NanosOffsetClockSupplier implements Supplier<Clock> {

        private NanosOffsetClockSupplier() {
        }

        /**
         * @return a clock freshly created by {@code NanosOffsetClock.create()}
         */
        @Override
        public Clock get() {
            Clock clock = NanosOffsetClock.create();
            return clock;
        }
    }

    /**
     * The result of running a pipeline with the {@link DirectRunner}.
     *
     * <p>Holds the executor and evaluation context of the run. The state starts as
     * {@code RUNNING} and becomes {@code DONE} only after a successful
     * {@link #waitUntilFinish()}.
     */
    public static class DirectPipelineResult implements PipelineResult {
        private final PipelineExecutor executor;
        private final EvaluationContext evaluationContext;
        private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
        private PipelineResult.State state;

        private DirectPipelineResult(PipelineExecutor executor, EvaluationContext evaluationContext, Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
            this.executor = executor;
            this.evaluationContext = evaluationContext;
            this.aggregatorSteps = aggregatorSteps;
            this.state = PipelineResult.State.RUNNING;
        }

        /**
         * @return the last state recorded by this result; it is only updated by
         *     {@link #waitUntilFinish()}
         */
        public PipelineResult.State getState() {
            return this.state;
        }

        /**
         * Collects the current value of {@code aggregator} at every step that uses it.
         *
         * <p>Steps for which the aggregator container has no value are omitted. If the aggregator
         * has no entry in the aggregator-to-steps map, the returned values are empty; the
         * previous code dereferenced the missing entry and threw a
         * {@link NullPointerException}.
         *
         * @param aggregator the aggregator to look up
         * @return the values keyed by the full name of each contributing step
         * @throws AggregatorRetrievalException declared by the interface; not thrown by this
         *     implementation directly
         */
        public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) throws AggregatorRetrievalException {
            AggregatorContainer aggregators = this.evaluationContext.getAggregatorContainer();
            Collection<PTransform<?, ?>> steps = this.aggregatorSteps.get(aggregator);
            final Map<String, T> stepValues = new HashMap<String, T>();
            if (steps != null) {
                for (AppliedPTransform<?, ?, ?> transform : this.evaluationContext.getSteps()) {
                    if (!steps.contains(transform.getTransform())) {
                        continue;
                    }
                    Object aggregate = aggregators.getAggregate(this.evaluationContext.getStepName(transform), aggregator.getName());
                    if (aggregate == null) {
                        continue;
                    }
                    // The container is untyped here; the aggregator's declared output type is T.
                    @SuppressWarnings("unchecked")
                    T typedAggregate = (T) aggregate;
                    stepValues.put(transform.getFullName(), typedAggregate);
                }
            }
            return new AggregatorValues<T>() {

                public Map<String, T> getValuesAtSteps() {
                    return stepValues;
                }

                public String toString() {
                    return MoreObjects.toStringHelper(this).add("stepValues", stepValues).toString();
                }
            };
        }

        /**
         * @return the metrics exposed by the evaluation context of this run
         */
        public MetricResults metrics() {
            return this.evaluationContext.getMetrics();
        }

        /**
         * Blocks until the executor reports completion, unless the state is already terminal.
         *
         * <p>On success the state becomes {@code DONE}. On failure the state is left unchanged,
         * the interrupt flag is restored if the failure was an {@link InterruptedException},
         * and the failure is rethrown (wrapped in a {@link RuntimeException} if checked).
         *
         * @return the state after waiting
         */
        public PipelineResult.State waitUntilFinish() {
            if (!this.state.isTerminal()) {
                try {
                    this.executor.awaitCompletion();
                    this.state = PipelineResult.State.DONE;
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Preserve the interrupt for callers further up the stack.
                        Thread.currentThread().interrupt();
                    }
                    if (e instanceof RuntimeException) {
                        throw (RuntimeException)e;
                    }
                    throw new RuntimeException(e);
                }
            }
            return this.state;
        }

        /**
         * Not supported by this result.
         *
         * @throws UnsupportedOperationException always
         */
        public PipelineResult.State cancel() throws IOException {
            throw new UnsupportedOperationException("DirectPipelineResult does not support cancel.");
        }

        /**
         * Not supported by this result.
         *
         * @throws UnsupportedOperationException always
         */
        public PipelineResult.State waitUntilFinish(Duration duration) {
            throw new UnsupportedOperationException("DirectPipelineResult does not support waitUntilFinish with a Duration parameter. See BEAM-596.");
        }
    }

    /**
     * Model checks the runner can be asked to perform, selected from {@link DirectOptions}.
     */
    static enum Enforcement {
        /** Applies to every collection; selects the cloning bundle factory when enabled. */
        ENCODABILITY {
            @Override
            public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
                return true;
            }
        },
        /** Applies to collections whose producing transform's class is in {@code CONTAINS_UDF}. */
        IMMUTABILITY {
            @Override
            public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
                return CONTAINS_UDF.contains(graph.getProducer((PValue) collection).getTransform().getClass());
            }
        };

        /**
         * Reports whether this enforcement applies to the given collection.
         *
         * @param collection the collection under consideration
         * @param graph the graph used to look up the collection's producer
         * @return {@code true} if this enforcement applies
         */
        public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);

        /**
         * Computes the enforcements switched on in {@code options}.
         *
         * @param options options read via {@code isEnforceEncodability()} and
         *     {@code isEnforceImmutability()}
         * @return an unmodifiable set of the enabled enforcements
         */
        public static Set<Enforcement> enabled(DirectOptions options) {
            EnumSet<Enforcement> enabled = EnumSet.noneOf(Enforcement.class);
            if (options.isEnforceEncodability()) {
                enabled.add(ENCODABILITY);
            }
            if (options.isEnforceImmutability()) {
                enabled.add(IMMUTABILITY);
            }
            return Collections.unmodifiableSet(enabled);
        }

        /**
         * Chooses the bundle factory for the given enforcements.
         *
         * <p>The base factory is the cloning factory when {@link #ENCODABILITY} is enabled and
         * the immutable-list factory otherwise. When {@link #IMMUTABILITY} is enabled the base
         * factory is wrapped in an immutability-checking factory.
         *
         * @param enforcements the enabled enforcements
         * @param graph the graph passed to the immutability-checking wrapper
         * @return the bundle factory to use
         */
        public static BundleFactory bundleFactoryFor(Set<Enforcement> enforcements, DirectGraph graph) {
            BundleFactory bundleFactory = enforcements.contains(ENCODABILITY)
                    ? CloningBundleFactory.create()
                    : ImmutableListBundleFactory.create();
            if (enforcements.contains(IMMUTABILITY)) {
                bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory, graph);
            }
            return bundleFactory;
        }

        /**
         * Builds the per-transform-class model enforcement factories.
         *
         * <p>Both ParDo classes share one list, which holds the immutability enforcement factory
         * when {@link #IMMUTABILITY} is enabled and is empty otherwise.
         *
         * @param enabledEnforcements the enabled enforcements
         * @return an immutable map from transform class to its enforcement factories
         */
        @SuppressWarnings("rawtypes")
        private static Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> defaultModelEnforcements(Set<Enforcement> enabledEnforcements) {
            ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> enforcements = ImmutableMap.builder();
            ImmutableList.Builder<ModelEnforcementFactory> enabledParDoEnforcements = ImmutableList.builder();
            if (enabledEnforcements.contains(IMMUTABILITY)) {
                enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create());
            }
            Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build();
            enforcements.put(ParDo.Bound.class, parDoEnforcements);
            enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
            return enforcements.build();
        }
    }

    static interface PCollectionViewWriter<ElemT, ViewT> {
        public void add(Iterable<WindowedValue<ElemT>> var1);
    }

    /**
     * Read-side view of a bundle of windowed elements of type {@code T}.
     */
    interface CommittedBundle<T> {
        /** @return the collection associated with this bundle; may be {@code null} */
        @Nullable
        PCollection<T> getPCollection();

        /** @return the key associated with this bundle */
        StructuralKey<?> getKey();

        /** @return the windowed elements held by this bundle */
        Iterable<WindowedValue<T>> getElements();

        // NOTE(review): presumably the earliest timestamp among the elements — confirm with implementations.
        Instant getMinTimestamp();

        // NOTE(review): semantics are defined by implementations, not visible in this file.
        Instant getSynchronizedProcessingOutputWatermark();

        /**
         * @param elements the elements for the returned bundle
         * @return a committed bundle carrying {@code elements}
         */
        CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
    }

    /**
     * Write-side view of a bundle: elements are added, then the bundle is committed into a
     * {@link CommittedBundle}.
     */
    interface UncommittedBundle<T> {
        /** @return the collection associated with this bundle; may be {@code null} */
        @Nullable
        PCollection<T> getPCollection();

        /**
         * @param element the windowed value to add
         * @return an uncommitted bundle, allowing calls to be chained
         */
        UncommittedBundle<T> add(WindowedValue<T> element);

        /**
         * @param instant the instant passed to the commit
         *     (NOTE(review): presumably the synchronized processing time — confirm)
         * @return the committed form of this bundle
         */
        CommittedBundle<T> commit(Instant instant);
    }
}

