package org.apache.flink.runtime.state;

import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for the {@link DefaultOperatorStateBackend} obtained from a {@link MemoryStateBackend}:
 * creation of an empty backend, registration of list states in both the regular and the
 * broadcast mode, and a snapshot/restore round trip.
 */
public class OperatorStateBackendTest {
    /**
     * Factory for the operator state backends and checkpoint stream factories used by the tests.
     * NOTE(review): 4096 is presumably the maximum state size in bytes; confirm against
     * MemoryStateBackend.
     */
    AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096);

    /**
     * Creates a mocked {@link Environment} whose user class loader is the context class loader of
     * the calling thread. No other method of the mock is stubbed.
     *
     * @return the mocked environment
     */
    static Environment createMockEnvironment() {
        Environment env = Mockito.mock(Environment.class);
        Mockito.when(env.getUserClassLoader()).thenReturn(Thread.currentThread().getContextClassLoader());
        return env;
    }

    /**
     * Creates a fresh operator state backend for an operator named "test-operator".
     *
     * @return the newly created backend
     * @throws Exception if the state backend fails to create the operator state backend
     */
    private DefaultOperatorStateBackend createNewOperatorStateBackend() throws Exception {
        return this.abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-operator");
    }

    /**
     * Builds a list state descriptor with the given name whose elements are handled by a
     * {@link JavaSerializer}.
     *
     * @param name the state name
     * @return the descriptor
     */
    private static ListStateDescriptor<Integer> newDescriptor(String name) {
        return new ListStateDescriptor<Integer>(name, new JavaSerializer<Integer>());
    }

    /**
     * Asserts that iterating the given state yields exactly the expected elements, in order.
     * Calling it without expected elements asserts that the state is empty.
     *
     * @param state the state to inspect
     * @param expected the elements the state must contain, in iteration order
     * @throws Exception if reading the state fails
     */
    private static void assertStateContains(ListState<Integer> state, Integer... expected) throws Exception {
        Iterator<Integer> actual = state.get().iterator();
        for (Integer element : expected) {
            // Fail with an assertion instead of a NoSuchElementException when the state is too short.
            Assert.assertTrue(actual.hasNext());
            Assert.assertEquals(element, actual.next());
        }
        Assert.assertFalse(actual.hasNext());
    }

    /** A freshly created backend must exist and must not have any registered state. */
    @Test
    public void testCreateNew() throws Exception {
        DefaultOperatorStateBackend backend = createNewOperatorStateBackend();
        Assert.assertNotNull(backend);
        Assert.assertTrue(backend.getRegisteredStateNames().isEmpty());
    }

    /**
     * Registers two regular list states and one broadcast list state, verifies their contents
     * after adding elements, verifies that re-requesting a registered state yields a handle onto
     * the same elements, and expects an {@link IllegalStateException} when a registered state is
     * requested in the other mode.
     */
    @Test
    public void testRegisterStates() throws Exception {
        DefaultOperatorStateBackend backend = createNewOperatorStateBackend();
        ListStateDescriptor<Integer> descriptor1 = newDescriptor("test1");
        ListStateDescriptor<Integer> descriptor2 = newDescriptor("test2");
        ListStateDescriptor<Integer> descriptor3 = newDescriptor("test3");

        ListState<Integer> listState1 = backend.getOperatorState(descriptor1);
        Assert.assertNotNull(listState1);
        Assert.assertEquals(1, backend.getRegisteredStateNames().size());
        assertStateContains(listState1);
        listState1.add(42);
        listState1.add(4711);
        assertStateContains(listState1, 42, 4711);

        ListState<Integer> listState2 = backend.getOperatorState(descriptor2);
        Assert.assertNotNull(listState2);
        Assert.assertEquals(2, backend.getRegisteredStateNames().size());
        // Check the new state itself; re-checking the exhausted iterator of the first state proves nothing.
        assertStateContains(listState2);
        listState2.add(7);
        listState2.add(13);
        listState2.add(23);
        assertStateContains(listState2, 7, 13, 23);

        ListState<Integer> broadcastState = backend.getBroadcastOperatorState(descriptor3);
        Assert.assertNotNull(broadcastState);
        Assert.assertEquals(3, backend.getRegisteredStateNames().size());
        // Same as above: the freshly registered broadcast state must start out empty.
        assertStateContains(broadcastState);
        broadcastState.add(17);
        broadcastState.add(3);
        broadcastState.add(123);
        assertStateContains(broadcastState, 17, 3, 123);

        // Requesting an already registered state again must expose the existing elements, and
        // an element added through the second handle must be visible through the first one.
        ListState<Integer> listState1Again = backend.getOperatorState(descriptor1);
        Assert.assertNotNull(listState1Again);
        listState1Again.add(123);
        assertStateContains(listState1Again, 42, 4711, 123);
        assertStateContains(listState1, 42, 4711, 123);
        assertStateContains(listState1Again, 42, 4711, 123);

        try {
            backend.getBroadcastOperatorState(descriptor2);
            Assert.fail("Did not detect changed mode");
        } catch (IllegalStateException expected) {
            // Expected: "test2" was registered as a regular state, not as a broadcast state.
        }
        try {
            backend.getOperatorState(descriptor3);
            Assert.fail("Did not detect changed mode");
        } catch (IllegalStateException expected) {
            // Expected: "test3" was registered as a broadcast state, not as a regular state.
        }
    }

    /**
     * Fills two regular list states and one broadcast list state, snapshots the backend, restores
     * the snapshot into a new backend and verifies that all three states come back with their
     * elements in the original order. The snapshot handle is discarded in every case.
     */
    @Test
    public void testSnapshotRestore() throws Exception {
        DefaultOperatorStateBackend backend = createNewOperatorStateBackend();
        ListStateDescriptor<Integer> descriptor1 = newDescriptor("test1");
        ListStateDescriptor<Integer> descriptor2 = newDescriptor("test2");
        ListStateDescriptor<Integer> descriptor3 = newDescriptor("test3");
        ListState<Integer> listState1 = backend.getOperatorState(descriptor1);
        ListState<Integer> listState2 = backend.getOperatorState(descriptor2);
        ListState<Integer> broadcastState = backend.getBroadcastOperatorState(descriptor3);

        listState1.add(42);
        listState1.add(4711);
        listState2.add(7);
        listState2.add(13);
        listState2.add(23);
        broadcastState.add(17);
        broadcastState.add(18);
        broadcastState.add(19);
        broadcastState.add(20);

        OperatorStateHandle stateHandle = (OperatorStateHandle) backend
                .snapshot(1L, 1L, this.abstractStateBackend.createStreamFactory(new JobID(), "testOperator"))
                .get();
        try {
            backend.close();
            backend.dispose();

            DefaultOperatorStateBackend restoredBackend =
                    this.abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");
            restoredBackend.restore(Collections.singletonList(stateHandle));
            // All three states must be known right after the restore, before they are requested again.
            Assert.assertEquals(3, restoredBackend.getRegisteredStateNames().size());

            ListState<Integer> restoredState1 = restoredBackend.getOperatorState(descriptor1);
            ListState<Integer> restoredState2 = restoredBackend.getOperatorState(descriptor2);
            ListState<Integer> restoredBroadcastState = restoredBackend.getBroadcastOperatorState(descriptor3);
            // Requesting the restored states must not register additional ones.
            Assert.assertEquals(3, restoredBackend.getRegisteredStateNames().size());

            assertStateContains(restoredState1, 42, 4711);
            assertStateContains(restoredState2, 7, 13, 23);
            assertStateContains(restoredBroadcastState, 17, 18, 19, 20);

            restoredBackend.close();
            restoredBackend.dispose();
        } finally {
            // Discard the snapshot exactly once, whether the checks above passed or failed.
            stateHandle.discardState();
        }
    }
}
