package com.datatorrent.lib.io.jms;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/jms/AbstractJMSOutputOperator.class */
/**
 * Base class for operators that write tuples to a JMS destination in batches.
 *
 * <p>Each tuple handed to {@link #sendMessage(Object)} is converted to a JMS {@link Message} by
 * {@link #createMessage(Object)} and buffered. The buffer is sent through a {@link MessageProducer}
 * when it reaches {@code getBatch()} entries and again at the end of every window. The id of the last
 * committed window is kept in a {@link JMSBaseTransactionableStore}; tuples arriving in a window whose
 * id is not greater than that value are skipped.
 */
public abstract class AbstractJMSOutputOperator extends JMSBase implements Operator {
    private static final Logger logger = LoggerFactory.getLogger(AbstractJMSOutputOperator.class);

    private transient String appId;
    private transient int operatorId;
    /** Last window id read from, or written to, the store. */
    private transient long committedWindowId;
    /** Id of the window currently being processed; set in {@link #beginWindow(long)}. */
    private long currentWindowId;
    private Operator.ProcessingMode mode;
    private transient MessageProducer producer;
    // NOTE(review): both batches are non-transient, so they are presumably part of the
    // checkpointed operator state -- confirm against the platform's serialization rules.
    /** Tuples accepted since the last flush. */
    private List<Object> tupleBatch = Lists.newArrayList();
    /** Messages created from {@link #tupleBatch}, waiting to be sent. */
    private List<Message> messageBatch = Lists.newArrayList();
    protected JMSBaseTransactionableStore store = new JMSTransactionableStore();

