package com.datatorrent.lib.io.fs;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.testbench.RandomWordGenerator;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractReconcilerTest.class */
public class AbstractReconcilerTest {

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractReconcilerTest$TestDag.class */
    public static class TestDag implements StreamingApplication {
        public void populateDAG(DAG dag, Configuration configuration) {
            RandomWordGenerator addOperator = dag.addOperator("words", new RandomWordGenerator());
            TestReconciler addOperator2 = dag.addOperator("synchronizer", new TestReconciler());
            addOperator.setTuplesPerWindow(10);
            dag.addStream("toWriter", addOperator.output, addOperator2.input);
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractReconcilerTest$TestMeta.class */
    public static class TestMeta {
        long windowid;
        String data;
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractReconcilerTest$TestReconciler.class */
    public static class TestReconciler extends AbstractReconciler<byte[], TestMeta> {
        private long currentCommittedWindow;
        private StringBuilder windowData = new StringBuilder();

        /* JADX INFO: Access modifiers changed from: protected */
        public void processTuple(byte[] bArr) {
            if (this.windowData.length() > 0) {
                this.windowData.append(",");
            }
            this.windowData.append(bArr);
        }

        public void endWindow() {
            TestMeta testMeta = new TestMeta();
            testMeta.windowid = this.currentWindowId;
            testMeta.data = this.windowData.toString();
            enqueueForProcessing(testMeta);
            this.windowData.setLength(0);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void processCommittedData(TestMeta testMeta) {
            Assert.assertTrue("processing window id should not be greater than commited window id", testMeta.windowid <= this.currentCommittedWindow);
        }

        public void committed(long j) {
            this.currentCommittedWindow = j;
            super.committed(j);
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractReconcilerTest$TestReconciler1.class */
    public static class TestReconciler1 extends AbstractReconciler<String, String> {
        public transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
        private List<String> emitData = new ArrayList();

        public void beginWindow(long j) {
            Iterator<String> it = this.emitData.iterator();
            while (it.hasNext()) {
                this.outputPort.emit(it.next());
            }
            this.emitData.clear();
            super.beginWindow(j);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void processTuple(String str) {
            enqueueForProcessing(str);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void processCommittedData(String str) {
            this.emitData.add(str);
        }
    }

    @Test
    public void testReconciler() throws Exception {
        LocalMode newInstance = LocalMode.newInstance();
        newInstance.prepareDAG(new TestDag(), new Configuration());
        LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(false);
        controller.run(15000L);
        controller.shutdown();
    }

    @Test
    public void testOperator() throws Exception {
        CollectorTestSink collectorTestSink = new CollectorTestSink();
        TestReconciler1 testReconciler1 = new TestReconciler1();
        ArrayList newArrayList = Lists.newArrayList();
        testReconciler1.outputPort.setSink(collectorTestSink);
        testReconciler1.setup(null);
        int i = 0 + 1;
        testReconciler1.beginWindow(0);
        testReconciler1.input.process("a");
        testReconciler1.input.process("b");
        testReconciler1.endWindow();
        int i2 = i + 1;
        testReconciler1.beginWindow(i);
        testReconciler1.input.process("c");
        testReconciler1.input.process("d");
        testReconciler1.endWindow();
        testReconciler1.committed(0L);
        Thread.sleep(500L);
        int i3 = i2 + 1;
        testReconciler1.beginWindow(i2);
        testReconciler1.input.process("e");
        testReconciler1.input.process("f");
        testReconciler1.endWindow();
        int i4 = i3 + 1;
        testReconciler1.beginWindow(i3);
        testReconciler1.input.process("g");
        testReconciler1.input.process("h");
        testReconciler1.endWindow();
        newArrayList.add("a");
        newArrayList.add("b");
        Assert.assertEquals(newArrayList, collectorTestSink.collectedTuples);
        newArrayList.clear();
        collectorTestSink.collectedTuples.clear();
        testReconciler1.committed(2L);
        Thread.sleep(500L);
        int i5 = i4 + 1;
        testReconciler1.beginWindow(i4);
        testReconciler1.endWindow();
        newArrayList.add("c");
        newArrayList.add("d");
        newArrayList.add("e");
        newArrayList.add("f");
        Assert.assertEquals(newArrayList, collectorTestSink.collectedTuples);
        testReconciler1.teardown();
    }
}
