/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReduceTaskTest
extends DriverTestBase<RichGroupReduceFunction<Record, Record>> {
    // Class-wide SLF4J logger; the test methods use it to record unexpected exceptions.
    private static final Logger LOG = LoggerFactory.getLogger(ReduceTaskTest.class);
    // Comparator built over field position 0 with key type IntValue. Every test registers it as
    // the driver comparator, and a duplicate is handed to each sorter that is created.
    private final RecordComparator comparator = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
    // Output list passed to setOutput(...) by the result-checking tests, which clear it when done.
    private final List<Record> outList = new ArrayList<Record>();

    /**
     * Creates the test instance for one execution configuration.
     *
     * <p>All work is delegated to the {@link DriverTestBase} constructor, which receives the fixed
     * arguments {@code 0}, {@code 1} and {@code 3 * 1024 * 1024} (presumably memory and sorter
     * settings; confirm against the base class).
     *
     * @param config execution configuration forwarded to the base class
     */
    public ReduceTaskTest(ExecutionConfig config) {
        super(config, 0L, 1, 3L * 1024L * 1024L);
    }

    /**
     * Runs the group-reduce driver with {@link MockReduceStub} on input that is added through
     * {@code addInputSorted}.
     *
     * <p>The generator is created with (100, 20, false), presumably 100 keys with 20 values each in
     * unsorted order (confirm against {@code UniformRecordGenerator}). The test expects exactly one
     * output record per key whose field 1 equals {@code valCnt - key}.
     */
    @Test
    public void testReduceTaskWithSortingInput() {
        final int keyCnt = 100;
        final int valCnt = 20;

        addDriverComparator(comparator);
        setOutput(outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);

        try {
            addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), comparator.duplicate());
            GroupReduceDriver driver = new GroupReduceDriver();
            testDriver((Driver) driver, MockReduceStub.class);
        } catch (Exception ex) {
            LOG.info("Exception while running the test task.", ex);
            Assert.fail("Exception in Test: " + ex.getMessage());
        }

        // One result record is expected per key.
        Assert.assertTrue(
                "Resultset size was " + outList.size() + ". Expected was " + keyCnt,
                outList.size() == keyCnt);

        for (Record result : outList) {
            int reduced = ((IntValue) result.getField(1, IntValue.class)).getValue();
            int key = ((IntValue) result.getField(0, IntValue.class)).getValue();
            Assert.assertTrue("Incorrect result", reduced == valCnt - key);
        }

        outList.clear();
    }

    /**
     * Runs the group-reduce driver with {@link MockReduceStub} on input that is added directly via
     * {@code addInput}, without a sorter.
     *
     * <p>The generator is created with (100, 20, true), presumably 100 keys with 20 values each,
     * already sorted (confirm against {@code UniformRecordGenerator}). The test expects exactly one
     * output record per key whose field 1 equals {@code valCnt - key}.
     */
    @Test
    public void testReduceTaskOnPreSortedInput() {
        final int keyCnt = 100;
        final int valCnt = 20;

        addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
        addDriverComparator(comparator);
        setOutput(outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);

        GroupReduceDriver driver = new GroupReduceDriver();
        try {
            testDriver((Driver) driver, MockReduceStub.class);
        } catch (Exception ex) {
            LOG.info("Exception while running the test task.", ex);
            Assert.fail("Invoke method caused exception: " + ex.getMessage());
        }

        // One result record is expected per key.
        Assert.assertTrue(
                "Resultset size was " + outList.size() + ". Expected was " + keyCnt,
                outList.size() == keyCnt);

        for (Record result : outList) {
            int reduced = ((IntValue) result.getField(1, IntValue.class)).getValue();
            int key = ((IntValue) result.getField(0, IntValue.class)).getValue();
            Assert.assertTrue("Incorrect result", reduced == valCnt - key);
        }

        outList.clear();
    }

    /**
     * Runs the group-reduce driver with {@link MockCombiningReduceStub} on the output of a
     * {@link CombiningUnilateralSortMerger} that uses the same stub as its combine function.
     *
     * <p>The test expects one output record per key whose field 1 equals
     * {@code (1 + 2 + ... + (valCnt - 1)) - key}.
     *
     * <p>The sorter is always closed, and any exception raised while building the sorter, running
     * the driver or closing the sorter fails the test.
     */
    @Test
    public void testCombiningReduceTask() {
        final int keyCnt = 100;
        final int valCnt = 20;

        this.addDriverComparator(this.comparator);
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);

        try {
            CombiningUnilateralSortMerger sorter = new CombiningUnilateralSortMerger(
                    (GroupCombineFunction) new MockCombiningReduceStub(),
                    this.getMemoryManager(),
                    this.getIOManager(),
                    (MutableObjectIterator) new UniformRecordGenerator(keyCnt, valCnt, false),
                    this.getContainingTask(),
                    (TypeSerializerFactory) RecordSerializerFactory.get(),
                    (TypeComparator) this.comparator.duplicate(),
                    this.perSortFractionMem,
                    4,
                    0.8f,
                    true,
                    true);
            // The inner try/finally releases the sorter on every path once it has been created;
            // the outer catch turns any failure (including one from close()) into a test failure.
            try {
                this.addInput((MutableObjectIterator<Record>) sorter.getIterator());
                GroupReduceDriver testTask = new GroupReduceDriver();
                this.testDriver((Driver) testTask, MockCombiningReduceStub.class);
            } finally {
                sorter.close();
            }
        } catch (Exception e) {
            LOG.info("Exception while running the test task.", e);
            Assert.fail("Invoke method caused exception: " + e.getMessage());
        }

        // Expected per-key sum of the values before the stub subtracts the key.
        int expSum = 0;
        for (int i = 1; i < valCnt; ++i) {
            expSum += i;
        }

        Assert.assertTrue(
                "Resultset size was " + this.outList.size() + ". Expected was " + keyCnt,
                this.outList.size() == keyCnt);

        for (Record record : this.outList) {
            int reduced = ((IntValue) record.getField(1, IntValue.class)).getValue();
            int key = ((IntValue) record.getField(0, IntValue.class)).getValue();
            Assert.assertTrue("Incorrect result", reduced == expSum - key);
        }

        this.outList.clear();
    }

    /**
     * Verifies that an {@link ExpectedTestException} thrown by {@link MockFailingReduceStub} is
     * propagated out of {@code testDriver}.
     *
     * <p>The test fails if the driver returns normally or throws any other exception type.
     */
    @Test
    public void testFailingReduceTask() {
        final int keyCnt = 100;
        final int valCnt = 20;

        addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
        addDriverComparator(comparator);
        setOutput(outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);

        GroupReduceDriver driver = new GroupReduceDriver();

        boolean stubFailureForwarded = false;
        try {
            testDriver((Driver) driver, MockFailingReduceStub.class);
        } catch (ExpectedTestException expected) {
            // This is the outcome the test is looking for.
            stubFailureForwarded = true;
        } catch (Exception ex) {
            LOG.info("Exception which was not the ExpectedTestException while running the test task.", ex);
            Assert.fail("Test caused exception: " + ex.getMessage());
        }
        Assert.assertTrue("Function exception was not forwarded.", stubFailureForwarded);

        outList.clear();
    }

    /**
     * Starts the driver on a sorted input fed by a {@link DelayingInfinitiveInputIterator} and
     * cancels it from a {@link TaskCancelThread}.
     *
     * <p>The driver runs in its own thread. The test passes only if {@code testDriver} returns
     * without throwing after the cancel thread and the runner thread have both finished.
     */
    @Test
    public void testCancelReduceTaskWhileSorting() {
        addDriverComparator(comparator);
        setOutput(new NirvanaOutputList());
        getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);

        final GroupReduceDriver driver = new GroupReduceDriver();

        try {
            addInputSorted(new DelayingInfinitiveInputIterator(100), comparator.duplicate());
        } catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail();
        }

        // Flipped by the runner thread only when testDriver returns normally.
        final AtomicBoolean finishedCleanly = new AtomicBoolean(false);

        Thread runner = new Thread() {

            @Override
            public void run() {
                try {
                    ReduceTaskTest.this.testDriver((Driver) driver, MockReduceStub.class);
                    finishedCleanly.set(true);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        runner.start();

        TaskCancelThread canceller = new TaskCancelThread(1, runner, this);
        canceller.start();

        try {
            canceller.join();
            runner.join();
        } catch (InterruptedException ex) {
            Assert.fail("Joining threads failed");
        }

        Assert.assertTrue(
                "Test threw an exception even though it was properly canceled.",
                finishedCleanly.get());
    }

    /**
     * Starts the driver with {@link MockDelayingReduceStub} on directly added input and cancels it
     * from a {@link TaskCancelThread} while the stub is running.
     *
     * <p>The driver runs in its own thread. The test passes only if {@code testDriver} returns
     * without throwing after the cancel thread and the runner thread have both finished.
     */
    @Test
    public void testCancelReduceTaskWhileReducing() {
        final int keyCnt = 1000;
        final int valCnt = 2;

        this.addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
        this.addDriverComparator(this.comparator);
        this.setOutput(new NirvanaOutputList());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);

        final GroupReduceDriver testTask = new GroupReduceDriver();

        // Flipped by the runner thread only when testDriver returns normally.
        final AtomicBoolean success = new AtomicBoolean(false);

        Thread taskRunner = new Thread() {

            @Override
            public void run() {
                try {
                    ReduceTaskTest.this.testDriver((Driver) testTask, MockDelayingReduceStub.class);
                    success.set(true);
                } catch (Exception ie) {
                    // Route the failure through the class logger; the assertion below reports it.
                    LOG.info("Exception while running the test task.", ie);
                }
            }
        };
        taskRunner.start();

        TaskCancelThread tct = new TaskCancelThread(2, taskRunner, this);
        tct.start();

        try {
            tct.join();
            taskRunner.join();
        } catch (InterruptedException ie) {
            Assert.fail("Joining threads failed");
        }

        // Without this check an exception in the runner thread would go unnoticed and the test
        // could never fail.
        Assert.assertTrue(
                "Test threw an exception even though it was properly canceled.",
                success.get());
    }

    public static class MockDelayingReduceStub
    extends RichGroupReduceFunction<Record, Record> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<Record> records, Collector<Record> out) {
            for (Record r : records) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
    }

    public static class MockFailingReduceStub
    extends RichGroupReduceFunction<Record, Record> {
        private static final long serialVersionUID = 1L;
        private int cnt = 0;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();

        public void reduce(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int valCnt = 0;
            Iterator<Record> i$ = records.iterator();
            while (i$.hasNext()) {
                Record next;
                element = next = i$.next();
                ++valCnt;
            }
            if (++this.cnt >= 10) {
                throw new ExpectedTestException();
            }
            element.getField(0, (Value)this.key);
            this.value.setValue(valCnt - this.key.getValue());
            element.setField(1, (Value)this.value);
            out.collect((Object)element);
        }
    }

    public static class MockCombiningReduceStub
    implements GroupReduceFunction<Record, Record>,
    GroupCombineFunction<Record, Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();
        private final IntValue combineValue = new IntValue();

        public void reduce(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            Iterator<Record> i$ = records.iterator();
            while (i$.hasNext()) {
                Record next;
                element = next = i$.next();
                element.getField(1, (Value)this.value);
                sum += this.value.getValue();
            }
            element.getField(0, (Value)this.key);
            this.value.setValue(sum - this.key.getValue());
            element.setField(1, (Value)this.value);
            out.collect((Object)element);
        }

        public void combine(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            Iterator<Record> i$ = records.iterator();
            while (i$.hasNext()) {
                Record next;
                element = next = i$.next();
                element.getField(1, (Value)this.combineValue);
                sum += this.combineValue.getValue();
            }
            this.combineValue.setValue(sum);
            element.setField(1, (Value)this.combineValue);
            out.collect((Object)element);
        }
    }

    public static class MockReduceStub
    extends RichGroupReduceFunction<Record, Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();

        public void reduce(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int cnt = 0;
            Iterator<Record> i$ = records.iterator();
            while (i$.hasNext()) {
                Record next;
                element = next = i$.next();
                ++cnt;
            }
            element.getField(0, (Value)this.key);
            this.value.setValue(cnt - this.key.getValue());
            element.setField(1, (Value)this.value);
            out.collect((Object)element);
        }
    }
}

