package com.datatorrent.contrib.rabbitmq;

import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.netlet.util.DTThrowable;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.class */
public abstract class AbstractRabbitMQInputOperator<T> implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointListener {
    private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class);

    @NotNull
    protected String host;
    protected int port;

    @NotNull
    protected String exchange;

    @NotNull
    protected String exchangeType;
    protected String queueName;
    protected transient ConnectionFactory connFactory;
    private static final int DEFAULT_BLAST_SIZE = 1000;
    private static final int DEFAULT_BUFFER_SIZE = 1048576;
    protected transient Connection connection;
    protected transient Channel channel;
    protected transient AbstractRabbitMQInputOperator<T>.TracingConsumer tracingConsumer;
    protected transient String cTag;
    protected transient ArrayBlockingQueue<KeyValPair<Long, byte[]>> holdingBuffer;
    private transient long currentWindowId;
    private transient int operatorContextId;
    protected String routingKey = "";
    private int tuple_blast = 1000;
    protected int bufferSize = DEFAULT_BUFFER_SIZE;
    protected final transient Map<Long, byte[]> currentWindowRecoveryState = new HashMap();
    private final transient Set<Long> pendingAck = new HashSet();
    private final transient Set<Long> recoveredTags = new HashSet();
    private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();

    /* loaded from: input_file:com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator$TracingConsumer.class */
    public class TracingConsumer extends DefaultConsumer {
        public TracingConsumer(Channel channel) {
            super(channel);
        }

        public void handleConsumeOk(String str) {
            AbstractRabbitMQInputOperator.logger.debug(this + ".handleConsumeOk(" + str + ")");
            super.handleConsumeOk(str);
        }

        public void handleCancelOk(String str) {
            AbstractRabbitMQInputOperator.logger.debug(this + ".handleCancelOk(" + str + ")");
            super.handleCancelOk(str);
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            AbstractRabbitMQInputOperator.logger.debug(this + ".handleShutdownSignal(" + str + ", " + shutdownSignalException + ")");
            super.handleShutdownSignal(str, shutdownSignalException);
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            long deliveryTag = envelope.getDeliveryTag();
            if (envelope.isRedeliver() && (AbstractRabbitMQInputOperator.this.recoveredTags.contains(Long.valueOf(deliveryTag)) || AbstractRabbitMQInputOperator.this.pendingAck.contains(Long.valueOf(deliveryTag)))) {
                if (AbstractRabbitMQInputOperator.this.recoveredTags.contains(Long.valueOf(deliveryTag))) {
                    AbstractRabbitMQInputOperator.this.pendingAck.add(Long.valueOf(deliveryTag));
                }
            } else {
                AbstractRabbitMQInputOperator.this.pendingAck.add(Long.valueOf(deliveryTag));
                AbstractRabbitMQInputOperator.this.holdingBuffer.add(new KeyValPair<>(Long.valueOf(deliveryTag), bArr));
                AbstractRabbitMQInputOperator.logger.debug("Received Async message: {}  buffersize: {} ", new String(bArr), Integer.valueOf(AbstractRabbitMQInputOperator.this.holdingBuffer.size()));
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void emitTuples() {
        int i = this.tuple_blast;
        if (i > this.holdingBuffer.size()) {
            i = this.holdingBuffer.size();
        }
        int i2 = i;
        while (true) {
            int i3 = i2;
            i2--;
            if (i3 <= 0) {
                return;
            }
            KeyValPair<Long, byte[]> poll = this.holdingBuffer.poll();
            this.currentWindowRecoveryState.put(poll.getKey(), poll.getValue());
            emitTuple((byte[]) poll.getValue());
        }
    }

    public abstract void emitTuple(byte[] bArr);

    public void beginWindow(long j) {
        this.currentWindowId = j;
        if (j <= this.windowDataManager.getLargestCompletedWindow()) {
            replay(j);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void replay(long j) {
        try {
            Map map = (Map) this.windowDataManager.retrieve(j);
            if (map == null) {
                return;
            }
            for (Map.Entry entry : map.entrySet()) {
                this.recoveredTags.add(entry.getKey());
                emitTuple((byte[]) entry.getValue());
            }
        } catch (IOException e) {
            DTThrowable.rethrow(e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void endWindow() {
        while (true) {
            KeyValPair<Long, byte[]> poll = this.holdingBuffer.poll();
            if (poll != null) {
                this.currentWindowRecoveryState.put(poll.getKey(), poll.getValue());
                emitTuple((byte[]) poll.getValue());
            } else {
                try {
                    break;
                } catch (IOException e) {
                    DTThrowable.rethrow(e);
                }
            }
        }
        this.windowDataManager.save(this.currentWindowRecoveryState, this.currentWindowId);
        this.currentWindowRecoveryState.clear();
        Iterator<Long> it = this.pendingAck.iterator();
        while (it.hasNext()) {
            try {
                this.channel.basicAck(it.next().longValue(), false);
            } catch (IOException e2) {
                DTThrowable.rethrow(e2);
            }
        }
        this.pendingAck.clear();
    }

    public void setup(Context.OperatorContext operatorContext) {
        this.operatorContextId = operatorContext.getId();
        this.holdingBuffer = new ArrayBlockingQueue<>(this.bufferSize);
        this.windowDataManager.setup(operatorContext);
    }

    public void teardown() {
        this.windowDataManager.teardown();
    }

    public void activate(Context.OperatorContext operatorContext) {
        try {
            this.connFactory = new ConnectionFactory();
            this.connFactory.setHost(this.host);
            if (this.port != 0) {
                this.connFactory.setPort(this.port);
            }
            this.connection = this.connFactory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.exchangeDeclare(this.exchange, this.exchangeType);
            boolean z = false;
            if (this.queueName == null) {
                this.queueName = this.channel.queueDeclare().getQueue();
                z = true;
            } else {
                this.channel.queueDeclare(this.queueName, true, false, false, (Map) null);
            }
            this.channel.queueBind(this.queueName, this.exchange, this.routingKey);
            this.tracingConsumer = new TracingConsumer(this.channel);
            this.cTag = this.channel.basicConsume(this.queueName, false, this.tracingConsumer);
            if (z) {
                this.queueName = null;
            }
        } catch (IOException e) {
            throw new RuntimeException("Connection Failure", e);
        }
    }

    public void deactivate() {
        try {
            this.channel.close();
            this.connection.close();
        } catch (IOException e) {
            logger.debug(e.toString());
        }
    }

    public void checkpointed(long j) {
    }

    public void committed(long j) {
        try {
            this.windowDataManager.committed(j);
        } catch (IOException e) {
            throw new RuntimeException("committing", e);
        }
    }

    public void setTupleBlast(int i) {
        this.tuple_blast = i;
    }

    public String getHost() {
        return this.host;
    }

    public void setHost(String str) {
        this.host = str;
    }

    public int getPort() {
        return this.port;
    }

    public void setPort(int i) {
        this.port = i;
    }

    public String getExchange() {
        return this.exchange;
    }

    public void setExchange(String str) {
        this.exchange = str;
    }

    public String getQueueName() {
        return this.queueName;
    }

    public void setQueueName(String str) {
        this.queueName = str;
    }

    public String getExchangeType() {
        return this.exchangeType;
    }

    public void setExchangeType(String str) {
        this.exchangeType = str;
    }

    public String getRoutingKey() {
        return this.routingKey;
    }

    public void setRoutingKey(String str) {
        this.routingKey = str;
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }
}
