package org.apache.beam.runners.spark.io;

import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream$;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.scheduler.StreamInputInfo;
import org.joda.time.Instant;
import scala.Option;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/beam/runners/spark/io/SparkUnboundedSource.class */
public class SparkUnboundedSource {

    /* loaded from: input_file:org/apache/beam/runners/spark/io/SparkUnboundedSource$Metadata.class */
    public static class Metadata implements Serializable {
        private final long numRecords;
        private final Instant watermark;

        public Metadata(long j, Instant instant) {
            this.numRecords = j;
            this.watermark = instant;
        }

        public long getNumRecords() {
            return this.numRecords;
        }

        public Instant getWatermark() {
            return this.watermark;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/io/SparkUnboundedSource$ReportingDStream.class */
    private static class ReportingDStream extends DStream<BoxedUnit> {
        private final DStream<Metadata> parent;
        private final int inputDStreamId;
        private final String sourceName;

        ReportingDStream(DStream<Metadata> dStream, int i, String str) {
            super(dStream.ssc(), JavaSparkContext$.MODULE$.fakeClassTag());
            this.parent = dStream;
            this.inputDStreamId = i;
            this.sourceName = str;
        }

        public Duration slideDuration() {
            return this.parent.slideDuration();
        }

        public List<DStream<?>> dependencies() {
            return JavaConversions.asScalaBuffer(Collections.singletonList(this.parent)).toList();
        }

        public Option<RDD<BoxedUnit>> compute(Time time) {
            Option orCompute = this.parent.getOrCompute(time);
            long j = 0;
            Instant instant = new Instant(Long.MIN_VALUE);
            if (orCompute.isDefined()) {
                for (Metadata metadata : ((RDD) orCompute.get()).toJavaRDD().collect()) {
                    j += metadata.getNumRecords();
                    instant = instant.isBefore(metadata.getWatermark()) ? metadata.getWatermark() : instant;
                }
            }
            report(time, j, instant);
            return Option.empty();
        }

        private void report(Time time, long j, Instant instant) {
            ssc().scheduler().inputInfoTracker().reportInfo(time, new StreamInputInfo(this.inputDStreamId, j, new Map.Map1(StreamInputInfo.METADATA_KEY_DESCRIPTION(), String.format("Read %d records with observed watermark %s, from %s for batch time: %s", Long.valueOf(j), instant, this.sourceName, time))));
        }
    }

    public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark> JavaDStream<WindowedValue<T>> read(JavaStreamingContext javaStreamingContext, SparkRuntimeContext sparkRuntimeContext, UnboundedSource<T, CheckpointMarkT> unboundedSource) {
        SparkPipelineOptions sparkPipelineOptions = (SparkPipelineOptions) sparkRuntimeContext.getPipelineOptions().as(SparkPipelineOptions.class);
        Long maxRecordsPerBatch = sparkPipelineOptions.getMaxRecordsPerBatch();
        SourceDStream sourceDStream = new SourceDStream(javaStreamingContext.ssc(), unboundedSource, sparkRuntimeContext);
        if (maxRecordsPerBatch.longValue() > 0) {
            sourceDStream.setMaxRecordsPerBatch(maxRecordsPerBatch.longValue());
        }
        JavaPairInputDStream fromInputDStream = JavaPairInputDStream$.MODULE$.fromInputDStream(sourceDStream, JavaSparkContext$.MODULE$.fakeClassTag(), JavaSparkContext$.MODULE$.fakeClassTag());
        JavaMapWithStateDStream mapWithState = fromInputDStream.mapWithState(StateSpec.function(StateSpecFunctions.mapSourceFunction(sparkRuntimeContext)));
        checkpointStream(mapWithState, sparkPipelineOptions);
        mapWithState.cache();
        int id = fromInputDStream.inputDStream().id();
        new ReportingDStream(mapWithState.map(new Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata>() { // from class: org.apache.beam.runners.spark.io.SparkUnboundedSource.1
            public Metadata call(Tuple2<Iterable<byte[]>, Metadata> tuple2) throws Exception {
                return (Metadata) tuple2._2();
            }
        }).dstream(), id, getSourceName(unboundedSource, id)).register();
        return mapWithState.flatMap(new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() { // from class: org.apache.beam.runners.spark.io.SparkUnboundedSource.2
            public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> tuple2) throws Exception {
                return (Iterable) tuple2._1();
            }
        }).map(CoderHelpers.fromByteFunction(WindowedValue.FullWindowedValueCoder.of(unboundedSource.getDefaultOutputCoder(), GlobalWindow.Coder.INSTANCE)));
    }

    private static <T> String getSourceName(Source<T> source, int i) {
        StringBuilder sb = new StringBuilder();
        for (String str : source.getClass().getSimpleName().replace("$", "").split("(?=[A-Z])")) {
            String trim = str.trim();
            if (!trim.isEmpty()) {
                sb.append(trim).append(" ");
            }
        }
        return sb.append("[").append(i).append("]").toString();
    }

    private static void checkpointStream(JavaDStream<?> javaDStream, SparkPipelineOptions sparkPipelineOptions) {
        long longValue = sparkPipelineOptions.getCheckpointDurationMillis().longValue();
        if (longValue > 0) {
            javaDStream.checkpoint(new Duration(longValue));
        }
    }
}
