/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.direct.AutoValue_TestStreamEvaluatorFactory_TestStreamIndex;
import org.apache.beam.runners.direct.Clock;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.RootInputProvider;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Supplier;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;

class TestStreamEvaluatorFactory
implements TransformEvaluatorFactory {
    /** Evaluation context that is handed to every evaluator created by this factory. */
    private final EvaluationContext evaluationContext;

    /**
     * Creates a factory whose evaluators operate against the given context.
     *
     * @param evaluationContext context stored as-is (no null check is performed here)
     */
    TestStreamEvaluatorFactory(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Returns an evaluator for the given application of a {@link TestStream}.
     *
     * <p>The input bundle is not consulted. Although the method is annotated
     * {@code @Nullable} to match the interface, this implementation always returns a
     * freshly created evaluator.
     *
     * @param application the applied transform to evaluate; it is treated as a
     *     {@link TestStream} application without a runtime check
     * @param inputBundle ignored by this factory
     * @return a new evaluator bound to {@code application}
     */
    @Override
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, DirectRunner.CommittedBundle<?> inputBundle) {
        // The interface only exposes wildcard-typed applications, while createEvaluator
        // needs the concrete TestStream parameterisation. Neither the argument nor the
        // returned evaluator can be converted without an explicit unchecked cast, so
        // the casts are spelled out and the warning is confined to this one local.
        @SuppressWarnings(value={"unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator = (TransformEvaluator<InputT>)this.createEvaluator((AppliedPTransform)application);
        return evaluator;
    }

    /** Does nothing in this implementation. */
    @Override
    public void cleanup() throws Exception {
        // Empty body: no cleanup work is performed here.
    }

    /**
     * Builds a new {@link Evaluator} for the supplied {@link TestStream} application,
     * wired to this factory's evaluation context.
     *
     * @param application the applied {@link TestStream} the evaluator will serve
     * @return a new evaluator instance; one is created on every call
     */
    private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application) {
        // Raw on purpose: the evaluator's element type (TestStreamIndex) is unrelated to
        // InputT, so the conversion to the declared return type is unchecked.
        Evaluator evaluator = new Evaluator(application, this.evaluationContext);
        return evaluator;
    }

    /**
     * A position within a {@link TestStream}: the stream together with an integer index.
     *
     * <p>Instances are created through {@link #of} (index 0) and {@link #next()}
     * (index + 1); the concrete implementation is
     * {@code AutoValue_TestStreamEvaluatorFactory_TestStreamIndex}.
     */
    static abstract class TestStreamIndex<T> {
        TestStreamIndex() {
        }

        /**
         * Returns an index over {@code stream} positioned at 0.
         *
         * @param stream the stream to index into
         */
        static <T> TestStreamIndex<T> of(TestStream<T> stream) {
            int firstPosition = 0;
            return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<T>(stream, firstPosition);
        }

        /** Returns the stream this index points into. */
        abstract TestStream<T> getTestStream();

        /** Returns the position within the stream. */
        abstract int getIndex();

        /** Returns a new index over the same stream, advanced by one position. */
        TestStreamIndex<T> next() {
            TestStream<T> sameStream = this.getTestStream();
            int followingPosition = this.getIndex() + 1;
            return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<T>(sameStream, followingPosition);
        }
    }

    /**
     * Supplies the root input for a {@link DirectTestStreamFactory.DirectTestStream}: a
     * single bundle holding one {@link TestStreamIndex} positioned at the start of the
     * original {@link TestStream}.
     */
    static class InputProvider<T>
    implements RootInputProvider<T, TestStreamIndex<T>, PBegin, DirectTestStreamFactory.DirectTestStream<T>> {
        /** Context used to create the root bundle. */
        private final EvaluationContext evaluationContext;

        InputProvider(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        /**
         * Returns exactly one committed bundle containing the initial stream index.
         *
         * @param transform the applied replacement transform; its wrapped original
         *     {@link TestStream} is what gets indexed
         * @param targetParallelism not used; a single bundle is always produced
         * @return a singleton collection with the initial bundle
         */
        @Override
        public Collection<DirectRunner.CommittedBundle<TestStreamIndex<T>>> getInitialInputs(AppliedPTransform<PBegin, PCollection<T>, DirectTestStreamFactory.DirectTestStream<T>> transform, int targetParallelism) {
            // Create the (empty) root bundle first, mirroring the original call order.
            DirectRunner.UncommittedBundle<TestStreamIndex<T>> rootBundle = this.evaluationContext.<TestStreamIndex<T>>createRootBundle();

            // The index starts at position 0 of the stream wrapped by the replacement.
            TestStream<T> wrappedStream = transform.getTransform().original;
            WindowedValue<TestStreamIndex<T>> startElement = WindowedValue.valueInGlobalWindow(TestStreamIndex.of(wrappedStream));

            DirectRunner.CommittedBundle<TestStreamIndex<T>> initialBundle = rootBundle.add(startElement).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
            return Collections.singleton(initialBundle);
        }
    }

    /**
     * Override factory that swaps a {@link TestStream} for a {@link DirectTestStream},
     * the direct-runner-specific primitive wrapping the original transform.
     */
    static class DirectTestStreamFactory<T>
    implements PTransformOverrideFactory<PBegin, PCollection<T>, TestStream<T>> {
        DirectTestStreamFactory() {
        }

        /** Wraps {@code transform} in a new {@link DirectTestStream}. */
        public PTransform<PBegin, PCollection<T>> getReplacementTransform(TestStream<T> transform) {
            DirectTestStream<T> replacement = new DirectTestStream<T>(transform);
            return replacement;
        }

        /** Returns the beginning of pipeline {@code p}; {@code inputs} is not consulted. */
        public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
            return p.begin();
        }

        /** Maps the original outputs onto {@code newOutput} via {@link ReplacementOutputs#singleton}. */
        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, PCollection<T> newOutput) {
            return ReplacementOutputs.singleton(outputs, newOutput);
        }

        /**
         * Replacement primitive for {@link TestStream} that remembers the original
         * transform so its events and value coder remain available.
         */
        static class DirectTestStream<T>
        extends PTransform<PBegin, PCollection<T>> {
            /** The user's original transform that this primitive stands in for. */
            private final TestStream<T> original;

            @VisibleForTesting
            DirectTestStream(TestStream<T> transform) {
                this.original = transform;
            }

            /**
             * Verifies the pipeline runs on a {@link DirectRunner}, installs a
             * {@link TestClockSupplier} on that runner, and produces an unbounded,
             * globally-windowed output collection using the original stream's value coder.
             *
             * @throws IllegalStateException (via {@code Preconditions.checkState}) when the
             *     pipeline's runner is not a {@link DirectRunner}
             */
            public PCollection<T> expand(PBegin input) {
                PipelineRunner<?> runner = input.getPipeline().getRunner();
                Preconditions.checkState(
                    runner instanceof DirectRunner,
                    "%s can only be used when running with the %s",
                    this.getClass().getSimpleName(),
                    DirectRunner.class.getSimpleName());

                // Side effect on the runner: installs a supplier of TestClock instances.
                DirectRunner directRunner = (DirectRunner)runner;
                directRunner.setClockSupplier(new TestClockSupplier());

                PCollection<T> output = PCollection.<T>createPrimitiveOutputInternal(
                    input.getPipeline(),
                    WindowingStrategy.globalDefault(),
                    PCollection.IsBounded.UNBOUNDED);
                return output.setCoder(this.original.getValueCoder());
            }
        }
    }

    /** Supplier that creates a new {@link TestClock} each time {@link #get()} is called. */
    private static class TestClockSupplier
    implements Supplier<Clock> {
        private TestClockSupplier() {
        }

        /** Returns a newly constructed {@link TestClock}; instances are not cached. */
        @Override
        public Clock get() {
            return new TestClock();
        }
    }

    /**
     * A {@link Clock} whose time only moves when {@link #advance(Duration)} is called.
     *
     * <p>The current time starts at {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and is held
     * in an {@link AtomicReference}, so reads and advances may come from different threads.
     */
    @VisibleForTesting
    static class TestClock
    implements Clock {
        private final AtomicReference<Instant> currentTime = new AtomicReference<Instant>(BoundedWindow.TIMESTAMP_MIN_VALUE);

        TestClock() {
        }

        /**
         * Moves the clock forward by {@code amount}.
         *
         * <p>The update is retried until the compare-and-set succeeds. A single
         * unchecked {@code compareAndSet} would silently drop this advance whenever
         * another thread changed the time between the read and the write.
         *
         * @param amount how far to advance the current time
         */
        public void advance(Duration amount) {
            Instant observed;
            do {
                observed = this.currentTime.get();
                // compareAndSet compares by reference, so it only succeeds if no other
                // thread replaced the instant we just read.
            } while (!this.currentTime.compareAndSet(observed, observed.plus(amount)));
        }

        /** Returns the clock's current time. */
        @Override
        public Instant now() {
            return this.currentTime.get();
        }
    }

    /**
     * Evaluates one {@link TestStream} event per input element.
     *
     * <p>Each input is a {@link TestStreamIndex}; the event at that index is applied
     * (elements are emitted, the watermark is changed, or the test clock is advanced)
     * and, if more events remain, the following index is registered as an unprocessed
     * element so it is seen again later.
     */
    private static class Evaluator<T>
    implements TransformEvaluator<TestStreamIndex<T>> {
        private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
        private final EvaluationContext context;
        /** Accumulates outputs and unprocessed elements across processElement calls. */
        private final StepTransformResult.Builder resultBuilder;

        private Evaluator(AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application, EvaluationContext context) {
            this.application = application;
            this.context = context;
            this.resultBuilder = StepTransformResult.withoutHold(application);
        }

        /**
         * Applies the event that {@code element}'s index points at.
         *
         * @param element the current stream index; its timestamp is carried over to the
         *     next index unless the event is a watermark event, in which case the event's
         *     watermark is used instead
         * @throws ClassCastException if a processing-time event is seen while the
         *     context's clock is not a {@link TestClock}
         */
        @Override
        public void processElement(WindowedValue<TestStreamIndex<T>> element) throws Exception {
            TestStreamIndex<T> current = element.getValue();
            List<TestStream.Event<T>> events = current.getTestStream().getEvents();
            int position = current.getIndex();
            Instant watermark = element.getTimestamp();
            TestStream.Event<T> event = events.get(position);

            // The event kinds are mutually exclusive, so a switch replaces separate tests.
            switch (event.getType()) {
                case ELEMENT:
                    this.emitElements(event);
                    break;
                case WATERMARK:
                    watermark = ((TestStream.WatermarkEvent<T>)event).getWatermark();
                    break;
                case PROCESSING_TIME:
                    TestClock clock = (TestClock)this.context.getClock();
                    clock.advance(((TestStream.ProcessingTimeEvent<T>)event).getProcessingTimeAdvance());
                    break;
                default:
                    break;
            }

            TestStreamIndex<T> next = current.next();
            if (next.getIndex() >= events.size()) {
                // That was the last event; nothing further to schedule.
                return;
            }
            this.resultBuilder.addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(next, watermark)));
        }

        /**
         * Adds every timestamped value of an element event to a new bundle on the
         * application's only output, then registers that bundle as an output.
         */
        private void emitElements(TestStream.Event<T> event) {
            @SuppressWarnings(value="unchecked")
            PCollection<T> output = (PCollection<T>)Iterables.getOnlyElement(this.application.getOutputs()).getValue();
            DirectRunner.UncommittedBundle<T> bundle = this.context.createBundle(output);
            TestStream.ElementEvent<T> elementEvent = (TestStream.ElementEvent<T>)event;
            for (TimestampedValue<T> timestamped : elementEvent.getElements()) {
                bundle.add(WindowedValue.timestampedValueInGlobalWindow(timestamped.getValue(), timestamped.getTimestamp()));
            }
            this.resultBuilder.addOutput(bundle);
        }

        /** Builds the result accumulated by the preceding {@code processElement} calls. */
        @Override
        public TransformResult<TestStreamIndex<T>> finishBundle() throws Exception {
            return this.resultBuilder.build();
        }
    }
}

