package org.apache.flink.runtime.operators.hash;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.class */
public class ReusingReOpenableHashTableITCase {
    private static final int PAGE_SIZE = 8192;
    private static final long MEMORY_SIZE = 8192000;
    private static final long SEED1 = 561349061987311L;
    private static final long SEED2 = 231434613412342L;
    private static final int NUM_PROBES = 3;
    private final AbstractInvokable parentTask = new DummyInvokable();
    private IOManager ioManager;
    private MemoryManager memoryManager;
    private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
    private TypeComparator<Tuple2<Integer, String>> record1Comparator;
    private TypeComparator<Tuple2<Integer, String>> record2Comparator;
    private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
    private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
    private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
    private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
    private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
    private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
    private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;

    @Before
    public void beforeTest() {
        this.recordSerializer = TestData.getIntStringTupleSerializer();
        this.record1Comparator = TestData.getIntStringTupleComparator();
        this.record2Comparator = TestData.getIntStringTupleComparator();
        this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
        this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
        this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
        this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
        this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
        this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
        this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
        this.ioManager = new IOManagerAsync();
    }

    @After
    public void afterTest() {
        if (this.ioManager != null) {
            this.ioManager.shutdown();
            if (!this.ioManager.isProperlyShutDown()) {
                Assert.fail("I/O manager failed to properly shut down.");
            }
            this.ioManager = null;
        }
        if (this.memoryManager != null) {
            Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", this.memoryManager.verifyEmpty());
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    @Test
    public void testOverflow() {
        try {
            TestData.TupleGenerator tupleGenerator = new TestData.TupleGenerator(SEED1, 200, 1024, TestData.TupleGenerator.KeyMode.RANDOM, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGenerator tupleGenerator2 = new TestData.TupleGenerator(SEED2, 0, 1024, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            doTest(new TestData.TupleGeneratorIterator(tupleGenerator, 1000), new TestData.TupleGeneratorIterator(tupleGenerator2, 1000), tupleGenerator, tupleGenerator2);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("An exception occurred during the test: " + e.getMessage());
        }
    }

    @Test
    public void testDoubleProbeSpilling() {
        try {
            TestData.TupleGenerator tupleGenerator = new TestData.TupleGenerator(SEED1, 0, 1024, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGenerator tupleGenerator2 = new TestData.TupleGenerator(SEED2, 0, 1024, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            doTest(new TestData.TupleGeneratorIterator(tupleGenerator, 1000), new TestData.TupleGeneratorIterator(tupleGenerator2, 1000), tupleGenerator, tupleGenerator2);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("An exception occurred during the test: " + e.getMessage());
        }
    }

    @Test
    public void testDoubleProbeInMemory() {
        try {
            TestData.TupleGenerator tupleGenerator = new TestData.TupleGenerator(SEED1, 0, 28, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            TestData.TupleGenerator tupleGenerator2 = new TestData.TupleGenerator(SEED2, 0, 28, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.FIX_LENGTH);
            doTest(new TestData.TupleGeneratorIterator(tupleGenerator, 1000), new TestData.TupleGeneratorIterator(tupleGenerator2, 1000), tupleGenerator, tupleGenerator2);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("An exception occurred during the test: " + e.getMessage());
        }
    }

    private void doTest(TestData.TupleGeneratorIterator tupleGeneratorIterator, TestData.TupleGeneratorIterator tupleGeneratorIterator2, TestData.TupleGenerator tupleGenerator, TestData.TupleGenerator tupleGenerator2) throws Exception {
        Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> joinTuples = NonReusingHashJoinIteratorITCase.joinTuples(NonReusingHashJoinIteratorITCase.collectTupleData(tupleGeneratorIterator), NonReusingHashJoinIteratorITCase.collectTupleData(tupleGeneratorIterator2));
        ArrayList arrayList = new ArrayList(NUM_PROBES);
        NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin[] tupleMatchRemovingJoinArr = new NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin[NUM_PROBES];
        for (int i = 0; i < NUM_PROBES; i++) {
            Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> deepCopy = deepCopy(joinTuples);
            arrayList.add(deepCopy);
            tupleMatchRemovingJoinArr[i] = new NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin(deepCopy);
        }
        NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin tupleMatchRemovingJoin = new NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin(joinTuples);
        DiscardingOutputCollector discardingOutputCollector = new DiscardingOutputCollector();
        tupleGenerator.reset();
        tupleGenerator2.reset();
        tupleGeneratorIterator.reset();
        tupleGeneratorIterator2.reset();
        ReusingBuildFirstReOpenableHashJoinIterator reusingBuildFirstReOpenableHashJoinIterator = new ReusingBuildFirstReOpenableHashJoinIterator(tupleGeneratorIterator, tupleGeneratorIterator2, this.recordSerializer, this.record1Comparator, this.recordSerializer, this.record2Comparator, this.recordPairComparator, this.memoryManager, this.ioManager, this.parentTask, 1.0d, false, false, true);
        reusingBuildFirstReOpenableHashJoinIterator.open();
        do {
        } while (reusingBuildFirstReOpenableHashJoinIterator.callWithNextKey(tupleMatchRemovingJoin, discardingOutputCollector));
        for (Map.Entry<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> entry : joinTuples.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                Assert.fail("Collection for key " + entry.getKey() + " is not empty");
            }
        }
        for (int i2 = 0; i2 < NUM_PROBES; i2++) {
            tupleGenerator2.reset();
            tupleGeneratorIterator2.reset();
            reusingBuildFirstReOpenableHashJoinIterator.reopenProbe(tupleGeneratorIterator2);
            do {
            } while (reusingBuildFirstReOpenableHashJoinIterator.callWithNextKey(tupleMatchRemovingJoinArr[i2], discardingOutputCollector));
            for (Map.Entry entry2 : ((Map) arrayList.get(i2)).entrySet()) {
                if (!((Collection) entry2.getValue()).isEmpty()) {
                    Assert.fail("Collection for key " + entry2.getKey() + " is not empty");
                }
            }
        }
        reusingBuildFirstReOpenableHashJoinIterator.close();
    }

    private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(int i, int i2, int i3, int i4) {
        UniformIntTupleGenerator uniformIntTupleGenerator = new UniformIntTupleGenerator(i, i2, true);
        TestData.ConstantIntIntTuplesIterator constantIntIntTuplesIterator = new TestData.ConstantIntIntTuplesIterator(i3, 17, 5);
        TestData.ConstantIntIntTuplesIterator constantIntIntTuplesIterator2 = new TestData.ConstantIntIntTuplesIterator(i4, 23, 5);
        ArrayList arrayList = new ArrayList();
        arrayList.add(uniformIntTupleGenerator);
        arrayList.add(constantIntIntTuplesIterator);
        arrayList.add(constantIntIntTuplesIterator2);
        return new UnionIterator(arrayList);
    }

    @Test
    public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
        UniformIntTupleGenerator uniformIntTupleGenerator = new UniformIntTupleGenerator(1000000, NUM_PROBES, false);
        TestData.ConstantIntIntTuplesIterator constantIntIntTuplesIterator = new TestData.ConstantIntIntTuplesIterator(40559, 17, 200000);
        TestData.ConstantIntIntTuplesIterator constantIntIntTuplesIterator2 = new TestData.ConstantIntIntTuplesIterator(92882, 23, 200000);
        ArrayList arrayList = new ArrayList();
        arrayList.add(uniformIntTupleGenerator);
        arrayList.add(constantIntIntTuplesIterator);
        arrayList.add(constantIntIntTuplesIterator2);
        UnionIterator unionIterator = new UnionIterator(arrayList);
        try {
            List allocatePages = this.memoryManager.allocatePages(MEM_OWNER, 896);
            HashMap hashMap = new HashMap(1000000);
            ReOpenableMutableHashTable reOpenableMutableHashTable = new ReOpenableMutableHashTable(this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, allocatePages, this.ioManager, true);
            for (int i = 0; i < NUM_PROBES; i++) {
                MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(1000000, 10, 40559, 92882);
                if (i == 0) {
                    reOpenableMutableHashTable.open(unionIterator, probeInput);
                } else {
                    reOpenableMutableHashTable.reopenProbe(probeInput);
                }
                Tuple2 tuple2 = new Tuple2();
                while (reOpenableMutableHashTable.nextRecord()) {
                    long j = 0;
                    Integer num = (Integer) ((Tuple2) reOpenableMutableHashTable.getCurrentProbeRecord()).f0;
                    MutableObjectIterator buildSideIterator = reOpenableMutableHashTable.getBuildSideIterator();
                    Tuple2 tuple22 = (Tuple2) buildSideIterator.next(tuple2);
                    Tuple2 tuple23 = tuple22;
                    if (tuple22 != null) {
                        j = 1;
                        Assert.assertEquals("Probe-side key was different than build-side key.", num, tuple23.f0);
                    } else {
                        Assert.fail("No build side values found for a probe key.");
                    }
                    while (true) {
                        Tuple2 tuple24 = (Tuple2) buildSideIterator.next(tuple23);
                        tuple23 = tuple24;
                        if (tuple24 == null) {
                            break;
                        }
                        j++;
                        Assert.assertEquals("Probe-side key was different than build-side key.", num, tuple23.f0);
                    }
                    Long l = (Long) hashMap.get(num);
                    hashMap.put(num, l == null ? Long.valueOf(j) : Long.valueOf(l.longValue() + j));
                }
            }
            reOpenableMutableHashTable.close();
            Assert.assertEquals("Wrong number of keys", 1000000L, hashMap.size());
            for (Map.Entry entry : hashMap.entrySet()) {
                long longValue = ((Long) entry.getValue()).longValue();
                int intValue = ((Integer) entry.getKey()).intValue();
                if (intValue == 40559 || intValue == 92882) {
                    Assert.assertEquals("Wrong number of values in per-key cross product for key " + intValue, 9000135L, longValue);
                } else {
                    Assert.assertEquals("Wrong number of values in per-key cross product for key " + intValue, 90L, longValue);
                }
            }
            this.memoryManager.release(reOpenableMutableHashTable.getFreedMemory());
        } catch (MemoryAllocationException e) {
            Assert.fail("Memory for the Join could not be provided.");
        }
    }

    @Test
    public void testSpillingHashJoinWithTwoRecursions() throws IOException {
        UniformIntTupleGenerator uniformIntTupleGenerator = new UniformIntTupleGenerator(1000000, NUM_PROBES, false);
        TestData.ConstantIntIntTuplesIterator constantIntIntTuplesIterator = new TestData.ConstantIntIntTuplesIterator(40559, 17, 200000);
        TestData.ConstantIntIntTuplesIterator constantIntIntTuplesIterator2 = new TestData.ConstantIntIntTuplesIterator(92882, 23, 200000);
        ArrayList arrayList = new ArrayList();
        arrayList.add(uniformIntTupleGenerator);
        arrayList.add(constantIntIntTuplesIterator);
        arrayList.add(constantIntIntTuplesIterator2);
        UnionIterator unionIterator = new UnionIterator(arrayList);
        try {
            List allocatePages = this.memoryManager.allocatePages(MEM_OWNER, 896);
            HashMap hashMap = new HashMap(1000000);
            ReOpenableMutableHashTable reOpenableMutableHashTable = new ReOpenableMutableHashTable(this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, allocatePages, this.ioManager, true);
            for (int i = 0; i < NUM_PROBES; i++) {
                MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(1000000, 10, 40559, 92882);
                if (i == 0) {
                    reOpenableMutableHashTable.open(unionIterator, probeInput);
                } else {
                    reOpenableMutableHashTable.reopenProbe(probeInput);
                }
                Tuple2 tuple2 = new Tuple2();
                while (reOpenableMutableHashTable.nextRecord()) {
                    long j = 0;
                    Integer num = (Integer) ((Tuple2) reOpenableMutableHashTable.getCurrentProbeRecord()).f0;
                    MutableObjectIterator buildSideIterator = reOpenableMutableHashTable.getBuildSideIterator();
                    Tuple2 tuple22 = (Tuple2) buildSideIterator.next(tuple2);
                    if (tuple22 != null) {
                        j = 1;
                        Assert.assertEquals("Probe-side key was different than build-side key.", num, tuple22.f0);
                    } else {
                        Assert.fail("No build side values found for a probe key.");
                    }
                    while (true) {
                        Tuple2 tuple23 = (Tuple2) buildSideIterator.next(tuple2);
                        if (tuple23 == null) {
                            break;
                        }
                        j++;
                        Assert.assertEquals("Probe-side key was different than build-side key.", num, tuple23.f0);
                    }
                    Long l = (Long) hashMap.get(num);
                    hashMap.put(num, l == null ? Long.valueOf(j) : Long.valueOf(l.longValue() + j));
                }
            }
            reOpenableMutableHashTable.close();
            Assert.assertEquals("Wrong number of keys", 1000000L, hashMap.size());
            for (Map.Entry entry : hashMap.entrySet()) {
                long longValue = ((Long) entry.getValue()).longValue();
                int intValue = ((Integer) entry.getKey()).intValue();
                if (intValue == 40559 || intValue == 92882) {
                    Assert.assertEquals("Wrong number of values in per-key cross product for key " + intValue, 9000135L, longValue);
                } else {
                    Assert.assertEquals("Wrong number of values in per-key cross product for key " + intValue, 90L, longValue);
                }
            }
            this.memoryManager.release(reOpenableMutableHashTable.getFreedMemory());
        } catch (MemoryAllocationException e) {
            Assert.fail("Memory for the Join could not be provided.");
        }
    }

    static Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> deepCopy(Map<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> map) {
        HashMap hashMap = new HashMap(map.size());
        for (Map.Entry<Integer, Collection<NonReusingHashJoinIteratorITCase.TupleMatch>> entry : map.entrySet()) {
            ArrayList arrayList = new ArrayList(entry.getValue().size());
            Iterator<NonReusingHashJoinIteratorITCase.TupleMatch> it = entry.getValue().iterator();
            while (it.hasNext()) {
                arrayList.add(it.next());
            }
            hashMap.put(entry.getKey(), arrayList);
        }
        return hashMap;
    }
}
