package com.datatorrent.stram;

import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.bufferserver.storage.DiskStorage;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.StreamingContainer;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/StramLocalCluster.class */
public class StramLocalCluster implements Runnable, LocalMode.Controller {
    private static final Logger LOG = LoggerFactory.getLogger(StramLocalCluster.class);
    private static File CLUSTER_WORK_DIR = new File("target", StramLocalCluster.class.getName());
    protected final StreamingContainerManager dnmgr;
    private final UmbilicalProtocolLocalImpl umbilical;
    private InetSocketAddress bufferServerAddress;
    private boolean perContainerBufferServer;
    private Server bufferServer;
    private final Map<String, LocalStreamingContainer> childContainers;
    private int containerSeq;
    private boolean appDone;
    private final Map<String, StreamingContainer> injectShutdown;
    private boolean heartbeatMonitoringEnabled;
    private Callable<Boolean> exitCondition;
    private MockComponentFactory mockComponentFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/stram/StramLocalCluster$LocalStramChildLauncher.class */
    public class LocalStramChildLauncher implements Runnable {
        final String containerId;
        final LocalStreamingContainer child;

        private LocalStramChildLauncher(StreamingContainerAgent.ContainerStartRequest containerStartRequest) {
            this.containerId = "container-" + StramLocalCluster.access$508(StramLocalCluster.this);
            this.child = new LocalStreamingContainer(this.containerId, StramLocalCluster.this.umbilical, StramLocalCluster.this.mockComponentFactory != null ? StramLocalCluster.this.mockComponentFactory.setupWindowGenerator() : null);
            if (StramLocalCluster.this.dnmgr.assignContainer(new StreamingContainerManager.ContainerResource(containerStartRequest.container.getResourceRequestPriority(), this.containerId, "localhost", containerStartRequest.container.getRequiredMemoryMB(), containerStartRequest.container.getRequiredVCores(), null), StramLocalCluster.this.perContainerBufferServer ? null : NetUtils.getConnectAddress(StramLocalCluster.this.bufferServerAddress)) != null) {
                new Thread(this, this.containerId).start();
                StramLocalCluster.this.childContainers.put(this.containerId, this.child);
                StramLocalCluster.LOG.info("Started container {}", this.containerId);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    LocalStreamingContainer.run(this.child, StramLocalCluster.this.umbilical.getInitContext(this.containerId));
                    StramLocalCluster.this.childContainers.remove(this.containerId);
                    StramLocalCluster.LOG.info("Container {} terminating.", this.containerId);
                } catch (Exception e) {
                    StramLocalCluster.LOG.error("Container {} failed", this.containerId, e);
                    throw new RuntimeException(e);
                }
            } catch (Throwable th) {
                StramLocalCluster.this.childContainers.remove(this.containerId);
                StramLocalCluster.LOG.info("Container {} terminating.", this.containerId);
                throw th;
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StramLocalCluster$LocalStreamingContainer.class */
    public static class LocalStreamingContainer extends StreamingContainer {
        private final AtomicInteger heartbeatCount;
        private final WindowGenerator windowGenerator;

        public LocalStreamingContainer(String str, StreamingContainerUmbilicalProtocol streamingContainerUmbilicalProtocol, WindowGenerator windowGenerator) {
            super(str, streamingContainerUmbilicalProtocol);
            this.heartbeatCount = new AtomicInteger();
            this.windowGenerator = windowGenerator;
        }

        public static void run(StreamingContainer streamingContainer, StreamingContainerUmbilicalProtocol.StreamingContainerContext streamingContainerContext) throws Exception {
            StramLocalCluster.LOG.debug("Got context: " + streamingContainerContext);
            streamingContainer.setup(streamingContainerContext);
            boolean z = true;
            try {
                streamingContainer.heartbeatLoop();
                z = false;
                try {
                    streamingContainer.teardown();
                } catch (Exception e) {
                    if (0 == 0) {
                        throw e;
                    }
                }
            } catch (Throwable th) {
                try {
                    streamingContainer.teardown();
                } catch (Exception e2) {
                    if (!z) {
                        throw e2;
                    }
                }
                throw th;
            }
        }

        public void waitForHeartbeat(int i) throws InterruptedException {
            synchronized (this.heartbeatCount) {
                this.heartbeatCount.wait(i);
            }
        }

        @Override // com.datatorrent.stram.engine.StreamingContainer
        public void teardown() {
            super.teardown();
        }

        @Override // com.datatorrent.stram.engine.StreamingContainer
        protected WindowGenerator setupWindowGenerator(long j) {
            return this.windowGenerator != null ? this.windowGenerator : super.setupWindowGenerator(j);
        }

        OperatorContext getNodeContext(int i) {
            return this.nodes.get(Integer.valueOf(i)).context;
        }

        Operator getOperator(int i) {
            return this.nodes.get(Integer.valueOf(i)).getOperator();
        }

        Map<Integer, Node<?>> getNodes() {
            return Collections.unmodifiableMap(this.nodes);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/StramLocalCluster$MockComponentFactory.class */
    public interface MockComponentFactory {
        WindowGenerator setupWindowGenerator();
    }

    /* loaded from: input_file:com/datatorrent/stram/StramLocalCluster$UmbilicalProtocolLocalImpl.class */
    private class UmbilicalProtocolLocalImpl implements StreamingContainerUmbilicalProtocol {
        private UmbilicalProtocolLocalImpl() {
        }

        public long getProtocolVersion(String str, long j) throws IOException {
            throw new UnsupportedOperationException("not implemented in local mode");
        }

        public ProtocolSignature getProtocolSignature(String str, long j, int i) throws IOException {
            throw new UnsupportedOperationException("not implemented in local mode");
        }

        @Override // com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol
        public void reportError(String str, int[] iArr, String str2) {
            try {
                log(str, str2);
            } catch (IOException e) {
            }
        }

        @Override // com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol
        public void log(String str, String str2) throws IOException {
            StramLocalCluster.LOG.info("{} msg: {}", str, str2);
        }

        @Override // com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol
        public StreamingContainerUmbilicalProtocol.StreamingContainerContext getInitContext(String str) throws IOException {
            StreamingContainerUmbilicalProtocol.StreamingContainerContext initContext = StramLocalCluster.this.dnmgr.getContainerAgent(str).getInitContext();
            initContext.deployBufferServer = StramLocalCluster.this.perContainerBufferServer;
            return initContext;
        }

        @Override // com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol
        public StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse processHeartbeat(StreamingContainerUmbilicalProtocol.ContainerHeartbeat containerHeartbeat) {
            if (StramLocalCluster.this.injectShutdown.containsKey(containerHeartbeat.getContainerId())) {
                StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse containerHeartbeatResponse = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
                containerHeartbeatResponse.shutdown = true;
                return containerHeartbeatResponse;
            }
            try {
                StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse processHeartbeat = StramLocalCluster.this.dnmgr.processHeartbeat(containerHeartbeat);
                if (processHeartbeat != null) {
                    processHeartbeat = (StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse) SerializationUtils.clone(processHeartbeat);
                }
                StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse containerHeartbeatResponse2 = processHeartbeat;
                LocalStreamingContainer localStreamingContainer = (LocalStreamingContainer) StramLocalCluster.this.childContainers.get(containerHeartbeat.getContainerId());
                synchronized (localStreamingContainer.heartbeatCount) {
                    localStreamingContainer.heartbeatCount.incrementAndGet();
                    localStreamingContainer.heartbeatCount.notifyAll();
                }
                return containerHeartbeatResponse2;
            } catch (Throwable th) {
                LocalStreamingContainer localStreamingContainer2 = (LocalStreamingContainer) StramLocalCluster.this.childContainers.get(containerHeartbeat.getContainerId());
                synchronized (localStreamingContainer2.heartbeatCount) {
                    localStreamingContainer2.heartbeatCount.incrementAndGet();
                    localStreamingContainer2.heartbeatCount.notifyAll();
                    throw th;
                }
            }
        }
    }

    public StramLocalCluster(LogicalPlan logicalPlan) throws IOException, ClassNotFoundException {
        this.bufferServer = null;
        this.childContainers = new ConcurrentHashMap();
        this.containerSeq = 0;
        this.appDone = false;
        this.injectShutdown = new ConcurrentHashMap();
        this.heartbeatMonitoringEnabled = true;
        logicalPlan.validate();
        cloneLogicalPlan(logicalPlan);
        String uri = CLUSTER_WORK_DIR.toURI().toString();
        try {
            FileContext.getLocalFSFileContext().delete(new Path(uri), true);
            logicalPlan.getAttributes().put(LogicalPlan.APPLICATION_ID, "app_local_" + System.currentTimeMillis());
            if (logicalPlan.getAttributes().get(LogicalPlan.APPLICATION_PATH) == null) {
                logicalPlan.getAttributes().put(LogicalPlan.APPLICATION_PATH, uri);
            }
            if (logicalPlan.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) {
                logicalPlan.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(uri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), (Configuration) null));
            }
            this.dnmgr = new StreamingContainerManager(logicalPlan);
            this.umbilical = new UmbilicalProtocolLocalImpl();
            if (this.perContainerBufferServer) {
                return;
            }
            StreamingContainer.eventloop.start();
            this.bufferServer = new Server(0, 1048576, 8);
            this.bufferServer.setSpoolStorage(new DiskStorage());
            this.bufferServerAddress = this.bufferServer.run(StreamingContainer.eventloop);
            LOG.info("Buffer server started: {}", this.bufferServerAddress);
        } catch (IOException e) {
            throw new RuntimeException("could not cleanup test dir", e);
        } catch (IllegalArgumentException e2) {
            throw e2;
        }
    }

    public static LogicalPlan cloneLogicalPlan(LogicalPlan logicalPlan) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        LogicalPlan.write(logicalPlan, byteArrayOutputStream);
        LOG.debug("serialized size: {}", Integer.valueOf(byteArrayOutputStream.toByteArray().length));
        byteArrayOutputStream.flush();
        return LogicalPlan.read(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
    }

    LocalStreamingContainer getContainer(String str) {
        return this.childContainers.get(str);
    }

    public StreamingContainerManager getStreamingContainerManager() {
        return this.dnmgr;
    }

    public DAG getDAG() {
        return this.dnmgr.getPhysicalPlan().getLogicalPlan();
    }

    public StramLocalCluster(LogicalPlan logicalPlan, MockComponentFactory mockComponentFactory) throws Exception {
        this(logicalPlan);
        this.mockComponentFactory = mockComponentFactory;
    }

    void failContainer(StreamingContainer streamingContainer) {
        this.injectShutdown.put(streamingContainer.getContainerId(), streamingContainer);
        streamingContainer.triggerHeartbeat();
        LOG.info("Container {} failed, launching new container.", streamingContainer.getContainerId());
        this.dnmgr.scheduleContainerRestart(streamingContainer.getContainerId());
        this.childContainers.remove(streamingContainer.getContainerId());
    }

    public PTOperator findByLogicalNode(LogicalPlan.OperatorMeta operatorMeta) {
        List<PTOperator> operators = this.dnmgr.getPhysicalPlan().getOperators(operatorMeta);
        if (operators.isEmpty()) {
            return null;
        }
        return operators.get(0);
    }

    List<PTOperator> getPlanOperators(LogicalPlan.OperatorMeta operatorMeta) {
        return this.dnmgr.getPhysicalPlan().getOperators(operatorMeta);
    }

    public LocalStreamingContainer getContainer(PTOperator pTOperator) {
        LocalStreamingContainer container;
        String externalId = pTOperator.getContainer().getExternalId();
        if (externalId == null || (container = getContainer(externalId)) == null || container.getNodeContext(pTOperator.getId()) == null) {
            return null;
        }
        return container;
    }

    StreamingContainerAgent getContainerAgent(StreamingContainer streamingContainer) {
        return this.dnmgr.getContainerAgent(streamingContainer.getContainerId());
    }

    public void runAsync() {
        new Thread(this, "master").start();
    }

    public void shutdown() {
        this.appDone = true;
    }

    public void setHeartbeatMonitoringEnabled(boolean z) {
        this.heartbeatMonitoringEnabled = z;
    }

    public void setPerContainerBufferServer(boolean z) {
        this.perContainerBufferServer = z;
    }

    public void setExitCondition(Callable<Boolean> callable) {
        this.exitCondition = callable;
    }

    @Override // java.lang.Runnable
    public void run() {
        run(0L);
    }

    public void run(long j) {
        long currentTimeMillis = System.currentTimeMillis() + j;
        while (!this.appDone) {
            for (String str : this.dnmgr.containerStopRequests.values()) {
                LocalStreamingContainer localStreamingContainer = this.childContainers.get(str);
                if (localStreamingContainer != null) {
                    StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse containerHeartbeatResponse = new StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse();
                    containerHeartbeatResponse.shutdown = true;
                    localStreamingContainer.processHeartbeatResponse(containerHeartbeatResponse);
                }
                this.dnmgr.containerStopRequests.remove(str);
                LOG.info("Container {} restart.", str);
                this.dnmgr.scheduleContainerRestart(str);
            }
            while (!this.dnmgr.containerStartRequests.isEmpty()) {
                StreamingContainerAgent.ContainerStartRequest poll = this.dnmgr.containerStartRequests.poll();
                if (poll != null) {
                    new LocalStramChildLauncher(poll);
                }
            }
            if (this.heartbeatMonitoringEnabled) {
                this.dnmgr.monitorHeartbeat();
            }
            if (this.childContainers.isEmpty() && this.dnmgr.containerStartRequests.isEmpty()) {
                this.appDone = true;
            }
            if (j > 0 && System.currentTimeMillis() > currentTimeMillis) {
                this.appDone = true;
            }
            try {
                if (this.exitCondition != null && this.exitCondition.call().booleanValue()) {
                    this.appDone = true;
                }
                if (Thread.interrupted()) {
                    break;
                }
                if (!this.appDone) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        LOG.info("Sleep interrupted " + e.getMessage());
                    }
                }
            } catch (Exception e2) {
            }
        }
        for (LocalStreamingContainer localStreamingContainer2 : this.childContainers.values()) {
            this.injectShutdown.put(localStreamingContainer2.getContainerId(), localStreamingContainer2);
            localStreamingContainer2.triggerHeartbeat();
        }
        this.dnmgr.teardown();
        LOG.info("Application finished.");
        if (this.perContainerBufferServer) {
            return;
        }
        StreamingContainer.eventloop.stop(this.bufferServer);
        StreamingContainer.eventloop.stop();
    }

    static /* synthetic */ int access$508(StramLocalCluster stramLocalCluster) {
        int i = stramLocalCluster.containerSeq;
        stramLocalCluster.containerSeq = i + 1;
        return i;
    }
}
