/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.engine;

import com.datatorrent.api.Stats;
import com.datatorrent.stram.ComponentContextPair;
import com.datatorrent.stram.api.ContainerEvent;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.ByteCounterStream;
import com.datatorrent.stram.engine.Stream;
import com.datatorrent.stram.engine.StreamContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import net.engio.mbassy.listener.Handler;

/**
 * Event subscriber that tracks the active {@link ByteCounterStream}s of a container and copies
 * their byte counts into the port statistics carried by container stats events.
 *
 * <p>Streams are registered and unregistered by the activation/deactivation handlers, keyed by
 * the port id reported by the stream's context. A stream whose sink id starts with
 * {@code "tcp:"} is tracked as an output stream (several may share one port id); any other
 * stream is tracked as the single input stream for its port id.
 *
 * <p>NOTE(review): the maps are plain {@link HashMap}s with no synchronization; this assumes the
 * event bus delivers all handler calls on one thread -- confirm against the bus configuration.
 */
public class BufferServerStatsSubscriber {
    /** Prefix of the sink id that marks a stream as an output stream. */
    private static final String TCP_SINK_PREFIX = "tcp:";

    /** Active input streams, keyed by port id. */
    private final HashMap<String, ByteCounterStream> inputStreams = new HashMap<String, ByteCounterStream>();

    /** Active output streams, keyed by port id; one port may have several streams. */
    private final HashMap<String, List<ByteCounterStream>> outputStreams = new HashMap<String, List<ByteCounterStream>>();

    /**
     * Registers a newly activated stream if its component is a {@link ByteCounterStream}.
     *
     * @param sae the activation event carrying the stream and its context
     */
    @Handler
    public void handleStreamActivation(ContainerEvent.StreamActivationEvent sae) {
        ComponentContextPair<Stream, StreamContext> stream = sae.getStream();
        if (!(stream.component instanceof ByteCounterStream)) {
            return;
        }
        ByteCounterStream counter = (ByteCounterStream)stream.component;
        String portId = stream.context.getPortId();
        String sinkId = stream.context.getSinkId();
        if (sinkId.startsWith(TCP_SINK_PREFIX)) {
            List<ByteCounterStream> portStreams = this.outputStreams.get(portId);
            if (portStreams == null) {
                portStreams = new ArrayList<ByteCounterStream>();
                this.outputStreams.put(portId, portStreams);
            }
            portStreams.add(counter);
        } else {
            this.inputStreams.put(portId, counter);
        }
    }

    /**
     * Unregisters a deactivated stream if its component is a {@link ByteCounterStream}.
     *
     * @param sde the deactivation event carrying the stream and its context
     */
    @Handler
    public void handleStreamDeactivation(ContainerEvent.StreamDeactivationEvent sde) {
        ComponentContextPair<Stream, StreamContext> stream = sde.getStream();
        if (!(stream.component instanceof ByteCounterStream)) {
            return;
        }
        String portId = stream.context.getPortId();
        String sinkId = stream.context.getSinkId();
        if (sinkId.startsWith(TCP_SINK_PREFIX)) {
            List<ByteCounterStream> portStreams = this.outputStreams.get(portId);
            if (portStreams != null) {
                // Remove the component that activation added, not the component/context pair:
                // the pair is never an element of this list, so removing it would be a no-op.
                portStreams.remove(stream.component);
                if (portStreams.isEmpty()) {
                    this.outputStreams.remove(portId);
                }
            }
        } else {
            this.inputStreams.remove(portId);
        }
    }

    /**
     * Fills {@code bufferServerBytes} on the port stats of every operator window in the event.
     *
     * <p>An input port receives the byte count of its registered stream. An output port receives
     * the sum of the byte counts of all streams registered for it. Ports with no registered
     * stream are left untouched.
     *
     * @param cse the container stats event whose port stats are updated in place
     */
    @Handler
    public void handleContainerStats(ContainerEvent.ContainerStatsEvent cse) {
        StreamingContainerUmbilicalProtocol.ContainerStats stats = cse.getContainerStats();
        for (StreamingContainerUmbilicalProtocol.OperatorHeartbeat node : stats.operators) {
            for (Stats.OperatorStats os : node.windowStats) {
                if (os.inputPorts != null) {
                    for (Stats.OperatorStats.PortStats ps : os.inputPorts) {
                        ByteCounterStream stream = this.inputStreams.get(ps.id);
                        if (stream != null) {
                            // NOTE(review): the 'true' argument presumably resets the counter
                            // after reading -- confirm against ByteCounterStream.
                            ps.bufferServerBytes = stream.getByteCount(true);
                        }
                    }
                }
                if (os.outputPorts != null) {
                    for (Stats.OperatorStats.PortStats ps : os.outputPorts) {
                        List<ByteCounterStream> portStreams = this.outputStreams.get(ps.id);
                        // Guard on the looked-up list: the port may have no registered streams.
                        if (portStreams == null) {
                            continue;
                        }
                        long total = 0L;
                        for (ByteCounterStream stream : portStreams) {
                            // Accumulate across all streams that share this output port.
                            total += stream.getByteCount(true);
                        }
                        ps.bufferServerBytes = total;
                    }
                }
            }
        }
    }
}

