package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.operators.testutils.BinaryOperatorTestBase;
import org.apache.flink.runtime.operators.testutils.DelayingIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator;
import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import org.apache.flink.shaded.guava31.com.google.common.base.Throwables;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/AbstractOuterJoinTaskTest.class */
/**
 * Shared test cases for outer-join drivers.
 *
 * <p>Concrete subclasses supply the driver under test ({@link #getOuterJoinDriver()}), the
 * driver strategy to configure ({@link #getSortDriverStrategy()}) and the expected output
 * cardinality ({@link #calculateExpectedCount(int, int, int, int)}). Every test feeds two inputs of
 * {@code Tuple2<Integer, Integer>} records, compared on field 0, into the driver.
 */
public abstract class AbstractOuterJoinTaskTest extends BinaryOperatorTestBase<FlatJoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    // The first three values are forwarded, in this order, to the BinaryOperatorTestBase
    // constructor; their exact meaning is defined there.
    private static final long HASH_MEM = 6291456;
    private static final long SORT_MEM = 3145728;
    private static final int NUM_SORTER = 2;
    // Numerator of the relative driver memory fraction computed in the constructor.
    private static final long BNLJN_MEM = 327680;

    /** {@link #BNLJN_MEM} divided by the memory manager's total size; passed to setRelativeMemoryDriver. */
    private final double bnljn_frac;
    /** Comparator on tuple field 0, registered as the first driver comparator. */
    protected final TypeComparator<Tuple2<Integer, Integer>> comparator1;
    /** Comparator on tuple field 0, registered as the second driver comparator. */
    protected final TypeComparator<Tuple2<Integer, Integer>> comparator2;
    /** Collects the driver output for the tests that verify the result size. */
    protected final List<Tuple2<Integer, Integer>> outList;
    /** Serializer for the {@code Tuple2<Integer, Integer>} records used by all inputs and outputs. */
    protected final TypeSerializer<Tuple2<Integer, Integer>> serializer;

    /**
     * Join function that forwards records like {@link MockJoinStub} for its first nine
     * invocations and throws {@link ExpectedTestException} from the tenth invocation onwards.
     */
    public static final class MockFailingJoinStub implements FlatJoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        /** Number of times {@link #join} has been called so far. */
        private int cnt = 0;

        // No hand-written join(Object, Object, Collector) bridge: javac generates it, and an
        // explicit copy clashes with the generated one.
        @Override
        public void join(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
            if (++this.cnt >= 10) {
                throw new ExpectedTestException();
            }
            // One side may be null; emit whichever side is present, preferring the first.
            collector.collect(tuple2 != null ? tuple2 : tuple22);
        }
    }

    /** Join function that emits the first record if it is non-null and the second record otherwise. */
    public static final class MockJoinStub implements FlatJoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        // No hand-written join(Object, Object, Collector) bridge: javac generates it, and an
        // explicit copy clashes with the generated one.
        @Override
        public void join(Tuple2<Integer, Integer> tuple2, Tuple2<Integer, Integer> tuple22, Collector<Tuple2<Integer, Integer>> collector) throws Exception {
            collector.collect(tuple2 != null ? tuple2 : tuple22);
        }
    }

    /**
     * Creates the test fixture: two ascending int comparators on tuple field 0, an int/int tuple
     * serializer, an empty output list and the relative driver memory fraction.
     *
     * @param executionConfig execution configuration forwarded to the test base
     */
    public AbstractOuterJoinTaskTest(ExecutionConfig executionConfig) {
        super(executionConfig, HASH_MEM, NUM_SORTER, SORT_MEM);
        this.comparator1 = new TupleComparator<>(new int[]{0}, new TypeComparator<?>[]{new IntComparator(true)}, new TypeSerializer<?>[]{IntSerializer.INSTANCE});
        this.comparator2 = new TupleComparator<>(new int[]{0}, new TypeComparator<?>[]{new IntComparator(true)}, new TypeSerializer<?>[]{IntSerializer.INSTANCE});
        this.outList = new ArrayList<>();
        // A class literal cannot carry type arguments, so this one cast is unavoidable; it is
        // safe because generic parameters are erased at runtime.
        @SuppressWarnings("unchecked")
        final Class<Tuple2<Integer, Integer>> tupleClass = (Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class;
        this.serializer = new TupleSerializer<>(tupleClass, new TypeSerializer<?>[]{IntSerializer.INSTANCE, IntSerializer.INSTANCE});
        this.bnljn_frac = (double) BNLJN_MEM / getMemoryManager().getMemorySize();
    }

    @Test
    public void testSortBoth1OuterJoinTask() throws Exception {
        testSortBothOuterJoinTask(20, 1, 10, 2);
    }

    @Test
    public void testSortBoth2OuterJoinTask() throws Exception {
        testSortBothOuterJoinTask(20, 1, 20, 1);
    }

    @Test
    public void testSortBoth3OuterJoinTask() throws Exception {
        testSortBothOuterJoinTask(20, 1, 20, 20);
    }

    @Test
    public void testSortBoth4OuterJoinTask() throws Exception {
        testSortBothOuterJoinTask(20, 20, 20, 1);
    }

    @Test
    public void testSortBoth5OuterJoinTask() throws Exception {
        testSortBothOuterJoinTask(20, 20, 20, 20);
    }

    @Test
    public void testSortBoth6OuterJoinTask() throws Exception {
        testSortBothOuterJoinTask(10, 1, 20, 2);
    }

    /**
     * Runs the driver with both inputs added through {@code addInputSorted} and checks the
     * output size against {@link #calculateExpectedCount(int, int, int, int)}.
     *
     * <p>The four arguments are handed to {@link UniformIntTupleGenerator} (first pair for the
     * first input, second pair for the second input); presumably they are the number of keys
     * and the number of values per key -- confirm against the generator.
     */
    private void testSortBothOuterJoinTask(int keyCnt1, int valCnt1, int keyCnt2, int valCnt2) throws Exception {
        setOutput(this.outList, this.serializer);
        prepareOuterJoinDriverConfig();
        AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
        addInputSorted(new UniformIntTupleGenerator(keyCnt1, valCnt1, false), this.serializer, this.comparator1.duplicate());
        addInputSorted(new UniformIntTupleGenerator(keyCnt2, valCnt2, false), this.serializer, this.comparator2.duplicate());
        testDriver(driver, MockJoinStub.class);
        verifyAndResetOutput(calculateExpectedCount(keyCnt1, valCnt1, keyCnt2, valCnt2));
    }

    /** Only the first input goes through {@code addInputSorted}; the second is added as-is. */
    @Test
    public void testSortFirstOuterJoinTask() throws Exception {
        setOutput(this.outList, this.serializer);
        prepareOuterJoinDriverConfig();
        AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
        addInputSorted(new UniformIntTupleGenerator(20, 20, false), this.serializer, this.comparator1.duplicate());
        addInput(new UniformIntTupleGenerator(20, 20, true), this.serializer);
        testDriver(driver, MockJoinStub.class);
        verifyAndResetOutput(calculateExpectedCount(20, 20, 20, 20));
    }

    /** Only the second input goes through {@code addInputSorted}; the first is added as-is. */
    @Test
    public void testSortSecondOuterJoinTask() throws Exception {
        setOutput(this.outList, this.serializer);
        prepareOuterJoinDriverConfig();
        AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
        addInput(new UniformIntTupleGenerator(20, 20, true), this.serializer);
        addInputSorted(new UniformIntTupleGenerator(20, 20, false), this.serializer, this.comparator2.duplicate());
        testDriver(driver, MockJoinStub.class);
        verifyAndResetOutput(calculateExpectedCount(20, 20, 20, 20));
    }

    /** Both inputs are added with plain {@code addInput}, i.e. without {@code addInputSorted}. */
    @Test
    public void testMergeOuterJoinTask() throws Exception {
        setOutput(this.outList, this.serializer);
        prepareOuterJoinDriverConfig();
        AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
        addInput(new UniformIntTupleGenerator(20, 20, true), this.serializer);
        addInput(new UniformIntTupleGenerator(20, 20, true), this.serializer);
        testDriver(driver, MockJoinStub.class);
        verifyAndResetOutput(calculateExpectedCount(20, 20, 20, 20));
    }

    /** Expects the {@link ExpectedTestException} thrown by {@link MockFailingJoinStub} to reach the caller. */
    @Test(expected = ExpectedTestException.class)
    public void testFailingOuterJoinTask() throws Exception {
        setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
        prepareOuterJoinDriverConfig();
        AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
        addInput(new UniformIntTupleGenerator(20, 20, true), this.serializer);
        addInput(new UniformIntTupleGenerator(20, 20, true), this.serializer);
        testDriver(driver, MockFailingJoinStub.class);
    }

    /** Cancels the task while the first (infinite, delayed) input is registered via {@code addInputSorted}. */
    @Test
    public void testCancelOuterJoinTaskWhileSort1() throws Exception {
        setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
        prepareOuterJoinDriverConfig();
        final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
        addInputSorted(new DelayingIterator<>(new InfiniteIntTupleIterator(), 100), this.serializer, this.comparator1.duplicate());
        addInput(new DelayingIterator<>(new InfiniteIntTupleIterator(), 100), this.serializer);
        runDriverAndCancel(driver, "Task runner for testCancelOuterJoinTaskWhileSort1()");
    }

    /** Cancels the task while the second (infinite, delayed) input is registered via {@code addInputSorted}. */
    @Test
    public void testCancelOuterJoinTaskWhileSort2() throws Exception {
        setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
        prepareOuterJoinDriverConfig();
        final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
        addInput(new DelayingIterator<>(new InfiniteIntTupleIterator(), 1), this.serializer);
        addInputSorted(new DelayingIterator<>(new InfiniteIntTupleIterator(), 1), this.serializer, this.comparator2.duplicate());
        runDriverAndCancel(driver, "Task runner for testCancelOuterJoinTaskWhileSort2()");
    }

    /** Cancels the task while both infinite, delayed inputs are added with plain {@code addInput}. */
    @Test
    public void testCancelOuterJoinTaskWhileRunning() throws Exception {
        setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
        prepareOuterJoinDriverConfig();
        final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver = getOuterJoinDriver();
        addInput(new DelayingIterator<>(new InfiniteIntTupleIterator(), 100), this.serializer);
        addInput(new DelayingIterator<>(new InfiniteIntTupleIterator(), 100), this.serializer);
        runDriverAndCancel(driver, "Task runner for testCancelOuterJoinTaskWhileRunning()");
    }

    /**
     * Applies the driver configuration shared by every test: both key comparators, the pair
     * comparator factory, the strategy from {@link #getSortDriverStrategy()}, the relative driver
     * memory and the number of file handles for sorting.
     */
    private void prepareOuterJoinDriverConfig() {
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(new RuntimePairComparatorFactory());
        getTaskConfig().setDriverStrategy(getSortDriverStrategy());
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
    }

    /**
     * Asserts that {@link #outList} holds exactly {@code expectedCount} records and then empties
     * it. The list is not cleared when the assertion fails.
     */
    private void verifyAndResetOutput(int expectedCount) {
        Assert.assertTrue("Result set size was " + this.outList.size() + ". Expected was " + expectedCount, this.outList.size() == expectedCount);
        this.outList.clear();
    }

    /**
     * Runs {@code driver} with {@link MockJoinStub} on a separate thread, cancels the task after
     * one second and waits up to 60 seconds for the thread to terminate.
     *
     * <p>Fails the test if the thread is still alive after the wait, or if the driver thread
     * ended with any {@link Throwable}.
     *
     * @param driver the driver to execute
     * @param threadName name given to the thread that runs the driver
     */
    private void runDriverAndCancel(final AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> driver, String threadName) throws Exception {
        // Carries a failure from the driver thread back to the test thread.
        final AtomicReference<Throwable> error = new AtomicReference<>();
        Thread taskRunner = new Thread(threadName) {
            @Override
            public void run() {
                try {
                    AbstractOuterJoinTaskTest.this.testDriver(driver, MockJoinStub.class);
                } catch (Throwable t) {
                    error.set(t);
                }
            }
        };
        taskRunner.start();
        // Give the driver time to start consuming its (infinite) inputs before cancelling.
        Thread.sleep(1000L);
        cancel();
        taskRunner.interrupt();
        taskRunner.join(60000L);
        Assert.assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
        Throwable failure = error.get();
        if (failure != null) {
            Assert.fail("Error in task while canceling:\n" + Throwables.getStackTraceAsString(failure));
        }
    }

    /** Returns the outer-join driver instance that the tests execute. */
    protected abstract AbstractOuterJoinDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> getOuterJoinDriver();

    /**
     * Returns the number of output records expected for two inputs generated with
     * {@code UniformIntTupleGenerator(i, i2, ...)} and {@code UniformIntTupleGenerator(i3, i4, ...)}.
     */
    protected abstract int calculateExpectedCount(int i, int i2, int i3, int i4);

    /** Returns the driver strategy that is set on the task configuration before each run. */
    protected abstract DriverStrategy getSortDriverStrategy();
}
