/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.engine;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.bufferserver.storage.DiskStorage;
import com.datatorrent.bufferserver.storage.Storage;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.util.ScheduledExecutorService;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.ComponentContextPair;
import com.datatorrent.stram.RecoverableRpcProxy;
import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.StringCodecs;
import com.datatorrent.stram.api.BaseContext;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.ContainerEvent;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.RequestFactory;
import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.debug.StdOutErrLog;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.PortContext;
import com.datatorrent.stram.engine.Stream;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.engine.WindowIdActivatedReservoir;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.stream.BufferServerPublisher;
import com.datatorrent.stram.stream.BufferServerSubscriber;
import com.datatorrent.stram.stream.FastPublisher;
import com.datatorrent.stram.stream.FastSubscriber;
import com.datatorrent.stram.stream.InlineStream;
import com.datatorrent.stram.stream.MuxStream;
import com.datatorrent.stram.stream.OiOStream;
import com.datatorrent.stram.stream.PartitionAwareSink;
import com.datatorrent.stram.stream.PartitionAwareSinkForPersistence;
import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.DTLoggerFactory;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingContainer
extends StramUtils.YarnContainerMain {
    public static final String PROP_APP_PATH = "dt." + Context.DAGContext.APPLICATION_PATH.getName();
    private final transient String jvmName;
    private final String containerId;
    private final transient StreamingContainerUmbilicalProtocol umbilical;
    protected final Map<Integer, Node<?>> nodes = new ConcurrentHashMap();
    protected final Set<Integer> failedNodes = Collections.newSetFromMap(new ConcurrentHashMap());
    private final Map<String, ComponentContextPair<Stream, StreamContext>> streams = new ConcurrentHashMap<String, ComponentContextPair<Stream, StreamContext>>();
    protected final Map<Integer, WindowGenerator> generators = new ConcurrentHashMap<Integer, WindowGenerator>();
    private transient List<GarbageCollectorMXBean> garbageCollectorMXBeans;
    protected final Map<Integer, ArrayList<Integer>> oioGroups = new ConcurrentHashMap<Integer, ArrayList<Integer>>();
    private final Map<Stream, StreamContext> activeStreams = new ConcurrentHashMap<Stream, StreamContext>();
    private final Map<WindowGenerator, Object> activeGenerators = new ConcurrentHashMap<WindowGenerator, Object>();
    private int heartbeatIntervalMillis = 1000;
    private volatile boolean exitHeartbeatLoop = false;
    private final Object heartbeatTrigger = new Object();
    public static DefaultEventLoop eventloop;
    private long firstWindowMillis;
    private int windowWidthMillis;
    private InetSocketAddress bufferServerAddress;
    private Server bufferServer;
    private int checkpointWindowCount;
    private boolean fastPublisherSubscriber;
    private StreamingContainerUmbilicalProtocol.StreamingContainerContext containerContext;
    private List<StreamingContainerUmbilicalProtocol.StramToNodeRequest> nodeRequests;
    private final HashMap<String, Object> singletons;
    private final MBassador<ContainerEvent> eventBus;
    HashSet<Component<ContainerContext>> components;
    private RequestFactory requestFactory;
    private long lastCommittedWindowId = -1L;
    private final StreamCodec<Object> nonSerializingStreamCodec = new StreamCodec<Object>(){

        public Object fromByteArray(Slice fragment) {
            return null;
        }

        public Slice toByteArray(Object o) {
            return null;
        }

        public int getPartition(Object o) {
            return o.hashCode();
        }
    };
    private static final Logger logger;

    protected StreamingContainer(String containerId, StreamingContainerUmbilicalProtocol umbilical) {
        this.jvmName = ManagementFactory.getRuntimeMXBean().getName();
        this.components = new HashSet();
        this.eventBus = new MBassador(BusConfiguration.Default((int)1, (int)1, (int)1));
        this.singletons = new HashMap();
        this.nodeRequests = new ArrayList<StreamingContainerUmbilicalProtocol.StramToNodeRequest>();
        logger.debug("instantiated StramChild {}", (Object)containerId);
        this.umbilical = umbilical;
        this.containerId = containerId;
    }

    public void setup(StreamingContainerUmbilicalProtocol.StreamingContainerContext ctx) {
        this.containerContext = ctx;
        this.requestFactory = new RequestFactory();
        ctx.attributes.put(ContainerContext.REQUEST_FACTORY, (Object)this.requestFactory);
        this.heartbeatIntervalMillis = (Integer)ctx.getValue(Context.DAGContext.HEARTBEAT_INTERVAL_MILLIS);
        this.firstWindowMillis = ctx.startWindowMillis;
        this.windowWidthMillis = (Integer)ctx.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
        this.checkpointWindowCount = (Integer)ctx.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
        this.fastPublisherSubscriber = ctx.getValue(LogicalPlan.FAST_PUBLISHER_SUBSCRIBER);
        Map codecs = (Map)ctx.getValue(Context.DAGContext.STRING_CODECS);
        StringCodecs.loadConverters(codecs);
        try {
            if (ctx.deployBufferServer) {
                int blocksize;
                int blockCount;
                eventloop.start();
                int bufferServerRAM = ctx.getValue(ContainerContext.BUFFER_SERVER_MB);
                logger.debug("buffer server memory {}", (Object)bufferServerRAM);
                if (bufferServerRAM < (Integer)ContainerContext.BUFFER_SERVER_MB.defaultValue) {
                    blockCount = 8;
                    blocksize = bufferServerRAM / blockCount;
                    if (blocksize < 1) {
                        blocksize = 1;
                    }
                } else {
                    blocksize = 64;
                    blockCount = bufferServerRAM / blocksize;
                }
                this.bufferServer = new Server(0, blocksize * 1024 * 1024, blockCount);
                this.bufferServer.setAuthToken((byte[])ctx.getValue(StreamingContainerUmbilicalProtocol.StreamingContainerContext.BUFFER_SERVER_TOKEN));
                if (((Boolean)ctx.getValue(Context.DAGContext.BUFFER_SPOOLING)).booleanValue()) {
                    this.bufferServer.setSpoolStorage((Storage)new DiskStorage());
                }
                InetSocketAddress bindAddr = this.bufferServer.run((EventLoop)eventloop);
                logger.debug("Buffer server started: {}", (Object)bindAddr);
                this.bufferServerAddress = NetUtils.getConnectAddress((InetSocketAddress)bindAddr);
            }
        }
        catch (IOException ex) {
            logger.warn("deploy request failed due to {}", (Throwable)ex);
            throw new IllegalStateException("Failed to deploy buffer server", ex);
        }
        for (Class<?> clazz : ContainerEvent.CONTAINER_EVENTS_LISTENERS) {
            try {
                Object newInstance = clazz.newInstance();
                this.singletons.put(clazz.getName(), newInstance);
                if (newInstance instanceof Component) {
                    this.components.add((Component<ContainerContext>)((Component)newInstance));
                }
                this.eventBus.subscribe(newInstance);
            }
            catch (InstantiationException ex) {
                logger.warn("Container Event Listener Instantiation", (Throwable)ex);
            }
            catch (IllegalAccessException ex) {
                logger.warn("Container Event Listener Instantiation", (Throwable)ex);
            }
        }
        this.operateListeners(ctx, true);
    }

    public String getContainerId() {
        return this.containerId;
    }

    public Object getInstance(String classname) {
        return this.singletons.get(classname);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Throwable {
        StdOutErrLog.tieSystemOutAndErrToLog();
        logger.debug("PID: " + System.getenv().get("JVM_PID"));
        logger.info("Child starting with classpath: {}", (Object)System.getProperty("java.class.path"));
        String appPath = System.getProperty(PROP_APP_PATH);
        if (appPath == null) {
            logger.error("{} not set in container environment.", (Object)PROP_APP_PATH);
            System.exit(1);
        }
        int exitStatus = 1;
        RecoverableRpcProxy rpcProxy = new RecoverableRpcProxy(appPath, new Configuration());
        StreamingContainerUmbilicalProtocol umbilical = rpcProxy.getProxy();
        String childId = System.getProperty("dt.cid");
        try {
            StreamingContainerUmbilicalProtocol.StreamingContainerContext ctx = umbilical.getInitContext(childId);
            StreamingContainer stramChild = new StreamingContainer(childId, umbilical);
            logger.debug("Container Context = {}", (Object)ctx);
            stramChild.setup(ctx);
            try {
                stramChild.heartbeatLoop();
                exitStatus = 0;
            }
            finally {
                stramChild.teardown();
            }
        }
        catch (Error error) {
            logger.error("Fatal error in container!", (Throwable)error);
            String msg = ExceptionUtils.getStackTrace((Throwable)error);
            umbilical.reportError(childId, null, "FATAL: " + msg);
        }
        catch (Exception exception) {
            logger.error("Fatal exception in container!", (Throwable)exception);
            String msg = ExceptionUtils.getStackTrace((Throwable)exception);
            umbilical.reportError(childId, null, msg);
        }
        finally {
            rpcProxy.close();
            DefaultMetricsSystem.shutdown();
            logger.info("Exit status for container: {}", (Object)exitStatus);
            LogManager.shutdown();
        }
        if (exitStatus != 0) {
            System.exit(exitStatus);
        }
    }

    public synchronized void deactivate() {
        ArrayList<Thread> activeThreads = new ArrayList<Thread>();
        ArrayList<Integer> activeOperators = new ArrayList<Integer>();
        for (Map.Entry<Integer, Node<?>> e : this.nodes.entrySet()) {
            Thread t = e.getValue().context.getThread();
            if (t == null || !t.isAlive()) {
                this.disconnectNode(e.getKey());
                continue;
            }
            activeThreads.add(t);
            activeOperators.add(e.getKey());
            e.getValue().shutdown();
        }
        try {
            Iterator iterator = activeOperators.iterator();
            for (Thread t : activeThreads) {
                t.join(1000L);
                if (!t.getState().equals((Object)Thread.State.TERMINATED)) {
                    t.interrupt();
                }
                this.disconnectNode((Integer)iterator.next());
            }
        }
        catch (InterruptedException ex) {
            logger.warn("Aborting wait for operators to get deactivated!", (Throwable)ex);
        }
        for (WindowGenerator wg : this.activeGenerators.keySet()) {
            wg.deactivate();
        }
        this.activeGenerators.clear();
        for (Stream stream : this.activeStreams.keySet()) {
            stream.deactivate();
        }
        this.activeStreams.clear();
    }

    private void disconnectNode(int nodeid) {
        Node<?> node = this.nodes.get(nodeid);
        this.disconnectWindowGenerator(nodeid, node);
        Operators.PortMappingDescriptor portMappingDescriptor = node.getPortMappingDescriptor();
        Iterator<String> outputPorts = portMappingDescriptor.outputPorts.keySet().iterator();
        while (outputPorts.hasNext()) {
            String sourceIdentifier = String.valueOf(nodeid).concat(".").concat(outputPorts.next());
            ComponentContextPair<Stream, StreamContext> pair = this.streams.remove(sourceIdentifier);
            if (pair == null) continue;
            if (this.activeStreams.remove(pair.component) != null) {
                ((Stream)pair.component).deactivate();
                this.eventBus.publish((Object)new ContainerEvent.StreamDeactivationEvent(pair));
            }
            if (pair.component instanceof Stream.MultiSinkCapableStream) {
                String sinks = ((StreamContext)((Object)pair.context)).getSinkId();
                if (sinks == null) {
                    logger.error("mux sinks found connected at {} with sink id null", (Object)sourceIdentifier);
                } else {
                    String[] split = sinks.split(", ");
                    int i = split.length;
                    while (i-- > 0) {
                        ComponentContextPair<Stream, StreamContext> spair = this.streams.remove(split[i]);
                        if (spair == null) {
                            logger.error("mux is missing the stream for sink {}", (Object)split[i]);
                            continue;
                        }
                        if (this.activeStreams.remove(spair.component) != null) {
                            ((Stream)spair.component).deactivate();
                            this.eventBus.publish((Object)new ContainerEvent.StreamDeactivationEvent(spair));
                        }
                        ((Stream)spair.component).teardown();
                    }
                }
            }
            ((Stream)pair.component).teardown();
        }
        Iterator<String> inputPorts = portMappingDescriptor.inputPorts.keySet().iterator();
        while (inputPorts.hasNext()) {
            String sinkIdentifier = String.valueOf(nodeid).concat(".").concat(inputPorts.next());
            ComponentContextPair<Stream, StreamContext> pair = this.streams.remove(sinkIdentifier);
            if (pair == null) continue;
            if (this.activeStreams.remove(pair.component) != null) {
                ((Stream)pair.component).deactivate();
                this.eventBus.publish((Object)new ContainerEvent.StreamDeactivationEvent(pair));
            }
            ((Stream)pair.component).teardown();
            ComponentContextPair<Stream, StreamContext> sourcePair = this.streams.get(((StreamContext)((Object)pair.context)).getSourceId());
            if (sourcePair == null) continue;
            if (sourcePair == pair) {
                this.streams.remove(((StreamContext)((Object)pair.context)).getSourceId());
                continue;
            }
            this.unregisterSinkFromMux(sourcePair, sinkIdentifier);
        }
    }

    private boolean unregisterSinkFromMux(ComponentContextPair<Stream, StreamContext> muxpair, String sinkIdentifier) {
        String[] sinks = ((StreamContext)((Object)muxpair.context)).getSinkId().split(", ");
        boolean found = false;
        int i = sinks.length;
        while (i-- > 0) {
            if (!sinks[i].equals(sinkIdentifier)) continue;
            sinks[i] = null;
            found = true;
            break;
        }
        if (found) {
            ((Stream.MultiSinkCapableStream)muxpair.component).setSink(sinkIdentifier, null);
            if (sinks.length == 1) {
                ((StreamContext)((Object)muxpair.context)).setSinkId(null);
                this.streams.remove(((StreamContext)((Object)muxpair.context)).getSourceId());
                if (this.activeStreams.remove(muxpair.component) != null) {
                    ((Stream)muxpair.component).deactivate();
                    this.eventBus.publish((Object)new ContainerEvent.StreamDeactivationEvent(muxpair));
                }
                ((Stream)muxpair.component).teardown();
            } else {
                StringBuilder builder = new StringBuilder(((StreamContext)((Object)muxpair.context)).getSinkId().length() - ", ".length() - sinkIdentifier.length());
                found = false;
                int i2 = sinks.length;
                while (i2-- > 0) {
                    if (sinks[i2] == null) continue;
                    if (found) {
                        builder.append(", ").append(sinks[i2]);
                        continue;
                    }
                    builder.append(sinks[i2]);
                    found = true;
                }
                ((StreamContext)((Object)muxpair.context)).setSinkId(builder.toString());
            }
        } else {
            logger.error("{} was not connected to stream connected to {}", (Object)sinkIdentifier, (Object)((StreamContext)((Object)muxpair.context)).getSourceId());
        }
        return found;
    }

    private void disconnectWindowGenerator(int nodeid, Node<?> node) {
        WindowGenerator chosen1 = this.generators.remove(nodeid);
        if (chosen1 != null) {
            chosen1.releaseReservoir(Integer.toString(nodeid).concat(".").concat("input"));
            int count = 0;
            for (WindowGenerator wg : this.generators.values()) {
                if (chosen1 != wg) continue;
                ++count;
            }
            if (count == 0) {
                this.activeGenerators.remove(chosen1);
                chosen1.deactivate();
                chosen1.teardown();
            }
        }
    }

    private synchronized void undeploy(List<Integer> nodeList) {
        HashMap toUndeploy = new HashMap();
        for (Integer operatorId : nodeList) {
            Node<?> node = this.nodes.get(operatorId);
            if (node == null) {
                throw new IllegalArgumentException("Node " + operatorId + " is not hosted in this container!");
            }
            if (toUndeploy.containsKey(operatorId)) {
                throw new IllegalArgumentException("Node " + operatorId + " is requested to be undeployed more than once");
            }
            toUndeploy.put(operatorId, node);
        }
        ArrayList<Thread> joinList = new ArrayList<Thread>();
        ArrayList<Integer> discoList = new ArrayList<Integer>();
        for (Integer operatorId : nodeList) {
            Thread t = this.nodes.get((Object)operatorId).context.getThread();
            if (t == null || !t.isAlive()) {
                this.disconnectNode(operatorId);
                continue;
            }
            joinList.add(t);
            discoList.add(operatorId);
            this.nodes.get(operatorId).shutdown();
        }
        try {
            Iterator iterator = discoList.iterator();
            for (Thread t : joinList) {
                t.join(1000L);
                if (!t.getState().equals((Object)Thread.State.TERMINATED)) {
                    t.interrupt();
                }
                this.disconnectNode((Integer)iterator.next());
            }
            logger.info("Undeploy complete.");
        }
        catch (InterruptedException ex) {
            logger.warn("Aborting wait for operators to get deactivated!", (Throwable)ex);
        }
        for (Integer operatorId : nodeList) {
            this.nodes.remove(operatorId);
        }
    }

    public void teardown() {
        this.operateListeners(this.containerContext, false);
        this.deactivate();
        assert (this.streams.isEmpty());
        this.eventBus.shutdown();
        this.nodes.clear();
        HashSet<WindowGenerator> gens = new HashSet<WindowGenerator>();
        gens.addAll(this.generators.values());
        this.generators.clear();
        for (WindowGenerator wg : gens) {
            wg.teardown();
        }
        if (this.bufferServer != null) {
            eventloop.stop((Listener.ServerListener)this.bufferServer);
            eventloop.stop();
        }
        gens.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void triggerHeartbeat() {
        Object object = this.heartbeatTrigger;
        synchronized (object) {
            this.heartbeatTrigger.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void heartbeatLoop() throws Exception {
        this.umbilical.log(this.containerId, "[" + this.containerId + "] Entering heartbeat loop..");
        logger.debug("Entering heartbeat loop (interval is {} ms)", (Object)this.heartbeatIntervalMillis);
        YarnConfiguration conf = new YarnConfiguration();
        long tokenLifeTime = (long)(this.containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * (double)this.containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME).longValue());
        long expiryTime = System.currentTimeMillis();
        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        for (Token token : credentials.getAllTokens()) {
            logger.debug("token: {}", (Object)token);
        }
        String hdfsKeyTabFile = this.containerContext.getValue(LogicalPlan.KEY_TAB_FILE);
        block11: while (!this.exitHeartbeatLoop) {
            StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse rsp;
            if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) {
                expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), this.containerId, (Configuration)conf, hdfsKeyTabFile, credentials, null, false);
            }
            Object object = this.heartbeatTrigger;
            synchronized (object) {
                try {
                    this.heartbeatTrigger.wait(this.heartbeatIntervalMillis);
                }
                catch (InterruptedException e1) {
                    logger.warn("Interrupted in heartbeat loop, exiting..");
                    break;
                }
            }
            long currentTime = System.currentTimeMillis();
            StreamingContainerUmbilicalProtocol.ContainerHeartbeat msg = new StreamingContainerUmbilicalProtocol.ContainerHeartbeat();
            msg.jvmName = this.jvmName;
            if (this.bufferServerAddress != null) {
                msg.bufferServerHost = this.bufferServerAddress.getHostName();
                msg.bufferServerPort = this.bufferServerAddress.getPort();
                if (this.bufferServer != null && !eventloop.isActive()) {
                    logger.warn("Requesting restart due to terminated event loop");
                    msg.restartRequested = true;
                }
            }
            msg.memoryMBFree = (int)(Runtime.getRuntime().freeMemory() / 0x100000L);
            this.garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
            for (GarbageCollectorMXBean bean : this.garbageCollectorMXBeans) {
                msg.gcCollectionTime += bean.getCollectionTime();
                msg.gcCollectionCount += bean.getCollectionCount();
            }
            do {
                StreamingContainerUmbilicalProtocol.ContainerStats stats = new StreamingContainerUmbilicalProtocol.ContainerStats(this.containerId);
                for (Map.Entry<Integer, Node<?>> e : this.nodes.entrySet()) {
                    StreamingContainerUmbilicalProtocol.OperatorHeartbeat hb = new StreamingContainerUmbilicalProtocol.OperatorHeartbeat();
                    hb.setNodeId(e.getKey());
                    hb.setGeneratedTms(currentTime);
                    hb.setIntervalMs(this.heartbeatIntervalMillis);
                    if (e.getValue().commandResponse.size() > 0) {
                        BlockingQueue<StatsListener.OperatorResponse> commandResponse = e.getValue().commandResponse;
                        ArrayList response = new ArrayList();
                        for (int i = 0; i < commandResponse.size(); ++i) {
                            response.add(commandResponse.poll());
                        }
                        hb.requestResponse = response;
                    }
                    OperatorContext context = e.getValue().context;
                    context.drainStats(hb.getOperatorStatsContainer());
                    if (context.getThread() == null || context.getThread().getState() != Thread.State.TERMINATED) {
                        hb.setState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE);
                    } else if (this.failedNodes.contains(hb.nodeId)) {
                        hb.setState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.FAILED);
                    } else {
                        logger.debug("Reporting SHUTDOWN state because thread is {} and failedNodes is {}", (Object)context.getThread(), this.failedNodes);
                        hb.setState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.SHUTDOWN);
                    }
                    stats.addNodeStats(hb);
                }
                this.eventBus.publish((Object)new ContainerEvent.ContainerStatsEvent(stats));
                msg.setContainerStats(stats);
                msg.sentTms = System.currentTimeMillis();
                rsp = this.umbilical.processHeartbeat(msg);
                this.processHeartbeatResponse(rsp);
                if (!rsp.hasPendingRequests) continue;
                logger.info("Waiting for pending request.");
                Object object2 = this.heartbeatTrigger;
                synchronized (object2) {
                    try {
                        this.heartbeatTrigger.wait(500L);
                    }
                    catch (InterruptedException ie) {
                        logger.warn("Interrupted in heartbeat loop", (Throwable)ie);
                        continue block11;
                    }
                }
            } while (rsp.hasPendingRequests);
        }
        logger.debug("Exiting hearbeat loop");
        this.umbilical.log(this.containerId, "[" + this.containerId + "] Exiting heartbeat loop..");
    }

    private void processNodeRequests(boolean flagInvalid) {
        for (StreamingContainerUmbilicalProtocol.StramToNodeRequest req : this.nodeRequests) {
            if (req.isDeleted()) continue;
            if (req instanceof StramToNodeChangeLoggersRequest) {
                this.handleChangeLoggersRequest((StramToNodeChangeLoggersRequest)req);
                continue;
            }
            Node<?> node = this.nodes.get(req.getOperatorId());
            if (node == null) {
                logger.warn("Node for operator {} is not found, probably not deployed yet", (Object)req.getOperatorId());
                continue;
            }
            Thread thread = node.context.getThread();
            if (thread == null || !thread.isAlive()) {
                if (!flagInvalid) continue;
                logger.warn("Received request with invalid operator id {} ({})", (Object)req.getOperatorId(), (Object)req);
                req.setDeleted(true);
                continue;
            }
            logger.debug("request received: {}", (Object)req);
            StatsListener.OperatorRequest requestExecutor = this.requestFactory.getRequestExecutor(this.nodes.get(req.operatorId), req);
            if (requestExecutor != null) {
                node.context.request(requestExecutor);
            } else {
                logger.warn("No executor identified for the request {}", (Object)req);
            }
            req.setDeleted(true);
        }
    }

    public void processHeartbeatResponse(StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse rsp) {
        if (rsp.nodeRequests != null) {
            this.nodeRequests = rsp.nodeRequests;
        }
        if (rsp.committedWindowId != this.lastCommittedWindowId) {
            this.lastCommittedWindowId = rsp.committedWindowId;
            StatsListener.OperatorRequest nr = null;
            for (Map.Entry<Integer, Node<?>> e : this.nodes.entrySet()) {
                Thread thread = e.getValue().context.getThread();
                if (thread == null || !thread.isAlive() || !(e.getValue().getOperator() instanceof Operator.CheckpointListener)) continue;
                if (nr == null) {
                    nr = new StatsListener.OperatorRequest(){

                        public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException {
                            ((Operator.CheckpointListener)operator).committed(StreamingContainer.this.lastCommittedWindowId);
                            return null;
                        }
                    };
                }
                e.getValue().context.request(nr);
            }
        }
        if (rsp.undeployRequest != null) {
            logger.info("Undeploy request: {}", rsp.undeployRequest);
            this.processNodeRequests(false);
            this.undeploy(rsp.undeployRequest);
        }
        if (rsp.shutdown) {
            logger.info("Received shutdown request");
            this.processNodeRequests(false);
            this.exitHeartbeatLoop = true;
            return;
        }
        if (rsp.deployRequest != null) {
            logger.info("Deploy request: {}", rsp.deployRequest);
            try {
                this.deploy(rsp.deployRequest);
            }
            catch (Exception e) {
                logger.error("deploy request failed", (Throwable)e);
                try {
                    this.umbilical.log(this.containerId, "deploy request failed: " + rsp.deployRequest + " " + ExceptionUtils.getStackTrace((Throwable)e));
                }
                catch (IOException iOException) {
                    // empty catch block
                }
                this.exitHeartbeatLoop = true;
                throw new IllegalStateException("Deploy request failed: " + rsp.deployRequest, e);
            }
        }
        this.processNodeRequests(true);
    }

    private int getOutputQueueCapacity(List<OperatorDeployInfo> operatorList, int sourceOperatorId, String sourcePortName) {
        for (OperatorDeployInfo odi : operatorList) {
            if (odi.id != sourceOperatorId) continue;
            for (OperatorDeployInfo.OutputDeployInfo odiodi : odi.outputs) {
                if (!odiodi.portName.equals(sourcePortName)) continue;
                return (Integer)this.getValue(PortContext.QUEUE_CAPACITY, odiodi, odi);
            }
        }
        return (Integer)PortContext.QUEUE_CAPACITY.defaultValue;
    }

    private synchronized void deploy(List<OperatorDeployInfo> nodeList) throws Exception {
        for (OperatorDeployInfo ndi : nodeList) {
            if (!this.nodes.containsKey(ndi.id)) continue;
            throw new IllegalStateException("Node with id: " + ndi.id + " already present in container " + this.containerId + "!");
        }
        this.deployNodes(nodeList);
        HashMap<String, ArrayList<String>> groupedInputStreams = new HashMap<String, ArrayList<String>>();
        for (OperatorDeployInfo ndi : nodeList) {
            this.groupInputStreams(groupedInputStreams, ndi);
        }
        HashMap<String, ComponentContextPair<Stream, StreamContext>> newStreams = this.deployOutputStreams(nodeList, groupedInputStreams);
        this.deployInputStreams(nodeList, newStreams);
        for (ComponentContextPair<Stream, StreamContext> pair : newStreams.values()) {
            ((Stream)pair.component).setup((Context)pair.context);
        }
        this.streams.putAll(newStreams);
        HashMap<Integer, OperatorDeployInfo> operatorMap = new HashMap<Integer, OperatorDeployInfo>(nodeList.size());
        for (OperatorDeployInfo o : nodeList) {
            operatorMap.put(o.id, o);
        }
        this.activate(operatorMap, newStreams);
    }

    public static String getUnifierInputPortName(String portName, int sourceNodeId, String sourcePortName) {
        return portName + "(" + sourceNodeId + "." + sourcePortName + ")";
    }

    private void massageUnifierDeployInfo(OperatorDeployInfo odi) {
        for (OperatorDeployInfo.InputDeployInfo idi : odi.inputs) {
            idi.portName = StreamingContainer.getUnifierInputPortName(idi.portName, idi.sourceNodeId, idi.sourcePortName);
        }
    }

    private void deployNodes(List<OperatorDeployInfo> nodeList) throws IOException {
        for (OperatorDeployInfo ndi : nodeList) {
            BaseContext parentContext;
            StorageAgent backupAgent = (StorageAgent)this.getValue(OperatorContext.STORAGE_AGENT, ndi);
            assert (backupAgent != null);
            if (ndi instanceof OperatorDeployInfo.UnifierDeployInfo) {
                OperatorContext unifiedOperatorContext = new OperatorContext(0, ((OperatorDeployInfo.UnifierDeployInfo)ndi).operatorAttributes, this.containerContext);
                parentContext = new PortContext(ndi.inputs.get((int)0).contextAttributes, unifiedOperatorContext);
                this.massageUnifierDeployInfo(ndi);
            } else {
                parentContext = this.containerContext;
            }
            OperatorContext ctx = new OperatorContext(ndi.id, ndi.contextAttributes, parentContext);
            ctx.attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, (Object)ndi.checkpoint.windowId);
            logger.debug("Restoring operator {} to checkpoint {} stateless={}.", new Object[]{ndi.id, Codec.getStringWindowId((long)ndi.checkpoint.windowId), ctx.stateless});
            Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? -1L : ndi.checkpoint.windowId), ctx, ndi.type);
            node.currentWindowId = ndi.checkpoint.windowId;
            node.applicationWindowCount = ndi.checkpoint.applicationWindowCount;
            node.firstWindowMillis = this.firstWindowMillis;
            node.windowWidthMillis = this.windowWidthMillis;
            node.setId(ndi.id);
            this.nodes.put(ndi.id, node);
            logger.debug("Marking operator {} as deployed.", node);
        }
    }

    private AbstractMap.SimpleEntry<String, ComponentContextPair<Stream, StreamContext>> deployBufferServerPublisher(String connIdentifier, StreamCodec<?> streamCodec, long finishedWindowId, int queueCapacity, OperatorDeployInfo.OutputDeployInfo nodi) throws UnknownHostException {
        String sinkIdentifier = "tcp://".concat(nodi.bufferServerHost).concat(":").concat(String.valueOf(nodi.bufferServerPort)).concat("/").concat(connIdentifier);
        StreamContext bssc = new StreamContext(nodi.declaredStreamId);
        bssc.setPortId(nodi.portName);
        bssc.setSourceId(connIdentifier);
        bssc.setSinkId(sinkIdentifier);
        bssc.setFinishedWindowId(finishedWindowId);
        bssc.put(StreamContext.CODEC, streamCodec);
        bssc.put(StreamContext.EVENT_LOOP, eventloop);
        bssc.setBufferServerAddress(InetSocketAddress.createUnresolved(nodi.bufferServerHost, nodi.bufferServerPort));
        bssc.put(StreamContext.BUFFER_SERVER_TOKEN, nodi.bufferServerToken);
        InetAddress inetAddress = bssc.getBufferServerAddress().getAddress();
        if (inetAddress != null && NetUtils.isLocalAddress((InetAddress)inetAddress)) {
            bssc.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), nodi.bufferServerPort));
        }
        Stream publisher = this.fastPublisherSubscriber ? new FastPublisher(connIdentifier, queueCapacity * 256) : new BufferServerPublisher(connIdentifier, queueCapacity);
        return new AbstractMap.SimpleEntry<String, ComponentContextPair<Stream, StreamContext>>(sinkIdentifier, new ComponentContextPair<FastPublisher, StreamContext>((FastPublisher)publisher, bssc));
    }

    private HashMap<String, ComponentContextPair<Stream, StreamContext>> deployOutputStreams(List<OperatorDeployInfo> nodeList, HashMap<String, ArrayList<String>> groupedInputStreams) throws Exception {
        HashMap<String, ComponentContextPair<Stream, StreamContext>> newStreams = new HashMap<String, ComponentContextPair<Stream, StreamContext>>();
        for (OperatorDeployInfo ndi : nodeList) {
            Node<?> node = this.nodes.get(ndi.id);
            long checkpointWindowId = ndi.checkpoint.windowId;
            for (OperatorDeployInfo.OutputDeployInfo nodi : ndi.outputs) {
                String sourceIdentifier = Integer.toString(ndi.id).concat(".").concat(nodi.portName);
                int queueCapacity = (Integer)this.getValue(PortContext.QUEUE_CAPACITY, nodi, ndi);
                logger.debug("for stream {} the queue capacity is {}", (Object)sourceIdentifier, (Object)queueCapacity);
                ArrayList<String> collection = groupedInputStreams.get(sourceIdentifier);
                Map<Integer, StreamCodec<?>> streamCodecs = nodi.streamCodecs;
                if (collection == null && streamCodecs.size() == 1) {
                    assert (nodi.bufferServerHost != null) : "resulting stream cannot be inline: " + nodi;
                    Map.Entry<Integer, StreamCodec<?>> entry = streamCodecs.entrySet().iterator().next();
                    StreamCodec<?> streamCodec = entry.getValue();
                    Integer streamCodecIdentifier = entry.getKey();
                    String connIdentifier = sourceIdentifier + "." + streamCodecIdentifier;
                    AbstractMap.SimpleEntry<String, ComponentContextPair<Stream, StreamContext>> deployBufferServerPublisher = this.deployBufferServerPublisher(connIdentifier, streamCodec, checkpointWindowId, queueCapacity, nodi);
                    newStreams.put(sourceIdentifier, deployBufferServerPublisher.getValue());
                    node.connectOutputPort(nodi.portName, (Sink<Object>)((Sink)deployBufferServerPublisher.getValue().component));
                    continue;
                }
                ComponentContextPair<Stream, StreamContext> pair = newStreams.get(sourceIdentifier);
                if (pair == null) {
                    StreamContext context = new StreamContext(nodi.declaredStreamId);
                    context.setSourceId(sourceIdentifier);
                    context.setFinishedWindowId(checkpointWindowId);
                    MuxStream stream = new MuxStream();
                    pair = new ComponentContextPair<MuxStream, StreamContext>(stream, context);
                    newStreams.put(sourceIdentifier, pair);
                    node.connectOutputPort(nodi.portName, stream);
                }
                if (nodi.bufferServerHost == null) continue;
                for (Map.Entry<Integer, StreamCodec<?>> entry : streamCodecs.entrySet()) {
                    Integer streamCodecIdentifier = entry.getKey();
                    StreamCodec<?> streamCodec = entry.getValue();
                    String connIdentifier = sourceIdentifier + "." + streamCodecIdentifier;
                    AbstractMap.SimpleEntry<String, ComponentContextPair<Stream, StreamContext>> deployBufferServerPublisher = this.deployBufferServerPublisher(connIdentifier, streamCodec, checkpointWindowId, queueCapacity, nodi);
                    newStreams.put(deployBufferServerPublisher.getKey(), deployBufferServerPublisher.getValue());
                    String sinkIdentifier = ((StreamContext)((Object)pair.context)).getSinkId();
                    if (sinkIdentifier == null) {
                        ((StreamContext)((Object)pair.context)).setSinkId(deployBufferServerPublisher.getKey());
                    } else {
                        ((StreamContext)((Object)pair.context)).setSinkId(sinkIdentifier.concat(", ").concat(deployBufferServerPublisher.getKey()));
                    }
                    ((Stream.MultiSinkCapableStream)pair.component).setSink(deployBufferServerPublisher.getKey(), (Sink<Object>)((Sink)deployBufferServerPublisher.getValue().component));
                }
            }
        }
        return newStreams;
    }

    public final String getDeclaredStreamId(int operatorId, String portname) {
        String identifier = String.valueOf(operatorId).concat(".").concat(portname);
        ComponentContextPair<Stream, StreamContext> spair = this.streams.get(identifier);
        if (spair == null) {
            return null;
        }
        return ((StreamContext)((Object)spair.context)).getId();
    }

    private void deployInputStreams(List<OperatorDeployInfo> operatorList, HashMap<String, ComponentContextPair<Stream, StreamContext>> newStreams) throws UnknownHostException {
        ArrayList<OperatorDeployInfo> inputNodes = new ArrayList<OperatorDeployInfo>();
        long smallestCheckpointedWindowId = Long.MAX_VALUE;
        ConcurrentHashMap<Integer, Integer> oioNodes = new ConcurrentHashMap<Integer, Integer>();
        for (OperatorDeployInfo ndi : operatorList) {
            if (ndi.inputs == null || ndi.inputs.isEmpty()) {
                inputNodes.add(ndi);
                ndi.checkpoint = this.getFinishedCheckpoint(ndi);
                if (ndi.checkpoint.windowId >= smallestCheckpointedWindowId) continue;
                smallestCheckpointedWindowId = ndi.checkpoint.windowId;
                continue;
            }
            Node<?> node = this.nodes.get(ndi.id);
            for (OperatorDeployInfo.InputDeployInfo nidi : ndi.inputs) {
                SweepableReservoir stream;
                if (nidi.streamCodecs.size() != 1) {
                    throw new IllegalStateException("Only one input codec configuration should be present");
                }
                Map.Entry<Integer, StreamCodec<?>> entry = nidi.streamCodecs.entrySet().iterator().next();
                Integer streamCodecIdentifier = entry.getKey();
                StreamCodec<?> streamCodec = entry.getValue();
                String sourceIdentifier = Integer.toString(nidi.sourceNodeId).concat(".").concat(nidi.sourcePortName);
                String sinkIdentifier = Integer.toString(ndi.id).concat(".").concat(nidi.portName);
                int queueCapacity = (Integer)this.getValue(PortContext.QUEUE_CAPACITY, nidi, ndi);
                Checkpoint checkpoint = this.getFinishedCheckpoint(ndi);
                ComponentContextPair<Stream, StreamContext> pair = this.streams.get(sourceIdentifier);
                if (pair == null) {
                    pair = newStreams.get(sourceIdentifier);
                }
                if (pair == null) {
                    BufferServerSubscriber subscriber;
                    assert (nidi.locality != DAG.Locality.CONTAINER_LOCAL && nidi.locality != DAG.Locality.THREAD_LOCAL);
                    StreamContext context = new StreamContext(nidi.declaredStreamId);
                    context.setBufferServerAddress(InetSocketAddress.createUnresolved(nidi.bufferServerHost, nidi.bufferServerPort));
                    InetAddress inetAddress = context.getBufferServerAddress().getAddress();
                    if (inetAddress != null && NetUtils.isLocalAddress((InetAddress)inetAddress)) {
                        context.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), nidi.bufferServerPort));
                    }
                    context.put(StreamContext.BUFFER_SERVER_TOKEN, nidi.bufferServerToken);
                    String connIdentifier = sourceIdentifier + "." + streamCodecIdentifier;
                    context.setPortId(nidi.portName);
                    context.put(StreamContext.CODEC, streamCodec);
                    context.put(StreamContext.EVENT_LOOP, eventloop);
                    context.setPartitions(nidi.partitionMask, nidi.partitionKeys);
                    context.setSourceId(connIdentifier);
                    context.setSinkId(sinkIdentifier);
                    context.setFinishedWindowId(checkpoint.windowId);
                    BufferServerSubscriber bufferServerSubscriber = subscriber = this.fastPublisherSubscriber ? new FastSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity) : new BufferServerSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity);
                    if (streamCodec instanceof StreamCodecWrapperForPersistance) {
                        subscriber.acquireReservoirForPersistStream(sinkIdentifier, queueCapacity, streamCodec);
                    }
                    SweepableReservoir reservoir = subscriber.acquireReservoir(sinkIdentifier, queueCapacity);
                    if (checkpoint.windowId >= 0L) {
                        node.connectInputPort(nidi.portName, new WindowIdActivatedReservoir(sinkIdentifier, reservoir, checkpoint.windowId));
                    }
                    node.connectInputPort(nidi.portName, reservoir);
                    newStreams.put(sinkIdentifier, new ComponentContextPair<BufferServerSubscriber, StreamContext>(subscriber, context));
                    logger.debug("put input stream {} against key {}", (Object)subscriber, (Object)sinkIdentifier);
                    continue;
                }
                assert (nidi.locality == DAG.Locality.CONTAINER_LOCAL || nidi.locality == DAG.Locality.THREAD_LOCAL);
                StreamContext inlineContext = new StreamContext(nidi.declaredStreamId);
                inlineContext.setSourceId(sourceIdentifier);
                inlineContext.setSinkId(sinkIdentifier);
                switch (nidi.locality) {
                    case CONTAINER_LOCAL: {
                        int outputQueueCapacity = this.getOutputQueueCapacity(operatorList, nidi.sourceNodeId, nidi.sourcePortName);
                        if (outputQueueCapacity > queueCapacity) {
                            queueCapacity = outputQueueCapacity;
                        }
                        stream = new InlineStream(queueCapacity);
                        if (checkpoint.windowId < 0L) break;
                        node.connectInputPort(nidi.portName, new WindowIdActivatedReservoir(sinkIdentifier, stream, checkpoint.windowId));
                        break;
                    }
                    case THREAD_LOCAL: {
                        stream = new OiOStream();
                        oioNodes.put(ndi.id, nidi.sourceNodeId);
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Locality can be either ContainerLocal or ThreadLocal");
                    }
                }
                node.connectInputPort(nidi.portName, stream);
                newStreams.put(sinkIdentifier, new ComponentContextPair<OiOStream, StreamContext>((OiOStream)stream, inlineContext));
                if (!(pair.component instanceof Stream.MultiSinkCapableStream)) {
                    String originalSinkId = ((StreamContext)((Object)pair.context)).getSinkId();
                    StreamContext muxContext = new StreamContext(nidi.declaredStreamId);
                    muxContext.setSourceId(sourceIdentifier);
                    muxContext.setFinishedWindowId(checkpoint.windowId);
                    muxContext.setSinkId(originalSinkId);
                    MuxStream muxStream = new MuxStream();
                    muxStream.setSink(originalSinkId, (Sink<Object>)((Sink)pair.component));
                    this.streams.put(originalSinkId, pair);
                    Node<?> sourceNode = this.nodes.get(nidi.sourceNodeId);
                    sourceNode.connectOutputPort(nidi.sourcePortName, muxStream);
                    pair = new ComponentContextPair<MuxStream, StreamContext>(muxStream, muxContext);
                    newStreams.put(sourceIdentifier, pair);
                }
                if (streamCodec instanceof StreamCodecWrapperForPersistance) {
                    PartitionAwareSinkForPersistence pas = nidi.partitionKeys == null ? new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance)streamCodec, nidi.partitionMask, (Sink<Object>)stream) : new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance)streamCodec, nidi.partitionKeys, nidi.partitionMask, (Sink<Object>)stream);
                    ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, pas);
                } else if (nidi.partitionKeys == null || nidi.partitionKeys.isEmpty()) {
                    ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, (Sink<Object>)stream);
                } else {
                    PartitionAwareSink<Object> pas = new PartitionAwareSink<Object>((StreamCodec<Object>)(streamCodec == null ? this.nonSerializingStreamCodec : streamCodec), nidi.partitionKeys, nidi.partitionMask, (Sink<Object>)stream);
                    ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, pas);
                }
                String streamSinkId = ((StreamContext)((Object)pair.context)).getSinkId();
                if (streamSinkId == null) {
                    ((StreamContext)((Object)pair.context)).setSinkId(sinkIdentifier);
                    continue;
                }
                ((StreamContext)((Object)pair.context)).setSinkId(streamSinkId.concat(", ").concat(sinkIdentifier));
            }
        }
        this.setupOiOGroups(oioNodes);
        if (!inputNodes.isEmpty()) {
            WindowGenerator windowGenerator = this.setupWindowGenerator(smallestCheckpointedWindowId);
            for (OperatorDeployInfo ndi : inputNodes) {
                this.generators.put(ndi.id, windowGenerator);
                Node<?> node = this.nodes.get(ndi.id);
                SweepableReservoir reservoir = windowGenerator.acquireReservoir(String.valueOf(ndi.id), 1024);
                if (ndi.checkpoint.windowId >= 0L) {
                    node.connectInputPort("input", new WindowIdActivatedReservoir(Integer.toString(ndi.id), reservoir, ndi.checkpoint.windowId));
                }
                node.connectInputPort("input", reservoir);
            }
        }
    }

    private void setupOiOGroups(Map<Integer, Integer> oioNodes) {
        for (Integer child : oioNodes.keySet()) {
            Integer temp;
            Integer oioParent = oioNodes.get(child);
            while ((temp = oioNodes.get(oioParent)) != null) {
                oioParent = temp;
            }
            ArrayList<Integer> children = this.oioGroups.get(oioParent);
            if (children == null) {
                children = new ArrayList();
                this.oioGroups.put(oioParent, children);
            }
            children.add(child);
        }
    }

    protected WindowGenerator setupWindowGenerator(long finishedWindowId) {
        WindowGenerator windowGenerator = new WindowGenerator((ScheduledExecutorService)new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
        windowGenerator.setResetWindow(this.firstWindowMillis);
        long millisAtFirstWindow = WindowGenerator.getNextWindowMillis(finishedWindowId, this.firstWindowMillis, this.windowWidthMillis);
        windowGenerator.setFirstWindow(millisAtFirstWindow);
        windowGenerator.setWindowWidth(this.windowWidthMillis);
        long windowCount = WindowGenerator.getWindowCount(millisAtFirstWindow, this.firstWindowMillis, this.windowWidthMillis);
        windowGenerator.setCheckpointCount(this.checkpointWindowCount, (int)(windowCount % (long)this.checkpointWindowCount));
        return windowGenerator;
    }

    private void setupNode(OperatorDeployInfo ndi) {
        this.failedNodes.remove(ndi.id);
        Node<?> node = this.nodes.get(ndi.id);
        node.setup(node.context);
        LinkedHashMap<String, Operators.PortContextPair<Operator.InputPort<?>>> inputPorts = node.getPortMappingDescriptor().inputPorts;
        LinkedHashMap<String, Operators.PortContextPair<Operator.InputPort>> newInputPorts = new LinkedHashMap<String, Operators.PortContextPair<Operator.InputPort>>(inputPorts.size());
        for (OperatorDeployInfo.InputDeployInfo idi : ndi.inputs) {
            Operator.InputPort port = (Operator.InputPort)inputPorts.get((Object)idi.portName).component;
            PortContext context = new PortContext(idi.contextAttributes, node.context);
            newInputPorts.put(idi.portName, new Operators.PortContextPair<Operator.InputPort>(port, context));
            port.setup((Context)context);
        }
        inputPorts.putAll(newInputPorts);
        LinkedHashMap<String, Operators.PortContextPair<Operator.OutputPort<?>>> outputPorts = node.getPortMappingDescriptor().outputPorts;
        LinkedHashMap<String, Operators.PortContextPair<Operator.OutputPort>> newOutputPorts = new LinkedHashMap<String, Operators.PortContextPair<Operator.OutputPort>>(outputPorts.size());
        for (OperatorDeployInfo.OutputDeployInfo odi : ndi.outputs) {
            Operator.OutputPort port = (Operator.OutputPort)outputPorts.get((Object)odi.portName).component;
            PortContext context = new PortContext(odi.contextAttributes, node.context);
            newOutputPorts.put(odi.portName, new Operators.PortContextPair<Operator.OutputPort>(port, context));
            port.setup((Context)context);
        }
        outputPorts.putAll(newOutputPorts);
        logger.debug("activating {} in container {}", node, (Object)this.containerId);
        this.processNodeRequests(false);
        node.activate();
        this.eventBus.publish((Object)new ContainerEvent.NodeActivationEvent(node));
    }

    private void teardownNode(OperatorDeployInfo ndi) {
        Node<?> node = this.nodes.get(ndi.id);
        if (node == null) {
            logger.warn("node {}/{} took longer to exit, resulting in unclean undeploy!", (Object)ndi.id, (Object)ndi.name);
        } else {
            this.eventBus.publish((Object)new ContainerEvent.NodeDeactivationEvent(node));
            node.deactivate();
            node.teardown();
            logger.debug("deactivated {}", (Object)node.getId());
        }
    }

    public synchronized void activate(final Map<Integer, OperatorDeployInfo> nodeMap, Map<String, ComponentContextPair<Stream, StreamContext>> newStreams) {
        for (ComponentContextPair<Stream, StreamContext> pair : newStreams.values()) {
            if (pair.component instanceof BufferServerSubscriber) continue;
            this.activeStreams.put((Stream)pair.component, (StreamContext)((Object)pair.context));
            ((Stream)pair.component).activate((Context)pair.context);
            this.eventBus.publish((Object)new ContainerEvent.StreamActivationEvent(pair));
        }
        final CountDownLatch signal = new CountDownLatch(nodeMap.size());
        for (final OperatorDeployInfo operatorDeployInfo : nodeMap.values()) {
            if (operatorDeployInfo.type == OperatorDeployInfo.OperatorType.OIO) continue;
            final Node<?> node = this.nodes.get(operatorDeployInfo.id);
            new Thread(Integer.toString(operatorDeployInfo.id).concat("/").concat(operatorDeployInfo.name).concat(":").concat(node.getOperator().getClass().getSimpleName())){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 * Loose catch block
                 */
                @Override
                public void run() {
                    block48: {
                        List oioNodeIdList;
                        OperatorDeployInfo currentdi;
                        HashSet<OperatorDeployInfo> setOperators;
                        block47: {
                            block44: {
                                setOperators = new HashSet<OperatorDeployInfo>();
                                currentdi = operatorDeployInfo;
                                StreamingContainer.this.setupNode(currentdi);
                                setOperators.add(currentdi);
                                oioNodeIdList = StreamingContainer.this.oioGroups.get(operatorDeployInfo.id);
                                if (oioNodeIdList != null) {
                                    for (Integer oioNodeId : oioNodeIdList) {
                                        currentdi = (OperatorDeployInfo)nodeMap.get(oioNodeId);
                                        StreamingContainer.this.setupNode(currentdi);
                                        setOperators.add(currentdi);
                                    }
                                }
                                currentdi = null;
                                int i = setOperators.size();
                                while (i-- > 0) {
                                    signal.countDown();
                                }
                                node.run();
                                if (!setOperators.contains(operatorDeployInfo)) break block44;
                                try {
                                    StreamingContainer.this.teardownNode(operatorDeployInfo);
                                }
                                catch (Exception ex) {
                                    StreamingContainer.this.failedNodes.add(operatorDeployInfo.id);
                                    logger.error("Shutdown of operator {} failed due to an exception.", (Object)operatorDeployInfo, (Object)ex);
                                }
                                break block47;
                            }
                            signal.countDown();
                        }
                        oioNodeIdList = StreamingContainer.this.oioGroups.get(operatorDeployInfo.id);
                        if (oioNodeIdList != null) {
                            for (Integer oioNodeId : oioNodeIdList) {
                                OperatorDeployInfo oiodi = (OperatorDeployInfo)nodeMap.get(oioNodeId);
                                if (setOperators.contains(oiodi)) {
                                    try {
                                        StreamingContainer.this.teardownNode(oiodi);
                                    }
                                    catch (Exception ex) {
                                        StreamingContainer.this.failedNodes.add(oiodi.id);
                                        logger.error("Shutdown of operator {} failed due to an exception.", (Object)oiodi, (Object)ex);
                                    }
                                    continue;
                                }
                                signal.countDown();
                            }
                        }
                        break block48;
                        catch (Error error) {
                            block49: {
                                block45: {
                                    int[] operators;
                                    if (currentdi == null) {
                                        logger.error("Voluntary container termination due to an error in operator set {}.", setOperators, (Object)error);
                                        operators = new int[setOperators.size()];
                                        int i = 0;
                                        Iterator it = setOperators.iterator();
                                        while (it.hasNext()) {
                                            operators[i] = ((OperatorDeployInfo)it.next()).id;
                                            ++i;
                                        }
                                    } else {
                                        logger.error("Voluntary container termination due to an error in operator {}.", (Object)currentdi, (Object)error);
                                        operators = new int[]{currentdi.id};
                                    }
                                    StreamingContainer.this.umbilical.reportError(StreamingContainer.this.containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace((Throwable)error));
                                    System.exit(1);
                                    if (!setOperators.contains(operatorDeployInfo)) break block45;
                                    try {
                                        StreamingContainer.this.teardownNode(operatorDeployInfo);
                                    }
                                    catch (Exception ex) {
                                        StreamingContainer.this.failedNodes.add(operatorDeployInfo.id);
                                        logger.error("Shutdown of operator {} failed due to an exception.", (Object)operatorDeployInfo, (Object)ex);
                                    }
                                    break block49;
                                }
                                signal.countDown();
                            }
                            oioNodeIdList = StreamingContainer.this.oioGroups.get(operatorDeployInfo.id);
                            if (oioNodeIdList != null) {
                                for (Integer oioNodeId : oioNodeIdList) {
                                    OperatorDeployInfo oiodi = (OperatorDeployInfo)nodeMap.get(oioNodeId);
                                    if (setOperators.contains(oiodi)) {
                                        try {
                                            StreamingContainer.this.teardownNode(oiodi);
                                        }
                                        catch (Exception ex) {
                                            StreamingContainer.this.failedNodes.add(oiodi.id);
                                            logger.error("Shutdown of operator {} failed due to an exception.", (Object)oiodi, (Object)ex);
                                        }
                                        continue;
                                    }
                                    signal.countDown();
                                }
                            }
                        }
                        catch (Exception ex) {
                            block50: {
                                block46: {
                                    int[] operators;
                                    if (currentdi == null) {
                                        StreamingContainer.this.failedNodes.add(operatorDeployInfo.id);
                                        logger.error("Operator set {} stopped running due to an exception.", setOperators, (Object)ex);
                                        operators = new int[]{operatorDeployInfo.id};
                                        StreamingContainer.this.umbilical.reportError(StreamingContainer.this.containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace((Throwable)ex));
                                    } else {
                                        StreamingContainer.this.failedNodes.add(currentdi.id);
                                        logger.error("Abandoning deployment of operator {} due to setup failure.", (Object)currentdi, (Object)ex);
                                        operators = new int[]{currentdi.id};
                                        StreamingContainer.this.umbilical.reportError(StreamingContainer.this.containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace((Throwable)ex));
                                    }
                                    if (!setOperators.contains(operatorDeployInfo)) break block46;
                                    try {
                                        StreamingContainer.this.teardownNode(operatorDeployInfo);
                                    }
                                    catch (Exception ex2) {
                                        StreamingContainer.this.failedNodes.add(operatorDeployInfo.id);
                                        logger.error("Shutdown of operator {} failed due to an exception.", (Object)operatorDeployInfo, (Object)ex2);
                                    }
                                    break block50;
                                }
                                signal.countDown();
                            }
                            oioNodeIdList = StreamingContainer.this.oioGroups.get(operatorDeployInfo.id);
                            if (oioNodeIdList != null) {
                                for (Integer oioNodeId : oioNodeIdList) {
                                    OperatorDeployInfo oiodi = (OperatorDeployInfo)nodeMap.get(oioNodeId);
                                    if (setOperators.contains(oiodi)) {
                                        try {
                                            StreamingContainer.this.teardownNode(oiodi);
                                        }
                                        catch (Exception ex3) {
                                            StreamingContainer.this.failedNodes.add(oiodi.id);
                                            logger.error("Shutdown of operator {} failed due to an exception.", (Object)oiodi, (Object)ex3);
                                        }
                                        continue;
                                    }
                                    signal.countDown();
                                }
                            }
                            {
                                catch (Throwable throwable) {
                                    if (setOperators.contains(operatorDeployInfo)) {
                                        try {
                                            StreamingContainer.this.teardownNode(operatorDeployInfo);
                                        }
                                        catch (Exception ex4) {
                                            StreamingContainer.this.failedNodes.add(operatorDeployInfo.id);
                                            logger.error("Shutdown of operator {} failed due to an exception.", (Object)operatorDeployInfo, (Object)ex4);
                                        }
                                    } else {
                                        signal.countDown();
                                    }
                                    List oioNodeIdList2 = StreamingContainer.this.oioGroups.get(operatorDeployInfo.id);
                                    if (oioNodeIdList2 != null) {
                                        for (Integer oioNodeId : oioNodeIdList2) {
                                            OperatorDeployInfo oiodi = (OperatorDeployInfo)nodeMap.get(oioNodeId);
                                            if (setOperators.contains(oiodi)) {
                                                try {
                                                    StreamingContainer.this.teardownNode(oiodi);
                                                }
                                                catch (Exception ex5) {
                                                    StreamingContainer.this.failedNodes.add(oiodi.id);
                                                    logger.error("Shutdown of operator {} failed due to an exception.", (Object)oiodi, (Object)ex5);
                                                }
                                                continue;
                                            }
                                            signal.countDown();
                                        }
                                    }
                                    throw throwable;
                                }
                            }
                        }
                    }
                }
            }.start();
        }
        try {
            signal.await();
        }
        catch (InterruptedException ex) {
            logger.debug("Activation of operators interrupted.", (Throwable)ex);
        }
        for (ComponentContextPair componentContextPair : newStreams.values()) {
            if (!(componentContextPair.component instanceof BufferServerSubscriber)) continue;
            this.activeStreams.put((Stream)componentContextPair.component, (StreamContext)((Object)componentContextPair.context));
            ((Stream)componentContextPair.component).activate((Context)componentContextPair.context);
            this.eventBus.publish((Object)new ContainerEvent.StreamActivationEvent(componentContextPair));
        }
        for (WindowGenerator windowGenerator : this.generators.values()) {
            if (this.activeGenerators.containsKey(windowGenerator)) continue;
            this.activeGenerators.put(windowGenerator, this.generators);
            windowGenerator.activate((StreamContext)null);
        }
    }

    private void groupInputStreams(HashMap<String, ArrayList<String>> groupedInputStreams, OperatorDeployInfo ndi) {
        for (OperatorDeployInfo.InputDeployInfo nidi : ndi.inputs) {
            String source = Integer.toString(nidi.sourceNodeId).concat(".").concat(nidi.sourcePortName);
            ArrayList<String> collection = groupedInputStreams.get(source);
            if (collection == null) {
                collection = new ArrayList();
                groupedInputStreams.put(source, collection);
            }
            collection.add(Integer.toString(ndi.id).concat(".").concat(nidi.portName));
        }
    }

    protected Checkpoint getFinishedCheckpoint(OperatorDeployInfo ndi) {
        Checkpoint checkpoint;
        if (ndi.contextAttributes != null && ndi.contextAttributes.get(OperatorContext.PROCESSING_MODE) == Operator.ProcessingMode.AT_MOST_ONCE) {
            long now = System.currentTimeMillis();
            long windowCount = WindowGenerator.getWindowCount(now, this.firstWindowMillis, this.firstWindowMillis);
            Integer temp = (Integer)ndi.contextAttributes.get(OperatorContext.APPLICATION_WINDOW_COUNT);
            if (temp == null) {
                temp = (Integer)this.containerContext.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
            }
            int appWindowCount = (int)(windowCount % (long)temp.intValue());
            temp = (Integer)ndi.contextAttributes.get(OperatorContext.CHECKPOINT_WINDOW_COUNT);
            if (temp == null) {
                temp = (Integer)this.containerContext.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT);
            }
            int lCheckpointWindowCount = (int)(windowCount % (long)temp.intValue());
            checkpoint = new Checkpoint(WindowGenerator.getWindowId(now, this.firstWindowMillis, this.windowWidthMillis), appWindowCount, lCheckpointWindowCount);
            logger.debug("using {} on {} at {}", new Object[]{Operator.ProcessingMode.AT_MOST_ONCE, ndi.name, checkpoint});
        } else {
            checkpoint = ndi.checkpoint;
            logger.debug("using {} on {} at {}", new Object[]{ndi.contextAttributes == null ? Operator.ProcessingMode.AT_LEAST_ONCE : (ndi.contextAttributes.get(OperatorContext.PROCESSING_MODE) == null ? Operator.ProcessingMode.AT_LEAST_ONCE : (Operator.ProcessingMode)ndi.contextAttributes.get(OperatorContext.PROCESSING_MODE)), ndi.name, checkpoint});
        }
        return checkpoint;
    }

    public void operateListeners(StreamingContainerUmbilicalProtocol.StreamingContainerContext ctx, boolean setup) {
        if (setup) {
            for (Component<ContainerContext> c : this.components) {
                c.setup((Context)ctx);
            }
        } else {
            for (Component<ContainerContext> c : this.components) {
                c.teardown();
            }
        }
    }

    private <T> T getValue(Attribute<T> key, Context.PortContext portContext, OperatorDeployInfo deployInfo) {
        Object attr;
        Attribute.AttributeMap attributes;
        if (portContext != null && (attributes = portContext.getAttributes()) != null && (attr = attributes.get(key)) != null) {
            return (T)attr;
        }
        if (deployInfo != null && (attributes = deployInfo.contextAttributes) != null && (attr = attributes.get(key)) != null) {
            return (T)attr;
        }
        return this.containerContext.getValue(key);
    }

    private <T> T getValue(Attribute<T> key, OperatorDeployInfo deployInfo) {
        Object attr;
        Attribute.AttributeMap attributes;
        if (deployInfo != null && (attributes = deployInfo.contextAttributes) != null && (attr = attributes.get(key)) != null) {
            return (T)attr;
        }
        return this.containerContext.getValue(key);
    }

    private void handleChangeLoggersRequest(StramToNodeChangeLoggersRequest request) {
        logger.debug("handle change logger request");
        DTLoggerFactory.getInstance().changeLoggersLevel(request.getTargetChanges());
    }

    static {
        try {
            eventloop = DefaultEventLoop.createEventLoop((String)"ProcessWideEventLoop");
        }
        catch (IOException io) {
            throw new RuntimeException(io);
        }
        logger = LoggerFactory.getLogger(StreamingContainer.class);
    }
}

