package org.apache.flink.runtime.state.heap;

import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.class */
public class HeapAggregatingStateTest extends HeapStateBackendTestBase {

    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapAggregatingStateTest$AddingFunction.class */
    private static class AddingFunction implements AggregateFunction<Long, MutableLong, Long> {
        private AddingFunction() {
        }

        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public MutableLong m322createAccumulator() {
            return new MutableLong();
        }

        public void add(Long l, MutableLong mutableLong) {
            mutableLong.value += l.longValue();
        }

        public Long getResult(MutableLong mutableLong) {
            return Long.valueOf(mutableLong.value);
        }

        public MutableLong merge(MutableLong mutableLong, MutableLong mutableLong2) {
            mutableLong.value += mutableLong2.value;
            return mutableLong;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/heap/HeapAggregatingStateTest$MutableLong.class */
    public static final class MutableLong {
        long value;

        private MutableLong() {
        }
    }

    @Test
    public void testAddAndGet() throws Exception {
        AggregatingStateDescriptor aggregatingStateDescriptor = new AggregatingStateDescriptor("my-state", new AddingFunction(), MutableLong.class);
        aggregatingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        HeapKeyedStateBackend<String> createKeyedBackend = createKeyedBackend();
        try {
            HeapAggregatingState createAggregatingState = createKeyedBackend.createAggregatingState(VoidNamespaceSerializer.INSTANCE, aggregatingStateDescriptor);
            createAggregatingState.setCurrentNamespace(VoidNamespace.INSTANCE);
            createKeyedBackend.setCurrentKey("abc");
            Assert.assertNull(createAggregatingState.get());
            createKeyedBackend.setCurrentKey("def");
            Assert.assertNull(createAggregatingState.get());
            createAggregatingState.add(17L);
            createAggregatingState.add(11L);
            Assert.assertEquals(28L, ((Long) createAggregatingState.get()).longValue());
            createKeyedBackend.setCurrentKey("abc");
            Assert.assertNull(createAggregatingState.get());
            createKeyedBackend.setCurrentKey("g");
            Assert.assertNull(createAggregatingState.get());
            createAggregatingState.add(1L);
            createAggregatingState.add(2L);
            createKeyedBackend.setCurrentKey("def");
            Assert.assertEquals(28L, ((Long) createAggregatingState.get()).longValue());
            createAggregatingState.clear();
            Assert.assertNull(createAggregatingState.get());
            createKeyedBackend.setCurrentKey("g");
            createAggregatingState.add(3L);
            createAggregatingState.add(2L);
            createAggregatingState.add(1L);
            createKeyedBackend.setCurrentKey("def");
            Assert.assertNull(createAggregatingState.get());
            createKeyedBackend.setCurrentKey("g");
            Assert.assertEquals(9L, ((Long) createAggregatingState.get()).longValue());
            createAggregatingState.clear();
            Assert.assertTrue(createAggregatingState.stateTable.isEmpty());
            createKeyedBackend.close();
            createKeyedBackend.dispose();
        } catch (Throwable th) {
            createKeyedBackend.close();
            createKeyedBackend.dispose();
            throw th;
        }
    }

    @Test
    public void testMerging() throws Exception {
        AggregatingStateDescriptor aggregatingStateDescriptor = new AggregatingStateDescriptor("my-state", new AddingFunction(), MutableLong.class);
        aggregatingStateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        HeapKeyedStateBackend<String> createKeyedBackend = createKeyedBackend();
        try {
            HeapAggregatingState createAggregatingState = createKeyedBackend.createAggregatingState(IntSerializer.INSTANCE, aggregatingStateDescriptor);
            createKeyedBackend.setCurrentKey("abc");
            createAggregatingState.setCurrentNamespace(1);
            createAggregatingState.add(33L);
            createAggregatingState.add(55L);
            createAggregatingState.setCurrentNamespace(2);
            createAggregatingState.add(22L);
            createAggregatingState.add(11L);
            createAggregatingState.setCurrentNamespace(3);
            createAggregatingState.add(44L);
            createKeyedBackend.setCurrentKey("def");
            createAggregatingState.setCurrentNamespace(1);
            createAggregatingState.add(11L);
            createAggregatingState.add(44L);
            createAggregatingState.setCurrentNamespace(3);
            createAggregatingState.add(22L);
            createAggregatingState.add(55L);
            createAggregatingState.add(33L);
            createKeyedBackend.setCurrentKey("jkl");
            createAggregatingState.setCurrentNamespace(1);
            createAggregatingState.add(11L);
            createAggregatingState.add(22L);
            createAggregatingState.add(33L);
            createAggregatingState.add(44L);
            createAggregatingState.add(55L);
            createKeyedBackend.setCurrentKey("mno");
            createAggregatingState.setCurrentNamespace(3);
            createAggregatingState.add(11L);
            createAggregatingState.add(22L);
            createAggregatingState.add(33L);
            createAggregatingState.add(44L);
            createAggregatingState.add(55L);
            createKeyedBackend.setCurrentKey("abc");
            createAggregatingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createAggregatingState.setCurrentNamespace(1);
            Assert.assertEquals(165L, createAggregatingState.get());
            createKeyedBackend.setCurrentKey("def");
            createAggregatingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createAggregatingState.setCurrentNamespace(1);
            Assert.assertEquals(165L, createAggregatingState.get());
            createKeyedBackend.setCurrentKey("ghi");
            createAggregatingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createAggregatingState.setCurrentNamespace(1);
            Assert.assertNull(createAggregatingState.get());
            createKeyedBackend.setCurrentKey("jkl");
            createAggregatingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createAggregatingState.setCurrentNamespace(1);
            Assert.assertEquals(165L, createAggregatingState.get());
            createKeyedBackend.setCurrentKey("mno");
            createAggregatingState.mergeNamespaces(1, Arrays.asList(2, 3));
            createAggregatingState.setCurrentNamespace(1);
            Assert.assertEquals(165L, createAggregatingState.get());
            createKeyedBackend.setCurrentKey("abc");
            createAggregatingState.setCurrentNamespace(1);
            createAggregatingState.clear();
            createKeyedBackend.setCurrentKey("def");
            createAggregatingState.setCurrentNamespace(1);
            createAggregatingState.clear();
            createKeyedBackend.setCurrentKey("ghi");
            createAggregatingState.setCurrentNamespace(1);
            createAggregatingState.clear();
            createKeyedBackend.setCurrentKey("jkl");
            createAggregatingState.setCurrentNamespace(1);
            createAggregatingState.clear();
            createKeyedBackend.setCurrentKey("mno");
            createAggregatingState.setCurrentNamespace(1);
            createAggregatingState.clear();
            Assert.assertTrue(createAggregatingState.stateTable.isEmpty());
            createKeyedBackend.close();
            createKeyedBackend.dispose();
        } catch (Throwable th) {
            createKeyedBackend.close();
            createKeyedBackend.dispose();
            throw th;
        }
    }
}
