/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.HashSet;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Optional;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateContext;
import org.apache.beam.sdk.util.state.StateContexts;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateTable;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

public class CopyOnAccessInMemoryStateInternals<K>
implements StateInternals<K> {
    private final K key;
    private final CopyOnAccessInMemoryStateTable<K> table;

    public static <K> CopyOnAccessInMemoryStateInternals<K> withUnderlying(K key, @Nullable CopyOnAccessInMemoryStateInternals<K> underlying) {
        return new CopyOnAccessInMemoryStateInternals<K>(key, underlying);
    }

    private CopyOnAccessInMemoryStateInternals(K key, CopyOnAccessInMemoryStateInternals<K> underlying) {
        this.key = key;
        this.table = new CopyOnAccessInMemoryStateTable<K>(key, underlying == null ? null : underlying.table);
    }

    public CopyOnAccessInMemoryStateInternals<K> commit() {
        ((CopyOnAccessInMemoryStateTable)this.table).commit();
        return this;
    }

    public Instant getEarliestWatermarkHold() {
        Preconditions.checkState(((CopyOnAccessInMemoryStateTable)this.table).earliestWatermarkHold.isPresent(), "Can't get the earliest watermark hold in a %s before it is committed", (Object)this.getClass().getSimpleName());
        return (Instant)((CopyOnAccessInMemoryStateTable)this.table).earliestWatermarkHold.get();
    }

    public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) {
        return this.state(namespace, address, StateContexts.nullContext());
    }

    public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
        return (T)this.table.get(namespace, address, c);
    }

    public K getKey() {
        return this.key;
    }

    public boolean isEmpty() {
        return Iterables.isEmpty(this.table.values());
    }

    private static class CopyOnAccessInMemoryStateTable<K>
    extends StateTable<K> {
        private final K key;
        private Optional<StateTable<K>> underlying;
        private StateBinderFactory<K> binderFactory;
        private Optional<Instant> earliestWatermarkHold;

        public CopyOnAccessInMemoryStateTable(K key, StateTable<K> underlying) {
            this.key = key;
            this.underlying = Optional.fromNullable(underlying);
            this.binderFactory = new CopyOnBindBinderFactory<K>(key, this.underlying);
            this.earliestWatermarkHold = Optional.absent();
        }

        private void commit() {
            Instant earliestHold = this.getEarliestWatermarkHold();
            if (this.underlying.isPresent()) {
                ReadThroughBinderFactory<K> readThroughBinder = new ReadThroughBinderFactory<K>(this.underlying.get());
                this.binderFactory = readThroughBinder;
                Instant earliestUnderlyingHold = readThroughBinder.readThroughAndGetEarliestHold(this);
                if (earliestUnderlyingHold.isBefore((ReadableInstant)earliestHold)) {
                    earliestHold = earliestUnderlyingHold;
                }
            }
            this.earliestWatermarkHold = Optional.of(earliestHold);
            this.clearEmpty();
            this.binderFactory = new InMemoryStateBinderFactory<K>(this.key);
            this.underlying = Optional.absent();
        }

        private Instant getEarliestWatermarkHold() {
            Instant earliest = BoundedWindow.TIMESTAMP_MAX_VALUE;
            for (State existingState : this.values()) {
                Instant hold;
                if (!(existingState instanceof WatermarkHoldState) || (hold = (Instant)((WatermarkHoldState)existingState).read()) == null || !hold.isBefore((ReadableInstant)earliest)) continue;
                earliest = hold;
            }
            return earliest;
        }

        private void clearEmpty() {
            HashSet emptyNamespaces = new HashSet(this.getNamespacesInUse());
            block0: for (StateNamespace namespace : this.getNamespacesInUse()) {
                for (State existingState : this.getTagsInUse(namespace).values()) {
                    if (((InMemoryStateInternals.InMemoryState)existingState).isCleared()) continue;
                    emptyNamespaces.remove(namespace);
                    continue block0;
                }
            }
            for (StateNamespace empty : emptyNamespaces) {
                this.clearNamespace(empty);
            }
        }

        protected StateTag.StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) {
            return this.binderFactory.forNamespace(namespace, c);
        }

        private static class InMemoryStateBinderFactory<K>
        implements StateBinderFactory<K> {
            private final K key;

            public InMemoryStateBinderFactory(K key) {
                this.key = key;
            }

            @Override
            public StateTag.StateBinder<K> forNamespace(StateNamespace namespace, StateContext<?> c) {
                return new InMemoryStateInternals.InMemoryStateBinder(this.key, c);
            }
        }

        private static class ReadThroughBinderFactory<K>
        implements StateBinderFactory<K> {
            private final StateTable<K> underlying;

            public ReadThroughBinderFactory(StateTable<K> underlying) {
                this.underlying = underlying;
            }

            public Instant readThroughAndGetEarliestHold(StateTable<K> readTo) {
                Instant earliestHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
                for (StateNamespace namespace : this.underlying.getNamespacesInUse()) {
                    for (Map.Entry existingState : this.underlying.getTagsInUse(namespace).entrySet()) {
                        Instant hold;
                        State state;
                        if (((InMemoryStateInternals.InMemoryState)existingState.getValue()).isCleared() || !((state = readTo.get(namespace, (StateTag)existingState.getKey(), StateContexts.nullContext())) instanceof WatermarkHoldState) || (hold = (Instant)((WatermarkHoldState)state).read()) == null || !hold.isBefore((ReadableInstant)earliestHold)) continue;
                        earliestHold = hold;
                    }
                }
                return earliestHold;
            }

            @Override
            public StateTag.StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
                return new StateTag.StateBinder<K>(){

                    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
                        return (WatermarkHoldState)ReadThroughBinderFactory.this.underlying.get(namespace, address, c);
                    }

                    public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
                        return (ValueState)ReadThroughBinderFactory.this.underlying.get(namespace, address, c);
                    }

                    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                        return (AccumulatorCombiningState)ReadThroughBinderFactory.this.underlying.get(namespace, address, c);
                    }

                    public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
                        return (BagState)ReadThroughBinderFactory.this.underlying.get(namespace, address, c);
                    }

                    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
                        return (AccumulatorCombiningState)ReadThroughBinderFactory.this.underlying.get(namespace, address, c);
                    }

                    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
                        return this.bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, (StateContext)c));
                    }
                };
            }
        }

        private static class CopyOnBindBinderFactory<K>
        implements StateBinderFactory<K> {
            private final K key;
            private final Optional<StateTable<K>> underlying;

            public CopyOnBindBinderFactory(K key, Optional<StateTable<K>> underlying) {
                this.key = key;
                this.underlying = underlying;
            }

            private boolean containedInUnderlying(StateNamespace namespace, StateTag<? super K, ?> tag) {
                return this.underlying.isPresent() && this.underlying.get().isNamespaceInUse(namespace) && this.underlying.get().getTagsInUse(namespace).containsKey(tag);
            }

            @Override
            public StateTag.StateBinder<K> forNamespace(final StateNamespace namespace, final StateContext<?> c) {
                return new StateTag.StateBinder<K>(){

                    public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
                        if (CopyOnBindBinderFactory.this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)CopyOnBindBinderFactory.this.underlying.get()).get(namespace, address, c);
                            return (WatermarkHoldState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryWatermarkHold(outputTimeFn);
                    }

                    public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
                        if (CopyOnBindBinderFactory.this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)CopyOnBindBinderFactory.this.underlying.get()).get(namespace, address, c);
                            return (ValueState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryValue();
                    }

                    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                        if (CopyOnBindBinderFactory.this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)CopyOnBindBinderFactory.this.underlying.get()).get(namespace, address, c);
                            return (AccumulatorCombiningState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryCombiningValue(CopyOnBindBinderFactory.this.key, combineFn.asKeyedFn());
                    }

                    public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
                        if (CopyOnBindBinderFactory.this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)CopyOnBindBinderFactory.this.underlying.get()).get(namespace, address, c);
                            return (BagState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryBag();
                    }

                    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
                        if (CopyOnBindBinderFactory.this.containedInUnderlying(namespace, address)) {
                            InMemoryStateInternals.InMemoryState existingState = (InMemoryStateInternals.InMemoryState)((StateTable)CopyOnBindBinderFactory.this.underlying.get()).get(namespace, address, c);
                            return (AccumulatorCombiningState)existingState.copy();
                        }
                        return new InMemoryStateInternals.InMemoryCombiningValue(CopyOnBindBinderFactory.this.key, combineFn);
                    }

                    public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
                        return this.bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, (StateContext)c));
                    }
                };
            }
        }

        private static interface StateBinderFactory<K> {
            public StateTag.StateBinder<K> forNamespace(StateNamespace var1, StateContext<?> var2);
        }
    }
}

