/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap.async;

import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.heap.async.StateTable;
import org.apache.flink.runtime.state.heap.async.StateTableByKeyGroupReader;

class StateTableByKeyGroupReaders {
    StateTableByKeyGroupReaders() {
    }

    static <K, N, S> StateTableByKeyGroupReader readerForVersion(StateTable<K, N, S> table, int version) {
        switch (version) {
            case 1: {
                return new StateTableByKeyGroupReaderV1<K, N, S>(table);
            }
            case 2: {
                return new StateTableByKeyGroupReaderV2<K, N, S>(table);
            }
        }
        throw new IllegalArgumentException("Unknown version: " + version);
    }

    private static final class StateTableByKeyGroupReaderV2<K, N, S>
    extends AbstractStateTableByKeyGroupReader<K, N, S> {
        StateTableByKeyGroupReaderV2(StateTable<K, N, S> stateTable) {
            super(stateTable);
        }

        @Override
        public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
            TypeSerializer keySerializer = this.getKeySerializer();
            TypeSerializer namespaceSerializer = this.getNamespaceSerializer();
            TypeSerializer stateSerializer = this.getStateSerializer();
            int numKeys = inView.readInt();
            for (int i = 0; i < numKeys; ++i) {
                Object namespace = namespaceSerializer.deserialize(inView);
                Object key = keySerializer.deserialize(inView);
                Object state = stateSerializer.deserialize(inView);
                this.stateTable.put(key, keyGroupId, namespace, state);
            }
        }
    }

    static final class StateTableByKeyGroupReaderV1<K, N, S>
    extends AbstractStateTableByKeyGroupReader<K, N, S> {
        StateTableByKeyGroupReaderV1(StateTable<K, N, S> stateTable) {
            super(stateTable);
        }

        @Override
        public void readMappingsInKeyGroup(DataInputView inView, int keyGroupId) throws IOException {
            if (inView.readByte() == 0) {
                return;
            }
            TypeSerializer keySerializer = this.getKeySerializer();
            TypeSerializer namespaceSerializer = this.getNamespaceSerializer();
            TypeSerializer stateSerializer = this.getStateSerializer();
            int numNamespaces = inView.readInt();
            for (int k = 0; k < numNamespaces; ++k) {
                Object namespace = namespaceSerializer.deserialize(inView);
                int numEntries = inView.readInt();
                for (int l = 0; l < numEntries; ++l) {
                    Object key = keySerializer.deserialize(inView);
                    Object state = stateSerializer.deserialize(inView);
                    this.stateTable.put(key, keyGroupId, namespace, state);
                }
            }
        }
    }

    static abstract class AbstractStateTableByKeyGroupReader<K, N, S>
    implements StateTableByKeyGroupReader {
        protected final StateTable<K, N, S> stateTable;

        AbstractStateTableByKeyGroupReader(StateTable<K, N, S> stateTable) {
            this.stateTable = stateTable;
        }

        @Override
        public abstract void readMappingsInKeyGroup(DataInputView var1, int var2) throws IOException;

        protected TypeSerializer<K> getKeySerializer() {
            return this.stateTable.keyContext.getKeySerializer();
        }

        protected TypeSerializer<N> getNamespaceSerializer() {
            return this.stateTable.getNamespaceSerializer();
        }

        protected TypeSerializer<S> getStateSerializer() {
            return this.stateTable.getStateSerializer();
        }
    }
}

