package com.datatorrent.lib.io.jms;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import java.io.File;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.class */
public class JMSStringInputOperatorTest {

    @Rule
    public TestMeta testMeta = new TestMeta();
    private static final transient Logger LOG = LoggerFactory.getLogger(JMSStringInputOperatorTest.class);

    /* loaded from: input_file:com/datatorrent/lib/io/jms/JMSStringInputOperatorTest$TestMeta.class */
    public static class TestMeta extends TestWatcher {
        String baseDir;
        JMSStringInputOperator operator;
        CollectorTestSink<Object> sink;
        Context.OperatorContext context;
        JMSTestBase testBase;

        protected void starting(Description description) {
            this.testBase = new JMSTestBase();
            try {
                this.testBase.beforTest();
                this.baseDir = "target/" + description.getClassName() + "/" + description.getMethodName();
                Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
                defaultAttributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
                defaultAttributeMap.put(Context.DAGContext.APPLICATION_PATH, this.baseDir);
                this.context = new OperatorContextTestHelper.TestIdOperatorContext(1, defaultAttributeMap);
                this.operator = new JMSStringInputOperator();
                this.operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost");
                this.sink = new CollectorTestSink<>();
                this.operator.output.setSink(this.sink);
                this.operator.setup(this.context);
                this.operator.activate(this.context);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        protected void finished(Description description) {
            this.operator.deactivate();
            this.operator.teardown();
            try {
                FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
                this.testBase.afterTest();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Test
    public void testStringMsgInput() throws Exception {
        produceMsg(10);
        Thread.sleep(1000L);
        this.testMeta.operator.emitTuples();
        Assert.assertEquals("num of messages", 10L, this.testMeta.sink.collectedTuples.size());
    }

    @Test
    public void testRecoveryAndIdempotency() throws Exception {
        produceMsg(100);
        Thread.sleep(1000L);
        this.testMeta.operator.beginWindow(1L);
        this.testMeta.operator.emitTuples();
        this.testMeta.operator.endWindow();
        this.testMeta.sink.collectedTuples.clear();
        this.testMeta.operator.setup(this.testMeta.context);
        this.testMeta.operator.activate(this.testMeta.context);
        Assert.assertEquals("largest recovery window", 1L, this.testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
        this.testMeta.operator.beginWindow(1L);
        this.testMeta.operator.endWindow();
        Assert.assertEquals("num of messages in window 1", 100L, this.testMeta.sink.collectedTuples.size());
        this.testMeta.sink.collectedTuples.clear();
    }

    @Test
    public void testFailureAfterPersistenceAndBeforeRecovery() throws Exception {
        this.testMeta.operator = new JMSStringInputOperator() { // from class: com.datatorrent.lib.io.jms.JMSStringInputOperatorTest.1
            protected void acknowledge() throws JMSException {
                throw new RuntimeException("fail ack");
            }
        };
        this.testMeta.operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost");
        this.testMeta.operator.setup(this.testMeta.context);
        this.testMeta.operator.activate(this.testMeta.context);
        produceMsg(10);
        Thread.sleep(1000L);
        this.testMeta.operator.beginWindow(1L);
        this.testMeta.operator.emitTuples();
        try {
            this.testMeta.operator.endWindow();
        } catch (Throwable th) {
            LOG.debug("ack failed");
        }
        this.testMeta.operator.setup(this.testMeta.context);
        this.testMeta.operator.activate(this.testMeta.context);
        Assert.assertEquals("window 1 should not exist", -1L, this.testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
    }

    private void produceMsg(int i) throws Exception {
        Connection createConnection = new ActiveMQConnectionFactory("vm://localhost").createConnection();
        createConnection.start();
        Session createSession = createConnection.createSession(false, 2);
        MessageProducer createProducer = createSession.createProducer(createSession.createQueue(JMSTransactionableStoreTestBase.SUBJECT));
        createProducer.setDeliveryMode(1);
        TextMessage createTextMessage = createSession.createTextMessage("Hello world! From tester producer");
        for (int i2 = 0; i2 < i; i2++) {
            createProducer.send(createTextMessage);
        }
        createSession.close();
        createConnection.close();
    }
}
