/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.RootInputProvider;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator;
import org.apache.beam.runners.direct.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class UnboundedReadEvaluatorFactory
implements TransformEvaluatorFactory {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadEvaluatorFactory.class);
    /** Default reader reuse chance; the single-argument constructor configures this same value (0.95). */
    private static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
    /** Context handed to every evaluator created by this factory. */
    private final EvaluationContext evaluationContext;
    /**
     * Reuse threshold handed to every evaluator: after a read, a random draw in [0, 1) that is
     * greater than or equal to this value causes the reader to be closed instead of reused.
     */
    private final double readerReuseChance;

    /**
     * Creates a factory that uses {@link #DEFAULT_READER_REUSE_CHANCE} as its reader reuse chance.
     *
     * @param evaluationContext context passed on to the evaluators this factory creates
     */
    UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
        // Refer to the named constant rather than repeating its literal value.
        this(evaluationContext, DEFAULT_READER_REUSE_CHANCE);
    }

    /**
     * Creates a factory with an explicitly chosen reader reuse chance.
     *
     * @param evaluationContext context passed on to the evaluators this factory creates
     * @param readerReuseChance value stored as-is and passed on to each created evaluator
     */
    @VisibleForTesting
    UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, double readerReuseChance) {
        this.readerReuseChance = readerReuseChance;
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates an evaluator for the given application.
     *
     * <p>The wildcard-typed application is narrowed with an unchecked cast because
     * {@code createEvaluator} requires the fully parameterized form; without the cast the call does
     * not type-check.
     *
     * @param application the applied transform to evaluate
     * @param inputBundle not used by this implementation
     * @return a new evaluator for {@code application}
     */
    @Override
    @Nullable
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, DirectRunner.CommittedBundle<?> inputBundle) {
        return (TransformEvaluator<InputT>) this.createEvaluator((AppliedPTransform) application);
    }

    /**
     * Builds a new {@code UnboundedReadEvaluator} for the application, passing along this factory's
     * evaluation context and reader reuse chance.
     *
     * @param application the applied unbounded read
     * @return a freshly constructed evaluator
     */
    private <OutputT> TransformEvaluator<?> createEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, Read.Unbounded<OutputT>> application) {
        // Diamond instead of a raw type so the constructor arguments are type-checked.
        return new UnboundedReadEvaluator<>(application, this.evaluationContext, this.readerReuseChance);
    }

    /**
     * Intentionally empty: this method performs no work.
     */
    @Override
    public void cleanup() {
    }

    /**
     * Supplies the initial root bundles for a {@link Read.Unbounded} transform: one committed bundle
     * per initial split of the source, each containing a single unstarted
     * {@link UnboundedSourceShard}.
     */
    static class InputProvider<OutputT>
    implements RootInputProvider<OutputT, UnboundedSourceShard<OutputT, ?>, PBegin, Read.Unbounded<OutputT>> {
        private final EvaluationContext evaluationContext;

        /**
         * @param evaluationContext context used to obtain pipeline options and create root bundles
         */
        InputProvider(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        /**
         * Splits the transform's source and wraps every split in its own committed root bundle.
         *
         * @param transform the applied unbounded read whose source is split
         * @param targetParallelism desired number of splits, forwarded to the source
         * @return one committed bundle per split, each committed at
         *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
         * @throws Exception if the source fails while generating its initial splits
         */
        @Override
        public Collection<DirectRunner.CommittedBundle<UnboundedSourceShard<OutputT, ?>>> getInitialInputs(AppliedPTransform<PBegin, PCollection<OutputT>, Read.Unbounded<OutputT>> transform, int targetParallelism) throws Exception {
            UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
            List<? extends UnboundedSource<OutputT, ?>> splits = source.generateInitialSplits(targetParallelism, this.evaluationContext.getPipelineOptions());
            // A single deduplicator instance is shared by every shard created below.
            UnboundedReadDeduplicator deduplicator = source.requiresDeduping() ? UnboundedReadDeduplicator.CachedIdDeduplicator.create() : UnboundedReadDeduplicator.NeverDeduplicator.create();
            ImmutableList.Builder<DirectRunner.CommittedBundle<UnboundedSourceShard<OutputT, ?>>> initialShards = ImmutableList.builder();
            for (UnboundedSource<OutputT, ?> split : splits) {
                UnboundedSourceShard<OutputT, ?> shard = UnboundedSourceShard.unstarted(split, deduplicator);
                initialShards.add(this.evaluationContext.<UnboundedSourceShard<OutputT, ?>>createRootBundle().add(WindowedValue.<UnboundedSourceShard<OutputT, ?>>valueInGlobalWindow(shard)).commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
            }
            return initialShards.build();
        }
    }

    /**
     * Value describing one shard of an unbounded source: the source itself, the deduplicator to
     * consult, and optionally a reader and a checkpoint carried over from an earlier read.
     * Instances are created through {@link #unstarted} and {@link #of}.
     */
    abstract static class UnboundedSourceShard<T, CheckpointT extends UnboundedSource.CheckpointMark> {
        /**
         * Creates a shard that has neither a reader nor a checkpoint.
         *
         * @param source the source this shard reads from
         * @param deduplicator the deduplicator associated with this shard
         * @return a shard whose reader and checkpoint are both {@code null}
         */
        static <T, CheckpointT extends UnboundedSource.CheckpointMark> UnboundedSourceShard<T, CheckpointT> unstarted(UnboundedSource<T, CheckpointT> source, UnboundedReadDeduplicator deduplicator) {
            final UnboundedSource.UnboundedReader<T> noReader = null;
            final CheckpointT noCheckpoint = null;
            return of(source, deduplicator, noReader, noCheckpoint);
        }

        /**
         * Creates a shard from all four components.
         *
         * @param source the source this shard reads from
         * @param deduplicator the deduplicator associated with this shard
         * @param reader an existing reader, or {@code null} if there is none
         * @param checkpoint the checkpoint to carry, or {@code null} if there is none
         * @return the new shard
         */
        static <T, CheckpointT extends UnboundedSource.CheckpointMark> UnboundedSourceShard<T, CheckpointT> of(UnboundedSource<T, CheckpointT> source, UnboundedReadDeduplicator deduplicator, @Nullable UnboundedSource.UnboundedReader<T> reader, @Nullable CheckpointT checkpoint) {
            return new AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard<T, CheckpointT>(source, deduplicator, reader, checkpoint);
        }

        /** Returns the source this shard reads from. */
        abstract UnboundedSource<T, CheckpointT> getSource();

        /** Returns the deduplicator associated with this shard. */
        abstract UnboundedReadDeduplicator getDeduplicator();

        /** Returns the reader carried by this shard, or {@code null} if there is none. */
        @Nullable
        abstract UnboundedSource.UnboundedReader<T> getExistingReader();

        /** Returns the checkpoint carried by this shard, or {@code null} if there is none. */
        @Nullable
        abstract CheckpointT getCheckpoint();

        /**
         * Returns a copy of this shard with the same source, deduplicator and existing reader, but
         * with the given checkpoint.
         *
         * @param newCheckpoint the checkpoint for the returned shard
         * @return the copied shard
         */
        UnboundedSourceShard<T, CheckpointT> withCheckpoint(CheckpointT newCheckpoint) {
            UnboundedSource<T, CheckpointT> sameSource = getSource();
            UnboundedReadDeduplicator sameDeduplicator = getDeduplicator();
            UnboundedSource.UnboundedReader<T> sameReader = getExistingReader();
            return of(sameSource, sameDeduplicator, sameReader, newCheckpoint);
        }
    }

    /**
     * Evaluator that reads a limited batch of records from one {@link UnboundedSourceShard} per
     * {@link #processElement} call and re-queues a residual shard (carrying the latest checkpoint
     * and, usually, the open reader) as an unprocessed element.
     */
    private static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
    implements TransformEvaluator<UnboundedSourceShard<OutputT, CheckpointMarkT>> {
        /** Maximum number of records pulled from the reader in one {@link #processElement} call. */
        private static final int ARBITRARY_MAX_ELEMENTS = 10;
        private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
        private final EvaluationContext evaluationContext;
        /** A random draw in [0, 1) at or above this value closes the reader instead of reusing it. */
        private final double readerReuseChance;
        private final StepTransformResult.Builder resultBuilder;

        /**
         * @param transform the applied read whose output collection receives the records
         * @param evaluationContext context used to create bundles and readers
         * @param readerReuseChance threshold deciding whether a reader is kept after a read
         */
        public UnboundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, ?> transform, EvaluationContext evaluationContext, double readerReuseChance) {
            this.transform = transform;
            this.evaluationContext = evaluationContext;
            this.readerReuseChance = readerReuseChance;
            this.resultBuilder = StepTransformResult.withoutHold(transform);
        }

        /**
         * Reads up to {@link #ARBITRARY_MAX_ELEMENTS} records from the shard's reader into a new
         * output bundle, then registers a residual shard as an unprocessed element.
         *
         * <p>If no record is available, the shard is re-queued with its reader while the watermark is
         * before {@link BoundedWindow#TIMESTAMP_MAX_VALUE}; otherwise nothing is re-queued and the
         * reader is closed.
         *
         * @param element the shard to read from
         * @throws IOException if the reader or a checkpoint fails; the reader is closed first and any
         *     failure of that close is attached as a suppressed exception
         */
        @Override
        public void processElement(WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element) throws IOException {
            DirectRunner.UncommittedBundle<OutputT> output = this.evaluationContext.createBundle(this.transform.getOutput());
            UnboundedSourceShard<OutputT, CheckpointMarkT> shard = element.getValue();
            UnboundedSource.UnboundedReader<OutputT> reader = null;
            try {
                reader = this.getReader(shard);
                if (this.startReader(reader, shard)) {
                    UnboundedReadDeduplicator deduplicator = shard.getDeduplicator();
                    int numElements = 0;
                    do {
                        if (deduplicator.shouldOutput(reader.getCurrentRecordId())) {
                            output.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
                        }
                        // Records rejected by the deduplicator still count toward the batch limit.
                        numElements++;
                    } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
                    Instant watermark = reader.getWatermark();
                    CheckpointMarkT finishedCheckpoint = this.finishRead(reader, shard);
                    UnboundedSourceShard<OutputT, CheckpointMarkT> residual;
                    if (ThreadLocalRandom.current().nextDouble(1.0) >= this.readerReuseChance) {
                        // Clear the local before closing so the catch block below cannot close the
                        // same reader a second time if close() throws.
                        UnboundedSource.UnboundedReader<OutputT> toClose = reader;
                        reader = null;
                        toClose.close();
                        residual = UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), null, finishedCheckpoint);
                    } else {
                        // Carry the reader actually used for this read. Copying the shard's own
                        // reader would drop a reader that was created during this call.
                        residual = UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), reader, finishedCheckpoint);
                    }
                    this.resultBuilder.addOutput(output).addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(residual, watermark)));
                } else {
                    Instant watermark = reader.getWatermark();
                    if (watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                        // Nothing to read right now: re-queue the shard with the open reader and its
                        // unchanged checkpoint.
                        this.resultBuilder.addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), reader, shard.getCheckpoint()), watermark)));
                    } else {
                        // The shard is not re-queued, so no later call can close this reader.
                        UnboundedSource.UnboundedReader<OutputT> toClose = reader;
                        reader = null;
                        toClose.close();
                    }
                }
            }
            catch (IOException e) {
                if (reader != null) {
                    try {
                        reader.close();
                    }
                    catch (IOException closeFailure) {
                        // Keep the original failure as the primary exception.
                        e.addSuppressed(closeFailure);
                    }
                }
                throw e;
            }
        }

        /**
         * Returns the shard's existing reader, or creates a new one from the shard's source and
         * checkpoint when the shard carries none.
         */
        private UnboundedSource.UnboundedReader<OutputT> getReader(UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws IOException {
            UnboundedSource.UnboundedReader<OutputT> existing = shard.getExistingReader();
            if (existing != null) {
                return existing;
            }
            return shard.getSource().createReader(this.evaluationContext.getPipelineOptions(), shard.getCheckpoint());
        }

        /**
         * Positions the reader on its next record.
         *
         * <p>A reader that was carried by the shard is advanced. A newly created reader is started,
         * after finalizing the shard's checkpoint if it has one.
         *
         * @return {@code true} if a record is available
         */
        private boolean startReader(UnboundedSource.UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws IOException {
            if (shard.getExistingReader() != null) {
                // getReader returned the shard's existing reader, so this is the same object.
                return reader.advance();
            }
            CheckpointMarkT checkpoint = shard.getCheckpoint();
            if (checkpoint != null) {
                checkpoint.finalizeCheckpoint();
            }
            return reader.start();
        }

        /**
         * Takes a new checkpoint from the reader and finalizes the shard's previous checkpoint, if
         * any. When the reader's watermark is no longer before
         * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}, finalization of the new checkpoint is scheduled
         * through the evaluation context for after the global window's output would be produced.
         *
         * @return the checkpoint taken from the reader
         */
        private CheckpointMarkT finishRead(UnboundedSource.UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws IOException {
            CheckpointMarkT oldMark = shard.getCheckpoint();
            // The reader was created from this shard's source, so its mark is expected to be a
            // CheckpointMarkT; the reader API only exposes the base CheckpointMark type.
            @SuppressWarnings("unchecked")
            final CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
            if (oldMark != null) {
                oldMark.finalizeCheckpoint();
            }
            if (!reader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                this.evaluationContext.scheduleAfterOutputWouldBeProduced(this.transform.getOutput(), GlobalWindow.INSTANCE, this.transform.getOutput().getWindowingStrategy(), new Runnable() {

                    @Override
                    public void run() {
                        try {
                            mark.finalizeCheckpoint();
                        }
                        catch (IOException e) {
                            throw new RuntimeException("Couldn't finalize checkpoint after the end of the Global Window", e);
                        }
                    }
                });
            }
            return mark;
        }

        /**
         * Builds the result accumulated by the preceding {@link #processElement} calls.
         *
         * @return the built transform result
         */
        @Override
        public TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> finishBundle() throws IOException {
            return this.resultBuilder.build();
        }
    }
}

