/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.io;

import com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.runners.spark.io.SourceRDD;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.scheduler.RateController;
import org.apache.spark.streaming.scheduler.RateController$;
import org.apache.spark.streaming.scheduler.rate.RateEstimator;
import org.apache.spark.streaming.scheduler.rate.RateEstimator$;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;

class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
extends InputDStream<Tuple2<Source<T>, CheckpointMarkT>> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class);
    private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
    private final SparkRuntimeContext runtimeContext;
    private final Duration boundReadDuration;
    private final int initialParallelism;
    private Long boundMaxRecords = null;
    private final RateController rateController = new SourceRateController(this.id(), RateEstimator$.MODULE$.create(this.ssc().conf(), this.ssc().graph().batchDuration()));

    SourceDStream(StreamingContext ssc, UnboundedSource<T, CheckpointMarkT> unboundedSource, SparkRuntimeContext runtimeContext) {
        super(ssc, JavaSparkContext$.MODULE$.fakeClassTag());
        this.unboundedSource = unboundedSource;
        this.runtimeContext = runtimeContext;
        SparkPipelineOptions options = (SparkPipelineOptions)runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class);
        this.boundReadDuration = this.boundReadDuration(options.getReadTimePercentage(), options.getMinReadTimeMillis());
        this.initialParallelism = this.ssc().sc().defaultParallelism();
        Preconditions.checkArgument((this.initialParallelism > 0 ? 1 : 0) != 0, (Object)"Number of partitions must be greater than zero.");
    }

    public void setMaxRecordsPerBatch(long maxRecordsPerBatch) {
        this.boundMaxRecords = maxRecordsPerBatch;
    }

    public Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
        long maxNumRecords = this.boundMaxRecords != null ? this.boundMaxRecords.longValue() : this.rateControlledMaxRecords();
        MicrobatchSource<T, CheckpointMarkT> microbatchSource = new MicrobatchSource<T, CheckpointMarkT>(this.unboundedSource, this.boundReadDuration, this.initialParallelism, maxNumRecords, -1, this.id());
        SourceRDD.Unbounded<T, CheckpointMarkT> rdd = new SourceRDD.Unbounded<T, CheckpointMarkT>(this.ssc().sc(), this.runtimeContext, microbatchSource);
        return Option.apply(rdd);
    }

    public void start() {
    }

    public void stop() {
    }

    public String name() {
        return "Beam UnboundedSource [" + this.id() + "]";
    }

    private Duration boundReadDuration(double readTimePercentage, long minReadTimeMillis) {
        Duration lowerBoundDuration;
        long batchDurationMillis = this.ssc().graph().batchDuration().milliseconds();
        Duration proportionalDuration = new Duration(Math.round((double)batchDurationMillis * readTimePercentage));
        Duration readDuration = proportionalDuration.isLongerThan((ReadableDuration)(lowerBoundDuration = new Duration(minReadTimeMillis))) ? proportionalDuration : lowerBoundDuration;
        LOG.info("Read duration set to: " + readDuration);
        return readDuration;
    }

    private long rateControlledMaxRecords() {
        long rateLimitPerSecond;
        Option<RateController> rateControllerOption = this.rateController();
        if (rateControllerOption.isDefined() && (rateLimitPerSecond = ((RateController)rateControllerOption.get()).getLatestRate()) > 0L) {
            long totalRateLimit = rateLimitPerSecond * (this.ssc().graph().batchDuration().milliseconds() / 1000L);
            LOG.info("RateController set limit to {}", (Object)totalRateLimit);
            return totalRateLimit;
        }
        LOG.info("RateController had nothing to report, default is Long.MAX_VALUE");
        return Long.MAX_VALUE;
    }

    public Option<RateController> rateController() {
        if (RateController$.MODULE$.isBackPressureEnabled(this.ssc().conf())) {
            return Option.apply((Object)this.rateController);
        }
        return Option.empty();
    }

    private static class SourceRateController
    extends RateController {
        private SourceRateController(int id, RateEstimator rateEstimator) {
            super(id, rateEstimator);
        }

        public void publish(long rate) {
        }
    }
}

