/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.util;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.runners.spark.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

public class GlobalWatermarkHolder {
    private static volatile Broadcast<Map<Integer, SparkWatermarks>> broadcast = null;
    private static final transient Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap<Integer, Queue<SparkWatermarks>>();

    public static void add(int sourceId, SparkWatermarks sparkWatermarks) {
        Queue<SparkWatermarks> timesQueue = sourceTimes.get(sourceId);
        if (timesQueue == null) {
            timesQueue = new ConcurrentLinkedQueue<SparkWatermarks>();
        }
        timesQueue.offer(sparkWatermarks);
        sourceTimes.put(sourceId, timesQueue);
    }

    @VisibleForTesting
    public static void addAll(Map<Integer, Queue<SparkWatermarks>> sourceTimes) {
        for (Map.Entry<Integer, Queue<SparkWatermarks>> en : sourceTimes.entrySet()) {
            int sourceId = en.getKey();
            Queue<SparkWatermarks> timesQueue = en.getValue();
            while (!timesQueue.isEmpty()) {
                GlobalWatermarkHolder.add(sourceId, timesQueue.poll());
            }
        }
    }

    public static Broadcast<Map<Integer, SparkWatermarks>> get() {
        return broadcast;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void advance(JavaSparkContext jsc) {
        Class<GlobalWatermarkHolder> clazz = GlobalWatermarkHolder.class;
        synchronized (GlobalWatermarkHolder.class) {
            if (sourceTimes.isEmpty()) {
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return;
            }
            HashMap<Integer, SparkWatermarks> newBroadcast = new HashMap<Integer, SparkWatermarks>();
            for (Map.Entry<Integer, Queue<SparkWatermarks>> en : sourceTimes.entrySet()) {
                SparkWatermarks next;
                if (en.getValue().isEmpty()) continue;
                Integer sourceId = en.getKey();
                Queue<SparkWatermarks> timesQueue = en.getValue();
                Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
                Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
                Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
                if (broadcast != null && ((Map)broadcast.getValue()).containsKey(sourceId)) {
                    SparkWatermarks currentTimes = (SparkWatermarks)((Map)broadcast.getValue()).get(sourceId);
                    currentLowWatermark = currentTimes.getLowWatermark();
                    currentHighWatermark = currentTimes.getHighWatermark();
                    currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime();
                }
                Instant nextLowWatermark = (next = timesQueue.poll()).getLowWatermark().isAfter((ReadableInstant)currentLowWatermark) ? next.getLowWatermark() : currentLowWatermark;
                Instant nextHighWatermark = next.getHighWatermark().isAfter((ReadableInstant)currentHighWatermark) ? next.getHighWatermark() : currentHighWatermark;
                Instant nextSynchronizedProcessingTime = next.getSynchronizedProcessingTime();
                Preconditions.checkState(!nextLowWatermark.isAfter((ReadableInstant)nextHighWatermark), String.format("Low watermark %s cannot be later then high watermark %s", nextLowWatermark, nextHighWatermark));
                Preconditions.checkState(nextSynchronizedProcessingTime.isAfter((ReadableInstant)currentSynchronizedProcessingTime), "Synchronized processing time must advance.");
                newBroadcast.put(sourceId, new SparkWatermarks(nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime));
            }
            if (!newBroadcast.isEmpty()) {
                if (broadcast != null) {
                    broadcast.destroy();
                }
                broadcast = jsc.broadcast(newBroadcast);
            }
            // ** MonitorExit[var1_1] (shouldn't be in output)
            return;
        }
    }

    @VisibleForTesting
    public static synchronized void clear() {
        sourceTimes.clear();
        broadcast = null;
    }

    public static class WatermarksListener
    extends JavaStreamingListener {
        private final JavaStreamingContext jssc;

        public WatermarksListener(JavaStreamingContext jssc) {
            this.jssc = jssc;
        }

        public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
            GlobalWatermarkHolder.advance(this.jssc.sparkContext());
        }
    }

    public static class SparkWatermarks
    implements Serializable {
        private final Instant lowWatermark;
        private final Instant highWatermark;
        private final Instant synchronizedProcessingTime;

        @VisibleForTesting
        public SparkWatermarks(Instant lowWatermark, Instant highWatermark, Instant synchronizedProcessingTime) {
            this.lowWatermark = lowWatermark;
            this.highWatermark = highWatermark;
            this.synchronizedProcessingTime = synchronizedProcessingTime;
        }

        public Instant getLowWatermark() {
            return this.lowWatermark;
        }

        public Instant getHighWatermark() {
            return this.highWatermark;
        }

        public Instant getSynchronizedProcessingTime() {
            return this.synchronizedProcessingTime;
        }

        public String toString() {
            return "SparkWatermarks{lowWatermark=" + this.lowWatermark + ", highWatermark=" + this.highWatermark + ", synchronizedProcessingTime=" + this.synchronizedProcessingTime + '}';
        }
    }
}

