package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.class */
public class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedReadEvaluatorFactory.class);
    private static final double DEFAULT_READER_REUSE_CHANCE = 0.95d;
    private final EvaluationContext evaluationContext;
    private final double readerReuseChance;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory$InputProvider.class */
    public static class InputProvider<OutputT> implements RootInputProvider<OutputT, UnboundedSourceShard<OutputT, ?>, PBegin, Read.Unbounded<OutputT>> {
        private final EvaluationContext evaluationContext;

        /* JADX INFO: Access modifiers changed from: package-private */
        public InputProvider(EvaluationContext evaluationContext) {
            this.evaluationContext = evaluationContext;
        }

        @Override // org.apache.beam.runners.direct.RootInputProvider
        public Collection<DirectRunner.CommittedBundle<UnboundedSourceShard<OutputT, ?>>> getInitialInputs(AppliedPTransform<PBegin, PCollection<OutputT>, Read.Unbounded<OutputT>> appliedPTransform, int i) throws Exception {
            UnboundedSource source = appliedPTransform.getTransform().getSource();
            List generateInitialSplits = source.generateInitialSplits(i, this.evaluationContext.getPipelineOptions());
            UnboundedReadDeduplicator create = source.requiresDeduping() ? UnboundedReadDeduplicator.CachedIdDeduplicator.create() : UnboundedReadDeduplicator.NeverDeduplicator.create();
            ImmutableList.Builder builder = ImmutableList.builder();
            Iterator it = generateInitialSplits.iterator();
            while (it.hasNext()) {
                builder.add((ImmutableList.Builder) this.evaluationContext.createRootBundle().add(WindowedValue.valueInGlobalWindow(UnboundedSourceShard.unstarted((UnboundedSource) it.next(), create))).commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
            }
            return builder.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.class */
    public static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements TransformEvaluator<UnboundedSourceShard<OutputT, CheckpointMarkT>> {
        private static final int ARBITRARY_MAX_ELEMENTS = 10;
        private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
        private final EvaluationContext evaluationContext;
        private final double readerReuseChance;
        private final StepTransformResult.Builder resultBuilder;

        public UnboundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, ?> appliedPTransform, EvaluationContext evaluationContext, double d) {
            this.transform = appliedPTransform;
            this.evaluationContext = evaluationContext;
            this.readerReuseChance = d;
            this.resultBuilder = StepTransformResult.withoutHold(appliedPTransform);
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> windowedValue) throws IOException {
            UnboundedSourceShard<OutputT, CheckpointMarkT> withCheckpoint;
            DirectRunner.UncommittedBundle<?> createBundle = this.evaluationContext.createBundle(((TaggedPValue) Iterables.getOnlyElement(this.transform.getOutputs())).getValue());
            UnboundedSourceShard<OutputT, CheckpointMarkT> unboundedSourceShard = (UnboundedSourceShard) windowedValue.getValue();
            UnboundedSource.UnboundedReader unboundedReader = null;
            try {
                UnboundedSource.UnboundedReader<OutputT> reader = getReader(unboundedSourceShard);
                if (startReader(reader, unboundedSourceShard)) {
                    UnboundedReadDeduplicator deduplicator = unboundedSourceShard.getDeduplicator();
                    int i = 0;
                    do {
                        if (deduplicator.shouldOutput(reader.getCurrentRecordId())) {
                            createBundle.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
                        }
                        i++;
                        if (i >= 10) {
                            break;
                        }
                    } while (reader.advance());
                    Instant watermark = reader.getWatermark();
                    CheckpointMarkT finishRead = finishRead(reader, unboundedSourceShard);
                    if (ThreadLocalRandom.current().nextDouble(1.0d) >= this.readerReuseChance) {
                        reader.close();
                        withCheckpoint = UnboundedSourceShard.of(unboundedSourceShard.getSource(), unboundedSourceShard.getDeduplicator(), null, finishRead);
                    } else {
                        withCheckpoint = unboundedSourceShard.withCheckpoint(finishRead);
                    }
                    this.resultBuilder.addOutput(createBundle, new DirectRunner.UncommittedBundle[0]).addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(withCheckpoint, watermark)));
                } else if (reader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                    this.resultBuilder.addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(UnboundedSourceShard.of(unboundedSourceShard.getSource(), unboundedSourceShard.getDeduplicator(), reader, unboundedSourceShard.getCheckpoint()), reader.getWatermark())));
                }
            } catch (IOException e) {
                if (0 != 0) {
                    unboundedReader.close();
                }
                throw e;
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v14, types: [org.apache.beam.sdk.io.UnboundedSource$CheckpointMark] */
        private UnboundedSource.UnboundedReader<OutputT> getReader(UnboundedSourceShard<OutputT, CheckpointMarkT> unboundedSourceShard) throws IOException {
            UnboundedSource.UnboundedReader<OutputT> existingReader = unboundedSourceShard.getExistingReader();
            if (existingReader != null) {
                return existingReader;
            }
            CheckpointMarkT checkpoint = unboundedSourceShard.getCheckpoint();
            if (checkpoint != null) {
                checkpoint = (UnboundedSource.CheckpointMark) CoderUtils.clone(unboundedSourceShard.getSource().getCheckpointMarkCoder(), checkpoint);
            }
            return unboundedSourceShard.getSource().createReader(this.evaluationContext.getPipelineOptions(), checkpoint);
        }

        private boolean startReader(UnboundedSource.UnboundedReader<OutputT> unboundedReader, UnboundedSourceShard<OutputT, CheckpointMarkT> unboundedSourceShard) throws IOException {
            return unboundedSourceShard.getExistingReader() == null ? unboundedReader.start() : unboundedSourceShard.getExistingReader().advance();
        }

        private CheckpointMarkT finishRead(UnboundedSource.UnboundedReader<OutputT> unboundedReader, UnboundedSourceShard<OutputT, CheckpointMarkT> unboundedSourceShard) throws IOException {
            CheckpointMarkT checkpoint = unboundedSourceShard.getCheckpoint();
            final CheckpointMarkT checkpointmarkt = (CheckpointMarkT) unboundedReader.getCheckpointMark();
            if (checkpoint != null) {
                checkpoint.finalizeCheckpoint();
            }
            if (!unboundedReader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                PValue pValue = (PCollection) ((TaggedPValue) Iterables.getOnlyElement(this.transform.getOutputs())).getValue();
                this.evaluationContext.scheduleAfterOutputWouldBeProduced(pValue, GlobalWindow.INSTANCE, pValue.getWindowingStrategy(), new Runnable() { // from class: org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedReadEvaluator.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            checkpointmarkt.finalizeCheckpoint();
                        } catch (IOException e) {
                            throw new RuntimeException("Couldn't finalize checkpoint after the end of the Global Window", e);
                        }
                    }
                });
            }
            return checkpointmarkt;
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> finishBundle() throws IOException {
            return this.resultBuilder.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory$UnboundedSourceShard.class */
    public static abstract class UnboundedSourceShard<T, CheckpointT extends UnboundedSource.CheckpointMark> {
        static <T, CheckpointT extends UnboundedSource.CheckpointMark> UnboundedSourceShard<T, CheckpointT> unstarted(UnboundedSource<T, CheckpointT> unboundedSource, UnboundedReadDeduplicator unboundedReadDeduplicator) {
            return of(unboundedSource, unboundedReadDeduplicator, null, null);
        }

        static <T, CheckpointT extends UnboundedSource.CheckpointMark> UnboundedSourceShard<T, CheckpointT> of(UnboundedSource<T, CheckpointT> unboundedSource, UnboundedReadDeduplicator unboundedReadDeduplicator, @Nullable UnboundedSource.UnboundedReader<T> unboundedReader, @Nullable CheckpointT checkpointt) {
            return new AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard(unboundedSource, unboundedReadDeduplicator, unboundedReader, checkpointt);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract UnboundedSource<T, CheckpointT> getSource();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract UnboundedReadDeduplicator getDeduplicator();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract UnboundedSource.UnboundedReader<T> getExistingReader();

        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract CheckpointT getCheckpoint();

        UnboundedSourceShard<T, CheckpointT> withCheckpoint(CheckpointT checkpointt) {
            return of(getSource(), getDeduplicator(), getExistingReader(), checkpointt);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
        this(evaluationContext, DEFAULT_READER_REUSE_CHANCE);
    }

    @VisibleForTesting
    UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, double d) {
        this.evaluationContext = evaluationContext;
        this.readerReuseChance = d;
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, DirectRunner.CommittedBundle<?> committedBundle) {
        return (TransformEvaluator<InputT>) createEvaluator(appliedPTransform);
    }

    private <OutputT> TransformEvaluator<?> createEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, Read.Unbounded<OutputT>> appliedPTransform) {
        return new UnboundedReadEvaluator(appliedPTransform, this.evaluationContext, this.readerReuseChance);
    }

    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() {
    }
}
