/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.util.ConfigUtils;
import com.datatorrent.stram.webapp.ContainerInfo;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingContainerAgent {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerAgent.class);

    // The following package-private fields are not written anywhere in this class;
    // presumably they are maintained by the container manager - TODO confirm against callers.
    boolean shutdownRequested = false;
    Set<PTOperator> deployOpers = Sets.newHashSet();
    Set<Integer> undeployOpers = Sets.newHashSet();
    int deployCnt = 0;
    /** Reported as {@code lastHeartbeat} by {@link #getContainerInfo()}. */
    long lastHeartbeatMillis = 0L;
    /** Wall-clock time at which this agent instance was constructed. */
    long createdMillis = System.currentTimeMillis();
    /** The physical container this agent represents. */
    final PTContainer container;
    /** Context handed to the container on initialization; also consulted for the default storage agent. */
    final StreamingContainerUmbilicalProtocol.StreamingContainerContext initCtx;
    // JVM statistics surfaced through getContainerInfo(); memoryMBFree starts out as the
    // container's allocated memory (see constructor).
    String jvmName;
    int memoryMBFree;
    long gcCollectionCount;
    long gcCollectionTime;
    /** Owning manager; used to reach the physical plan when building deploy info. */
    final StreamingContainerManager dnmgr;
    /** Requests queued for delivery to the container; explicitly typed to avoid an unchecked raw-type assignment. */
    private final ConcurrentLinkedQueue<StreamingContainerUmbilicalProtocol.StramToNodeRequest> operatorRequests = new ConcurrentLinkedQueue<StreamingContainerUmbilicalProtocol.StramToNodeRequest>();

    /**
     * Creates an agent for the given physical container.
     *
     * @param container the physical container represented by this agent
     * @param initCtx   the context associated with the container
     * @param dnmgr     the manager that owns this agent
     */
    public StreamingContainerAgent(PTContainer container, StreamingContainerUmbilicalProtocol.StreamingContainerContext initCtx, StreamingContainerManager dnmgr) {
        this.dnmgr = dnmgr;
        this.initCtx = initCtx;
        this.container = container;
        // Until told otherwise, report the full allocation as free memory.
        this.memoryMBFree = container.getAllocatedMemoryMB();
    }

    /**
     * @return the context this agent was constructed with
     */
    public StreamingContainerUmbilicalProtocol.StreamingContainerContext getInitContext() {
        return initCtx;
    }

    /**
     * @return the physical container this agent represents
     */
    public PTContainer getContainer() {
        return container;
    }

    /**
     * Tells whether any operator assigned to the container still awaits deployment.
     *
     * @return {@code true} if at least one operator is in state {@code PENDING_DEPLOY}
     */
    public boolean hasPendingWork() {
        for (PTOperator candidate : container.getOperators()) {
            if (candidate.getState() == PTOperator.State.PENDING_DEPLOY) {
                return true;
            }
        }
        return false;
    }

    /**
     * Logs the request together with the container's external id and appends it to the request queue.
     *
     * @param r the request to enqueue
     */
    public void addOperatorRequest(StreamingContainerUmbilicalProtocol.StramToNodeRequest r) {
        String externalId = container.getExternalId();
        LOG.info("Adding operator request {} {}", externalId, r);
        operatorRequests.add(r);
    }

    /**
     * @return the live queue of requests added via {@link #addOperatorRequest}; not a copy
     */
    protected ConcurrentLinkedQueue<StreamingContainerUmbilicalProtocol.StramToNodeRequest> getOperatorRequests() {
        return operatorRequests;
    }

    /**
     * Builds the deploy descriptors for those of the given operators that are in state
     * {@code PENDING_DEPLOY}; operators in any other state are skipped.
     *
     * <p>The work is done in two passes. The first pass creates one {@link OperatorDeployInfo}
     * per operator and fills in its output ports, remembering every output seen. The second
     * pass fills in the input ports, which requires knowing whether the upstream output of a
     * container-local stream is part of the same deployment.
     *
     * @param operators the candidate operators
     * @return the deploy descriptors, in the iteration order of {@code operators}
     * @throws AssertionError if this container or a required upstream container has no buffer
     *         server address, if an input stream has no source, or if the source of a
     *         container-local stream is not among the operators being deployed
     * @throws RuntimeException if port attributes cannot be cloned
     */
    public List<OperatorDeployInfo> getDeployInfoList(Collection<PTOperator> operators) {
        if (this.container.bufferServerAddress == null) {
            throw new AssertionError("No buffer server address assigned");
        }
        // Insertion-ordered so the result follows the order of the input collection.
        // The map is only iterated, never queried by key.
        Map<OperatorDeployInfo, PTOperator> nodes = new LinkedHashMap<OperatorDeployInfo, PTOperator>();
        Set<PTOperator.PTOutput> publishers = new HashSet<PTOperator.PTOutput>();
        PhysicalPlan physicalPlan = this.dnmgr.getPhysicalPlan();

        // Pass 1: operators and their output ports.
        for (PTOperator oper : operators) {
            if (oper.getState() != PTOperator.State.PENDING_DEPLOY) {
                LOG.debug("Skipping deploy for operator {} state {}", oper, oper.getState());
                continue;
            }
            OperatorDeployInfo ndi = this.createOperatorDeployInfo(oper);
            nodes.put(ndi, oper);
            ndi.inputs = new ArrayList<OperatorDeployInfo.InputDeployInfo>(oper.getInputs().size());
            ndi.outputs = new ArrayList<OperatorDeployInfo.OutputDeployInfo>(oper.getOutputs().size());

            for (PTOperator.PTOutput out : oper.getOutputs()) {
                LogicalPlan.StreamMeta streamMeta = out.logicalStream;
                OperatorDeployInfo.OutputDeployInfo portInfo = new OperatorDeployInfo.OutputDeployInfo();
                portInfo.declaredStreamId = streamMeta.getName();
                portInfo.portName = out.portName;
                try {
                    portInfo.contextAttributes = streamMeta.getSource().getAttributes().clone();
                }
                catch (CloneNotSupportedException ex) {
                    throw new RuntimeException("Cannot clone attributes", ex);
                }

                // The output counts as unified when any of its sinks is a unifier.
                boolean outputUnified = false;
                for (PTOperator.PTInput sinkInput : out.sinks) {
                    if (sinkInput.target.isUnifier()) {
                        outputUnified = true;
                        break;
                    }
                }
                portInfo.contextAttributes.put(Context.PortContext.IS_OUTPUT_UNIFIED, outputUnified);

                if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
                    // A unifier's output takes the attributes of the first logical sink of the stream.
                    // NOTE(review): this replaces the cloned map above (including the
                    // IS_OUTPUT_UNIFIED entry just written) with the sink's own, un-cloned map.
                    for (LogicalPlan.InputPortMeta sink : streamMeta.getSinks()) {
                        portInfo.contextAttributes = sink.getAttributes();
                        break;
                    }
                }

                if (!out.isDownStreamInline()) {
                    // Some consumer is not inline: publish through this operator's buffer server.
                    PTContainer operContainer = oper.getContainer();
                    portInfo.bufferServerHost = operContainer.bufferServerAddress.getHostName();
                    portInfo.bufferServerPort = operContainer.bufferServerAddress.getPort();
                    portInfo.bufferServerToken = operContainer.getBufferServerToken();
                    // Register one codec per distinct identifier among sinks in other containers.
                    for (PTOperator.PTInput sinkInput : out.sinks) {
                        if (sinkInput.target.getContainer() == out.source.getContainer()) {
                            continue;
                        }
                        LogicalPlan.InputPortMeta sinkPortMeta = StreamingContainerAgent.getIdentifyingInputPortMeta(sinkInput);
                        StreamCodec<?> codec = StreamingContainerAgent.getStreamCodec(sinkPortMeta);
                        Integer codecId = physicalPlan.getStreamCodecIdentifier(codec);
                        if (!portInfo.streamCodecs.containsKey(codecId)) {
                            portInfo.streamCodecs.put(codecId, codec);
                        }
                    }
                }
                ndi.outputs.add(portInfo);
                publishers.add(out);
            }
        }

        // Pass 2: input ports, now that all outputs of this deployment are known.
        for (Map.Entry<OperatorDeployInfo, PTOperator> entry : nodes.entrySet()) {
            OperatorDeployInfo ndi = entry.getKey();
            PTOperator oper = entry.getValue();
            for (PTOperator.PTInput in : oper.getInputs()) {
                LogicalPlan.StreamMeta streamMeta = in.logicalStream;
                if (streamMeta.getSource() == null) {
                    throw new AssertionError("source is null: " + in);
                }
                PTOperator.PTOutput sourceOutput = in.source;
                OperatorDeployInfo.InputDeployInfo inputInfo = new OperatorDeployInfo.InputDeployInfo();
                inputInfo.declaredStreamId = streamMeta.getName();
                inputInfo.portName = in.portName;

                LogicalPlan.InputPortMeta inputPortMeta = StreamingContainerAgent.getInputPortMeta(oper.getOperatorMeta(), streamMeta);
                if (inputPortMeta != null) {
                    inputInfo.contextAttributes = inputPortMeta.getAttributes();
                }
                if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
                    // Fall back to the attributes of the upstream output port.
                    inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes();
                }
                inputInfo.sourceNodeId = sourceOutput.source.getId();
                inputInfo.sourcePortName = sourceOutput.portName;
                if (in.partitions != null && in.partitions.mask != 0) {
                    inputInfo.partitionMask = in.partitions.mask;
                    inputInfo.partitionKeys = in.partitions.partitions;
                }

                PTContainer sourceContainer = sourceOutput.source.getContainer();
                if (sourceContainer == oper.getContainer()) {
                    // Same container: the upstream operator must be part of this deployment.
                    if (!publishers.contains(sourceOutput)) {
                        throw new AssertionError("Source not deployed for container local stream " + sourceOutput + " " + in);
                    }
                    if (streamMeta.getLocality() == DAG.Locality.THREAD_LOCAL) {
                        inputInfo.locality = DAG.Locality.THREAD_LOCAL;
                        ndi.type = OperatorDeployInfo.OperatorType.OIO;
                    } else {
                        inputInfo.locality = DAG.Locality.CONTAINER_LOCAL;
                    }
                } else {
                    // Different container: subscribe at the upstream container's buffer server.
                    InetSocketAddress addr = sourceContainer.bufferServerAddress;
                    if (addr == null) {
                        throw new AssertionError("upstream address not assigned: " + sourceOutput);
                    }
                    inputInfo.bufferServerHost = addr.getHostName();
                    inputInfo.bufferServerPort = addr.getPort();
                    inputInfo.bufferServerToken = sourceContainer.getBufferServerToken();
                }

                LogicalPlan.InputPortMeta idInputPortMeta = StreamingContainerAgent.getIdentifyingInputPortMeta(in);
                StreamCodec<?> codec = StreamingContainerAgent.getStreamCodec(idInputPortMeta);
                Integer codecId = physicalPlan.getStreamCodecIdentifier(codec);
                inputInfo.streamCodecs.put(codecId, codec);
                ndi.inputs.add(inputInfo);
            }
        }
        return new ArrayList<OperatorDeployInfo>(nodes.keySet());
    }

    /**
     * Finds the input port of the given operator that is connected to the given stream.
     * The stream is matched by reference identity, not by {@code equals}.
     *
     * @param operatorMeta the logical operator whose input streams are searched
     * @param streamMeta   the stream to look for
     * @return the first matching input port, or {@code null} if none is connected to the stream
     */
    public static LogicalPlan.InputPortMeta getInputPortMeta(LogicalPlan.OperatorMeta operatorMeta, LogicalPlan.StreamMeta streamMeta) {
        for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> candidate : operatorMeta.getInputStreams().entrySet()) {
            if (candidate.getValue() == streamMeta) {
                return candidate.getKey();
            }
        }
        return null;
    }

    /**
     * Resolves the logical input port that identifies the given physical input.
     * For a unifier target, the port is looked up on the operator returned by
     * {@link #getIdentifyingOperator(PTOperator)} instead of on the unifier itself.
     *
     * @param input the physical input
     * @return the matching logical input port, or {@code null} if the resolved operator has no
     *         input connected to the input's logical stream
     * @throws NullPointerException if the target is a unifier for which no identifying
     *         operator can be found
     */
    public static LogicalPlan.InputPortMeta getIdentifyingInputPortMeta(PTOperator.PTInput input) {
        PTOperator target = input.target;
        // Unifiers are resolved to the operator found by following their first output sink.
        PTOperator metaOwner = target.isUnifier() ? StreamingContainerAgent.getIdentifyingOperator(target) : target;
        return StreamingContainerAgent.getInputPortMeta(metaOwner.getOperatorMeta(), input.logicalStream);
    }

    /**
     * Walks downstream from the given operator, always following the first sink of the first
     * output, until an operator that is not a unifier is reached.
     *
     * @param operator the starting operator; may be {@code null}
     * @return the first non-unifier operator on that path (the argument itself if it is not a
     *         unifier), or {@code null} if the argument is {@code null} or the path ends at a
     *         unifier without outputs or without sinks on its first output
     */
    public static PTOperator getIdentifyingOperator(PTOperator operator) {
        PTOperator current = operator;
        while (current != null && current.isUnifier()) {
            List<PTOperator.PTOutput> outs = current.getOutputs();
            if (outs.isEmpty()) {
                return null;
            }
            List<PTOperator.PTInput> firstSinks = outs.get(0).sinks;
            if (firstSinks.isEmpty()) {
                return null;
            }
            current = firstSinks.get(0).target;
        }
        return current;
    }

    /**
     * Determines the stream codec for the given input port.
     *
     * <p>The {@code STREAM_CODEC} port attribute takes precedence. If it is unset, the codec
     * supplied by the port object is used and, when non-null, stored under that attribute so
     * that later lookups on the same port find it there.
     *
     * @param inputPortMeta the logical input port; may be {@code null}
     * @return the codec, or {@code null} if the port is {@code null} or neither the attribute
     *         nor the port object supplies one
     */
    public static StreamCodec<?> getStreamCodec(LogicalPlan.InputPortMeta inputPortMeta) {
        if (inputPortMeta == null) {
            return null;
        }
        StreamCodec<?> codec = (StreamCodec<?>)inputPortMeta.getValue(Context.PortContext.STREAM_CODEC);
        if (codec != null) {
            return codec;
        }
        codec = inputPortMeta.getPortObject().getStreamCodec();
        if (codec != null) {
            // Side effect: mutates the port's attribute map.
            inputPortMeta.getAttributes().put(Context.PortContext.STREAM_CODEC, codec);
        }
        return codec;
    }

    /**
     * Creates the deploy descriptor for a single operator, without port information.
     *
     * <p>Unifiers get a {@code UnifierDeployInfo} carrying a clone of the unified operator's
     * attributes. Other operators are typed {@code INPUT} when the operator object is an
     * {@link InputOperator} and none of its inputs has a logical stream with a source, and
     * {@code GENERIC} otherwise.
     *
     * <p>For processing modes {@code AT_MOST_ONCE} and {@code EXACTLY_ONCE} the checkpoint is
     * taken from the storage agent: the largest window id it reports (or {@code -1} if it
     * reports none) replaces the operator's recovery checkpoint whenever the two differ.
     *
     * @param oper the physical operator
     * @return the populated descriptor
     * @throws RuntimeException if attributes cannot be cloned or the checkpoint window id
     *         cannot be determined
     */
    private OperatorDeployInfo createOperatorDeployInfo(PTOperator oper) {
        final OperatorDeployInfo deployInfo;
        if (!oper.isUnifier()) {
            deployInfo = new OperatorDeployInfo();
            if (oper.getOperatorMeta().getOperator() instanceof InputOperator) {
                // An input operator that is nevertheless fed by a stream is deployed as generic.
                OperatorDeployInfo.OperatorType resolvedType = OperatorDeployInfo.OperatorType.INPUT;
                for (PTOperator.PTInput ptInput : oper.getInputs()) {
                    if (ptInput.logicalStream != null && ptInput.logicalStream.getSource() != null) {
                        resolvedType = OperatorDeployInfo.OperatorType.GENERIC;
                        break;
                    }
                }
                deployInfo.type = resolvedType;
            } else {
                deployInfo.type = OperatorDeployInfo.OperatorType.GENERIC;
            }
        } else {
            OperatorDeployInfo.UnifierDeployInfo unifierInfo = new OperatorDeployInfo.UnifierDeployInfo();
            try {
                unifierInfo.operatorAttributes = oper.getUnifiedOperatorMeta().getAttributes().clone();
            }
            catch (CloneNotSupportedException ex) {
                throw new RuntimeException("Cannot clone unifier attributes", ex);
            }
            deployInfo = unifierInfo;
        }

        Checkpoint recovery = oper.getRecoveryCheckpoint();
        Operator.ProcessingMode mode = (Operator.ProcessingMode)oper.getOperatorMeta().getValue(OperatorContext.PROCESSING_MODE);
        boolean checkpointFromStorage = mode == Operator.ProcessingMode.AT_MOST_ONCE || mode == Operator.ProcessingMode.EXACTLY_ONCE;
        if (checkpointFromStorage) {
            // Operator-level storage agent wins; otherwise use the one from the init context.
            StorageAgent storage = (StorageAgent)oper.getOperatorMeta().getAttributes().get(OperatorContext.STORAGE_AGENT);
            if (storage == null) {
                storage = (StorageAgent)this.initCtx.getValue(OperatorContext.STORAGE_AGENT);
            }
            try {
                long latestWindowId = -1L;
                for (long candidate : storage.getWindowIds(oper.getId())) {
                    latestWindowId = Math.max(latestWindowId, candidate);
                }
                if (recovery == null || recovery.windowId != latestWindowId) {
                    recovery = new Checkpoint(latestWindowId, 0, 0);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to determine checkpoint window id " + oper, e);
            }
        }
        LOG.debug("{} recovery checkpoint {}", oper, recovery);

        deployInfo.checkpoint = recovery;
        deployInfo.name = oper.getOperatorMeta().getName();
        deployInfo.id = oper.getId();
        try {
            deployInfo.contextAttributes = oper.getOperatorMeta().getAttributes().clone();
        }
        catch (CloneNotSupportedException ex) {
            throw new RuntimeException("Cannot clone operator attributes", ex);
        }
        if (oper.isOperatorStateLess()) {
            deployInfo.contextAttributes.put(OperatorContext.STATELESS, true);
        }
        return deployInfo;
    }

    /**
     * Produces a snapshot of the container's identity, state, memory, GC and timing figures.
     * The two log URLs are filled in only when the container has a node HTTP address.
     *
     * @return a newly created {@link ContainerInfo}
     */
    public ContainerInfo getContainerInfo() {
        ContainerInfo info = new ContainerInfo();
        info.id = container.getExternalId();
        info.host = container.host;
        info.state = container.getState().name();
        info.jvmName = jvmName;
        info.numOperators = container.getOperators().size();
        info.memoryMBAllocated = container.getAllocatedMemoryMB();
        info.lastHeartbeat = lastHeartbeatMillis;
        info.memoryMBFree = memoryMBFree;
        info.gcCollectionCount = gcCollectionCount;
        info.gcCollectionTime = gcCollectionTime;
        info.startedTime = container.getStartedTime();
        info.finishedTime = container.getFinishedTime();
        if (container.nodeHttpAddress == null) {
            return info;
        }

        YarnConfiguration conf = new YarnConfiguration();
        String schemePrefix = ConfigUtils.getSchemePrefix(conf);
        // The user segment of the URL comes from the process environment.
        String user = System.getenv(ApplicationConstants.Environment.USER.toString());
        info.containerLogsUrl = schemePrefix + container.nodeHttpAddress + "/node/containerlogs/" + info.id + "/" + user;

        String applicationId = (String)container.getPlan().getLogicalPlan().getAttributes().get(LogicalPlan.APPLICATION_ID);
        info.rawContainerLogsUrl = ConfigUtils.getRawContainerLogsUrl(conf, container.nodeHttpAddress, applicationId, info.id);
        return info;
    }

    /**
     * Holder for the physical container referenced by a start request.
     */
    public static class ContainerStartRequest {
        /** The container this request refers to. */
        final PTContainer container;

        /**
         * @param container the container this request refers to
         */
        ContainerStartRequest(PTContainer container) {
            this.container = container;
        }
    }
}

