package com.datatorrent.lib.io.jms;

import java.io.IOException;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:com/datatorrent/lib/io/jms/JMSTransactionableStore.class */
public class JMSTransactionableStore extends JMSBaseTransactionableStore {
    private static final Logger logger = LoggerFactory.getLogger(JMSTransactionableStore.class);
    private transient MessageProducer producer;
    private transient MessageConsumer consumer;
    private transient boolean connected = false;
    private transient boolean inTransaction = false;

    @Override // com.datatorrent.lib.db.TransactionableStore
    public long getCommittedWindowId(String str, int i) {
        logger.debug("Getting committed windowId appId {} operatorId {}", str, Integer.valueOf(i));
        try {
            beginTransaction();
            BytesMessage receive = this.consumer.receive();
            logger.debug("Retrieved committed window message id {}", receive.getJMSMessageID());
            long readLong = receive.readLong();
            BytesMessage createBytesMessage = getBase().getSession().createBytesMessage();
            createBytesMessage.writeLong(readLong);
            this.producer.send(createBytesMessage);
            commitTransaction();
            logger.debug("Retrieved windowId {}", Long.valueOf(readLong));
            return readLong;
        } catch (JMSException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // com.datatorrent.lib.db.TransactionableStore
    public void storeCommittedWindowId(String str, int i, long j) {
        if (!this.inTransaction) {
            throw new RuntimeException("This should be called while you are in an existing transaction");
        }
        logger.debug("storing window appId {} operatorId {} windowId {}", new Object[]{str, Integer.valueOf(i), Long.valueOf(j)});
        try {
            removeCommittedWindowId(str, i);
            BytesMessage createBytesMessage = getBase().getSession().createBytesMessage();
            createBytesMessage.writeLong(j);
            this.producer.send(createBytesMessage);
            logger.debug("Retrieved committed window message id {}", createBytesMessage.getJMSMessageID());
        } catch (JMSException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // com.datatorrent.lib.db.TransactionableStore
    public void removeCommittedWindowId(String str, int i) {
        try {
            this.consumer.receive();
        } catch (JMSException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // com.datatorrent.lib.db.Transactionable
    public void beginTransaction() {
        logger.debug("beginning transaction");
        if (this.inTransaction) {
            throw new RuntimeException("Cannot start a transaction twice.");
        }
        this.inTransaction = true;
    }

    @Override // com.datatorrent.lib.db.Transactionable
    public void commitTransaction() {
        logger.debug("committing transaction.");
        if (!this.inTransaction) {
            throw new RuntimeException("Cannot commit a transaction if you are not in one.");
        }
        try {
            getBase().getSession().commit();
            this.inTransaction = false;
            logger.debug("finished committing transaction.");
        } catch (JMSException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // com.datatorrent.lib.db.Transactionable
    public void rollbackTransaction() {
        try {
            getBase().getSession().rollback();
        } catch (JMSException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // com.datatorrent.lib.db.Transactionable
    public boolean isInTransaction() {
        return this.inTransaction;
    }

    @Override // com.datatorrent.lib.db.Connectable
    public void connect() throws IOException {
        logger.debug("Entering connect. is in transaction: {}", Boolean.valueOf(this.inTransaction));
        try {
            String queueName = getQueueName(getAppId(), getOperatorId());
            logger.debug("Base is null: {}", Boolean.valueOf(getBase() == null));
            if (getBase() != null) {
                logger.debug("Session is null: {}", Boolean.valueOf(getBase().getSession() == null));
            }
            Queue createQueue = getBase().getSession().createQueue(queueName);
            try {
                boolean hasMoreElements = getBase().getSession().createBrowser(createQueue).getEnumeration().hasMoreElements();
                this.producer = getBase().getSession().createProducer(createQueue);
                this.consumer = getBase().getSession().createConsumer(createQueue);
                this.connected = true;
                logger.debug("Connected. is in transaction: {}", Boolean.valueOf(this.inTransaction));
                if (!hasMoreElements) {
                    beginTransaction();
                    BytesMessage createBytesMessage = getBase().getSession().createBytesMessage();
                    createBytesMessage.writeLong(-1L);
                    this.producer.send(createBytesMessage);
                    commitTransaction();
                }
                logger.debug("Exiting connect. is in transaction: {}", Boolean.valueOf(this.inTransaction));
            } catch (JMSException e) {
                throw new RuntimeException((Throwable) e);
            }
        } catch (JMSException e2) {
            throw new RuntimeException((Throwable) e2);
        }
    }

    @Override // com.datatorrent.lib.db.Connectable
    public void disconnect() throws IOException {
        logger.debug("disconnectiong");
        try {
            this.producer.close();
            this.consumer.close();
            this.inTransaction = false;
            this.connected = false;
            logger.debug("done disconnectiong");
        } catch (JMSException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // com.datatorrent.lib.db.Connectable
    public boolean isConnected() {
        return this.connected;
    }

    private String getQueueName(String str, int i) {
        return str + "-" + i;
    }
}