    /**
     * Wires up the store, opens the JMS connection and producer, connects the store, rebuilds the
     * message batch from any tuples already present in the tuple batch, and loads the committed
     * window id from the store.
     *
     * @param operatorContext context supplying the application id, operator id and processing mode
     * @throws RuntimeException wrapping any {@link JMSException} or {@link IOException} raised while
     *     connecting
     */
    public void setup(Context.OperatorContext operatorContext) {
        this.appId = (String) operatorContext.getValue(DAG.APPLICATION_ID);
        this.operatorId = operatorContext.getId();
        logger.debug("Application Id {} operatorId {}", this.appId, this.operatorId);
        this.store.setBase(this);
        this.store.setAppId(this.appId);
        this.store.setOperatorId(this.operatorId);
        this.transacted = this.store.isTransactable();
        try {
            createConnection();
            logger.debug("Session is null {}:", getSession() == null);
            this.store.connect();
            logger.debug("Done connecting store.");
            this.mode = (Operator.ProcessingMode) operatorContext.getValue(Context.OperatorContext.PROCESSING_MODE);
            if (this.mode == Operator.ProcessingMode.AT_MOST_ONCE) {
                // At-most-once: pending tuples are dropped instead of being re-sent.
                this.tupleBatch.clear();
            }
            // Re-create a message for every tuple that is still pending.
            for (Object tuple : this.tupleBatch) {
                this.messageBatch.add(createMessage(tuple));
            }
            this.committedWindowId = this.store.getCommittedWindowId(this.appId, this.operatorId);
            logger.debug("committedWindowId {}", this.committedWindowId);
            logger.debug("End of setup store in transaction: {}", this.store.isInTransaction());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (JMSException e) {
            logger.debug(e.getLocalizedMessage());
            throw new RuntimeException(e);
        }
    }

    /**
     * Discards any buffered tuples/messages, disconnects the store and releases the JMS resources.
     *
     * <p>{@link #cleanup()} runs even when disconnecting the store fails, so the producer and the
     * resources owned by the base class are not leaked.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised while disconnecting the store
     */
    public void teardown() {
        this.tupleBatch.clear();
        this.messageBatch.clear();
        logger.debug("beginning teardown");
        try {
            this.store.disconnect();
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            cleanup();
        }
        logger.debug("ending teardown");
    }

    /**
     * Records the new window id and opens a store transaction for it.
     *
     * @param j id of the window that is starting
     */
    public void beginWindow(long j) {
        this.currentWindowId = j;
        this.store.beginTransaction();
        logger.debug("Transaction started for window {}", j);
    }

    /**
     * Flushes the pending batch, commits the store transaction and records the window id.
     *
     * <p>When the store reports exactly-once, the window id is stored <em>before</em> the flush and
     * commit; otherwise it is stored <em>after</em> the commit. The order is deliberate and must be
     * preserved.
     */
    public void endWindow() {
        logger.debug("Ending window {}", this.currentWindowId);
        if (this.store.isExactlyOnce()) {
            storeWindowIdIfAdvanced();
            flushBatch();
            this.store.commitTransaction();
        } else {
            flushBatch();
            this.store.commitTransaction();
            storeWindowIdIfAdvanced();
        }
        logger.debug("done ending window {}", this.currentWindowId);
    }

    /** Persists the current window id in the store if it is newer than the committed one. */
    private void storeWindowIdIfAdvanced() {
        if (this.committedWindowId < this.currentWindowId) {
            this.store.storeCommittedWindowId(this.appId, this.operatorId, this.currentWindowId);
            this.committedWindowId = this.currentWindowId;
        }
    }

    /**
     * Sends every buffered message through the producer, then empties both buffers.
     *
     * @throws RuntimeException wrapping the {@link JMSException} raised by a failed send; in that
     *     case the buffers are left untouched
     */
    protected void flushBatch() {
        logger.debug("flushing batch, batch size {}", this.tupleBatch.size());
        for (Message message : this.messageBatch) {
            try {
                this.producer.send(message);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
        this.tupleBatch.clear();
        this.messageBatch.clear();
        logger.debug("done flushing batch");
    }

    /**
     * Buffers a tuple for sending and flushes once the buffer holds {@code getBatch()} tuples.
     *
     * <p>Tuples are ignored while the current window id is not greater than the committed window id.
     *
     * <p>Decompiler note (JADX): access modifiers changed from: protected.
     *
     * @param obj tuple to convert with {@link #createMessage(Object)} and enqueue
     */
    public void sendMessage(Object obj) {
        if (this.currentWindowId <= this.committedWindowId) {
            return;
        }
        this.tupleBatch.add(obj);
        this.messageBatch.add(createMessage(obj));
        if (this.tupleBatch.size() >= getBatch()) {
            flushBatch();
        }
    }

    /**
     * Replaces the store used to track the committed window id.
     *
     * @param jMSBaseTransactionableStore the store to use
     */
    public void setStore(JMSBaseTransactionableStore jMSBaseTransactionableStore) {
        this.store = jMSBaseTransactionableStore;
    }

    /**
     * @return the store used to track the committed window id
     */
    public JMSBaseTransactionableStore getStore() {
        return this.store;
    }

    /**
     * Closes the producer (if one was created) and then delegates to the base class cleanup.
     *
     * <p>The base class cleanup runs even if closing the producer fails, and a missing producer
     * (connection never established, or cleanup invoked twice) is tolerated. A {@link JMSException}
     * is logged rather than propagated.
     */
    @Override // com.datatorrent.lib.io.jms.JMSBase
    public void cleanup() {
        try {
            try {
                if (this.producer != null) {
                    this.producer.close();
                }
            } finally {
                this.producer = null;
                super.cleanup();
            }
        } catch (JMSException e) {
            logger.error("Error while cleaning up JMS resources", e);
        }
    }

    /**
     * Creates the base connection and then a producer for the configured destination.
     *
     * @throws JMSException if the connection or the producer cannot be created
     */
    @Override // com.datatorrent.lib.io.jms.JMSBase
    public void createConnection() throws JMSException {
        super.createConnection();
        this.producer = getSession().createProducer(getDestination());
    }

    /**
     * Converts a tuple into the JMS message that will be sent for it.
     *
     * @param obj the tuple to convert
     * @return the message to send
     */
    protected abstract Message createMessage(Object obj);
}
