package org.apache.flink.table.runtime.operators.aggregate;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/HashAggTest.class */
public class HashAggTest {
    private static final int MEMORY_SIZE = 33554432;
    private Map<Integer, Long> outputMap = new HashMap();
    private MemoryManager memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(33554432).build();
    private IOManager ioManager;
    private SumHashAggTestOperator operator;

    @Before
    public void before() throws Exception {
        this.ioManager = new IOManagerAsync();
        this.operator = new SumHashAggTestOperator(1310720L) { // from class: org.apache.flink.table.runtime.operators.aggregate.HashAggTest.1
            @Override // org.apache.flink.table.runtime.operators.aggregate.SumHashAggTestOperator
            Object getOwner() {
                return HashAggTest.this;
            }

            @Override // org.apache.flink.table.runtime.operators.aggregate.SumHashAggTestOperator
            MemoryManager getMemoryManager() {
                return HashAggTest.this.memoryManager;
            }

            @Override // org.apache.flink.table.runtime.operators.aggregate.SumHashAggTestOperator
            Collector<StreamRecord<RowData>> getOutput() {
                return new Collector<StreamRecord<RowData>>() { // from class: org.apache.flink.table.runtime.operators.aggregate.HashAggTest.1.1
                    public void collect(StreamRecord<RowData> streamRecord) {
                        RowData rowData = (RowData) streamRecord.getValue();
                        HashAggTest.this.outputMap.put(rowData.isNullAt(0) ? null : Integer.valueOf(rowData.getInt(0)), rowData.isNullAt(1) ? null : Long.valueOf(rowData.getLong(1)));
                    }

                    public void close() {
                    }
                };
            }

            @Override // org.apache.flink.table.runtime.operators.aggregate.SumHashAggTestOperator
            Configuration getConf() {
                return new Configuration();
            }

            @Override // org.apache.flink.table.runtime.operators.aggregate.SumHashAggTestOperator
            public IOManager getIOManager() {
                return HashAggTest.this.ioManager;
            }
        };
        this.operator.open();
    }

    @After
    public void afterTest() throws Exception {
        this.ioManager.close();
        if (this.memoryManager != null) {
            ((AbstractBooleanAssert) Assertions.assertThat(this.memoryManager.verifyEmpty()).as("Memory leak: not all segments have been returned to the memory manager.", new Object[0])).isTrue();
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    private void addRow(RowData rowData) throws Exception {
        this.operator.processElement(new StreamRecord<>(rowData));
    }

    @Test
    public void testNormal() throws Exception {
        addRow(GenericRowData.of(new Object[]{1, 1L}));
        addRow(GenericRowData.of(new Object[]{5, 2L}));
        addRow(GenericRowData.of(new Object[]{2, 3L}));
        addRow(GenericRowData.of(new Object[]{2, null}));
        addRow(GenericRowData.of(new Object[]{1, 4L}));
        addRow(GenericRowData.of(new Object[]{4, 5L}));
        addRow(GenericRowData.of(new Object[]{1, 6L}));
        addRow(GenericRowData.of(new Object[]{1, null}));
        addRow(GenericRowData.of(new Object[]{2, 8L}));
        addRow(GenericRowData.of(new Object[]{5, 9L}));
        addRow(GenericRowData.of(new Object[]{10, null}));
        addRow(GenericRowData.of(new Object[]{null, 5L}));
        this.operator.endInput();
        this.operator.close();
        HashMap hashMap = new HashMap();
        hashMap.put(null, 5L);
        hashMap.put(1, 11L);
        hashMap.put(2, 11L);
        hashMap.put(4, 5L);
        hashMap.put(5, 11L);
        hashMap.put(10, null);
        Assertions.assertThat(this.outputMap).isEqualTo(hashMap);
    }

    @Test
    public void testSpill() throws Exception {
        for (int i = 0; i < 30000; i++) {
            addRow(GenericRowData.of(new Object[]{Integer.valueOf(i), Long.valueOf(i)}));
            addRow(GenericRowData.of(new Object[]{Integer.valueOf(i + 1), Long.valueOf(i)}));
        }
        addRow(GenericRowData.of(new Object[]{1, null}));
        addRow(GenericRowData.of(new Object[]{null, 5L}));
        this.operator.endInput();
        this.operator.close();
        Assertions.assertThat(this.outputMap).hasSize(30002);
    }
}
