/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap.async;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.heap.async.AbstractHeapMergingState;
import org.apache.flink.runtime.state.heap.async.StateTable;
import org.apache.flink.util.Preconditions;

/**
 * {@link ListState} implementation that keeps its elements in an {@link ArrayList}
 * stored in a {@link StateTable}.
 *
 * <p>All reads and writes go through the state table handed to the constructor; the
 * list itself is looked up using the namespace currently set on this state object.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <V> type of the list elements
 */
public class HeapListState<K, N, V>
extends AbstractHeapMergingState<K, N, V, Iterable<V>, ArrayList<V>, ListState<V>, ListStateDescriptor<V>>
implements ListState<V> {

    /**
     * Creates a new list state backed by the given state table.
     *
     * @param stateDesc descriptor of this state; also supplies the element serializer
     *                  used by {@link #getSerializedValue(Object, Object)}
     * @param stateTable table that holds the per-namespace lists
     * @param keySerializer serializer for keys, forwarded to the superclass
     * @param namespaceSerializer serializer for namespaces, forwarded to the superclass
     */
    public HeapListState(ListStateDescriptor<V> stateDesc, StateTable<K, N, ArrayList<V>> stateTable, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer) {
        super(stateDesc, stateTable, keySerializer, namespaceSerializer);
    }

    /**
     * Returns whatever the state table holds for the current namespace.
     *
     * <p>The stored list is handed out directly, not as a copy. The result is
     * {@code null} when the table returns no list.
     *
     * @return the stored list, or {@code null} if the table has none
     */
    @Override
    public Iterable<V> get() {
        return this.stateTable.get(this.currentNamespace);
    }

    /**
     * Appends {@code value} to the list for the current namespace, creating and
     * registering a new list first if the table does not hold one yet.
     *
     * <p>A {@code null} value does not get appended; it triggers {@code clear()} instead.
     *
     * @param value element to append, or {@code null} to clear the state
     */
    @Override
    public void add(V value) {
        // Capture the namespace up front, exactly as the original does, so the
        // lookup and the put below operate on the same namespace.
        final N namespace = this.currentNamespace;

        if (value == null) {
            this.clear();
            return;
        }

        final StateTable<K, N, ArrayList<V>> table = this.stateTable;
        ArrayList<V> list = table.get(namespace);
        if (list == null) {
            list = new ArrayList<>();
            table.put(namespace, list);
        }
        list.add(value);
    }

    /**
     * Serializes the list stored for the given key and namespace.
     *
     * <p>Each element is written with the serializer obtained from the state
     * descriptor. Consecutive elements are separated by a single {@code ','} byte;
     * nothing is written after the last element.
     *
     * @param key key to look up; must not be {@code null}
     * @param namespace namespace to look up; must not be {@code null}
     * @return the serialized elements, or {@code null} if the table holds no list
     *         for the given key and namespace
     * @throws Exception if a precondition check or the element serialization fails
     */
    @Override
    public byte[] getSerializedValue(K key, N namespace) throws Exception {
        Preconditions.checkState(namespace != null, "No namespace given.");
        Preconditions.checkState(key != null, "No key given.");

        final ArrayList<V> result = this.stateTable.get(key, namespace);
        if (result == null) {
            return null;
        }

        final TypeSerializer<V> serializer = this.stateDesc.getSerializer();
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);

        for (int i = 0; i < result.size(); ++i) {
            serializer.serialize(result.get(i), view);
            // Separator goes between elements only, never after the last one.
            if (i < result.size() - 1) {
                view.writeByte(',');
            }
        }
        view.flush();
        return baos.toByteArray();
    }

    /**
     * Merges two lists by appending every element of {@code b} to {@code a}.
     *
     * <p>{@code a} is modified in place and returned; {@code b} is left untouched.
     *
     * @param a list that receives the elements
     * @param b list whose elements are appended
     * @return {@code a}, now also containing the elements of {@code b}
     */
    @Override
    protected ArrayList<V> mergeState(ArrayList<V> a, ArrayList<V> b) {
        a.addAll(b);
        return a;
    }
}

