/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.TaskStateUpdateListener;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class StateChangeNotifier {
    private static final Logger LOG = LoggerFactory.getLogger(StateChangeNotifier.class);
    private final DAG dag;
    private final SetMultimap<TezVertexID, ListenerContainer> vertexListeners;
    private final ListMultimap<TezVertexID, VertexStateUpdate> lastKnowStatesMap;
    private final ReentrantReadWriteLock listenersLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = this.listenersLock.writeLock();
    BlockingQueue<NotificationEvent> eventQueue = new LinkedBlockingQueue<NotificationEvent>();
    private Thread eventHandlingThread;
    private volatile boolean stopEventHandling = false;
    private final SetMultimap<TezVertexID, TaskStateUpdateListener> taskListeners = Multimaps.synchronizedSetMultimap((SetMultimap)HashMultimap.create());
    private final ReentrantReadWriteLock taskListenerLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock taskReadLock = this.taskListenerLock.readLock();
    private final ReentrantReadWriteLock.WriteLock taskWriteLock = this.taskListenerLock.writeLock();

    public StateChangeNotifier(DAG dag) {
        this.dag = dag;
        this.vertexListeners = Multimaps.synchronizedSetMultimap((SetMultimap)HashMultimap.create());
        this.lastKnowStatesMap = LinkedListMultimap.create();
        this.startThread();
    }

    private void startThread() {
        this.eventHandlingThread = new Thread("State Change Notifier DAG: " + this.dag.getID()){

            @Override
            public void run() {
                while (!StateChangeNotifier.this.stopEventHandling && !Thread.currentThread().isInterrupted()) {
                    NotificationEvent event;
                    try {
                        event = StateChangeNotifier.this.eventQueue.take();
                    }
                    catch (InterruptedException e) {
                        if (StateChangeNotifier.this.stopEventHandling) continue;
                        LOG.warn("Continuing after interrupt : ", (Throwable)e);
                        continue;
                    }
                    try {
                        event.sendUpdate();
                        StateChangeNotifier.this.processedEventFromQueue();
                    }
                    catch (Exception e) {
                        LOG.error("Error in state update notification for " + event, (Throwable)e);
                        StateChangeNotifier.this.dag.getEventHandler().handle((Event)new DAGEventInternalError(StateChangeNotifier.this.dag.getID(), "Internal Error in State Update Notification: " + ExceptionUtils.getStackTrace((Throwable)e)));
                        return;
                    }
                }
            }
        };
        this.eventHandlingThread.setDaemon(true);
        this.eventHandlingThread.start();
    }

    @VisibleForTesting
    protected void processedEventFromQueue() {
    }

    @VisibleForTesting
    protected void addedEventToQueue() {
    }

    public void stop() {
        this.stopEventHandling = true;
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerForVertexUpdates(String vertexName, Set<VertexState> stateSet, VertexStateUpdateListener listener) {
        block6: {
            TezVertexID vertexId = this.validateAndGetVertexId(vertexName);
            this.writeLock.lock();
            try {
                List previousUpdates = this.lastKnowStatesMap.get((Object)vertexId);
                ListenerContainer listenerContainer = new ListenerContainer(listener, stateSet);
                Set listenerContainers = this.vertexListeners.get((Object)vertexId);
                if (listenerContainers == null || !listenerContainers.contains(listenerContainer)) {
                    this.vertexListeners.put((Object)vertexId, (Object)listenerContainer);
                    if (previousUpdates != null && !previousUpdates.isEmpty()) {
                        for (VertexStateUpdate update : previousUpdates) {
                            listenerContainer.sendStateUpdate(update);
                        }
                    }
                    break block6;
                }
                throw new TezUncheckedException("Only allowed to register once for a listener. CurrentContext: vertexName=" + vertexName + ", Listener: " + listener);
            }
            finally {
                this.writeLock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterForVertexUpdates(String vertexName, VertexStateUpdateListener listener) {
        TezVertexID vertexId = this.validateAndGetVertexId(vertexName);
        this.writeLock.lock();
        try {
            ListenerContainer listenerContainer = new ListenerContainer(listener, null);
            this.vertexListeners.remove((Object)vertexId, (Object)listenerContainer);
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stateChanged(TezVertexID vertexId, VertexStateUpdate vertexStateUpdate) {
        this.writeLock.lock();
        try {
            this.lastKnowStatesMap.put((Object)vertexId, (Object)vertexStateUpdate);
            if (this.vertexListeners.containsKey((Object)vertexId)) {
                this.sendStateUpdate(vertexId, vertexStateUpdate);
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    private void sendStateUpdate(TezVertexID vertexId, VertexStateUpdate event) {
        for (ListenerContainer listenerContainer : this.vertexListeners.get((Object)vertexId)) {
            listenerContainer.sendStateUpdate(event);
        }
    }

    private void enqueueNotification(NotificationEvent event) {
        try {
            this.eventQueue.put(event);
            this.addedEventToQueue();
        }
        catch (InterruptedException e) {
            LOG.error("Failed to put event", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerForTaskSuccessUpdates(String vertexName, TaskStateUpdateListener listener) {
        TezVertexID vertexId = this.validateAndGetVertexId(vertexName);
        Preconditions.checkNotNull((Object)listener, (Object)"listener cannot be null");
        this.taskWriteLock.lock();
        try {
            this.taskListeners.put((Object)vertexId, (Object)listener);
        }
        finally {
            this.taskWriteLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterForTaskSuccessUpdates(String vertexName, TaskStateUpdateListener listener) {
        TezVertexID vertexId = this.validateAndGetVertexId(vertexName);
        Preconditions.checkNotNull((Object)listener, (Object)"listener cannot be null");
        this.taskWriteLock.lock();
        try {
            this.taskListeners.remove((Object)vertexId, (Object)listener);
        }
        finally {
            this.taskWriteLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void taskSucceeded(String vertexName, TezTaskID taskId, int attemptId) {
        this.taskReadLock.lock();
        try {
            for (TaskStateUpdateListener listener : this.taskListeners.get((Object)taskId.getVertexID())) {
                listener.onTaskSucceeded(vertexName, taskId, attemptId);
            }
        }
        finally {
            this.taskReadLock.unlock();
        }
    }

    private TezVertexID validateAndGetVertexId(String vertexName) {
        Preconditions.checkNotNull((Object)vertexName, (Object)"VertexName cannot be null");
        Vertex vertex = this.dag.getVertex(vertexName);
        Preconditions.checkNotNull((Object)vertex, (Object)("Vertex does not exist: " + vertexName));
        return vertex.getVertexId();
    }

    private final class ListenerContainer {
        final VertexStateUpdateListener listener;
        final Set<VertexState> states;

        private ListenerContainer(VertexStateUpdateListener listener, Set<VertexState> states) {
            this.listener = listener;
            this.states = states == null ? EnumSet.allOf(VertexState.class) : states;
        }

        private void sendStateUpdate(VertexStateUpdate stateUpdate) {
            if (this.states.contains(stateUpdate.getVertexState())) {
                StateChangeNotifier.this.enqueueNotification(new NotificationEvent(stateUpdate, this.listener));
            }
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ListenerContainer that = (ListenerContainer)o;
            return this.listener == that.listener;
        }

        public int hashCode() {
            return System.identityHashCode(this.listener);
        }
    }

    private static class NotificationEvent {
        final VertexStateUpdate update;
        final VertexStateUpdateListener listener;

        public NotificationEvent(VertexStateUpdate update, VertexStateUpdateListener listener) {
            this.update = update;
            this.listener = listener;
        }

        void sendUpdate() {
            this.listener.onStateUpdated(this.update);
        }

        public String toString() {
            return "[ VertexState:(" + this.update + ") Listener:" + this.listener + " ]";
        }
    }
}

