package com.datatorrent.contrib.zmq;

import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.contrib.helper.SourceModule;
import com.datatorrent.contrib.memcache.MemcachePOJOOperatorTest;
import com.datatorrent.netlet.util.DTThrowable;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.class */
public class ZeroMQOutputOperatorTest {
    protected static Logger logger = LoggerFactory.getLogger(ZeroMQOutputOperatorTest.class);

    @Test
    public void testDag() throws Exception {
        runTest(3);
        logger.debug("end of test");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Type inference failed for: r0v23, types: [com.datatorrent.contrib.zmq.ZeroMQOutputOperatorTest$1] */
    public void runTest(final int i) {
        LocalMode newInstance = LocalMode.newInstance();
        DAG dag = newInstance.getDAG();
        SourceModule addOperator = dag.addOperator("source", new SourceModule());
        addOperator.setTestNum(i);
        ZeroMQOutputOperator addOperator2 = dag.addOperator("generator", new ZeroMQOutputOperator());
        addOperator2.setUrl("tcp://*:5556");
        addOperator2.setSyncUrl("tcp://*:5557");
        addOperator2.setSUBSCRIBERS_EXPECTED(1);
        dag.addStream("Stream", addOperator.outPort, addOperator2.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
        final LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        final ZeroMQMessageReceiver zeroMQMessageReceiver = new ZeroMQMessageReceiver();
        zeroMQMessageReceiver.setup();
        final Thread thread = new Thread(zeroMQMessageReceiver);
        thread.start();
        new Thread("LocalClusterController") { // from class: com.datatorrent.contrib.zmq.ZeroMQOutputOperatorTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    try {
                        Thread.sleep(1000L);
                        long currentTimeMillis = System.currentTimeMillis();
                        while (System.currentTimeMillis() - currentTimeMillis < 10000 && zeroMQMessageReceiver.count < i * 3) {
                            Thread.sleep(10L);
                        }
                        ZeroMQOutputOperatorTest.logger.debug("done...");
                        controller.shutdown();
                        try {
                            try {
                                Thread.sleep(1000L);
                                thread.interrupt();
                                zeroMQMessageReceiver.teardown();
                            } catch (Throwable th) {
                                thread.interrupt();
                                zeroMQMessageReceiver.teardown();
                                throw th;
                            }
                        } catch (InterruptedException e) {
                            DTThrowable.rethrow(e);
                        }
                    } catch (Throwable th2) {
                        ZeroMQOutputOperatorTest.logger.debug("done...");
                        controller.shutdown();
                        try {
                            try {
                                Thread.sleep(1000L);
                                thread.interrupt();
                                zeroMQMessageReceiver.teardown();
                            } catch (InterruptedException e2) {
                                DTThrowable.rethrow(e2);
                                thread.interrupt();
                                zeroMQMessageReceiver.teardown();
                            }
                            throw th2;
                        } catch (Throwable th3) {
                            thread.interrupt();
                            zeroMQMessageReceiver.teardown();
                            throw th3;
                        }
                    }
                } catch (InterruptedException e3) {
                    DTThrowable.rethrow(e3);
                    ZeroMQOutputOperatorTest.logger.debug("done...");
                    controller.shutdown();
                    try {
                        try {
                            Thread.sleep(1000L);
                            thread.interrupt();
                            zeroMQMessageReceiver.teardown();
                        } catch (InterruptedException e4) {
                            DTThrowable.rethrow(e4);
                            thread.interrupt();
                            zeroMQMessageReceiver.teardown();
                        }
                    } finally {
                        thread.interrupt();
                        zeroMQMessageReceiver.teardown();
                    }
                }
            }
        }.start();
        controller.run();
        Assert.assertEquals("emitted value for testNum was ", i * 3, zeroMQMessageReceiver.count);
        for (Map.Entry<String, Integer> entry : zeroMQMessageReceiver.dataMap.entrySet()) {
            if (entry.getKey().equals("a")) {
                Assert.assertEquals("emitted value for 'a' was ", new Integer(2), entry.getValue());
            } else if (entry.getKey().equals("b")) {
                Assert.assertEquals("emitted value for 'b' was ", new Integer(20), entry.getValue());
            } else if (entry.getKey().equals("c")) {
                Assert.assertEquals("emitted value for 'c' was ", new Integer(MemcachePOJOOperatorTest.TUPLE_SIZE), entry.getValue());
            }
        }
    }
}
