package org.apache.flink.runtime.operators.hash;

import java.io.EOFException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.util.CopyingListCollector;
import org.apache.flink.api.common.typeutils.SameTypePairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.operators.hash.InPlaceMutableHashTable;
import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
import org.apache.flink.runtime.operators.testutils.types.StringPair;
import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
import org.apache.flink.shaded.guava30.com.google.common.collect.Ordering;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/hash/InPlaceMutableHashTableTest.class */
public class InPlaceMutableHashTableTest extends MutableHashTableTestBase {
    /** Fixed seed handed to {@link Random} in the reduce-based tests so their input streams are reproducible. */
    private static final long RANDOM_SEED = 58723953465322L;
    /** Size passed to {@code getMemory} for each memory segment in the reduce-based tests (presumably bytes, i.e. 16 KiB — confirm against MemorySegmentFactory). */
    private static final int PAGE_SIZE = 16384;
    /** Serializer for the {@code (Long, String)} tuples stored in the tables under test. */
    @SuppressWarnings("unchecked") // Tuple2.class is a raw Class literal; the double cast only re-labels it with its type arguments.
    private final TypeSerializer<Tuple2<Long, String>> serializer =
            new TupleSerializer<Tuple2<Long, String>>(
                    (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class,
                    new TypeSerializer<?>[]{LongSerializer.INSTANCE, StringSerializer.INSTANCE});

    /** Comparator for the stored tuples, using tuple field 0 (the {@code Long}) as the only key field. */
    private final TypeComparator<Tuple2<Long, String>> comparator =
            new TupleComparator<Tuple2<Long, String>>(
                    new int[]{0},
                    new TypeComparator<?>[]{new LongComparator(true)},
                    new TypeSerializer<?>[]{LongSerializer.INSTANCE});

    /** Comparator for bare {@code Long} probe keys, used when obtaining a prober from the table. */
    private final TypeComparator<Long> probeComparator = new LongComparator(true);
    /**
     * Pair comparator that matches a bare {@code Long} probe key against field 0 of a stored
     * {@code (Long, String)} tuple.
     */
    private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
        /** Probe key most recently supplied through {@link #setReference(Long)}. */
        private long ref;

        /** Remembers the probe key that subsequent comparisons are made against. */
        @Override
        public void setReference(Long l) {
            this.ref = l.longValue();
        }

        /** Returns whether the tuple's key (field 0) equals the remembered probe key. */
        @Override
        public boolean equalToReference(Tuple2<Long, String> tuple2) {
            return tuple2.f0.longValue() == this.ref;
        }

        /**
         * Orders the remembered probe key against the tuple's key (field 0).
         *
         * @return -1 if the probe key is smaller, 0 if equal, 1 if larger
         */
        @Override
        public int compareToReference(Tuple2<Long, String> tuple2) {
            final long probeKey = this.ref;
            final long candidateKey = tuple2.f0.longValue();
            if (probeKey < candidateKey) {
                return -1;
            }
            return probeKey == candidateKey ? 0 : 1;
        }
    };

    /* loaded from: input_file:org/apache/flink/runtime/operators/hash/InPlaceMutableHashTableTest$ConcatReducer.class */
    class ConcatReducer implements ReduceFunction<StringPair> {
        ConcatReducer() {
        }

