package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.heap.AbstractHeapState;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.shaded.com.google.common.base.Joiner;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase.class */
public abstract class StateBackendTestBase<B extends AbstractStateBackend> extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$AppendingFold.class */
    /**
     * Fold function for the folding-state tests: appends the incoming integer to the accumulated
     * string, separated by a comma.
     */
    private static class AppendingFold implements FoldFunction<Integer, String> {
        private static final long serialVersionUID = 1;

        private AppendingFold() {
        }

        public String fold(String str, Integer num) throws Exception {
            // StringBuilder renders null operands as "null", exactly like string concatenation.
            StringBuilder joined = new StringBuilder();
            joined.append(str).append(",").append(num);
            return joined.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateBackendTestBase$AppendingReduce.class */
    /**
     * Reduce function for the reducing-state tests: joins the two inputs with a comma, keeping
     * the first argument on the left.
     */
    private static class AppendingReduce implements ReduceFunction<String> {
        // Flink's ReduceFunction is Serializable; pin the version id the same way AppendingFold does.
        private static final long serialVersionUID = 1L;

        private AppendingReduce() {
        }

        public String reduce(String str, String str2) throws Exception {
            return str + "," + str2;
        }
    }

    /**
     * Supplies the state backend under test.
     *
     * <p>The helpers in this class that create stream factories and keyed backends obtain the
     * backend through this method.
     *
     * @return the state backend implementation to test
     * @throws Exception if the subclass fails to provide the backend
     */
    protected abstract B getStateBackend() throws Exception;

    /**
     * Asks the backend under test for a checkpoint stream factory, passing a freshly created
     * {@link JobID} and the string {@code "test_op"}.
     *
     * @return the stream factory created by the backend
     * @throws Exception if the backend or the factory cannot be created
     */
    protected CheckpointStreamFactory createStreamFactory() throws Exception {
        final B backend = getStateBackend();
        final JobID jobId = new JobID();
        return backend.createStreamFactory(jobId, "test_op");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a keyed backend for the given key serializer inside a new
     * {@code DummyEnvironment("test", 1, 0)}.
     *
     * @param typeSerializer serializer for the keys
     * @return the keyed state backend
     * @throws Exception if the backend cannot be created
     */
    public <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> typeSerializer) throws Exception {
        final DummyEnvironment testEnvironment = new DummyEnvironment("test", 1, 0);
        return createKeyedBackend(typeSerializer, testEnvironment);
    }

    /**
     * Creates a keyed backend in the given environment, using the value {@code 10} together with
     * the key-group range {@code [0, 9]}.
     *
     * @param typeSerializer serializer for the keys
     * @param environment environment handed to the backend
     * @return the keyed state backend
     * @throws Exception if the backend cannot be created
     */
    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> typeSerializer, Environment environment) throws Exception {
        final KeyGroupRange defaultRange = new KeyGroupRange(0, 9);
        return createKeyedBackend(typeSerializer, 10, defaultRange, environment);
    }

    /**
     * Creates a keyed backend from the backend under test, passing a fresh {@link JobID}, the
     * string {@code "test_op"} and the environment's task KvState registry.
     *
     * @param typeSerializer serializer for the keys
     * @param i integer forwarded unchanged to {@code createKeyedStateBackend}
     *     (presumably the number of key groups — confirm against the backend API)
     * @param keyGroupRange key-group range forwarded to the backend
     * @param environment environment handed to the backend
     * @return the keyed state backend
     * @throws Exception if the backend cannot be created
     */
    protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, Environment environment) throws Exception {
        final B backend = getStateBackend();
        final AbstractKeyedStateBackend<K> keyedBackend = backend.createKeyedStateBackend(
                environment,
                new JobID(),
                "test_op",
                typeSerializer,
                i,
                keyGroupRange,
                environment.getTaskKvStateRegistry());
        return keyedBackend;
    }

    /**
     * Restores a keyed backend from a single state handle inside a new
     * {@code DummyEnvironment("test", 1, 0)}.
     *
     * @param typeSerializer serializer for the keys
     * @param keyGroupsStateHandle snapshot handle to restore from
     * @return the restored keyed state backend
     * @throws Exception if creating or restoring the backend fails
     */
    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> typeSerializer, KeyGroupsStateHandle keyGroupsStateHandle) throws Exception {
        final DummyEnvironment testEnvironment = new DummyEnvironment("test", 1, 0);
        return restoreKeyedBackend(typeSerializer, keyGroupsStateHandle, testEnvironment);
    }

    /**
     * Restores a keyed backend from a single state handle in the given environment, using the
     * value {@code 10} together with the key-group range {@code [0, 9]}.
     *
     * @param typeSerializer serializer for the keys
     * @param keyGroupsStateHandle snapshot handle to restore from; it is wrapped in a singleton list
     * @param environment environment handed to the backend
     * @return the restored keyed state backend
     * @throws Exception if creating or restoring the backend fails
     */
    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> typeSerializer, KeyGroupsStateHandle keyGroupsStateHandle, Environment environment) throws Exception {
        final KeyGroupRange defaultRange = new KeyGroupRange(0, 9);
        final List<KeyGroupsStateHandle> handles = Collections.singletonList(keyGroupsStateHandle);
        return restoreKeyedBackend(typeSerializer, 10, defaultRange, handles, environment);
    }

    /**
     * Creates a keyed backend from the backend under test and, when state handles are supplied,
     * restores it from them.
     *
     * @param typeSerializer serializer for the keys
     * @param i integer forwarded unchanged to {@code createKeyedStateBackend}
     *     (presumably the number of key groups — confirm against the backend API)
     * @param keyGroupRange key-group range forwarded to the backend
     * @param list state handles to restore from; {@code null} skips the restore step
     * @param environment environment handed to the backend
     * @return the (possibly restored) keyed state backend
     * @throws Exception if creating or restoring the backend fails
     */
    protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, List<KeyGroupsStateHandle> list, Environment environment) throws Exception {
        final AbstractKeyedStateBackend<K> keyedBackend = getStateBackend().createKeyedStateBackend(
                environment,
                new JobID(),
                "test_op",
                typeSerializer,
                i,
                keyGroupRange,
                environment.getTaskKvStateRegistry());
        if (list == null) {
            // Nothing to restore from: hand back the empty backend.
            return keyedBackend;
        }
        keyedBackend.restore(list);
        return keyedBackend;
    }

    /**
     * Round-trips a {@code String} value state through two snapshots.
     *
     * <p>Writes values for several integer keys, snapshots, overwrites them, snapshots again, and
     * then checks that a backend restored from each snapshot exposes the values as of that
     * snapshot. Each check is made twice: via {@code value()} and via {@code getSerializedValue}.
     *
     * <p>NOTE(review): {@code K} is not declared in this method's scope, and the state is held in
     * {@code KvState}-typed locals on which {@code ValueState} methods are called. This looks like
     * decompiler output; confirm the intended types (presumably {@code Integer} keys).
     */
    @Test
    public void testValueState() throws Exception {
        CheckpointStreamFactory createStreamFactory = createStreamFactory();
        AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class);
        valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        IntSerializer intSerializer = IntSerializer.INSTANCE;
        VoidNamespaceSerializer voidNamespaceSerializer = VoidNamespaceSerializer.INSTANCE;
        TypeSerializer serializer = valueStateDescriptor.getSerializer();
        KvState kvState = (ValueState) createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
        // Second reference to the same state object, used for the serialized-value lookups.
        KvState kvState2 = kvState;
        // Keys 1 and 2 start out empty; give each its own value.
        createKeyedBackend.setCurrentKey(1);
        Assert.assertNull(kvState.value());
        Assert.assertNull(getSerializedValue(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        kvState.update("1");
        createKeyedBackend.setCurrentKey(2);
        Assert.assertNull(kvState.value());
        Assert.assertNull(getSerializedValue(kvState2, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        kvState.update("2");
        // Switching back to key 1 must show key 1's value, not the one just written for key 2.
        createKeyedBackend.setCurrentKey(1);
        Assert.assertEquals("1", kvState.value());
        Assert.assertEquals("1", getSerializedValue(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        // First snapshot: key 1 -> "1", key 2 -> "2".
        KeyGroupsStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(682375462378L, 2L, createStreamFactory));
        // Overwrite keys 1 and 2 and add key 3, then take the second snapshot.
        createKeyedBackend.setCurrentKey(1);
        kvState.update("u1");
        createKeyedBackend.setCurrentKey(2);
        kvState.update("u2");
        createKeyedBackend.setCurrentKey(3);
        kvState.update("u3");
        KeyGroupsStateHandle runSnapshot2 = runSnapshot(createKeyedBackend.snapshot(682375462379L, 4L, createStreamFactory));
        // The live backend must show the updated values.
        createKeyedBackend.setCurrentKey(1);
        Assert.assertEquals("u1", kvState.value());
        Assert.assertEquals("u1", getSerializedValue(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        createKeyedBackend.setCurrentKey(2);
        Assert.assertEquals("u2", kvState.value());
        Assert.assertEquals("u2", getSerializedValue(kvState2, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        createKeyedBackend.setCurrentKey(3);
        Assert.assertEquals("u3", kvState.value());
        Assert.assertEquals("u3", getSerializedValue(kvState2, 3, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        createKeyedBackend.dispose();
        // Restore from the first snapshot; its handle is discarded right after the restore.
        AbstractKeyedStateBackend<K> restoreKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
        runSnapshot.discardState();
        KvState kvState3 = (ValueState) restoreKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
        KvState kvState4 = kvState3;
        restoreKeyedBackend.setCurrentKey(1);
        Assert.assertEquals("1", kvState3.value());
        Assert.assertEquals("1", getSerializedValue(kvState4, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        restoreKeyedBackend.setCurrentKey(2);
        Assert.assertEquals("2", kvState3.value());
        Assert.assertEquals("2", getSerializedValue(kvState4, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        restoreKeyedBackend.dispose();
        // Restore from the second snapshot and expect the overwritten values plus key 3.
        AbstractKeyedStateBackend<K> restoreKeyedBackend2 = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot2);
        runSnapshot2.discardState();
        KvState kvState5 = (ValueState) restoreKeyedBackend2.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
        KvState kvState6 = kvState5;
        restoreKeyedBackend2.setCurrentKey(1);
        Assert.assertEquals("u1", kvState5.value());
        Assert.assertEquals("u1", getSerializedValue(kvState6, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        restoreKeyedBackend2.setCurrentKey(2);
        Assert.assertEquals("u2", kvState5.value());
        Assert.assertEquals("u2", getSerializedValue(kvState6, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        restoreKeyedBackend2.setCurrentKey(3);
        Assert.assertEquals("u3", kvState5.value());
        Assert.assertEquals("u3", getSerializedValue(kvState6, 3, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
        restoreKeyedBackend2.dispose();
    }

    /**
     * Checks that reading a value state through {@code value()} and through
     * {@code getSerializedValue} do not interfere with each other.
     *
     * <p>Part 1 queries the serialized value for a different key/namespace and asserts that the
     * state's own {@code value()} is unchanged afterwards. Part 2 runs two threads for roughly
     * 100 ms, one calling {@code value()} and one calling {@code getSerializedValue}, and fails if
     * either ever observes anything other than {@code "1"}.
     *
     * <p>Cleanup (timer, worker threads, backend) runs in a {@code finally} block so that a
     * failing thread does not leave the timer thread or the other worker behind.
     *
     * <p>NOTE(review): {@code K} is not declared in this method's scope and the state is held in
     * {@code KvState}-typed locals; this looks like decompiler output — confirm intended types.
     */
    @Test
    public void testValueStateRace() throws Exception {
        AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
        final int i = 1;
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class);
        valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        final IntSerializer intSerializer = IntSerializer.INSTANCE;
        final IntSerializer intSerializer2 = IntSerializer.INSTANCE;
        final TypeSerializer serializer = valueStateDescriptor.getSerializer();
        final KvState kvState = (ValueState) createKeyedBackend.getPartitionedState(1, IntSerializer.INSTANCE, valueStateDescriptor);
        final KvState kvState2 = kvState;

        // Part 1: a serialized lookup for another key/namespace must not change what value() sees.
        createKeyedBackend.setCurrentKey(1);
        kvState2.setCurrentNamespace(2);
        kvState.update("2");
        Assert.assertEquals("2", kvState.value());
        Assert.assertNull(getSerializedValue(kvState2, 3, intSerializer, 1, IntSerializer.INSTANCE, serializer));
        Assert.assertEquals("2", kvState.value());
        kvState2.setCurrentNamespace(1);

        // Part 2: store "1" under key 10 / namespace 1 and read it concurrently from two threads.
        createKeyedBackend.setCurrentKey(10);
        Assert.assertNull(kvState.value());
        Assert.assertNull(getSerializedValue(kvState2, 10, intSerializer, 1, intSerializer2, serializer));
        kvState.update("1");
        final CheckedThread getter = new CheckedThread("State getter") {
            public void go() throws Exception {
                while (!isInterrupted()) {
                    Assert.assertEquals("1", kvState.value());
                }
            }
        };
        final CheckedThread serializedGetter = new CheckedThread("Serialized state getter") {
            public void go() throws Exception {
                // Also stops once the other thread is gone, so it cannot spin forever on its own.
                while (!isInterrupted() && getter.isAlive()) {
                    Assert.assertEquals("1", (String) StateBackendTestBase.getSerializedValue(kvState2, 10, intSerializer, i, intSerializer2, serializer));
                }
            }
        };
        getter.start();
        serializedGetter.start();

        // Let both threads run for at most 100 ms.
        Timer timer = new Timer("stopper");
        timer.schedule(new TimerTask() {
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                getter.interrupt();
                serializedGetter.interrupt();
                cancel();
            }
        }, 100L);
        try {
            serializedGetter.sync();
            // If the serialized getter failed, the plain getter does not know; stop it explicitly.
            getter.interrupt();
            getter.sync();
        } finally {
            // Runs on success and on failure: cancelling the timer ends its (non-daemon) thread,
            // and since the timer may no longer fire we interrupt both workers ourselves.
            timer.cancel();
            getter.interrupt();
            serializedGetter.interrupt();
            createKeyedBackend.dispose();
        }
    }

    /**
     * Checks that two value states with different names and value types ({@code String} and
     * {@code Integer}) can live side by side under the same key, and that both survive a
     * snapshot/restore cycle.
     *
     * <p>The locals are fully parameterized: the key type is {@code Integer} (the backend is
     * created with {@code IntSerializer}), and typed descriptors let {@code getPartitionedState}
     * return the matching {@code ValueState} type without casts.
     */
    @Test
    public void testMultipleValueStates() throws Exception {
        CheckpointStreamFactory streamFactory = createStreamFactory();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new DummyEnvironment("test_op", 1, 0));
        ValueStateDescriptor<String> stringDescriptor = new ValueStateDescriptor<String>("a-string", StringSerializer.INSTANCE);
        ValueStateDescriptor<Integer> intDescriptor = new ValueStateDescriptor<Integer>("an-integer", IntSerializer.INSTANCE);
        stringDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        intDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState<String> stringState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stringDescriptor);
        ValueState<Integer> intState = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, intDescriptor);

        // Both states start empty for key 1.
        backend.setCurrentKey(1);
        Assert.assertNull(stringState.value());
        Assert.assertNull(intState.value());

        // Writing one state must not leak into the other.
        stringState.update("1");
        Assert.assertEquals("1", stringState.value());
        Assert.assertNull(intState.value());
        intState.update(13);
        Assert.assertEquals("1", stringState.value());
        Assert.assertEquals(13L, intState.value().intValue());

        // Snapshot, dispose, and restore into a fresh backend with the same key-group setup.
        KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory));
        backend.dispose();
        AbstractKeyedStateBackend<Integer> restoredBackend = restoreKeyedBackend(IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), Collections.singletonList(snapshot), new DummyEnvironment("test_op", 1, 0));
        snapshot.discardState();

        // Both values must be visible again under key 1.
        restoredBackend.setCurrentKey(1);
        ValueState<String> restoredStringState = restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stringDescriptor);
        ValueState<Integer> restoredIntState = restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, intDescriptor);
        Assert.assertEquals("1", restoredStringState.value());
        Assert.assertEquals(13L, restoredIntState.value().intValue());
        restoredBackend.dispose();
    }

    /**
     * Checks that updating a value state with {@code null} behaves like clearing it, i.e. the
     * descriptor's default value ({@code 42}) is returned afterwards, and that the backend can
     * still be snapshotted and restored in that condition.
     *
     * <p>The test first asserts that {@code LongSerializer} itself rejects {@code null}, which is
     * what makes the null update interesting in the first place.
     *
     * <p>The locals are fully parameterized ({@code Integer} keys, {@code Long} values) so the
     * method does not rely on an undeclared type variable or on raw types.
     */
    @Test
    public void testValueStateNullUpdate() throws Exception {
        try {
            LongSerializer.INSTANCE.serialize((Long) null, new DataOutputViewStreamWrapper(new ByteArrayOutputStream()));
            Assert.fail("Should fail with NullPointerException");
        } catch (NullPointerException ignored) {
            // Expected: the serializer cannot handle null, which is the precondition of this test.
        }
        CheckpointStreamFactory streamFactory = createStreamFactory();
        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<Long>("id", LongSerializer.INSTANCE, 42L);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState<Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        // An untouched key yields the default; an update replaces it for that key only.
        backend.setCurrentKey(1);
        Assert.assertEquals(42L, state.value().longValue());
        state.update(1L);
        Assert.assertEquals(1L, state.value().longValue());
        backend.setCurrentKey(2);
        Assert.assertEquals(42L, state.value().longValue());

        // clear() brings the default back.
        backend.setCurrentKey(1);
        state.clear();
        Assert.assertEquals(42L, state.value().longValue());
        state.update(17L);
        Assert.assertEquals(17L, state.value().longValue());

        // A null update must act like clear() rather than storing null.
        state.update(null);
        Assert.assertEquals(42L, state.value().longValue());

        // The backend must still snapshot and restore without failing.
        KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462378L, 2L, streamFactory));
        backend.dispose();
        AbstractKeyedStateBackend<Integer> restoredBackend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot);
        snapshot.discardState();
        restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        restoredBackend.dispose();
    }

    /**
     * Round-trips a {@code String} list state through two snapshots.
     *
     * <p>Appends elements for several integer keys, snapshots, appends more, snapshots again, and
     * then checks that a backend restored from each snapshot exposes the list contents as of that
     * snapshot. Each check is made twice: via {@code get()} and via {@code getSerializedList}.
     *
     * <p>An unexpected checked exception fails the test with an {@link AssertionError} that
     * carries the original exception as its cause, so the full stack trace shows up in the test
     * report.
     *
     * <p>NOTE(review): {@code K} is not declared in this method's scope and the state is held in
     * {@code KvState}-typed locals; this looks like decompiler output — confirm intended types.
     */
    @Test
    public void testListState() {
        try {
            CheckpointStreamFactory createStreamFactory = createStreamFactory();
            AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor("id", String.class);
            listStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            IntSerializer intSerializer = IntSerializer.INSTANCE;
            VoidNamespaceSerializer voidNamespaceSerializer = VoidNamespaceSerializer.INSTANCE;
            TypeSerializer serializer = listStateDescriptor.getSerializer();
            KvState kvState = (ListState) createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
            KvState kvState2 = kvState;
            Joiner on = Joiner.on(",");
            // Keys 1 and 2 start out empty; add one element to each.
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals((Object) null, kvState.get());
            Assert.assertEquals((Object) null, getSerializedList(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            kvState.add("1");
            createKeyedBackend.setCurrentKey(2);
            Assert.assertEquals((Object) null, kvState.get());
            Assert.assertEquals((Object) null, getSerializedList(kvState2, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            kvState.add("2");
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals("1", on.join((Iterable) kvState.get()));
            Assert.assertEquals("1", on.join(getSerializedList(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer)));
            // First snapshot: key 1 -> [1], key 2 -> [2].
            KeyGroupsStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(682375462378L, 2L, createStreamFactory));
            // Append to keys 1 and 2 and start key 3, then take the second snapshot.
            createKeyedBackend.setCurrentKey(1);
            kvState.add("u1");
            createKeyedBackend.setCurrentKey(2);
            kvState.add("u2");
            createKeyedBackend.setCurrentKey(3);
            kvState.add("u3");
            KeyGroupsStateHandle runSnapshot2 = runSnapshot(createKeyedBackend.snapshot(682375462379L, 4L, createStreamFactory));
            // The live backend must show the appended elements.
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals("1,u1", on.join((Iterable) kvState.get()));
            Assert.assertEquals("1,u1", on.join(getSerializedList(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer)));
            createKeyedBackend.setCurrentKey(2);
            Assert.assertEquals("2,u2", on.join((Iterable) kvState.get()));
            Assert.assertEquals("2,u2", on.join(getSerializedList(kvState2, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer)));
            createKeyedBackend.setCurrentKey(3);
            Assert.assertEquals("u3", on.join((Iterable) kvState.get()));
            Assert.assertEquals("u3", on.join(getSerializedList(kvState2, 3, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer)));
            createKeyedBackend.dispose();
            // Restore from the first snapshot; its handle is discarded right after the restore.
            AbstractKeyedStateBackend<K> restoreKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            runSnapshot.discardState();
            KvState kvState3 = (ListState) restoreKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
            KvState kvState4 = kvState3;
            restoreKeyedBackend.setCurrentKey(1);
            Assert.assertEquals("1", on.join((Iterable) kvState3.get()));
            Assert.assertEquals("1", on.join(getSerializedList(kvState4, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer)));
            restoreKeyedBackend.setCurrentKey(2);
            Assert.assertEquals("2", on.join((Iterable) kvState3.get()));
            Assert.assertEquals("2", on.join(getSerializedList(kvState4, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer)));
            restoreKeyedBackend.dispose();
            // Restore from the second snapshot and expect the appended elements plus key 3.
            AbstractKeyedStateBackend<K> restoreKeyedBackend2 = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot2);
            runSnapshot2.discardState();
            KvState kvState5 = (ListState) restoreKeyedBackend2.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
            KvState kvState6 = kvState5;
            restoreKeyedBackend2.setCurrentKey(1);
            Assert.assertEquals("1,u1", on.join((Iterable) kvState5.get()));
            Assert.assertEquals("1,u1", on.join(getSerializedList(kvState6, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer)));
            restoreKeyedBackend2.setCurrentKey(2);
            Assert.assertEquals("2,u2", on.join((Iterable) kvState5.get()));
            Assert.assertEquals("2,u2", on.join(getSerializedList(kvState6, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer)));
            restoreKeyedBackend2.setCurrentKey(3);
            Assert.assertEquals("u3", on.join((Iterable) kvState5.get()));
            Assert.assertEquals("u3", on.join(getSerializedList(kvState6, 3, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer)));
            restoreKeyedBackend2.dispose();
        } catch (Exception e) {
            // Fail with the exception attached as cause instead of printing it to stderr and
            // reporting only its (possibly null) message.
            AssertionError failure = new AssertionError("testListState failed: " + e);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Round-trips a {@code String} reducing state (comma-appending reduce function) through two
     * snapshots.
     *
     * <p>Adds elements for several integer keys, snapshots, adds more, snapshots again, and then
     * checks that a backend restored from each snapshot exposes the reduced value as of that
     * snapshot. Each check is made twice: via {@code get()} and via {@code getSerializedValue}.
     *
     * <p>An unexpected checked exception fails the test with an {@link AssertionError} that
     * carries the original exception as its cause, so the full stack trace shows up in the test
     * report.
     *
     * <p>NOTE(review): {@code K} is not declared in this method's scope and the state is held in
     * {@code KvState}-typed locals; this looks like decompiler output — confirm intended types.
     */
    @Test
    public void testReducingState() {
        try {
            CheckpointStreamFactory createStreamFactory = createStreamFactory();
            AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
            ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("id", new AppendingReduce(), String.class);
            reducingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            IntSerializer intSerializer = IntSerializer.INSTANCE;
            VoidNamespaceSerializer voidNamespaceSerializer = VoidNamespaceSerializer.INSTANCE;
            TypeSerializer serializer = reducingStateDescriptor.getSerializer();
            KvState kvState = (ReducingState) createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, reducingStateDescriptor);
            KvState kvState2 = kvState;
            // Keys 1 and 2 start out empty; add one element to each.
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals((Object) null, kvState.get());
            Assert.assertNull(getSerializedValue(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            kvState.add("1");
            createKeyedBackend.setCurrentKey(2);
            Assert.assertEquals((Object) null, kvState.get());
            Assert.assertNull(getSerializedValue(kvState2, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            kvState.add("2");
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals("1", kvState.get());
            Assert.assertEquals("1", getSerializedValue(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            // First snapshot: key 1 -> "1", key 2 -> "2".
            KeyGroupsStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(682375462378L, 2L, createStreamFactory));
            // Reduce more elements into keys 1 and 2 and start key 3, then take the second snapshot.
            createKeyedBackend.setCurrentKey(1);
            kvState.add("u1");
            createKeyedBackend.setCurrentKey(2);
            kvState.add("u2");
            createKeyedBackend.setCurrentKey(3);
            kvState.add("u3");
            KeyGroupsStateHandle runSnapshot2 = runSnapshot(createKeyedBackend.snapshot(682375462379L, 4L, createStreamFactory));
            // The live backend must show the reduced values.
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals("1,u1", kvState.get());
            Assert.assertEquals("1,u1", getSerializedValue(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            createKeyedBackend.setCurrentKey(2);
            Assert.assertEquals("2,u2", kvState.get());
            Assert.assertEquals("2,u2", getSerializedValue(kvState2, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            createKeyedBackend.setCurrentKey(3);
            Assert.assertEquals("u3", kvState.get());
            Assert.assertEquals("u3", getSerializedValue(kvState2, 3, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            createKeyedBackend.dispose();
            // Restore from the first snapshot; its handle is discarded right after the restore.
            AbstractKeyedStateBackend<K> restoreKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            runSnapshot.discardState();
            KvState kvState3 = (ReducingState) restoreKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, reducingStateDescriptor);
            KvState kvState4 = kvState3;
            restoreKeyedBackend.setCurrentKey(1);
            Assert.assertEquals("1", kvState3.get());
            Assert.assertEquals("1", getSerializedValue(kvState4, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend.setCurrentKey(2);
            Assert.assertEquals("2", kvState3.get());
            Assert.assertEquals("2", getSerializedValue(kvState4, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend.dispose();
            // Restore from the second snapshot and expect the reduced values plus key 3.
            AbstractKeyedStateBackend<K> restoreKeyedBackend2 = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot2);
            runSnapshot2.discardState();
            KvState kvState5 = (ReducingState) restoreKeyedBackend2.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, reducingStateDescriptor);
            KvState kvState6 = kvState5;
            restoreKeyedBackend2.setCurrentKey(1);
            Assert.assertEquals("1,u1", kvState5.get());
            Assert.assertEquals("1,u1", getSerializedValue(kvState6, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend2.setCurrentKey(2);
            Assert.assertEquals("2,u2", kvState5.get());
            Assert.assertEquals("2,u2", getSerializedValue(kvState6, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend2.setCurrentKey(3);
            Assert.assertEquals("u3", kvState5.get());
            Assert.assertEquals("u3", getSerializedValue(kvState6, 3, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend2.dispose();
        } catch (Exception e) {
            // Fail with the exception attached as cause instead of printing it to stderr and
            // reporting only its (possibly null) message.
            AssertionError failure = new AssertionError("testReducingState failed: " + e);
            failure.initCause(e);
            throw failure;
        }
    }

    @Test
    public void testFoldingState() {
        try {
            CheckpointStreamFactory createStreamFactory = createStreamFactory();
            AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
            FoldingStateDescriptor foldingStateDescriptor = new FoldingStateDescriptor("id", "Fold-Initial:", new AppendingFold(), String.class);
            foldingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            IntSerializer intSerializer = IntSerializer.INSTANCE;
            VoidNamespaceSerializer voidNamespaceSerializer = VoidNamespaceSerializer.INSTANCE;
            TypeSerializer serializer = foldingStateDescriptor.getSerializer();
            KvState kvState = (FoldingState) createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, foldingStateDescriptor);
            KvState kvState2 = kvState;
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals((Object) null, kvState.get());
            Assert.assertEquals((Object) null, getSerializedValue(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            kvState.add(1);
            createKeyedBackend.setCurrentKey(2);
            Assert.assertEquals((Object) null, kvState.get());
            Assert.assertEquals((Object) null, getSerializedValue(kvState2, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            kvState.add(2);
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,1", kvState.get());
            Assert.assertEquals("Fold-Initial:,1", getSerializedValue(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            KeyGroupsStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(682375462378L, 2L, createStreamFactory));
            createKeyedBackend.setCurrentKey(1);
            kvState.clear();
            kvState.add(101);
            createKeyedBackend.setCurrentKey(2);
            kvState.add(102);
            createKeyedBackend.setCurrentKey(3);
            kvState.add(103);
            KeyGroupsStateHandle runSnapshot2 = runSnapshot(createKeyedBackend.snapshot(682375462379L, 4L, createStreamFactory));
            createKeyedBackend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,101", kvState.get());
            Assert.assertEquals("Fold-Initial:,101", getSerializedValue(kvState2, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            createKeyedBackend.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2,102", kvState.get());
            Assert.assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState2, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            createKeyedBackend.setCurrentKey(3);
            Assert.assertEquals("Fold-Initial:,103", kvState.get());
            Assert.assertEquals("Fold-Initial:,103", getSerializedValue(kvState2, 3, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            createKeyedBackend.dispose();
            AbstractKeyedStateBackend<K> restoreKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            runSnapshot.discardState();
            KvState kvState3 = (FoldingState) restoreKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, foldingStateDescriptor);
            KvState kvState4 = kvState3;
            restoreKeyedBackend.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,1", kvState3.get());
            Assert.assertEquals("Fold-Initial:,1", getSerializedValue(kvState4, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2", kvState3.get());
            Assert.assertEquals("Fold-Initial:,2", getSerializedValue(kvState4, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend.dispose();
            AbstractKeyedStateBackend<K> restoreKeyedBackend2 = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot2);
            runSnapshot.discardState();
            KvState kvState5 = (FoldingState) restoreKeyedBackend2.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, foldingStateDescriptor);
            KvState kvState6 = kvState5;
            restoreKeyedBackend2.setCurrentKey(1);
            Assert.assertEquals("Fold-Initial:,101", kvState5.get());
            Assert.assertEquals("Fold-Initial:,101", getSerializedValue(kvState6, 1, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend2.setCurrentKey(2);
            Assert.assertEquals("Fold-Initial:,2,102", kvState5.get());
            Assert.assertEquals("Fold-Initial:,2,102", getSerializedValue(kvState6, 2, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend2.setCurrentKey(3);
            Assert.assertEquals("Fold-Initial:,103", kvState5.get());
            Assert.assertEquals("Fold-Initial:,103", getSerializedValue(kvState6, 3, intSerializer, VoidNamespace.INSTANCE, voidNamespaceSerializer, serializer));
            restoreKeyedBackend2.dispose();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks a value state whose descriptor is created with an explicit {@code null} default.
     *
     * <p>The state must read {@code null} before the first update, return the stored value
     * after an update, and read {@code null} again once it has been cleared.
     */
    @Test
    public void testValueStateNullAsDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);

        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", String.class, (Object) null);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        backend.setCurrentKey(1);

        // nothing written yet
        Assert.assertEquals((Object) null, state.value());

        state.update("Ciao");
        Assert.assertEquals("Ciao", state.value());

        // clearing falls back to the (null) default
        state.clear();
        Assert.assertEquals((Object) null, state.value());

        backend.dispose();
    }

    /**
     * Checks a value state whose descriptor is created with the default value {@code "Hello"}.
     *
     * <p>The default must be returned before the first update and again after the state has
     * been cleared; in between, the updated value is returned.
     */
    @Test
    public void testValueStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);

        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", String.class, "Hello");
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        backend.setCurrentKey(1);

        // untouched state exposes the default
        Assert.assertEquals("Hello", state.value());

        state.update("Ciao");
        Assert.assertEquals("Ciao", state.value());

        // after clear() the default is visible again
        state.clear();
        Assert.assertEquals("Hello", state.value());

        backend.dispose();
    }

    /**
     * Checks that a reducing state built with {@code AppendingReduce} reads {@code null} while
     * empty: before the first {@code add} and again after {@code clear()}.
     */
    @Test
    public void testReducingStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);

        ReducingStateDescriptor descriptor = new ReducingStateDescriptor("id", new AppendingReduce(), String.class);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        ReducingState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        backend.setCurrentKey(1);

        // empty state
        Assert.assertNull(state.get());

        state.add("Ciao");
        Assert.assertEquals("Ciao", state.get());

        // cleared state is empty again
        state.clear();
        Assert.assertNull(state.get());

        backend.dispose();
    }

    /**
     * Checks that a folding state reads {@code null} while empty, even though its descriptor
     * carries the initial value {@code "Fold-Initial:"}.
     *
     * <p>After adding 1 and 2 the folded result is {@code "Fold-Initial:,1,2"}; after
     * {@code clear()} the state reads {@code null} again.
     */
    @Test
    public void testFoldingStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);

        FoldingStateDescriptor descriptor = new FoldingStateDescriptor("id", "Fold-Initial:", new AppendingFold(), String.class);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        FoldingState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        backend.setCurrentKey(1);

        // the initial value is not exposed until something has been folded in
        Assert.assertNull(state.get());

        state.add(1);
        state.add(2);
        Assert.assertEquals("Fold-Initial:,1,2", state.get());

        state.clear();
        Assert.assertNull(state.get());

        backend.dispose();
    }

    /**
     * Checks that a list state reads {@code null} while empty (before the first {@code add}
     * and after {@code clear()}), and otherwise contains exactly the added elements in any
     * order.
     */
    @Test
    public void testListStateDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);

        ListStateDescriptor descriptor = new ListStateDescriptor("id", String.class);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        ListState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        backend.setCurrentKey(1);

        // empty list state
        Assert.assertNull(state.get());

        state.add("Ciao");
        state.add("Bello");
        Assert.assertThat(state.get(), Matchers.containsInAnyOrder(new String[]{"Ciao", "Bello"}));

        // cleared list state is empty again
        state.clear();
        Assert.assertNull(state.get());

        backend.dispose();
    }

    /**
     * Snapshots a backend that owns key groups 0..9 and restores it into two backends owning
     * key groups 0..4 and 5..9 respectively.
     *
     * <p>Key 17 is written together with a second key that is assigned to a different one of
     * two parallel operators. After the split restore, each restored backend must see only the
     * value expected for its range and {@code null} for the other key.
     */
    @Test
    public void testKeyGroupSnapshotRestore() throws Exception {
        final int maxParallelism = 10;
        final int keyInFirstHalf = 17;

        CheckpointStreamFactory streamFactory = createStreamFactory();
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE, maxParallelism, new KeyGroupRange(0, 9), new DummyEnvironment("test", 1, 0));

        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", String.class);
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        int keyInSecondHalf = 42;
        Random rnd = new Random(0L);

        // Both indices start out computed from key 17, so the loop below always runs at least
        // once and draws seeded random keys until one maps to the other operator index.
        int firstKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInFirstHalf, maxParallelism, 2);
        int secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInFirstHalf, maxParallelism, 2);
        while (firstKeyHalf == secondKeyHalf) {
            keyInSecondHalf = rnd.nextInt();
            secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(Integer.valueOf(keyInSecondHalf), maxParallelism, 2);
        }

        backend.setCurrentKey(keyInFirstHalf);
        state.update("ShouldBeInFirstHalf");
        backend.setCurrentKey(Integer.valueOf(keyInSecondHalf));
        state.update("ShouldBeInSecondHalf");

        KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(0L, 0L, streamFactory));

        // split the single snapshot into the parts belonging to operator index 0 and 1
        List<KeyGroupsStateHandle> firstHalfHandles = StateAssignmentOperation.getKeyGroupsStateHandles(
                Collections.singletonList(snapshot),
                KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, 2, 0));
        List<KeyGroupsStateHandle> secondHalfHandles = StateAssignmentOperation.getKeyGroupsStateHandles(
                Collections.singletonList(snapshot),
                KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, 2, 1));

        backend.dispose();

        AbstractKeyedStateBackend<K> firstHalfBackend = restoreKeyedBackend(IntSerializer.INSTANCE, maxParallelism, new KeyGroupRange(0, 4), firstHalfHandles, new DummyEnvironment("test", 1, 0));
        AbstractKeyedStateBackend<K> secondHalfBackend = restoreKeyedBackend(IntSerializer.INSTANCE, maxParallelism, new KeyGroupRange(5, 9), secondHalfHandles, new DummyEnvironment("test", 1, 0));

        // the first backend sees key 17 only
        ValueState firstHalfState = firstHalfBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        firstHalfBackend.setCurrentKey(keyInFirstHalf);
        Assert.assertTrue(((String) firstHalfState.value()).equals("ShouldBeInFirstHalf"));
        firstHalfBackend.setCurrentKey(Integer.valueOf(keyInSecondHalf));
        Assert.assertTrue(firstHalfState.value() == null);

        // the second backend sees the other key only
        ValueState secondHalfState = secondHalfBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        secondHalfBackend.setCurrentKey(keyInFirstHalf);
        Assert.assertTrue(secondHalfState.value() == null);
        secondHalfBackend.setCurrentKey(Integer.valueOf(keyInSecondHalf));
        Assert.assertTrue(((String) secondHalfState.value()).equals("ShouldBeInSecondHalf"));

        firstHalfBackend.dispose();
        secondHalfBackend.dispose();
    }

    /**
     * Checks that accessing restored value state through a descriptor with a different
     * serializer ({@code FloatSerializer} instead of the String serializer used when writing)
     * is rejected.
     *
     * <p>The access is expected to throw an {@link IOException} whose message contains
     * {@code "Trying to access state using wrong"}. Any other exception, or no exception at
     * all, fails the test.
     */
    @Test
    public void testValueStateRestoreWithWrongSerializers() {
        try {
            CheckpointStreamFactory createStreamFactory = createStreamFactory();
            AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("id", String.class);
            valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
            ValueState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
            createKeyedBackend.setCurrentKey(1);
            partitionedState.update("1");
            createKeyedBackend.setCurrentKey(2);
            partitionedState.update("2");
            KeyGroupsStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(682375462378L, 2L, createStreamFactory));
            createKeyedBackend.dispose();
            AbstractKeyedStateBackend<K> restoreKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            runSnapshot.discardState();
            // The more specific IOException must be caught first; a preceding catch (Exception)
            // would intercept the expected exception and fail the test.
            try {
                restoreKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor("id", FloatSerializer.INSTANCE)).value();
                Assert.fail("should recognize wrong serializers");
            } catch (IOException e) {
                // expected, provided the message identifies the serializer mismatch
                if (!e.getMessage().contains("Trying to access state using wrong")) {
                    Assert.fail("wrong exception " + e);
                }
            } catch (Exception e) {
                Assert.fail("wrong exception " + e);
            }
            restoreKeyedBackend.dispose();
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    /**
     * Checks that accessing restored list state through a descriptor with a different
     * serializer ({@code FloatSerializer} instead of the String element type used when
     * writing) is rejected.
     *
     * <p>The access is expected to throw an {@link IOException} whose message contains
     * {@code "Trying to access state using wrong"}. Any other exception, or no exception at
     * all, fails the test.
     */
    @Test
    public void testListStateRestoreWithWrongSerializers() {
        try {
            CheckpointStreamFactory createStreamFactory = createStreamFactory();
            AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
            ListState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor("id", String.class));
            createKeyedBackend.setCurrentKey(1);
            partitionedState.add("1");
            createKeyedBackend.setCurrentKey(2);
            partitionedState.add("2");
            KeyGroupsStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(682375462378L, 2L, createStreamFactory));
            createKeyedBackend.dispose();
            AbstractKeyedStateBackend<K> restoreKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            runSnapshot.discardState();
            // The more specific IOException must be caught first; a preceding catch (Exception)
            // would intercept the expected exception and fail the test.
            try {
                restoreKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor("id", FloatSerializer.INSTANCE)).get();
                Assert.fail("should recognize wrong serializers");
            } catch (IOException e) {
                // expected, provided the message identifies the serializer mismatch
                if (!e.getMessage().contains("Trying to access state using wrong")) {
                    Assert.fail("wrong exception " + e);
                }
            } catch (Exception e) {
                Assert.fail("wrong exception " + e);
            }
            restoreKeyedBackend.dispose();
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    /**
     * Checks that accessing restored reducing state through a descriptor with a different
     * serializer ({@code FloatSerializer} instead of {@code StringSerializer}) is rejected.
     *
     * <p>The access is expected to throw an {@link IOException} whose message contains
     * {@code "Trying to access state using wrong "}. Any other exception, or no exception at
     * all, fails the test.
     */
    @Test
    public void testReducingStateRestoreWithWrongSerializers() {
        try {
            CheckpointStreamFactory createStreamFactory = createStreamFactory();
            AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
            ReducingState partitionedState = createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ReducingStateDescriptor("id", new AppendingReduce(), StringSerializer.INSTANCE));
            createKeyedBackend.setCurrentKey(1);
            partitionedState.add("1");
            createKeyedBackend.setCurrentKey(2);
            partitionedState.add("2");
            KeyGroupsStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(682375462378L, 2L, createStreamFactory));
            createKeyedBackend.dispose();
            AbstractKeyedStateBackend<K> restoreKeyedBackend = restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot);
            runSnapshot.discardState();
            // The more specific IOException must be caught first; a preceding catch (Exception)
            // would intercept the expected exception and fail the test.
            try {
                restoreKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, new ReducingStateDescriptor("id", new AppendingReduce(), FloatSerializer.INSTANCE)).get();
                Assert.fail("should recognize wrong serializers");
            } catch (IOException e) {
                // expected, provided the message identifies the serializer mismatch
                if (!e.getMessage().contains("Trying to access state using wrong ")) {
                    Assert.fail("wrong exception " + e);
                }
            } catch (Exception e) {
                Assert.fail("wrong exception " + e);
            }
            restoreKeyedBackend.dispose();
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    /**
     * Checks that the default value of a value state is handed out as a copy.
     *
     * <p>The default {@code IntValue(-1)} is read under two different keys; the two results
     * must be non-null and equal, but must not be the same object instance.
     */
    @Test
    public void testCopyDefaultValue() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);

        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", IntValue.class, new IntValue(-1));
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        ValueState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);

        backend.setCurrentKey(1);
        IntValue defaultForKey1 = (IntValue) state.value();

        backend.setCurrentKey(2);
        IntValue defaultForKey2 = (IntValue) state.value();

        Assert.assertNotNull(defaultForKey1);
        Assert.assertNotNull(defaultForKey2);
        // equal by value ...
        Assert.assertEquals(defaultForKey1, defaultForKey2);
        // ... but distinct instances
        Assert.assertFalse(defaultForKey1 == defaultForKey2);

        backend.dispose();
    }

    /**
     * Checks that {@code getPartitionedState} throws a {@link NullPointerException} when the
     * namespace, the namespace serializer, or both are {@code null}.
     */
    @Test
    public void testRequireNonNullNamespace() throws Exception {
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE);

        ValueStateDescriptor descriptor = new ValueStateDescriptor("id", IntValue.class, new IntValue(-1));
        descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

        // null namespace
        try {
            backend.getPartitionedState((Object) null, VoidNamespaceSerializer.INSTANCE, descriptor);
            Assert.fail("Did not throw expected NullPointerException");
        } catch (NullPointerException expected) {
            // expected
        }

        // null namespace serializer
        try {
            backend.getPartitionedState(VoidNamespace.INSTANCE, (TypeSerializer) null, descriptor);
            Assert.fail("Did not throw expected NullPointerException");
        } catch (NullPointerException expected) {
            // expected
        }

        // both null
        try {
            backend.getPartitionedState((Object) null, (TypeSerializer) null, descriptor);
            Assert.fail("Did not throw expected NullPointerException");
        } catch (NullPointerException expected) {
            // expected
        }

        backend.dispose();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks that queryable heap state is stored in {@link ConcurrentHashMap}s.
     *
     * <p>For each of four state kinds (value, list, reducing, folding) the method marks the
     * descriptor as queryable under the name {@code "my-query"}, obtains the state from a
     * backend with a single key group, writes one element under key 1, and then asserts that
     * both the key-group entry of the state table and the entry for the void namespace inside
     * it are {@code ConcurrentHashMap} instances.
     *
     * <p>The method carries no {@code @Test} annotation here; presumably it is called from
     * subclasses whose backend produces {@code AbstractHeapState} instances — verify against
     * the callers.
     *
     * <p>NOTE(review): the declared local types look like decompiler output (for example a
     * {@code ValueState} cast assigned to an {@code AbstractHeapState} variable) — confirm
     * against the original source before relying on them.
     *
     * @throws Exception if creating the backend or accessing state fails
     */
    public void testConcurrentMapIfQueryable() throws Exception {
        // one key group only: max parallelism 1, key-group range [0, 0]
        AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new DummyEnvironment("test_op", 1, 0));

        // --- value state ---
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("value-state", Integer.class, -1);
        valueStateDescriptor.setQueryable("my-query");
        valueStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        AbstractHeapState abstractHeapState = (ValueState) createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
        AbstractHeapState abstractHeapState2 = (KvState) abstractHeapState;
        Assert.assertTrue(abstractHeapState2 instanceof AbstractHeapState);
        abstractHeapState2.setCurrentNamespace(VoidNamespace.INSTANCE);
        createKeyedBackend.setCurrentKey(1);
        abstractHeapState.update(121818273);
        // key group that key 1 is assigned to when there is exactly one key group
        int assignToKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(1, 1);
        StateTable stateTable = abstractHeapState2.getStateTable();
        Assert.assertNotNull("State not set", stateTable.get(assignToKeyGroup));
        Assert.assertTrue(stateTable.get(assignToKeyGroup) instanceof ConcurrentHashMap);
        Assert.assertTrue(stateTable.get(assignToKeyGroup).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);

        // --- list state ---
        ListStateDescriptor listStateDescriptor = new ListStateDescriptor("list-state", Integer.class);
        listStateDescriptor.setQueryable("my-query");
        listStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        AbstractHeapState abstractHeapState3 = (ListState) createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
        AbstractHeapState abstractHeapState4 = (KvState) abstractHeapState3;
        Assert.assertTrue(abstractHeapState4 instanceof AbstractHeapState);
        abstractHeapState4.setCurrentNamespace(VoidNamespace.INSTANCE);
        createKeyedBackend.setCurrentKey(1);
        abstractHeapState3.add(121818273);
        int assignToKeyGroup2 = KeyGroupRangeAssignment.assignToKeyGroup(1, 1);
        StateTable stateTable2 = abstractHeapState4.getStateTable();
        Assert.assertNotNull("State not set", stateTable2.get(assignToKeyGroup2));
        Assert.assertTrue(stateTable2.get(assignToKeyGroup2) instanceof ConcurrentHashMap);
        Assert.assertTrue(stateTable2.get(assignToKeyGroup2).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);

        // --- reducing state (reduce function sums two integers) ---
        ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("reducing-state", new ReduceFunction<Integer>() { // from class: org.apache.flink.runtime.state.StateBackendTestBase.4
            public Integer reduce(Integer num, Integer num2) throws Exception {
                return Integer.valueOf(num.intValue() + num2.intValue());
            }
        }, Integer.class);
        reducingStateDescriptor.setQueryable("my-query");
        reducingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        AbstractHeapState abstractHeapState5 = (ReducingState) createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, reducingStateDescriptor);
        AbstractHeapState abstractHeapState6 = (KvState) abstractHeapState5;
        Assert.assertTrue(abstractHeapState6 instanceof AbstractHeapState);
        abstractHeapState6.setCurrentNamespace(VoidNamespace.INSTANCE);
        createKeyedBackend.setCurrentKey(1);
        abstractHeapState5.add(121818273);
        int assignToKeyGroup3 = KeyGroupRangeAssignment.assignToKeyGroup(1, 1);
        StateTable stateTable3 = abstractHeapState6.getStateTable();
        Assert.assertNotNull("State not set", stateTable3.get(assignToKeyGroup3));
        Assert.assertTrue(stateTable3.get(assignToKeyGroup3) instanceof ConcurrentHashMap);
        Assert.assertTrue(stateTable3.get(assignToKeyGroup3).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);

        // --- folding state (initial value 0, fold function sums two integers) ---
        FoldingStateDescriptor foldingStateDescriptor = new FoldingStateDescriptor("folding-state", 0, new FoldFunction<Integer, Integer>() { // from class: org.apache.flink.runtime.state.StateBackendTestBase.5
            public Integer fold(Integer num, Integer num2) throws Exception {
                return Integer.valueOf(num.intValue() + num2.intValue());
            }
        }, Integer.class);
        foldingStateDescriptor.setQueryable("my-query");
        foldingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        AbstractHeapState abstractHeapState7 = (FoldingState) createKeyedBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, foldingStateDescriptor);
        AbstractHeapState abstractHeapState8 = (KvState) abstractHeapState7;
        Assert.assertTrue(abstractHeapState8 instanceof AbstractHeapState);
        abstractHeapState8.setCurrentNamespace(VoidNamespace.INSTANCE);
        createKeyedBackend.setCurrentKey(1);
        abstractHeapState7.add(121818273);
        int assignToKeyGroup4 = KeyGroupRangeAssignment.assignToKeyGroup(1, 1);
        StateTable stateTable4 = abstractHeapState8.getStateTable();
        Assert.assertNotNull("State not set", stateTable4.get(assignToKeyGroup4));
        Assert.assertTrue(stateTable4.get(assignToKeyGroup4) instanceof ConcurrentHashMap);
        Assert.assertTrue(stateTable4.get(assignToKeyGroup4).get(VoidNamespace.INSTANCE) instanceof ConcurrentHashMap);
        createKeyedBackend.dispose();
    }

    /**
     * Checks the registry notifications issued for a queryable state named {@code "banana"}.
     *
     * <p>A mocked {@code KvStateRegistryListener} is attached to the environment's registry.
     * The listener must have been notified of one registration after the state is first
     * obtained, of one unregistration after the backend is disposed, and of a second
     * registration after the state is obtained again from a backend restored from a snapshot.
     */
    @Test
    public void testQueryableStateRegistration() throws Exception {
        DummyEnvironment env = new DummyEnvironment("test", 1, 0);
        KvStateRegistry registry = env.getKvStateRegistry();

        CheckpointStreamFactory streamFactory = createStreamFactory();
        AbstractKeyedStateBackend<K> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
        KeyGroupRange expectedRange = backend.getKeyGroupRange();

        KvStateRegistryListener listener = (KvStateRegistryListener) Mockito.mock(KvStateRegistryListener.class);
        registry.registerListener(listener);

        ValueStateDescriptor descriptor = new ValueStateDescriptor("test", IntSerializer.INSTANCE);
        descriptor.setQueryable("banana");

        // obtaining the state registers it
        backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        ((KvStateRegistryListener) Mockito.verify(listener, Mockito.times(1))).notifyKvStateRegistered(
                (JobID) org.mockito.Matchers.eq(env.getJobID()),
                (JobVertexID) org.mockito.Matchers.eq(env.getJobVertexId()),
                (KeyGroupRange) org.mockito.Matchers.eq(expectedRange),
                (String) org.mockito.Matchers.eq("banana"),
                (KvStateID) org.mockito.Matchers.any(KvStateID.class));

        KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4L, streamFactory));

        // disposing the backend unregisters the state
        backend.dispose();
        ((KvStateRegistryListener) Mockito.verify(listener, Mockito.times(1))).notifyKvStateUnregistered(
                (JobID) org.mockito.Matchers.eq(env.getJobID()),
                (JobVertexID) org.mockito.Matchers.eq(env.getJobVertexId()),
                (KeyGroupRange) org.mockito.Matchers.eq(expectedRange),
                (String) org.mockito.Matchers.eq("banana"));
        backend.dispose();

        // the state is registered a second time once obtained from the restored backend
        AbstractKeyedStateBackend<K> restoredBackend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
        snapshot.discardState();
        restoredBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
        ((KvStateRegistryListener) Mockito.verify(listener, Mockito.times(2))).notifyKvStateRegistered(
                (JobID) org.mockito.Matchers.eq(env.getJobID()),
                (JobVertexID) org.mockito.Matchers.eq(env.getJobVertexId()),
                (KeyGroupRange) org.mockito.Matchers.eq(expectedRange),
                (String) org.mockito.Matchers.eq("banana"),
                (KvStateID) org.mockito.Matchers.any(KvStateID.class));

        restoredBackend.dispose();
    }

    /**
     * Checks that snapshotting a backend in which no state was ever registered yields a
     * {@code null} state handle, and that a backend can be restored from that {@code null}
     * handle and disposed without error.
     */
    @Test
    public void testEmptyStateCheckpointing() {
        try {
            CheckpointStreamFactory createStreamFactory = createStreamFactory();
            AbstractKeyedStateBackend<K> createKeyedBackend = createKeyedBackend(IntSerializer.INSTANCE);
            // No state is registered on purpose: the snapshot of an empty backend is null.
            KeyGroupsStateHandle runSnapshot = runSnapshot(createKeyedBackend.snapshot(682375462379L, 1L, createStreamFactory));
            Assert.assertNull(runSnapshot);
            createKeyedBackend.dispose();
            // restoring from the null handle must work as well
            restoreKeyedBackend(IntSerializer.INSTANCE, runSnapshot).dispose();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Queries {@code kvState} with the serialized key and namespace and deserializes the
     * answer as a single value.
     *
     * @param kvState state instance to query
     * @param k key to look up
     * @param typeSerializer serializer for the key
     * @param n namespace to look up
     * @param typeSerializer2 serializer for the namespace
     * @param typeSerializer3 serializer used to deserialize the returned value
     * @return the deserialized value, or {@code null} if the state returned no bytes
     * @throws Exception if serialization, the state query, or deserialization fails
     */
    public static <V, K, N> V getSerializedValue(KvState<N> kvState, K k, TypeSerializer<K> typeSerializer, N n, TypeSerializer<N> typeSerializer2, TypeSerializer<V> typeSerializer3) throws Exception {
        byte[] keyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(k, typeSerializer, n, typeSerializer2);
        byte[] rawValue = kvState.getSerializedValue(keyAndNamespace);
        if (rawValue != null) {
            return (V) KvStateRequestSerializer.deserializeValue(rawValue, typeSerializer3);
        }
        return null;
    }

    /**
     * Queries {@code kvState} with the serialized key and namespace and deserializes the
     * answer as a list.
     *
     * @param kvState state instance to query
     * @param k key to look up
     * @param typeSerializer serializer for the key
     * @param n namespace to look up
     * @param typeSerializer2 serializer for the namespace
     * @param typeSerializer3 serializer used to deserialize the list elements
     * @return the deserialized list, or {@code null} if the state returned no bytes
     * @throws Exception if serialization, the state query, or deserialization fails
     */
    private static <V, K, N> List<V> getSerializedList(KvState<N> kvState, K k, TypeSerializer<K> typeSerializer, N n, TypeSerializer<N> typeSerializer2, TypeSerializer<V> typeSerializer3) throws Exception {
        byte[] keyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace(k, typeSerializer, n, typeSerializer2);
        byte[] rawList = kvState.getSerializedValue(keyAndNamespace);
        if (rawList != null) {
            return KvStateRequestSerializer.deserializeList(rawList, typeSerializer3);
        }
        return null;
    }

    /**
     * Returns the result of a snapshot future, running it on a new thread first if it has not
     * completed yet.
     *
     * @param runnableFuture snapshot operation to complete
     * @return the state handle produced by the future
     * @throws Exception if waiting for or computing the result fails
     */
    private KeyGroupsStateHandle runSnapshot(RunnableFuture<KeyGroupsStateHandle> runnableFuture) throws Exception {
        if (runnableFuture.isDone()) {
            // already completed, nothing to execute
            return runnableFuture.get();
        }
        Thread snapshotRunner = new Thread(runnableFuture);
        snapshotRunner.start();
        return runnableFuture.get();
    }
}
