/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.direct.AutoValue_WatermarkManager_PendingWatermarkUpdate;
import org.apache.beam.runners.direct.Clock;
import org.apache.beam.runners.direct.CommittedResult;
import org.apache.beam.runners.direct.DirectGraph;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.StructuralKey;
import org.apache.beam.runners.direct.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ComparisonChain;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashBasedTable;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableCollection;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Ordering;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.SortedMultiset;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Table;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.TreeMultiset;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

public class WatermarkManager {
    // Same value as the batch size that tryApplyPendingUpdates passes to applyNUpdates.
    // NOTE(review): that call site uses the literal 10, presumably this constant inlined
    // by the compiler before decompilation — confirm.
    private static final int MAX_INCREMENTAL_UPDATES = 10;
    // A watermark fixed at BoundedWindow.TIMESTAMP_MAX_VALUE whose refresh always reports
    // NO_CHANGE. It is the only input watermark given to transforms that have no inputs.
    private static final Watermark THE_END_OF_TIME = new Watermark(){

        @Override
        public WatermarkUpdate refresh() {
            return WatermarkUpdate.NO_CHANGE;
        }

        @Override
        public Instant get() {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
    };
    // Natural ordering on Instants, used for min/max of timestamps throughout this class.
    private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
    // Source of the current time, read when computing synchronized processing times and firing timers.
    private final Clock clock;
    // The pipeline graph: supplies root/primitive transforms, producers and primitive consumers.
    private final DirectGraph graph;
    // Watermarks per transform; populated by the constructor via getTransformWatermark.
    private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks;
    // Updates offered by updateWatermarks that have not yet been applied by applyNUpdates.
    private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
    // Held while pending updates are applied, watermarks are refreshed, or timers are extracted.
    private final Lock refreshLock;
    // Transforms that had bundles or updates applied and are waiting for refreshAll.
    @GuardedBy(value="refreshLock")
    private final Set<AppliedPTransform<?, ?, ?>> pendingRefreshes;

    /**
     * Removes and returns, per key, every timer whose timestamp is strictly before
     * {@code latestTime}.
     *
     * <p>This mutates {@code objectTimers}: fired timers are taken out of their per-key sets,
     * and every key whose set is empty afterwards (including sets that were already empty on
     * entry) is removed from the map.
     *
     * @param latestTime exclusive upper bound on the timestamp of timers that fire
     * @param objectTimers pending timers by key; each set is drained from its first element
     * @return for each key with at least one fired timer, those timers in removal order
     */
    private static Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredTimers(Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> objectTimers) {
        Map<StructuralKey<?>, List<TimerInternals.TimerData>> firedByKey = new HashMap<>();
        Set<StructuralKey<?>> drainedKeys = new HashSet<>();
        for (Map.Entry<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> entry : objectTimers.entrySet()) {
            NavigableSet<TimerInternals.TimerData> queue = entry.getValue();
            // Created lazily so that keys with nothing to fire do not appear in the result.
            List<TimerInternals.TimerData> fired = null;
            while (!queue.isEmpty() && queue.first().getTimestamp().isBefore(latestTime)) {
                if (fired == null) {
                    fired = new ArrayList<>();
                    firedByKey.put(entry.getKey(), fired);
                }
                fired.add(queue.pollFirst());
            }
            if (queue.isEmpty()) {
                drainedKeys.add(entry.getKey());
            }
        }
        // Removal is deferred until after iteration to avoid modifying the map while walking it.
        objectTimers.keySet().removeAll(drainedKeys);
        return firedByKey;
    }

    /**
     * Creates a {@link WatermarkManager} for the given graph.
     *
     * @param clock the clock the manager reads the current time from
     * @param graph the graph whose root and primitive transforms get watermarks
     * @return a new manager
     */
    public static WatermarkManager create(Clock clock, DirectGraph graph) {
        WatermarkManager manager = new WatermarkManager(clock, graph);
        return manager;
    }

    /**
     * Builds the manager and eagerly creates watermarks for every root transform and then every
     * primitive transform reported by {@code graph}.
     */
    private WatermarkManager(Clock clock, DirectGraph graph) {
        this.clock = clock;
        this.graph = graph;
        this.transformToWatermarks = new HashMap<>();
        this.pendingRefreshes = new HashSet<>();
        this.pendingUpdates = new ConcurrentLinkedQueue<>();
        this.refreshLock = new ReentrantLock();
        // getTransformWatermark caches its result, so these calls only populate the map.
        for (AppliedPTransform<?, ?, ?> root : graph.getRootTransforms()) {
            getTransformWatermark(root);
        }
        for (AppliedPTransform<?, ?, ?> primitive : graph.getPrimitiveTransforms()) {
            getTransformWatermark(primitive);
        }
    }

    /**
     * Returns the watermarks for {@code transform}, building and caching them on first request.
     *
     * <p>Building them looks up the watermarks of each transform that produces one of this
     * transform's inputs, so a first request also populates the cache for those producers.
     */
    private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) {
        TransformWatermarks cached = this.transformToWatermarks.get(transform);
        if (cached != null) {
            return cached;
        }
        // Event-time pair first, then the synchronized-processing-time pair; each output
        // watermark wraps the matching input watermark.
        AppliedPTransformInputWatermark eventTimeInput = new AppliedPTransformInputWatermark(this.getInputWatermarks(transform));
        AppliedPTransformOutputWatermark eventTimeOutput = new AppliedPTransformOutputWatermark(eventTimeInput);
        SynchronizedProcessingTimeInputWatermark processingTimeInput = new SynchronizedProcessingTimeInputWatermark(this.getInputProcessingWatermarks(transform));
        SynchronizedProcessingTimeOutputWatermark processingTimeOutput = new SynchronizedProcessingTimeOutputWatermark(processingTimeInput);
        TransformWatermarks created = new TransformWatermarks(transform, eventTimeInput, eventTimeOutput, processingTimeInput, processingTimeOutput);
        this.transformToWatermarks.put(transform, created);
        return created;
    }

