/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapFoldingState;
import org.apache.flink.runtime.state.heap.HeapListState;
import org.apache.flink.runtime.state.heap.HeapReducingState;
import org.apache.flink.runtime.state.heap.HeapValueState;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeapKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    /** Logger used by this backend. */
    private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);

    /** All state tables known to this backend, keyed by state name. */
    private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();

    /**
     * Creates the backend. Every argument is handed unchanged to the
     * {@link AbstractKeyedStateBackend} constructor; the only additional work is an
     * info-level log line.
     *
     * @param kvStateRegistry registry forwarded to the super constructor
     * @param keySerializer serializer for keys of type {@code K}
     * @param userCodeClassLoader class loader forwarded to the super constructor
     * @param numberOfKeyGroups total number of key groups
     * @param keyGroupRange key-group range this backend instance covers
     */
    public HeapKeyedStateBackend(
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            ClassLoader userCodeClassLoader,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange) {

        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);

        LOG.info("Initializing heap keyed state backend with stream factory.");
    }

    /**
     * Looks up the state table registered under the descriptor's name and registers or
     * re-validates it against meta info built from the descriptor.
     *
     * @param namespaceSerializer serializer for the namespace type
     * @param stateDesc descriptor supplying the state name, type and value serializer
     * @return the table registered under the descriptor's name (new or pre-existing)
     * @throws RuntimeException if a table with that name exists and its meta info is
     *         not compatible with the new meta info
     */
    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
        String name = stateDesc.getName();

        // The map stores tables with wildcard namespace/value types. The cast cannot be
        // verified here; the compatibility check in the delegate guards type mismatches.
        @SuppressWarnings("unchecked")
        StateTable<K, N, V> stateTable = (StateTable<K, N, V>) this.stateTables.get(name);

        RegisteredBackendStateMetaInfo<N, V> newMetaInfo = new RegisteredBackendStateMetaInfo<N, V>(
                stateDesc.getType(), name, namespaceSerializer, stateDesc.getSerializer());

        return this.tryRegisterStateTable(stateTable, newMetaInfo);
    }

    /**
     * Registers a new state table for {@code newMetaInfo}, or updates the meta info of
     * an existing one after checking compatibility.
     *
     * @param stateTable the table currently registered under the name, or {@code null}
     * @param newMetaInfo meta info the caller wants to access the state with
     * @return {@code stateTable} if it was non-null, otherwise the newly created table
     * @throws RuntimeException if {@code newMetaInfo} is not compatible with the meta
     *         info of the existing table
     */
    private <N, V> StateTable<K, N, V> tryRegisterStateTable(StateTable<K, N, V> stateTable, RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
        if (stateTable != null) {
            // Existing table: verify compatibility before swapping in the new meta info.
            if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
                throw new RuntimeException("Trying to access state using incompatible meta info, was " + stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
            }
            stateTable.setMetaInfo(newMetaInfo);
            return stateTable;
        }

        // First access under this name: create and register a fresh table.
        StateTable<K, N, V> created = new StateTable<K, N, V>(newMetaInfo, this.keyGroupRange);
        this.stateTables.put(newMetaInfo.getName(), created);
        return created;
    }

    /**
     * Creates a heap-backed {@link ValueState} on top of the state table registered
     * under the descriptor's name (the table is created on first use).
     *
     * @param namespaceSerializer serializer for the namespace type
     * @param stateDesc descriptor of the value state
     * @return a new {@link HeapValueState} bound to this backend
     */
    @Override
    public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
        final StateTable<K, N, V> table = tryRegisterStateTable(namespaceSerializer, stateDesc);

        return new HeapValueState<K, N, V>(this, stateDesc, table, this.keySerializer, namespaceSerializer);
    }

    /**
     * Creates a heap-backed {@link ListState}. The list is stored as an
     * {@link ArrayList} per key and namespace, so the table's value serializer is an
     * {@link ArrayListSerializer} wrapping the descriptor's serializer.
     *
     * @param namespaceSerializer serializer for the namespace type
     * @param stateDesc descriptor of the list state
     * @return a new {@link HeapListState} bound to this backend
     * @throws RuntimeException if a table with the same name exists with incompatible
     *         meta info
     */
    @Override
    public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
        String name = stateDesc.getName();

        // The map stores tables with wildcard types; compatibility is verified by
        // tryRegisterStateTable below.
        @SuppressWarnings("unchecked")
        StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) this.stateTables.get(name);

        RegisteredBackendStateMetaInfo<N, ArrayList<T>> newMetaInfo = new RegisteredBackendStateMetaInfo<N, ArrayList<T>>(
                stateDesc.getType(), name, namespaceSerializer, new ArrayListSerializer<T>(stateDesc.getSerializer()));

        stateTable = this.tryRegisterStateTable(stateTable, newMetaInfo);
        return new HeapListState<>(this, stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    /**
     * Creates a heap-backed {@link ReducingState} on top of the state table registered
     * under the descriptor's name (the table is created on first use).
     *
     * @param namespaceSerializer serializer for the namespace type
     * @param stateDesc descriptor of the reducing state
     * @return a new {@link HeapReducingState} bound to this backend
     */
    @Override
    public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
        final StateTable<K, N, T> table = tryRegisterStateTable(namespaceSerializer, stateDesc);

        return new HeapReducingState<K, N, T>(this, stateDesc, table, this.keySerializer, namespaceSerializer);
    }

    /**
     * Creates a heap-backed {@link FoldingState} on top of the state table registered
     * under the descriptor's name (the table is created on first use). The table
     * stores the accumulator type {@code ACC}.
     *
     * @param namespaceSerializer serializer for the namespace type
     * @param stateDesc descriptor of the folding state
     * @return a new {@link HeapFoldingState} bound to this backend
     */
    @Override
    protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
        final StateTable<K, N, ACC> table = tryRegisterStateTable(namespaceSerializer, stateDesc);

        return new HeapFoldingState<K, N, T, ACC>(this, stateDesc, table, this.keySerializer, namespaceSerializer);
    }

    /**
     * Writes all registered state tables to a checkpoint stream and returns a handle to
     * the result. The work is done before this method returns; the returned future is
     * an already-completed {@link DoneFuture}.
     *
     * <p>Stream layout: first the {@link KeyedBackendSerializationProxy} (key serializer
     * plus one meta-info entry per state), then for every key group in this backend's
     * range, in ascending order: the key-group index as an int, followed for each state
     * by its id as a short and the table contents written by
     * {@code writeStateTableForKeyGroup}. A state's id is its position in the meta-info
     * list. The stream position at the start of each key group is recorded as that
     * key group's offset.
     *
     * @param checkpointId id passed to the stream factory
     * @param timestamp timestamp passed to the stream factory
     * @param streamFactory factory for the checkpoint output stream
     * @return a completed future holding the key-groups handle, or holding {@code null}
     *         if no state table is registered
     * @throws IllegalStateException if more than {@link Short#MAX_VALUE} states are
     *         registered (ids are written as shorts)
     */
    @Override
    public RunnableFuture<KeyGroupsStateHandle> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
        if (this.stateTables.isEmpty()) {
            return new DoneFuture<KeyGroupsStateHandle>(null);
        }

        // Validate before opening the stream so a failing precondition never creates one.
        Preconditions.checkState(this.stateTables.size() <= Short.MAX_VALUE, "Too many KV-States: " + this.stateTables.size() + ". Currently at most " + Short.MAX_VALUE + " states are supported");

        try (CheckpointStreamFactory.CheckpointStateOutputStream stream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp)) {
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream);

            List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList =
                    new ArrayList<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>>(this.stateTables.size());
            Map<String, Integer> kVStateToId = new HashMap<String, Integer>(this.stateTables.size());

            for (Map.Entry<String, StateTable<K, ?, ?>> kvState : this.stateTables.entrySet()) {
                RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
                KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo<>(
                        metaInfo.getStateType(), metaInfo.getName(), metaInfo.getNamespaceSerializer(), metaInfo.getStateSerializer());
                metaInfoProxyList.add(metaInfoProxy);
                // The id equals the proxy's index in metaInfoProxyList; restore relies on this.
                kVStateToId.put(kvState.getKey(), kVStateToId.size());
            }

            KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.keySerializer, metaInfoProxyList);
            serializationProxy.write(outView);

            int offsetCounter = 0;
            long[] keyGroupRangeOffsets = new long[this.keyGroupRange.getNumberOfKeyGroups()];

            for (int keyGroupIndex = this.keyGroupRange.getStartKeyGroup(); keyGroupIndex <= this.keyGroupRange.getEndKeyGroup(); ++keyGroupIndex) {
                keyGroupRangeOffsets[offsetCounter++] = stream.getPos();
                outView.writeInt(keyGroupIndex);
                for (Map.Entry<String, StateTable<K, ?, ?>> kvState : this.stateTables.entrySet()) {
                    outView.writeShort(kVStateToId.get(kvState.getKey()));
                    this.writeStateTableForKeyGroup(outView, kvState.getValue(), keyGroupIndex);
                }
            }

            StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
            KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(this.keyGroupRange, keyGroupRangeOffsets);
            KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
            return new DoneFuture<KeyGroupsStateHandle>(keyGroupsStateHandle);
        }
    }

    /**
     * Restores this backend from the given handles. Handles that
     * {@link MigrationUtil#isOldSavepointKeyedState} recognizes as old savepoint state
     * are restored through the legacy path; everything else goes through the
     * key-group-partitioned path.
     *
     * @param restoredState state handles to restore from
     */
    @Override
    public void restore(Collection<KeyGroupsStateHandle> restoredState) throws Exception {
        LOG.info("Initializing heap keyed state backend from snapshot.");

        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
        }

        final boolean legacyFormat = MigrationUtil.isOldSavepointKeyedState(restoredState);
        if (!legacyFormat) {
            restorePartitionedState(restoredState);
            return;
        }
        restoreOldSavepointKeyedState(restoredState);
    }

    /**
     * Writes the part of {@code stateTable} that belongs to one key group.
     *
     * <p>Format: a presence byte ({@code 0} if the table has no map for the key group,
     * otherwise {@code 1}); when present, the number of namespaces as an int, and per
     * namespace the serialized namespace, the number of entries as an int, and each
     * entry as serialized key followed by serialized state.
     *
     * @param outView target of the serialized data
     * @param stateTable table to read from
     * @param keyGroupIndex key group to write
     * @throws IOException if writing to {@code outView} fails
     */
    private <N, S> void writeStateTableForKeyGroup(DataOutputView outView, StateTable<K, N, S> stateTable, int keyGroupIndex) throws IOException {
        final TypeSerializer<N> nsSerializer = stateTable.getNamespaceSerializer();
        final TypeSerializer<S> valueSerializer = stateTable.getStateSerializer();
        final Map<N, Map<K, S>> byNamespace = stateTable.get(keyGroupIndex);

        if (byNamespace == null) {
            // Nothing stored for this key group: only the "absent" marker is written.
            outView.writeByte(0);
            return;
        }

        outView.writeByte(1);
        outView.writeInt(byNamespace.size());

        for (Map.Entry<N, Map<K, S>> nsEntry : byNamespace.entrySet()) {
            nsSerializer.serialize(nsEntry.getKey(), outView);

            final Map<K, S> keyedValues = nsEntry.getValue();
            outView.writeInt(keyedValues.size());

            for (Map.Entry<K, S> keyedValue : keyedValues.entrySet()) {
                this.keySerializer.serialize(keyedValue.getKey(), outView);
                valueSerializer.serialize(keyedValue.getValue(), outView);
            }
        }
    }

    /**
     * Restores state written by {@code snapshot} from the given key-group handles.
     *
     * <p>For each non-null handle: reads the {@link KeyedBackendSerializationProxy}
     * header, creates a state table for every listed state that is not registered yet,
     * then seeks to each key group's offset and reads the per-state data.
     *
     * <p>State ids found in the stream are positions in the meta-info list of the
     * <em>same</em> handle (that is how {@code snapshot} assigns them), so the id to
     * name mapping is rebuilt for every handle. A mapping shared across handles, filled
     * only when a table is newly created, resolves ids to the wrong table or to none
     * when a table already exists or when handles list their states in different orders.
     *
     * @param state handles to restore from; {@code null} elements are skipped
     * @throws IllegalStateException if a key-group index or state id read from the
     *         stream does not match what the handle's metadata announces
     */
    private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) throws Exception {
        for (KeyGroupsStateHandle keyGroupsHandle : state) {
            if (keyGroupsHandle == null) {
                continue;
            }

            FSDataInputStream fsDataInputStream = keyGroupsHandle.openInputStream();
            this.cancelStreamRegistry.registerClosable(fsDataInputStream);
            try {
                DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);

                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
                serializationProxy.read(inView);

                List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList = serializationProxy.getNamedStateSerializationProxies();

                // Index i holds the name of the state written with id i in this handle.
                List<String> stateNamesById = new ArrayList<String>(metaInfoList.size());

                for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
                    String stateName = metaInfoSerializationProxy.getStateName();
                    stateNamesById.add(stateName);

                    // Only create a table if none was registered under this name before.
                    if (this.stateTables.get(stateName) == null) {
                        RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
                                new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
                        StateTable<K, ?, ?> newTable = new StateTable<>(registeredBackendStateMetaInfo, this.keyGroupRange);
                        this.stateTables.put(stateName, newTable);
                    }
                }

                for (Tuple2<Integer, Long> groupOffset : keyGroupsHandle.getGroupRangeOffsets()) {
                    int keyGroupIndex = groupOffset.f0;
                    long offset = groupOffset.f1;
                    fsDataInputStream.seek(offset);

                    int writtenKeyGroupIndex = inView.readInt();
                    // A real check instead of an assert: assertions are usually disabled
                    // in production and a mismatch means state would go to the wrong group.
                    Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
                            "Unexpected key-group in restore: expected " + keyGroupIndex + " but found " + writtenKeyGroupIndex);

                    for (int i = 0; i < metaInfoList.size(); ++i) {
                        int kvStateId = inView.readShort();
                        byte isPresent = inView.readByte();
                        if (isPresent == 0) {
                            continue;
                        }

                        Preconditions.checkState(kvStateId >= 0 && kvStateId < stateNamesById.size(),
                                "Unknown state id in restore: " + kvStateId);

                        // Non-null: every name in stateNamesById was registered above.
                        StateTable<K, ?, ?> stateTable = this.stateTables.get(stateNamesById.get(kvStateId));
                        this.readStateTableForKeyGroup(inView, stateTable, keyGroupIndex);
                    }
                }
            } finally {
                this.cancelStreamRegistry.unregisterClosable(fsDataInputStream);
                IOUtils.closeQuietly(fsDataInputStream);
            }
        }
    }

    /**
     * Reads the contents of one key group into {@code stateTable}. This is the
     * counterpart of {@code writeStateTableForKeyGroup} after the presence byte has
     * been consumed by the caller.
     *
     * <p>A fresh namespace map is handed to {@code stateTable.set} for the key group
     * before any entry is read, then filled from the stream: the number of namespaces
     * as an int, and per namespace the serialized namespace, the number of entries as
     * an int, and each entry as key followed by state.
     *
     * @param inView source of the serialized data
     * @param stateTable table to populate
     * @param keyGroupIndex key group the data belongs to
     * @throws IOException if reading from {@code inView} fails
     */
    private <N, S> void readStateTableForKeyGroup(DataInputView inView, StateTable<K, N, S> stateTable, int keyGroupIndex) throws IOException {
        final TypeSerializer<N> nsSerializer = stateTable.getNamespaceSerializer();
        final TypeSerializer<S> valueSerializer = stateTable.getStateSerializer();

        final Map<N, Map<K, S>> restoredNamespaces = new HashMap<N, Map<K, S>>();
        stateTable.set(keyGroupIndex, restoredNamespaces);

        final int namespaceCount = inView.readInt();
        for (int nsIdx = 0; nsIdx < namespaceCount; ++nsIdx) {
            final N namespace = nsSerializer.deserialize(inView);
            final Map<K, S> entries = new HashMap<K, S>();
            restoredNamespaces.put(namespace, entries);

            final int entryCount = inView.readInt();
            for (int entryIdx = 0; entryIdx < entryCount; ++entryIdx) {
                final K key = this.keySerializer.deserialize(inView);
                final S value = valueSerializer.deserialize(inView);
                entries.put(key, value);
            }
        }
    }

    /**
     * Returns a fixed, human-readable name for this backend.
     *
     * @return the constant string {@code "HeapKeyedStateBackend"}
     */
    @Override
    public String toString() {
        return "HeapKeyedStateBackend";
    }

    /**
     * Restores state from an old-format savepoint: a Java-serialized {@link HashMap}
     * from state name to {@link KvStateSnapshot}.
     *
     * <p>Each snapshot is turned into a raw namespace map plus serializers and
     * installed as a new state table with type {@code UNKNOWN}; the whole map is
     * placed at index 0 of the table's state list. Two legacy conventions are
     * translated: a {@link VoidSerializer} namespace serializer becomes
     * {@link VoidNamespaceSerializer#INSTANCE}, and a {@code null} namespace key
     * becomes {@link VoidNamespace#INSTANCE}.
     *
     * @param stateHandles handles to restore from; empty is a no-op, otherwise exactly
     *        one element is expected
     * @throws IllegalStateException if more than one handle is given or a snapshot is
     *         neither memory- nor file-system-based
     */
    @Deprecated
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void restoreOldSavepointKeyedState(Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException {
        if (stateHandles.isEmpty()) {
            return;
        }

        Preconditions.checkState(1 == stateHandles.size(), "Only one element expected here.");

        final HashMap<String, Object> namedStates;
        try (FSDataInputStream inputStream = stateHandles.iterator().next().openInputStream()) {
            namedStates = (HashMap<String, Object>) InstantiationUtil.deserializeObject(inputStream, this.userCodeClassLoader);
        }

        for (Map.Entry<String, Object> nameToState : namedStates.entrySet()) {
            final KvStateSnapshot genericSnapshot = (KvStateSnapshot) nameToState.getValue();

            // Pick the decoder that matches the snapshot flavour.
            final RestoredState restoredState;
            if (genericSnapshot instanceof AbstractMemStateSnapshot) {
                restoredState = restoreHeapState((AbstractMemStateSnapshot<K, ?, ?, ?, ?>) genericSnapshot);
            } else if (genericSnapshot instanceof AbstractFsStateSnapshot) {
                restoredState = restoreFsState((AbstractFsStateSnapshot<K, ?, ?, ?, ?>) genericSnapshot);
            } else {
                throw new IllegalStateException("Unknown state: " + genericSnapshot);
            }

            final Map rawResultMap = restoredState.getRawResultMap();
            final TypeSerializer<?> restoredNamespaceSerializer = restoredState.getNamespaceSerializer();
            final TypeSerializer<?> stateSerializer = restoredState.getStateSerializer();

            // Old savepoints used VoidSerializer / a null key where the void namespace is used now.
            final TypeSerializer<?> namespaceSerializer = restoredNamespaceSerializer instanceof VoidSerializer
                    ? VoidNamespaceSerializer.INSTANCE
                    : restoredNamespaceSerializer;

            final Map nullNameSpaceFix = (Map) rawResultMap.remove(null);
            if (nullNameSpaceFix != null) {
                rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix);
            }

            final RegisteredBackendStateMetaInfo registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo(
                    StateDescriptor.Type.UNKNOWN, nameToState.getKey(), namespaceSerializer, stateSerializer);

            final StateTable restoredTable = new StateTable(registeredBackendStateMetaInfo, this.keyGroupRange);
            restoredTable.getState().set(0, rawResultMap);

            this.stateTables.put(registeredBackendStateMetaInfo.getName(), restoredTable);
        }
    }

    /**
     * Converts a memory-based legacy snapshot into a {@link RestoredState} by
     * deserializing its contents and taking over its namespace and state serializers.
     *
     * @param stateSnapshot legacy in-memory snapshot
     * @return the deserialized map together with the snapshot's serializers
     * @throws IOException if deserializing the snapshot fails
     */
    private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
        final Map<?, ?> deserialized = stateSnapshot.deserialize();
        final TypeSerializer<?> namespaceSerializer = stateSnapshot.getNamespaceSerializer();
        final TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();

        return new RestoredState(deserialized, namespaceSerializer, stateSerializer);
    }

    /**
     * Converts a file-system-based legacy snapshot into a {@link RestoredState} by
     * reading the snapshot file.
     *
     * <p>File format as read here: the number of namespaces as an int, and per
     * namespace the serialized namespace, the number of key/value pairs as an int, and
     * each pair as key followed by value, using the serializers stored in the snapshot.
     *
     * @param stateSnapshot legacy file-system snapshot
     * @return the namespace map read from the file together with the snapshot's
     *         namespace and state serializers
     * @throws IOException if the file system cannot be obtained, or wrapping any
     *         exception raised while opening or reading the file
     */
    private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
        // Resolved outside the try block, so a failure here is not re-wrapped.
        final FileSystem fs = stateSnapshot.getFilePath().getFileSystem();

        try (FSDataInputStream inStream = fs.open(stateSnapshot.getFilePath())) {
            final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inStream);

            final int namespaceCount = inView.readInt();
            final Map<Object, Map<Object, Object>> byNamespace = new HashMap<Object, Map<Object, Object>>(namespaceCount);

            final TypeSerializer<K> snapshotKeySerializer = stateSnapshot.getKeySerializer();
            final TypeSerializer<?> namespaceSerializer = stateSnapshot.getNamespaceSerializer();
            final TypeSerializer<?> stateSerializer = stateSnapshot.getStateSerializer();

            for (int nsIdx = 0; nsIdx < namespaceCount; ++nsIdx) {
                final Object namespace = namespaceSerializer.deserialize(inView);
                final int pairCount = inView.readInt();

                final Map<Object, Object> pairs = new HashMap<Object, Object>(pairCount);
                byNamespace.put(namespace, pairs);

                for (int pairIdx = 0; pairIdx < pairCount; ++pairIdx) {
                    final Object key = snapshotKeySerializer.deserialize(inView);
                    final Object value = stateSerializer.deserialize(inView);
                    pairs.put(key, value);
                }
            }

            return new RestoredState(byNamespace, namespaceSerializer, stateSerializer);
        } catch (Exception e) {
            throw new IOException("Failed to restore state from file system", e);
        }
    }

    /**
     * Counts all key/state entries held by this backend, across every state table,
     * key group and namespace.
     *
     * @return the total number of entries
     */
    @VisibleForTesting
    public int numStateEntries() {
        int sum = 0;
        for (StateTable<K, ?, ?> stateTable : this.stateTables.values()) {
            // "? extends" is required: the table's value type is a wildcard capture here.
            for (Map<?, ? extends Map<K, ?>> namespaceMap : stateTable.getState()) {
                if (namespaceMap == null) {
                    // No map stored at this position of the state list.
                    continue;
                }
                for (Map<K, ?> entriesMap : namespaceMap.values()) {
                    sum += entriesMap.size();
                }
            }
        }
        return sum;
    }

    /**
     * Counts the key/state entries stored under one namespace, across every state
     * table and key group.
     *
     * @param namespace namespace to look up in each key group's namespace map
     * @return the number of entries stored under {@code namespace}
     */
    @VisibleForTesting
    public <N> int numStateEntries(N namespace) {
        int sum = 0;
        for (StateTable<K, ?, ?> stateTable : this.stateTables.values()) {
            // "? extends" is required: the table's value type is a wildcard capture here.
            for (Map<?, ? extends Map<K, ?>> namespaceMap : stateTable.getState()) {
                if (namespaceMap == null) {
                    continue;
                }
                Map<K, ?> singleNamespace = namespaceMap.get(namespace);
                if (singleNamespace != null) {
                    sum += singleNamespace.size();
                }
            }
        }
        return sum;
    }

    /**
     * Immutable holder for the result of decoding one legacy state snapshot: the raw
     * namespace map and the two serializers that belong to it. The map is stored by
     * reference, not copied.
     */
    static final class RestoredState {

        /** Decoded contents, keyed by namespace. */
        private final Map rawResultMap;

        /** Serializer for the namespaces of {@link #rawResultMap}. */
        private final TypeSerializer<?> namespaceSerializer;

        /** Serializer for the state values of {@link #rawResultMap}. */
        private final TypeSerializer<?> stateSerializer;

        /**
         * @param rawResultMap decoded contents, keyed by namespace
         * @param namespaceSerializer serializer for the namespaces
         * @param stateSerializer serializer for the state values
         */
        public RestoredState(
                Map rawResultMap,
                TypeSerializer<?> namespaceSerializer,
                TypeSerializer<?> stateSerializer) {
            this.rawResultMap = rawResultMap;
            this.namespaceSerializer = namespaceSerializer;
            this.stateSerializer = stateSerializer;
        }

        /** @return the decoded contents passed to the constructor */
        public Map getRawResultMap() {
            return rawResultMap;
        }

        /** @return the namespace serializer passed to the constructor */
        public TypeSerializer<?> getNamespaceSerializer() {
            return namespaceSerializer;
        }

        /** @return the state serializer passed to the constructor */
        public TypeSerializer<?> getStateSerializer() {
            return stateSerializer;
        }
    }
}

