package com.datatorrent.stram.engine;

import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.stram.tuple.Tuple;
import com.datatorrent.stram.webapp.asm.MethodSignatureVisitor;
import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/InputNode.class */
public class InputNode extends Node<InputOperator> {
    private final ArrayList<SweepableReservoir> deferredInputConnections;
    protected SweepableReservoir controlTuples;
    private static final Logger logger = LoggerFactory.getLogger(InputNode.class);

    /* renamed from: com.datatorrent.stram.engine.InputNode$1, reason: invalid class name */
    /* loaded from: input_file:com/datatorrent/stram/engine/InputNode$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$datatorrent$bufferserver$packet$MessageType = new int[MessageType.values().length];

        static {
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.BEGIN_WINDOW.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.END_WINDOW.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.CHECKPOINT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.END_STREAM.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public InputNode(InputOperator inputOperator, OperatorContext operatorContext) {
        super(inputOperator, operatorContext);
        this.deferredInputConnections = new ArrayList<>();
    }

    @Override // com.datatorrent.stram.engine.Node
    public void connectInputPort(String str, SweepableReservoir sweepableReservoir) {
        if (Node.INPUT.equals(str)) {
            if (this.controlTuples == null) {
                this.controlTuples = sweepableReservoir;
            } else {
                this.deferredInputConnections.add(sweepableReservoir);
            }
        }
    }

    @Override // java.lang.Runnable
    public final void run() {
        Throwable th;
        long intValue = ((Integer) this.context.getValue(OperatorContext.SPIN_MILLIS)).intValue();
        boolean z = this.operator instanceof Operator.IdleTimeHandler;
        boolean z2 = this.applicationWindowCount != 0;
        boolean z3 = false;
        boolean z4 = false;
        while (this.alive) {
            try {
                Tuple sweep = this.controlTuples.sweep();
                if (sweep != null) {
                    this.controlTuples.remove();
                    switch (AnonymousClass1.$SwitchMap$com$datatorrent$bufferserver$packet$MessageType[sweep.getType().ordinal()]) {
                        case MethodSignatureVisitor.VISIT_PARAM /* 1 */:
                            int length = this.sinks.length;
                            while (true) {
                                int i = length;
                                length--;
                                if (i <= 0) {
                                    this.controlTupleCount++;
                                    this.currentWindowId = sweep.getWindowId();
                                    z4 = true;
                                    if (this.applicationWindowCount == 0) {
                                        z2 = true;
                                        this.operator.beginWindow(this.currentWindowId);
                                    }
                                    this.operator.emitTuples();
                                    break;
                                } else {
                                    this.sinks[length].put(sweep);
                                }
                            }
                        case MethodSignatureVisitor.VISIT_RETURN /* 2 */:
                            this.endWindowEmitTime = System.currentTimeMillis();
                            z4 = false;
                            int i2 = this.applicationWindowCount + 1;
                            this.applicationWindowCount = i2;
                            if (i2 == this.APPLICATION_WINDOW_COUNT) {
                                z2 = false;
                                this.operator.endWindow();
                                this.applicationWindowCount = 0;
                            }
                            int length2 = this.sinks.length;
                            while (true) {
                                int i3 = length2;
                                length2--;
                                if (i3 <= 0) {
                                    this.controlTupleCount++;
                                    int i4 = this.checkpointWindowCount + 1;
                                    this.checkpointWindowCount = i4;
                                    if (i4 == this.CHECKPOINT_WINDOW_COUNT) {
                                        this.checkpointWindowCount = 0;
                                        if (z3) {
                                            checkpoint(this.currentWindowId);
                                            z3 = false;
                                        } else if (this.PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE) {
                                            checkpoint(this.currentWindowId);
                                        }
                                    }
                                    Stats.OperatorStats operatorStats = new Stats.OperatorStats();
                                    reportStats(operatorStats, this.currentWindowId);
                                    if (!z2) {
                                        operatorStats.metrics = collectMetrics();
                                    }
                                    handleRequests(this.currentWindowId);
                                    break;
                                } else {
                                    this.sinks[length2].put(sweep);
                                }
                            }
                        case MethodSignatureVisitor.VISIT_EXCEPTION /* 3 */:
                            if (this.checkpointWindowCount != 0 || this.PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE) {
                                z3 = true;
                            } else {
                                checkpoint(this.currentWindowId);
                            }
                            int length3 = this.sinks.length;
                            while (true) {
                                int i5 = length3;
                                length3--;
                                if (i5 <= 0) {
                                    this.controlTupleCount++;
                                    break;
                                } else {
                                    this.sinks[length3].put(sweep);
                                }
                            }
                            break;
                        case 4:
                            if (!this.deferredInputConnections.isEmpty()) {
                                this.controlTuples = this.deferredInputConnections.remove(0);
                                break;
                            } else {
                                int length4 = this.sinks.length;
                                while (true) {
                                    int i6 = length4;
                                    length4--;
                                    if (i6 <= 0) {
                                        this.controlTupleCount++;
                                        this.alive = false;
                                        break;
                                    } else {
                                        this.sinks[length4].put(sweep);
                                    }
                                }
                            }
                        default:
                            int length5 = this.sinks.length;
                            while (true) {
                                int i7 = length5;
                                length5--;
                                if (i7 <= 0) {
                                    this.controlTupleCount++;
                                    break;
                                } else {
                                    this.sinks[length5].put(sweep);
                                }
                            }
                    }
                } else if (z4) {
                    int i8 = 0;
                    for (Sink<Object> sink : this.sinks) {
                        i8 -= sink.getCount(false);
                    }
                    this.operator.emitTuples();
                    for (Sink<Object> sink2 : this.sinks) {
                        i8 += sink2.getCount(false);
                    }
                    if (i8 == 0) {
                        if (z) {
                            this.operator.handleIdleTime();
                        } else {
                            Thread.sleep(intValue);
                        }
                    }
                } else {
                    Thread.sleep(0L);
                }
            } catch (Operator.ShutdownException e) {
                logger.debug("Shutdown requested by the operator when alive = {}.", Boolean.valueOf(this.alive));
                this.alive = false;
            } catch (Throwable th2) {
                synchronized (this) {
                    if (this.alive) {
                        DTThrowable.rethrow(th2);
                    }
                    Throwable th3 = th2;
                    while (true) {
                        th = th3;
                        if (th != null && !(th instanceof InterruptedException)) {
                            th3 = th.getCause();
                        }
                    }
                    if (th == null) {
                        DTThrowable.rethrow(th2);
                    } else {
                        logger.debug("Ignoring InterruptedException after shutdown", th2);
                    }
                }
            }
        }
        if (z2) {
            this.endWindowEmitTime = System.currentTimeMillis();
            this.operator.endWindow();
            int i9 = this.applicationWindowCount + 1;
            this.applicationWindowCount = i9;
            if (i9 == this.APPLICATION_WINDOW_COUNT) {
                this.applicationWindowCount = 0;
            }
            int i10 = this.checkpointWindowCount + 1;
            this.checkpointWindowCount = i10;
            if (i10 == this.CHECKPOINT_WINDOW_COUNT) {
                this.checkpointWindowCount = 0;
                if (z3 || this.PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE) {
                    checkpoint(this.currentWindowId);
                }
            }
            Stats.OperatorStats operatorStats2 = new Stats.OperatorStats();
            reportStats(operatorStats2, this.currentWindowId);
            operatorStats2.metrics = collectMetrics();
            handleRequests(this.currentWindowId);
        }
    }
}
