package org.apache.beam.runners.spark.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.Accumulator;
import org.apache.spark.Dependency;
import org.apache.spark.HashPartitioner;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.Partition;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.immutable.Seq;

/* loaded from: input_file:org/apache/beam/runners/spark/io/SourceRDD.class */
public class SourceRDD {

    /* loaded from: input_file:org/apache/beam/runners/spark/io/SourceRDD$Bounded.class */
    public static class Bounded<T> extends RDD<WindowedValue<T>> {
        private final BoundedSource<T> source;
        private final SparkRuntimeContext runtimeContext;
        private final int numPartitions;
        private final String stepName;
        private final Accumulator<SparkMetricsContainer> metricsAccum;
        private static final long DEFAULT_BUNDLE_SIZE = 67108864;
        private static final Logger LOG = LoggerFactory.getLogger(Bounded.class);
        private static final Seq<Dependency<?>> NIL = JavaConversions.asScalaBuffer(Collections.emptyList()).toList();

        public Bounded(SparkContext sparkContext, BoundedSource<T> boundedSource, SparkRuntimeContext sparkRuntimeContext, String str) {
            super(sparkContext, NIL, JavaSparkContext$.MODULE$.fakeClassTag());
            this.source = boundedSource;
            this.runtimeContext = sparkRuntimeContext;
            this.numPartitions = sparkContext.defaultParallelism();
            Preconditions.checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
            this.stepName = str;
            this.metricsAccum = MetricsAccumulator.getInstance();
        }

        public Partition[] getPartitions() {
            long j = 67108864;
            try {
                j = this.source.getEstimatedSizeBytes(this.runtimeContext.getPipelineOptions()) / this.numPartitions;
            } catch (Exception e) {
                LOG.warn("Failed to get estimated bundle size for source {}, using default bundle size of {} bytes.", this.source, Long.valueOf(DEFAULT_BUNDLE_SIZE));
            }
            try {
                List split = this.source.split(j, this.runtimeContext.getPipelineOptions());
                SourcePartition[] sourcePartitionArr = new SourcePartition[split.size()];
                for (int i = 0; i < split.size(); i++) {
                    sourcePartitionArr[i] = new SourcePartition(id(), i, (Source) split.get(i));
                }
                return sourcePartitionArr;
            } catch (Exception e2) {
                throw new RuntimeException("Failed to create partitions for source " + this.source.getClass().getSimpleName(), e2);
            }
        }

