/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.engine;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.stram.debug.TappedReservoir;
import com.datatorrent.stram.engine.Node;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.engine.WindowIdActivatedReservoir;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.lang.UnhandledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GenericNode
extends Node<Operator> {
    protected final HashMap<String, SweepableReservoir> inputs = new HashMap();
    protected ArrayList<DeferredInputConnection> deferredInputConnections = new ArrayList();
    boolean insideWindow;
    boolean doCheckpoint;
    long lastCheckpointWindowId = -1L;
    private static final Logger logger = LoggerFactory.getLogger(GenericNode.class);

    @Override
    public void addSinks(Map<String, Sink<Object>> sinks) {
        for (Map.Entry<String, Sink<Object>> e : sinks.entrySet()) {
            TappedReservoir tr;
            SweepableReservoir original = this.inputs.get(e.getKey());
            if (original instanceof TappedReservoir) {
                tr = (TappedReservoir)original;
                tr.add(e.getValue());
                continue;
            }
            if (original == null) continue;
            tr = new TappedReservoir(original, e.getValue());
            this.inputs.put(e.getKey(), tr);
        }
        super.addSinks(sinks);
    }

    @Override
    public void removeSinks(Map<String, Sink<Object>> sinks) {
        for (Map.Entry<String, Sink<Object>> e : sinks.entrySet()) {
            SweepableReservoir reservoir = this.inputs.get(e.getKey());
            if (!(reservoir instanceof TappedReservoir)) continue;
            TappedReservoir tr = (TappedReservoir)reservoir;
            tr.remove(e.getValue());
            if (tr.getSinks().length != 0) continue;
            tr.reservoir.setSink(tr.setSink(null));
            this.inputs.put(e.getKey(), tr.reservoir);
        }
        super.removeSinks(sinks);
    }

    public GenericNode(Operator operator, OperatorContext context) {
        super(operator, context);
    }

    public Operator.InputPort<Object> getInputPort(String port) {
        return (Operator.InputPort)this.descriptor.inputPorts.get((Object)port).component;
    }

    @Override
    public void connectInputPort(String port, SweepableReservoir reservoir) {
        if (reservoir == null) {
            throw new IllegalArgumentException("Reservoir cannot be null for port '" + port + "' on operator '" + this.operator + "'");
        }
        Operator.InputPort<Object> inputPort = this.getInputPort(port);
        if (inputPort == null) {
            throw new IllegalArgumentException("Port '" + port + "' does not exist on operator '" + this.operator + "'");
        }
        if (this.inputs.containsKey(port)) {
            this.deferredInputConnections.add(new DeferredInputConnection(port, reservoir));
        } else {
            inputPort.setConnected(true);
            this.inputs.put(port, reservoir);
            reservoir.setSink((Sink<Object>)inputPort.getSink());
        }
    }

    protected void processEndWindow(Tuple endWindowTuple) {
        this.endWindowEmitTime = System.currentTimeMillis();
        if (++this.applicationWindowCount == this.APPLICATION_WINDOW_COUNT) {
            this.insideWindow = false;
            this.operator.endWindow();
            this.applicationWindowCount = 0;
        }
        if (endWindowTuple == null) {
            this.emitEndWindow();
        } else {
            int s = this.sinks.length;
            while (s-- > 0) {
                this.sinks[s].put((Object)endWindowTuple);
            }
            ++this.controlTupleCount;
        }
        if (++this.checkpointWindowCount == this.CHECKPOINT_WINDOW_COUNT) {
            this.checkpointWindowCount = 0;
            if (this.doCheckpoint) {
                this.checkpoint(this.currentWindowId);
                this.lastCheckpointWindowId = this.currentWindowId;
                this.doCheckpoint = false;
            } else if (this.PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE) {
                this.checkpoint(this.currentWindowId);
                this.lastCheckpointWindowId = this.currentWindowId;
            }
        }
        Stats.OperatorStats stats = new Stats.OperatorStats();
        this.reportStats(stats, this.currentWindowId);
        if (!this.insideWindow) {
            stats.metrics = this.collectMetrics();
        }
        this.handleRequests(this.currentWindowId);
    }

    @Override
    public void activate() {
        super.activate();
        this.insideWindow = this.applicationWindowCount != 0;
    }

    private boolean isInputPortConnectedToDelayOperator(String portName) {
        Operators.PortContextPair<Operator.InputPort<?>> pcPair = this.descriptor.inputPorts.get(portName);
        if (pcPair == null || pcPair.context == null) {
            return false;
        }
        return (Boolean)((Context.PortContext)pcPair.context).getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    @Override
    public final void run() {
        int totalQueues;
        this.doCheckpoint = false;
        long spinMillis = ((Integer)this.context.getValue(OperatorContext.SPIN_MILLIS)).intValue();
        boolean handleIdleTime = this.operator instanceof Operator.IdleTimeHandler;
        int regularQueues = totalQueues = this.inputs.size();
        for (String portName : this.inputs.keySet()) {
            if (!this.isInputPortConnectedToDelayOperator(portName)) continue;
            --regularQueues;
        }
        ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new ArrayList<Map.Entry<String, SweepableReservoir>>();
        activeQueues.addAll(this.inputs.entrySet());
        int expectingBeginWindow = activeQueues.size();
        int receivedEndWindow = 0;
        long firstWindowId = -1L;
        LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>();
        try {
            do {
                Iterator buffers = activeQueues.iterator();
                block15: while (buffers.hasNext()) {
                    Map.Entry activePortEntry = (Map.Entry)buffers.next();
                    SweepableReservoir activePort = (SweepableReservoir)activePortEntry.getValue();
                    Tuple tuple = activePort.sweep();
                    if (tuple == null) continue;
                    boolean delay = this.operator instanceof Operator.DelayOperator;
                    long windowAhead = 0L;
                    if (delay) {
                        windowAhead = WindowGenerator.getAheadWindowId(tuple.getWindowId(), this.firstWindowMillis, this.windowWidthMillis, 1);
                    }
                    switch (tuple.getType()) {
                        case BEGIN_WINDOW: {
                            if (expectingBeginWindow == totalQueues) {
                                if (this.isInputPortConnectedToDelayOperator((String)activePortEntry.getKey())) continue block15;
                                activePort.remove();
                                --expectingBeginWindow;
                                receivedEndWindow = 0;
                                this.currentWindowId = tuple.getWindowId();
                                if (delay) {
                                    if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > (long)tuple.getBaseSeconds()) {
                                        ResetWindowTuple resetWindowTuple = new ResetWindowTuple(windowAhead);
                                        int s = this.sinks.length;
                                        while (s-- > 0) {
                                            this.sinks[s].put((Object)resetWindowTuple);
                                        }
                                        ++this.controlTupleCount;
                                    }
                                    tuple.setWindowId(windowAhead);
                                }
                                int s = this.sinks.length;
                                while (s-- > 0) {
                                    this.sinks[s].put((Object)tuple);
                                }
                                ++this.controlTupleCount;
                                if (this.applicationWindowCount != 0) continue block15;
                                this.insideWindow = true;
                                this.operator.beginWindow(this.currentWindowId);
                                break;
                            }
                            if (tuple.getWindowId() == this.currentWindowId) {
                                activePort.remove();
                                --expectingBeginWindow;
                                break;
                            }
                            buffers.remove();
                            String port = (String)activePortEntry.getKey();
                            if (this.PROCESSING_MODE == Operator.ProcessingMode.AT_MOST_ONCE) {
                                if (tuple.getWindowId() < this.currentWindowId) {
                                    Sink<Object> sink = activePort.setSink((Sink<Object>)Sink.BLACKHOLE);
                                    this.deferredInputConnections.add(0, new DeferredInputConnection(port, activePort));
                                    WindowIdActivatedReservoir wiar = new WindowIdActivatedReservoir(port, activePort, this.currentWindowId);
                                    wiar.setSink(sink);
                                    this.inputs.put(port, wiar);
                                    activeQueues.add(new AbstractMap.SimpleEntry<String, WindowIdActivatedReservoir>(port, wiar));
                                    break block15;
                                }
                                --expectingBeginWindow;
                                if (++receivedEndWindow != totalQueues) continue block15;
                                this.processEndWindow(null);
                                activeQueues.addAll(this.inputs.entrySet());
                                expectingBeginWindow = activeQueues.size();
                                break block15;
                            }
                            logger.error("Catastrophic Error: Out of sequence {} tuple {} on port {} while expecting {}", new Object[]{tuple.getType(), Codec.getStringWindowId((long)tuple.getWindowId()), port, Codec.getStringWindowId((long)this.currentWindowId)});
                            System.exit(2);
                            break;
                        }
                        case END_WINDOW: {
                            buffers.remove();
                            if (tuple.getWindowId() != this.currentWindowId) continue block15;
                            activePort.remove();
                            this.endWindowDequeueTimes.put(activePort, System.currentTimeMillis());
                            if (++receivedEndWindow != totalQueues) continue block15;
                            assert (activeQueues.isEmpty());
                            if (delay) {
                                tuple.setWindowId(windowAhead);
                            }
                            this.processEndWindow(tuple);
                            activeQueues.addAll(this.inputs.entrySet());
                            expectingBeginWindow = activeQueues.size();
                            break block15;
                        }
                        case CHECKPOINT: {
                            activePort.remove();
                            long checkpointWindow = tuple.getWindowId();
                            if (this.lastCheckpointWindowId >= checkpointWindow) continue block15;
                            if (this.PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE) {
                                this.lastCheckpointWindowId = checkpointWindow;
                            } else if (!this.doCheckpoint) {
                                if (this.checkpointWindowCount == 0) {
                                    this.checkpoint(checkpointWindow);
                                    this.lastCheckpointWindowId = checkpointWindow;
                                } else {
                                    this.doCheckpoint = true;
                                }
                            }
                            if (delay) continue block15;
                            int s = this.sinks.length;
                            while (s-- > 0) {
                                this.sinks[s].put((Object)tuple);
                            }
                            ++this.controlTupleCount;
                            break;
                        }
                        case RESET_WINDOW: {
                            int trackerIndex;
                            activePort.remove();
                            if (this.isInputPortConnectedToDelayOperator((String)activePortEntry.getKey())) break;
                            buffers.remove();
                            int baseSeconds = tuple.getBaseSeconds();
                            TupleTracker tracker22 = null;
                            for (TupleTracker tracker22 : resetTupleTracker) {
                                if (tracker22.tuple.getBaseSeconds() != baseSeconds) continue;
                            }
                            if (tracker22 == null) {
                                tracker22 = new TupleTracker(tuple, regularQueues);
                                resetTupleTracker.add(tracker22);
                            }
                            for (trackerIndex = 0; trackerIndex < tracker22.ports.length; ++trackerIndex) {
                                if (tracker22.ports[trackerIndex] == null) {
                                    tracker22.ports[trackerIndex++] = activePort;
                                    break;
                                }
                                if (tracker22.ports[trackerIndex] == activePort) break;
                            }
                            if (trackerIndex != regularQueues) continue block15;
                            Iterator trackerIterator = resetTupleTracker.iterator();
                            while (trackerIterator.hasNext()) {
                                if (((TupleTracker)trackerIterator.next()).tuple.getBaseSeconds() > baseSeconds) continue;
                                trackerIterator.remove();
                            }
                            if (!delay) {
                                int s = this.sinks.length;
                                while (s-- > 0) {
                                    this.sinks[s].put((Object)tuple);
                                }
                                ++this.controlTupleCount;
                            }
                            if (!activeQueues.isEmpty()) {
                                for (Map.Entry entry : activeQueues) {
                                    if (!this.isInputPortConnectedToDelayOperator((String)entry.getKey())) assert (false);
                                }
                                activeQueues.clear();
                            }
                            activeQueues.addAll(this.inputs.entrySet());
                            expectingBeginWindow = activeQueues.size();
                            if (firstWindowId != -1L) break block15;
                            if (delay) {
                                int s = this.sinks.length;
                                while (s-- > 0) {
                                    this.sinks[s].put((Object)tuple);
                                }
                                ++this.controlTupleCount;
                                this.fabricateFirstWindow((Operator.DelayOperator)this.operator, windowAhead);
                            }
                            firstWindowId = tuple.getWindowId();
                            break block15;
                        }
                        case END_STREAM: {
                            int trackerIndex;
                            activePort.remove();
                            buffers.remove();
                            if (firstWindowId == -1L) {
                                if (delay) {
                                    this.fabricateFirstWindow((Operator.DelayOperator)this.operator, windowAhead);
                                }
                                firstWindowId = tuple.getWindowId();
                            }
                            Iterator<Map.Entry<String, SweepableReservoir>> it = this.inputs.entrySet().iterator();
                            while (it.hasNext()) {
                                Map.Entry<String, SweepableReservoir> e = it.next();
                                if (e.getValue() != activePort) continue;
                                if (!this.descriptor.inputPorts.isEmpty()) {
                                    ((Operator.InputPort)this.descriptor.inputPorts.get((Object)e.getKey()).component).setConnected(false);
                                }
                                it.remove();
                                Iterator<DeferredInputConnection> iterator = this.deferredInputConnections.iterator();
                                while (iterator.hasNext()) {
                                    DeferredInputConnection dic = iterator.next();
                                    if (!e.getKey().equals(dic.portname)) continue;
                                    this.connectInputPort(dic.portname, dic.reservoir);
                                    iterator.remove();
                                    activeQueues.add(new AbstractMap.SimpleEntry<String, SweepableReservoir>(dic.portname, dic.reservoir));
                                    break block15;
                                }
                                break block25;
                            }
                            --expectingBeginWindow;
                            --totalQueues;
                            boolean break_activequeue = false;
                            if (--regularQueues == 0) {
                                this.alive = false;
                                break_activequeue = true;
                            } else if (activeQueues.isEmpty()) {
                                assert (!this.inputs.isEmpty());
                                this.processEndWindow(null);
                                activeQueues.addAll(this.inputs.entrySet());
                                expectingBeginWindow = activeQueues.size();
                                break_activequeue = true;
                            }
                            Tuple tuple2 = null;
                            Iterator iterator = resetTupleTracker.iterator();
                            block27: while (iterator.hasNext()) {
                                TupleTracker tracker = (TupleTracker)iterator.next();
                                for (trackerIndex = 0; trackerIndex < tracker.ports.length; ++trackerIndex) {
                                    if (tracker.ports[trackerIndex] == activePort) {
                                        SweepableReservoir[] ports = new SweepableReservoir[regularQueues];
                                        System.arraycopy(tracker.ports, 0, ports, 0, trackerIndex);
                                        if (trackerIndex < regularQueues) {
                                            System.arraycopy(tracker.ports, trackerIndex + 1, ports, trackerIndex, tracker.ports.length - trackerIndex - 1);
                                        }
                                        tracker.ports = ports;
                                        continue block27;
                                    }
                                    if (tracker.ports[trackerIndex] == null) {
                                        if (trackerIndex != regularQueues) continue block27;
                                        if (tuple2 == null || tuple2.getBaseSeconds() < tracker.tuple.getBaseSeconds()) {
                                            tuple2 = tracker.tuple;
                                        }
                                        iterator.remove();
                                        continue block27;
                                    }
                                    tracker.ports = Arrays.copyOf(tracker.ports, regularQueues);
                                }
                            }
                            if (tuple2 != null && !delay) {
                                void var26_45;
                                int n = this.sinks.length;
                                while (--var26_45 > 0) {
                                    this.sinks[var26_45].put(tuple2);
                                }
                                ++this.controlTupleCount;
                            }
                            if (!break_activequeue) continue block15;
                            break block15;
                        }
                        default: {
                            throw new UnhandledException("Unrecognized Control Tuple", (Throwable)new IllegalArgumentException(tuple.toString()));
                        }
                    }
                }
                if (activeQueues.isEmpty() && this.alive) {
                    logger.error("Catastrophic Error: Invalid State - the operator blocked forever!");
                    System.exit(2);
                    continue;
                }
                boolean need2sleep = true;
                for (Map.Entry entry : activeQueues) {
                    if (((SweepableReservoir)entry.getValue()).size() <= 0) continue;
                    need2sleep = false;
                    break;
                }
                if (!need2sleep) continue;
                if (handleIdleTime && this.insideWindow) {
                    ((Operator.IdleTimeHandler)this.operator).handleIdleTime();
                    continue;
                }
                Thread.sleep(spinMillis);
            } while (this.alive);
        }
        catch (Operator.ShutdownException se) {
            logger.debug("Shutdown requested by the operator when alive = {}.", (Object)this.alive);
            this.alive = false;
        }
        catch (Throwable cause) {
            Throwable rootCause;
            GenericNode need2sleep = this;
            synchronized (need2sleep) {
                if (this.alive) {
                    DTThrowable.rethrow((Throwable)cause);
                }
            }
            for (rootCause = cause; rootCause != null && !(rootCause instanceof InterruptedException); rootCause = rootCause.getCause()) {
            }
            if (rootCause == null) {
                DTThrowable.rethrow((Throwable)cause);
            }
            logger.debug("Ignoring InterruptedException after shutdown", cause);
        }
        if (this.insideWindow && !this.shutdown) {
            this.endWindowEmitTime = System.currentTimeMillis();
            this.operator.endWindow();
            if (++this.applicationWindowCount == this.APPLICATION_WINDOW_COUNT) {
                this.applicationWindowCount = 0;
            }
            if (++this.checkpointWindowCount == this.CHECKPOINT_WINDOW_COUNT) {
                this.checkpointWindowCount = 0;
                if (this.doCheckpoint || this.PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE) {
                    this.checkpoint(this.currentWindowId);
                }
            }
            Stats.OperatorStats stats = new Stats.OperatorStats();
            this.fixEndWindowDequeueTimesBeforeDeactivate();
            this.reportStats(stats, this.currentWindowId);
            stats.metrics = this.collectMetrics();
            this.handleRequests(this.currentWindowId);
        }
    }

    private void fabricateFirstWindow(Operator.DelayOperator delayOperator, long windowAhead) {
        Tuple beginWindowTuple = new Tuple(MessageType.BEGIN_WINDOW, windowAhead);
        Tuple endWindowTuple = new Tuple(MessageType.END_WINDOW, windowAhead);
        for (Sink sink : this.outputs.values()) {
            sink.put((Object)beginWindowTuple);
        }
        ++this.controlTupleCount;
        delayOperator.firstWindow();
        for (Sink sink : this.outputs.values()) {
            sink.put((Object)endWindowTuple);
        }
        ++this.controlTupleCount;
    }

    private void fixEndWindowDequeueTimesBeforeDeactivate() {
        long endWindowDequeueTime = System.currentTimeMillis();
        for (SweepableReservoir sr : this.inputs.values()) {
            if (this.endWindowDequeueTimes.get(sr) != null) continue;
            this.endWindowDequeueTimes.put(sr, endWindowDequeueTime);
        }
    }

    @Override
    protected void reportStats(Stats.OperatorStats stats, long windowId) {
        ArrayList<Stats.OperatorStats.PortStats> ipstats = new ArrayList<Stats.OperatorStats.PortStats>();
        for (Map.Entry<String, SweepableReservoir> e : this.inputs.entrySet()) {
            SweepableReservoir ar = e.getValue();
            Stats.OperatorStats.PortStats portStats = new Stats.OperatorStats.PortStats(e.getKey());
            portStats.queueSize = ar.size();
            if (this.DATA_TUPLE_AWARE && ar instanceof CircularBuffer) {
                Iterator iterator = ((CircularBuffer)ar).getFrozenIterator();
                while (iterator.hasNext()) {
                    if (!(iterator.next() instanceof Tuple)) continue;
                    --portStats.queueSize;
                }
            }
            portStats.tupleCount = ar.getCount(true);
            portStats.endWindowTimestamp = (Long)this.endWindowDequeueTimes.get(e.getValue());
            ipstats.add(portStats);
        }
        stats.inputPorts = ipstats;
        super.reportStats(stats, windowId);
    }

    protected class DeferredInputConnection {
        String portname;
        SweepableReservoir reservoir;

        DeferredInputConnection(String portname, SweepableReservoir reservoir) {
            this.portname = portname;
            this.reservoir = reservoir;
        }
    }

    class TupleTracker {
        final Tuple tuple;
        SweepableReservoir[] ports;

        TupleTracker(Tuple base, int count) {
            this.tuple = base;
            this.ports = new SweepableReservoir[count];
        }
    }
}

