package com.datatorrent.contrib.enrich;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.aerospike.AerospikeTestUtils;
import com.datatorrent.contrib.enrich.JDBCLoaderTest;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/enrich/POJOEnricherTest.class */
public class POJOEnricherTest extends JDBCLoaderTest {

    /* loaded from: input_file:com/datatorrent/contrib/enrich/POJOEnricherTest$EnrichApplication.class */
    public static class EnrichApplication implements StreamingApplication {
        private final JDBCLoaderTest.TestMeta testMeta;
        BackendLoader loader;

        public EnrichApplication(JDBCLoaderTest.TestMeta testMeta) {
            this.testMeta = testMeta;
        }

        public void populateDAG(DAG dag, Configuration configuration) {
            RandomPOJOGenerator addOperator = dag.addOperator("Input", RandomPOJOGenerator.class);
            POJOEnricher addOperator2 = dag.addOperator("Enrich", POJOEnricher.class);
            EnrichVerifier addOperator3 = dag.addOperator("Verify", EnrichVerifier.class);
            addOperator3.address = this.testMeta.address;
            addOperator3.age = this.testMeta.age;
            addOperator3.names = this.testMeta.name;
            addOperator3.salary = this.testMeta.salary;
            addOperator2.setStore(this.loader);
            ArrayList arrayList = new ArrayList();
            arrayList.add(AerospikeTestUtils.TestPOJO.ID);
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add("NAME");
            arrayList2.add("AGE");
            arrayList2.add("ADDRESS");
            arrayList2.add("SALARY");
            addOperator2.setLookupFields(arrayList);
            addOperator2.setIncludeFields(arrayList2);
            dag.getMeta(addOperator2).getMeta(addOperator2.input).getAttributes().put(Context.PortContext.TUPLE_CLASS, Order.class);
            dag.getMeta(addOperator2).getMeta(addOperator2.output).getAttributes().put(Context.PortContext.TUPLE_CLASS, EmployeeOrder.class);
            dag.addStream("S1", addOperator.output, addOperator2.input);
            dag.addStream("S2", addOperator2.output, addOperator3.input);
        }

        public void setLoader(BackendLoader backendLoader) {
            this.loader = backendLoader;
        }
    }

    /* loaded from: input_file:com/datatorrent/contrib/enrich/POJOEnricherTest$EnrichVerifier.class */
    public static class EnrichVerifier extends BaseOperator {
        private static final Logger logger = LoggerFactory.getLogger(EnrichVerifier.class);
        String[] names;
        int[] age;
        String[] address;
        double[] salary;
        private transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() { // from class: com.datatorrent.contrib.enrich.POJOEnricherTest.EnrichVerifier.1
            public void process(Object obj) {
                Assert.assertTrue(obj instanceof EmployeeOrder);
                EmployeeOrder employeeOrder = (EmployeeOrder) obj;
                int id = employeeOrder.getID();
                Assert.assertTrue(id >= 1 && id <= 4);
                Assert.assertEquals(1L, employeeOrder.getOID());
                Assert.assertEquals(100.0d, employeeOrder.getAmount(), 0.0d);
                Assert.assertEquals(EnrichVerifier.this.names[id - 1], employeeOrder.getNAME());
                Assert.assertEquals(EnrichVerifier.this.age[id - 1], employeeOrder.getAGE());
                Assert.assertEquals(EnrichVerifier.this.address[id - 1], employeeOrder.getADDRESS());
                Assert.assertEquals(EnrichVerifier.this.salary[id - 1], employeeOrder.getSALARY(), 0.0d);
            }
        };
    }

    /* loaded from: input_file:com/datatorrent/contrib/enrich/POJOEnricherTest$RandomPOJOGenerator.class */
    public static class RandomPOJOGenerator implements InputOperator {
        public transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
        private int idx = 0;
        private boolean emit = true;

