package org.apache.beam.runners.spark.io;

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.io.SourceRDD;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.spark.relocated.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.scheduler.RateController;
import org.apache.spark.streaming.scheduler.RateController$;
import org.apache.spark.streaming.scheduler.rate.RateEstimator;
import org.apache.spark.streaming.scheduler.rate.RateEstimator$;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;

/* loaded from: input_file:org/apache/beam/runners/spark/io/SourceDStream.class */
/**
 * An {@link InputDStream} that, for every batch, produces a {@code SourceRDD.Unbounded} reading
 * from a {@code MicrobatchSource} wrapped around the configured {@link UnboundedSource}.
 *
 * <p>Each micro-batch read is bounded by a read duration (derived from the batch interval and the
 * pipeline options) and by a maximum number of records. The record bound is either the value set
 * through {@link #setMaxRecordsPerBatch(long)} or, when that was never set, a value derived from
 * the Spark {@link RateController} if back-pressure is enabled.
 *
 * @param <T> element type produced by the unbounded source
 * @param <CheckpointMarkT> checkpoint mark type of the unbounded source
 */
class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends InputDStream<Tuple2<Source<T>, CheckpointMarkT>> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class);

    /** Number of milliseconds in one second; rates are reported per second, batches in millis. */
    private static final long MILLIS_PER_SECOND = 1000L;

    private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
    private final SparkRuntimeContext runtimeContext;
    /** Upper bound on the time spent reading in a single micro-batch. */
    private final Duration boundReadDuration;
    /** Default parallelism of the Spark context at construction time. */
    private final int initialParallelism;
    /** Explicit per-batch record cap; {@code null} means "ask the rate controller". */
    private Long boundMaxRecords;
    private final RateController rateController;

    /**
     * A {@link RateController} whose {@code publish} callback does nothing; this class only reads
     * the controller's latest rate when computing a batch.
     */
    private static class SourceRateController extends RateController {
        private SourceRateController(int streamId, RateEstimator rateEstimator) {
            super(streamId, rateEstimator);
        }

        /** Intentionally empty: the published rate is not forwarded anywhere. */
        public void publish(long rate) {
        }
    }

    /**
     * Creates the stream and derives its read bounds from the streaming context and the
     * {@link SparkPipelineOptions} held by the runtime context.
     *
     * @param streamingContext the Spark streaming context this stream is registered with
     * @param unboundedSource the Beam source to read from
     * @param sparkRuntimeContext runtime context providing the pipeline options
     * @throws IllegalArgumentException if the Spark context's default parallelism is not positive
     */
    public SourceDStream(StreamingContext streamingContext, UnboundedSource<T, CheckpointMarkT> unboundedSource, SparkRuntimeContext sparkRuntimeContext) {
        super(streamingContext, JavaSparkContext$.MODULE$.fakeClassTag());
        this.boundMaxRecords = null;
        this.rateController = new SourceRateController(id(), RateEstimator$.MODULE$.create(ssc().conf(), ssc().graph().batchDuration()));
        this.unboundedSource = unboundedSource;
        this.runtimeContext = sparkRuntimeContext;
        SparkPipelineOptions sparkPipelineOptions = (SparkPipelineOptions) sparkRuntimeContext.getPipelineOptions().as(SparkPipelineOptions.class);
        this.boundReadDuration = boundReadDuration(sparkPipelineOptions.getReadTimePercentage().doubleValue(), sparkPipelineOptions.getMinReadTimeMillis().longValue());
        this.initialParallelism = ssc().sc().defaultParallelism();
        Preconditions.checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");
    }

    /**
     * Sets an explicit cap on the number of records read per batch. Once set, the rate controller
     * is no longer consulted by {@link #compute(Time)}.
     *
     * @param maxRecords maximum number of records per batch
     */
    public void setMaxRecordsPerBatch(long maxRecords) {
        this.boundMaxRecords = Long.valueOf(maxRecords);
    }

    /**
     * Builds the RDD for one batch.
     *
     * @param time the batch time supplied by Spark (not used by this implementation)
     * @return a defined {@link Option} holding a new {@code SourceRDD.Unbounded}
     */
    public Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time time) {
        // An explicitly configured bound wins; otherwise fall back to the rate controller.
        long maxRecords = this.boundMaxRecords != null ? this.boundMaxRecords.longValue() : rateControlledMaxRecords();
        // NOTE(review): the meaning of the trailing -1 and id() arguments is defined by the
        // MicrobatchSource constructor, which is not visible here - confirm before changing.
        return Option.apply(new SourceRDD.Unbounded(ssc().sc(), this.runtimeContext, new MicrobatchSource(this.unboundedSource, this.boundReadDuration, this.initialParallelism, maxRecords, -1, id())));
    }

    /** No-op. */
    public void start() {
    }

    /** No-op. */
    public void stop() {
    }

    /** @return a display name that includes this stream's id */
    public String name() {
        return "Beam UnboundedSource [" + id() + "]";
    }

    /**
     * Computes the per-batch read duration: the given fraction of the batch interval, but never
     * shorter than the given minimum.
     *
     * @param readTimePercentage fraction of the batch interval to spend reading
     * @param minReadTimeMillis lower bound for the read duration, in milliseconds
     * @return the longer of the proportional duration and the minimum duration
     */
    private Duration boundReadDuration(double readTimePercentage, long minReadTimeMillis) {
        Duration proportional = new Duration(Math.round(ssc().graph().batchDuration().milliseconds() * readTimePercentage));
        Duration minimum = new Duration(minReadTimeMillis);
        Duration readDuration = proportional.isLongerThan(minimum) ? proportional : minimum;
        LOG.info("Read duration set to: {}", readDuration);
        return readDuration;
    }

    /**
     * Derives the per-batch record limit from the rate controller's latest per-second rate.
     *
     * @return the latest rate scaled to the batch interval, or {@link Long#MAX_VALUE} when
     *     back-pressure is disabled or the controller has not reported a positive rate
     */
    private long rateControlledMaxRecords() {
        Option<RateController> controller = rateController();
        if (controller.isDefined()) {
            long ratePerSecond = controller.get().getLatestRate();
            if (ratePerSecond > 0) {
                long limit = recordsPerBatch(ratePerSecond, ssc().graph().batchDuration().milliseconds());
                LOG.info("RateController set limit to {}", Long.valueOf(limit));
                return limit;
            }
        }
        LOG.info("RateController had nothing to report, default is Long.MAX_VALUE");
        return Long.MAX_VALUE;
    }

    /**
     * Scales a per-second rate to a batch interval expressed in milliseconds.
     *
     * <p>Whole seconds and the millisecond remainder are scaled separately so that the division
     * by 1000 never truncates the interval itself: dividing the interval first would turn every
     * sub-second batch into a limit of 0 and drop the fractional part of intervals such as
     * 1500 ms. For whole-second intervals the result is identical to
     * {@code ratePerSecond * (batchMillis / 1000)}. The result is rounded down and can still be 0
     * when the rate times the interval amounts to less than one record.
     *
     * @param ratePerSecond records per second
     * @param batchMillis batch interval in milliseconds
     * @return number of records allowed in one batch, rounded down
     */
    private static long recordsPerBatch(long ratePerSecond, long batchMillis) {
        long wholeSeconds = batchMillis / MILLIS_PER_SECOND;
        long remainderMillis = batchMillis % MILLIS_PER_SECOND;
        return ratePerSecond * wholeSeconds + (ratePerSecond * remainderMillis) / MILLIS_PER_SECOND;
    }

    /**
     * @return this stream's rate controller when back-pressure is enabled in the Spark
     *     configuration, otherwise an empty {@link Option}
     */
    public Option<RateController> rateController() {
        return RateController$.MODULE$.isBackPressureEnabled(ssc().conf()) ? Option.apply(this.rateController) : Option.empty();
    }
}
