package org.apache.beam.runners.spark.util;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.spark.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder.class */
public class GlobalWatermarkHolder {
    private static volatile Broadcast<Map<Integer, SparkWatermarks>> broadcast = null;
    private static final transient Map<Integer, Queue<SparkWatermarks>> sourceTimes = new HashMap();

    /* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder$SparkWatermarks.class */
    public static class SparkWatermarks implements Serializable {
        private final Instant lowWatermark;
        private final Instant highWatermark;
        private final Instant synchronizedProcessingTime;

        @VisibleForTesting
        public SparkWatermarks(Instant instant, Instant instant2, Instant instant3) {
            this.lowWatermark = instant;
            this.highWatermark = instant2;
            this.synchronizedProcessingTime = instant3;
        }

        public Instant getLowWatermark() {
            return this.lowWatermark;
        }

        public Instant getHighWatermark() {
            return this.highWatermark;
        }

        public Instant getSynchronizedProcessingTime() {
            return this.synchronizedProcessingTime;
        }

        public String toString() {
            return "SparkWatermarks{lowWatermark=" + this.lowWatermark + ", highWatermark=" + this.highWatermark + ", synchronizedProcessingTime=" + this.synchronizedProcessingTime + '}';
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/util/GlobalWatermarkHolder$WatermarksListener.class */
    public static class WatermarksListener extends JavaStreamingListener {
        private final JavaStreamingContext jssc;

        public WatermarksListener(JavaStreamingContext javaStreamingContext) {
            this.jssc = javaStreamingContext;
        }

        public void onBatchCompleted(JavaStreamingListenerBatchCompleted javaStreamingListenerBatchCompleted) {
            GlobalWatermarkHolder.advance(this.jssc.sparkContext());
        }
    }

    public static void add(int i, SparkWatermarks sparkWatermarks) {
        Queue<SparkWatermarks> queue = sourceTimes.get(Integer.valueOf(i));
        if (queue == null) {
            queue = new ConcurrentLinkedQueue();
        }
        queue.offer(sparkWatermarks);
        sourceTimes.put(Integer.valueOf(i), queue);
    }

    @VisibleForTesting
    public static void addAll(Map<Integer, Queue<SparkWatermarks>> map) {
        for (Map.Entry<Integer, Queue<SparkWatermarks>> entry : map.entrySet()) {
            int intValue = entry.getKey().intValue();
            Queue<SparkWatermarks> value = entry.getValue();
            while (!value.isEmpty()) {
                add(intValue, value.poll());
            }
        }
    }

    public static Broadcast<Map<Integer, SparkWatermarks>> get() {
        return broadcast;
    }

    public static void advance(JavaSparkContext javaSparkContext) {
        synchronized (GlobalWatermarkHolder.class) {
            if (sourceTimes.isEmpty()) {
                return;
            }
            HashMap hashMap = new HashMap();
            for (Map.Entry<Integer, Queue<SparkWatermarks>> entry : sourceTimes.entrySet()) {
                if (!entry.getValue().isEmpty()) {
                    Integer key = entry.getKey();
                    Queue<SparkWatermarks> value = entry.getValue();
                    Instant instant = BoundedWindow.TIMESTAMP_MIN_VALUE;
                    Instant instant2 = BoundedWindow.TIMESTAMP_MIN_VALUE;
                    Instant instant3 = BoundedWindow.TIMESTAMP_MIN_VALUE;
                    if (broadcast != null && ((Map) broadcast.getValue()).containsKey(key)) {
                        SparkWatermarks sparkWatermarks = (SparkWatermarks) ((Map) broadcast.getValue()).get(key);
                        instant = sparkWatermarks.getLowWatermark();
                        instant2 = sparkWatermarks.getHighWatermark();
                        instant3 = sparkWatermarks.getSynchronizedProcessingTime();
                    }
                    SparkWatermarks poll = value.poll();
                    Instant lowWatermark = poll.getLowWatermark().isAfter(instant) ? poll.getLowWatermark() : instant;
                    Instant highWatermark = poll.getHighWatermark().isAfter(instant2) ? poll.getHighWatermark() : instant2;
                    Instant synchronizedProcessingTime = poll.getSynchronizedProcessingTime();
                    Preconditions.checkState(!lowWatermark.isAfter(highWatermark), String.format("Low watermark %s cannot be later then high watermark %s", lowWatermark, highWatermark));
                    Preconditions.checkState(synchronizedProcessingTime.isAfter(instant3), "Synchronized processing time must advance.");
                    hashMap.put(key, new SparkWatermarks(lowWatermark, highWatermark, synchronizedProcessingTime));
                }
            }
            if (!hashMap.isEmpty()) {
                if (broadcast != null) {
                    broadcast.unpersist(true);
                }
                broadcast = javaSparkContext.broadcast(hashMap);
            }
        }
    }

    @VisibleForTesting
    public static synchronized void clear() {
        sourceTimes.clear();
        broadcast = null;
    }
}
