package com.datatorrent.stram.engine;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.bufferserver.storage.DiskStorage;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.ComponentContextPair;
import com.datatorrent.stram.RecoverableRpcProxy;
import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.StringCodecs;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.ContainerContext;
import com.datatorrent.stram.api.ContainerEvent;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.RequestFactory;
import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.debug.StdOutErrLog;
import com.datatorrent.stram.engine.Stream;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.stream.BufferServerPublisher;
import com.datatorrent.stram.stream.BufferServerSubscriber;
import com.datatorrent.stram.stream.FastPublisher;
import com.datatorrent.stram.stream.FastSubscriber;
import com.datatorrent.stram.stream.InlineStream;
import com.datatorrent.stram.stream.MuxStream;
import com.datatorrent.stram.stream.OiOStream;
import com.datatorrent.stram.stream.PartitionAwareSink;
import com.datatorrent.stram.stream.PartitionAwareSinkForPersistence;
import com.datatorrent.stram.util.AbstractWritableAdapter;
import com.datatorrent.stram.util.LoggerUtil;
import com.datatorrent.stram.webapp.asm.MethodSignatureVisitor;
import java.io.IOException;
import java.lang.Thread;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.config.BusConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/StreamingContainer.class */
public class StreamingContainer extends StramUtils.YarnContainerMain {
    /** Name of the JVM system property that {@link #main} reads to locate the application; assigned outside this view. */
    public static final String PROP_APP_PATH;
    /** Identifier of this container; sent with umbilical log calls and used to build container stats. */
    private final String containerId;
    /** Proxy used to talk to the application master (log messages, heartbeats). */
    private final transient StreamingContainerUmbilicalProtocol umbilical;
    /** GC beans; re-read from {@link ManagementFactory} on every heartbeat to report collection time and count. */
    private transient List<GarbageCollectorMXBean> garbageCollectorMXBeans;
    /** Shared event loop; started in {@code setup} and stopped in {@code teardown} only when a buffer server is deployed. Assigned outside this view. */
    public static DefaultEventLoop eventloop;
    /** Copied from the container context's {@code startWindowMillis} in {@code setup}. */
    private long firstWindowMillis;
    /** Value of {@code STREAMING_WINDOW_SIZE_MILLIS} read in {@code setup}. */
    private int windowWidthMillis;
    /** Address the buffer server is reachable at; stays {@code null} when no buffer server is deployed. */
    protected InetSocketAddress bufferServerAddress;
    /** Buffer server hosted by this container, or {@code null} when none is deployed. */
    protected Server bufferServer;
    /** Value of {@code CHECKPOINT_WINDOW_COUNT} read in {@code setup}. */
    private int checkpointWindowCount;
    /** Value of {@code LogicalPlan.FAST_PUBLISHER_SUBSCRIBER} read in {@code setup}. */
    private boolean fastPublisherSubscriber;
    /** Context handed to {@code setup}; also consulted by the heartbeat loop and {@code teardown}. */
    private StreamingContainerUmbilicalProtocol.StreamingContainerContext containerContext;
    /** Container-local request factory, created in {@code setup} and published through the context attributes. */
    private RequestFactory requestFactory;
    /** Class logger; assigned outside this view. */
    private static final Logger logger;
    /** Compiler-generated flag backing the assertion check in {@code teardown}. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Nodes hosted by this container, keyed by the id used in operator heartbeats and node requests. */
    protected final Map<Integer, Node<?>> nodes = new ConcurrentHashMap();
    /** Ids of nodes considered failed; a terminated node listed here is reported as FAILED rather than SHUTDOWN. */
    protected final Set<Integer> failedNodes = Collections.newSetFromMap(new ConcurrentHashMap());
    /** Streams keyed by string ids; {@code disconnectNode} builds keys as node id + KEY_SEPARATOR + port name. */
    private final Map<String, ComponentContextPair<Stream, StreamContext>> streams = new ConcurrentHashMap();
    /** Window generators keyed by node id; one generator instance may be registered under several ids. */
    protected final Map<Integer, WindowGenerator> generators = new ConcurrentHashMap();
    // NOTE(review): not referenced in the visible part of the file; presumably groups of thread-local (OiO) node ids -- confirm.
    protected final Map<Integer, ArrayList<Integer>> oioGroups = new ConcurrentHashMap();
    /** Streams that have been activated and must be deactivated before they are torn down. */
    private final Map<Stream, StreamContext> activeStreams = new ConcurrentHashMap();
    /** Window generators that have been activated; only the key set is read in the visible code. */
    private final Map<WindowGenerator, Object> activeGenerators = new ConcurrentHashMap();
    // Overwritten in setup() from HEARTBEAT_INTERVAL_MILLIS.
    // NOTE(review): the initializer references METRIC_QUEUE_SIZE, which looks like the decompiler
    // substituting an unrelated constant that happens to have the same value -- confirm.
    private int heartbeatIntervalMillis = StreamingContainerManager.METRIC_QUEUE_SIZE;
    /** When set, the heartbeat loop ends after its current iteration. */
    private volatile boolean exitHeartbeatLoop = false;
    /** Monitor the heartbeat loop waits on; {@code triggerHeartbeat} notifies it to cut the wait short. */
    private final Object heartbeatTrigger = new Object();
    /** Last committed window id received in a heartbeat response; -1 until the first one arrives. */
    private long lastCommittedWindowId = -1;
    /** Codec that performs no serialization: both conversions return {@code null}; only partitioning is implemented. */
    private final StreamCodec<Object> nonSerializingStreamCodec = new StreamCodec<Object>() { // from class: com.datatorrent.stram.engine.StreamingContainer.3
        // Deserialization is not supported by this codec; always yields null.
        public Object fromByteArray(Slice slice) {
            return null;
        }

        // Serialization is not supported by this codec; always yields null.
        public Slice toByteArray(Object obj) {
            return null;
        }

        // The partition is the object's hash code; throws NullPointerException for a null object.
        public int getPartition(Object obj) {
            return obj.hashCode();
        }
    };
    /** Runtime MXBean name of this JVM, sent with every container heartbeat. */
    private final transient String jvmName = ManagementFactory.getRuntimeMXBean().getName();
    /** Event-listener instances created in {@code setup} that implement {@link Component}. */
    HashSet<Component<ContainerContext>> components = new HashSet<>();
    /** Bus on which container events (stats, stream deactivation) are published to the subscribed listeners. */
    private final MBassador<ContainerEvent> eventBus = new MBassador<>(BusConfiguration.Default(1, 1, 1));
    /** Listener instances keyed by their class name; exposed through {@code getInstance}. */
    private final HashMap<String, Object> singletons = new HashMap<>();
    /** Node requests from the most recent heartbeat response that carried any. */
    private List<StreamingContainerUmbilicalProtocol.StramToNodeRequest> nodeRequests = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.datatorrent.stram.engine.StreamingContainer$4, reason: invalid class name */
    /* loaded from: input_file:com/datatorrent/stram/engine/StreamingContainer$4.class */
    public static /* synthetic */ class AnonymousClass4 {
        /**
         * Lookup table used by switch statements over {@link DAG.Locality}: indexed by
         * {@code ordinal()}, holding the case label (1 = CONTAINER_LOCAL, 2 = THREAD_LOCAL,
         * 0 = any other constant).
         */
        static final /* synthetic */ int[] $SwitchMap$com$datatorrent$api$DAG$Locality;

        static {
            final int[] caseLabels = new int[DAG.Locality.values().length];
            // Each constant is resolved on its own so that an enum version lacking one of
            // them still yields a usable table for the remaining constants.
            try {
                caseLabels[DAG.Locality.CONTAINER_LOCAL.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                caseLabels[DAG.Locality.THREAD_LOCAL.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            $SwitchMap$com$datatorrent$api$DAG$Locality = caseLabels;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a container bound to the given id and umbilical proxy. No resources are
     * acquired here; that happens in {@code setup}.
     *
     * @param str the id of this container
     * @param streamingContainerUmbilicalProtocol the proxy used to reach the application master
     */
    public StreamingContainer(String str, StreamingContainerUmbilicalProtocol streamingContainerUmbilicalProtocol) {
        this.containerId = str;
        this.umbilical = streamingContainerUmbilicalProtocol;
        logger.debug("instantiated StramChild {}", str);
    }

    /**
     * Configures this container from the supplied context.
     *
     * <p>Stores the context, installs a container-local {@link RequestFactory} in its
     * attributes, reads the timing and checkpoint settings, loads the string codecs,
     * optionally starts the event loop and the buffer server, instantiates and subscribes
     * the container event listeners, and finally calls {@code operateListeners(ctx, true)}.
     *
     * @param streamingContainerContext the context obtained from the application master
     * @throws IllegalStateException if an {@link IOException} occurs anywhere in the
     *         buffer server or listener setup
     */
    public void setup(StreamingContainerUmbilicalProtocol.StreamingContainerContext streamingContainerContext) {
        this.containerContext = streamingContainerContext;
        this.requestFactory = new RequestFactory();
        streamingContainerContext.attributes.put(ContainerContext.REQUEST_FACTORY, this.requestFactory);
        this.heartbeatIntervalMillis = ((Integer) streamingContainerContext.getValue(Context.DAGContext.HEARTBEAT_INTERVAL_MILLIS)).intValue();
        this.firstWindowMillis = streamingContainerContext.startWindowMillis;
        this.windowWidthMillis = ((Integer) streamingContainerContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)).intValue();
        this.checkpointWindowCount = ((Integer) streamingContainerContext.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT)).intValue();
        this.fastPublisherSubscriber = ((Boolean) streamingContainerContext.getValue(LogicalPlan.FAST_PUBLISHER_SUBSCRIBER)).booleanValue();
        StringCodecs.loadConverters((Map) streamingContainerContext.getValue(Context.DAGContext.STRING_CODECS));
        try {
            if (streamingContainerContext.deployBufferServer) {
                eventloop.start();
                final int bufferServerMB = ((Integer) streamingContainerContext.getValue(ContainerContext.BUFFER_SERVER_MB)).intValue();
                logger.debug("buffer server memory {}", Integer.valueOf(bufferServerMB));
                final int blockSizeMB;
                final int blockCount;
                if (bufferServerMB < ((Integer) ContainerContext.BUFFER_SERVER_MB.defaultValue).intValue()) {
                    // Less memory than the default: split it into 8 parts of at least 1 MB each.
                    blockCount = 8;
                    blockSizeMB = Math.max(1, bufferServerMB / 8);
                } else {
                    // Otherwise use fixed 64 MB parts, as many as fit.
                    blockSizeMB = 64;
                    blockCount = bufferServerMB / 64;
                }
                // NOTE(review): arguments are presumably (port, block size in bytes, block count);
                // port 0 would let the system pick a free port -- confirm against Server.
                this.bufferServer = new Server(0, blockSizeMB * 1024 * 1024, blockCount);
                this.bufferServer.setAuthToken((byte[]) streamingContainerContext.getValue(StreamingContainerUmbilicalProtocol.StreamingContainerContext.BUFFER_SERVER_TOKEN));
                if (((Boolean) streamingContainerContext.getValue(Context.DAGContext.BUFFER_SPOOLING)).booleanValue()) {
                    this.bufferServer.setSpoolStorage(new DiskStorage());
                }
                this.bufferServerAddress = NetUtils.getConnectAddress(this.bufferServer.run(eventloop));
                logger.debug("Buffer server started: {}", this.bufferServerAddress);
            }
            for (Class<?> listenerClass : ContainerEvent.CONTAINER_EVENTS_LISTENERS) {
                try {
                    final Object listener = listenerClass.newInstance();
                    this.singletons.put(listenerClass.getName(), listener);
                    if (listener instanceof Component) {
                        this.components.add((Component) listener);
                    }
                    this.eventBus.subscribe(listener);
                } catch (IllegalAccessException | InstantiationException e) {
                    // A listener that cannot be created is skipped; the container keeps starting.
                    logger.warn("Container Event Listener Instantiation", e);
                }
            }
            operateListeners(streamingContainerContext, true);
        } catch (IOException e) {
            // The exception is passed as the trailing throwable argument so that the stack
            // trace is logged; a "{}" placeholder would be printed literally here.
            logger.warn("deploy request failed", e);
            throw new IllegalStateException("Failed to deploy buffer server", e);
        }
    }

    /**
     * Returns the id this container was constructed with.
     *
     * @return the container id
     */
    public String getContainerId() {
        final String id = this.containerId;
        return id;
    }

    /**
     * Looks up a singleton registered during {@code setup}.
     *
     * @param str the key of the singleton; {@code setup} registers listeners under their class name
     * @return the registered instance, or {@code null} when nothing is registered under that key
     */
    public Object getInstance(String str) {
        final Object singleton = this.singletons.get(str);
        return singleton;
    }

    /**
     * Entry point of the container process.
     *
     * <p>Fetches the container context over the umbilical proxy, sets the container up,
     * runs the heartbeat loop until it returns and tears the container down again. Any
     * failure is logged and reported over the umbilical proxy. The proxy, the metrics
     * system and the logging system are shut down exactly once, after the failure (if
     * any) has been reported. The JVM exits with status 1 unless the heartbeat loop
     * returned normally.
     *
     * @param strArr command line arguments; not used
     * @throws Throwable if reporting a failure or releasing the resources itself fails
     */
    public static void main(String[] strArr) throws Throwable {
        StdOutErrLog.tieSystemOutAndErrToLog();
        logger.debug("PID: " + System.getenv().get("JVM_PID"));
        logger.info("Child starting with classpath: {}", System.getProperty("java.class.path"));
        final String appPath = System.getProperty(PROP_APP_PATH);
        if (appPath == null) {
            logger.error("{} not set in container environment.", PROP_APP_PATH);
            System.exit(1);
        }
        // Stays 1 (failure) unless the heartbeat loop returns normally.
        int exitStatus = 1;
        final RecoverableRpcProxy rpcProxy = new RecoverableRpcProxy(appPath, new Configuration());
        final StreamingContainerUmbilicalProtocol umbilical = rpcProxy.getProxy();
        final String childId = System.getProperty("dt.cid");
        try {
            final StreamingContainerUmbilicalProtocol.StreamingContainerContext initContext = umbilical.getInitContext(childId);
            final StreamingContainer container = new StreamingContainer(childId, umbilical);
            logger.debug("Container Context = {}", initContext);
            container.setup(initContext);
            try {
                container.heartbeatLoop();
                exitStatus = 0;
            } finally {
                // Runs exactly once, whether or not the heartbeat loop failed.
                container.teardown();
            }
        } catch (Error error) {
            logger.error("Fatal error in container!", error);
            umbilical.reportError(childId, null, "FATAL: " + ExceptionUtils.getStackTrace(error));
        } catch (Exception exception) {
            logger.error("Fatal exception in container!", exception);
            umbilical.reportError(childId, null, ExceptionUtils.getStackTrace(exception));
        } finally {
            // The proxy must stay open until the failure has been reported above.
            rpcProxy.close();
            DefaultMetricsSystem.shutdown();
            logger.info("Exit status for container: {}", exitStatus);
            LogManager.shutdown();
        }
        if (exitStatus != 0) {
            System.exit(exitStatus);
        }
    }

    /**
     * Stops every node hosted by this container and deactivates all active window
     * generators and streams.
     *
     * <p>Nodes without a live thread are disconnected right away. The others are asked to
     * shut down first; each thread then gets up to one second to finish, is interrupted if
     * it has not terminated by then, and its node is disconnected. If the calling thread
     * is interrupted while waiting, the remaining nodes are not disconnected.
     */
    public synchronized void deactivate() {
        final List<Thread> runningThreads = new ArrayList<Thread>();
        final List<Integer> runningIds = new ArrayList<Integer>();
        for (Map.Entry<Integer, Node<?>> hosted : this.nodes.entrySet()) {
            final Node<?> node = hosted.getValue();
            final Thread worker = node.context.getThread();
            if (worker != null && worker.isAlive()) {
                runningThreads.add(worker);
                runningIds.add(hosted.getKey());
                node.shutdown();
            } else {
                disconnectNode(hosted.getKey().intValue());
            }
        }
        try {
            // The two lists are filled in lockstep, so index k refers to the same node in both.
            for (int k = 0; k < runningThreads.size(); k++) {
                final Thread worker = runningThreads.get(k);
                worker.join(1000L);
                if (worker.getState() != Thread.State.TERMINATED) {
                    worker.interrupt();
                }
                disconnectNode(runningIds.get(k).intValue());
            }
        } catch (InterruptedException e) {
            logger.warn("Aborting wait for operators to get deactivated!", e);
        }
        for (WindowGenerator generator : this.activeGenerators.keySet()) {
            generator.deactivate();
        }
        this.activeGenerators.clear();
        for (Stream stream : this.activeStreams.keySet()) {
            stream.deactivate();
        }
        this.activeStreams.clear();
    }

    /**
     * Detaches the node with the given id from its window generator and from every stream
     * connected to its output and input ports.
     *
     * <p>Each removed stream is deactivated (if it was active), a
     * {@code StreamDeactivationEvent} is published for it, and it is torn down. For a
     * multi-sink output stream, the streams registered under its sink ids are removed the
     * same way. For an input stream, the entry under its source id is removed when it is
     * the same pair, otherwise the input is unregistered from that upstream mux.
     *
     * @param i the id of a node present in {@code nodes}
     */
    private void disconnectNode(int i) {
        final Node<?> node = this.nodes.get(Integer.valueOf(i));
        disconnectWindowGenerator(i, node);
        final Operators.PortMappingDescriptor ports = node.getPortMappingDescriptor();
        final String keyPrefix = String.valueOf(i).concat(LogicalPlanConfiguration.KEY_SEPARATOR);

        for (String outputPort : ports.outputPorts.keySet()) {
            final String sourceKey = keyPrefix.concat(outputPort);
            final ComponentContextPair<Stream, StreamContext> outgoing = this.streams.remove(sourceKey);
            if (outgoing == null) {
                continue;
            }
            if (this.activeStreams.remove(outgoing.component) != null) {
                ((Stream) outgoing.component).deactivate();
                this.eventBus.publish(new ContainerEvent.StreamDeactivationEvent(outgoing));
            }
            if (outgoing.component instanceof Stream.MultiSinkCapableStream) {
                final String joinedSinkIds = outgoing.context.getSinkId();
                if (joinedSinkIds == null) {
                    logger.error("mux sinks found connected at {} with sink id null", sourceKey);
                } else {
                    final String[] sinkKeys = joinedSinkIds.split(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR);
                    // Walk the sinks from last to first.
                    for (int k = sinkKeys.length - 1; k >= 0; k--) {
                        final ComponentContextPair<Stream, StreamContext> sinkPair = this.streams.remove(sinkKeys[k]);
                        if (sinkPair == null) {
                            logger.error("mux is missing the stream for sink {}", sinkKeys[k]);
                            continue;
                        }
                        if (this.activeStreams.remove(sinkPair.component) != null) {
                            ((Stream) sinkPair.component).deactivate();
                            this.eventBus.publish(new ContainerEvent.StreamDeactivationEvent(sinkPair));
                        }
                        ((Stream) sinkPair.component).teardown();
                    }
                }
            }
            // The mux itself is torn down only after its sinks.
            ((Stream) outgoing.component).teardown();
        }

        for (String inputPort : ports.inputPorts.keySet()) {
            final String sinkKey = keyPrefix.concat(inputPort);
            final ComponentContextPair<Stream, StreamContext> incoming = this.streams.remove(sinkKey);
            if (incoming == null) {
                continue;
            }
            if (this.activeStreams.remove(incoming.component) != null) {
                ((Stream) incoming.component).deactivate();
                this.eventBus.publish(new ContainerEvent.StreamDeactivationEvent(incoming));
            }
            ((Stream) incoming.component).teardown();
            final String upstreamKey = incoming.context.getSourceId();
            final ComponentContextPair<Stream, StreamContext> upstream = this.streams.get(upstreamKey);
            if (upstream == null) {
                continue;
            }
            if (upstream == incoming) {
                // The same pair is registered under both its sink and its source id.
                this.streams.remove(upstreamKey);
            } else {
                unregisterSinkFromMux(upstream, sinkKey);
            }
        }
    }

    /**
     * Removes one sink from a multi-sink stream.
     *
     * <p>The stream's sink id is a list of sink ids joined with
     * {@code MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR}. When the given sink is the only one,
     * the stream is removed from {@code streams}, deactivated if active, and torn down.
     * Otherwise the sink id is rebuilt from the remaining entries, in reverse order of the
     * previous list.
     *
     * @param componentContextPair the multi-sink stream and its context
     * @param str the id of the sink to unregister
     * @return {@code false} if the sink was not registered with the stream (an error is
     *         logged and nothing is changed), {@code true} otherwise
     */
    private boolean unregisterSinkFromMux(ComponentContextPair<Stream, StreamContext> componentContextPair, String str) {
        final String[] sinkIds = componentContextPair.context.getSinkId().split(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR);

        // Search from the end, so the last matching entry is the one that gets dropped.
        int matchAt = -1;
        for (int k = sinkIds.length - 1; k >= 0; k--) {
            if (sinkIds[k].equals(str)) {
                matchAt = k;
                break;
            }
        }
        if (matchAt < 0) {
            logger.error("{} was not connected to stream connected to {}", str, componentContextPair.context.getSourceId());
            return false;
        }
        sinkIds[matchAt] = null;
        ((Stream.MultiSinkCapableStream) componentContextPair.component).setSink(str, null);

        if (sinkIds.length == 1) {
            // That was the last sink: the stream has no reason to exist any more.
            componentContextPair.context.setSinkId(null);
            this.streams.remove(componentContextPair.context.getSourceId());
            if (this.activeStreams.remove(componentContextPair.component) != null) {
                ((Stream) componentContextPair.component).deactivate();
                this.eventBus.publish(new ContainerEvent.StreamDeactivationEvent(componentContextPair));
            }
            ((Stream) componentContextPair.component).teardown();
            return true;
        }

        final StringBuilder remaining = new StringBuilder((componentContextPair.context.getSinkId().length() - MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR.length()) - str.length());
        boolean appendedAny = false;
        for (int k = sinkIds.length - 1; k >= 0; k--) {
            if (sinkIds[k] == null) {
                continue;
            }
            if (appendedAny) {
                remaining.append(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR);
            }
            remaining.append(sinkIds[k]);
            appendedAny = true;
        }
        componentContextPair.context.setSinkId(remaining.toString());
        return appendedAny;
    }

    /**
     * Unregisters the window generator of the given node, if it has one, and releases the
     * reservoir the generator holds for that node. The generator is deactivated and torn
     * down only when no other node is still registered with the same instance.
     *
     * @param i the id of the node
     * @param node the node itself; not used by this method
     */
    private void disconnectWindowGenerator(int i, Node<?> node) {
        final WindowGenerator generator = this.generators.remove(Integer.valueOf(i));
        if (generator == null) {
            return;
        }
        generator.releaseReservoir(Integer.toString(i).concat(LogicalPlanConfiguration.KEY_SEPARATOR).concat(Node.INPUT));
        for (WindowGenerator stillRegistered : this.generators.values()) {
            if (stillRegistered == generator) {
                // Shared with another node: keep it running.
                return;
            }
        }
        this.activeGenerators.remove(generator);
        generator.deactivate();
        generator.teardown();
    }

    /**
     * Shuts down, disconnects and removes the listed nodes from this container.
     *
     * <p>The whole list is validated before anything is touched. Nodes without a live
     * thread are disconnected right away. The others are asked to shut down first; each
     * thread then gets up to one second to finish, is interrupted if it has not terminated
     * by then, and its node is disconnected. If the calling thread is interrupted while
     * waiting, the remaining nodes are not disconnected, but all listed nodes are still
     * removed from {@code nodes}.
     *
     * @param list the ids of the nodes to undeploy
     * @throws IllegalArgumentException if an id is not hosted by this container or occurs
     *         more than once in the list
     */
    private synchronized void undeploy(List<Integer> list) {
        final Set<Integer> requested = new HashSet<Integer>();
        for (Integer id : list) {
            if (this.nodes.get(id) == null) {
                throw new IllegalArgumentException("Node " + id + " is not hosted in this container!");
            }
            if (!requested.add(id)) {
                throw new IllegalArgumentException("Node " + id + " is requested to be undeployed more than once");
            }
        }

        final List<Thread> runningThreads = new ArrayList<Thread>();
        final List<Integer> runningIds = new ArrayList<Integer>();
        for (Integer id : list) {
            final Node<?> node = this.nodes.get(id);
            final Thread worker = node.context.getThread();
            if (worker != null && worker.isAlive()) {
                runningThreads.add(worker);
                runningIds.add(id);
                node.shutdown();
            } else {
                disconnectNode(id.intValue());
            }
        }

        try {
            // The two lists are filled in lockstep, so index k refers to the same node in both.
            for (int k = 0; k < runningThreads.size(); k++) {
                final Thread worker = runningThreads.get(k);
                worker.join(1000L);
                if (worker.getState() != Thread.State.TERMINATED) {
                    worker.interrupt();
                }
                disconnectNode(runningIds.get(k).intValue());
            }
            logger.info("Undeploy complete.");
        } catch (InterruptedException e) {
            logger.warn("Aborting wait for operators to get deactivated!", e);
        }

        for (Integer id : list) {
            this.nodes.remove(id);
        }
    }

    /**
     * Releases everything this container holds: calls {@code operateListeners} with
     * {@code false}, deactivates all nodes, shuts the event bus down, tears down the
     * remaining window generators and, when a buffer server was deployed, stops it
     * together with the event loop.
     */
    public void teardown() {
        operateListeners(this.containerContext, false);
        deactivate();
        // With assertions enabled, every stream must have been removed by deactivate().
        if (!($assertionsDisabled || this.streams.isEmpty())) {
            throw new AssertionError();
        }
        this.eventBus.shutdown();
        this.nodes.clear();

        // A generator may be registered under several node ids; the set collapses the
        // duplicates so that each instance is torn down once.
        final Set<WindowGenerator> leftoverGenerators = new HashSet<WindowGenerator>();
        leftoverGenerators.addAll(this.generators.values());
        this.generators.clear();
        for (WindowGenerator generator : leftoverGenerators) {
            generator.teardown();
        }

        if (this.bufferServer != null) {
            eventloop.stop(this.bufferServer);
            eventloop.stop();
        }
        leftoverGenerators.clear();
    }

    /**
     * Wakes up every thread waiting on the heartbeat trigger, so the heartbeat loop does
     * not sit out the rest of its current wait.
     */
    public void triggerHeartbeat() {
        final Object trigger = this.heartbeatTrigger;
        synchronized (trigger) {
            trigger.notifyAll();
        }
    }

    /**
     * Runs the heartbeat protocol with the application master until
     * {@code exitHeartbeatLoop} is set or the waiting thread is interrupted.
     *
     * <p>Each iteration refreshes the security tokens when due, waits up to
     * {@code heartbeatIntervalMillis} on the heartbeat trigger, then sends a container
     * heartbeat carrying the buffer server address, free memory, GC totals and one
     * heartbeat per hosted node, and processes the response. While the response reports
     * pending requests, a fresh set of node stats is sent again after a wait of up to
     * 500 ms.
     *
     * @throws Exception if token handling or the communication with the application
     *         master fails
     */
    public void heartbeatLoop() throws Exception {
        StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse response;
        this.umbilical.log(this.containerId, "[" + this.containerId + "] Entering heartbeat loop..");
        logger.debug("Entering heartbeat loop (interval is {} ms)", Integer.valueOf(this.heartbeatIntervalMillis));
        final YarnConfiguration yarnConfiguration = new YarnConfiguration();
        final long tokenLifeTime = (long) (((Double) this.containerContext.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR)).doubleValue() * ((Long) this.containerContext.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME)).longValue());
        // Time at which the tokens are refreshed next; "now" forces a refresh on the first pass.
        long nextTokenRefreshMillis = System.currentTimeMillis();
        final Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        // Stack trace requested by the previous response; sent along with the next heartbeat.
        String requestedStackTrace = null;
        for (Token<?> token : credentials.getAllTokens()) {
            logger.debug("token: {}", token);
        }
        final String principal = (String) this.containerContext.getValue(LogicalPlan.PRINCIPAL);
        final String keyTabFile = (String) this.containerContext.getValue(LogicalPlan.KEY_TAB_FILE);
        while (!this.exitHeartbeatLoop) {
            if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= nextTokenRefreshMillis && keyTabFile != null) {
                nextTokenRefreshMillis = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), this.containerId, yarnConfiguration, principal, keyTabFile, credentials, null, false);
            }
            synchronized (this.heartbeatTrigger) {
                try {
                    this.heartbeatTrigger.wait(this.heartbeatIntervalMillis);
                } catch (InterruptedException e) {
                    logger.warn("Interrupted in heartbeat loop, exiting..");
                    // Actually leave the loop, as the message announces.
                    break;
                }
            }
            final long generatedMillis = System.currentTimeMillis();
            final StreamingContainerUmbilicalProtocol.ContainerHeartbeat containerHeartbeat = new StreamingContainerUmbilicalProtocol.ContainerHeartbeat();
            containerHeartbeat.jvmName = this.jvmName;
            if (this.bufferServerAddress != null) {
                containerHeartbeat.bufferServerHost = this.bufferServerAddress.getHostName();
                containerHeartbeat.bufferServerPort = this.bufferServerAddress.getPort();
                if (this.bufferServer != null && !eventloop.isActive()) {
                    logger.warn("Requesting restart due to terminated event loop");
                    containerHeartbeat.restartRequested = true;
                }
            }
            containerHeartbeat.memoryMBFree = (int) (Runtime.getRuntime().freeMemory() / 1048576);
            this.garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
            for (GarbageCollectorMXBean gcBean : this.garbageCollectorMXBeans) {
                containerHeartbeat.gcCollectionTime += gcBean.getCollectionTime();
                containerHeartbeat.gcCollectionCount += gcBean.getCollectionCount();
            }
            do {
                final StreamingContainerUmbilicalProtocol.ContainerStats containerStats = new StreamingContainerUmbilicalProtocol.ContainerStats(this.containerId);
                for (Map.Entry<Integer, Node<?>> hosted : this.nodes.entrySet()) {
                    final Node<?> node = hosted.getValue();
                    final StreamingContainerUmbilicalProtocol.OperatorHeartbeat operatorHeartbeat = new StreamingContainerUmbilicalProtocol.OperatorHeartbeat();
                    operatorHeartbeat.setNodeId(hosted.getKey().intValue());
                    operatorHeartbeat.setGeneratedTms(generatedMillis);
                    operatorHeartbeat.setIntervalMs(this.heartbeatIntervalMillis);
                    final BlockingQueue<StatsListener.OperatorResponse> pendingResponses = node.commandResponse;
                    if (!pendingResponses.isEmpty()) {
                        // drainTo moves everything that is queued right now. A loop bounded by
                        // size() while polling stops halfway, because size() shrinks with
                        // every poll, and may add null if the queue empties concurrently.
                        final ArrayList<StatsListener.OperatorResponse> drained = new ArrayList<StatsListener.OperatorResponse>();
                        pendingResponses.drainTo(drained);
                        operatorHeartbeat.requestResponse = drained;
                    }
                    final OperatorContext operatorContext = node.context;
                    operatorContext.drainStats(operatorHeartbeat.getOperatorStatsContainer());
                    final Thread worker = operatorContext.getThread();
                    if (worker == null || worker.getState() != Thread.State.TERMINATED) {
                        // A node whose thread has not been set yet is reported as active too.
                        operatorHeartbeat.setState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE);
                    } else if (this.failedNodes.contains(Integer.valueOf(operatorHeartbeat.nodeId))) {
                        operatorHeartbeat.setState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.FAILED);
                    } else {
                        logger.debug("Reporting SHUTDOWN state because thread is {} and failedNodes is {}", worker, this.failedNodes);
                        operatorHeartbeat.setState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.SHUTDOWN);
                    }
                    containerStats.addNodeStats(operatorHeartbeat);
                }
                this.eventBus.publish(new ContainerEvent.ContainerStatsEvent(containerStats));
                containerHeartbeat.setContainerStats(containerStats);
                containerHeartbeat.sentTms = System.currentTimeMillis();
                containerHeartbeat.stackTrace = requestedStackTrace;
                response = this.umbilical.processHeartbeat(containerHeartbeat);
                requestedStackTrace = response.stackTraceRequired ? StramUtils.getStackTrace().toString() : null;
                processHeartbeatResponse(response);
                if (response.hasPendingRequests) {
                    logger.info("Waiting for pending request.");
                    synchronized (this.heartbeatTrigger) {
                        try {
                            this.heartbeatTrigger.wait(500L);
                        } catch (InterruptedException e) {
                            logger.warn("Interrupted in heartbeat loop", e);
                        }
                    }
                }
            } while (response.hasPendingRequests);
        }
        logger.debug("Exiting hearbeat loop");
        this.umbilical.log(this.containerId, "[" + this.containerId + "] Exiting heartbeat loop..");
    }

    /**
     * Dispatches the node requests received with the last heartbeat response.
     *
     * <p>Requests already marked deleted are skipped. Logger-change requests are handled
     * by the container itself. Every other request is handed to its target node when that
     * node's thread is alive, and is then marked deleted. A request for a node that is not
     * deployed is left pending.
     *
     * @param z when {@code true}, a request whose target node has no live thread is
     *        logged and marked deleted; when {@code false}, it is left pending
     */
    private void processNodeRequests(boolean z) {
        for (StreamingContainerUmbilicalProtocol.StramToNodeRequest request : this.nodeRequests) {
            if (request.isDeleted()) {
                continue;
            }
            if (request instanceof StramToNodeChangeLoggersRequest) {
                handleChangeLoggersRequest((StramToNodeChangeLoggersRequest) request);
                continue;
            }
            final Node<?> target = this.nodes.get(Integer.valueOf(request.getOperatorId()));
            if (target == null) {
                logger.warn("Node for operator {} is not found, probably not deployed yet", Integer.valueOf(request.getOperatorId()));
                continue;
            }
            final Thread worker = target.context.getThread();
            if (worker == null || !worker.isAlive()) {
                if (z) {
                    logger.warn("Received request with invalid operator id {} ({})", Integer.valueOf(request.getOperatorId()), request);
                    request.setDeleted(true);
                }
                continue;
            }
            logger.debug("request received: {}", request);
            final StatsListener.OperatorRequest executor = this.requestFactory.getRequestExecutor(this.nodes.get(Integer.valueOf(request.operatorId)), request);
            if (executor == null) {
                logger.warn("No executor identified for the request {}", request);
            } else {
                target.context.request(executor);
            }
            request.setDeleted(true);
        }
    }

    /**
     * Applies the instructions carried by a heartbeat response, in this order:
     * <ol>
     *   <li>replaces the pending node requests when the response carries a new list;</li>
     *   <li>on a changed committed window id, records it, purges the buffer server up to the
     *       previous window and queues a {@code committed} callback on every running operator
     *       that implements {@link Operator.CheckpointListener};</li>
     *   <li>handles an undeploy request (pending node requests are processed first);</li>
     *   <li>handles a shutdown request by flagging the heartbeat loop to exit and returning;</li>
     *   <li>handles a deploy request;</li>
     *   <li>processes the pending node requests, discarding those aimed at non-running nodes.</li>
     * </ol>
     *
     * @param containerHeartbeatResponse the response to act upon
     * @throws IllegalStateException if the deploy request fails; the heartbeat loop is flagged to
     *         exit and the failure is reported over the umbilical (best effort) before rethrowing
     */
    public void processHeartbeatResponse(StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse containerHeartbeatResponse) {
        if (containerHeartbeatResponse.nodeRequests != null) {
            this.nodeRequests = containerHeartbeatResponse.nodeRequests;
        }
        if (containerHeartbeatResponse.committedWindowId != this.lastCommittedWindowId) {
            this.lastCommittedWindowId = containerHeartbeatResponse.committedWindowId;
            if (this.bufferServer != null) {
                this.bufferServer.purge(this.lastCommittedWindowId - 1);
            }
            // Created lazily and shared by all listeners; it reads lastCommittedWindowId when executed.
            StatsListener.OperatorRequest operatorRequest = null;
            for (Map.Entry<Integer, Node<?>> entry : this.nodes.entrySet()) {
                Thread thread = entry.getValue().context.getThread();
                if (thread != null && thread.isAlive() && (entry.getValue().getOperator() instanceof Operator.CheckpointListener)) {
                    if (operatorRequest == null) {
                        operatorRequest = new StatsListener.OperatorRequest() { // from class: com.datatorrent.stram.engine.StreamingContainer.1
                            public StatsListener.OperatorResponse execute(Operator operator, int i, long j) throws IOException {
                                ((Operator.CheckpointListener) operator).committed(StreamingContainer.this.lastCommittedWindowId);
                                return null;
                            }
                        };
                    }
                    entry.getValue().context.request(operatorRequest);
                }
            }
        }
        if (containerHeartbeatResponse.undeployRequest != null) {
            logger.info("Undeploy request: {}", containerHeartbeatResponse.undeployRequest);
            processNodeRequests(false);
            undeploy(containerHeartbeatResponse.undeployRequest);
        }
        if (containerHeartbeatResponse.shutdown) {
            logger.info("Received shutdown request");
            processNodeRequests(false);
            this.exitHeartbeatLoop = true;
            return;
        }
        if (containerHeartbeatResponse.deployRequest != null) {
            logger.info("Deploy request: {}", containerHeartbeatResponse.deployRequest);
            try {
                deploy(containerHeartbeatResponse.deployRequest);
            } catch (Exception e) {
                logger.error("deploy request failed", e);
                try {
                    this.umbilical.log(this.containerId, "deploy request failed: " + containerHeartbeatResponse.deployRequest + " " + ExceptionUtils.getStackTrace(e));
                } catch (IOException e2) {
                    // Reporting is best effort (the deploy failure is rethrown below), but do not
                    // lose the fact that the remote side never received the report.
                    logger.warn("Failed to report deploy failure over the umbilical", e2);
                }
                this.exitHeartbeatLoop = true;
                throw new IllegalStateException("Deploy request failed: " + containerHeartbeatResponse.deployRequest, e);
            }
        }
        processNodeRequests(true);
    }

    /**
     * Looks up the queue capacity configured for one output port of one operator.
     *
     * @param list deploy infos to search
     * @param i    id of the operator owning the output port
     * @param str  name of the output port
     * @return the {@code QUEUE_CAPACITY} value resolved for the first matching output port, or the
     *         attribute's default value when no operator/port in {@code list} matches
     */
    private int getOutputQueueCapacity(List<OperatorDeployInfo> list, int i, String str) {
        for (OperatorDeployInfo candidate : list) {
            if (candidate.id != i) {
                continue;
            }
            for (OperatorDeployInfo.OutputDeployInfo output : candidate.outputs) {
                if (!output.portName.equals(str)) {
                    continue;
                }
                Integer configured = (Integer) getValue(PortContext.QUEUE_CAPACITY, output, candidate);
                return configured.intValue();
            }
        }
        Integer fallback = (Integer) PortContext.QUEUE_CAPACITY.defaultValue;
        return fallback.intValue();
    }

    /**
     * Deploys a batch of operators into this container: instantiates the nodes, creates and sets
     * up their output and input streams, registers the streams and finally activates everything.
     *
     * @param list deploy infos of the operators to bring up
     * @throws IllegalStateException if any operator id in {@code list} is already present in this
     *         container (checked for the whole batch before anything is deployed)
     * @throws Exception propagated from stream deployment
     */
    private synchronized void deploy(List<OperatorDeployInfo> list) throws Exception {
        // Reject the whole batch up front rather than failing half-way through.
        for (OperatorDeployInfo info : list) {
            boolean alreadyDeployed = this.nodes.containsKey(Integer.valueOf(info.id));
            if (alreadyDeployed) {
                throw new IllegalStateException("Node with id: " + info.id + " already present in container " + this.containerId + "!");
            }
        }
        deployNodes(list);

        HashMap<String, ArrayList<String>> groupedInputStreams = new HashMap<>();
        for (OperatorDeployInfo info : list) {
            groupInputStreams(groupedInputStreams, info);
        }

        HashMap<String, ComponentContextPair<Stream, StreamContext>> newStreams = deployOutputStreams(list, groupedInputStreams);
        deployInputStreams(list, newStreams);
        for (ComponentContextPair<Stream, StreamContext> pair : newStreams.values()) {
            Stream stream = (Stream) pair.component;
            stream.setup(pair.context);
        }
        this.streams.putAll(newStreams);

        HashMap<Integer, OperatorDeployInfo> infoById = new HashMap<>(list.size());
        for (OperatorDeployInfo info : list) {
            infoById.put(Integer.valueOf(info.id), info);
        }
        activate(infoById, newStreams);
    }

    /**
     * Builds the name under which a unifier input port is addressed:
     * {@code <port>(<sourceNodeId><KEY_SEPARATOR><sourcePort>)}.
     *
     * @param str  name of the unifier's input port
     * @param i    id of the operator feeding the port
     * @param str2 name of the feeding operator's output port
     * @return the composed port name
     */
    public static String getUnifierInputPortName(String str, int i, String str2) {
        StringBuilder name = new StringBuilder();
        name.append(str)
            .append("(")
            .append(i)
            .append(LogicalPlanConfiguration.KEY_SEPARATOR)
            .append(str2)
            .append(")");
        return name.toString();
    }

    /**
     * Rewrites, in place, the port name of every input of the given deploy info to the form
     * produced by {@link #getUnifierInputPortName(String, int, String)}.
     *
     * @param operatorDeployInfo deploy info whose inputs are renamed
     */
    private void massageUnifierDeployInfo(OperatorDeployInfo operatorDeployInfo) {
        for (OperatorDeployInfo.InputDeployInfo input : operatorDeployInfo.inputs) {
            String qualifiedName = getUnifierInputPortName(input.portName, input.sourceNodeId, input.sourcePortName);
            input.portName = qualifiedName;
        }
    }

    /**
     * Instantiates a node for every operator in {@code list} and registers it in {@code nodes}.
     *
     * <p>Each node is loaded through the operator's storage agent (window id {@code -1} for a
     * stateless operator context, the checkpoint window id otherwise) and initialised with the
     * checkpoint's window id and application window count plus this container's window timing.
     * For a unifier the input port names are rewritten and the operator context is parented by a
     * port context built from the unifier's first input.
     *
     * @param list deploy infos of the operators to instantiate
     * @throws IOException propagated from the storage agent
     */
    private void deployNodes(List<OperatorDeployInfo> list) throws IOException {
        for (OperatorDeployInfo info : list) {
            StorageAgent agent = (StorageAgent) getValue(OperatorContext.STORAGE_AGENT, info);
            if (!$assertionsDisabled && agent == null) {
                throw new AssertionError();
            }

            AbstractWritableAdapter parentContext;
            if (info instanceof OperatorDeployInfo.UnifierDeployInfo) {
                OperatorDeployInfo.UnifierDeployInfo unifierInfo = (OperatorDeployInfo.UnifierDeployInfo) info;
                parentContext = new PortContext(
                        info.inputs.get(0).contextAttributes,
                        new OperatorContext(0, info.name, unifierInfo.operatorAttributes, this.containerContext));
                massageUnifierDeployInfo(info);
            } else {
                parentContext = this.containerContext;
            }

            OperatorContext context = new OperatorContext(info.id, info.name, info.contextAttributes, parentContext);
            long checkpointWindowId = info.checkpoint.windowId;
            context.attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, Long.valueOf(checkpointWindowId));
            logger.debug("Restoring operator {} to checkpoint {} stateless={}.", new Object[]{Integer.valueOf(info.id), Codec.getStringWindowId(checkpointWindowId), Boolean.valueOf(context.stateless)});

            long windowIdToLoad = context.stateless ? -1L : checkpointWindowId;
            Node<?> node = Node.retrieveNode(agent.load(info.id, windowIdToLoad), context, info.type);
            node.currentWindowId = checkpointWindowId;
            node.applicationWindowCount = info.checkpoint.applicationWindowCount;
            node.firstWindowMillis = this.firstWindowMillis;
            node.windowWidthMillis = this.windowWidthMillis;
            node.setId(info.id);

            this.nodes.put(Integer.valueOf(info.id), node);
            logger.debug("Marking operator {} as deployed.", node);
        }
    }

    /**
     * Creates a buffer server publisher stream, together with its stream context, for one output.
     *
     * @param str              identifier of the publishing source; also the last path segment of
     *                         the sink identifier
     * @param streamCodec      codec stored in the stream context
     * @param j                finished window id stored in the stream context
     * @param i                capacity handed to the publisher ({@code i * 256} for the fast publisher)
     * @param outputDeployInfo output description supplying buffer server host, port and token
     * @return an entry whose key is the sink identifier
     *         {@code tcp://<host>:<port>/<str>} and whose value is the publisher/context pair
     * @throws UnknownHostException declared by {@link InetAddress#getByName(String)}
     */
    private AbstractMap.SimpleEntry<String, ComponentContextPair<Stream, StreamContext>> deployBufferServerPublisher(String str, StreamCodec<?> streamCodec, long j, int i, OperatorDeployInfo.OutputDeployInfo outputDeployInfo) throws UnknownHostException {
        String host = outputDeployInfo.bufferServerHost;
        int port = outputDeployInfo.bufferServerPort;
        String sinkIdentifier = "tcp://".concat(host).concat(":").concat(String.valueOf(port)).concat("/").concat(str);

        StreamContext context = new StreamContext(outputDeployInfo.declaredStreamId);
        context.setPortId(outputDeployInfo.portName);
        context.setSourceId(str);
        context.setSinkId(sinkIdentifier);
        context.setFinishedWindowId(j);
        context.put(StreamContext.CODEC, streamCodec);
        context.put(StreamContext.EVENT_LOOP, eventloop);
        context.setBufferServerAddress(InetSocketAddress.createUnresolved(host, port));
        context.put(StreamContext.BUFFER_SERVER_TOKEN, outputDeployInfo.bufferServerToken);

        // A buffer server on this machine is addressed through the loopback interface instead
        // (InetAddress.getByName(null) returns the loopback address).
        InetAddress resolved = context.getBufferServerAddress().getAddress();
        boolean onThisHost = resolved != null && NetUtils.isLocalAddress(resolved);
        if (onThisHost) {
            context.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), port));
        }

        Stream publisher;
        if (this.fastPublisherSubscriber) {
            publisher = new FastPublisher(str, i * 256);
        } else {
            publisher = new BufferServerPublisher(str, i);
        }
        return new AbstractMap.SimpleEntry<>(sinkIdentifier, new ComponentContextPair(publisher, context));
    }

    /**
     * Creates the output-side streams for every output port of every operator in {@code list} and
     * connects them to the operators' nodes.
     *
     * <p>An output with no entry in {@code hashMap} and exactly one stream codec is connected
     * straight to a buffer server publisher. Any other output is connected to a {@link MuxStream}
     * (created once per source identifier); when the output has a buffer server host, one
     * publisher per codec is added to that mux as a sink.
     *
     * @param list    deploy infos of the operators whose outputs are wired
     * @param hashMap grouped input streams keyed by source identifier
     *                ({@code <operatorId><KEY_SEPARATOR><portName>})
     * @return the streams created by this call, keyed by source identifier (direct publishers and
     *         mux streams) or by publisher sink identifier (publishers behind a mux)
     * @throws Exception propagated from publisher creation
     */
    private HashMap<String, ComponentContextPair<Stream, StreamContext>> deployOutputStreams(List<OperatorDeployInfo> list, HashMap<String, ArrayList<String>> hashMap) throws Exception {
        HashMap<String, ComponentContextPair<Stream, StreamContext>> newStreams = new HashMap<>();
        for (OperatorDeployInfo info : list) {
            Node<?> node = this.nodes.get(Integer.valueOf(info.id));
            long finishedWindowId = info.checkpoint.windowId;
            for (OperatorDeployInfo.OutputDeployInfo output : info.outputs) {
                String sourceIdentifier = Integer.toString(info.id).concat(LogicalPlanConfiguration.KEY_SEPARATOR).concat(output.portName);
                int queueCapacity = ((Integer) getValue(PortContext.QUEUE_CAPACITY, output, info)).intValue();
                logger.debug("for stream {} the queue capacity is {}", sourceIdentifier, Integer.valueOf(queueCapacity));

                ArrayList<String> groupedSinks = hashMap.get(sourceIdentifier);
                Map<Integer, StreamCodec<?>> codecs = output.streamCodecs;

                boolean singlePublisherOnly = groupedSinks == null && codecs.size() == 1;
                if (singlePublisherOnly) {
                    if (!$assertionsDisabled && output.bufferServerHost == null) {
                        throw new AssertionError("resulting stream cannot be inline: " + output);
                    }
                    Map.Entry<Integer, StreamCodec<?>> onlyCodec = codecs.entrySet().iterator().next();
                    String connIdentifier = sourceIdentifier + LogicalPlanConfiguration.KEY_SEPARATOR + onlyCodec.getKey();
                    ComponentContextPair<Stream, StreamContext> publisherPair = deployBufferServerPublisher(connIdentifier, onlyCodec.getValue(), finishedWindowId, queueCapacity, output).getValue();
                    newStreams.put(sourceIdentifier, publisherPair);
                    node.connectOutputPort(output.portName, (Sink) publisherPair.component);
                    continue;
                }

                // Fan-out case: the port feeds a mux, created on first use for this source.
                ComponentContextPair<Stream, StreamContext> muxPair = newStreams.get(sourceIdentifier);
                if (muxPair == null) {
                    StreamContext muxContext = new StreamContext(output.declaredStreamId);
                    muxContext.setSourceId(sourceIdentifier);
                    muxContext.setFinishedWindowId(finishedWindowId);
                    MuxStream mux = new MuxStream();
                    muxPair = new ComponentContextPair<Stream, StreamContext>(mux, muxContext);
                    newStreams.put(sourceIdentifier, muxPair);
                    node.connectOutputPort(output.portName, mux);
                }
                if (output.bufferServerHost == null) {
                    continue;
                }

                for (Map.Entry<Integer, StreamCodec<?>> codecEntry : codecs.entrySet()) {
                    String connIdentifier = sourceIdentifier + LogicalPlanConfiguration.KEY_SEPARATOR + codecEntry.getKey();
                    AbstractMap.SimpleEntry<String, ComponentContextPair<Stream, StreamContext>> deployed = deployBufferServerPublisher(connIdentifier, codecEntry.getValue(), finishedWindowId, queueCapacity, output);
                    String publisherKey = deployed.getKey();
                    ComponentContextPair<Stream, StreamContext> publisherPair = deployed.getValue();
                    newStreams.put(publisherKey, publisherPair);

                    // The mux context's sink id accumulates the keys of all of its sinks.
                    String knownSinks = muxPair.context.getSinkId();
                    String mergedSinks = knownSinks == null
                            ? publisherKey
                            : knownSinks.concat(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR).concat(publisherKey);
                    muxPair.context.setSinkId(mergedSinks);

                    ((Stream.MultiSinkCapableStream) muxPair.component).setSink(publisherKey, (Sink) publisherPair.component);
                }
            }
        }
        return newStreams;
    }

    /**
     * Returns the id of the context of the stream registered for an operator's port.
     *
     * @param i   operator id
     * @param str port name
     * @return the id of the stream context registered under
     *         {@code <i><KEY_SEPARATOR><str>}, or {@code null} when no such stream is registered
     */
    public final String getDeclaredStreamId(int i, String str) {
        String streamKey = String.valueOf(i).concat(LogicalPlanConfiguration.KEY_SEPARATOR).concat(str);
        ComponentContextPair<Stream, StreamContext> registered = this.streams.get(streamKey);
        return registered == null ? null : registered.context.getId();
    }

    /**
     * Wires the input side of every operator in {@code list}.
     *
     * <p>Operators without inputs are collected and, after all other operators are handled, attached
     * to one shared {@link WindowGenerator} created from the smallest finished-checkpoint window id
     * among them.
     *
     * <p>For an operator with inputs, each input is connected in one of two ways, depending on
     * whether a stream for the input's source port is found in {@code this.streams} or in
     * {@code hashMap}:
     * <ul>
     *   <li>not found: a buffer server subscriber is created and its reservoir is connected to the
     *       node's input port;</li>
     *   <li>found: an {@code InlineStream} or {@code OiOStream} is created, its reservoir is
     *       connected to the node's input port, and the stream is registered as an additional sink
     *       of the upstream stream (wrapping the upstream stream in a {@link MuxStream} first when
     *       it cannot take multiple sinks).</li>
     * </ul>
     *
     * @param list    deploy infos of the operators whose inputs are wired
     * @param hashMap streams created so far during this deployment, keyed by identifier; the streams
     *                created here are added to it
     * @throws UnknownHostException declared by {@link InetAddress#getByName(String)}
     * @throws IllegalStateException if an input does not carry exactly one stream codec, or if the
     *         locality switch hits an unexpected value
     */
    private void deployInputStreams(List<OperatorDeployInfo> list, HashMap<String, ComponentContextPair<Stream, StreamContext>> hashMap) throws UnknownHostException {
        Sink<Object> oiOStream;
        SweepableReservoir reservoir;
        // Operators without any input; they are driven by the window generator set up at the end.
        ArrayList arrayList = new ArrayList();
        // Smallest checkpoint window id seen among the input-less operators.
        long j = Long.MAX_VALUE;
        // Operator id -> source operator id, filled for inputs handled by the OiOStream case below.
        Map<Integer, Integer> concurrentHashMap = new ConcurrentHashMap<>();
        for (OperatorDeployInfo operatorDeployInfo : list) {
            if (operatorDeployInfo.inputs == null || operatorDeployInfo.inputs.isEmpty()) {
                arrayList.add(operatorDeployInfo);
                operatorDeployInfo.checkpoint = getFinishedCheckpoint(operatorDeployInfo);
                if (operatorDeployInfo.checkpoint.windowId < j) {
                    j = operatorDeployInfo.checkpoint.windowId;
                }
            } else {
                Node<?> node = this.nodes.get(Integer.valueOf(operatorDeployInfo.id));
                for (OperatorDeployInfo.InputDeployInfo inputDeployInfo : operatorDeployInfo.inputs) {
                    if (inputDeployInfo.streamCodecs.size() != 1) {
                        throw new IllegalStateException("Only one input codec configuration should be present");
                    }
                    Map.Entry<Integer, StreamCodec<?>> next = inputDeployInfo.streamCodecs.entrySet().iterator().next();
                    Integer key = next.getKey();
                    StreamCodec<?> value = next.getValue();
                    // concat: identifier of the upstream output port; concat2: identifier of this input port.
                    String concat = Integer.toString(inputDeployInfo.sourceNodeId).concat(LogicalPlanConfiguration.KEY_SEPARATOR).concat(inputDeployInfo.sourcePortName);
                    String concat2 = Integer.toString(operatorDeployInfo.id).concat(LogicalPlanConfiguration.KEY_SEPARATOR).concat(inputDeployInfo.portName);
                    int intValue = ((Integer) getValue(PortContext.QUEUE_CAPACITY, inputDeployInfo, operatorDeployInfo)).intValue();
                    Checkpoint finishedCheckpoint = getFinishedCheckpoint(operatorDeployInfo);
                    // Look for the upstream stream among the registered streams first, then among
                    // the streams created earlier in this deployment.
                    ComponentContextPair<Stream, StreamContext> componentContextPair = this.streams.get(concat);
                    if (componentContextPair == null) {
                        componentContextPair = hashMap.get(concat);
                    }
                    if (componentContextPair == null) {
                        // No upstream stream in this container: subscribe through the buffer server.
                        if (!$assertionsDisabled && (inputDeployInfo.locality == DAG.Locality.CONTAINER_LOCAL || inputDeployInfo.locality == DAG.Locality.THREAD_LOCAL)) {
                            throw new AssertionError();
                        }
                        StreamContext streamContext = new StreamContext(inputDeployInfo.declaredStreamId);
                        streamContext.setBufferServerAddress(InetSocketAddress.createUnresolved(inputDeployInfo.bufferServerHost, inputDeployInfo.bufferServerPort));
                        // NOTE(review): the address is stored unresolved; whether getAddress() can be
                        // non-null here depends on StreamContext.getBufferServerAddress() - confirm.
                        InetAddress address = streamContext.getBufferServerAddress().getAddress();
                        if (address != null && NetUtils.isLocalAddress(address)) {
                            // InetAddress.getByName(null) returns the loopback address.
                            streamContext.setBufferServerAddress(new InetSocketAddress(InetAddress.getByName(null), inputDeployInfo.bufferServerPort));
                        }
                        streamContext.put(StreamContext.BUFFER_SERVER_TOKEN, inputDeployInfo.bufferServerToken);
                        // The subscription source is the upstream port identifier qualified by the codec key.
                        String str = concat + LogicalPlanConfiguration.KEY_SEPARATOR + key;
                        streamContext.setPortId(inputDeployInfo.portName);
                        streamContext.put(StreamContext.CODEC, value);
                        streamContext.put(StreamContext.EVENT_LOOP, eventloop);
                        streamContext.setPartitions(inputDeployInfo.partitionMask, inputDeployInfo.partitionKeys);
                        streamContext.setSourceId(str);
                        streamContext.setSinkId(concat2);
                        streamContext.setFinishedWindowId(finishedCheckpoint.windowId);
                        BufferServerSubscriber fastSubscriber = this.fastPublisherSubscriber ? new FastSubscriber("tcp://".concat(inputDeployInfo.bufferServerHost).concat(":").concat(String.valueOf(inputDeployInfo.bufferServerPort)).concat("/").concat(str), intValue) : new BufferServerSubscriber("tcp://".concat(inputDeployInfo.bufferServerHost).concat(":").concat(String.valueOf(inputDeployInfo.bufferServerPort)).concat("/").concat(str), intValue);
                        if (value instanceof StreamCodecWrapperForPersistance) {
                            fastSubscriber.acquireReservoirForPersistStream(concat2, intValue, value);
                        }
                        SweepableReservoir acquireReservoir = fastSubscriber.acquireReservoir(concat2, intValue);
                        // NOTE(review): when windowId >= 0 the port is connected twice, first with the
                        // window-id-activated wrapper and then with the raw reservoir; the combined
                        // effect depends on Node.connectInputPort - confirm.
                        if (finishedCheckpoint.windowId >= 0) {
                            node.connectInputPort(inputDeployInfo.portName, new WindowIdActivatedReservoir(concat2, acquireReservoir, finishedCheckpoint.windowId));
                        }
                        node.connectInputPort(inputDeployInfo.portName, acquireReservoir);
                        hashMap.put(concat2, new ComponentContextPair<>(fastSubscriber, streamContext));
                        logger.debug("put input stream {} against key {}", fastSubscriber, concat2);
                    } else {
                        // The upstream stream lives in this container: connect to it directly.
                        if (!$assertionsDisabled && inputDeployInfo.locality != DAG.Locality.CONTAINER_LOCAL && inputDeployInfo.locality != DAG.Locality.THREAD_LOCAL) {
                            throw new AssertionError();
                        }
                        StreamContext streamContext2 = new StreamContext(inputDeployInfo.declaredStreamId);
                        streamContext2.setSourceId(concat);
                        streamContext2.setSinkId(concat2);
                        // Decompiled enum switch over inputDeployInfo.locality; the case labels stand for
                        // the switch-map values 1 and 2.
                        // NOTE(review): presumably 1 = CONTAINER_LOCAL and 2 = THREAD_LOCAL - confirm
                        // against the synthetic switch map.
                        switch (AnonymousClass4.$SwitchMap$com$datatorrent$api$DAG$Locality[inputDeployInfo.locality.ordinal()]) {
                            case MethodSignatureVisitor.VISIT_PARAM /* 1 */:
                                // Size the inline stream by the larger of the output and input capacities.
                                int outputQueueCapacity = getOutputQueueCapacity(list, inputDeployInfo.sourceNodeId, inputDeployInfo.sourcePortName);
                                if (outputQueueCapacity > intValue) {
                                    intValue = outputQueueCapacity;
                                }
                                oiOStream = new InlineStream(intValue);
                                reservoir = ((InlineStream) oiOStream).getReservoir();
                                if (finishedCheckpoint.windowId >= 0) {
                                    node.connectInputPort(inputDeployInfo.portName, new WindowIdActivatedReservoir(concat2, reservoir, finishedCheckpoint.windowId));
                                    break;
                                }
                                break;
                            case MethodSignatureVisitor.VISIT_RETURN /* 2 */:
                                oiOStream = new OiOStream();
                                reservoir = ((OiOStream) oiOStream).getReservoir();
                                ((OiOStream.OiOReservoir) reservoir).setControlSink(((OiONode) node).getControlSink(reservoir));
                                // Remember the parent so the operator can be grouped by setupOiOGroups below.
                                concurrentHashMap.put(Integer.valueOf(operatorDeployInfo.id), Integer.valueOf(inputDeployInfo.sourceNodeId));
                                break;
                            default:
                                throw new IllegalStateException("Locality can be either ContainerLocal or ThreadLocal");
                        }
                        node.connectInputPort(inputDeployInfo.portName, reservoir);
                        hashMap.put(concat2, new ComponentContextPair<>(oiOStream, streamContext2));
                        if (!(componentContextPair.component instanceof Stream.MultiSinkCapableStream)) {
                            // The upstream stream cannot take another sink: put a MuxStream in front of
                            // it. The existing stream is re-registered under its own sink id, becomes the
                            // mux's first sink, and the source node's output port is reconnected to the mux.
                            String sinkId = componentContextPair.context.getSinkId();
                            StreamContext streamContext3 = new StreamContext(inputDeployInfo.declaredStreamId);
                            streamContext3.setSourceId(concat);
                            streamContext3.setFinishedWindowId(finishedCheckpoint.windowId);
                            streamContext3.setSinkId(sinkId);
                            MuxStream muxStream = new MuxStream();
                            muxStream.setSink(sinkId, (Sink) componentContextPair.component);
                            this.streams.put(sinkId, componentContextPair);
                            this.nodes.get(Integer.valueOf(inputDeployInfo.sourceNodeId)).connectOutputPort(inputDeployInfo.sourcePortName, muxStream);
                            ComponentContextPair<Stream, StreamContext> componentContextPair2 = new ComponentContextPair<>(muxStream, streamContext3);
                            componentContextPair = componentContextPair2;
                            hashMap.put(concat, componentContextPair2);
                        }
                        // Register the new stream as a sink of the upstream stream, wrapped in a
                        // partition-aware sink when a persistence codec or partition keys are present.
                        if (value instanceof StreamCodecWrapperForPersistance) {
                            ((Stream.MultiSinkCapableStream) componentContextPair.component).setSink(concat2, inputDeployInfo.partitionKeys == null ? new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance) value, inputDeployInfo.partitionMask, oiOStream) : new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance) value, inputDeployInfo.partitionKeys, inputDeployInfo.partitionMask, oiOStream));
                        } else if (inputDeployInfo.partitionKeys == null || inputDeployInfo.partitionKeys.isEmpty()) {
                            ((Stream.MultiSinkCapableStream) componentContextPair.component).setSink(concat2, oiOStream);
                        } else {
                            ((Stream.MultiSinkCapableStream) componentContextPair.component).setSink(concat2, new PartitionAwareSink(value == null ? this.nonSerializingStreamCodec : value, inputDeployInfo.partitionKeys, inputDeployInfo.partitionMask, oiOStream));
                        }
                        // The upstream context's sink id accumulates the identifiers of all its sinks.
                        String sinkId2 = componentContextPair.context.getSinkId();
                        if (sinkId2 == null) {
                            componentContextPair.context.setSinkId(concat2);
                        } else {
                            componentContextPair.context.setSinkId(sinkId2.concat(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR).concat(concat2));
                        }
                    }
                }
            }
        }
        setupOiOGroups(concurrentHashMap);
        if (arrayList.isEmpty()) {
            return;
        }
        // One window generator, started from the smallest checkpoint, feeds all input-less operators.
        WindowGenerator windowGenerator = setupWindowGenerator(j);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            OperatorDeployInfo operatorDeployInfo2 = (OperatorDeployInfo) it.next();
            this.generators.put(Integer.valueOf(operatorDeployInfo2.id), windowGenerator);
            Node<?> node2 = this.nodes.get(Integer.valueOf(operatorDeployInfo2.id));
            SweepableReservoir acquireReservoir2 = windowGenerator.acquireReservoir(String.valueOf(operatorDeployInfo2.id), 1024);
            // NOTE(review): same double connectInputPort pattern as for buffer server inputs - confirm.
            if (operatorDeployInfo2.checkpoint.windowId >= 0) {
                node2.connectInputPort(Node.INPUT, new WindowIdActivatedReservoir(Integer.toString(operatorDeployInfo2.id), acquireReservoir2, operatorDeployInfo2.checkpoint.windowId));
            }
            node2.connectInputPort(Node.INPUT, acquireReservoir2);
        }
    }

    /**
     * Adds every key of {@code map} to the {@code oioGroups} entry of its root.
     *
     * <p>The root of a key is found by following {@code map} from the key's value until a value
     * is reached that is not itself a key of {@code map}. A group list is created for a root on
     * first use.
     *
     * @param map operator id mapped to the id of the operator it depends on
     */
    private void setupOiOGroups(Map<Integer, Integer> map) {
        for (Integer member : map.keySet()) {
            // Walk up the chain until an id with no further mapping is reached.
            Integer root = map.get(member);
            for (Integer upstream = map.get(root); upstream != null; upstream = map.get(root)) {
                root = upstream;
            }

            ArrayList<Integer> group = this.oioGroups.get(root);
            if (group == null) {
                group = new ArrayList<>();
                this.oioGroups.put(root, group);
            }
            group.add(member);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates and configures a window generator backed by a single-threaded scheduled executor
     * named {@code "WindowGenerator"}.
     *
     * <p>The reset window and window width come from this container's settings; the first window
     * is the value {@code WindowGenerator.getNextWindowMillis} returns for {@code j}. The
     * checkpoint count is configured with the window count of that first window modulo
     * {@code checkpointWindowCount}.
     *
     * @param j window id passed to {@code WindowGenerator.getNextWindowMillis}
     * @return the configured generator
     */
    public WindowGenerator setupWindowGenerator(long j) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, "WindowGenerator");
        WindowGenerator generator = new WindowGenerator(scheduler, 1024);
        generator.setResetWindow(this.firstWindowMillis);

        long firstWindow = WindowGenerator.getNextWindowMillis(j, this.firstWindowMillis, this.windowWidthMillis);
        generator.setFirstWindow(firstWindow);
        generator.setWindowWidth(this.windowWidthMillis);

        long windowCount = WindowGenerator.getWindowCount(firstWindow, this.firstWindowMillis, this.windowWidthMillis);
        int checkpointOffset = (int) (windowCount % this.checkpointWindowCount);
        generator.setCheckpointCount(this.checkpointWindowCount, checkpointOffset);
        return generator;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sets up and activates the node of the given operator.
     *
     * <p>The operator id is removed from {@code failedNodes}, the node is set up with its own
     * context, and every deployed input and output port is set up with a fresh
     * {@code PortContext}; the port mapping descriptor is updated with the new port/context pairs.
     * Pending node requests are then processed, the node is activated and a
     * {@code NodeActivationEvent} is published.
     *
     * @param operatorDeployInfo deploy info of the operator whose node is set up
     */
    public void setupNode(OperatorDeployInfo operatorDeployInfo) {
        Integer operatorId = Integer.valueOf(operatorDeployInfo.id);
        this.failedNodes.remove(operatorId);
        Node<?> node = this.nodes.get(operatorId);
        node.setup(node.context);

        // Input ports: set up each deployed port and remember its context next to it.
        LinkedHashMap<String, Operators.PortContextPair<Operator.InputPort<?>>> declaredInputs = node.getPortMappingDescriptor().inputPorts;
        LinkedHashMap configuredInputs = new LinkedHashMap(declaredInputs.size());
        for (OperatorDeployInfo.InputDeployInfo input : operatorDeployInfo.inputs) {
            Operator.InputPort port = declaredInputs.get(input.portName).component;
            PortContext inputContext = new PortContext(input.contextAttributes, node.context);
            configuredInputs.put(input.portName, new Operators.PortContextPair(port, inputContext));
            port.setup(inputContext);
        }
        declaredInputs.putAll(configuredInputs);

        // Output ports: same treatment.
        LinkedHashMap<String, Operators.PortContextPair<Operator.OutputPort<?>>> declaredOutputs = node.getPortMappingDescriptor().outputPorts;
        LinkedHashMap configuredOutputs = new LinkedHashMap(declaredOutputs.size());
        for (OperatorDeployInfo.OutputDeployInfo output : operatorDeployInfo.outputs) {
            Operator.OutputPort port = declaredOutputs.get(output.portName).component;
            PortContext outputContext = new PortContext(output.contextAttributes, node.context);
            configuredOutputs.put(output.portName, new Operators.PortContextPair(port, outputContext));
            port.setup(outputContext);
        }
        declaredOutputs.putAll(configuredOutputs);

        logger.debug("activating {} in container {}", node, this.containerId);
        processNodeRequests(false);
        node.activate();
        this.eventBus.publish(new ContainerEvent.NodeActivationEvent(node));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deactivates and tears down the node of the given operator.
     *
     * <p>A {@code NodeDeactivationEvent} is published before the node is deactivated. When no node
     * is registered for the operator id, only a warning is logged.
     *
     * @param operatorDeployInfo deploy info of the operator whose node is torn down
     */
    public void teardownNode(OperatorDeployInfo operatorDeployInfo) {
        Node<?> node = this.nodes.get(Integer.valueOf(operatorDeployInfo.id));
        if (node != null) {
            this.eventBus.publish(new ContainerEvent.NodeDeactivationEvent(node));
            node.deactivate();
            node.teardown();
            logger.debug("deactivated {}", Integer.valueOf(node.getId()));
        } else {
            logger.warn("node {}/{} took longer to exit, resulting in unclean undeploy!", Integer.valueOf(operatorDeployInfo.id), operatorDeployInfo.name);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Activates the supplied streams, starts one thread per non-OIO operator and finally
     * activates any window generator that is not active yet.
     *
     * <p>Each operator thread sets up its own node plus every member of its OiO group (as listed
     * in {@code oioGroups}), runs the primary node, and tears down exactly the nodes it managed
     * to set up. Teardown happens once, in a single {@code finally} block, so a failing operator
     * is never deactivated or torn down twice.
     *
     * <p>Failure handling inside the thread:
     * <ul>
     *   <li>{@link Error}: the error is logged and reported through the umbilical, then the
     *       process exits with status 1.</li>
     *   <li>{@link Exception}: the affected operator is added to {@code failedNodes}, the failure
     *       is logged and reported; the thread then proceeds to teardown.</li>
     * </ul>
     * A failure during setup is attributed to the operator that was being set up at that moment,
     * including OiO group members.
     *
     * @param map  operator id to deployment descriptor, for all operators being activated
     * @param map2 streams (with their contexts) to activate before the operators start
     */
    public synchronized void activate(final Map<Integer, OperatorDeployInfo> map, Map<String, ComponentContextPair<Stream, StreamContext>> map2) {
        for (ComponentContextPair<Stream, StreamContext> componentContextPair : map2.values()) {
            this.activeStreams.put(componentContextPair.component, componentContextPair.context);
            ((Stream) componentContextPair.component).activate(componentContextPair.context);
            this.eventBus.publish(new ContainerEvent.StreamActivationEvent(componentContextPair));
        }
        for (final OperatorDeployInfo operatorDeployInfo : map.values()) {
            // OIO operators do not get a thread of their own; they are set up by the thread of
            // the operator whose oioGroups entry lists them.
            if (operatorDeployInfo.type != OperatorDeployInfo.OperatorType.OIO) {
                final Node<?> node = this.nodes.get(Integer.valueOf(operatorDeployInfo.id));
                Thread thread = new Thread(Integer.toString(operatorDeployInfo.id) + '/' + operatorDeployInfo.name + ':' + node.getOperator().getClass().getSimpleName()) {
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        // Operators whose setup completed; only these are torn down.
                        final HashSet<OperatorDeployInfo> deployed = new HashSet<>();
                        // Operator currently being set up; null once the whole group is set up.
                        OperatorDeployInfo current = operatorDeployInfo;
                        try {
                            StreamingContainer.this.setupNode(current);
                            deployed.add(current);
                            ArrayList<Integer> oioIds = StreamingContainer.this.oioGroups.get(Integer.valueOf(operatorDeployInfo.id));
                            if (oioIds != null) {
                                for (Integer oioId : oioIds) {
                                    // Track the member so a setup failure is reported against it.
                                    current = map.get(oioId);
                                    StreamingContainer.this.setupNode(current);
                                    deployed.add(current);
                                }
                            }
                            current = null;
                            node.run();
                        } catch (Error error) {
                            int[] operatorIds;
                            if (current == null) {
                                StreamingContainer.logger.error("Voluntary container termination due to an error in operator set {}.", deployed, error);
                                operatorIds = new int[deployed.size()];
                                int index = 0;
                                for (OperatorDeployInfo deployedInfo : deployed) {
                                    operatorIds[index++] = deployedInfo.id;
                                }
                            } else {
                                StreamingContainer.logger.error("Voluntary container termination due to an error in operator {}.", current, error);
                                operatorIds = new int[]{current.id};
                            }
                            StreamingContainer.this.umbilical.reportError(StreamingContainer.this.containerId, operatorIds, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error));
                            // The finally block below is only reached if exit does not complete.
                            System.exit(1);
                        } catch (Exception exception) {
                            if (current == null) {
                                StreamingContainer.this.failedNodes.add(Integer.valueOf(operatorDeployInfo.id));
                                StreamingContainer.logger.error("Operator set {} stopped running due to an exception.", deployed, exception);
                                StreamingContainer.this.umbilical.reportError(StreamingContainer.this.containerId, new int[]{operatorDeployInfo.id}, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(exception));
                            } else {
                                StreamingContainer.this.failedNodes.add(Integer.valueOf(current.id));
                                StreamingContainer.logger.error("Abandoning deployment of operator {} due to setup failure.", current, exception);
                                StreamingContainer.this.umbilical.reportError(StreamingContainer.this.containerId, new int[]{current.id}, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(exception));
                            }
                        } finally {
                            // Single teardown site: primary operator first, then its OiO members.
                            teardownIfDeployed(deployed, operatorDeployInfo);
                            ArrayList<Integer> groupIds = StreamingContainer.this.oioGroups.get(Integer.valueOf(operatorDeployInfo.id));
                            if (groupIds != null) {
                                for (Integer groupId : groupIds) {
                                    teardownIfDeployed(deployed, map.get(groupId));
                                }
                            }
                        }
                    }

                    /** Tears down {@code info} if it was set up; a failing teardown is recorded, not propagated. */
                    private void teardownIfDeployed(HashSet<OperatorDeployInfo> deployed, OperatorDeployInfo info) {
                        if (!deployed.contains(info)) {
                            return;
                        }
                        try {
                            StreamingContainer.this.teardownNode(info);
                        } catch (Exception teardownFailure) {
                            StreamingContainer.this.failedNodes.add(Integer.valueOf(info.id));
                            StreamingContainer.logger.error("Shutdown of operator {} failed due to an exception.", info, teardownFailure);
                        }
                    }
                };
                node.context.setThread(thread);
                thread.start();
            }
        }
        for (WindowGenerator windowGenerator : this.generators.values()) {
            if (!this.activeGenerators.containsKey(windowGenerator)) {
                this.activeGenerators.put(windowGenerator, this.generators);
                windowGenerator.activate((StreamContext) null);
            }
        }
    }

    /**
     * Records every input of the given operator under the identifier of the upstream port
     * feeding it.
     *
     * <p>Both identifiers have the form {@code <operator id><KEY_SEPARATOR><port name>}. The map
     * key is built from the input's source node id and source port name; the value appended to
     * that key's list is built from this operator's id and the input's port name.
     *
     * @param hashMap            source-port identifier to list of sink-port identifiers; updated in place
     * @param operatorDeployInfo operator whose inputs are grouped
     */
    private void groupInputStreams(HashMap<String, ArrayList<String>> hashMap, OperatorDeployInfo operatorDeployInfo) {
        for (OperatorDeployInfo.InputDeployInfo input : operatorDeployInfo.inputs) {
            final String sourceKey = Integer.toString(input.sourceNodeId)
                    .concat(LogicalPlanConfiguration.KEY_SEPARATOR)
                    .concat(input.sourcePortName);

            // Get-or-create the bucket for this source port.
            ArrayList<String> sinks = hashMap.get(sourceKey);
            if (sinks == null) {
                sinks = new ArrayList<>();
                hashMap.put(sourceKey, sinks);
            }

            final String sinkKey = Integer.toString(operatorDeployInfo.id)
                    .concat(LogicalPlanConfiguration.KEY_SEPARATOR)
                    .concat(input.portName);
            sinks.add(sinkKey);
        }
    }

    /**
     * Determines the checkpoint an operator should be considered to have finished when it is
     * deployed.
     *
     * <p>Unless the operator's processing mode attribute is {@code AT_MOST_ONCE}, the checkpoint
     * carried by the deployment descriptor is returned as is. For {@code AT_MOST_ONCE} a
     * checkpoint is synthesized from the current wall-clock time: its window id comes from
     * {@code WindowGenerator.getWindowId}, and the application/checkpoint window offsets are the
     * current window count modulo the respective window-count attribute (operator attribute
     * first, container value as fallback).
     *
     * @param operatorDeployInfo deployment descriptor of the operator
     * @return the descriptor's checkpoint, or a freshly computed one for {@code AT_MOST_ONCE}
     */
    protected Checkpoint getFinishedCheckpoint(OperatorDeployInfo operatorDeployInfo) {
        final Attribute.AttributeMap attributes = operatorDeployInfo.contextAttributes;
        final Operator.ProcessingMode mode = attributes == null ? null : (Operator.ProcessingMode) attributes.get(OperatorContext.PROCESSING_MODE);

        if (mode != Operator.ProcessingMode.AT_MOST_ONCE) {
            final Checkpoint deployed = operatorDeployInfo.checkpoint;
            // An unset mode is logged as AT_LEAST_ONCE.
            logger.debug("using {} on {} at {}", new Object[]{mode == null ? Operator.ProcessingMode.AT_LEAST_ONCE : mode, operatorDeployInfo.name, deployed});
            return deployed;
        }

        final long currentTimeMillis = System.currentTimeMillis();
        // Third argument is the window width, consistent with the getWindowId call below;
        // the previous code passed firstWindowMillis here.
        final long windowCount = WindowGenerator.getWindowCount(currentTimeMillis, this.firstWindowMillis, this.windowWidthMillis);

        Integer applicationWindowCount = (Integer) attributes.get(OperatorContext.APPLICATION_WINDOW_COUNT);
        if (applicationWindowCount == null) {
            applicationWindowCount = (Integer) this.containerContext.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
        }
        final int applicationWindowOffset = (int) (windowCount % applicationWindowCount.intValue());

        Integer checkpointWindowCount = (Integer) attributes.get(OperatorContext.CHECKPOINT_WINDOW_COUNT);
        if (checkpointWindowCount == null) {
            checkpointWindowCount = (Integer) this.containerContext.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT);
        }
        final int checkpointWindowOffset = (int) (windowCount % checkpointWindowCount.intValue());

        final Checkpoint checkpoint = new Checkpoint(WindowGenerator.getWindowId(currentTimeMillis, this.firstWindowMillis, this.windowWidthMillis), applicationWindowOffset, checkpointWindowOffset);
        logger.debug("using {} on {} at {}", new Object[]{Operator.ProcessingMode.AT_MOST_ONCE, operatorDeployInfo.name, checkpoint});
        return checkpoint;
    }

    /**
     * Sets up or tears down every registered container component.
     *
     * @param streamingContainerContext context handed to each component's {@code setup}; unused
     *                                  when tearing down
     * @param z                         {@code true} to call {@code setup} on every component,
     *                                  {@code false} to call {@code teardown}
     */
    public void operateListeners(StreamingContainerUmbilicalProtocol.StreamingContainerContext streamingContainerContext, boolean z) {
        if (!z) {
            for (Component<ContainerContext> component : this.components) {
                component.teardown();
            }
            return;
        }
        for (Component<ContainerContext> component : this.components) {
            component.setup(streamingContainerContext);
        }
    }

    /**
     * Resolves an attribute value, preferring the most specific scope that defines it.
     *
     * <p>Lookup order: the port context's attributes, then the operator's context attributes,
     * then the container context. A scope is skipped when it is {@code null}, has no attribute
     * map, or maps the attribute to {@code null}.
     *
     * @param attribute          attribute to resolve
     * @param portContext        port scope, may be {@code null}
     * @param operatorDeployInfo operator scope, may be {@code null}
     * @return the first non-null value found, or whatever the container context returns
     */
    private <T> T getValue(Attribute<T> attribute, Context.PortContext portContext, OperatorDeployInfo operatorDeployInfo) {
        if (portContext != null) {
            Attribute.AttributeMap portAttributes = portContext.getAttributes();
            if (portAttributes != null) {
                T portValue = (T) portAttributes.get(attribute);
                if (portValue != null) {
                    return portValue;
                }
            }
        }
        // Operator-then-container fallback is exactly the two-argument overload.
        return getValue(attribute, operatorDeployInfo);
    }

    /**
     * Resolves an attribute value from the operator's context attributes, falling back to the
     * container context when the operator is {@code null}, has no attribute map, or maps the
     * attribute to {@code null}.
     *
     * @param attribute          attribute to resolve
     * @param operatorDeployInfo operator scope, may be {@code null}
     * @return the operator-level value if non-null, otherwise the container context's value
     */
    private <T> T getValue(Attribute<T> attribute, OperatorDeployInfo operatorDeployInfo) {
        if (operatorDeployInfo != null) {
            Attribute.AttributeMap operatorAttributes = operatorDeployInfo.contextAttributes;
            if (operatorAttributes != null) {
                T operatorValue = (T) operatorAttributes.get(attribute);
                if (operatorValue != null) {
                    return operatorValue;
                }
            }
        }
        return (T) this.containerContext.getValue(attribute);
    }

    /**
     * Applies a logger-level change request by passing the request's target changes to
     * {@code LoggerUtil.changeLoggersLevel}.
     *
     * @param stramToNodeChangeLoggersRequest request carrying the target changes to apply
     */
    private void handleChangeLoggersRequest(StramToNodeChangeLoggersRequest stramToNodeChangeLoggersRequest) {
        logger.debug("handle change logger request");
        LoggerUtil.changeLoggersLevel(stramToNodeChangeLoggersRequest.getTargetChanges());
    }

    // Class-level initialization of the static fields assigned below.
    static {
        // Caches the negated desired assertion status of this class.
        $assertionsDisabled = !StreamingContainer.class.desiredAssertionStatus();
        // Property key: "dt." followed by the APPLICATION_PATH attribute's name.
        PROP_APP_PATH = "dt." + Context.DAGContext.APPLICATION_PATH.getName();
        try {
            // Event loop is created under the name "ProcessWideEventLoop".
            eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop");
            logger = LoggerFactory.getLogger(StreamingContainer.class);
        } catch (IOException e) {
            // An IOException here is rethrown unchecked, so class initialization fails.
            throw new RuntimeException(e);
        }
    }
}
