/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.OperatorBackendSerializationProxy;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.util.Preconditions;

@Internal
public class DefaultOperatorStateBackend
implements OperatorStateBackend {
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";
    private final Map<String, PartitionableListState<?>> registeredStates;
    private final CloseableRegistry closeStreamOnCancelRegistry = new CloseableRegistry();
    private final JavaSerializer<Serializable> javaSerializer;
    private final ClassLoader userClassloader;

    public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException {
        this.userClassloader = (ClassLoader)Preconditions.checkNotNull((Object)userClassLoader);
        this.javaSerializer = new JavaSerializer();
        this.registeredStates = new HashMap();
    }

    public Set<String> getRegisteredStateNames() {
        return this.registeredStates.keySet();
    }

    @Override
    public void close() throws IOException {
        this.closeStreamOnCancelRegistry.close();
    }

    @Override
    public void dispose() {
        this.registeredStates.clear();
    }

    public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
        return this.getOperatorState(new ListStateDescriptor(stateName, this.javaSerializer));
    }

    public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws IOException {
        return this.getOperatorState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    public <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName) throws Exception {
        return this.getBroadcastOperatorState(new ListStateDescriptor(stateName, this.javaSerializer));
    }

    public <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return this.getOperatorState(stateDescriptor, OperatorStateHandle.Mode.BROADCAST);
    }

    private <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode) throws IOException {
        Preconditions.checkNotNull(stateDescriptor);
        String name = (String)Preconditions.checkNotNull((Object)stateDescriptor.getName());
        TypeSerializer partitionStateSerializer = (TypeSerializer)Preconditions.checkNotNull((Object)stateDescriptor.getSerializer());
        PartitionableListState<Object> partitionableListState = this.registeredStates.get(name);
        if (null == partitionableListState) {
            partitionableListState = new PartitionableListState(name, partitionStateSerializer, mode);
            this.registeredStates.put(name, partitionableListState);
        } else {
            Preconditions.checkState((boolean)partitionableListState.getAssignmentMode().equals((Object)mode), (Object)("Incompatible assignment mode. Provided: " + (Object)((Object)mode) + ", expected: " + (Object)((Object)partitionableListState.getAssignmentMode())));
            Preconditions.checkState((boolean)partitionableListState.getPartitionStateSerializer().isCompatibleWith(stateDescriptor.getSerializer()), (Object)("Incompatible type serializers. Provided: " + stateDescriptor.getSerializer() + ", found: " + partitionableListState.getPartitionStateSerializer()));
        }
        return partitionableListState;
    }

    private static <S> void deserializeStateValues(PartitionableListState<S> stateListForName, FSDataInputStream in, OperatorStateHandle.StateMetaInfo metaInfo) throws IOException {
        long[] offsets;
        if (null != metaInfo && null != (offsets = metaInfo.getOffsets())) {
            DataInputViewStreamWrapper div = new DataInputViewStreamWrapper((InputStream)in);
            TypeSerializer<S> serializer = stateListForName.getPartitionStateSerializer();
            for (long offset : offsets) {
                in.seek(offset);
                stateListForName.add(serializer.deserialize((DataInputView)div));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public RunnableFuture<OperatorStateHandle> snapshot(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
        if (this.registeredStates.isEmpty()) {
            return DoneFuture.nullValue();
        }
        ArrayList metaInfoList = new ArrayList(this.registeredStates.size());
        for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
            PartitionableListState<?> state = entry.getValue();
            OperatorBackendSerializationProxy.StateMetaInfo metaInfo = new OperatorBackendSerializationProxy.StateMetaInfo(state.getName(), state.getPartitionStateSerializer(), state.getAssignmentMode());
            metaInfoList.add(metaInfo);
        }
        HashMap<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<String, OperatorStateHandle.StateMetaInfo>(this.registeredStates.size());
        CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
        try {
            this.closeStreamOnCancelRegistry.registerClosable((Closeable)((Object)out));
            DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper((OutputStream)((Object)out));
            OperatorBackendSerializationProxy backendSerializationProxy = new OperatorBackendSerializationProxy(metaInfoList);
            backendSerializationProxy.write((DataOutputView)dov);
            dov.writeInt(this.registeredStates.size());
            for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
                PartitionableListState<?> value = entry.getValue();
                long[] partitionOffsets = value.write(out);
                OperatorStateHandle.Mode mode = value.getAssignmentMode();
                writtenStatesMetaData.put(entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
            }
            OperatorStateHandle handle = new OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
            DoneFuture<OperatorStateHandle> doneFuture = new DoneFuture<OperatorStateHandle>(handle);
            return doneFuture;
        }
        finally {
            this.closeStreamOnCancelRegistry.unregisterClosable((Closeable)((Object)out));
            out.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void restore(Collection<OperatorStateHandle> restoreSnapshots) throws Exception {
        if (null == restoreSnapshots) {
            return;
        }
        for (OperatorStateHandle stateHandle : restoreSnapshots) {
            if (stateHandle == null) continue;
            FSDataInputStream in = stateHandle.openInputStream();
            this.closeStreamOnCancelRegistry.registerClosable((Closeable)in);
            ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(this.userClassloader);
                OperatorBackendSerializationProxy backendSerializationProxy = new OperatorBackendSerializationProxy(this.userClassloader);
                backendSerializationProxy.read((DataInputView)new DataInputViewStreamWrapper((InputStream)in));
                List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList = backendSerializationProxy.getNamedStateSerializationProxies();
                for (OperatorBackendSerializationProxy.StateMetaInfo<?> stateMetaInfo : metaInfoList) {
                    PartitionableListState<?> listState = this.registeredStates.get(stateMetaInfo.getName());
                    if (null == listState) {
                        listState = new PartitionableListState(stateMetaInfo.getName(), stateMetaInfo.getStateSerializer(), stateMetaInfo.getMode());
                        this.registeredStates.put(listState.getName(), listState);
                        continue;
                    }
                    Preconditions.checkState((boolean)listState.getPartitionStateSerializer().isCompatibleWith(stateMetaInfo.getStateSerializer()), (Object)("Incompatible state serializers found: " + listState.getPartitionStateSerializer() + " is not compatible with " + stateMetaInfo.getStateSerializer()));
                }
                for (Map.Entry entry : stateHandle.getStateNameToPartitionOffsets().entrySet()) {
                    PartitionableListState<?> stateListForName = this.registeredStates.get(entry.getKey());
                    Preconditions.checkState((null != stateListForName ? 1 : 0) != 0, (Object)("Found state without corresponding meta info: " + (String)entry.getKey()));
                    DefaultOperatorStateBackend.deserializeStateValues(stateListForName, in, (OperatorStateHandle.StateMetaInfo)entry.getValue());
                }
            }
            finally {
                Thread.currentThread().setContextClassLoader(restoreClassLoader);
                this.closeStreamOnCancelRegistry.unregisterClosable((Closeable)in);
                IOUtils.closeQuietly((InputStream)in);
            }
        }
    }

    static final class PartitionableListState<S>
    implements ListState<S> {
        private final String name;
        private final TypeSerializer<S> partitionStateSerializer;
        private final OperatorStateHandle.Mode assignmentMode;
        private final List<S> internalList;

        public PartitionableListState(String name, TypeSerializer<S> partitionStateSerializer, OperatorStateHandle.Mode assignmentMode) {
            this.name = (String)Preconditions.checkNotNull((Object)name);
            this.partitionStateSerializer = (TypeSerializer)Preconditions.checkNotNull(partitionStateSerializer);
            this.assignmentMode = (OperatorStateHandle.Mode)((Object)Preconditions.checkNotNull((Object)((Object)assignmentMode)));
            this.internalList = new ArrayList<S>();
        }

        public String getName() {
            return this.name;
        }

        public OperatorStateHandle.Mode getAssignmentMode() {
            return this.assignmentMode;
        }

        public TypeSerializer<S> getPartitionStateSerializer() {
            return this.partitionStateSerializer;
        }

        public List<S> getInternalList() {
            return this.internalList;
        }

        public void clear() {
            this.internalList.clear();
        }

        public Iterable<S> get() {
            return this.internalList;
        }

        public void add(S value) {
            this.internalList.add(value);
        }

        public String toString() {
            return "PartitionableListState{name='" + this.name + '\'' + ", assignmentMode=" + (Object)((Object)this.assignmentMode) + ", internalList=" + this.internalList + '}';
        }

        public long[] write(FSDataOutputStream out) throws IOException {
            long[] partitionOffsets = new long[this.internalList.size()];
            DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper((OutputStream)out);
            for (int i = 0; i < this.internalList.size(); ++i) {
                S element = this.internalList.get(i);
                partitionOffsets[i] = out.getPos();
                this.partitionStateSerializer.serialize(element, (DataOutputView)dov);
            }
            return partitionOffsets;
        }
    }
}

