/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobQueueTaskScheduler;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskScheduler;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskTrackerStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;

/**
 * Checks the cluster-wide figures reported by the {@link JobTracker} of a
 * {@link MiniMRCluster}: tracker count, slot capacities, occupied and reserved
 * slots, running task counts and the memory values of {@link ClusterStatus}.
 *
 * <p>The cluster is created once for the whole suite (see {@link #suite()}).
 * The tests do not rely on real task trackers; instead they send heartbeats to
 * the JobTracker on behalf of the tracker names listed in {@code trackers}.
 * All fixture state is static and shared between the test methods.
 */
public class TestClusterStatus extends TestCase {
    /** Value the memory getters are expected to return unless detailed status is requested. */
    private static final long UNINITIALIZED_MEMORY = -1L;

    /** Names of the simulated trackers that heartbeat to the JobTracker. */
    private static final String[] trackers = new String[]{"tracker_tracker1:1000", "tracker_tracker2:1000", "tracker_tracker3:1000"};
    /** Map slot count reported by every simulated tracker in {@link #getTTStatus}. */
    private static final int mapSlotsPerTracker = 4;
    /** Reduce slot count reported by every simulated tracker in {@link #getTTStatus}. */
    private static final int reduceSlotsPerTracker = 2;

    private static JobTracker jobTracker;
    private static MiniMRCluster mr;
    private static JobClient client;
    /** Response id passed with the next round of heartbeats; bumped once per round. */
    private static short responseId = 1;
    /** Job on whose behalf {@link FakeTaskScheduler} reserves and unreserves slots. */
    private static FakeJobInProgress fakeJob;
    /** The scheduler instance created by the JobTracker; set by the scheduler's constructor. */
    private static FakeTaskScheduler scheduler;

    /**
     * Builds the suite, wrapped so that the cluster is started once before the
     * first test and shut down once after the last one.
     *
     * @return the decorated test suite
     */
    public static Test suite() {
        TestSetup setup = new TestSetup(new TestSuite(TestClusterStatus.class)) {

            /** Starts the cluster with {@link FakeTaskScheduler} and registers every tracker. */
            protected void setUp() throws Exception {
                JobConf conf = new JobConf();
                conf.setClass("mapred.jobtracker.taskScheduler", FakeTaskScheduler.class, TaskScheduler.class);
                mr = new MiniMRCluster(0, 0, 0, "file:///", 1, null, null, null, conf);
                try {
                    jobTracker = mr.getJobTrackerRunner().getJobTracker();
                    for (String tracker : trackers) {
                        establishFirstContact(jobTracker, tracker);
                    }
                    client = new JobClient(mr.createJobConf());
                } catch (Exception e) {
                    // Do not leave a running cluster behind when the rest of
                    // the fixture cannot be built.
                    mr.shutdown();
                    throw e;
                }
            }

            /** Closes the client and shuts the cluster down, even if closing the client fails. */
            protected void tearDown() throws Exception {
                try {
                    client.close();
                } finally {
                    mr.shutdown();
                }
            }
        };
        return setup;
    }

