package com.datatorrent.contrib.zmq;

import com.datatorrent.api.Context;
import com.datatorrent.lib.io.SimpleSinglePortInputOperator;
import org.zeromq.ZMQ;

/* loaded from: input_file:com/datatorrent/contrib/zmq/SimpleSinglePortZeroMQPullInputOperator.class */
public abstract class SimpleSinglePortZeroMQPullInputOperator<T> extends SimpleSinglePortInputOperator<T> implements Runnable {
    private transient ZMQ.Context context;
    private transient ZMQ.Socket sock;
    private String zmqAddress;

    private SimpleSinglePortZeroMQPullInputOperator() {
        this.zmqAddress = "tcp://127.0.0.1:5555";
    }

    public SimpleSinglePortZeroMQPullInputOperator(String str) {
        this.zmqAddress = "tcp://127.0.0.1:5555";
        this.zmqAddress = str;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            byte[] recv = this.sock.recv(0);
            if (recv != null) {
                this.outputPort.emit(convertFromBytesToTuple(recv));
            }
        }
    }

    protected abstract T convertFromBytesToTuple(byte[] bArr);

    public void setup(Context.OperatorContext operatorContext) {
        this.context = ZMQ.context(1);
        this.sock = this.context.socket(7);
        this.sock.connect(this.zmqAddress);
    }

    public void teardown() {
        this.sock.close();
        this.context.term();
    }
}
