/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.io;

import java.util.Collections;
import java.util.Iterator;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.io.SourceDStream;
import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.runners.spark.translation.TranslationUtils;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream$;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.scheduler.StreamInputInfo;
import scala.Option;
import scala.collection.JavaConversions;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag;

public class SparkUnboundedSource {
    public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> JavaDStream<WindowedValue<T>> read(JavaStreamingContext jssc, SparkRuntimeContext rc, UnboundedSource<T, CheckpointMarkT> source) {
        SparkPipelineOptions options = (SparkPipelineOptions)rc.getPipelineOptions().as(SparkPipelineOptions.class);
        Long maxRecordsPerBatch = options.getMaxRecordsPerBatch();
        SourceDStream<T, CheckpointMarkT> sourceDStream = new SourceDStream<T, CheckpointMarkT>(jssc.ssc(), source, rc);
        if (maxRecordsPerBatch > 0L) {
            sourceDStream.setMaxRecordsPerBatch(maxRecordsPerBatch);
        }
        JavaPairInputDStream inputDStream = JavaPairInputDStream$.MODULE$.fromInputDStream(sourceDStream, JavaSparkContext$.MODULE$.fakeClassTag(), JavaSparkContext$.MODULE$.fakeClassTag());
        JavaMapWithStateDStream mapWithStateDStream = inputDStream.mapWithState(StateSpec.function(StateSpecFunctions.mapSourceFunction(rc)));
        SparkUnboundedSource.checkpointStream(mapWithStateDStream, options);
        int id = inputDStream.inputDStream().id();
        ReportingFlatMappedDStream reportingFlatMappedDStream = new ReportingFlatMappedDStream(mapWithStateDStream.dstream(), id, SparkUnboundedSource.getSourceName(source, id));
        return JavaDStream.fromDStream(reportingFlatMappedDStream, (ClassTag)JavaSparkContext$.MODULE$.fakeClassTag());
    }

    private static <T> String getSourceName(Source<T> source, int id) {
        StringBuilder sb = new StringBuilder();
        for (String s : source.getClass().getSimpleName().replace("$", "").split("(?=[A-Z])")) {
            String trimmed = s.trim();
            if (trimmed.isEmpty()) continue;
            sb.append(trimmed).append(" ");
        }
        return sb.append("[").append(id).append("]").toString();
    }

    private static void checkpointStream(JavaDStream<?> dStream, SparkPipelineOptions options) {
        long checkpointDurationMillis = options.getCheckpointDurationMillis();
        if (checkpointDurationMillis > 0L) {
            dStream.checkpoint(new Duration(checkpointDurationMillis));
        }
    }

    private static class ReportingFlatMappedDStream<T>
    extends DStream<T> {
        private final DStream<Iterator<T>> parent;
        private final int inputDStreamId;
        private final String sourceName;

        ReportingFlatMappedDStream(DStream<Iterator<T>> parent, int inputDStreamId, String sourceName) {
            super(parent.ssc(), JavaSparkContext$.MODULE$.fakeClassTag());
            this.parent = parent;
            this.inputDStreamId = inputDStreamId;
            this.sourceName = sourceName;
        }

        public Duration slideDuration() {
            return this.parent.slideDuration();
        }

        public List<DStream<?>> dependencies() {
            return JavaConversions.asScalaBuffer(Collections.singletonList(this.parent)).toList();
        }

        public Option<RDD<T>> compute(Time validTime) {
            Option computedParentRDD = this.parent.getOrCompute(validTime);
            if (computedParentRDD.isDefined()) {
                RDD computedRDD = ((RDD)computedParentRDD.get()).toJavaRDD().flatMap(TranslationUtils.flattenIter()).rdd().cache();
                this.report(validTime, computedRDD.count());
                return Option.apply((Object)computedRDD);
            }
            this.report(validTime, 0L);
            return Option.empty();
        }

        private void report(Time batchTime, long count) {
            Map.Map1 metadata = new Map.Map1((Object)StreamInputInfo.METADATA_KEY_DESCRIPTION(), (Object)String.format("Read %d records from %s for batch time: %s", count, this.sourceName, batchTime));
            StreamInputInfo streamInputInfo = new StreamInputInfo(this.inputDStreamId, count, (Map)metadata);
            this.ssc().scheduler().inputInfoTracker().reportInfo(batchTime, streamInputInfo);
        }
    }
}

