package org.apache.beam.runners.spark.stateful;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.MapState;
import org.apache.beam.sdk.util.state.ReadableState;
import org.apache.beam.sdk.util.state.SetState;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateContext;
import org.apache.beam.sdk.util.state.StateContexts;
import org.apache.beam.sdk.util.state.ValueState;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.beam.spark.repackaged.com.google.common.collect.HashBasedTable;
import org.apache.beam.spark.repackaged.com.google.common.collect.Table;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals.class */
public class SparkStateInternals<K> implements StateInternals<K> {
    private final K key;
    private final Table<String, String, byte[]> stateTable;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$AbstractState.class */
    /**
     * Common base for the state cells of this class. Each cell lives in the enclosing instance's
     * {@code stateTable} as an encoded byte array, with {@code namespace.stringKey()} as the row
     * and {@code address.getId()} as the column.
     *
     * @param <T> type of the value that {@link #coder} encodes and decodes
     */
    public class AbstractState<T> {
        final StateNamespace namespace;
        final StateTag<?, ? extends State> address;
        final Coder<T> coder;

        private AbstractState(StateNamespace ns, StateTag<?, ? extends State> tag, Coder<T> valueCoder) {
            this.namespace = ns;
            this.address = tag;
            this.coder = valueCoder;
        }

        /**
         * Looks up this cell in the state table and decodes it.
         *
         * @return the decoded value, or {@code null} when the table has no entry for this cell
         */
        T readValue() {
            byte[] encoded = stateTable.get(namespace.stringKey(), address.getId());
            return encoded == null ? null : (T) CoderHelpers.fromByteArray(encoded, coder);
        }

        /**
         * Encodes {@code t} with this cell's coder and stores the bytes in the state table,
         * replacing any previous entry for the cell.
         */
        void writeValue(T t) {
            byte[] encoded = CoderHelpers.toByteArray(t, coder);
            stateTable.put(namespace.stringKey(), address.getId(), encoded);
        }

        /** Removes this cell's entry from the state table. */
        public void clear() {
            stateTable.remove(namespace.stringKey(), address.getId());
        }

        /**
         * Two cells are equal when they have the same runtime class, namespace and address. The
         * coder and the stored contents take no part in the comparison.
         */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            SparkStateInternals<?>.AbstractState<?> that = (SparkStateInternals<?>.AbstractState<?>) obj;
            if (!namespace.equals(that.namespace)) {
                return false;
            }
            return address.equals(that.address);
        }

        /** Hash derived from the same two fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            int hash = namespace.hashCode();
            hash = 31 * hash + address.hashCode();
            return hash;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkAccumulatorCombiningState.class */
    /**
     * Combining state whose accumulator is stored in the enclosing instance's state table,
     * encoded with the accumulator coder passed to the constructor.
     *
     * <p>The type parameter {@code K} declared on this class shadows the enclosing class's
     * {@code K}.
     *
     * @param <K> key type handed to the keyed combine function
     * @param <InputT> type of the values added to the state
     * @param <AccumT> accumulator type that is persisted
     * @param <OutputT> type produced by {@link #read()}
     */
    public class SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> extends SparkStateInternals<K>.AbstractState<AccumT> implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
        private final K key;
        private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;

        private SparkAccumulatorCombiningState(StateNamespace stateNamespace, StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> coder, K k, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> keyedCombineFn) {
            super(stateNamespace, stateTag, coder);
            this.key = k;
            this.combineFn = keyedCombineFn;
        }

        /**
         * Decompiler-renamed form of {@code readLater}. Performs no prefetching.
         *
         * @return this state
         */
        public SparkStateInternals<K>.SparkAccumulatorCombiningState<K, InputT, AccumT, OutputT> m27readLater() {
            return this;
        }

        /**
         * Extracts the output from the current accumulator.
         *
         * @return the combine function's output for the accumulator returned by {@link #getAccum()}
         */
        public OutputT read() {
            return (OutputT) this.combineFn.extractOutput(this.key, getAccum());
        }

