/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.rabbitmq;

import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.netlet.util.DTThrowable;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRabbitMQInputOperator<T>
implements InputOperator,
Operator.ActivationListener<Context.OperatorContext>,
Operator.CheckpointListener {
    private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class);
    @NotNull
    protected String host;
    protected int port;
    @NotNull
    protected String exchange;
    @NotNull
    protected String exchangeType;
    protected String routingKey = "";
    protected String queueName;
    protected transient ConnectionFactory connFactory;
    private static final int DEFAULT_BLAST_SIZE = 1000;
    private static final int DEFAULT_BUFFER_SIZE = 0x100000;
    private int tuple_blast = 1000;
    protected int bufferSize = 0x100000;
    protected transient Connection connection;
    protected transient Channel channel;
    protected transient TracingConsumer tracingConsumer;
    protected transient String cTag;
    protected transient ArrayBlockingQueue<KeyValPair<Long, byte[]>> holdingBuffer;
    private WindowDataManager windowDataManager;
    protected final transient Map<Long, byte[]> currentWindowRecoveryState = new HashMap<Long, byte[]>();
    private final transient Set<Long> pendingAck = new HashSet<Long>();
    private final transient Set<Long> recoveredTags = new HashSet<Long>();
    private transient long currentWindowId;
    private transient int operatorContextId;

    public AbstractRabbitMQInputOperator() {
        this.windowDataManager = new WindowDataManager.NoopWindowDataManager();
    }

    public void emitTuples() {
        int ntuples = this.tuple_blast;
        if (ntuples > this.holdingBuffer.size()) {
            ntuples = this.holdingBuffer.size();
        }
        int i = ntuples;
        while (i-- > 0) {
            KeyValPair<Long, byte[]> message = this.holdingBuffer.poll();
            this.currentWindowRecoveryState.put((Long)message.getKey(), (byte[])message.getValue());
            this.emitTuple((byte[])message.getValue());
        }
    }

    public abstract void emitTuple(byte[] var1);

    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
        if (windowId <= this.windowDataManager.getLargestCompletedWindow()) {
            this.replay(windowId);
        }
    }

    private void replay(long windowId) {
        try {
            Map recoveredData = (Map)this.windowDataManager.retrieve(windowId);
            if (recoveredData == null) {
                return;
            }
            for (Map.Entry recoveredEntry : recoveredData.entrySet()) {
                this.recoveredTags.add((Long)recoveredEntry.getKey());
                this.emitTuple((byte[])recoveredEntry.getValue());
            }
        }
        catch (IOException e) {
            DTThrowable.rethrow((Exception)e);
        }
    }

    public void endWindow() {
        KeyValPair<Long, byte[]> message;
        while ((message = this.holdingBuffer.poll()) != null) {
            this.currentWindowRecoveryState.put((Long)message.getKey(), (byte[])message.getValue());
            this.emitTuple((byte[])message.getValue());
        }
        try {
            this.windowDataManager.save(this.currentWindowRecoveryState, this.currentWindowId);
        }
        catch (IOException e) {
            DTThrowable.rethrow((Exception)e);
        }
        this.currentWindowRecoveryState.clear();
        for (Long deliveryTag : this.pendingAck) {
            try {
                this.channel.basicAck(deliveryTag.longValue(), false);
            }
            catch (IOException e) {
                DTThrowable.rethrow((Exception)e);
            }
        }
        this.pendingAck.clear();
    }

    public void setup(Context.OperatorContext context) {
        this.operatorContextId = context.getId();
        this.holdingBuffer = new ArrayBlockingQueue(this.bufferSize);
        this.windowDataManager.setup((Context)context);
    }

    public void teardown() {
        this.windowDataManager.teardown();
    }

    public void activate(Context.OperatorContext ctx) {
        try {
            this.connFactory = new ConnectionFactory();
            this.connFactory.setHost(this.host);
            if (this.port != 0) {
                this.connFactory.setPort(this.port);
            }
            this.connection = this.connFactory.newConnection();
            this.channel = this.connection.createChannel();
            this.channel.exchangeDeclare(this.exchange, this.exchangeType);
            boolean resetQueueName = false;
            if (this.queueName == null) {
                this.queueName = this.channel.queueDeclare().getQueue();
                resetQueueName = true;
            } else {
                this.channel.queueDeclare(this.queueName, true, false, false, null);
            }
            this.channel.queueBind(this.queueName, this.exchange, this.routingKey);
            this.tracingConsumer = new TracingConsumer(this.channel);
            this.cTag = this.channel.basicConsume(this.queueName, false, (Consumer)this.tracingConsumer);
            if (resetQueueName) {
                this.queueName = null;
            }
        }
        catch (IOException ex) {
            throw new RuntimeException("Connection Failure", ex);
        }
    }

    public void deactivate() {
        try {
            this.channel.close();
            this.connection.close();
        }
        catch (IOException ex) {
            logger.debug(ex.toString());
        }
    }

    public void checkpointed(long windowId) {
    }

    public void committed(long windowId) {
        try {
            this.windowDataManager.committed(windowId);
        }
        catch (IOException e) {
            throw new RuntimeException("committing", e);
        }
    }

    public void setTupleBlast(int i) {
        this.tuple_blast = i;
    }

    public String getHost() {
        return this.host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return this.port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getExchange() {
        return this.exchange;
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    public String getQueueName() {
        return this.queueName;
    }

    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }

    public String getExchangeType() {
        return this.exchangeType;
    }

    public void setExchangeType(String exchangeType) {
        this.exchangeType = exchangeType;
    }

    public String getRoutingKey() {
        return this.routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    public class TracingConsumer
    extends DefaultConsumer {
        public TracingConsumer(Channel ch) {
            super(ch);
        }

        public void handleConsumeOk(String c) {
            logger.debug((Object)((Object)this) + ".handleConsumeOk(" + c + ")");
            super.handleConsumeOk(c);
        }

        public void handleCancelOk(String c) {
            logger.debug((Object)((Object)this) + ".handleCancelOk(" + c + ")");
            super.handleCancelOk(c);
        }

        public void handleShutdownSignal(String c, ShutdownSignalException sig) {
            logger.debug((Object)((Object)this) + ".handleShutdownSignal(" + c + ", " + sig + ")");
            super.handleShutdownSignal(c, sig);
        }

        public void handleDelivery(String consumer_Tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            long tag = envelope.getDeliveryTag();
            if (envelope.isRedeliver() && (AbstractRabbitMQInputOperator.this.recoveredTags.contains(tag) || AbstractRabbitMQInputOperator.this.pendingAck.contains(tag))) {
                if (AbstractRabbitMQInputOperator.this.recoveredTags.contains(tag)) {
                    AbstractRabbitMQInputOperator.this.pendingAck.add(tag);
                }
                return;
            }
            AbstractRabbitMQInputOperator.this.pendingAck.add(tag);
            AbstractRabbitMQInputOperator.this.holdingBuffer.add((KeyValPair<Long, byte[]>)new KeyValPair((Object)tag, (Object)body));
            logger.debug("Received Async message: {}  buffersize: {} ", (Object)new String(body), (Object)AbstractRabbitMQInputOperator.this.holdingBuffer.size());
        }
    }
}

