/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap.async;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.migration.MigrationUtil;
import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.heap.async.CopyOnWriteStateTable;
import org.apache.flink.runtime.state.heap.async.HeapFoldingState;
import org.apache.flink.runtime.state.heap.async.HeapListState;
import org.apache.flink.runtime.state.heap.async.HeapReducingState;
import org.apache.flink.runtime.state.heap.async.HeapValueState;
import org.apache.flink.runtime.state.heap.async.StateTable;
import org.apache.flink.runtime.state.heap.async.StateTableByKeyGroupReader;
import org.apache.flink.runtime.state.heap.async.StateTableByKeyGroupReaders;
import org.apache.flink.runtime.state.heap.async.StateTableSnapshot;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncHeapKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncHeapKeyedStateBackend.class);
    private final HashMap<String, StateTable<K, ?, ?>> stateTables = new HashMap();

    public AsyncHeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ClassLoader userCodeClassLoader, int numberOfKeyGroups, KeyGroupRange keyGroupRange) {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
        LOG.info("Initializing heap keyed state backend with stream factory.");
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
        return this.tryRegisterStateTable(stateDesc.getName(), stateDesc.getType(), namespaceSerializer, stateDesc.getSerializer());
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(String stateName, StateDescriptor.Type stateType, TypeSerializer<N> namespaceSerializer, TypeSerializer<V> valueSerializer) {
        RegisteredBackendStateMetaInfo<N, V> newMetaInfo = new RegisteredBackendStateMetaInfo<N, V>(stateType, stateName, namespaceSerializer, valueSerializer);
        StateTable<K, Object, Object> stateTable = this.stateTables.get(stateName);
        if (stateTable == null) {
            stateTable = this.newStateTable(newMetaInfo);
            this.stateTables.put(stateName, stateTable);
        } else {
            if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
                throw new RuntimeException("Trying to access state using incompatible meta info, was " + stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
            }
            stateTable.setMetaInfo(newMetaInfo);
        }
        return stateTable;
    }

    private boolean hasRegisteredState() {
        return !this.stateTables.isEmpty();
    }

    @Override
    public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
        StateTable<K, N, V> stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor<?, V>)stateDesc);
        return new HeapValueState<K, N, V>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
        StateTable stateTable = this.tryRegisterStateTable(stateDesc.getName(), stateDesc.getType(), namespaceSerializer, new ArrayListSerializer(stateDesc.getSerializer()));
        return new HeapListState<K, N, T>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
        StateTable stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapReducingState<K, N, T>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
        StateTable<K, N, ACC> stateTable = this.tryRegisterStateTable(namespaceSerializer, (StateDescriptor)stateDesc);
        return new HeapFoldingState<K, N, T, ACC>(stateDesc, stateTable, this.keySerializer, namespaceSerializer);
    }

    @Override
    public RunnableFuture<KeyGroupsStateHandle> snapshot(final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory) throws Exception {
        if (!this.hasRegisteredState()) {
            return DoneFuture.nullValue();
        }
        long syncStartTime = System.currentTimeMillis();
        Preconditions.checkState((this.stateTables.size() <= Short.MAX_VALUE ? 1 : 0) != 0, (Object)("Too many KV-States: " + this.stateTables.size() + ". Currently at most " + Short.MAX_VALUE + " states are supported"));
        ArrayList metaInfoProxyList = new ArrayList(this.stateTables.size());
        final HashMap<String, Integer> kVStateToId = new HashMap<String, Integer>(this.stateTables.size());
        HashedMap cowStateStableSnapshots = new HashedMap(this.stateTables.size());
        for (Map.Entry<String, StateTable<K, ?, ?>> kvState : this.stateTables.entrySet()) {
            RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
            KeyedBackendSerializationProxy.StateMetaInfo metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(metaInfo.getStateType(), metaInfo.getName(), metaInfo.getNamespaceSerializer(), metaInfo.getStateSerializer());
            metaInfoProxyList.add(metaInfoProxy);
            kVStateToId.put(kvState.getKey(), kVStateToId.size());
            StateTable<K, ?, ?> stateTable = kvState.getValue();
            if (null == stateTable) continue;
            cowStateStableSnapshots.put(stateTable, stateTable.createSnapshot());
        }
        final KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.keySerializer, metaInfoProxyList);
        AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable = new AbstractAsyncIOCallable<KeyGroupsStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>((Map)cowStateStableSnapshots){
            AtomicBoolean open = new AtomicBoolean(false);
            final /* synthetic */ Map val$cowStateStableSnapshots;
            {
                this.val$cowStateStableSnapshots = map2;
            }

            @Override
            public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
                if (this.open.compareAndSet(false, true)) {
                    CheckpointStreamFactory.CheckpointStateOutputStream stream = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
                    try {
                        AsyncHeapKeyedStateBackend.this.cancelStreamRegistry.registerClosable((Closeable)((Object)stream));
                        return stream;
                    }
                    catch (Exception ex) {
                        this.open.set(false);
                        throw ex;
                    }
                }
                throw new IOException("Operation already opened.");
            }

            @Override
            public KeyGroupsStateHandle performOperation() throws Exception {
                long asyncStartTime = System.currentTimeMillis();
                CheckpointStreamFactory.CheckpointStateOutputStream stream = (CheckpointStreamFactory.CheckpointStateOutputStream)((Object)this.getIoHandle());
                DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper((OutputStream)((Object)stream));
                serializationProxy.write((DataOutputView)outView);
                long[] keyGroupRangeOffsets = new long[AsyncHeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups()];
                for (int keyGroupPos = 0; keyGroupPos < AsyncHeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
                    int keyGroupId = AsyncHeapKeyedStateBackend.this.keyGroupRange.getKeyGroupId(keyGroupPos);
                    keyGroupRangeOffsets[keyGroupPos] = stream.getPos();
                    outView.writeInt(keyGroupId);
                    for (Map.Entry kvState : AsyncHeapKeyedStateBackend.this.stateTables.entrySet()) {
                        outView.writeShort(((Integer)kVStateToId.get(kvState.getKey())).intValue());
                        ((StateTableSnapshot)this.val$cowStateStableSnapshots.get(kvState.getValue())).writeMappingsInKeyGroup((DataOutputView)outView, keyGroupId);
                    }
                }
                if (this.open.compareAndSet(true, false)) {
                    StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
                    KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(AsyncHeapKeyedStateBackend.this.keyGroupRange, keyGroupRangeOffsets);
                    KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(offsets, streamStateHandle);
                    LOG.info("Heap backend snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{streamFactory, Thread.currentThread(), System.currentTimeMillis() - asyncStartTime});
                    return keyGroupsStateHandle;
                }
                throw new IOException("Checkpoint stream already closed.");
            }

            @Override
            public void done(boolean canceled) {
                CheckpointStreamFactory.CheckpointStateOutputStream stream;
                if (this.open.compareAndSet(true, false) && null != (stream = (CheckpointStreamFactory.CheckpointStateOutputStream)((Object)this.getIoHandle()))) {
                    AsyncHeapKeyedStateBackend.this.cancelStreamRegistry.unregisterClosable((Closeable)((Object)stream));
                    IOUtils.closeQuietly((OutputStream)((Object)stream));
                }
                for (StateTableSnapshot snapshot : this.val$cowStateStableSnapshots.values()) {
                    snapshot.release();
                }
            }
        };
        AsyncStoppableTaskWithCallback<KeyGroupsStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
        LOG.info("Heap backend snapshot (" + streamFactory + ", synchronous part) in thread " + Thread.currentThread() + " took " + (System.currentTimeMillis() - syncStartTime) + " ms.");
        return task;
    }

    @Override
    public void restore(Collection<KeyGroupsStateHandle> restoredState) throws Exception {
        LOG.info("Initializing heap keyed state backend from snapshot.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
        }
        if (MigrationUtil.isOldSavepointKeyedState(restoredState)) {
            throw new UnsupportedOperationException("This async.HeapKeyedStateBackend does not support restore from old savepoints.");
        }
        this.restorePartitionedState(restoredState);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) throws Exception {
        HashMap<Integer, String> kvStatesById = new HashMap<Integer, String>();
        int numRegisteredKvStates = 0;
        this.stateTables.clear();
        for (KeyGroupsStateHandle keyGroupsHandle : state) {
            if (keyGroupsHandle == null) continue;
            FSDataInputStream fsDataInputStream = keyGroupsHandle.openInputStream();
            this.cancelStreamRegistry.registerClosable((Closeable)fsDataInputStream);
            try {
                DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper((InputStream)fsDataInputStream);
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
                serializationProxy.read((DataInputView)inView);
                List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList = serializationProxy.getNamedStateSerializationProxies();
                for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
                    StateTable<K, ?, ?> stateTable = this.stateTables.get(metaInfoSerializationProxy.getStateName());
                    if (null != stateTable) continue;
                    RegisteredBackendStateMetaInfo registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo(metaInfoSerializationProxy);
                    stateTable = this.newStateTable(registeredBackendStateMetaInfo);
                    this.stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
                    kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
                    ++numRegisteredKvStates;
                }
                for (Tuple2 groupOffset : keyGroupsHandle.getGroupRangeOffsets()) {
                    int keyGroupIndex = (Integer)groupOffset.f0;
                    long offset = (Long)groupOffset.f1;
                    fsDataInputStream.seek(offset);
                    int writtenKeyGroupIndex = inView.readInt();
                    Preconditions.checkState((writtenKeyGroupIndex == keyGroupIndex ? 1 : 0) != 0, (Object)"Unexpected key-group in restore.");
                    for (int i = 0; i < metaInfoList.size(); ++i) {
                        short kvStateId = inView.readShort();
                        StateTable<K, ?, ?> stateTable = this.stateTables.get(kvStatesById.get(kvStateId));
                        StateTableByKeyGroupReader keyGroupReader = StateTableByKeyGroupReaders.readerForVersion(stateTable, 2);
                        keyGroupReader.readMappingsInKeyGroup((DataInputView)inView, keyGroupIndex);
                    }
                }
            }
            finally {
                this.cancelStreamRegistry.unregisterClosable((Closeable)fsDataInputStream);
                IOUtils.closeQuietly((InputStream)fsDataInputStream);
            }
        }
    }

    public String toString() {
        return "HeapKeyedStateBackend";
    }

    @VisibleForTesting
    public int numStateEntries() {
        int sum = 0;
        for (StateTable<K, ?, ?> stateTable : this.stateTables.values()) {
            sum += stateTable.size();
        }
        return sum;
    }

    @VisibleForTesting
    public int numStateEntries(Object namespace) {
        int sum = 0;
        for (StateTable<K, ?, ?> stateTable : this.stateTables.values()) {
            sum += stateTable.sizeOfNamespace(namespace);
        }
        return sum;
    }

    private <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
        return new CopyOnWriteStateTable(this, newMetaInfo);
    }
}