        /**
         * Folds {@code inputt} into the accumulator and persists the result.
         *
         * @param inputt the value to add
         */
        public void add(InputT inputt) {
            // addInput returns the accumulator to keep; it is only permitted, not required, to
            // mutate the one passed in. Persist the returned instance so that combine functions
            // with immutable accumulators do not silently lose the input.
            AccumT updated = this.combineFn.addInput(this.key, getAccum(), inputt);
            writeValue(updated);
        }

        /**
         * Returns the stored accumulator. When nothing is stored, a fresh accumulator is created
         * by the combine function; that fresh accumulator is not written to the table here.
         *
         * @return the stored or newly created accumulator
         */
        public AccumT getAccum() {
            AccumT stored = readValue();
            if (stored == null) {
                stored = this.combineFn.createAccumulator(this.key);
            }
            return stored;
        }

        /**
         * Returns a readable view that reports whether the state table currently has no entry
         * for this state's namespace and address.
         */
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() { // from class: org.apache.beam.runners.spark.stateful.SparkStateInternals.SparkAccumulatorCombiningState.1
                public ReadableState<Boolean> readLater() {
                    return this;
                }

                /* renamed from: read, reason: merged with bridge method [inline-methods] */
                public Boolean m28read() {
                    return Boolean.valueOf(SparkStateInternals.this.stateTable.get(SparkAccumulatorCombiningState.this.namespace.stringKey(), SparkAccumulatorCombiningState.this.address.getId()) == null);
                }
            };
        }

        /**
         * Merges {@code accumt} with the current accumulator and persists the merged result.
         *
         * @param accumt the accumulator to merge in
         */
        public void addAccum(AccumT accumt) {
            writeValue(this.combineFn.mergeAccumulators(this.key, Arrays.asList(getAccum(), accumt)));
        }

        /**
         * Merges the given accumulators with the combine function. The state table is neither
         * read nor written.
         *
         * @param iterable the accumulators to merge
         * @return the merged accumulator
         */
        public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
            return (AccumT) this.combineFn.mergeAccumulators(this.key, iterable);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkBagState.class */
    /**
     * Bag state held as one {@link List}. The list is encoded with a {@link ListCoder} built from
     * the element coder and stored under this state's namespace and address.
     *
     * @param <T> element type of the bag
     */
    public final class SparkBagState<T> extends SparkStateInternals<K>.AbstractState<List<T>> implements BagState<T> {
        private SparkBagState(StateNamespace ns, StateTag<?, BagState<T>> tag, Coder<T> elementCoder) {
            super(ns, tag, ListCoder.of(elementCoder));
        }

        /**
         * Decompiler-renamed form of {@code readLater}. Performs no prefetching.
         *
         * @return this state
         */
        public SparkStateInternals<K>.SparkBagState<T> m31readLater() {
            return this;
        }

        /**
         * Decompiler-renamed form of {@code read}.
         *
         * @return the stored list, or a new empty {@link ArrayList} when nothing is stored
         */
        public List<T> m32read() {
            List<T> stored = readValue();
            return stored != null ? stored : new ArrayList<T>();
        }

        /** Appends {@code t} to the current contents and writes the whole list back. */
        public void add(T t) {
            List<T> contents = m32read();
            contents.add(t);
            writeValue(contents);
        }

        /**
         * Returns a readable view that reports whether the state table currently has no entry
         * for this bag's namespace and address.
         */
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() {
                public ReadableState<Boolean> readLater() {
                    return this;
                }

                /* Decompiler-renamed form of read. */
                public Boolean m33read() {
                    byte[] stored = SparkStateInternals.this.stateTable.get(
                            SparkBagState.this.namespace.stringKey(),
                            SparkBagState.this.address.getId());
                    return stored == null;
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkStateBinder.class */
    /**
     * Binder that turns state tags into the state implementations of this class, all bound to
     * one namespace. Set and map state are rejected with {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): the state classes are constructed as raw types below; presumably the
     * type arguments were dropped by the decompiler. Confirm before adding them.
     */
    public class SparkStateBinder implements StateTag.StateBinder<K> {
        private final K key;
        private final StateNamespace namespace;
        private final StateContext<?> c;

        private SparkStateBinder(K bindKey, StateNamespace ns, StateContext<?> context) {
            this.key = bindKey;
            this.namespace = ns;
            this.c = context;
        }

        /** Creates a value state for {@code stateTag} in this binder's namespace. */
        public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> stateTag, Coder<T> coder) {
            return new SparkValueState(namespace, stateTag, coder);
        }

        /** Creates a bag state for {@code stateTag} in this binder's namespace. */
        public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> stateTag, Coder<T> coder) {
            return new SparkBagState(namespace, stateTag, coder);
        }

        /**
         * Always fails.
         *
         * @throws UnsupportedOperationException on every call
         */
        public <T> SetState<T> bindSet(StateTag<? super K, SetState<T>> stateTag, Coder<T> coder) {
            String message = String.format("%s is not supported", SetState.class.getSimpleName());
            throw new UnsupportedOperationException(message);
        }

        /**
         * Always fails.
         *
         * @throws UnsupportedOperationException on every call
         */
        public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<? super K, MapState<KeyT, ValueT>> stateTag, Coder<KeyT> keyCoder, Coder<ValueT> valueCoder) {
            String message = String.format("%s is not supported", MapState.class.getSimpleName());
            throw new UnsupportedOperationException(message);
        }

        /** Creates a combining state from an unkeyed combine function, adapted via {@code asKeyedFn()}. */
        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
            return new SparkAccumulatorCombiningState(namespace, stateTag, accumCoder, key, combineFn.asKeyedFn());
        }

        /** Creates a combining state that uses the keyed combine function as given. */
        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> accumCoder, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> keyedCombineFn) {
            return new SparkAccumulatorCombiningState(namespace, stateTag, accumCoder, key, keyedCombineFn);
        }

        /**
         * Creates a combining state from a context-aware keyed combine function, first binding it
         * to this binder's state context with {@code CombineFnUtil.bindContext}.
         */
        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> accumCoder, CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> keyedCombineFnWithContext) {
            return new SparkAccumulatorCombiningState(namespace, stateTag, accumCoder, key, CombineFnUtil.bindContext(keyedCombineFnWithContext, c));
        }

        /** Creates a watermark hold state for {@code stateTag} that combines holds with {@code outputTimeFn}. */
        public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> stateTag, OutputTimeFn<? super W> outputTimeFn) {
            return new SparkWatermarkHoldState(namespace, stateTag, outputTimeFn);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkValueState.class */
    /**
     * Single-value state; reads and writes go straight to the encoded cell managed by
     * {@code AbstractState}.
     *
     * @param <T> type of the stored value
     */
    private class SparkValueState<T> extends SparkStateInternals<K>.AbstractState<T> implements ValueState<T> {
        private SparkValueState(StateNamespace ns, StateTag<?, ValueState<T>> tag, Coder<T> valueCoder) {
            super(ns, tag, valueCoder);
        }

        /**
         * Decompiler-renamed form of {@code readLater}. Performs no prefetching.
         *
         * @return this state
         */
        public SparkStateInternals<K>.SparkValueState<T> m35readLater() {
            return this;
        }

        /**
         * @return the stored value, or {@code null} when nothing is stored
         */
        public T read() {
            T current = readValue();
            return current;
        }

        /** Stores {@code t}, replacing any previous value. */
        public void write(T t) {
            writeValue(t);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkWatermarkHoldState.class */
    /**
     * Watermark hold state that stores a single {@link Instant}, encoded with
     * {@link InstantCoder}. Added holds are folded into the stored one with the configured
     * {@link OutputTimeFn}.
     *
     * @param <W> window type accepted by the output time function
     */
    public class SparkWatermarkHoldState<W extends BoundedWindow> extends SparkStateInternals<K>.AbstractState<Instant> implements WatermarkHoldState<W> {
        private final OutputTimeFn<? super W> outputTimeFn;

        public SparkWatermarkHoldState(StateNamespace ns, StateTag<?, WatermarkHoldState<W>> tag, OutputTimeFn<? super W> outputTimeFn) {
            super(ns, tag, InstantCoder.of());
            this.outputTimeFn = outputTimeFn;
        }

        /**
         * Decompiler-renamed form of {@code readLater}. Performs no prefetching.
         *
         * @return this state
         */
        public SparkStateInternals<K>.SparkWatermarkHoldState<W> m38readLater() {
            return this;
        }

        /**
         * Decompiler-renamed form of {@code read}.
         *
         * @return the stored hold, or {@code null} when nothing is stored
         */
        public Instant m39read() {
            return readValue();
        }

        /**
         * Records a hold. The first hold is stored as is; later holds are combined with the
         * stored one through the output time function before being written back.
         */
        public void add(Instant instant) {
            Instant existingHold = m39read();
            Instant newHold;
            if (existingHold == null) {
                newHold = instant;
            } else {
                newHold = outputTimeFn.combine(existingHold, instant);
            }
            writeValue(newHold);
        }

        /**
         * Returns a readable view that reports whether the state table currently has no entry
         * for this hold's namespace and address.
         */
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() {
                public ReadableState<Boolean> readLater() {
                    return this;
                }

                /* Decompiler-renamed form of read. */
                public Boolean m40read() {
                    byte[] stored = SparkStateInternals.this.stateTable.get(
                            SparkWatermarkHoldState.this.namespace.stringKey(),
                            SparkWatermarkHoldState.this.address.getId());
                    return stored == null;
                }
            };
        }

        /** @return the output time function supplied at construction */
        public OutputTimeFn<? super W> getOutputTimeFn() {
            return outputTimeFn;
        }
    }

    /** Creates internals for {@code k} backed by a new, empty {@link HashBasedTable}. */
    private SparkStateInternals(K k) {
        this(k, HashBasedTable.<String, String, byte[]>create());
    }

    /**
     * Creates internals for {@code key} on top of an existing table. The table is used
     * directly, not copied.
     */
    private SparkStateInternals(K key, Table<String, String, byte[]> stateTable) {
        this.stateTable = stateTable;
        this.key = key;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Factory for internals that start with no stored state.
     *
     * @param k the key the internals belong to
     * @return new internals backed by an empty table
     */
    public static <K> SparkStateInternals<K> forKey(K k) {
        SparkStateInternals<K> internals = new SparkStateInternals<K>(k);
        return internals;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Factory for internals that operate on an existing state table.
     *
     * @param k the key the internals belong to
     * @param table the table to read from and write to; it is used as is, not copied
     * @return new internals backed by {@code table}
     */
    public static <K> SparkStateInternals<K> forKeyAndState(K k, Table<String, String, byte[]> table) {
        SparkStateInternals<K> internals = new SparkStateInternals<K>(k, table);
        return internals;
    }

    /**
     * Exposes the backing table itself, not a copy.
     *
     * @return the table that holds all encoded state, keyed by namespace string and tag id
     */
    public Table<String, String, byte[]> getState() {
        return stateTable;
    }

    /** @return the key these internals were created for */
    public K getKey() {
        return key;
    }

    /**
     * Resolves the state for {@code stateTag} in {@code stateNamespace}, using
     * {@code StateContexts.nullContext()} as the state context.
     *
     * @return the state produced by the three-argument overload
     */
    public <T extends State> T state(StateNamespace stateNamespace, StateTag<? super K, T> stateTag) {
        StateContext<?> noContext = StateContexts.nullContext();
        return (T) state(stateNamespace, stateTag, noContext);
    }

    /**
     * Resolves the state for {@code stateTag} by binding the tag to a new
     * {@code SparkStateBinder} built from this instance's key, the given namespace and the
     * given context.
     *
     * @return whatever the tag's {@code bind} call produces for that binder
     */
    public <T extends State> T state(StateNamespace stateNamespace, StateTag<? super K, T> stateTag, StateContext<?> stateContext) {
        SparkStateBinder binder = new SparkStateBinder(key, stateNamespace, stateContext);
        return (T) stateTag.bind(binder);
    }
}