    /**
     * Collects the synchronized-processing-time output watermarks of the transforms that produce
     * the inputs of {@code transform}.
     *
     * <p>A transform without inputs gets {@code THE_END_OF_TIME} as its only input watermark.
     *
     * @param transform the transform whose upstream watermarks are wanted
     * @return an immutable collection with one watermark per input, in input order
     */
    private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
        // Typed builder and input list: the raw forms make the for-each below yield Object,
        // which cannot be assigned to a TaggedPValue loop variable.
        ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
        List<? extends TaggedPValue> inputs = transform.getInputs();
        if (inputs.isEmpty()) {
            inputWmsBuilder.add(THE_END_OF_TIME);
        }
        for (TaggedPValue pvalue : inputs) {
            SynchronizedProcessingTimeOutputWatermark producerOutputWatermark = this.getTransformWatermark(this.graph.getProducer(pvalue.getValue())).synchronizedProcessingOutputWatermark;
            inputWmsBuilder.add(producerOutputWatermark);
        }
        return inputWmsBuilder.build();
    }

    /**
     * Collects the event-time output watermarks of the transforms that produce the inputs of
     * {@code transform}.
     *
     * <p>A transform without inputs gets {@code THE_END_OF_TIME} as its only input watermark.
     *
     * @param transform the transform whose upstream watermarks are wanted
     * @return an immutable list with one watermark per input, in input order
     */
    private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
        // Typed builder and input list: the raw forms make the for-each below yield Object,
        // which cannot be assigned to a TaggedPValue loop variable.
        ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
        List<? extends TaggedPValue> inputs = transform.getInputs();
        if (inputs.isEmpty()) {
            inputWatermarksBuilder.add(THE_END_OF_TIME);
        }
        for (TaggedPValue pvalue : inputs) {
            AppliedPTransformOutputWatermark producerOutputWatermark = this.getTransformWatermark(this.graph.getProducer(pvalue.getValue())).outputWatermark;
            inputWatermarksBuilder.add(producerOutputWatermark);
        }
        // Return the ImmutableList directly; an ImmutableCollection local is not a List.
        return inputWatermarksBuilder.build();
    }

    /**
     * Looks up the watermarks registered for {@code transform}.
     *
     * @param transform the transform to look up
     * @return the watermarks, or {@code null} if none were registered for this transform
     */
    public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> transform) {
        TransformWatermarks watermarks = transformToWatermarks.get(transform);
        return watermarks;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers the initial bundles of each given transform as pending input of that transform
     * and schedules the transform for a watermark refresh.
     *
     * <p>Runs entirely under {@code refreshLock}.
     *
     * @param initialBundles the bundles each transform starts with; every key must already have
     *     watermarks registered, otherwise the lookup yields {@code null} and this method throws
     *     {@link NullPointerException}
     */
    public void initialize(Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<DirectRunner.CommittedBundle<?>>> initialBundles) {
        this.refreshLock.lock();
        try {
            // The entry's value type must keep the wildcard of the parameter: generics are
            // invariant, so an entry typed with plain Iterable<...> is not assignable here.
            for (Map.Entry<AppliedPTransform<?, ?, ?>, ? extends Iterable<DirectRunner.CommittedBundle<?>>> rootEntry : initialBundles.entrySet()) {
                TransformWatermarks rootWms = this.transformToWatermarks.get(rootEntry.getKey());
                for (DirectRunner.CommittedBundle<?> initialBundle : rootEntry.getValue()) {
                    rootWms.addPending(initialBundle);
                }
                this.pendingRefreshes.add(rootEntry.getKey());
            }
        }
        finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Queues the outcome of processing a bundle and then tries to apply queued updates.
     *
     * <p>The update is always enqueued; applying is best-effort and is skipped when another
     * thread currently holds the refresh lock.
     *
     * @param completed the input bundle that was processed, or {@code null} if there was none
     * @param timerUpdate the timer changes made while processing
     * @param result the committed result of the processing
     * @param earliestHold the earliest event-time hold to record for the bundle's key
     */
    public void updateWatermarks(@Nullable DirectRunner.CommittedBundle<?> completed, TimerUpdate timerUpdate, CommittedResult result, Instant earliestHold) {
        PendingWatermarkUpdate update = PendingWatermarkUpdate.create(completed, timerUpdate, result, earliestHold);
        pendingUpdates.offer(update);
        tryApplyPendingUpdates();
    }

    /**
     * Applies up to 10 queued updates if the refresh lock can be taken without waiting;
     * returns immediately, doing nothing, when the lock is held elsewhere.
     */
    private void tryApplyPendingUpdates() {
        if (!refreshLock.tryLock()) {
            return;
        }
        try {
            // 10 is the same value as MAX_INCREMENTAL_UPDATES.
            applyNUpdates(10);
        }
        finally {
            refreshLock.unlock();
        }
    }

    /** Blocks until the refresh lock is held, then applies every queued update. */
    private void applyAllPendingUpdates() {
        // A non-positive count tells applyNUpdates to drain the whole queue.
        final int noLimit = -1;
        refreshLock.lock();
        try {
            applyNUpdates(noLimit);
        }
        finally {
            refreshLock.unlock();
        }
    }

    /**
     * Applies queued updates in FIFO order and schedules each affected transform for refresh.
     *
     * @param numUpdates maximum number of updates to apply; zero or a negative value means
     *     "apply until the queue is empty"
     */
    @GuardedBy(value="refreshLock")
    private void applyNUpdates(int numUpdates) {
        boolean unbounded = numUpdates <= 0;
        int applied = 0;
        while (!pendingUpdates.isEmpty() && (unbounded || applied < numUpdates)) {
            PendingWatermarkUpdate next = pendingUpdates.poll();
            applyPendingUpdate(next);
            pendingRefreshes.add(next.getTransform());
            applied++;
        }
    }

    /**
     * Applies one queued update: adjusts pending bundles and timers, then records the new
     * event-time hold for the input bundle's key on the producing transform.
     *
     * <p>When the update has no input bundle the hold is recorded under the {@code null} key.
     */
    private void applyPendingUpdate(PendingWatermarkUpdate pending) {
        CommittedResult committed = pending.getResult();
        DirectRunner.CommittedBundle<?> consumedBundle = pending.getInputBundle();
        updatePending(consumedBundle, pending.getTimerUpdate(), committed);

        Object holdKey = null;
        if (consumedBundle != null) {
            holdKey = consumedBundle.getKey();
        }
        TransformWatermarks producerWms = transformToWatermarks.get(committed.getTransform());
        producerWms.setEventTimeHold(holdKey, pending.getEarliestHold());
    }

    /**
     * Updates pending bundles and timers after a transform finished processing.
     *
     * <p>Every output bundle becomes pending for each primitive consumer of its PCollection.
     * For the producing transform, in this order: unprocessed inputs are added as pending (only
     * when there was an input bundle), timers are updated, and the consumed input bundle is
     * removed from pending (again only when there was one).
     */
    private void updatePending(DirectRunner.CommittedBundle<?> input, TimerUpdate timerUpdate, CommittedResult result) {
        for (DirectRunner.CommittedBundle<?> produced : result.getOutputs()) {
            PValue collection = (PValue)produced.getPCollection();
            for (AppliedPTransform<?, ?, ?> consumer : graph.getPrimitiveConsumers(collection)) {
                transformToWatermarks.get(consumer).addPending(produced);
            }
        }

        TransformWatermarks producerWms = transformToWatermarks.get(result.getTransform());
        boolean hadInput = input != null;
        if (hadInput) {
            producerWms.addPending(result.getUnprocessedInputs());
        }
        producerWms.updateTimers(timerUpdate);
        if (hadInput) {
            producerWms.removePending(input);
        }
    }

    /**
     * Applies every queued update and refreshes watermarks until no refresh reports an advance.
     *
     * <p>Starts from the transforms scheduled in {@code pendingRefreshes}; each round refreshes
     * the current set and continues with the consumers of transforms whose watermark advanced.
     * Once the propagation settles the scheduled set is cleared, so a later call only
     * revisits transforms that received new bundles or updates in the meantime.
     */
    synchronized void refreshAll() {
        this.refreshLock.lock();
        try {
            this.applyAllPendingUpdates();
            Set<AppliedPTransform<?, ?, ?>> toRefresh = this.pendingRefreshes;
            while (!toRefresh.isEmpty()) {
                // refreshAllOf returns a fresh set, so pendingRefreshes itself is only read here.
                toRefresh = this.refreshAllOf(toRefresh);
            }
            // Everything scheduled has now been refreshed. Without this the set only ever
            // grows and each call re-refreshes every transform that was ever scheduled.
            this.pendingRefreshes.clear();
        }
        finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Refreshes each transform in {@code toRefresh} once.
     *
     * @param toRefresh the transforms to refresh; not modified
     * @return a new set holding the transforms that need a refresh as a consequence
     */
    private Set<AppliedPTransform<?, ?, ?>> refreshAllOf(Set<AppliedPTransform<?, ?, ?>> toRefresh) {
        Set<AppliedPTransform<?, ?, ?>> followUps = new HashSet<>();
        for (AppliedPTransform<?, ?, ?> current : toRefresh) {
            Set<AppliedPTransform<?, ?, ?>> downstream = refreshWatermarks(current);
            followUps.addAll(downstream);
        }
        return followUps;
    }

    /**
     * Refreshes the watermarks of one transform.
     *
     * @param toRefresh the transform to refresh
     * @return the primitive consumers of every output of the transform when the refresh
     *     reports an advance, otherwise an empty set
     */
    private Set<AppliedPTransform<?, ?, ?>> refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
        WatermarkUpdate outcome = transformToWatermarks.get(toRefresh).refresh();
        if (!outcome.isAdvanced()) {
            return Collections.emptySet();
        }
        Set<AppliedPTransform<?, ?, ?>> consumers = new HashSet<>();
        for (TaggedPValue output : toRefresh.getOutputs()) {
            consumers.addAll(graph.getPrimitiveConsumers(output.getValue()));
        }
        return consumers;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Extracts the fired timers of every registered transform while holding the refresh lock.
     *
     * @return a new mutable collection with the fired timers of all transforms
     */
    public Collection<FiredTimers> extractFiredTimers() {
        Collection<FiredTimers> fired = new ArrayList<>();
        refreshLock.lock();
        try {
            for (TransformWatermarks watermarks : transformToWatermarks.values()) {
                fired.addAll(watermarks.extractFiredTimers());
            }
            return fired;
        }
        finally {
            refreshLock.unlock();
        }
    }

    /**
     * Returns the transforms whose output watermark equals the value of {@code THE_END_OF_TIME}.
     *
     * <p>This method does not take the refresh lock.
     *
     * @return a new set of the completed transforms
     */
    public Set<AppliedPTransform<?, ?, ?>> getCompletedTransforms() {
        Set<AppliedPTransform<?, ?, ?>> completed = new HashSet<>();
        for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> entry : transformToWatermarks.entrySet()) {
            Instant outputWatermark = entry.getValue().getOutputWatermark();
            if (outputWatermark.equals((Object)THE_END_OF_TIME.get())) {
                completed.add(entry.getKey());
            }
        }
        return completed;
    }

    /**
     * One queued watermark update: the outcome of processing a bundle, held until it is applied
     * under the refresh lock.
     *
     * <p>Instances are created through {@link #create}, which instantiates the
     * {@code AutoValue_WatermarkManager_PendingWatermarkUpdate} implementation.
     */
    static abstract class PendingWatermarkUpdate {
        PendingWatermarkUpdate() {
        }

        /** The input bundle that was processed, or {@code null} when there was none. */
        @Nullable
        public abstract DirectRunner.CommittedBundle<?> getInputBundle();

        /** The timer changes to apply to the producing transform. */
        public abstract TimerUpdate getTimerUpdate();

        /** The committed result of the processing; it also identifies the producing transform. */
        public abstract CommittedResult getResult();

        /** The hold passed on to the producing transform's event-time hold when applied. */
        public abstract Instant getEarliestHold();

        /** The transform that produced the result; shorthand for {@code getResult().getTransform()}. */
        public AppliedPTransform<?, ?, ?> getTransform() {
            return this.getResult().getTransform();
        }

        /**
         * Creates an update from its four components.
         *
         * @param inputBundle the processed input bundle; the accessor is {@code @Nullable}
         * @param timerUpdate the timer changes
         * @param result the committed result
         * @param earliestHold the hold to record
         */
        public static PendingWatermarkUpdate create(DirectRunner.CommittedBundle<?> inputBundle, TimerUpdate timerUpdate, CommittedResult result, Instant earliestHold) {
            return new AutoValue_WatermarkManager_PendingWatermarkUpdate(inputBundle, timerUpdate, result, earliestHold);
        }
    }

    /**
     * Orders bundles by their minimum timestamp only; two bundles with the same minimum
     * timestamp compare as equal.
     */
    private static class BundleByElementTimestampComparator
    extends Ordering<DirectRunner.CommittedBundle<?>>
    implements Serializable {
        private BundleByElementTimestampComparator() {
        }

        @Override
        public int compare(DirectRunner.CommittedBundle<?> o1, DirectRunner.CommittedBundle<?> o2) {
            Instant leftMin = o1.getMinTimestamp();
            Instant rightMin = o2.getMinTimestamp();
            return ComparisonChain.start().compare(leftMin, rightMin).result();
        }
    }

    /**
     * The timers that fired for a single key of a single transform.
     */
    public static class FiredTimers {
        private final AppliedPTransform<?, ?, ?> transform;
        private final StructuralKey<?> key;
        private final Collection<TimerInternals.TimerData> timers;

        private FiredTimers(AppliedPTransform<?, ?, ?> transform, StructuralKey<?> key, Collection<TimerInternals.TimerData> timers) {
            this.transform = transform;
            this.key = key;
            this.timers = timers;
        }

        /** The transform the timers belong to. */
        public AppliedPTransform<?, ?, ?> getTransform() {
            return transform;
        }

        /** The key the timers were set for. */
        public StructuralKey<?> getKey() {
            return key;
        }

        /** The fired timers; this is the collection handed to the constructor, not a copy. */
        public Collection<TimerInternals.TimerData> getTimers() {
            return timers;
        }

        /** Renders only the timers; the transform and key are not included. */
        @Override
        public String toString() {
            return MoreObjects.toStringHelper(FiredTimers.class)
                .add("timers", timers)
                .toString();
        }
    }

    /**
     * The timer changes produced by processing one bundle: timers that completed, timers that
     * were set, and timers that were deleted, all for a single key.
     */
    public static class TimerUpdate {
        private final StructuralKey<?> key;
        private final Iterable<? extends TimerInternals.TimerData> completedTimers;
        private final Iterable<? extends TimerInternals.TimerData> setTimers;
        private final Iterable<? extends TimerInternals.TimerData> deletedTimers;

        /** Returns an update with a {@code null} key and no timers of any kind. */
        public static TimerUpdate empty() {
            return new TimerUpdate(
                null,
                Collections.<TimerInternals.TimerData>emptyList(),
                Collections.<TimerInternals.TimerData>emptyList(),
                Collections.<TimerInternals.TimerData>emptyList());
        }

        /** Starts building an update for {@code key}. */
        public static TimerUpdateBuilder builder(StructuralKey<?> key) {
            return new TimerUpdateBuilder(key);
        }

        private TimerUpdate(StructuralKey<?> key, Iterable<? extends TimerInternals.TimerData> completedTimers, Iterable<? extends TimerInternals.TimerData> setTimers, Iterable<? extends TimerInternals.TimerData> deletedTimers) {
            this.key = key;
            this.completedTimers = completedTimers;
            this.setTimers = setTimers;
            this.deletedTimers = deletedTimers;
        }

        @VisibleForTesting
        StructuralKey<?> getKey() {
            return key;
        }

        @VisibleForTesting
        Iterable<? extends TimerInternals.TimerData> getCompletedTimers() {
            return completedTimers;
        }

        @VisibleForTesting
        Iterable<? extends TimerInternals.TimerData> getSetTimers() {
            return setTimers;
        }

        @VisibleForTesting
        Iterable<? extends TimerInternals.TimerData> getDeletedTimers() {
            return deletedTimers;
        }

        /**
         * Returns a copy of this update whose completed timers are replaced by the given ones;
         * the key, set timers and deleted timers are carried over unchanged.
         */
        public TimerUpdate withCompletedTimers(Iterable<TimerInternals.TimerData> completedTimers) {
            return new TimerUpdate(this.key, completedTimers, this.setTimers, this.deletedTimers);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, completedTimers, setTimers, deletedTimers);
        }

        @Override
        public boolean equals(Object other) {
            // instanceof is false for null, so no separate null check is needed.
            if (!(other instanceof TimerUpdate)) {
                return false;
            }
            TimerUpdate that = (TimerUpdate)other;
            if (!Objects.equals(key, that.key)) {
                return false;
            }
            if (!Objects.equals(completedTimers, that.completedTimers)) {
                return false;
            }
            if (!Objects.equals(setTimers, that.setTimers)) {
                return false;
            }
            return Objects.equals(deletedTimers, that.deletedTimers);
        }

        /**
         * Accumulates timer changes for one key. Setting a timer cancels an earlier deletion of
         * the same timer and vice versa, so a timer is never in both sets at once.
         */
        public static final class TimerUpdateBuilder {
            private final StructuralKey<?> key;
            private final Collection<TimerInternals.TimerData> completedTimers = new HashSet<>();
            private final Collection<TimerInternals.TimerData> setTimers = new HashSet<>();
            private final Collection<TimerInternals.TimerData> deletedTimers = new HashSet<>();

            private TimerUpdateBuilder(StructuralKey<?> key) {
                this.key = key;
            }

            /** Adds all given timers to the completed timers. */
            public TimerUpdateBuilder withCompletedTimers(Iterable<TimerInternals.TimerData> completedTimers) {
                Iterables.addAll(this.completedTimers, completedTimers);
                return this;
            }

            /** Marks a timer as set, dropping it from the deleted timers if present. */
            public TimerUpdateBuilder setTimer(TimerInternals.TimerData setTimer) {
                deletedTimers.remove(setTimer);
                setTimers.add(setTimer);
                return this;
            }

            /** Marks a timer as deleted, dropping it from the set timers if present. */
            public TimerUpdateBuilder deletedTimer(TimerInternals.TimerData deletedTimer) {
                deletedTimers.add(deletedTimer);
                setTimers.remove(deletedTimer);
                return this;
            }

            /** Builds an update holding immutable snapshots of the accumulated timers. */
            public TimerUpdate build() {
                return new TimerUpdate(
                    key,
                    ImmutableSet.copyOf(completedTimers),
                    ImmutableSet.copyOf(setTimers),
                    ImmutableSet.copyOf(deletedTimers));
            }
        }
    }

    /**
     * The four watermarks of one transform — event-time input/output and synchronized
     * processing-time input/output — plus the operations the manager performs on them together.
     */
    public class TransformWatermarks {
        private final AppliedPTransform<?, ?, ?> transform;
        private final AppliedPTransformInputWatermark inputWatermark;
        private final AppliedPTransformOutputWatermark outputWatermark;
        private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark;
        private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark;
        // Highest synchronized processing times handed out so far; the getters never go below these.
        private Instant latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private Instant latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;

        private TransformWatermarks(AppliedPTransform<?, ?, ?> transform, AppliedPTransformInputWatermark inputWatermark, AppliedPTransformOutputWatermark outputWatermark, SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark, SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
            this.transform = transform;
            this.inputWatermark = inputWatermark;
            this.outputWatermark = outputWatermark;
            this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark;
            this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark;
        }

        /** Returns the event-time input watermark; throws if the underlying value is null. */
        public Instant getInputWatermark() {
            Instant current = inputWatermark.get();
            return Preconditions.checkNotNull(current);
        }

        /** Returns the event-time output watermark. */
        public Instant getOutputWatermark() {
            return outputWatermark.get();
        }

        /**
         * Returns the synchronized processing input time: the earlier of the clock's current
         * time and the synchronized input watermark, but never earlier than a value this method
         * returned before.
         */
        public synchronized Instant getSynchronizedProcessingInputTime() {
            Instant previous = latestSynchronizedInputWm;
            Instant candidate = INSTANT_ORDERING.min(WatermarkManager.this.clock.now(), synchronizedProcessingInputWatermark.get());
            latestSynchronizedInputWm = INSTANT_ORDERING.max(previous, candidate);
            return latestSynchronizedInputWm;
        }

        /**
         * Returns the synchronized processing output time: the earlier of the clock's current
         * time and the synchronized output watermark, but never earlier than a value this method
         * returned before.
         */
        public synchronized Instant getSynchronizedProcessingOutputTime() {
            Instant previous = latestSynchronizedOutputWm;
            Instant candidate = INSTANT_ORDERING.min(WatermarkManager.this.clock.now(), synchronizedProcessingOutputWatermark.get());
            latestSynchronizedOutputWm = INSTANT_ORDERING.max(previous, candidate);
            return latestSynchronizedOutputWm;
        }

        /**
         * Refreshes both input watermarks and then both output watermarks.
         *
         * @return the union of the two output-watermark updates
         */
        private WatermarkUpdate refresh() {
            inputWatermark.refresh();
            synchronizedProcessingInputWatermark.refresh();
            WatermarkUpdate eventTimeResult = outputWatermark.refresh();
            WatermarkUpdate processingTimeResult = synchronizedProcessingOutputWatermark.refresh();
            return eventTimeResult.union(processingTimeResult);
        }

        /** Records {@code newHold} for {@code key} on the event-time output watermark. */
        private void setEventTimeHold(Object key, Instant newHold) {
            outputWatermark.updateHold(key, newHold);
        }

        /** Removes {@code bundle} from the pending bundles of both input watermarks. */
        private void removePending(DirectRunner.CommittedBundle<?> bundle) {
            inputWatermark.removePending(bundle);
            synchronizedProcessingInputWatermark.removePending(bundle);
        }

        /** Adds {@code bundle} to the pending bundles of both input watermarks. */
        private void addPending(DirectRunner.CommittedBundle<?> bundle) {
            inputWatermark.addPending(bundle);
            synchronizedProcessingInputWatermark.addPending(bundle);
        }

        /**
         * Extracts fired event-time, processing-time and synchronized-processing-time timers, in
         * that order, and groups them into one {@link FiredTimers} per key.
         */
        private Collection<FiredTimers> extractFiredTimers() {
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> eventTime = inputWatermark.extractFiredEventTimeTimers();
            Instant now = WatermarkManager.this.clock.now();
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> processingTime = synchronizedProcessingInputWatermark.extractFiredDomainTimers(TimeDomain.PROCESSING_TIME, now);
            Instant synchronizedNow = getSynchronizedProcessingInputTime();
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> synchronizedTime = synchronizedProcessingInputWatermark.extractFiredDomainTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedNow);

            Map<StructuralKey<?>, List<TimerInternals.TimerData>> byKey = groupFiredTimers(eventTime, processingTime, synchronizedTime);
            List<FiredTimers> perKey = new ArrayList<>(byKey.size());
            for (Map.Entry<StructuralKey<?>, List<TimerInternals.TimerData>> entry : byKey.entrySet()) {
                perKey.add(new FiredTimers(transform, entry.getKey(), entry.getValue()));
            }
            return perKey;
        }

        /**
         * Merges several key-to-timers maps into one, concatenating the timer lists of a key in
         * argument order. The input maps and their lists are not modified.
         */
        @SafeVarargs
        private final Map<StructuralKey<?>, List<TimerInternals.TimerData>> groupFiredTimers(Map<StructuralKey<?>, List<TimerInternals.TimerData>> ... timersToGroup) {
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> merged = new HashMap<>();
            for (Map<StructuralKey<?>, List<TimerInternals.TimerData>> source : timersToGroup) {
                for (Map.Entry<StructuralKey<?>, List<TimerInternals.TimerData>> entry : source.entrySet()) {
                    List<TimerInternals.TimerData> bucket = merged.get(entry.getKey());
                    if (bucket == null) {
                        bucket = new ArrayList<>();
                        merged.put(entry.getKey(), bucket);
                    }
                    bucket.addAll(entry.getValue());
                }
            }
            return merged;
        }

        /** Forwards the timer update to both input watermarks. */
        private void updateTimers(TimerUpdate update) {
            inputWatermark.updateTimers(update);
            synchronizedProcessingInputWatermark.updateTimers(update);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(TransformWatermarks.class)
                .add("inputWatermark", inputWatermark)
                .add("outputWatermark", outputWatermark)
                .add("inputProcessingTime", synchronizedProcessingInputWatermark)
                .add("outputProcessingTime", synchronizedProcessingOutputWatermark)
                .toString();
        }
    }

    /**
     * Tracks at most one hold per key and answers the minimum hold across all keys.
     *
     * <p>The holds are kept both in a map by key and in a sorted set, and the two are updated
     * together.
     */
    private static class PerKeyHolds {
        private final Map<Object, KeyedHold> keyedHolds = new HashMap<>();
        private final NavigableSet<KeyedHold> allHolds = new TreeSet<>();

        private PerKeyHolds() {
        }

        /** Returns the earliest hold, or the value of {@code THE_END_OF_TIME} when there are none. */
        public Instant getMinHold() {
            if (allHolds.isEmpty()) {
                return THE_END_OF_TIME.get();
            }
            return allHolds.first().getTimestamp();
        }

        /** Replaces any existing hold for {@code key} with {@code newHold}. */
        public void updateHold(@Nullable Object key, Instant newHold) {
            removeHold(key);
            KeyedHold replacement = KeyedHold.of(key, newHold);
            keyedHolds.put(key, replacement);
            allHolds.add(replacement);
        }

        /** Drops the hold for {@code key}, if one exists. */
        public void removeHold(Object key) {
            KeyedHold previous = keyedHolds.remove(key);
            if (previous == null) {
                return;
            }
            allHolds.remove(previous);
        }
    }

    /**
     * A hold timestamp paired with the key it belongs to, ordered by timestamp first and then by
     * an arbitrary but consistent ordering of the keys with {@code null} keys last.
     */
    private static final class KeyedHold
    implements Comparable<KeyedHold> {
        private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast();
        private final Object key;
        private final Instant timestamp;

        /** Creates a hold; a {@code null} timestamp is replaced by the value of {@code THE_END_OF_TIME}. */
        public static KeyedHold of(Object key, Instant timestamp) {
            Instant effective = MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get());
            return new KeyedHold(key, effective);
        }

        private KeyedHold(Object key, Instant timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(KeyedHold that) {
            return ComparisonChain.start()
                .compare(timestamp, that.timestamp)
                .compare(key, that.key, KEY_ORDERING)
                .result();
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key);
        }

        @Override
        public boolean equals(Object other) {
            // instanceof is false for null, so no separate null check is needed.
            if (!(other instanceof KeyedHold)) {
                return false;
            }
            KeyedHold that = (KeyedHold)other;
            if (!Objects.equals(timestamp, that.timestamp)) {
                return false;
            }
            return Objects.equals(key, that.key);
        }

        /** The hold's timestamp; never the {@code null} passed to {@link #of}. */
        public Instant getTimestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(KeyedHold.class)
                .add("key", key)
                .add("hold", timestamp)
                .toString();
        }
    }

    /**
     * Synchronized processing-time output watermark of a transform: on each refresh it becomes
     * the earlier of the transform's synchronized input watermark and its earliest timer
     * timestamp. It starts at {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
     */
    private static class SynchronizedProcessingTimeOutputWatermark
    implements Watermark {
        private final SynchronizedProcessingTimeInputWatermark inputWm;
        private AtomicReference<Instant> latestRefresh;

        public SynchronizedProcessingTimeOutputWatermark(SynchronizedProcessingTimeInputWatermark inputWm) {
            this.inputWm = inputWm;
            this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        /** Returns the value computed by the most recent refresh. */
        @Override
        public Instant get() {
            return latestRefresh.get();
        }

        /** Recomputes the watermark and reports how it moved relative to the previous value. */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant inputTime = inputWm.get();
            Instant earliestTimer = inputWm.getEarliestTimerTimestamp();
            Instant updated = INSTANT_ORDERING.min(inputTime, earliestTimer);
            // Only this synchronized method writes latestRefresh, so swapping here observes the
            // same previous value as reading it up front.
            Instant previous = latestRefresh.getAndSet(updated);
            return WatermarkUpdate.fromTimestamps(previous, updated);
        }

        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
                .add("latestRefresh", latestRefresh)
                .toString();
        }
    }

    private static class SynchronizedProcessingTimeInputWatermark
    implements Watermark {
        private final Collection<? extends Watermark> inputWms;
        private final Collection<DirectRunner.CommittedBundle<?>> pendingBundles;
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> processingTimers;
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> synchronizedProcessingTimers;
        private final NavigableSet<TimerInternals.TimerData> pendingTimers;
        private AtomicReference<Instant> earliestHold;

        /**
         * Creates the watermark with no pending bundles or timers. The initial hold is the
         * earliest current value among {@code inputWms}, or
         * {@code BoundedWindow.TIMESTAMP_MAX_VALUE} when the collection is empty.
         */
        public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark> inputWms) {
            this.inputWms = inputWms;
            this.pendingBundles = new HashSet<>();
            this.processingTimers = new HashMap<>();
            this.synchronizedProcessingTimers = new HashMap<>();
            this.pendingTimers = new TreeSet<>();

            Instant earliestInput = BoundedWindow.TIMESTAMP_MAX_VALUE;
            for (Watermark upstream : inputWms) {
                earliestInput = INSTANT_ORDERING.min(earliestInput, upstream.get());
            }
            this.earliestHold = new AtomicReference<>(earliestInput);
        }

        /** Returns the hold computed by the constructor or the most recent refresh. */
        @Override
        public Instant get() {
            Instant current = earliestHold.get();
            return current;
        }

        /**
         * Recomputes the hold as the earliest of all input watermarks and the synchronized
         * processing output watermarks of all pending bundles, starting from the value of
         * {@code THE_END_OF_TIME}.
         *
         * @return how the hold moved relative to its previous value
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant previous = earliestHold.get();
            Instant earliest = THE_END_OF_TIME.get();
            for (Watermark upstream : inputWms) {
                earliest = INSTANT_ORDERING.min(earliest, upstream.get());
            }
            for (DirectRunner.CommittedBundle<?> pending : pendingBundles) {
                earliest = INSTANT_ORDERING.min(earliest, pending.getSynchronizedProcessingOutputWatermark());
            }
            earliestHold.set(earliest);
            return WatermarkUpdate.fromTimestamps(previous, earliest);
        }

        /** Adds {@code bundle} to the set of pending bundles considered by {@code refresh}. */
        public synchronized void addPending(DirectRunner.CommittedBundle<?> bundle) {
            pendingBundles.add(bundle);
        }

        /** Removes {@code bundle} from the set of pending bundles; a no-op if it is absent. */
        public synchronized void removePending(DirectRunner.CommittedBundle<?> bundle) {
            pendingBundles.remove(bundle);
        }

        /**
         * Returns the earliest timestamp among the first timer of every per-key processing-time
         * set, the first timer of every per-key synchronized-processing-time set, and the first
         * pending timer; the value of {@code THE_END_OF_TIME} when there are no timers at all.
         */
        public synchronized Instant getEarliestTimerTimestamp() {
            Instant result = THE_END_OF_TIME.get();
            for (NavigableSet<TimerInternals.TimerData> perKey : processingTimers.values()) {
                if (!perKey.isEmpty()) {
                    result = INSTANT_ORDERING.min(perKey.first().getTimestamp(), result);
                }
            }
            for (NavigableSet<TimerInternals.TimerData> perKey : synchronizedProcessingTimers.values()) {
                if (!perKey.isEmpty()) {
                    result = INSTANT_ORDERING.min(perKey.first().getTimestamp(), result);
                }
            }
            if (!pendingTimers.isEmpty()) {
                result = INSTANT_ORDERING.min(pendingTimers.first().getTimestamp(), result);
            }
            return result;
        }

        /**
         * Applies a timer update for the update's key.
         *
         * <p>Set timers are added to, and deleted timers removed from, the per-domain queue for
         * that key. Timers in a domain that {@code timerMap} has no queue for are ignored.
         * Completed timers are removed from the pending-timer set regardless of domain.
         *
         * @param update the set, completed and deleted timers for a single key
         */
        private synchronized void updateTimers(TimerUpdate update) {
            Map<TimeDomain, NavigableSet<TimerInternals.TimerData>> queuesByDomain = timerMap(update.key);

            for (TimerInternals.TimerData added : update.setTimers) {
                NavigableSet<TimerInternals.TimerData> queue = queuesByDomain.get(added.getDomain());
                if (queue != null) {
                    queue.add(added);
                }
            }

            for (TimerInternals.TimerData completed : update.completedTimers) {
                pendingTimers.remove(completed);
            }

            for (TimerInternals.TimerData deleted : update.deletedTimers) {
                NavigableSet<TimerInternals.TimerData> queue = queuesByDomain.get(deleted.getDomain());
                if (queue != null) {
                    queue.remove(deleted);
                }
            }
        }

        /**
         * Extracts the timers of one processing-time domain that have fired, and records them as
         * pending.
         *
         * <p>For {@code SYNCHRONIZED_PROCESSING_TIME} the firing time passed on to
         * {@code WatermarkManager.extractFiredTimers} is the minimum of {@code firingTime} and the
         * current hold.
         *
         * @param domain either {@code PROCESSING_TIME} or {@code SYNCHRONIZED_PROCESSING_TIME}
         * @param firingTime the time up to which timers are extracted
         * @return the fired timers, grouped by key
         * @throws IllegalArgumentException if {@code domain} is any other time domain
         */
        private synchronized Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredDomainTimers(TimeDomain domain, Instant firingTime) {
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> firedByKey;
            switch (domain) {
                case PROCESSING_TIME:
                    firedByKey = WatermarkManager.extractFiredTimers(firingTime, processingTimers);
                    break;
                case SYNCHRONIZED_PROCESSING_TIME:
                    firedByKey = WatermarkManager.extractFiredTimers(INSTANT_ORDERING.min(firingTime, earliestHold.get()), synchronizedProcessingTimers);
                    break;
                default:
                    throw new IllegalArgumentException("Called getFiredTimers on a Synchronized Processing Time watermark and gave a non-processing time domain " + domain);
            }

            // Fired timers stay in pendingTimers until an update reports them as completed.
            for (List<TimerInternals.TimerData> firedForKey : firedByKey.values()) {
                pendingTimers.addAll(firedForKey);
            }
            return firedByKey;
        }

        /**
         * Returns the timer queues for a key, indexed by time domain, creating and registering
         * empty queues for the key where none exist yet.
         *
         * <p>The result has entries for {@code PROCESSING_TIME} and
         * {@code SYNCHRONIZED_PROCESSING_TIME} only. This method is not {@code synchronized}
         * itself although it may modify both timer maps.
         *
         * @param key the key whose queues are wanted
         * @return a new map from those two domains to the key's live queues
         */
        private Map<TimeDomain, NavigableSet<TimerInternals.TimerData>> timerMap(StructuralKey<?> key) {
            EnumMap<TimeDomain, NavigableSet<TimerInternals.TimerData>> queuesByDomain = new EnumMap<TimeDomain, NavigableSet<TimerInternals.TimerData>>(TimeDomain.class);

            NavigableSet<TimerInternals.TimerData> forProcessingTime = processingTimers.get(key);
            if (forProcessingTime == null) {
                forProcessingTime = new TreeSet<TimerInternals.TimerData>();
                processingTimers.put(key, forProcessingTime);
            }
            queuesByDomain.put(TimeDomain.PROCESSING_TIME, forProcessingTime);

            NavigableSet<TimerInternals.TimerData> forSynchronizedTime = synchronizedProcessingTimers.get(key);
            if (forSynchronizedTime == null) {
                forSynchronizedTime = new TreeSet<TimerInternals.TimerData>();
                synchronizedProcessingTimers.put(key, forSynchronizedTime);
            }
            queuesByDomain.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, forSynchronizedTime);

            return queuesByDomain;
        }

        /** Returns a debug string containing the current hold. */
        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
                .add("earliestHold", earliestHold)
                .toString();
        }
    }

    /**
     * Output watermark of an applied transform.
     *
     * <p>It is derived from the transform's input watermark, that watermark's earliest pending
     * timer, and the per-key holds registered through {@link #updateHold(Object, Instant)}.
     * {@link #refresh()} never lets the value move backwards.
     */
    private static class AppliedPTransformOutputWatermark implements Watermark {
        private final AppliedPTransformInputWatermark inputWatermark;
        private final PerKeyHolds holds;
        private AtomicReference<Instant> currentWatermark;

        /**
         * Creates an output watermark with no holds, starting at
         * {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
         *
         * @param inputWatermark the input watermark of the same transform
         */
        public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) {
            this.inputWatermark = inputWatermark;
            this.holds = new PerKeyHolds();
            this.currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        /**
         * Sets or clears the hold for a key.
         *
         * @param key the key the hold belongs to
         * @param newHold the new hold, or {@code null} to remove the key's hold
         */
        public synchronized void updateHold(Object key, Instant newHold) {
            if (newHold != null) {
                holds.updateHold(key, newHold);
                return;
            }
            holds.removeHold(key);
        }

        /** Returns the value stored by the most recent {@link #refresh()}, without locking. */
        @Override
        public Instant get() {
            return currentWatermark.get();
        }

        /**
         * Recomputes the watermark as the minimum of the input watermark, the input watermark's
         * earliest timer timestamp and the minimum hold, clamped so it is never earlier than the
         * previous value.
         *
         * @return {@code ADVANCED} if the watermark moved forward, otherwise {@code NO_CHANGE}
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant previous = currentWatermark.get();

            Instant inputTime = inputWatermark.get();
            Instant earliestTimer = inputWatermark.getEarliestTimerTimestamp();
            Instant minHold = holds.getMinHold();
            Instant lowest = INSTANT_ORDERING.min(inputTime, earliestTimer, minHold);

            // Keep the larger of the old and new values so the watermark stays monotonic.
            Instant updated = INSTANT_ORDERING.max(previous, lowest);
            currentWatermark.set(updated);
            return WatermarkUpdate.fromTimestamps(previous, updated);
        }

        /** Returns a debug string containing the holds and the current watermark. */
        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
                .add("holds", holds)
                .add("currentWatermark", currentWatermark)
                .toString();
        }
    }

    /**
     * Input watermark of an applied transform.
     *
     * <p>The watermark is the minimum of the upstream watermarks and the minimum timestamp of the
     * earliest pending bundle, and {@link #refresh()} never lets it move backwards. The class also
     * tracks event-time timers: which timers are queued per key, and the timestamps of timers
     * that have been set but not yet deleted or completed.
     */
    private static class AppliedPTransformInputWatermark
    implements Watermark {
        /** Upstream watermarks; their minimum bounds this watermark. */
        private final Collection<? extends Watermark> inputWatermarks;
        /** Bundles added via {@code addPending} and not yet removed, ordered by the bundle comparator. */
        private final SortedMultiset<DirectRunner.CommittedBundle<?>> pendingElements;
        /** Timestamps of event-time timers that have been set and not yet deleted or completed. */
        private final SortedMultiset<Instant> pendingTimers;
        /** Per key, the most recently set timer for each (namespace, timer id) pair. */
        private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerInternals.TimerData>> existingTimers;
        /** Per key, the sorted event-time timers handed to {@code WatermarkManager.extractFiredTimers}. */
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> objectTimers;
        private AtomicReference<Instant> currentWatermark;

        /**
         * Creates an input watermark with no pending bundles or timers, starting at
         * {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
         *
         * @param inputWatermarks the upstream watermarks; the collection is stored, not copied
         */
        public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
            this.inputWatermarks = inputWatermarks;
            // Order bundles by element timestamp; the arbitrary ordering breaks ties so that
            // distinct bundles with equal timestamps are kept as separate entries.
            this.pendingElements = TreeMultiset.create(new BundleByElementTimestampComparator().compound(Ordering.arbitrary()));
            this.pendingTimers = TreeMultiset.create();
            this.objectTimers = new HashMap<>();
            this.existingTimers = new HashMap<>();
            this.currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        /** Returns the value stored by the most recent {@link #refresh()}, without locking. */
        @Override
        public Instant get() {
            return this.currentWatermark.get();
        }

        /**
         * Recomputes the watermark as the minimum of {@code BoundedWindow.TIMESTAMP_MAX_VALUE},
         * every upstream watermark and the minimum timestamp of the first pending bundle, clamped
         * so it is never earlier than the previous value.
         *
         * @return {@code ADVANCED} if the watermark moved forward, otherwise {@code NO_CHANGE}
         */
        @Override
        public synchronized WatermarkUpdate refresh() {
            Instant oldWatermark = this.currentWatermark.get();
            Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
            for (Watermark watermark : this.inputWatermarks) {
                minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, watermark.get());
            }
            if (!this.pendingElements.isEmpty()) {
                minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, this.pendingElements.firstEntry().getElement().getMinTimestamp());
            }
            Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
            this.currentWatermark.set(newWatermark);
            return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
        }

        /** Starts tracking a bundle as pending input. */
        private synchronized void addPending(DirectRunner.CommittedBundle<?> newPending) {
            this.pendingElements.add(newPending);
        }

        /** Stops tracking a bundle as pending input. */
        private synchronized void removePending(DirectRunner.CommittedBundle<?> completed) {
            this.pendingElements.remove(completed);
        }

        /**
         * Returns the earliest timestamp in {@code pendingTimers}, or
         * {@code BoundedWindow.TIMESTAMP_MAX_VALUE} when there are none.
         */
        private synchronized Instant getEarliestTimerTimestamp() {
            if (this.pendingTimers.isEmpty()) {
                return BoundedWindow.TIMESTAMP_MAX_VALUE;
            }
            return this.pendingTimers.firstEntry().getElement();
        }

        /**
         * Applies the event-time part of a timer update for the update's key; timers in any other
         * time domain are ignored.
         *
         * <ul>
         *   <li>Set timers: a timer with no previous entry for its (namespace, timer id) is
         *       queued and its timestamp recorded as pending. A timer that differs from the
         *       previous entry replaces it in the queue and in the pending timestamps. An
         *       identical timer is left alone.
         *   <li>Deleted timers: the previously recorded timer, if any, is removed from the queue,
         *       the pending timestamps and the per-key table.
         *   <li>Completed timers: the timer's timestamp is removed from the pending timestamps.
         * </ul>
         *
         * @param update the set, deleted and completed timers for a single key
         */
        private synchronized void updateTimers(TimerUpdate update) {
            NavigableSet<TimerInternals.TimerData> keyTimers = this.objectTimers.get(update.key);
            if (keyTimers == null) {
                keyTimers = new TreeSet<TimerInternals.TimerData>();
                this.objectTimers.put(update.key, keyTimers);
            }
            Table<StateNamespace, String, TimerInternals.TimerData> existingTimersForKey = this.existingTimers.get(update.key);
            if (existingTimersForKey == null) {
                existingTimersForKey = HashBasedTable.create();
                this.existingTimers.put(update.key, existingTimersForKey);
            }

            for (TimerInternals.TimerData timerData : update.getSetTimers()) {
                if (!TimeDomain.EVENT_TIME.equals(timerData.getDomain())) {
                    continue;
                }
                TimerInternals.TimerData existingTimer = existingTimersForKey.get(timerData.getNamespace(), timerData.getTimerId());
                if (existingTimer == null) {
                    this.pendingTimers.add(timerData.getTimestamp());
                    keyTimers.add(timerData);
                } else if (!existingTimer.equals(timerData)) {
                    // The timer was re-set with different data. Move the pending timestamp along
                    // with the queue entry; otherwise the old timestamp would be reported by
                    // getEarliestTimerTimestamp() indefinitely and the new one never registered.
                    // The old timestamp is dropped only if the old timer was still queued: a timer
                    // that already left the queue has its timestamp released by the completed
                    // loop below (assumes WatermarkManager.extractFiredTimers removes fired
                    // timers from objectTimers -- TODO confirm).
                    if (keyTimers.remove(existingTimer)) {
                        this.pendingTimers.remove(existingTimer.getTimestamp());
                    }
                    keyTimers.add(timerData);
                    this.pendingTimers.add(timerData.getTimestamp());
                }
                // NOTE(review): an identical timer is a no-op even if the recorded timer is no
                // longer queued, because existingTimers is not cleared on completion here.
                existingTimersForKey.put(timerData.getNamespace(), timerData.getTimerId(), timerData);
            }

            for (TimerInternals.TimerData timerData : update.getDeletedTimers()) {
                if (!TimeDomain.EVENT_TIME.equals(timerData.getDomain())) {
                    continue;
                }
                // Delete whatever is currently recorded for this (namespace, timer id), which may
                // carry a different timestamp than the TimerData passed in.
                TimerInternals.TimerData existingTimer = existingTimersForKey.get(timerData.getNamespace(), timerData.getTimerId());
                if (existingTimer == null) {
                    continue;
                }
                this.pendingTimers.remove(existingTimer.getTimestamp());
                keyTimers.remove(existingTimer);
                existingTimersForKey.remove(existingTimer.getNamespace(), existingTimer.getTimerId());
            }

            for (TimerInternals.TimerData timerData : update.getCompletedTimers()) {
                if (!TimeDomain.EVENT_TIME.equals(timerData.getDomain())) {
                    continue;
                }
                this.pendingTimers.remove(timerData.getTimestamp());
            }
        }

        /**
         * Extracts the event-time timers selected by {@code WatermarkManager.extractFiredTimers}
         * for the current watermark value.
         *
         * @return the extracted timers, grouped by key
         */
        private synchronized Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredEventTimeTimers() {
            return WatermarkManager.extractFiredTimers(this.currentWatermark.get(), this.objectTimers);
        }

        /** Returns a debug string containing the pending bundles and the current watermark. */
        @Override
        public synchronized String toString() {
            return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class).add("pendingElements", this.pendingElements).add("currentWatermark", this.currentWatermark).toString();
        }
    }

    /** Result of refreshing a watermark: whether or not its value moved forward. */
    private static enum WatermarkUpdate {
        /** The new value is strictly after the old one. */
        ADVANCED(true),
        /** The new value is not strictly after the old one. */
        NO_CHANGE(false);

        private final boolean advanced;

        private WatermarkUpdate(boolean advanced) {
            this.advanced = advanced;
        }

        /** Returns {@code true} for {@link #ADVANCED} and {@code false} for {@link #NO_CHANGE}. */
        public boolean isAdvanced() {
            return advanced;
        }

        /**
         * Combines two results.
         *
         * @param that the other result
         * @return this value if it is advanced, otherwise {@code that}
         */
        public WatermarkUpdate union(WatermarkUpdate that) {
            return advanced ? this : that;
        }

        /**
         * Classifies a change from one timestamp to another.
         *
         * @param oldTime the value before the refresh
         * @param currentTime the value after the refresh
         * @return {@link #ADVANCED} if {@code currentTime} is after {@code oldTime}, otherwise
         *     {@link #NO_CHANGE}
         */
        public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
            return currentTime.isAfter(oldTime) ? ADVANCED : NO_CHANGE;
        }
    }

    /** A value that can be read cheaply and recomputed on demand. */
    private static interface Watermark {
        /** Returns the current value of this watermark. */
        Instant get();

        /**
         * Recomputes this watermark.
         *
         * @return a {@code WatermarkUpdate} describing the outcome of the recomputation
         */
        WatermarkUpdate refresh();
    }
}

