package org.apache.flink.table.runtime.operators.aggregate;

import java.io.EOFException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.runtime.operators.sort.BufferedKVExternalSorter;
import org.apache.flink.table.runtime.operators.sort.IntNormalizedKeyComputer;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.collections.binary.BytesHashMap;
import org.apache.flink.table.runtime.util.collections.binary.BytesMap;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/aggregate/SumHashAggTestOperator.class */
public class SumHashAggTestOperator extends AbstractStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData> {
    private final long memorySize;
    private final LogicalType[] keyTypes = {new IntType()};
    private final LogicalType[] aggBufferTypes = {new IntType(), new BigIntType()};
    private transient BinaryRowData currentKey;
    private transient BinaryRowWriter currentKeyWriter;
    private transient BufferedKVExternalSorter sorter;
    private transient BytesHashMap aggregateMap;
    private transient BinaryRowData emptyAggBuffer;

    public SumHashAggTestOperator(long j) throws Exception {
        this.memorySize = j;
    }

    public void open() throws Exception {
        super.open();
        this.aggregateMap = new BytesHashMap(getOwner(), getMemoryManager(), this.memorySize, this.keyTypes, this.aggBufferTypes);
        this.currentKey = new BinaryRowData(1);
        this.currentKeyWriter = new BinaryRowWriter(this.currentKey);
        this.emptyAggBuffer = new BinaryRowData(1);
        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(this.emptyAggBuffer);
        binaryRowWriter.reset();
        binaryRowWriter.setNullAt(0);
        binaryRowWriter.complete();
    }

    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        RowData rowData = (RowData) streamRecord.getValue();
        this.currentKeyWriter.reset();
        if (rowData.isNullAt(0)) {
            this.currentKeyWriter.setNullAt(0);
        } else {
            this.currentKeyWriter.writeInt(0, rowData.getInt(0));
        }
        this.currentKeyWriter.complete();
        BytesMap.LookupInfo lookup = this.aggregateMap.lookup(this.currentKey);
        BinaryRowData binaryRowData = (BinaryRowData) lookup.getValue();
        if (!lookup.isFound()) {
            try {
                binaryRowData = this.aggregateMap.append(lookup, this.emptyAggBuffer);
            } catch (EOFException e) {
                if (this.sorter == null) {
                    this.sorter = new BufferedKVExternalSorter(getIOManager(), new BinaryRowDataSerializer(this.keyTypes.length), new BinaryRowDataSerializer(this.aggBufferTypes.length), new IntNormalizedKeyComputer(), new IntRecordComparator(), getMemoryManager().getPageSize(), ((Integer) ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES.defaultValue()).intValue(), ((Boolean) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED.defaultValue()).booleanValue(), (int) ((MemorySize) ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE.defaultValue()).getBytes());
                }
                this.sorter.sortAndSpill(this.aggregateMap.getRecordAreaMemorySegments(), this.aggregateMap.getNumElements(), new BytesHashMapSpillMemorySegmentPool(this.aggregateMap.getBucketAreaMemorySegments()));
                this.aggregateMap.reset();
                try {
                    binaryRowData = this.aggregateMap.append(this.aggregateMap.lookup(this.currentKey), this.emptyAggBuffer);
                } catch (EOFException e2) {
                    throw new OutOfMemoryError("BytesHashMap Out of Memory.");
                }
            }
        }
        if (rowData.isNullAt(1)) {
            return;
        }
        long j = rowData.getLong(1);
        if (binaryRowData.isNullAt(0)) {
            binaryRowData.setLong(0, j);
        } else {
            binaryRowData.setLong(0, j + binaryRowData.getLong(0));
        }
    }

    public void endInput() throws Exception {
        StreamRecord streamRecord = new StreamRecord((Object) null);
        JoinedRowData joinedRowData = new JoinedRowData();
        GenericRowData genericRowData = new GenericRowData(1);
        if (this.sorter == null) {
            KeyValueIterator entryIterator = this.aggregateMap.getEntryIterator(false);
            while (entryIterator.advanceNext()) {
                genericRowData.setField(0, ((BinaryRowData) entryIterator.getValue()).isNullAt(0) ? null : Long.valueOf(((BinaryRowData) entryIterator.getValue()).getLong(0)));
                joinedRowData.replace((RowData) entryIterator.getKey(), genericRowData);
                getOutput().collect(streamRecord.replace(joinedRowData));
            }
            return;
        }
        this.sorter.sortAndSpill(this.aggregateMap.getRecordAreaMemorySegments(), this.aggregateMap.getNumElements(), new BytesHashMapSpillMemorySegmentPool(this.aggregateMap.getBucketAreaMemorySegments()));
        this.aggregateMap.free(true);
        BinaryRowData binaryRowData = null;
        JoinedRowData joinedRowData2 = new JoinedRowData();
        boolean z = false;
        long j = -1;
        MutableObjectIterator kVIterator = this.sorter.getKVIterator();
        while (true) {
            Tuple2 tuple2 = (Tuple2) kVIterator.next();
            if (tuple2 == null) {
                break;
            }
            BinaryRowData binaryRowData2 = (BinaryRowData) tuple2.f0;
            joinedRowData2.replace(binaryRowData2, (BinaryRowData) tuple2.f1);
            if (binaryRowData == null) {
                binaryRowData = binaryRowData2.copy();
                z = true;
                j = -1;
            } else if (binaryRowData2.getSizeInBytes() != binaryRowData.getSizeInBytes() || !BinaryRowDataUtil.byteArrayEquals(binaryRowData2.getSegments()[0].getArray(), binaryRowData.getSegments()[0].getArray(), binaryRowData2.getSizeInBytes())) {
                genericRowData.setField(0, z ? null : Long.valueOf(j));
                joinedRowData.replace(binaryRowData, genericRowData);
                getOutput().collect(streamRecord.replace(joinedRowData));
                binaryRowData = binaryRowData2.copy();
                z = true;
                j = -1;
            }
            if (!joinedRowData2.isNullAt(1)) {
                long j2 = joinedRowData2.getLong(1);
                j = z ? j2 : j + j2;
                z = false;
            }
        }
        genericRowData.setField(0, z ? null : Long.valueOf(j));
        joinedRowData.replace(binaryRowData, genericRowData);
        getOutput().collect(streamRecord.replace(joinedRowData));
    }

    public void close() throws Exception {
        super.close();
        this.aggregateMap.free();
        if (this.sorter != null) {
            this.sorter.close();
        }
    }

    Object getOwner() {
        return getContainingTask();
    }

    Collector<StreamRecord<RowData>> getOutput() {
        return this.output;
    }

    MemoryManager getMemoryManager() {
        return getContainingTask().getEnvironment().getMemoryManager();
    }

    public IOManager getIOManager() {
        return getContainingTask().getEnvironment().getIOManager();
    }
}
