/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.daemon.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.text.SimpleDateFormat;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
import org.apache.hadoop.hive.llap.daemon.SchedulerFragmentCompletingListener;
import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue;
import org.apache.hadoop.hive.llap.daemon.impl.QueryIdentifier;
import org.apache.hadoop.hive.llap.daemon.impl.Scheduler;
import org.apache.hadoop.hive.llap.daemon.impl.TaskRunnerCallable;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.runtime.task.EndReason;
import org.apache.tez.runtime.task.TaskRunner2Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskExecutorService
extends AbstractService
implements Scheduler<TaskRunnerCallable>,
SchedulerFragmentCompletingListener {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class);
    private static final boolean isInfoEnabled = LOG.isInfoEnabled();
    private static final boolean isDebugEnabled = LOG.isDebugEnabled();
    private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = "Task-Executor-%d";
    private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d";
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final ListeningExecutorService executorService;
    @VisibleForTesting
    final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue;
    private final ListeningExecutorService waitQueueExecutorService;
    private final ListeningExecutorService executionCompletionExecutorService;
    @VisibleForTesting
    final BlockingQueue<TaskWrapper> preemptionQueue;
    private final boolean enablePreemption;
    private final ThreadPoolExecutor threadPoolExecutor;
    private final AtomicInteger numSlotsAvailable;
    private final int maxParallelExecutors;
    private final Clock clock = new MonotonicClock();
    private final AtomicInteger runningFragmentCount = new AtomicInteger(0);
    @VisibleForTesting
    final ConcurrentMap<String, TaskWrapper> knownTasks = new ConcurrentHashMap<String, TaskWrapper>();
    private final Object lock = new Object();
    private final LlapDaemonExecutorMetrics metrics;
    private static final ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>(){

        @Override
        protected SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        }
    };
    @VisibleForTesting
    final ConcurrentMap<String, FragmentCompletion> completingFragmentMap = new ConcurrentHashMap<String, FragmentCompletion>();

    public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption, ClassLoader classLoader, LlapDaemonExecutorMetrics metrics) {
        super(TaskExecutorService.class.getSimpleName());
        LOG.info("TaskExecutorService is being setup with parameters: numExecutors=" + numExecutors + ", waitQueueSize=" + waitQueueSize + ", waitQueueComparatorClassName=" + waitQueueComparatorClassName + ", enablePreemption=" + enablePreemption);
        Comparator<TaskWrapper> waitQueueComparator = this.createComparator(waitQueueComparatorClassName);
        this.maxParallelExecutors = numExecutors;
        this.waitQueue = new EvictingPriorityBlockingQueue<TaskWrapper>(waitQueueComparator, waitQueueSize);
        this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, numExecutors, 1L, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new ExecutorThreadFactory(classLoader));
        this.executorService = MoreExecutors.listeningDecorator((ExecutorService)this.threadPoolExecutor);
        this.preemptionQueue = new PriorityBlockingQueue<TaskWrapper>(numExecutors, new PreemptionQueueComparator());
        this.enablePreemption = enablePreemption;
        this.numSlotsAvailable = new AtomicInteger(numExecutors);
        this.metrics = metrics;
        if (metrics != null) {
            metrics.setNumExecutorsAvailable(this.numSlotsAvailable.get());
        }
        ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT).build());
        this.waitQueueExecutorService = MoreExecutors.listeningDecorator((ExecutorService)wes);
        ExecutorService executionCompletionExecutorServiceRaw = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ExecutionCompletionThread #%d").build());
        this.executionCompletionExecutorService = MoreExecutors.listeningDecorator((ExecutorService)executionCompletionExecutorServiceRaw);
        ListenableFuture future = this.waitQueueExecutorService.submit((Runnable)new WaitQueueWorker());
        Futures.addCallback((ListenableFuture)future, (FutureCallback)new WaitQueueWorkerCallback());
    }

    private Comparator<TaskWrapper> createComparator(String waitQueueComparatorClassName) {
        Comparator waitQueueComparator;
        try {
            Class<?> waitQueueComparatorClazz = Class.forName(waitQueueComparatorClassName);
            Constructor<?> ctor = waitQueueComparatorClazz.getConstructor(null);
            waitQueueComparator = (Comparator)ctor.newInstance(null);
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Failed to load wait queue comparator, class=" + waitQueueComparatorClassName, e);
        }
        catch (NoSuchMethodException e) {
            throw new RuntimeException("Failed to find constructor for wait queue comparator, class=" + waitQueueComparatorClassName, e);
        }
        catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
            throw new RuntimeException("Failed to find instantiate wait queue comparator, class=" + waitQueueComparatorClassName, e);
        }
        return waitQueueComparator;
    }

    public void serviceStop() {
        this.shutDown(false);
    }

    @Override
    public int getNumActive() {
        int result = 0;
        for (Map.Entry e : this.knownTasks.entrySet()) {
            TaskRunnerCallable c;
            TaskWrapper task = (TaskWrapper)e.getValue();
            if (task.isInWaitQueue() || (c = task.getTaskRunnerCallable()) == null || c.getStartTime() == 0L) continue;
            ++result;
        }
        return result;
    }

    @Override
    public Set<String> getExecutorsStatus() {
        HashSet<String> result = new HashSet<String>();
        StringBuilder value = new StringBuilder();
        for (Map.Entry e : this.knownTasks.entrySet()) {
            value.setLength(0);
            value.append((String)e.getKey());
            TaskWrapper task = (TaskWrapper)e.getValue();
            boolean isFirst = true;
            TaskRunnerCallable c = task.getTaskRunnerCallable();
            if (c != null && c.getVertexSpec() != null) {
                LlapDaemonProtocolProtos.SignableVertexSpec fs = c.getVertexSpec();
                value.append(isFirst ? " (" : ", ").append(fs.getDagName()).append("/").append(fs.getVertexName());
                isFirst = false;
            }
            value.append(isFirst ? " (" : ", ");
            if (task.isInWaitQueue()) {
                value.append("in queue");
            } else if (c != null) {
                long startTime = c.getStartTime();
                if (startTime != 0L) {
                    value.append("started at ").append(sdf.get().format(new Date(startTime)));
                } else {
                    value.append("not started");
                }
            } else {
                value.append("has no callable");
            }
            if (task.isInPreemptionQueue()) {
                value.append(", ").append("preemptable");
            }
            value.append(")");
            result.add(value.toString());
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Scheduler.SubmissionState schedule(TaskRunnerCallable task) {
        Scheduler.SubmissionState result;
        TaskWrapper evictedTask;
        TaskWrapper taskWrapper = new TaskWrapper(task, this);
        Object object = this.lock;
        synchronized (object) {
            boolean stateChanged;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Offering to wait queue with: waitQueueSize={}, numSlotsAvailable={}, runningFragmentCount={} ", new Object[]{this.waitQueue.size(), this.numSlotsAvailable.get(), this.runningFragmentCount.get()});
            }
            boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
            evictedTask = this.waitQueue.offer(taskWrapper, this.maxParallelExecutors - this.runningFragmentCount.get());
            if (evictedTask == null || !evictedTask.equals(taskWrapper)) {
                this.knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
                taskWrapper.setIsInWaitQueue(true);
                if (isDebugEnabled) {
                    LOG.debug("{} added to wait queue. Current wait queue size={}", (Object)task.getRequestId(), (Object)this.waitQueue.size());
                }
                Scheduler.SubmissionState submissionState = result = evictedTask == null ? Scheduler.SubmissionState.ACCEPTED : Scheduler.SubmissionState.EVICTED_OTHER;
                if (isDebugEnabled && evictedTask != null) {
                    LOG.debug("Eviction: {} {} {}", new Object[]{taskWrapper, result, evictedTask});
                }
            } else {
                if (isInfoEnabled) {
                    LOG.info("wait queue full, size={}. numSlotsAvailable={}, runningFragmentCount={}. {} not added", new Object[]{this.waitQueue.size(), this.numSlotsAvailable.get(), this.runningFragmentCount.get(), task.getRequestId()});
                }
                evictedTask.getTaskRunnerCallable().killTask();
                Scheduler.SubmissionState result2 = Scheduler.SubmissionState.REJECTED;
                if (isDebugEnabled) {
                    LOG.debug("{} is {} as wait queue is full", (Object)taskWrapper.getRequestId(), (Object)result2);
                }
                if (this.metrics != null) {
                    this.metrics.incrTotalRejectedRequests();
                }
                return result2;
            }
            boolean bl = stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
            if (stateChanged) {
                if (isDebugEnabled) {
                    LOG.debug("Finishable state of {} updated to {} during registration for state updates", (Object)taskWrapper.getRequestId(), (Object)(!canFinish ? 1 : 0));
                }
                this.finishableStateUpdated(taskWrapper, !canFinish);
            }
        }
        if (isDebugEnabled) {
            LOG.debug("Wait Queue: {}", this.waitQueue);
        }
        if (evictedTask != null) {
            if (isInfoEnabled) {
                LOG.info("{} evicted from wait queue in favor of {} because of lower priority", (Object)evictedTask.getRequestId(), (Object)task.getRequestId());
            }
            try {
                this.knownTasks.remove(evictedTask.getRequestId());
                evictedTask.maybeUnregisterForFinishedStateNotifications();
                evictedTask.setIsInWaitQueue(false);
            }
            finally {
                evictedTask.getTaskRunnerCallable().killTask();
            }
            if (this.metrics != null) {
                this.metrics.incrTotalEvictedFromWaitQueue();
            }
        }
        object = this.lock;
        synchronized (object) {
            this.lock.notify();
        }
        if (this.metrics != null) {
            this.metrics.setExecutorNumQueuedRequests(this.waitQueue.size());
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public QueryIdentifier findQueryByFragment(String fragmentId) {
        Object object = this.lock;
        synchronized (object) {
            TaskWrapper taskWrapper = (TaskWrapper)this.knownTasks.get(fragmentId);
            return taskWrapper == null ? null : taskWrapper.getTaskRunnerCallable().getFragmentInfo().getQueryInfo().getQueryIdentifier();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void killFragment(String fragmentId) {
        Object object = this.lock;
        synchronized (object) {
            TaskWrapper taskWrapper = (TaskWrapper)this.knownTasks.remove(fragmentId);
            if (taskWrapper != null) {
                if (taskWrapper.isInWaitQueue()) {
                    if (isDebugEnabled) {
                        LOG.debug("Removing {} from waitQueue", (Object)fragmentId);
                    }
                    taskWrapper.setIsInWaitQueue(false);
                    if (this.waitQueue.remove(taskWrapper) && this.metrics != null) {
                        this.metrics.setExecutorNumQueuedRequests(this.waitQueue.size());
                    }
                }
                if (taskWrapper.isInPreemptionQueue()) {
                    if (isDebugEnabled) {
                        LOG.debug("Removing {} from preemptionQueue", (Object)fragmentId);
                    }
                    this.removeFromPreemptionQueue(taskWrapper);
                }
                taskWrapper.getTaskRunnerCallable().killTask();
            } else {
                LOG.info("Ignoring killFragment request for {} since it isn't known", (Object)fragmentId);
            }
            this.lock.notify();
        }
    }

    @Override
    public void fragmentCompleting(String fragmentId, SchedulerFragmentCompletingListener.State state) {
        int count = this.runningFragmentCount.decrementAndGet();
        if (count < 0) {
            LOG.warn("RunningFragmentCount went negative. Multiple calls for the same completion. Resetting to 0");
            this.runningFragmentCount.set(0);
        }
        this.completingFragmentMap.put(fragmentId, new FragmentCompletion(state, this.clock.getTime()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void trySchedule(TaskWrapper taskWrapper) throws RejectedExecutionException {
        Object object = this.lock;
        synchronized (object) {
            boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
            LOG.info("Attempting to execute {}", (Object)taskWrapper);
            ListenableFuture future = this.executorService.submit((Callable)((Object)taskWrapper.getTaskRunnerCallable()));
            this.runningFragmentCount.incrementAndGet();
            taskWrapper.setIsInWaitQueue(false);
            InternalCompletionListener wrappedCallback = this.createInternalCompletionListener(taskWrapper);
            Futures.addCallback((ListenableFuture)future, (FutureCallback)wrappedCallback, (Executor)this.executionCompletionExecutorService);
            if (isDebugEnabled) {
                LOG.debug("{} scheduled for execution. canFinish={}", (Object)taskWrapper.getRequestId(), (Object)canFinish);
            }
            if (this.enablePreemption && !canFinish) {
                if (isInfoEnabled) {
                    LOG.info("{} is not finishable. Adding it to pre-emption queue", (Object)taskWrapper.getRequestId());
                }
                this.addToPreemptionQueue(taskWrapper);
            }
        }
        this.numSlotsAvailable.decrementAndGet();
        if (this.metrics != null) {
            this.metrics.setNumExecutorsAvailable(this.numSlotsAvailable.get());
        }
    }

    private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) {
        if (this.enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !this.preemptionQueue.isEmpty()) {
            TaskWrapper pRequest;
            if (isDebugEnabled) {
                LOG.debug("Preemption Queue: " + this.preemptionQueue);
            }
            if ((pRequest = this.removeAndGetNextFromPreemptionQueue()) != null) {
                if (pRequest.getTaskRunnerCallable().canFinish()) {
                    LOG.info("Removed {} from preemption queue, but not preempting since it's now finishable", (Object)pRequest.getRequestId());
                } else {
                    if (isInfoEnabled) {
                        LOG.info("Invoking kill task for {} due to pre-emption to run {}", (Object)pRequest.getRequestId(), (Object)taskWrapper.getRequestId());
                    }
                    pRequest.getTaskRunnerCallable().killTask();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
        Object object = this.lock;
        synchronized (object) {
            if (taskWrapper.isInWaitQueue()) {
                LOG.debug("Re-ordering the wait queue since {} finishable state moved to {}", (Object)taskWrapper.getRequestId(), (Object)newFinishableState);
                boolean reInserted = this.waitQueue.reinsertIfExists(taskWrapper);
                if (!reInserted) {
                    LOG.warn("Failed to remove {} from waitQueue", (Object)taskWrapper.getTaskRunnerCallable().getRequestId());
                }
            }
            if (newFinishableState && taskWrapper.isInPreemptionQueue()) {
                LOG.debug("Removing {} from preemption queue because it's state changed to {}", (Object)taskWrapper.getRequestId(), (Object)newFinishableState);
                this.removeFromPreemptionQueue(taskWrapper);
            } else if (!(newFinishableState || taskWrapper.isInPreemptionQueue() || taskWrapper.isInWaitQueue())) {
                LOG.debug("Adding {} to preemption queue since finishable state changed to {}", (Object)taskWrapper.getRequestId(), (Object)newFinishableState);
                this.addToPreemptionQueue(taskWrapper);
            }
            this.lock.notify();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addToPreemptionQueue(TaskWrapper taskWrapper) {
        Object object = this.lock;
        synchronized (object) {
            boolean added = this.preemptionQueue.offer(taskWrapper);
            if (!added) {
                LOG.warn("Failed to add element {} to preemption queue. Terminating", (Object)taskWrapper);
                Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), new IllegalStateException("Preemption queue full. Cannot proceed"));
            }
            taskWrapper.setIsInPreemptableQueue(true);
            if (this.metrics != null) {
                this.metrics.setExecutorNumPreemptableRequests(this.preemptionQueue.size());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean removeFromPreemptionQueue(TaskWrapper taskWrapper) {
        Object object = this.lock;
        synchronized (object) {
            return this.removeFromPreemptionQueueUnlocked(taskWrapper);
        }
    }

    private boolean removeFromPreemptionQueueUnlocked(TaskWrapper taskWrapper) {
        boolean removed = this.preemptionQueue.remove(taskWrapper);
        taskWrapper.setIsInPreemptableQueue(false);
        if (this.metrics != null) {
            this.metrics.setExecutorNumPreemptableRequests(this.preemptionQueue.size());
        }
        return removed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TaskWrapper removeAndGetNextFromPreemptionQueue() {
        TaskWrapper taskWrapper;
        Object object = this.lock;
        synchronized (object) {
            taskWrapper = (TaskWrapper)this.preemptionQueue.poll();
            if (taskWrapper != null) {
                taskWrapper.setIsInPreemptableQueue(false);
                if (this.metrics != null) {
                    this.metrics.setExecutorNumPreemptableRequests(this.preemptionQueue.size());
                }
            }
        }
        return taskWrapper;
    }

    @VisibleForTesting
    InternalCompletionListener createInternalCompletionListener(TaskWrapper taskWrapper) {
        return new InternalCompletionListener(taskWrapper);
    }

    public void shutDown(boolean awaitTermination) {
        if (!this.isShutdown.getAndSet(true)) {
            if (awaitTermination) {
                if (isDebugEnabled) {
                    LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor service gracefully");
                }
                this.shutdownExecutor((ExecutorService)this.waitQueueExecutorService);
                this.shutdownExecutor((ExecutorService)this.executorService);
                this.shutdownExecutor((ExecutorService)this.executionCompletionExecutorService);
            } else {
                if (isDebugEnabled) {
                    LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor service immediately");
                }
                this.executorService.shutdownNow();
                this.waitQueueExecutorService.shutdownNow();
                this.executionCompletionExecutorService.shutdownNow();
            }
        }
    }

    private void shutdownExecutor(ExecutorService executorService) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(1L, TimeUnit.MINUTES)) {
                executorService.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            executorService.shutdownNow();
        }
    }

    private static class ExecutorThreadFactory
    implements ThreadFactory {
        private final ClassLoader classLoader;
        private final ThreadFactory defaultFactory;
        private final AtomicLong count = new AtomicLong(0L);

        public ExecutorThreadFactory(ClassLoader classLoader) {
            this.classLoader = classLoader;
            this.defaultFactory = Executors.defaultThreadFactory();
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = this.defaultFactory.newThread(r);
            thread.setName(String.format(TaskExecutorService.TASK_EXECUTOR_THREAD_NAME_FORMAT, this.count.getAndIncrement()));
            thread.setDaemon(true);
            thread.setContextClassLoader(this.classLoader);
            return thread;
        }
    }

    public static class TaskWrapper
    implements FinishableStateUpdateHandler {
        private final TaskRunnerCallable taskRunnerCallable;
        private final AtomicBoolean inWaitQueue = new AtomicBoolean(false);
        private final AtomicBoolean inPreemptionQueue = new AtomicBoolean(false);
        private final AtomicBoolean registeredForNotifications = new AtomicBoolean(false);
        private final TaskExecutorService taskExecutorService;

        public TaskWrapper(TaskRunnerCallable taskRunnerCallable, TaskExecutorService taskExecutorService) {
            this.taskRunnerCallable = taskRunnerCallable;
            this.taskExecutorService = taskExecutorService;
        }

        public boolean maybeRegisterForFinishedStateNotifications(boolean currentFinishableState) {
            if (!this.registeredForNotifications.getAndSet(true)) {
                return this.taskRunnerCallable.getFragmentInfo().registerForFinishableStateUpdates(this, currentFinishableState);
            }
            return true;
        }

        public void maybeUnregisterForFinishedStateNotifications() {
            if (this.registeredForNotifications.getAndSet(false)) {
                this.taskRunnerCallable.getFragmentInfo().unregisterForFinishableStateUpdates(this);
            }
        }

        public TaskRunnerCallable getTaskRunnerCallable() {
            return this.taskRunnerCallable;
        }

        public boolean isInWaitQueue() {
            return this.inWaitQueue.get();
        }

        public boolean isInPreemptionQueue() {
            return this.inPreemptionQueue.get();
        }

        public void setIsInWaitQueue(boolean value) {
            this.inWaitQueue.set(value);
        }

        public void setIsInPreemptableQueue(boolean value) {
            this.inPreemptionQueue.set(value);
        }

        public String getRequestId() {
            return this.taskRunnerCallable.getRequestId();
        }

        public String toString() {
            return "TaskWrapper{task=" + this.taskRunnerCallable.getRequestId() + ", inWaitQueue=" + this.inWaitQueue.get() + ", inPreemptionQueue=" + this.inPreemptionQueue.get() + ", registeredForNotifications=" + this.registeredForNotifications.get() + ", canFinish=" + this.taskRunnerCallable.canFinish() + ", firstAttemptStartTime=" + this.taskRunnerCallable.getFragmentRuntimeInfo().getFirstAttemptStartTime() + ", dagStartTime=" + this.taskRunnerCallable.getFragmentRuntimeInfo().getDagStartTime() + ", withinDagPriority=" + this.taskRunnerCallable.getFragmentRuntimeInfo().getWithinDagPriority() + ", vertexParallelism= " + this.taskRunnerCallable.getVertexSpec().getVertexParallelism() + ", selfAndUpstreamParallelism= " + this.taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() + ", selfAndUpstreamComplete= " + this.taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() + '}';
        }

        @Override
        public void finishableStateUpdated(boolean finishableState) {
            LOG.info("Received finishable state update for {}, state={}", (Object)this.taskRunnerCallable.getRequestId(), (Object)finishableState);
            this.taskExecutorService.finishableStateUpdated(this, finishableState);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TaskWrapper that = (TaskWrapper)o;
            return this.taskRunnerCallable.getRequestId().equals(that.taskRunnerCallable.getRequestId());
        }

        public int hashCode() {
            return this.taskRunnerCallable.getRequestId().hashCode();
        }
    }

    @VisibleForTesting
    public static class PreemptionQueueComparator
    implements Comparator<TaskWrapper> {
        @Override
        public int compare(TaskWrapper t1, TaskWrapper t2) {
            TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
            TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
            LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo();
            LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo();
            if (fri1.getNumSelfAndUpstreamTasks() > fri2.getNumSelfAndUpstreamTasks()) {
                return 1;
            }
            if (fri1.getNumSelfAndUpstreamTasks() < fri2.getNumSelfAndUpstreamTasks()) {
                return -1;
            }
            return 0;
        }
    }

    @VisibleForTesting
    class InternalCompletionListener
    implements FutureCallback<TaskRunner2Result> {
        private final TaskWrapper taskWrapper;

        public InternalCompletionListener(TaskWrapper taskWrapper) {
            this.taskWrapper = taskWrapper;
        }

        public void onSuccess(TaskRunner2Result result) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received successful completion for: {}", (Object)this.taskWrapper.getRequestId());
            }
            this.updateFallOffStats(this.taskWrapper.getRequestId());
            TaskExecutorService.this.knownTasks.remove(this.taskWrapper.getRequestId());
            this.taskWrapper.setIsInPreemptableQueue(false);
            this.taskWrapper.maybeUnregisterForFinishedStateNotifications();
            this.taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
            this.updatePreemptionListAndNotify(result.getEndReason());
        }

        public void onFailure(Throwable t) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received failed completion for: {}", (Object)this.taskWrapper.getRequestId());
            }
            this.updateFallOffStats(this.taskWrapper.getRequestId());
            TaskExecutorService.this.knownTasks.remove(this.taskWrapper.getRequestId());
            this.taskWrapper.setIsInPreemptableQueue(false);
            this.taskWrapper.maybeUnregisterForFinishedStateNotifications();
            this.taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
            this.updatePreemptionListAndNotify(null);
            LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace((Throwable)t));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void updatePreemptionListAndNotify(EndReason reason) {
            if (TaskExecutorService.this.enablePreemption) {
                String state = reason == null ? "FAILED" : reason.name();
                boolean removed = TaskExecutorService.this.removeFromPreemptionQueueUnlocked(this.taskWrapper);
                if (removed && isInfoEnabled) {
                    TaskRunnerCallable trc = this.taskWrapper.getTaskRunnerCallable();
                    LOG.info(TaskRunnerCallable.getTaskIdentifierString(trc.getRequest(), trc.getVertexSpec()) + " request " + state + "! Removed from preemption list.");
                }
            }
            TaskExecutorService.this.numSlotsAvailable.incrementAndGet();
            if (TaskExecutorService.this.metrics != null) {
                TaskExecutorService.this.metrics.setNumExecutorsAvailable(TaskExecutorService.this.numSlotsAvailable.get());
            }
            if (isDebugEnabled) {
                LOG.debug("Task {} complete. WaitQueueSize={}, numSlotsAvailable={}, preemptionQueueSize={}", new Object[]{this.taskWrapper.getRequestId(), TaskExecutorService.this.waitQueue.size(), TaskExecutorService.this.numSlotsAvailable.get(), TaskExecutorService.this.preemptionQueue.size()});
            }
            Object object = TaskExecutorService.this.lock;
            synchronized (object) {
                if (!TaskExecutorService.this.waitQueue.isEmpty()) {
                    TaskExecutorService.this.lock.notify();
                }
            }
        }

        private void updateFallOffStats(String requestId) {
            long now = TaskExecutorService.this.clock.getTime();
            FragmentCompletion fragmentCompletion = (FragmentCompletion)TaskExecutorService.this.completingFragmentMap.remove(requestId);
            if (fragmentCompletion == null) {
                LOG.warn("Received onSuccess/onFailure for a fragment for which a completing message was not received: {}", (Object)requestId);
                TaskExecutorService.this.runningFragmentCount.decrementAndGet();
            } else {
                long timeTaken = now - fragmentCompletion.completingTime;
                switch (fragmentCompletion.state) {
                    case SUCCESS: {
                        if (TaskExecutorService.this.metrics == null) break;
                        TaskExecutorService.this.metrics.addMetricsFallOffSuccessTimeLost(timeTaken);
                        break;
                    }
                    case FAILED: {
                        if (TaskExecutorService.this.metrics == null) break;
                        TaskExecutorService.this.metrics.addMetricsFallOffFailedTimeLost(timeTaken);
                        break;
                    }
                    case KILLED: {
                        if (TaskExecutorService.this.metrics == null) break;
                        TaskExecutorService.this.metrics.addMetricsFallOffKilledTimeLost(timeTaken);
                    }
                }
            }
        }
    }

    private static final class FragmentCompletion {
        SchedulerFragmentCompletingListener.State state;
        long completingTime;

        public FragmentCompletion(SchedulerFragmentCompletingListener.State state, long completingTime) {
            this.state = state;
            this.completingTime = completingTime;
        }
    }

    private class WaitQueueWorkerCallback
    implements FutureCallback {
        private WaitQueueWorkerCallback() {
        }

        public void onSuccess(Object result) {
            if (TaskExecutorService.this.isShutdown.get()) {
                LOG.info("Wait queue scheduler worker exited with success!");
            } else {
                LOG.error("Wait queue scheduler worker exited with success!");
                Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), new IllegalStateException("WaitQueue worked exited before shutdown"));
            }
        }

        public void onFailure(Throwable t) {
            LOG.error("Wait queue scheduler worker exited with failure!", t);
            Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
        }
    }

    private final class WaitQueueWorker
    implements Runnable {
        TaskWrapper task;

        private WaitQueueWorker() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         * Converted monitor instructions to comments
         * Lifted jumps to return sites
         */
        @Override
        public void run() {
            try {
                while (!TaskExecutorService.this.isShutdown.get()) {
                    RejectedExecutionException rejectedException = null;
                    Object object = TaskExecutorService.this.lock;
                    // MONITORENTER : object
                    this.task = TaskExecutorService.this.waitQueue.peek();
                    if (this.task == null) {
                        if (!TaskExecutorService.this.isShutdown.get()) {
                            TaskExecutorService.this.lock.wait();
                        }
                        // MONITOREXIT : object
                        continue;
                    }
                    boolean shouldWait = false;
                    if (this.task.getTaskRunnerCallable().canFinish()) {
                        if (isDebugEnabled) {
                            LOG.debug("Attempting to schedule task {}, canFinish={}. Current state: preemptionQueueSize={}, numSlotsAvailable={}, waitQueueSize={}", new Object[]{this.task.getRequestId(), this.task.getTaskRunnerCallable().canFinish(), TaskExecutorService.this.preemptionQueue.size(), TaskExecutorService.this.numSlotsAvailable.get(), TaskExecutorService.this.waitQueue.size()});
                        }
                        if (TaskExecutorService.this.numSlotsAvailable.get() == 0 && (!TaskExecutorService.this.enablePreemption || TaskExecutorService.this.preemptionQueue.isEmpty())) {
                            shouldWait = true;
                        }
                    } else if (TaskExecutorService.this.numSlotsAvailable.get() == 0) {
                        shouldWait = true;
                    }
                    if (shouldWait) {
                        if (!TaskExecutorService.this.isShutdown.get()) {
                            TaskExecutorService.this.lock.wait();
                        }
                        // MONITOREXIT : object
                        continue;
                    }
                    try {
                        TaskExecutorService.this.trySchedule(this.task);
                        if (TaskExecutorService.this.waitQueue.remove(this.task) && TaskExecutorService.this.metrics != null) {
                            TaskExecutorService.this.metrics.setExecutorNumQueuedRequests(TaskExecutorService.this.waitQueue.size());
                        }
                    }
                    catch (RejectedExecutionException e) {
                        rejectedException = e;
                    }
                    if (rejectedException != null) {
                        TaskExecutorService.this.handleScheduleAttemptedRejection(this.task);
                    }
                    object = TaskExecutorService.this.lock;
                    // MONITORENTER : object
                    while (TaskExecutorService.this.waitQueue.isEmpty()) {
                        if (TaskExecutorService.this.isShutdown.get()) continue;
                        TaskExecutorService.this.lock.wait();
                    }
                    // MONITOREXIT : object
                }
                return;
            }
            catch (InterruptedException e) {
                if (TaskExecutorService.this.isShutdown.get()) {
                    LOG.info("Wait-Queue-Scheduler-%d thread has been interrupted after shutdown.");
                    return;
                }
                LOG.warn("Wait-Queue-Scheduler-%d interrupted without shutdown", (Throwable)e);
                throw new RuntimeException(e);
            }
        }
    }
}

