/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.stateful;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Lists;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Sets;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.spark.broadcast.Broadcast;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

public class SparkTimerInternals
implements TimerInternals {
    private final Instant highWatermark;
    private final Instant synchronizedProcessingTime;
    private final Set<TimerInternals.TimerData> timers = Sets.newHashSet();
    private Instant inputWatermark;

    private SparkTimerInternals(Instant lowWatermark, Instant highWatermark, Instant synchronizedProcessingTime) {
        this.inputWatermark = lowWatermark;
        this.highWatermark = highWatermark;
        this.synchronizedProcessingTime = synchronizedProcessingTime;
    }

    public static SparkTimerInternals forStreamFromSources(List<Integer> sourceIds, @Nullable Broadcast<Map<Integer, GlobalWatermarkHolder.SparkWatermarks>> broadcast) {
        if (broadcast == null || ((Map)broadcast.getValue()).isEmpty() || Collections.disjoint(sourceIds, ((Map)broadcast.getValue()).keySet())) {
            return new SparkTimerInternals(BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0L));
        }
        Instant slowestLowWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
        Instant slowestHighWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
        Instant synchronizedProcessingTime = null;
        for (Integer sourceId : sourceIds) {
            GlobalWatermarkHolder.SparkWatermarks sparkWatermarks = (GlobalWatermarkHolder.SparkWatermarks)((Map)broadcast.getValue()).get(sourceId);
            if (sparkWatermarks == null) continue;
            slowestLowWatermark = slowestLowWatermark.isBefore((ReadableInstant)sparkWatermarks.getLowWatermark()) ? slowestLowWatermark : sparkWatermarks.getLowWatermark();
            Instant instant = slowestHighWatermark = slowestHighWatermark.isBefore((ReadableInstant)sparkWatermarks.getHighWatermark()) ? slowestHighWatermark : sparkWatermarks.getHighWatermark();
            if (synchronizedProcessingTime == null) {
                synchronizedProcessingTime = sparkWatermarks.getSynchronizedProcessingTime();
                continue;
            }
            Preconditions.checkArgument(sparkWatermarks.getSynchronizedProcessingTime().equals((Object)synchronizedProcessingTime), "Synchronized time is expected to keep synchronized across sources.");
        }
        return new SparkTimerInternals(slowestLowWatermark, slowestHighWatermark, synchronizedProcessingTime);
    }

    public static SparkTimerInternals global(@Nullable Broadcast<Map<Integer, GlobalWatermarkHolder.SparkWatermarks>> broadcast) {
        return broadcast == null ? SparkTimerInternals.forStreamFromSources(Collections.emptyList(), null) : SparkTimerInternals.forStreamFromSources(Lists.newArrayList(((Map)broadcast.getValue()).keySet()), broadcast);
    }

    Collection<TimerInternals.TimerData> getTimers() {
        return this.timers;
    }

    Collection<TimerInternals.TimerData> getTimersReadyToProcess() {
        HashSet<TimerInternals.TimerData> toFire = Sets.newHashSet();
        Iterator<TimerInternals.TimerData> iterator = this.timers.iterator();
        while (iterator.hasNext()) {
            TimerInternals.TimerData timer = iterator.next();
            if (!timer.getTimestamp().isBefore((ReadableInstant)this.inputWatermark)) continue;
            toFire.add(timer);
            iterator.remove();
        }
        return toFire;
    }

    void addTimers(Iterable<TimerInternals.TimerData> timers) {
        for (TimerInternals.TimerData timer : timers) {
            this.timers.add(timer);
        }
    }

    public void setTimer(TimerInternals.TimerData timer) {
        this.timers.add(timer);
    }

    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
        throw new UnsupportedOperationException("Deleting a timer by ID is not yet supported.");
    }

    public void deleteTimer(TimerInternals.TimerData timer) {
        this.timers.remove(timer);
    }

    public Instant currentProcessingTime() {
        return Instant.now();
    }

    @Nullable
    public Instant currentSynchronizedProcessingTime() {
        return this.synchronizedProcessingTime;
    }

    public Instant currentInputWatermarkTime() {
        return this.inputWatermark;
    }

    public void advanceWatermark() {
        this.inputWatermark = this.highWatermark;
    }

    @Nullable
    public Instant currentOutputWatermarkTime() {
        return null;
    }

    public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
        throw new UnsupportedOperationException("Setting a timer by ID not yet supported.");
    }

    public void deleteTimer(StateNamespace namespace, String timerId) {
        throw new UnsupportedOperationException("Deleting a timer by ID is not yet supported.");
    }

    public static Collection<byte[]> serializeTimers(Collection<TimerInternals.TimerData> timers, TimerInternals.TimerDataCoder timerDataCoder) {
        return CoderHelpers.toByteArrays(timers, timerDataCoder);
    }

    public static Iterable<TimerInternals.TimerData> deserializeTimers(Collection<byte[]> serTimers, TimerInternals.TimerDataCoder timerDataCoder) {
        return CoderHelpers.fromByteArrays(serTimers, timerDataCoder);
    }
}