    /**
     * Sends one heartbeat to the JobTracker.
     *
     * @param jt             the JobTracker receiving the heartbeat
     * @param status         the status to report, or {@code null} to report a
     *                       status built from just the tracker name and its host name
     * @param initialContact passed through to {@code JobTracker.heartbeat}
     * @param acceptNewTasks passed through to {@code JobTracker.heartbeat}
     * @param tracker        tracker name, used only when {@code status} is {@code null}
     * @param responseId     response id to send with this heartbeat
     * @return {@code responseId + 1}
     * @throws IOException if the heartbeat call fails
     */
    static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, String tracker, short responseId) throws IOException {
        if (status == null) {
            status = new TaskTrackerStatus(tracker, JobInProgress.convertTrackerNameToHostName(tracker));
        }
        jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
        return (short)(responseId + 1);
    }

    /**
     * Sends the initial-contact heartbeat (response id 0, not accepting tasks)
     * for the given tracker.
     *
     * @param jt      the JobTracker to contact
     * @param tracker name of the tracker to register
     * @throws IOException if the heartbeat call fails
     */
    static void establishFirstContact(JobTracker jt, String tracker) throws IOException {
        sendHeartBeat(jt, null, true, false, tracker, (short)0);
    }

    /** Builds a status for {@code trackerName} carrying the given task reports and the default slot counts. */
    private TaskTrackerStatus getTTStatus(String trackerName, List<TaskStatus> taskStatuses) {
        return new TaskTrackerStatus(trackerName, JobInProgress.convertTrackerNameToHostName(trackerName), 0, taskStatuses, 0, mapSlotsPerTracker, reduceSlotsPerTracker);
    }

    /** Builds a status for {@code trackerName} with no task reports and the given slot counts. */
    private static TaskTrackerStatus newIdleStatus(String trackerName, int mapSlots, int reduceSlots) {
        return new TaskTrackerStatus(trackerName, JobInProgress.convertTrackerNameToHostName(trackerName), 0, new ArrayList<TaskStatus>(), 0, mapSlots, reduceSlots);
    }

    /**
     * Reports one running map attempt and one running reduce attempt on the
     * first tracker, then marks them succeeded one after the other, checking
     * the occupied-slot, capacity and running-task figures along the way.
     */
    public void testClusterMetrics() throws IOException, InterruptedException {
        assertEquals("tasktracker count doesn't match", trackers.length, client.getClusterStatus().getTaskTrackers());

        // Both attempts running.
        List<TaskStatus> list = new ArrayList<TaskStatus>();
        int mapSlotsPerTask = 2;
        int reduceSlotsPerTask = 1;
        addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.RUNNING);
        addReduceTaskAttemptToList(list, reduceSlotsPerTask, TaskStatus.State.RUNNING);
        sendHeartbeats(list);

        int expectedMapCapacity = mapSlotsPerTracker * trackers.length;
        int expectedReduceCapacity = reduceSlotsPerTracker * trackers.length;

        ClusterMetrics metrics = jobTracker.getClusterMetrics();
        assertEquals("occupied map slots do not match", mapSlotsPerTask, metrics.getOccupiedMapSlots());
        assertEquals("occupied reduce slots do not match", reduceSlotsPerTask, metrics.getOccupiedReduceSlots());
        assertEquals("map slot capacities do not match", expectedMapCapacity, metrics.getMapSlotCapacity());
        assertEquals("reduce slot capacities do not match", expectedReduceCapacity, metrics.getReduceSlotCapacity());
        assertEquals("running map tasks do not match", 1, metrics.getRunningMaps());
        assertEquals("running reduce tasks do not match", 1, metrics.getRunningReduces());

        // The client-side ClusterStatus must agree with the JobTracker metrics.
        ClusterStatus stat = client.getClusterStatus();
        assertEquals("running map tasks do not match", 1, stat.getMapTasks());
        assertEquals("running reduce tasks do not match", 1, stat.getReduceTasks());
        assertEquals("map slot capacities do not match", expectedMapCapacity, stat.getMaxMapTasks());
        assertEquals("reduce slot capacities do not match", expectedReduceCapacity, stat.getMaxReduceTasks());

        // Map attempt succeeded, reduce attempt still running.
        list.clear();
        addMapTaskAttemptToList(list, mapSlotsPerTask, TaskStatus.State.SUCCEEDED);
        addReduceTaskAttemptToList(list, reduceSlotsPerTask, TaskStatus.State.RUNNING);
        sendHeartbeats(list);
        metrics = jobTracker.getClusterMetrics();
        assertEquals("occupied map slots do not match after map success", 0, metrics.getOccupiedMapSlots());
        assertEquals("occupied reduce slots do not match after map success", reduceSlotsPerTask, metrics.getOccupiedReduceSlots());

        // Reduce attempt succeeded as well.
        list.clear();
        addReduceTaskAttemptToList(list, reduceSlotsPerTask, TaskStatus.State.SUCCEEDED);
        sendHeartbeats(list);
        metrics = jobTracker.getClusterMetrics();
        assertEquals("occupied reduce slots do not match after reduce success", 0, metrics.getOccupiedReduceSlots());
    }

    /**
     * Sends one round of heartbeats: the first tracker reports {@code list},
     * every other tracker reports no tasks. All heartbeats of the round use the
     * current {@code responseId}, which is incremented once afterwards.
     */
    private void sendHeartbeats(List<TaskStatus> list) throws IOException {
        for (int i = 0; i < trackers.length; ++i) {
            List<TaskStatus> reports = (i == 0) ? list : new ArrayList<TaskStatus>();
            TaskTrackerStatus status = getTTStatus(trackers[i], reports);
            sendHeartBeat(jobTracker, status, false, false, trackers[i], responseId);
        }
        responseId = (short)(responseId + 1);
    }

    /** Appends a reduce-attempt status in the given state, attributed to the first tracker. */
    private void addReduceTaskAttemptToList(List<TaskStatus> list, int reduceSlotsPerTask, TaskStatus.State state) {
        TaskAttemptID attemptId = new TaskAttemptID("jt", 1, false, 0, 0);
        TaskStatus ts = TaskStatus.createTaskStatus(false, attemptId, 0.0f, reduceSlotsPerTask, state, "", "", trackers[0], TaskStatus.Phase.REDUCE, null);
        list.add(ts);
    }

    /** Appends a map-attempt status in the given state, attributed to the first tracker. */
    private void addMapTaskAttemptToList(List<TaskStatus> list, int mapSlotsPerTask, TaskStatus.State state) {
        TaskAttemptID attemptId = new TaskAttemptID("jt", 1, true, 0, 0);
        TaskStatus ts = TaskStatus.createTaskStatus(true, attemptId, 0.0f, mapSlotsPerTask, state, "", "", trackers[0], TaskStatus.Phase.MAP, null);
        list.add(ts);
    }

    /**
     * Drives {@link FakeTaskScheduler} through two reserving heartbeat rounds
     * and one unreserving round on the first two trackers, checking the
     * reserved-slot metrics after each round (2, then 4, then 0 for both map
     * and reduce).
     */
    public void testReservedSlots() throws IOException {
        JobConf conf = mr.createJobConf();
        conf.setNumReduceTasks(1);
        conf.setSpeculativeExecution(false);

        TaskTracker tt1 = jobTracker.getTaskTracker(trackers[0]);
        TaskTracker tt2 = jobTracker.getTaskTracker(trackers[1]);
        TaskTrackerStatus status1 = newIdleStatus(trackers[0], 2, 2);
        TaskTrackerStatus status2 = newIdleStatus(trackers[1], 2, 2);
        tt1.setStatus(status1);
        tt2.setStatus(status2);

        fakeJob = new FakeJobInProgress(new JobID("jt", 1), new JobConf(conf), jobTracker);

        ClusterMetrics metrics = heartbeatFirstTwoTrackers(status1, status2);
        assertEquals("reserved map slots do not match", 2, metrics.getReservedMapSlots());
        assertEquals("reserved reduce slots do not match", 2, metrics.getReservedReduceSlots());

        metrics = heartbeatFirstTwoTrackers(status1, status2);
        assertEquals("reserved map slots do not match", 4, metrics.getReservedMapSlots());
        assertEquals("reserved reduce slots do not match", 4, metrics.getReservedReduceSlots());

        // From here on the scheduler releases reservations instead of making them.
        scheduler.setUnreserveSlots(true);
        metrics = heartbeatFirstTwoTrackers(status1, status2);
        assertEquals("map slots should have been unreserved", 0, metrics.getReservedMapSlots());
        assertEquals("reduce slots should have been unreserved", 0, metrics.getReservedReduceSlots());
    }

    /**
     * Sends a task-accepting heartbeat for each of the first two trackers,
     * increments {@code responseId} once, and returns the resulting metrics.
     */
    private static ClusterMetrics heartbeatFirstTwoTrackers(TaskTrackerStatus status1, TaskTrackerStatus status2) throws IOException {
        sendHeartBeat(jobTracker, status1, false, true, trackers[0], responseId);
        sendHeartBeat(jobTracker, status2, false, true, trackers[1], responseId);
        responseId = (short)(responseId + 1);
        return jobTracker.getClusterMetrics();
    }

    /**
     * Checks that used/max memory are {@code -1} for the plain and the
     * non-detailed cluster status, and are not {@code -1} when detailed status
     * is requested.
     */
    public void testClusterStatus() throws Exception {
        assertMemoryUninitialized(client.getClusterStatus());
        assertMemoryUninitialized(client.getClusterStatus(false));

        ClusterStatus detailed = client.getClusterStatus(true);
        long usedMemory = detailed.getUsedMemory();
        long maxMemory = detailed.getMaxMemory();
        assertEquals("JobTracker used-memory is " + usedMemory, true, UNINITIALIZED_MEMORY != usedMemory);
        assertEquals("JobTracker max-memory is " + maxMemory, true, UNINITIALIZED_MEMORY != maxMemory);
    }

    /** Asserts that both memory figures of {@code status} equal {@link #UNINITIALIZED_MEMORY}. */
    private static void assertMemoryUninitialized(ClusterStatus status) {
        assertEquals("JobTracker used-memory is " + status.getUsedMemory() + ", expected " + UNINITIALIZED_MEMORY, UNINITIALIZED_MEMORY, status.getUsedMemory());
        assertEquals("JobTracker max-memory is " + status.getMaxMemory() + ", expected " + UNINITIALIZED_MEMORY, UNINITIALIZED_MEMORY, status.getMaxMemory());
    }

    /** Minimal {@link JobInProgress} used only as the owner of slot reservations. */
    static class FakeJobInProgress
    extends JobInProgress {
        public FakeJobInProgress(JobID jId, JobConf jobConf, JobTracker jt) throws IOException {
            super(jId, jobConf, jt);
        }
    }

    /**
     * Scheduler that never assigns tasks. Each {@link #assignTasks} call either
     * reserves map and reduce slots on the calling tracker for {@code fakeJob}
     * (one more than on the previous call for that tracker) or, once
     * {@link #setUnreserveSlots} has been switched on, unreserves them.
     */
    static class FakeTaskScheduler
    extends JobQueueTaskScheduler {
        /** Number of reserving calls seen so far, per tracker. */
        private final Map<TaskTracker, Integer> reservedCounts = new HashMap<TaskTracker, Integer>();
        private boolean unreserveSlots;

        /** Publishes the new instance in the enclosing test's static {@code scheduler} field. */
        public FakeTaskScheduler() {
            scheduler = this;
        }

        /** Switches between reserving ({@code false}) and unreserving ({@code true}) mode. */
        void setUnreserveSlots(boolean shouldUnreserve) {
            this.unreserveSlots = shouldUnreserve;
        }

        /**
         * Reserves or unreserves slots on {@code tt} depending on the current mode.
         *
         * @param tt the tracker asking for tasks
         * @return always a new empty list
         */
        public List<Task> assignTasks(TaskTracker tt) {
            if (this.unreserveSlots) {
                tt.unreserveSlots(TaskType.MAP, fakeJob);
                tt.unreserveSlots(TaskType.REDUCE, fakeJob);
            } else {
                Integer previous = this.reservedCounts.get(tt);
                int currCount = (previous == null) ? 1 : previous + 1;
                this.reservedCounts.put(tt, currCount);
                tt.reserveSlots(TaskType.MAP, fakeJob, currCount);
                tt.reserveSlots(TaskType.REDUCE, fakeJob, currCount);
            }
            return new ArrayList<Task>();
        }
    }
}

