package org.apache.hadoop.mapreduce.v2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SimpleExponentialTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
@Ignore
/* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster.class */
public class TestSpeculativeExecOnCluster {
    private static final int NODE_MANAGERS_COUNT = 2;
    private static final boolean ENABLE_SPECULATIVE_MAP = true;
    private static final boolean ENABLE_SPECULATIVE_REDUCE = true;

    // Default task counts and sleep settings.
    private static final int NUM_MAP_DEFAULT = 16;
    private static final int NUM_REDUCE_DEFAULT = 8;
    private static final int MAP_SLEEP_TIME_DEFAULT = 60000;
    private static final int REDUCE_SLEEP_TIME_DEFAULT = 10000;
    private static final int MAP_SLEEP_COUNT_DEFAULT = 10000;
    private static final int REDUCE_SLEEP_COUNT_DEFAULT = 1000;

    // Configuration keys read by the sleep input format, mapper and reducer.
    private static final String MAP_SLEEP_COUNT = "mapreduce.sleepjob.map.sleep.count";
    private static final String REDUCE_SLEEP_COUNT = "mapreduce.sleepjob.reduce.sleep.count";
    private static final String MAP_SLEEP_TIME = "mapreduce.sleepjob.map.sleep.time";
    private static final String REDUCE_SLEEP_TIME = "mapreduce.sleepjob.reduce.sleep.time";
    private static final String MAP_SLEEP_CALCULATOR_TYPE = "mapreduce.sleepjob.map.sleep.time.calculator";
    private static final String MAP_SLEEP_CALCULATOR_TYPE_DEFAULT = "normal_run";

    // Local file system handle and test paths; all four are assigned in the static initializer.
    private static FileSystem localFs;
    private static final Path TEST_ROOT_DIR;
    private static final Path APP_JAR;
    private static final Path TEST_OUT_DIR;

    // Per-instance test state.
    private MiniMRYarnCluster mrCluster;
    private int myNumMapper;
    private int myNumReduce;
    private int myMapSleepTime;
    private int myReduceSleepTime;
    private int myMapSleepCount;
    private int myReduceSleepCount;
    private String chosenSleepCalc;
    private Class<?> estimatorClass;
    private List<String> ignoredTests;

    private static final Logger LOG = LoggerFactory.getLogger(TestSpeculativeExecOnCluster.class);

    /**
     * Registry from calculator type name (the value stored under
     * {@link #MAP_SLEEP_CALCULATOR_TYPE}) to the calculator instance; populated in the
     * static initializer and looked up by the sleep mapper.
     */
    private static final Map<String, SleepDurationCalculator> mapSleepTypeMapper = new HashMap<>();

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$DynamicSleepDurationCalcImpl.class */
    /**
     * Calculator that slows down attempt 0 of map task 0 by a factor that grows with
     * the task's progress. Every other attempt sleeps for the default duration.
     */
    public static class DynamicSleepDurationCalcImpl implements SleepDurationCalculator {
        // Progress thresholds and the slowdown factor that applies once progress exceeds each one.
        private final double[] thresholds = {0.1d, 0.25d, 0.4d, 0.5d, 0.6d, 0.65d, 0.7d, 0.8d, 0.9d};
        private final double[] slowFactors = {2.0d, 4.0d, 5.0d, 6.0d, 10.0d, 15.0d, 20.0d, 25.0d, 30.0d};

        DynamicSleepDurationCalcImpl() {
        }

        /**
         * @param taId attempt asking for its sleep duration
         * @param currCount number of records already processed by the attempt
         * @param totalCount total number of records the attempt will process
         * @param defaultSleepDuration unmodified sleep duration per record
         * @return the default duration, multiplied by the factor of the highest threshold
         *         exceeded by the current progress when the attempt is attempt 0 of map task 0
         */
        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            boolean slowedAttempt = taId.getTaskType() == TaskType.MAP
                    && taId.getTaskID().getId() == 0
                    && taId.getId() == 0;
            if (!slowedAttempt) {
                return defaultSleepDuration;
            }
            // Divide as doubles: integer division would truncate the progress to 0 for every
            // record before the last one and no threshold would ever be exceeded.
            double currProgress = ((double) currCount) / totalCount;
            double slowFactor = 1.0d;
            for (int idx = 0; idx < thresholds.length && thresholds[idx] < currProgress; idx++) {
                slowFactor = slowFactors[idx];
            }
            return (long) (slowFactor * defaultSleepDuration);
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$EmptySplit.class */
    public static class EmptySplit extends InputSplit implements Writable {
        public void write(DataOutput dataOutput) throws IOException {
        }

