package org.apache.flink.runtime.state.heap;

import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapReducingStateTest.class */
public class HeapReducingStateTest extends HeapStateBackendTestBase {

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapReducingStateTest$AddingFunction.class */
    private static class AddingFunction implements ReduceFunction<Long> {
        private AddingFunction() {
        }

        public Long reduce(Long l, Long l2) {
            return Long.valueOf(l.longValue() + l2.longValue());
        }
    }

    @Test
    public void testAddAndGet() throws Exception {
        ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("my-state", new AddingFunction(), Long.class);
        reducingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        HeapKeyedStateBackend<String> createKeyedBackend = createKeyedBackend();
        try {
            HeapReducingState createReducingState = createKeyedBackend.createReducingState(VoidNamespaceSerializer.INSTANCE, reducingStateDescriptor);
            createReducingState.setCurrentNamespace(VoidNamespace.INSTANCE);
            createKeyedBackend.setCurrentKey("abc");
            Assert.assertNull(createReducingState.get());
            createKeyedBackend.setCurrentKey("def");
            Assert.assertNull(createReducingState.get());
            createReducingState.add(17L);
            createReducingState.add(11L);
            Assert.assertEquals(28L, ((Long) createReducingState.get()).longValue());
            createKeyedBackend.setCurrentKey("abc");
            Assert.assertNull(createReducingState.get());
            createKeyedBackend.setCurrentKey("g");
            Assert.assertNull(createReducingState.get());
            createReducingState.add(1L);
            createReducingState.add(2L);
            createKeyedBackend.setCurrentKey("def");
            Assert.assertEquals(28L, ((Long) createReducingState.get()).longValue());
            createReducingState.clear();
            Assert.assertNull(createReducingState.get());
            createKeyedBackend.setCurrentKey("g");
            createReducingState.add(3L);
            createReducingState.add(2L);
            createReducingState.add(1L);
            createKeyedBackend.setCurrentKey("def");
            Assert.assertNull(createReducingState.get());
            createKeyedBackend.setCurrentKey("g");
            Assert.assertEquals(9L, ((Long) createReducingState.get()).longValue());
            createReducingState.clear();
            Assert.assertTrue(createReducingState.stateTable.isEmpty());
            createKeyedBackend.close();
            createKeyedBackend.dispose();
        } catch (Throwable th) {
            createKeyedBackend.close();
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testMerging() throws Exception {
        ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("my-state", new AddingFunction(), Long.class);
        reducingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        HeapKeyedStateBackend<String> createKeyedBackend = createKeyedBackend();
        try {
            HeapReducingState createReducingState = createKeyedBackend.createReducingState(IntSerializer.INSTANCE, reducingStateDescriptor);
            createKeyedBackend.setCurrentKey("abc");
            createReducingState.setCurrentNamespace(1);
            createReducingState.add(33L);
            createReducingState.add(55L);
            createReducingState.setCurrentNamespace(2);
            createReducingState.add(22L);
            createReducingState.add(11L);
            createReducingState.setCurrentNamespace(3);
            createReducingState.add(44L);
            createKeyedBackend.setCurrentKey("def");
            createReducingState.setCurrentNamespace(1);
            createReducingState.add(11L);
            createReducingState.add(44L);
            createReducingState.setCurrentNamespace(3);
            createReducingState.add(22L);
            createReducingState.add(55L);
            createReducingState.add(33L);
            createKeyedBackend.setCurrentKey("jkl");
            createReducingState.setCurrentNamespace(1);
            createReducingState.add(11L);
            createReducingState.add(22L);
            createReducingState.add(33L);
            createReducingState.add(44L);
            createReducingState.add(55L);
            createKeyedBackend.setCurrentKey("mno");
            createReducingState.setCurrentNamespace(3);
            createReducingState.add(11L);
            createReducingState.add(22L);
            createReducingState.add(33L);
            createReducingState.add(44L);
            createReducingState.add(55L);
            createKeyedBackend.setCurrentKey("abc");
            createReducingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createReducingState.setCurrentNamespace(1);
            Assert.assertEquals(165L, createReducingState.get());
            createKeyedBackend.setCurrentKey("def");
            createReducingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createReducingState.setCurrentNamespace(1);
            Assert.assertEquals(165L, createReducingState.get());
            createKeyedBackend.setCurrentKey("ghi");
            createReducingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createReducingState.setCurrentNamespace(1);
            Assert.assertNull(createReducingState.get());
            createKeyedBackend.setCurrentKey("jkl");
            createReducingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createReducingState.setCurrentNamespace(1);
            Assert.assertEquals(165L, createReducingState.get());
            createKeyedBackend.setCurrentKey("mno");
            createReducingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createReducingState.setCurrentNamespace(1);
            Assert.assertEquals(165L, createReducingState.get());
            createKeyedBackend.setCurrentKey("abc");
            createReducingState.setCurrentNamespace(1);
            createReducingState.clear();
            createKeyedBackend.setCurrentKey("def");
            createReducingState.setCurrentNamespace(1);
            createReducingState.clear();
            createKeyedBackend.setCurrentKey("ghi");
            createReducingState.setCurrentNamespace(1);
            createReducingState.clear();
            createKeyedBackend.setCurrentKey("jkl");
            createReducingState.setCurrentNamespace(1);
            createReducingState.clear();
            createKeyedBackend.setCurrentKey("mno");
            createReducingState.setCurrentNamespace(1);
            createReducingState.clear();
            Assert.assertTrue(createReducingState.stateTable.isEmpty());
            createKeyedBackend.close();
            createKeyedBackend.dispose();
        } catch (Throwable th) {
            createKeyedBackend.close();
            createKeyedBackend.dispose();
            throw th;
        }
    }
}