        public StringPair reduce(StringPair stringPair, StringPair stringPair2) throws Exception {
            if (stringPair.getKey().compareTo(stringPair2.getKey()) != 0) {
                throw new RuntimeException("ConcatReducer was called with two records that have differing keys.");
            }
            return new StringPair(stringPair.getKey(), stringPair.getValue().concat(stringPair2.getValue()));
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/hash/InPlaceMutableHashTableTest$InPlaceMutableHashTableWithJavaHashMap.class */
    /**
     * Reference implementation of a reducing table, backed by a {@link HashMap}. The tests feed it
     * the same records as the table under test and compare the emitted output.
     *
     * @param <T> record type
     * @param <K> key type; the caller extracts the key and passes it alongside each record
     */
    private class InPlaceMutableHashTableWithJavaHashMap<T, K> {
        TypeSerializer<T> serializer;
        TypeComparator<T> comparator;
        ReduceFunction<T> reducer;
        Collector<T> outputCollector;
        HashMap<K, T> map = new HashMap<>();

        /**
         * @param typeSerializer used to copy every incoming record before it is stored or reduced
         * @param typeComparator stored but not used by this reference implementation
         * @param reduceFunction combines the stored record with a new record of the same key
         * @param collector      receives the stored records on {@link #emitAndReset()}
         */
        public InPlaceMutableHashTableWithJavaHashMap(TypeSerializer<T> typeSerializer, TypeComparator<T> typeComparator, ReduceFunction<T> reduceFunction, Collector<T> collector) {
            this.serializer = typeSerializer;
            this.comparator = typeComparator;
            this.reducer = reduceFunction;
            this.outputCollector = collector;
        }

        /**
         * Stores a copy of the record under the key, or, if the key is already present, replaces the
         * stored record with the result of reducing it with the copy.
         *
         * @param t record to add
         * @param k key of the record
         * @throws Exception whatever the reduce function throws
         */
        public void updateTableEntryWithReduce(T t, K k) throws Exception {
            // Must be typed T (not Object): it is stored in a HashMap<K, T> and passed to ReduceFunction<T>.
            T copy = this.serializer.copy(t);
            if (!this.map.containsKey(k)) {
                this.map.put(k, copy);
                return;
            }
            this.map.put(k, this.reducer.reduce(this.map.get(k), copy));
        }

        /** Sends every stored record to the output collector, then empties the map. */
        public void emitAndReset() {
            for (T stored : this.map.values()) {
                this.outputCollector.collect(stored);
            }
            this.map.clear();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/hash/InPlaceMutableHashTableTest$SumReducer.class */
    class SumReducer implements ReduceFunction<IntPair> {
        SumReducer() {
        }

        public IntPair reduce(IntPair intPair, IntPair intPair2) throws Exception {
            if (intPair.getKey() != intPair2.getKey()) {
                throw new RuntimeException("SumReducer was called with two records that have differing keys.");
            }
            intPair.setValue(intPair.getValue() + intPair2.getValue());
            return intPair;
        }
    }

    /**
     * Supplies the hash table implementation exercised by the tests inherited from
     * {@code MutableHashTableTestBase}: an {@link InPlaceMutableHashTable} over the given memory.
     *
     * @param typeSerializer serializer for the record type
     * @param typeComparator comparator for the record type
     * @param list           memory segments handed to the table
     * @return a new, not yet opened table
     */
    @Override // org.apache.flink.runtime.operators.hash.MutableHashTableTestBase
    protected <T> AbstractMutableHashTable<T> getHashTable(TypeSerializer<T> typeSerializer, TypeComparator<T> typeComparator, List<MemorySegment> list) {
        // Diamond instead of the raw type, so the result is an InPlaceMutableHashTable<T> without an unchecked conversion.
        return new InPlaceMutableHashTable<>(typeSerializer, typeComparator, list);
    }

    /**
     * Inserts one million {@code (i, String.valueOf(i))} tuples with {@code insert} and then checks
     * the table contents in two ways: through the entry iterator (every key seen, value consistent
     * with key) and through a prober (every inserted key found, every key shifted by the element
     * count absent). Judging by the test name, the volume is meant to force the table to grow.
     */
    @Test
    public void testHashTableGrowthWithInsert() {
        final int numElements = 1000000;
        try {
            InPlaceMutableHashTable<Tuple2<Long, String>> table =
                    new InPlaceMutableHashTable<>(this.serializer, this.comparator, getMemory(10000, 32768));
            table.open();
            for (long key = 0; key < numElements; key++) {
                table.insert(new Tuple2<Long, String>(key, String.valueOf(key)));
            }

            // Every key must show up when walking the entries; the bit set collapses duplicates.
            BitSet seenKeys = new BitSet(numElements);
            InPlaceMutableHashTable<Tuple2<Long, String>>.EntryIterator entries = table.getEntryIterator();
            Tuple2<Long, String> entry;
            while ((entry = entries.next()) != null) {
                Assert.assertNotNull(entry.f0);
                Assert.assertNotNull(entry.f1);
                Assert.assertEquals(entry.f0.longValue(), Long.parseLong(entry.f1));
                seenKeys.set(entry.f0.intValue());
            }
            Assert.assertEquals(numElements, seenKeys.cardinality());

            // Every inserted key must be found by probing; keys outside the inserted range must not.
            InPlaceMutableHashTable<Tuple2<Long, String>>.HashTableProber<Long> prober =
                    table.getProber(this.probeComparator, this.pairComparator);
            Tuple2<Long, String> reuse = new Tuple2<>();
            for (long key = 0; key < numElements; key++) {
                Assert.assertNotNull(prober.getMatchFor(key, reuse));
                Assert.assertNull(prober.getMatchFor(key + numElements, reuse));
            }
            table.close();
        } catch (Exception e) {
            // Fail the test but keep the original exception (and its stack trace) as the cause.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Same scenario as the plain-insert growth test, but the one million tuples are added with
     * {@code insertOrReplaceRecord} and the table gets fewer segments (1000 of size 32768).
     * Afterwards the contents are verified through the entry iterator and through a prober.
     */
    @Test
    public void testHashTableGrowthWithInsertOrReplace() {
        final int numElements = 1000000;
        try {
            InPlaceMutableHashTable<Tuple2<Long, String>> table =
                    new InPlaceMutableHashTable<>(this.serializer, this.comparator, getMemory(1000, 32768));
            table.open();
            for (long key = 0; key < numElements; key++) {
                table.insertOrReplaceRecord(Tuple2.of(key, String.valueOf(key)));
            }

            // Every key must show up when walking the entries; the bit set collapses duplicates.
            BitSet seenKeys = new BitSet(numElements);
            InPlaceMutableHashTable<Tuple2<Long, String>>.EntryIterator entries = table.getEntryIterator();
            Tuple2<Long, String> entry;
            while ((entry = entries.next()) != null) {
                Assert.assertNotNull(entry.f0);
                Assert.assertNotNull(entry.f1);
                Assert.assertEquals(entry.f0.longValue(), Long.parseLong(entry.f1));
                seenKeys.set(entry.f0.intValue());
            }
            Assert.assertEquals(numElements, seenKeys.cardinality());

            // Every inserted key must be found by probing; keys outside the inserted range must not.
            InPlaceMutableHashTable<Tuple2<Long, String>>.HashTableProber<Long> prober =
                    table.getProber(this.probeComparator, this.pairComparator);
            Tuple2<Long, String> reuse = new Tuple2<>();
            for (long key = 0; key < numElements; key++) {
                Assert.assertNotNull(prober.getMatchFor(key, reuse));
                Assert.assertNull(prober.getMatchFor(key + numElements, reuse));
            }
            table.close();
        } catch (Exception e) {
            // Fail the test but keep the original exception (and its stack trace) as the cause.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Drives the table's {@code ReduceFacade} and the HashMap-backed reference with the same stream
     * of one million random {@link IntPair}s (seeded with {@code RANDOM_SEED}), summing values per
     * key. Both are occasionally flushed at the same points. At the end the two outputs must have
     * the same size and the same multiset of values.
     */
    @Test
    public void testWithIntPair() throws Exception {
        final int numRecords = 1000000;
        final int keyRange = 1000000;
        final int valueRange = 10;

        Random random = new Random(RANDOM_SEED);
        IntPairSerializer intPairSerializer = new IntPairSerializer();
        IntPairComparator intPairComparator = new IntPairComparator();
        SumReducer sumReducer = new SumReducer();

        // Reference implementation.
        List<IntPair> expectedOutput = new ArrayList<>();
        InPlaceMutableHashTableWithJavaHashMap<IntPair, Integer> reference =
                new InPlaceMutableHashTableWithJavaHashMap<IntPair, Integer>(
                        intPairSerializer, intPairComparator, sumReducer,
                        new CopyingListCollector<>(expectedOutput, intPairSerializer));

        // Table under test. ReduceFacade is an inner class, so it is created through the table instance.
        List<IntPair> actualOutput = new ArrayList<>();
        InPlaceMutableHashTable<IntPair> table =
                new InPlaceMutableHashTable<>(intPairSerializer, intPairComparator, getMemory(1953, PAGE_SIZE));
        InPlaceMutableHashTable<IntPair>.ReduceFacade reduceFacade =
                table.new ReduceFacade(sumReducer, new CopyingListCollector<>(actualOutput, intPairSerializer), true);
        table.open();

        // Generate all input first so the emit decisions below are the only other consumers of the RNG.
        List<IntPair> input = new ArrayList<>();
        for (int n = 0; n < numRecords; n++) {
            input.add(new IntPair(random.nextInt(keyRange), random.nextInt(valueRange)));
        }

        for (IntPair record : input) {
            reduceFacade.updateTableEntryWithReduce(intPairSerializer.copy(record));
            reference.updateTableEntryWithReduce(intPairSerializer.copy(record), record.getKey());
            // Rarely flush both sides mid-stream.
            if (random.nextDouble() < 5.0E-6d) {
                reference.emitAndReset();
                reduceFacade.emitAndReset();
            }
        }
        reference.emitAndReset();
        reduceFacade.emit();
        table.close();

        // Emission order is not compared: only the number of records and the sorted values.
        Assert.assertEquals(expectedOutput.size(), actualOutput.size());
        Integer[] expectedValues = new Integer[expectedOutput.size()];
        for (int n = 0; n < expectedOutput.size(); n++) {
            expectedValues[n] = expectedOutput.get(n).getValue();
        }
        Integer[] actualValues = new Integer[actualOutput.size()];
        for (int n = 0; n < actualOutput.size(); n++) {
            actualValues[n] = actualOutput.get(n).getValue();
        }
        Arrays.sort(expectedValues, Ordering.<Integer>natural());
        Arrays.sort(actualValues, Ordering.<Integer>natural());
        Assert.assertArrayEquals(expectedValues, actualValues);
    }

    /**
     * Exercises the table's {@code ReduceFacade} with a reduce function whose result is longer than
     * its inputs ({@link ConcatReducer}), comparing against the HashMap-backed reference.
     *
     * <p>The body runs three times on the same table instance, each round going through
     * {@code open()} ... {@code close()}. Each round emits on the freshly opened table, processes a
     * few hand-written records, flushes, then processes a shuffled generated data set with
     * occasional flushes, and finally compares output size and sorted values.
     */
    @Test
    public void testWithLengthChangingReduceFunction() throws Exception {
        Random random = new Random(RANDOM_SEED);
        StringPairSerializer stringPairSerializer = new StringPairSerializer();
        StringPairComparator stringPairComparator = new StringPairComparator();
        ConcatReducer concatReducer = new ConcatReducer();

        // Reference implementation.
        List<StringPair> expectedOutput = new ArrayList<>();
        InPlaceMutableHashTableWithJavaHashMap<StringPair, String> reference =
                new InPlaceMutableHashTableWithJavaHashMap<StringPair, String>(
                        stringPairSerializer, stringPairComparator, concatReducer,
                        new CopyingListCollector<>(expectedOutput, stringPairSerializer));

        // Table under test. ReduceFacade is an inner class, so it is created through the table instance.
        List<StringPair> actualOutput = new ArrayList<>();
        InPlaceMutableHashTable<StringPair> table =
                new InPlaceMutableHashTable<>(stringPairSerializer, stringPairComparator, getMemory(61, PAGE_SIZE));
        InPlaceMutableHashTable<StringPair>.ReduceFacade reduceFacade =
                table.new ReduceFacade(concatReducer, new CopyingListCollector<>(actualOutput, stringPairSerializer), true);

        for (int round = 0; round < 3; round++) {
            table.open();
            // Emit while the table is still empty.
            reduceFacade.emit();

            // A few hand-written records, including repeated keys.
            reference.updateTableEntryWithReduce(stringPairSerializer.copy(new StringPair("foo", "bar")), "foo");
            reference.updateTableEntryWithReduce(stringPairSerializer.copy(new StringPair("foo", "baz")), "foo");
            reference.updateTableEntryWithReduce(stringPairSerializer.copy(new StringPair("alma", "xyz")), "alma");
            reduceFacade.updateTableEntryWithReduce(stringPairSerializer.copy(new StringPair("foo", "bar")));
            reduceFacade.updateTableEntryWithReduce(stringPairSerializer.copy(new StringPair("foo", "baz")));
            reduceFacade.updateTableEntryWithReduce(stringPairSerializer.copy(new StringPair("alma", "xyz")));
            for (int n = 0; n < 5; n++) {
                reduceFacade.updateTableEntryWithReduce(stringPairSerializer.copy(new StringPair("korte", "abc")));
                reference.updateTableEntryWithReduce(stringPairSerializer.copy(new StringPair("korte", "abc")), "korte");
            }
            reference.emitAndReset();
            reduceFacade.emitAndReset();

            // Generated data, materialised and shuffled before it is fed to both sides.
            UniformStringPairGenerator generator = new UniformStringPairGenerator(10000, 10, true);
            List<StringPair> input = new ArrayList<>();
            StringPair reuse = new StringPair();
            while (generator.next(reuse) != null) {
                input.add(stringPairSerializer.copy(reuse));
            }
            Collections.shuffle(input, random);

            for (StringPair record : input) {
                reference.updateTableEntryWithReduce(stringPairSerializer.copy(record), record.getKey());
                reduceFacade.updateTableEntryWithReduce(stringPairSerializer.copy(record));
                // Rarely flush both sides mid-stream.
                if (random.nextDouble() < 5.0E-5d) {
                    reference.emitAndReset();
                    reduceFacade.emitAndReset();
                }
            }
            reference.emitAndReset();
            reduceFacade.emit();
            table.close();

            // Emission order is not compared: only the number of records and the sorted values.
            Assert.assertEquals(expectedOutput.size(), actualOutput.size());
            String[] expectedValues = new String[expectedOutput.size()];
            for (int n = 0; n < expectedOutput.size(); n++) {
                expectedValues[n] = expectedOutput.get(n).getValue();
            }
            String[] actualValues = new String[actualOutput.size()];
            for (int n = 0; n < actualOutput.size(); n++) {
                actualValues[n] = actualOutput.get(n).getValue();
            }
            Arrays.sort(expectedValues, Ordering.<String>natural());
            Arrays.sort(actualValues, Ordering.<String>natural());
            Assert.assertArrayEquals(expectedValues, actualValues);

            // Start the next round with empty output lists.
            expectedOutput.clear();
            actualOutput.clear();
        }
    }

    /**
     * Allocates a list of unpooled memory segments for a table under test.
     *
     * @param i  number of segments to allocate; zero or a negative count yields an empty list
     * @param i2 size passed to {@code MemorySegmentFactory.allocateUnpooledSegment} for each segment
     * @return a new mutable list holding the allocated segments
     */
    private static List<MemorySegment> getMemory(int i, int i2) {
        final List<MemorySegment> segments = new ArrayList<>();
        for (int remaining = i; remaining > 0; remaining--) {
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(i2);
            segments.add(segment);
        }
        return segments;
    }

    /**
     * Stores 1000 tuples with a 100,000-character value, then replaces each with a 110,000-character
     * value through {@code insertOrReplaceRecord}, and finally checks that every key is still found
     * by a prober. Judging by the test name, replacing records with longer ones is meant to trigger
     * many compactions.
     */
    @Test
    public void testLargeRecordsWithManyCompactions() {
        final int numElements = 1000;
        try {
            final String shorterValue = getLongString(100000);
            final String longerValue = getLongString(110000);

            InPlaceMutableHashTable<Tuple2<Long, String>> table =
                    new InPlaceMutableHashTable<>(this.serializer, this.comparator, getMemory(3800, 32768));
            table.open();

            // First pass: initial records.
            for (long key = 0; key < numElements; key++) {
                table.insertOrReplaceRecord(Tuple2.of(key, shorterValue));
            }
            // Second pass: same keys, longer values, so the stored record no longer fits its old size.
            for (long key = 0; key < numElements; key++) {
                table.insertOrReplaceRecord(Tuple2.of(key, longerValue));
            }

            // Probe with full tuples, so build and probe side share the same comparator.
            InPlaceMutableHashTable<Tuple2<Long, String>>.HashTableProber<Tuple2<Long, String>> prober =
                    table.getProber(this.comparator, new SameTypePairComparator<>(this.comparator));
            Tuple2<Long, String> reuse = new Tuple2<>();
            for (long key = 0; key < numElements; key++) {
                Assert.assertNotNull(prober.getMatchFor(Tuple2.of(key, longerValue), reuse));
            }
            table.close();
        } catch (Exception e) {
            // Fail the test but keep the original exception (and its stack trace) as the cause.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Builds a string consisting of the character {@code 'a'} repeated {@code i} times.
     *
     * @param i desired length; {@code 0} yields the empty string
     * @return the filler string of exactly {@code i} characters
     * @throws NegativeArraySizeException if {@code i} is negative
     */
    private static String getLongString(int i) {
        final char[] filler = new char[i];
        Arrays.fill(filler, 'a');
        return new String(filler);
    }

    /**
     * Gives the table only 100 segments of size 1024 and keeps inserting records. The test passes
     * only if an {@link EOFException} is thrown before all 100,000 records are in; the table is then
     * closed. Any other exception fails the test.
     */
    @Test
    public void testOutOfMemory() {
        final int numElements = 100000;
        try {
            InPlaceMutableHashTable<Tuple2<Long, String>> table =
                    new InPlaceMutableHashTable<>(this.serializer, this.comparator, getMemory(100, 1024));
            try {
                table.open();
                for (long key = 0; key < numElements; key++) {
                    table.insertOrReplaceRecord(Tuple2.of(key, "alma"));
                }
                Assert.fail("We should have got out of memory (EOFException)");
            } catch (EOFException expected) {
                // Expected outcome: the table ran out of memory. Closing must still work afterwards.
                table.close();
            }
        } catch (Exception e) {
            // Fail the test but keep the original exception (and its stack trace) as the cause.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }
}
