package com.datatorrent.lib.io.jms;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.counters.BasicCounters;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OperatorAnnotation(checkpointableWithinAppWindow = false)
/* loaded from: input_file:com/datatorrent/lib/io/jms/AbstractJMSInputOperator.class */
public abstract class AbstractJMSInputOperator<T> extends JMSBase implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, MessageListener, ExceptionListener, Operator.IdleTimeHandler, Operator.CheckpointListener, Operator.CheckpointNotificationListener {
    /** Default value of {@link #bufferSize}. */
    protected static final int DEFAULT_BUFFER_SIZE = 10240;
    /** Subscription name passed to {@code createDurableSubscriber} in {@code activate}. */
    private String consumerName;
    /** Last message handed to {@code processMessage}; used by {@code acknowledge} in client-acknowledge mode. */
    protected transient Message lastMsg;
    /** Producer created in {@code activate} and used by {@code sendReply}; nulled by {@code cleanup}. */
    private transient MessageProducer replyProducer;
    /** Consumer created in {@code activate} with this operator registered as its listener; nulled by {@code cleanup}. */
    private transient MessageConsumer consumer;
    /** Operator context captured in {@code setup}; {@code endWindow} publishes the counters through it. */
    private transient Context.OperatorContext context;
    /** Sleep duration (milliseconds) used by {@code handleIdleTime}; read from SPIN_MILLIS in {@code setup}. */
    private transient long spinMillis;
    /** Id of the window most recently passed to {@code beginWindow}. */
    protected transient long currentWindowId;
    /** Number of messages processed so far in the current window; reset at the end of a live window. */
    protected transient int emitCount;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJMSInputOperator.class);

    /** Capacity used when {@code holdingBuffer} is created, and the per-window limit applied by {@code emitTuples}. */
    @Min(1)
    protected int bufferSize = DEFAULT_BUFFER_SIZE;
    /** Output port of the operator. Not referenced in this class; presumably used by subclasses' {@code emit}. */
    public final transient DefaultOutputPort<T> output = new DefaultOutputPort<>();

    /** RECEIVED / REDELIVERED counters, registered in {@code setup}. */
    @NotNull
    private final BasicCounters<MutableLong> counters = new BasicCounters<>(MutableLong.class);
    /** Failure recorded by the listener callbacks; rethrown from {@code handleIdleTime}. */
    private final transient AtomicReference<Throwable> throwable = new AtomicReference<>();
    /** Ids of messages accepted (or replayed) but not yet acknowledged; used to drop redeliveries. */
    private final transient Set<String> pendingAck = Sets.newHashSet();

    /** Stores and retrieves per-window recovery state ({@code save} / {@code retrieve} / {@code committed}). */
    @NotNull
    protected WindowDataManager windowDataManager = new FSWindowDataManager();
    /** Monitor shared by {@code holdingBuffer.add} and the critical section of {@code endWindow}. */
    private final transient Lock lock = new Lock();
    /** Message id to converted tuple for the current window, in insertion order; saved and cleared in {@code endWindow}. */
    protected final transient Map<String, T> currentWindowRecoveryState = Maps.newLinkedHashMap();
    /**
     * Bounded hand-off queue between the JMS listener callback ({@code onMessage}) and the
     * operator methods that drain it ({@code emitTuples} / {@code endWindow}).
     *
     * <p>The capacity is read from {@code bufferSize} when this field initializer runs, so a
     * later {@code setBufferSize} call does not resize the queue.
     *
     * <p>{@code add} is overridden so that a message is enqueued only if
     * {@code messageConsumed} accepts it, and so that this check-and-enqueue step is atomic
     * with respect to the drain in {@code endWindow} (both synchronize on {@code lock}).
     */
    protected transient ArrayBlockingQueue<Message> holdingBuffer = new ArrayBlockingQueue<Message>(this.bufferSize) {
        private static final long serialVersionUID = 201411151139L;

        /**
         * Enqueues {@code message} unless it is a redelivery that is still pending acknowledgement.
         *
         * @return {@code true} if the message was enqueued, {@code false} if it was ignored
         * @throws IllegalStateException if the queue is full (inherited {@code add} contract)
         * @throws RuntimeException wrapping a {@link JMSException} raised while inspecting the message
         */
        @Override
        public boolean add(Message message) {
            synchronized (AbstractJMSInputOperator.this.lock) {
                try {
                    // Short-circuit: rejected messages never reach the underlying queue.
                    return AbstractJMSInputOperator.this.messageConsumed(message) && super.add(message);
                } catch (JMSException e) {
                    LOG.error("message consumption", e);
                    // Remember the failure so handleIdleTime can surface it.
                    AbstractJMSInputOperator.this.throwable.set(e);
                    throw new RuntimeException(e);
                }
            }
        }
    };

    /* loaded from: input_file:com/datatorrent/lib/io/jms/AbstractJMSInputOperator$CounterKeys.class */
    /** Keys of the counters this operator registers in {@code setup} and updates in {@code messageConsumed}. */
    public enum CounterKeys {
        /** Incremented for every message accepted by {@code messageConsumed}. */
        RECEIVED,
        /** Incremented for every redelivered message that is ignored because its id is still pending acknowledgement. */
        REDELIVERED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/io/jms/AbstractJMSInputOperator$Lock.class */
    /**
     * Stateless marker type whose single instance ({@code lock}) serves as the monitor for the
     * {@code synchronized} blocks in {@code holdingBuffer.add} and {@code endWindow}.
     */
    public static class Lock {
        // Private constructor: only the enclosing operator creates the instance.
        private Lock() {
        }
    }

    /**
     * JMS listener callback: offers the message to the holding buffer, then sends a reply if
     * the message asks for one.
     *
     * <p>The result of {@code add} is not inspected, so a reply is sent even when the message
     * was ignored as a redelivery. {@code add} throws if the buffer is full, in which case no
     * reply is sent.
     *
     * @param message the message delivered by the JMS provider
     */
    @Override
    public final void onMessage(Message message) {
        holdingBuffer.add(message);
        sendReply(message);
    }

    /**
     * Sends a text reply of the form {@code "Reply: <message id>"} to the message's
     * {@code JMSReplyTo} destination, if one is set.
     *
     * @param message the message that was just received
     * @throws RuntimeException wrapping any {@link JMSException}; the failure is also stored in
     *         {@code throwable} so that {@code handleIdleTime} can rethrow it
     */
    protected void sendReply(Message message) {
        try {
            // Read the reply destination once so the null check and the send use the same value.
            Destination replyTo = message.getJMSReplyTo();
            if (replyTo != null) {
                this.replyProducer.send(replyTo, getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
            }
        } catch (JMSException e) {
            // Pass the exception itself so the stack trace and cause are not lost.
            LOG.error(e.getLocalizedMessage(), e);
            this.throwable.set(e);
            throw new RuntimeException(e);
        }
    }

    /**
     * JMS exception-listener callback: records the failure, releases the JMS resources and
     * rethrows.
     *
     * <p>The failure is logged and stored in {@code throwable} <em>before</em> cleanup runs, so
     * that a cleanup problem cannot hide the original error from {@code handleIdleTime}. A
     * cleanup failure is attached to the thrown exception as a suppressed exception.
     *
     * @param jMSException the exception reported by the JMS provider
     * @throws RuntimeException always, wrapping {@code jMSException}
     */
    @Override
    public void onException(JMSException jMSException) {
        LOG.error(jMSException.getLocalizedMessage(), jMSException);
        this.throwable.set(jMSException);
        RuntimeException failure = new RuntimeException(jMSException);
        try {
            cleanup();
        } catch (RuntimeException cleanupFailure) {
            // Keep the provider's exception as the primary error; cleanup trouble is secondary.
            LOG.warn("cleanup after JMS exception failed", cleanupFailure);
            failure.addSuppressed(cleanupFailure);
        }
        throw failure;
    }

    /**
     * Captures the operator context, reads the idle spin interval, registers a zeroed counter
     * for every {@link CounterKeys} value and sets up the window data manager.
     *
     * @param operatorContext the context supplied by the engine
     */
    @Override
    public void setup(Context.OperatorContext operatorContext) {
        context = operatorContext;
        Integer configuredSpin = (Integer) operatorContext.getValue(Context.OperatorContext.SPIN_MILLIS);
        spinMillis = configuredSpin.intValue();
        // CounterKeys currently declares RECEIVED and REDELIVERED, in that order.
        for (CounterKeys key : CounterKeys.values()) {
            counters.setCounter(key, new MutableLong());
        }
        windowDataManager.setup(operatorContext);
    }

    /**
     * Decides whether a freshly delivered message should be buffered.
     *
     * <p>A message flagged as redelivered whose id is still in {@code pendingAck} is counted as
     * REDELIVERED and rejected. Any other message has its id added to {@code pendingAck} and is
     * counted as RECEIVED.
     *
     * @param message the delivered message
     * @return {@code true} if the message should be buffered, {@code false} if it is ignored
     * @throws JMSException if the message headers cannot be read
     */
    protected boolean messageConsumed(Message message) throws JMSException {
        final boolean redelivered = message.getJMSRedelivered();
        final String messageId = message.getJMSMessageID();

        if (redelivered && pendingAck.contains(messageId)) {
            counters.getCounter(CounterKeys.REDELIVERED).increment();
            LOG.warn("IGNORING: Redelivered Message {}", messageId);
            return false;
        }

        pendingAck.add(messageId);
        MutableLong received = counters.getCounter(CounterKeys.RECEIVED);
        received.increment();
        LOG.debug("message id: {} buffer size: {} received: {}", new Object[] {messageId, holdingBuffer.size(), received.longValue()});
        return true;
    }

    /**
     * Opens the JMS connection, creates the reply producer and the message consumer, and
     * registers this operator as the consumer's listener.
     *
     * <p>A durable subscriber named {@code consumerName} is created when the operator is both
     * durable and topic-based; otherwise a plain consumer is created.
     *
     * @param operatorContext the operator context (not used here)
     * @throws RuntimeException wrapping any {@link JMSException}
     */
    @Override
    public void activate(Context.OperatorContext operatorContext) {
        try {
            super.createConnection();
            replyProducer = getSession().createProducer((Destination) null);
            // NOTE(review): createDurableSubscriber takes a javax.jms.Topic; this relies on the
            // declared return type of getDestination() in JMSBase - confirm.
            if (isDurable() && isTopic()) {
                consumer = getSession().createDurableSubscriber(getDestination(), consumerName);
            } else {
                consumer = getSession().createConsumer(getDestination());
            }
            consumer.setMessageListener(this);
        } catch (JMSException jmsFailure) {
            throw new RuntimeException((Throwable) jmsFailure);
        }
    }

    /**
     * Records the id of the window that is starting and, if that window was already completed
     * according to the window data manager, replays its saved tuples.
     *
     * @param j id of the window that is beginning
     */
    @Override
    public void beginWindow(long j) {
        currentWindowId = j;
        final boolean alreadyCompleted = j <= windowDataManager.getLargestCompletedWindow();
        if (alreadyCompleted) {
            replay(j);
        }
    }

    /**
     * Re-emits the tuples that were saved for a previously completed window.
     *
     * <p>Each recovered message id is added to {@code pendingAck}, so a later redelivery of
     * the same message is recognised and dropped by {@code messageConsumed}.
     *
     * @param j id of the window to replay
     * @throws RuntimeException wrapping an {@link IOException} from the window data manager
     */
    protected void replay(long j) {
        try {
            // endWindow saves currentWindowRecoveryState, a Map<String, T>, so that is the
            // shape expected back here; the cast cannot be checked at runtime.
            @SuppressWarnings("unchecked")
            Map<String, T> recovered = (Map<String, T>) this.windowDataManager.retrieve(j);
            if (recovered == null) {
                return;
            }
            for (Map.Entry<String, T> entry : recovered.entrySet()) {
                this.pendingAck.add(entry.getKey());
                emit(entry.getValue());
            }
        } catch (IOException e) {
            throw new RuntimeException("replay", e);
        }
    }

    /**
     * Drains buffered messages and processes them, up to {@code bufferSize} messages per
     * window.
     *
     * <p>Does nothing while the current window is one that the window data manager reports as
     * already completed; those windows are served by {@code replay} from {@code beginWindow}.
     */
    @Override
    public void emitTuples() {
        if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
            return;
        }
        while (emitCount < bufferSize) {
            Message next = holdingBuffer.poll();
            if (next == null) {
                // Buffer is empty for now.
                break;
            }
            processMessage(next);
            emitCount++;
            lastMsg = next;
        }
    }

    /**
     * Converts a message to a tuple and, unless the conversion yields {@code null}, records it
     * in the current window's recovery state under the message id and emits it.
     *
     * @param message the message to process
     * @throws RuntimeException wrapping any {@link JMSException} from conversion or header access
     */
    protected void processMessage(Message message) {
        try {
            final T tuple = convert(message);
            if (tuple == null) {
                return;
            }
            currentWindowRecoveryState.put(message.getJMSMessageID(), tuple);
            emit(tuple);
        } catch (JMSException jmsFailure) {
            throw new RuntimeException("processing msg", jmsFailure);
        }
    }

    /**
     * Idle hook: rethrows any failure recorded by the listener callbacks, otherwise sleeps for
     * {@code spinMillis} milliseconds.
     *
     * @throws RuntimeException if a failure was recorded, or if the sleep is interrupted (the
     *         thread's interrupt flag is restored before throwing)
     */
    @Override
    public void handleIdleTime() {
        Throwable failure = this.throwable.get();
        if (failure != null) {
            // propagate() always throws; "throw" makes that explicit to the compiler and reader.
            throw Throwables.propagate(failure);
        }
        try {
            Thread.sleep(this.spinMillis);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the current window.
     *
     * <p>For a live window (id greater than the largest completed window), while holding
     * {@code lock} so that no new message can be enqueued: drains and processes everything
     * still in the holding buffer, saves the window's recovery state, clears it, acknowledges
     * if at least one message has been processed, and clears {@code pendingAck}. Afterwards
     * {@code emitCount} is reset.
     *
     * <p>For a replayed window strictly older than the largest completed window only
     * {@code pendingAck} is cleared. For the window equal to the largest completed one it is
     * deliberately left untouched.
     *
     * <p>In every case the counters are published to the operator context at the end.
     *
     * @throws RuntimeException (via {@code Throwables.propagate}) if processing, saving or
     *         acknowledging fails
     */
    @Override
    public void endWindow() {
        final long largestCompleted = this.windowDataManager.getLargestCompletedWindow();
        if (this.currentWindowId > largestCompleted) {
            synchronized (this.lock) {
                // One try block around the whole critical section: save() and acknowledge()
                // throw checked exceptions that must be converted as well.
                try {
                    Message pending;
                    while ((pending = this.holdingBuffer.poll()) != null) {
                        processMessage(pending);
                        this.emitCount++;
                        this.lastMsg = pending;
                    }
                    // Persist before acknowledging so the window can be replayed after a crash.
                    this.windowDataManager.save(this.currentWindowRecoveryState, this.currentWindowId);
                    this.currentWindowRecoveryState.clear();
                    if (this.lastMsg != null) {
                        acknowledge();
                    }
                    this.pendingAck.clear();
                } catch (Throwable th) {
                    throw Throwables.propagate(th);
                }
            }
            this.emitCount = 0;
        } else if (this.currentWindowId < largestCompleted) {
            this.pendingAck.clear();
        }
        this.context.setCounters(this.counters);
    }

    /**
     * Confirms consumption of the messages processed so far: commits the session when it is
     * transacted, otherwise acknowledges {@code lastMsg} when the session uses
     * client-acknowledge mode. Other acknowledgement modes need no action here.
     *
     * @throws JMSException if the commit or the acknowledgement fails
     */
    protected void acknowledge() throws JMSException {
        if (isTransacted()) {
            getSession().commit();
            return;
        }
        // javax.jms.Session.CLIENT_ACKNOWLEDGE is the constant 2.
        if (getSessionAckMode(getAckMode()) == javax.jms.Session.CLIENT_ACKNOWLEDGE) {
            lastMsg.acknowledge();
        }
    }

    /**
     * Checkpoint notification hook; intentionally a no-op in this operator.
     *
     * @param j window id supplied by the engine (unused)
     */
    public void beforeCheckpoint(long j) {
    }

    /**
     * Checkpoint listener hook; intentionally a no-op in this operator.
     *
     * @param j window id supplied by the engine (unused)
     */
    public void checkpointed(long j) {
    }

    /**
     * Forwards the committed-window notification to the window data manager.
     *
     * @param j id of the committed window
     * @throws RuntimeException wrapping an {@link IOException} from the window data manager
     */
    @Override
    public void committed(long j) {
        try {
            windowDataManager.committed(j);
        } catch (IOException ioFailure) {
            throw new RuntimeException("committing", ioFailure);
        }
    }

    /** Deactivation hook: releases the JMS resources by delegating to {@link #cleanup()}. */
    @Override
    public void deactivate() {
        cleanup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Detaches the listener, closes the reply producer and the consumer, then delegates to
     * {@code JMSBase.cleanup()}.
     *
     * <p>Safe to call when the producer or consumer was never created or has already been
     * released; for example {@code onException} followed by {@code deactivate} calls this
     * twice. NOTE(review): whether {@code super.cleanup()} itself tolerates a second call is
     * not visible here - confirm against JMSBase.
     *
     * @throws RuntimeException wrapping any {@link JMSException} raised while closing
     */
    @Override // com.datatorrent.lib.io.jms.JMSBase
    public void cleanup() {
        try {
            if (this.consumer != null) {
                // Stop deliveries first so no callback runs against half-closed resources.
                this.consumer.setMessageListener((MessageListener) null);
            }
            if (this.replyProducer != null) {
                this.replyProducer.close();
                this.replyProducer = null;
            }
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
            super.cleanup();
        } catch (JMSException e) {
            throw new RuntimeException("at cleanup", e);
        }
    }

    /** Teardown hook: tears down the window data manager. */
    @Override
    public void teardown() {
        windowDataManager.teardown();
    }

    /**
     * Converts a JMS message into the tuple type emitted by this operator.
     *
     * @param message the message to convert
     * @return the tuple, or {@code null} to have {@code processMessage} skip the message
     *         (it is then neither recorded for recovery nor emitted)
     * @throws JMSException if the message cannot be read
     */
    protected abstract T convert(Message message) throws JMSException;

    /** @return the configured buffer size */
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * Sets the buffer size, which {@code emitTuples} uses as its per-window limit. The holding
     * buffer is allocated in a field initializer, so its capacity is not changed by this call.
     *
     * @param i the new buffer size
     */
    public void setBufferSize(int i) {
        bufferSize = i;
    }

    /** @return the name used for the durable subscription, possibly {@code null} */
    public String getConsumerName() {
        return consumerName;
    }

    /**
     * Sets the name passed to {@code createDurableSubscriber} when the operator is activated
     * as a durable topic consumer.
     *
     * @param str the subscription name
     */
    public void setConsumerName(String str) {
        consumerName = str;
    }

    /**
     * Replaces the window data manager used to save, retrieve and commit per-window state.
     *
     * @param windowDataManager the manager to use; the field is annotated {@code @NotNull}
     */
    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    /** @return the window data manager currently in use */
    public WindowDataManager getWindowDataManager() {
        return windowDataManager;
    }

    /**
     * Sets the inherited {@code transacted} flag.
     *
     * @param z the new value of the flag
     */
    public void setTransacted(boolean z) {
        transacted = z;
    }

    /**
     * Emits a tuple downstream. Called by {@code processMessage} for freshly converted
     * messages and by {@code replay} for recovered ones.
     *
     * @param t the tuple to emit
     */
    protected abstract void emit(T t);
}