        public void emitTuples() {
            if (this.emit) {
                int i = this.idx;
                int i2 = this.idx;
                this.idx = i2 + 1;
                this.idx = i + (i2 % 4);
                this.output.emit(new Order(1, this.idx + 1, 100.0d));
                if (this.idx == 3) {
                    this.emit = false;
                }
            }
        }

        public void beginWindow(long j) {
            this.emit = true;
        }

        public void endWindow() {
        }

        public void setup(Context.OperatorContext operatorContext) {
        }

        public void teardown() {
        }
    }

    /**
     * Enriches a single {@link Order} with only NAME, AGE and ADDRESS included and checks
     * the emitted tuple's string form. SALARY is not in the include list, and the expected
     * output shows it as 0.0.
     */
    @Test
    public void includeSelectedKeys() {
        POJOEnricher enricher = new POJOEnricher();
        enricher.setStore(this.testMeta.dbloader);
        enricher.setLookupFields(Arrays.asList(AerospikeTestUtils.TestPOJO.ID));
        enricher.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS"));
        enricher.outputClass = EmployeeOrder.class;
        enricher.inputClass = Order.class;
        enricher.setup((Context.OperatorContext) null);

        // Capture everything the enricher emits on its output port.
        CollectorTestSink<Object> sink = new CollectorTestSink<>();
        TestUtils.setSink(enricher.output, sink);

        // Drive one window containing a single order (OID=3, ID=4, amount=700.0).
        enricher.activate((Context) null);
        enricher.beginWindow(1L);
        Order order = new Order(3, 4, 700.0d);
        enricher.input.process(order);
        enricher.endWindow();
        enricher.deactivate();

        int emitted = sink.collectedTuples.size();
        Assert.assertEquals("includeSelectedKeys: Number of tuples emitted: ", 1L, emitted);

        String actual = sink.collectedTuples.get(0).toString();
        Assert.assertEquals("Ouput Tuple: ",
                "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=0.0}",
                actual);
    }

    /**
     * Enriches a single {@link Order} with NAME, AGE, ADDRESS and SALARY included and checks
     * the emitted tuple's string form.
     */
    @Test
    public void includeAllKeys() {
        POJOEnricher enricher = new POJOEnricher();
        enricher.setStore(this.testMeta.dbloader);
        enricher.setLookupFields(Arrays.asList(AerospikeTestUtils.TestPOJO.ID));
        enricher.setIncludeFields(Arrays.asList("NAME", "AGE", "ADDRESS", "SALARY"));
        enricher.outputClass = EmployeeOrder.class;
        enricher.inputClass = Order.class;
        enricher.setup((Context.OperatorContext) null);

        // Capture everything the enricher emits on its output port.
        CollectorTestSink<Object> sink = new CollectorTestSink<>();
        TestUtils.setSink(enricher.output, sink);

        // Drive one window containing a single order (OID=3, ID=4, amount=700.0).
        enricher.activate((Context) null);
        enricher.beginWindow(1L);
        enricher.input.process(new Order(3, 4, 700.0d));
        enricher.endWindow();
        enricher.deactivate();

        // The message previously named includeSelectedKeys (copy-paste), which misattributed failures.
        Assert.assertEquals("includeAllKeys: Number of tuples emitted: ", 1L, sink.collectedTuples.size());
        Assert.assertEquals("Ouput Tuple: ",
                "{OID=3, ID=4, amount=700.0, NAME='Mark', AGE=25, ADDRESS='Rich-Mond', SALARY=65000.0}",
                sink.collectedTuples.get(0).toString());
    }

    /**
     * Runs {@link EnrichApplication} in local mode for 10000 ms, using the test fixture's
     * loader as the enricher's store.
     *
     * @throws Exception if preparing the DAG fails
     */
    @Test
    public void testApplication() throws Exception {
        EnrichApplication app = new EnrichApplication(this.testMeta);
        app.setLoader(this.testMeta.dbloader);

        Configuration conf = new Configuration(false);
        LocalMode localMode = LocalMode.newInstance();
        localMode.prepareDAG(app, conf);

        LocalMode.Controller controller = localMode.getController();
        controller.run(10000L);
    }
}