        public void readFields(DataInput dataInput) throws IOException {
        }

        public long getLength() {
            return 0L;
        }

        public String[] getLocations() {
            return new String[0];
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$EstimatorMetricsPair.class */
    /**
     * Couples an estimator class with the number of map and reduce tasks a job is
     * expected to launch and whether that estimator is expected to speculate.
     */
    class EstimatorMetricsPair {
        private final Class<?> estimatorClass;
        private final int expectedMapTasks;
        private final int expectedReduceTasks;
        private final boolean speculativeEstimator;

        EstimatorMetricsPair(Class<?> estimatorClass, int expectedMapTasks, int expectedReduceTasks,
                boolean speculativeEstimator) {
            this.estimatorClass = estimatorClass;
            this.expectedMapTasks = expectedMapTasks;
            this.expectedReduceTasks = expectedReduceTasks;
            this.speculativeEstimator = speculativeEstimator;
        }

        /**
         * @param counters counters of the finished job
         * @return true when more maps or more reduces were launched than expected
         */
        boolean didSpeculate(Counters counters) {
            long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
            long launchedReduces = counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue();
            return launchedMaps > expectedMapTasks || launchedReduces > expectedReduceTasks;
        }

        /**
         * Builds the assertion message for a mismatch between {@link #didSpeculate(Counters)}
         * and the expectation.
         *
         * @param counters counters of the finished job
         * @return a message naming the estimator and the launched/expected task counts
         */
        String getErrorMessage(Counters counters) {
            String msg = "Unexpected tasks running estimator " + this.estimatorClass.getName() + "\n\t";
            long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
            long launchedReduces = counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue();
            if (this.speculativeEstimator) {
                // Speculation is detected with a strict ">" in didSpeculate, so "no speculation"
                // includes launched == expected; report the counts in that case as well.
                if (launchedMaps <= this.expectedMapTasks) {
                    msg = msg + "maps " + launchedMaps + ", expected: " + this.expectedMapTasks;
                }
                if (launchedReduces <= this.expectedReduceTasks) {
                    msg = msg + ", reduces " + launchedReduces + ", expected: " + this.expectedReduceTasks;
                }
            } else {
                if (launchedMaps > this.expectedMapTasks) {
                    msg = msg + "maps " + launchedMaps + ", expected: " + this.expectedMapTasks;
                }
                if (launchedReduces > this.expectedReduceTasks) {
                    msg = msg + ", reduces " + launchedReduces + ", expected: " + this.expectedReduceTasks;
                }
            }
            return msg;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$SleepDurationCalcImpl.class */
    /**
     * Baseline calculator: once progress reaches the threshold (1.0) the default
     * duration is multiplied by the slow factor (1.0); otherwise it is returned as is.
     * The task attempt is not consulted.
     */
    public static class SleepDurationCalcImpl implements SleepDurationCalculator {
        private double threshold = 1.0d;
        private double slowFactor = 1.0d;

        SleepDurationCalcImpl() {
        }

        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            final double progress = ((double) currCount) / ((double) totalCount);
            if (this.threshold <= progress) {
                return (long) (this.slowFactor * defaultSleepDuration);
            }
            return defaultSleepDuration;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$SleepDurationCalculator.class */
    /** Strategy deciding how long a map attempt sleeps before handling its next record. */
    public interface SleepDurationCalculator {
        /**
         * @param taId attempt asking for its sleep duration
         * @param currCount number of records the attempt has already processed
         * @param totalCount total number of records the attempt will process
         * @param defaultSleepDuration unmodified per-record sleep duration
         * @return the duration to sleep; the mapper passes it to {@code Thread.sleep}
         */
        long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration);
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$SlowingSleepDurationCalcImpl.class */
    /**
     * Calculator that multiplies the sleep of attempt 0 of map task 0 by 1.2 once its
     * progress reaches 0.4. All other attempts use the default duration.
     */
    public static class SlowingSleepDurationCalcImpl implements SleepDurationCalculator {
        private double threshold = 0.4d;
        private double slowFactor = 1.2d;

        SlowingSleepDurationCalcImpl() {
        }

        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            // Guard clauses keep the original short-circuit order of the checks.
            if (taId.getTaskType() != TaskType.MAP) {
                return defaultSleepDuration;
            }
            if (taId.getTaskID().getId() != 0) {
                return defaultSleepDuration;
            }
            if (taId.getId() != 0) {
                return defaultSleepDuration;
            }
            final double progress = ((double) currCount) / ((double) totalCount);
            if (this.threshold <= progress) {
                return (long) (this.slowFactor * defaultSleepDuration);
            }
            return defaultSleepDuration;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$SpeculativeSleepInputFormat.class */
    public static class SpeculativeSleepInputFormat extends InputFormat<IntWritable, IntWritable> {
        public List<InputSplit> getSplits(JobContext jobContext) {
            ArrayList arrayList = new ArrayList();
            int i = jobContext.getConfiguration().getInt("mapreduce.job.maps", 1);
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(new EmptySplit());
            }
            return arrayList;
        }

        public RecordReader<IntWritable, IntWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
            Configuration configuration = taskAttemptContext.getConfiguration();
            final int i = configuration.getInt(TestSpeculativeExecOnCluster.MAP_SLEEP_COUNT, 10000);
            if (i < 0) {
                throw new IOException("Invalid map count: " + i);
            }
            int i2 = configuration.getInt(TestSpeculativeExecOnCluster.REDUCE_SLEEP_COUNT, TestSpeculativeExecOnCluster.REDUCE_SLEEP_COUNT_DEFAULT);
            if (i2 < 0) {
                throw new IOException("Invalid reduce count: " + i2);
            }
            final int numReduceTasks = i2 * taskAttemptContext.getNumReduceTasks();
            return new RecordReader<IntWritable, IntWritable>() { // from class: org.apache.hadoop.mapreduce.v2.TestSpeculativeExecOnCluster.SpeculativeSleepInputFormat.1
                private int records = 0;
                private int emitCount = 0;
                private IntWritable key = null;
                private IntWritable value = null;

                public void initialize(InputSplit inputSplit2, TaskAttemptContext taskAttemptContext2) {
                }

                public boolean nextKeyValue() throws IOException {
                    if (i == 0) {
                        return false;
                    }
                    this.key = new IntWritable();
                    this.key.set(this.emitCount);
                    int i3 = numReduceTasks / i;
                    if (numReduceTasks % i > this.records) {
                        i3++;
                    }
                    this.emitCount += i3;
                    this.value = new IntWritable();
                    this.value.set(i3);
                    int i4 = this.records;
                    this.records = i4 + 1;
                    return i4 < i;
                }

                /* renamed from: getCurrentKey, reason: merged with bridge method [inline-methods] */
                public IntWritable m273getCurrentKey() {
                    return this.key;
                }

                /* renamed from: getCurrentValue, reason: merged with bridge method [inline-methods] */
                public IntWritable m272getCurrentValue() {
                    return this.value;
                }

                public void close() throws IOException {
                }

                public float getProgress() throws IOException {
                    if (i == 0) {
                        return 100.0f;
                    }
                    return this.records / i;
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$SpeculativeSleepJobPartitioner.class */
    public static class SpeculativeSleepJobPartitioner extends Partitioner<IntWritable, NullWritable> {
        public int getPartition(IntWritable intWritable, NullWritable nullWritable, int i) {
            return intWritable.get() % i;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$SpeculativeSleepMapper.class */
    public static class SpeculativeSleepMapper extends Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
        private long mapSleepDuration = 60000;
        private int mapSleepCount = 1;
        private int count = 0;
        private SleepDurationCalculator sleepCalc = new SleepDurationCalcImpl();

        protected void setup(Mapper<IntWritable, IntWritable, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();
            this.mapSleepCount = configuration.getInt(TestSpeculativeExecOnCluster.MAP_SLEEP_COUNT, this.mapSleepCount);
            this.mapSleepDuration = this.mapSleepCount == 0 ? 0L : configuration.getLong(TestSpeculativeExecOnCluster.MAP_SLEEP_TIME, 60000L) / this.mapSleepCount;
            this.sleepCalc = (SleepDurationCalculator) TestSpeculativeExecOnCluster.mapSleepTypeMapper.get(configuration.get(TestSpeculativeExecOnCluster.MAP_SLEEP_CALCULATOR_TYPE, TestSpeculativeExecOnCluster.MAP_SLEEP_CALCULATOR_TYPE_DEFAULT));
        }

        public void map(IntWritable intWritable, IntWritable intWritable2, Mapper<IntWritable, IntWritable, IntWritable, NullWritable>.Context context) throws IOException, InterruptedException {
            try {
                context.setStatus("Sleeping... (" + (this.mapSleepDuration * (this.mapSleepCount - this.count)) + ") ms left");
                Thread.sleep(this.sleepCalc.calcSleepDuration(context.getTaskAttemptID(), this.count, this.mapSleepCount, this.mapSleepDuration));
                this.count++;
                int i = intWritable.get();
                for (int i2 = 0; i2 < intWritable2.get(); i2++) {
                    context.write(new IntWritable(i + i2), NullWritable.get());
                }
            } catch (InterruptedException e) {
                throw ((IOException) new IOException("Interrupted while sleeping").initCause(e));
            }
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((IntWritable) obj, (IntWritable) obj2, (Mapper<IntWritable, IntWritable, IntWritable, NullWritable>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$SpeculativeSleepReducer.class */
    public static class SpeculativeSleepReducer extends Reducer<IntWritable, NullWritable, NullWritable, NullWritable> {
        private long reduceSleepDuration = 10000;
        private int reduceSleepCount = 1;
        private int count = 0;

        protected void setup(Reducer<IntWritable, NullWritable, NullWritable, NullWritable>.Context context) throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();
            this.reduceSleepCount = configuration.getInt(TestSpeculativeExecOnCluster.REDUCE_SLEEP_COUNT, this.reduceSleepCount);
            this.reduceSleepDuration = this.reduceSleepCount == 0 ? 0L : configuration.getLong(TestSpeculativeExecOnCluster.REDUCE_SLEEP_TIME, 10000L) / this.reduceSleepCount;
        }

        public void reduce(IntWritable intWritable, Iterable<NullWritable> iterable, Reducer<IntWritable, NullWritable, NullWritable, NullWritable>.Context context) throws IOException {
            try {
                context.setStatus("Sleeping... (" + (this.reduceSleepDuration * (this.reduceSleepCount - this.count)) + ") ms left");
                Thread.sleep(this.reduceSleepDuration);
                this.count++;
            } catch (InterruptedException e) {
                throw ((IOException) new IOException("Interrupted while sleeping").initCause(e));
            }
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((IntWritable) obj, (Iterable<NullWritable>) iterable, (Reducer<IntWritable, NullWritable, NullWritable, NullWritable>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$StalledSleepDurationCalcImpl.class */
    /**
     * Calculator that makes attempt 0 of map task 0 sleep 1000 times the default
     * duration for every record. All other attempts use the default duration.
     */
    public static class StalledSleepDurationCalcImpl implements SleepDurationCalculator {
        StalledSleepDurationCalcImpl() {
        }

        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            // Guard clauses keep the original short-circuit order of the checks.
            if (taId.getTaskType() != TaskType.MAP) {
                return defaultSleepDuration;
            }
            if (taId.getTaskID().getId() != 0) {
                return defaultSleepDuration;
            }
            if (taId.getId() != 0) {
                return defaultSleepDuration;
            }
            return 1000 * defaultSleepDuration;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapreduce/v2/TestSpeculativeExecOnCluster$StepStalledSleepDurationCalcImpl.class */
    /**
     * Calculator that multiplies the sleep of attempt 0 of map task 0 by 10000 once its
     * progress reaches 0.4. All other attempts use the default duration.
     */
    public static class StepStalledSleepDurationCalcImpl implements SleepDurationCalculator {
        private double threshold = 0.4d;
        private double slowFactor = 10000.0d;

        StepStalledSleepDurationCalcImpl() {
        }

        @Override
        public long calcSleepDuration(TaskAttemptID taId, int currCount, int totalCount, long defaultSleepDuration) {
            // Guard clauses keep the original short-circuit order of the checks.
            if (taId.getTaskType() != TaskType.MAP) {
                return defaultSleepDuration;
            }
            if (taId.getTaskID().getId() != 0) {
                return defaultSleepDuration;
            }
            if (taId.getId() != 0) {
                return defaultSleepDuration;
            }
            final double progress = ((double) currCount) / ((double) totalCount);
            if (this.threshold <= progress) {
                return (long) (this.slowFactor * defaultSleepDuration);
            }
            return defaultSleepDuration;
        }
    }

    /**
     * Supplies one parameter row per estimator under test: the estimator class, the
     * calculator type names whose tests are skipped, and the map and reduce task counts.
     */
    @Parameterized.Parameters(name = "{index}: TaskEstimator(EstimatorClass {0})")
    public static Collection<Object[]> getTestParameters() {
        List<String> skippedCalcTypes = Arrays.asList("stalled_run", "slowing_run", "step_stalled_run");
        Object[][] rows = {
            {SimpleExponentialTaskRuntimeEstimator.class, skippedCalcTypes, NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT},
            {LegacyTaskRuntimeEstimator.class, skippedCalcTypes, NUM_MAP_DEFAULT, NUM_REDUCE_DEFAULT},
        };
        return Arrays.asList(rows);
    }

    /**
     * Receives one parameter row from {@link #getTestParameters()}.
     *
     * @param estimatorClass estimator configured for the job
     * @param ignoredTests calculator type names whose tests return immediately
     * @param numMapper number of map tasks
     * @param numReduce number of reduce tasks
     */
    public TestSpeculativeExecOnCluster(Class<? extends TaskRuntimeEstimator> estimatorClass, List<String> ignoredTests, Integer numMapper, Integer numReduce) {
        this.estimatorClass = estimatorClass;
        this.ignoredTests = ignoredTests;
        // Auto-unboxing; a null count fails here with a NullPointerException, as before.
        this.myNumMapper = numMapper;
        this.myNumReduce = numReduce;
    }

    /**
     * Starts the mini cluster if it is not running yet, copies the MR app jar to
     * {@code APP_JAR} and resets the sleep settings to their defaults. Returns early,
     * leaving everything untouched, when the MR app jar does not exist.
     */
    @Before
    public void setup() throws IOException {
        final File appJarFile = new File(MiniMRYarnCluster.APPJAR);
        if (!appJarFile.exists()) {
            LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
            return;
        }

        if (mrCluster == null) {
            // Assign the field before init/start so tearDown() can stop a half-started cluster.
            mrCluster = new MiniMRYarnCluster(TestSpeculativeExecution.class.getName(), NODE_MANAGERS_COUNT);
            Configuration clusterConf = new Configuration();
            mrCluster.init(clusterConf);
            mrCluster.start();
        }

        final Path sourceJar = new Path(MiniMRYarnCluster.APPJAR);
        localFs.copyFromLocalFile(sourceJar, APP_JAR);
        localFs.setPermission(APP_JAR, new FsPermission("700"));

        myMapSleepTime = MAP_SLEEP_TIME_DEFAULT;
        myReduceSleepTime = REDUCE_SLEEP_TIME_DEFAULT;
        myMapSleepCount = MAP_SLEEP_COUNT_DEFAULT;
        myReduceSleepCount = REDUCE_SLEEP_COUNT_DEFAULT;
        chosenSleepCalc = MAP_SLEEP_CALCULATOR_TYPE_DEFAULT;
    }

    /** Stops the mini cluster, if one was started, and clears the reference. */
    @After
    public void tearDown() {
        if (mrCluster == null) {
            return;
        }
        mrCluster.stop();
        mrCluster = null;
    }

    /**
     * Runs the job with the "dynamic_slowing_run" calculator and checks, for the estimator
     * under test, that the job succeeds, that speculation happened as expected and that no
     * map failed. Does nothing when the MR app jar is missing or the calculator is ignored.
     */
    @Test
    public void testExecDynamicSlowingSpeculative() throws Exception {
        // setup() does not start the cluster without the app jar; bail out instead of
        // hitting a NullPointerException in runSpecTest().
        if (!new File(MiniMRYarnCluster.APPJAR).exists()) {
            LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
            return;
        }
        this.chosenSleepCalc = "dynamic_slowing_run";
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        EstimatorMetricsPair[] expectations = {
            new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
            new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
            new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
        };
        for (EstimatorMetricsPair expectation : expectations) {
            if (!this.estimatorClass.equals(expectation.estimatorClass)) {
                continue;
            }
            LOG.info("+++ Dynamic Slow Progress testing against " + this.estimatorClass.getName() + " +++");
            Job job = runSpecTest();
            Assert.assertTrue("Job expected to succeed with estimator " + this.estimatorClass.getName(), job.waitForCompletion(true));
            Assert.assertEquals("Job expected to succeed with estimator " + this.estimatorClass.getName(), JobStatus.State.SUCCEEDED, job.getJobState());
            Counters counters = job.getCounters();
            // Expected value first so a failure reads "expected:<...> but was:<...>" correctly.
            Assert.assertEquals(expectation.getErrorMessage(counters), Boolean.valueOf(expectation.speculativeEstimator), Boolean.valueOf(expectation.didSpeculate(counters)));
            Assert.assertEquals("Failed maps higher than 0 " + this.estimatorClass.getName(), 0L, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
        }
    }

    /**
     * Runs the job with the "slowing_run" calculator and checks, for the estimator under
     * test, that the job succeeds, that speculation happened (or not) as expected and that
     * no map failed. Does nothing when the MR app jar is missing or the calculator is ignored.
     */
    @Test
    public void testExecSlowNonSpeculative() throws Exception {
        // setup() does not start the cluster without the app jar; bail out instead of
        // hitting a NullPointerException in runSpecTest().
        if (!new File(MiniMRYarnCluster.APPJAR).exists()) {
            LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
            return;
        }
        this.chosenSleepCalc = "slowing_run";
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        EstimatorMetricsPair[] expectations = {
            new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, false),
            new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
            new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
        };
        for (EstimatorMetricsPair expectation : expectations) {
            if (!this.estimatorClass.equals(expectation.estimatorClass)) {
                continue;
            }
            LOG.info("+++ Linear Slow Progress Non Speculative testing against " + this.estimatorClass.getName() + " +++");
            Job job = runSpecTest();
            Assert.assertTrue("Job expected to succeed with estimator " + this.estimatorClass.getName(), job.waitForCompletion(true));
            Assert.assertEquals("Job expected to succeed with estimator " + this.estimatorClass.getName(), JobStatus.State.SUCCEEDED, job.getJobState());
            Counters counters = job.getCounters();
            // Expected value first so a failure reads "expected:<...> but was:<...>" correctly.
            Assert.assertEquals(expectation.getErrorMessage(counters), Boolean.valueOf(expectation.speculativeEstimator), Boolean.valueOf(expectation.didSpeculate(counters)));
            Assert.assertEquals("Failed maps higher than 0 " + this.estimatorClass.getName(), 0L, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
        }
    }

    /**
     * Runs the job with the "step_stalled_run" calculator and checks, for the estimator
     * under test, that the job succeeds, that speculation happened as expected and that no
     * map failed. Does nothing when the MR app jar is missing or the calculator is ignored.
     */
    @Test
    public void testExecStepStalledSpeculative() throws Exception {
        // setup() does not start the cluster without the app jar; bail out instead of
        // hitting a NullPointerException in runSpecTest().
        if (!new File(MiniMRYarnCluster.APPJAR).exists()) {
            LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
            return;
        }
        this.chosenSleepCalc = "step_stalled_run";
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        EstimatorMetricsPair[] expectations = {
            new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
            new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
            new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
        };
        for (EstimatorMetricsPair expectation : expectations) {
            if (!this.estimatorClass.equals(expectation.estimatorClass)) {
                continue;
            }
            LOG.info("+++ Stalled Progress testing against " + this.estimatorClass.getName() + " +++");
            Job job = runSpecTest();
            Assert.assertTrue("Job expected to succeed with estimator " + this.estimatorClass.getName(), job.waitForCompletion(true));
            Assert.assertEquals("Job expected to succeed with estimator " + this.estimatorClass.getName(), JobStatus.State.SUCCEEDED, job.getJobState());
            Counters counters = job.getCounters();
            // Expected value first so a failure reads "expected:<...> but was:<...>" correctly.
            Assert.assertEquals(expectation.getErrorMessage(counters), Boolean.valueOf(expectation.speculativeEstimator), Boolean.valueOf(expectation.didSpeculate(counters)));
            Assert.assertEquals("Failed maps higher than 0 " + this.estimatorClass.getName(), 0L, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
        }
    }

    /**
     * Runs the job with the "stalled_run" calculator and checks, for the estimator under
     * test, that the job succeeds, that speculation happened as expected and that no map
     * failed. Does nothing when the MR app jar is missing or the calculator is ignored.
     */
    @Test
    public void testExecStalledSpeculative() throws Exception {
        // setup() does not start the cluster without the app jar; bail out instead of
        // hitting a NullPointerException in runSpecTest().
        if (!new File(MiniMRYarnCluster.APPJAR).exists()) {
            LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
            return;
        }
        this.chosenSleepCalc = "stalled_run";
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        EstimatorMetricsPair[] expectations = {
            new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
            new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
            new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
        };
        for (EstimatorMetricsPair expectation : expectations) {
            if (!this.estimatorClass.equals(expectation.estimatorClass)) {
                continue;
            }
            LOG.info("+++ Stalled Progress testing against " + this.estimatorClass.getName() + " +++");
            Job job = runSpecTest();
            Assert.assertTrue("Job expected to succeed with estimator " + this.estimatorClass.getName(), job.waitForCompletion(true));
            Assert.assertEquals("Job expected to succeed with estimator " + this.estimatorClass.getName(), JobStatus.State.SUCCEEDED, job.getJobState());
            Counters counters = job.getCounters();
            // Expected value first so a failure reads "expected:<...> but was:<...>" correctly.
            Assert.assertEquals(expectation.getErrorMessage(counters), Boolean.valueOf(expectation.speculativeEstimator), Boolean.valueOf(expectation.didSpeculate(counters)));
            Assert.assertEquals("Failed maps higher than 0 " + this.estimatorClass.getName(), 0L, counters.findCounter(JobCounter.NUM_FAILED_MAPS).getValue());
        }
    }

    /**
     * Runs the job with whatever calculator is currently selected (setup() selects the
     * default one) and checks, for the estimator under test, that the job succeeds and that
     * speculation happened (or not) as expected. Does nothing when the MR app jar is missing
     * or the calculator is ignored.
     */
    @Test
    public void testExecNonSpeculative() throws Exception {
        if (!new File(MiniMRYarnCluster.APPJAR).exists()) {
            LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
            return;
        }
        if (this.ignoredTests.contains(this.chosenSleepCalc)) {
            return;
        }
        EstimatorMetricsPair[] expectations = {
            new EstimatorMetricsPair(LegacyTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
            new EstimatorMetricsPair(SimpleExponentialTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, false),
            new EstimatorMetricsPair(ExponentiallySmoothedTaskRuntimeEstimator.class, this.myNumMapper, this.myNumReduce, true),
        };
        for (EstimatorMetricsPair expectation : expectations) {
            if (!this.estimatorClass.equals(expectation.estimatorClass)) {
                continue;
            }
            LOG.info("+++ No Speculation testing against " + this.estimatorClass.getName() + " +++");
            Job job = runSpecTest();
            Assert.assertTrue("Job expected to succeed with estimator " + this.estimatorClass.getName(), job.waitForCompletion(true));
            Assert.assertEquals("Job expected to succeed with estimator " + this.estimatorClass.getName(), JobStatus.State.SUCCEEDED, job.getJobState());
            Counters counters = job.getCounters();
            // Expected value first so a failure reads "expected:<...> but was:<...>" correctly.
            Assert.assertEquals(expectation.getErrorMessage(counters), Boolean.valueOf(expectation.speculativeEstimator), Boolean.valueOf(expectation.didSpeculate(counters)));
        }
    }

    /**
     * Configures and submits the sleep job on the mini cluster with map and reduce
     * speculation enabled, the estimator under test and the current sleep settings.
     *
     * @return the submitted job; callers wait for its completion
     * @throws IOException if job creation or submission fails
     * @throws ClassNotFoundException propagated from job submission
     * @throws InterruptedException propagated from job submission
     */
    private Job runSpecTest() throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = this.mrCluster.getConfig();
        conf.setBoolean("mapreduce.map.speculative", ENABLE_SPECULATIVE_MAP);
        conf.setBoolean("mapreduce.reduce.speculative", ENABLE_SPECULATIVE_REDUCE);
        conf.setClass("yarn.app.mapreduce.am.job.task.estimator.class", this.estimatorClass, TaskRuntimeEstimator.class);
        conf.setLong(MAP_SLEEP_TIME, this.myMapSleepTime);
        conf.setLong(REDUCE_SLEEP_TIME, this.myReduceSleepTime);
        conf.setInt(MAP_SLEEP_COUNT, this.myMapSleepCount);
        conf.setInt(REDUCE_SLEEP_COUNT, this.myReduceSleepCount);
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 1.0f);
        conf.setInt("mapreduce.job.maps", this.myNumMapper);
        conf.set(MAP_SLEEP_CALCULATOR_TYPE, this.chosenSleepCalc);

        Job job = Job.getInstance(conf);
        job.setJarByClass(TestSpeculativeExecution.class);
        job.setMapperClass(SpeculativeSleepMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(SpeculativeSleepReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setInputFormatClass(SpeculativeSleepInputFormat.class);
        job.setPartitionerClass(SpeculativeSleepJobPartitioner.class);
        job.setNumReduceTasks(this.myNumReduce);
        // The input format synthesizes its own splits; this path only satisfies FileInputFormat.
        FileInputFormat.addInputPath(job, new Path("ignored"));

        // Best-effort cleanup of a previous run's output; a failure is logged, not fatal.
        try {
            localFs.delete(TEST_OUT_DIR, true);
        } catch (IOException e) {
            LOG.warn("Could not delete output directory " + TEST_OUT_DIR + "; continuing", e);
        }
        FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);
        job.addFileToClassPath(APP_JAR);
        job.setMaxMapAttempts(2);
        job.submit();
        return job;
    }

    static {
        mapSleepTypeMapper.put(MAP_SLEEP_CALCULATOR_TYPE_DEFAULT, new SleepDurationCalcImpl());
        mapSleepTypeMapper.put("stalled_run", new StalledSleepDurationCalcImpl());
        mapSleepTypeMapper.put("slowing_run", new SlowingSleepDurationCalcImpl());
        mapSleepTypeMapper.put("dynamic_slowing_run", new DynamicSleepDurationCalcImpl());
        mapSleepTypeMapper.put("step_stalled_run", new StepStalledSleepDurationCalcImpl());
        try {
            localFs = FileSystem.getLocal(new Configuration());
            TEST_ROOT_DIR = new Path("target", TestSpeculativeExecOnCluster.class.getName() + "-tmpDir").makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
            APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
            TEST_OUT_DIR = new Path(TEST_ROOT_DIR, "test.out.dir");
        } catch (IOException e) {
            throw new RuntimeException("problem getting local fs", e);
        }
    }
}
