/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Stopwatch;
import org.apache.beam.runners.spark.repackaged.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.dstream.InputDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

class WatermarkSyncedDStream<T>
extends InputDStream<WindowedValue<T>> {
    private static final Logger LOG = LoggerFactory.getLogger((String)(WatermarkSyncedDStream.class.getCanonicalName() + "#compute"));
    private static final int SLEEP_DURATION_MILLIS = 10;
    private final Queue<JavaRDD<WindowedValue<T>>> rdds;
    private final Long batchDuration;
    private volatile boolean isFirst = true;

    public WatermarkSyncedDStream(Queue<JavaRDD<WindowedValue<T>>> rdds, Long batchDuration, StreamingContext ssc) {
        super(ssc, JavaSparkContext$.MODULE$.fakeClassTag());
        this.rdds = rdds;
        this.batchDuration = batchDuration;
    }

    private void awaitWatermarkSyncWith(long batchTime) {
        while (!this.isFirstBatch() && this.watermarkOutOfSync(batchTime)) {
            Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
        }
        Preconditions.checkState(this.isFirstBatch() || this.watermarkIsOneBatchBehind(batchTime), String.format("Watermark batch time:[%d] should be exactly one batch behind current batch time:[%d]", GlobalWatermarkHolder.getLastWatermarkedBatchTime(), batchTime));
    }

    private boolean watermarkOutOfSync(long batchTime) {
        return batchTime - GlobalWatermarkHolder.getLastWatermarkedBatchTime() > this.batchDuration;
    }

    private boolean isFirstBatch() {
        return this.isFirst;
    }

    private RDD<WindowedValue<T>> generateRdd() {
        return this.rdds.size() > 0 ? this.rdds.poll().rdd() : this.ssc().sparkContext().emptyRDD(JavaSparkContext$.MODULE$.fakeClassTag());
    }

    private boolean watermarkIsOneBatchBehind(long batchTime) {
        return GlobalWatermarkHolder.getLastWatermarkedBatchTime() == batchTime - this.batchDuration;
    }

    public Option<RDD<WindowedValue<T>>> compute(Time validTime) {
        long batchTime = validTime.milliseconds();
        LOG.trace("BEFORE waiting for watermark sync, LastWatermarkedBatchTime: {}, current batch time: {}", (Object)GlobalWatermarkHolder.getLastWatermarkedBatchTime(), (Object)batchTime);
        Stopwatch stopwatch = Stopwatch.createStarted();
        this.awaitWatermarkSyncWith(batchTime);
        stopwatch.stop();
        LOG.info("Waited {} millis for watermarks to sync up with the current batch ({})", (Object)stopwatch.elapsed(TimeUnit.MILLISECONDS), (Object)batchTime);
        LOG.info("Watermarks are now: {}", GlobalWatermarkHolder.get(this.batchDuration));
        LOG.trace("AFTER waiting for watermark sync, LastWatermarkedBatchTime: {}, current batch time: {}", (Object)GlobalWatermarkHolder.getLastWatermarkedBatchTime(), (Object)batchTime);
        RDD<WindowedValue<T>> rdd = this.generateRdd();
        this.isFirst = false;
        return Option.apply(rdd);
    }

    public void start() {
    }

    public void stop() {
    }
}

