/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.engine;

import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.stram.engine.GenericNode;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.stream.OiOStream;
import com.datatorrent.stram.tuple.Tuple;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang.UnhandledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OiONode
extends GenericNode {
    private long lastResetWindowId = -1L;
    private long lastEndStreamWindowId = 15998L;
    private int expectingEndWindows = 0;
    private static final Logger logger = LoggerFactory.getLogger(OiONode.class);

    public OiONode(Operator operator, OperatorContext context) {
        super(operator, context);
    }

    @Override
    public void connectInputPort(String port, SweepableReservoir reservoir) {
        ((OiOStream)reservoir).setControlSink(new ControlSink(reservoir));
        super.connectInputPort(port, reservoir);
    }

    class ControlSink
    implements Sink<Tuple> {
        final SweepableReservoir reservoir;

        ControlSink(SweepableReservoir sr) {
            this.reservoir = sr;
        }

        public void put(Tuple t) {
            switch (t.getType()) {
                case BEGIN_WINDOW: {
                    OiONode.this.expectingEndWindows++;
                    if (t.getWindowId() == OiONode.this.currentWindowId) break;
                    OiONode.this.currentWindowId = t.getWindowId();
                    int s = OiONode.this.sinks.length;
                    while (s-- > 0) {
                        OiONode.this.sinks[s].put((Object)t);
                    }
                    ++OiONode.this.controlTupleCount;
                    if (OiONode.this.applicationWindowCount != 0) break;
                    OiONode.this.insideWindow = true;
                    OiONode.this.operator.beginWindow(OiONode.this.currentWindowId);
                    break;
                }
                case END_WINDOW: {
                    OiONode.this.endWindowDequeueTimes.put(this.reservoir, System.currentTimeMillis());
                    if (--OiONode.this.expectingEndWindows != 0) break;
                    OiONode.this.processEndWindow(t);
                    break;
                }
                case CHECKPOINT: {
                    if (OiONode.this.lastCheckpointWindowId >= t.getWindowId() || OiONode.this.doCheckpoint) break;
                    if (OiONode.this.checkpointWindowCount == 0) {
                        OiONode.this.checkpoint(t.getWindowId());
                        OiONode.this.lastCheckpointWindowId = t.getWindowId();
                    } else {
                        OiONode.this.doCheckpoint = true;
                    }
                    int s = OiONode.this.sinks.length;
                    while (s-- > 0) {
                        OiONode.this.sinks[s].put((Object)t);
                    }
                    ++OiONode.this.controlTupleCount;
                    break;
                }
                case RESET_WINDOW: {
                    if (t.getWindowId() == OiONode.this.lastResetWindowId) break;
                    OiONode.this.lastResetWindowId = t.getWindowId();
                    int s = OiONode.this.sinks.length;
                    while (s-- > 0) {
                        OiONode.this.sinks[s].put((Object)t);
                    }
                    ++OiONode.this.controlTupleCount;
                    break;
                }
                case END_STREAM: {
                    if (OiONode.this.lastEndStreamWindowId == t.getWindowId()) break;
                    OiONode.this.lastEndStreamWindowId = t.getWindowId();
                    for (Map.Entry e : OiONode.this.inputs.entrySet()) {
                        Operators.PortContextPair<Operator.InputPort<?>> pcpair = OiONode.this.descriptor.inputPorts.get(e.getKey());
                        if (pcpair == null) continue;
                        ((Operator.InputPort)pcpair.component).setConnected(false);
                    }
                    OiONode.this.inputs.clear();
                    Iterator dici = OiONode.this.deferredInputConnections.iterator();
                    while (dici.hasNext()) {
                        GenericNode.DeferredInputConnection dic = (GenericNode.DeferredInputConnection)dici.next();
                        if (OiONode.this.inputs.containsKey(dic.portname)) continue;
                        dici.remove();
                        OiONode.this.connectInputPort(dic.portname, dic.reservoir);
                    }
                    if (!OiONode.this.inputs.isEmpty()) break;
                    if (OiONode.this.insideWindow) {
                        OiONode.this.applicationWindowCount = OiONode.this.APPLICATION_WINDOW_COUNT - 1;
                        OiONode.this.expectingEndWindows = 0;
                        OiONode.this.endWindowDequeueTimes.put(this.reservoir, System.currentTimeMillis());
                        OiONode.this.processEndWindow(null);
                    }
                    OiONode.this.emitEndStream();
                    break;
                }
                default: {
                    throw new UnhandledException("Unrecognized Control Tuple", (Throwable)new IllegalArgumentException(t.toString()));
                }
            }
        }

        public int getCount(boolean reset) {
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }
}