        public Iterator<WindowedValue<T>> compute(final Partition partition, TaskContext taskContext) {
            final MetricsContainer container = ((SparkMetricsContainer) this.metricsAccum.localValue()).getContainer(this.stepName);
            return new InterruptibleIterator(taskContext, JavaConversions.asScalaIterator(new java.util.Iterator<WindowedValue<T>>() { // from class: org.apache.beam.runners.spark.io.SourceRDD.Bounded.1
                SourcePartition<T> partition;
                BoundedSource.BoundedReader<T> reader;
                private boolean finished = false;
                private boolean started = false;
                private boolean closed = false;

                {
                    this.partition = (SourcePartition) partition;
                    this.reader = Bounded.this.createReader(this.partition);
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    try {
                        Closeable scopedMetricsContainer = MetricsEnvironment.scopedMetricsContainer(container);
                        Throwable th = null;
                        try {
                            try {
                                if (this.started) {
                                    this.finished = !this.reader.advance();
                                } else {
                                    this.started = true;
                                    this.finished = !this.reader.start();
                                }
                                if (this.finished) {
                                    closeIfNotClosed();
                                }
                                return !this.finished;
                            } finally {
                                if (scopedMetricsContainer != null) {
                                    if (0 != 0) {
                                        try {
                                            scopedMetricsContainer.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        scopedMetricsContainer.close();
                                    }
                                }
                            }
                        } catch (IOException e) {
                            closeIfNotClosed();
                            throw new RuntimeException("Failed to read from reader.", e);
                        }
                    } catch (IOException e2) {
                        throw new RuntimeException(e2);
                    }
                }

                @Override // java.util.Iterator
                public WindowedValue<T> next() {
                    return WindowedValue.timestampedValueInGlobalWindow(this.reader.getCurrent(), this.reader.getCurrentTimestamp());
                }

                @Override // java.util.Iterator
                public void remove() {
                    throw new UnsupportedOperationException("Remove from partition iterator is not allowed.");
                }

                private void closeIfNotClosed() {
                    if (this.closed) {
                        return;
                    }
                    this.closed = true;
                    try {
                        this.reader.close();
                    } catch (IOException e) {
                        throw new RuntimeException("Failed to close Reader.", e);
                    }
                }
            }));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public BoundedSource.BoundedReader<T> createReader(SourcePartition<T> sourcePartition) {
            try {
                return ((SourcePartition) sourcePartition).source.createReader(this.runtimeContext.getPipelineOptions());
            } catch (IOException e) {
                throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/io/SourceRDD$CheckpointableSourcePartition.class */
    /**
     * A {@link SourcePartition} that additionally carries a checkpoint mark next to its source.
     *
     * @param <T> element type of the wrapped source
     * @param <CheckpointMarkT> type of the checkpoint mark stored with the partition
     */
    private static class CheckpointableSourcePartition<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends SourcePartition<T> {
        /** Checkpoint mark supplied at construction; never reassigned. */
        private final CheckpointMarkT checkpointMark;

        /**
         * @param rddId first argument forwarded to the {@link SourcePartition} constructor
         * @param index second argument forwarded to the {@link SourcePartition} constructor
         * @param source source held by this partition
         * @param checkpointMark checkpoint mark to keep with the source
         */
        CheckpointableSourcePartition(int rddId, int index, Source<T> source, CheckpointMarkT checkpointMark) {
            super(rddId, index, source);
            this.checkpointMark = checkpointMark;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/io/SourceRDD$SourcePartition.class */
    public static class SourcePartition<T> implements Partition {
        private final int rddId;
        private final int index;
        private final Source<T> source;

        SourcePartition(int i, int i2, Source<T> source) {
            this.rddId = i;
            this.index = i2;
            this.source = source;
        }

        public int index() {
            return this.index;
        }

        public int hashCode() {
            return (41 * (41 + this.rddId)) + this.index;
        }

        public Source<T> getSource() {
            return this.source;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/io/SourceRDD$Unbounded.class */
    /**
     * A Spark {@link RDD} that exposes the splits of a {@link MicrobatchSource} as
     * (source, checkpoint mark) pairs.
     *
     * <p>Computing a partition performs no reading: it yields exactly one {@link Tuple2} that
     * holds the partition's source and the checkpoint mark stored with it.
     *
     * @param <T> element type of the source
     * @param <CheckpointMarkT> checkpoint mark type of the source
     */
    public static class Unbounded<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends RDD<Tuple2<Source<T>, CheckpointMarkT>> {
        /** Empty dependency list: this RDD has no parent RDDs. */
        private static final scala.collection.immutable.List<Dependency<?>> NIL = JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList();

        private final MicrobatchSource<T, CheckpointMarkT> microbatchSource;
        private final SparkRuntimeContext runtimeContext;
        private final Partitioner partitioner;

        /**
         * Creates the RDD.
         *
         * @param sparkContext Spark context the RDD belongs to
         * @param sparkRuntimeContext provider of the pipeline options passed to the source
         * @param microbatchSource source whose splits become the partitions
         * @param i number of partitions of the {@link HashPartitioner} reported by
         *     {@link #partitioner()}
         */
        public Unbounded(SparkContext sparkContext, SparkRuntimeContext sparkRuntimeContext, MicrobatchSource<T, CheckpointMarkT> microbatchSource, int i) {
            super(sparkContext, NIL, JavaSparkContext$.MODULE$.<Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag());
            this.runtimeContext = sparkRuntimeContext;
            this.microbatchSource = microbatchSource;
            this.partitioner = new HashPartitioner(i);
        }

        /**
         * Splits the microbatch source and wraps each split in a partition that carries an
         * {@link EmptyCheckpointMark}.
         *
         * @return one partition per split, indexed in split order
         * @throws RuntimeException if splitting the source fails
         */
        @Override
        public Partition[] getPartitions() {
            try {
                List<? extends Source<T>> splits = this.microbatchSource.split(this.runtimeContext.getPipelineOptions());
                Partition[] partitions = new CheckpointableSourcePartition[splits.size()];
                for (int i = 0; i < partitions.length; i++) {
                    partitions[i] = new CheckpointableSourcePartition<>(id(), i, splits.get(i), EmptyCheckpointMark.get());
                }
                return partitions;
            } catch (Exception e) {
                throw new RuntimeException("Failed to create partitions.", e);
            }
        }

        /** Returns the hash partitioner created in the constructor, wrapped in {@link Some}. */
        @Override
        public Option<Partitioner> partitioner() {
            return Some.apply(this.partitioner);
        }

        /**
         * Emits the single (source, checkpoint mark) pair held by the given partition.
         *
         * @param partition a {@link CheckpointableSourcePartition} produced by
         *     {@link #getPartitions()}
         * @param taskContext Spark task context; not used
         * @return an iterator over exactly one tuple
         */
        @Override
        public Iterator<Tuple2<Source<T>, CheckpointMarkT>> compute(Partition partition, TaskContext taskContext) {
            // Partitions of this RDD are only ever created by getPartitions() above.
            @SuppressWarnings("unchecked")
            CheckpointableSourcePartition<T, CheckpointMarkT> checkpointable = (CheckpointableSourcePartition<T, CheckpointMarkT>) partition;
            Tuple2<Source<T>, CheckpointMarkT> sourceAndMark = new Tuple2<Source<T>, CheckpointMarkT>(checkpointable.getSource(), checkpointable.checkpointMark);
            return JavaConversions.asScalaIterator(Collections.singleton(sourceAndMark).iterator());
        }
    }
}
