package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;

@InterfaceStability.Evolving
/* loaded from: input_file:com/datatorrent/lib/io/SimpleSinglePortInputOperator.class */
public abstract class SimpleSinglePortInputOperator<T> extends BaseOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext> {
    private transient Thread ioThread;
    private transient boolean isActive;
    public final transient BufferingOutputPort<T> outputPort;

    /* loaded from: input_file:com/datatorrent/lib/io/SimpleSinglePortInputOperator$BufferingOutputPort.class */
    public static class BufferingOutputPort<T> extends DefaultOutputPort<T> {
        public final ArrayBlockingQueue<T> tuples;

        public BufferingOutputPort(Operator operator) {
            this.tuples = new ArrayBlockingQueue<>(1024);
        }

        public BufferingOutputPort(Operator operator, int i) {
            this.tuples = new ArrayBlockingQueue<>(i);
        }

        public void emit(T t) {
            try {
                this.tuples.put(t);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        public void flush(int i) {
            Iterator<T> it = this.tuples.iterator();
            while (true) {
                int i2 = i;
                i--;
                if (i2 <= 0 || !it.hasNext()) {
                    return;
                }
                super.emit(it.next());
                it.remove();
            }
        }
    }

    public SimpleSinglePortInputOperator(int i) {
        this.isActive = false;
        this.outputPort = new BufferingOutputPort<>(this, i);
    }

    public SimpleSinglePortInputOperator() {
        this(1024);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public final void activate(Context.OperatorContext operatorContext) {
        this.isActive = true;
        if (this instanceof Runnable) {
            this.ioThread = new Thread((Runnable) this, "io-" + ClassUtils.getShortClassName(getClass()));
            this.ioThread.start();
        }
    }

    public final void deactivate() {
        this.isActive = false;
        if (this.ioThread != null) {
            this.ioThread.interrupt();
        }
    }

    public final boolean isActive() {
        return this.isActive;
    }

    public void emitTuples() {
        this.outputPort.flush(Integer.MAX_VALUE);
    }
}
