package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterBaseTest.class */
public class FileSplitterBaseTest {

    // JUnit rule: its starting() hook builds the data directory, test files, splitter,
    // operator context and collector sinks that the test methods below read from.
    @Rule
    public BastTestMeta baseTestMeta = new BastTestMeta();
    // Class-wide logger; MockReceiver uses it to trace its running tuple count.
    private static final transient Logger LOG = LoggerFactory.getLogger(FileSplitterBaseTest.class);

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterBaseTest$BastTestMeta.class */
    /**
     * Test watcher holding the fixture used by the test methods of this class.
     *
     * <p>When a test starts it calls {@code TestUtils.deleteTargetTestClassFolder}, derives a
     * data directory of the form {@code target/<class name>/<method name>}, populates it via
     * {@code FileSplitterInputTest.createData}, creates a {@link FileSplitterBase} pointed at
     * that directory, builds an operator context with {@code SPIN_MILLIS} set to 500, and
     * attaches one collector sink to each of the splitter's two metadata output ports.
     * When the test finishes it calls {@code TestUtils.deleteTargetTestClassFolder} again.
     */
    class BastTestMeta extends TestWatcher {
        String dataDirectory;
        FileSplitterBase fileSplitter;
        CollectorTestSink<AbstractFileSplitter.FileMetadata> fileMetadataSink;
        CollectorTestSink<BlockMetadata.FileBlockMetadata> blockMetadataSink;
        Set<String> filePaths;
        Context.OperatorContext context;

        BastTestMeta() {
        }

        /**
         * Builds the per-test fixture described on the class.
         *
         * @param description identifies the running test; supplies the class and method names
         * @throws RuntimeException wrapping any {@link IOException} raised while preparing the fixture
         */
        protected void starting(Description description) {
            TestUtils.deleteTargetTestClassFolder(description);

            String className = description.getClassName();
            String methodName = description.getMethodName();
            dataDirectory = "target/" + className + "/" + methodName;

            try {
                filePaths = FileSplitterInputTest.createData(dataDirectory);

                FileSplitterBase splitter = new FileSplitterBase();
                splitter.setFile(dataDirectory);
                fileSplitter = splitter;

                Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
                attributes.put(Context.OperatorContext.SPIN_MILLIS, 500);
                context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes);

                // One collector per output port so tests can inspect emitted tuples.
                fileMetadataSink = new CollectorTestSink<>();
                TestUtils.setSink(splitter.filesMetadataOutput, fileMetadataSink);
                blockMetadataSink = new CollectorTestSink<>();
                TestUtils.setSink(splitter.blocksMetadataOutput, blockMetadataSink);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        }

        /**
         * Cleans up after the test by calling {@code TestUtils.deleteTargetTestClassFolder}.
         *
         * @param description identifies the test that just finished
         */
        protected void finished(Description description) {
            TestUtils.deleteTargetTestClassFolder(description);
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterBaseTest$MockFileInput.class */
    /**
     * Input operator stub for the application test: the first call to {@link #emitTuples()}
     * emits one {@code FileInfo} per entry of {@link #filePaths} on {@link #files}; every
     * later call is a no-op.
     */
    static class MockFileInput extends BaseOperator implements InputOperator {
        public final transient DefaultOutputPort<AbstractFileSplitter.FileInfo> files = new DefaultOutputPort<>();
        protected Set<String> filePaths;
        protected boolean done;

        MockFileInput() {
        }

        /**
         * Emits every configured path once. The {@code done} flag is raised before emitting,
         * so a failure part-way through does not cause a re-emit on the next call.
         */
        public void emitTuples() {
            if (!done) {
                done = true;
                for (String path : filePaths) {
                    files.emit(new AbstractFileSplitter.FileInfo((String) null, path));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterBaseTest$MockReceiver.class */
    /**
     * Terminal operator for the application test. It counts the file-metadata tuples it
     * receives, publishes that count through the {@code @AutoMetric} field {@link #count},
     * and, acting as its own {@link StatsListener}, releases {@link #latch} once the
     * reported {@code "count"} metric reaches 12.
     */
    public static class MockReceiver extends BaseOperator implements StatsListener {

        @AutoMetric
        int count;
        // Counted down by processStats once the reported count equals 12; the test waits on it.
        transient CountDownLatch latch = new CountDownLatch(1);
        public final transient DefaultInputPort<AbstractFileSplitter.FileMetadata> fileMetadata = new DefaultInputPort<AbstractFileSplitter.FileMetadata>() {
            public void process(AbstractFileSplitter.FileMetadata fileMetadata) {
                MockReceiver.this.count++;
                FileSplitterBaseTest.LOG.debug("count {}", Integer.valueOf(MockReceiver.this.count));
            }
        };

        MockReceiver() {
        }

        /**
         * Copies the {@code "count"} metric of the most recent windowed stats entry into
         * {@link #count} and counts down {@link #latch} when that value is 12.
         *
         * <p>A batch with no windowed stats, or whose latest entry carries no {@code "count"}
         * metric, is ignored instead of raising an exception from the stats callback.
         *
         * @param batchedOperatorStats stats batch delivered for this operator
         * @return always {@code null}
         */
        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            int windows = batchedOperatorStats.getLastWindowedStats().size();
            if (windows == 0) {
                // Nothing reported yet; wait for a later callback.
                return null;
            }
            Stats.OperatorStats latest = (Stats.OperatorStats) batchedOperatorStats.getLastWindowedStats().get(windows - 1);
            Object reported = latest.metrics == null ? null : latest.metrics.get("count");
            if (reported == null) {
                // The latest window carried no "count" metric; keep the previous value.
                return null;
            }
            // A metric of an unexpected type still fails loudly via the cast.
            this.count = ((Integer) reported).intValue();
            if (this.count == 12) {
                this.latch.countDown();
            }
            return null;
        }
    }

    /**
     * Streaming application wiring {@code MockFileInput -> FileSplitterBase -> MockReceiver}.
     * The receiver instance added to the DAG is kept in {@link #receiver} so the test can
     * wait on its latch and read its count.
     */
    @ApplicationAnnotation(name = "TestApp")
    class SplitterApp implements StreamingApplication {
        MockReceiver receiver;

        SplitterApp() {
        }

        /**
         * Adds the three operators and the two streams connecting them, using the data
         * directory and file paths prepared by the enclosing test's rule.
         *
         * @param dag the DAG to populate
         * @param configuration not used by this application
         */
        public void populateDAG(DAG dag, Configuration configuration) {
            String directory = baseTestMeta.dataDirectory;
            dag.setAttribute(DAG.APPLICATION_PATH, directory);

            MockFileInput input = dag.addOperator("Input", new MockFileInput());
            input.filePaths = baseTestMeta.filePaths;

            FileSplitterBase splitter = dag.addOperator("Splitter", new FileSplitterBase());
            splitter.setFile(baseTestMeta.dataDirectory);

            receiver = dag.addOperator("Receiver", new MockReceiver());

            dag.addStream("files", input.files, splitter.input);
            dag.addStream("file-metadata", splitter.filesMetadataOutput, receiver.fileMetadata);
        }
    }

    /**
     * Feeds every test file path to the splitter inside one window and checks that 12
     * file-metadata tuples are emitted, each with a known path and a non-null file name.
     */
    @Test
    public void testFileMetadata() throws InterruptedException {
        FileSplitterBase splitter = baseTestMeta.fileSplitter;
        CollectorTestSink<AbstractFileSplitter.FileMetadata> metadataSink = baseTestMeta.fileMetadataSink;

        splitter.setup(baseTestMeta.context);
        splitter.beginWindow(1L);
        for (String path : baseTestMeta.filePaths) {
            splitter.input.process(new AbstractFileSplitter.FileInfo((String) null, path));
        }
        splitter.endWindow();

        Assert.assertEquals("File metadata", 12L, metadataSink.collectedTuples.size());
        for (AbstractFileSplitter.FileMetadata metadata : metadataSink.collectedTuples) {
            String emittedPath = metadata.getFilePath();
            Assert.assertTrue("path: " + emittedPath, baseTestMeta.filePaths.contains(emittedPath));
            Assert.assertNotNull("name: ", metadata.getFileName());
        }

        metadataSink.collectedTuples.clear();
        splitter.teardown();
    }

    /**
     * With the block size left untouched, expects exactly 12 block-metadata tuples
     * after all file paths are processed, each referring to one of the test file paths.
     */
    @Test
    public void testBlockMetadataNoSplit() throws InterruptedException {
        FileSplitterBase splitter = baseTestMeta.fileSplitter;
        splitter.setup(baseTestMeta.context);
        splitter.beginWindow(1L);
        for (String path : baseTestMeta.filePaths) {
            splitter.input.process(new AbstractFileSplitter.FileInfo((String) null, path));
        }

        CollectorTestSink<BlockMetadata.FileBlockMetadata> blockSink = baseTestMeta.blockMetadataSink;
        Assert.assertEquals("Blocks", 12L, blockSink.collectedTuples.size());
        for (BlockMetadata.FileBlockMetadata block : blockSink.collectedTuples) {
            String blockPath = block.getFilePath();
            Assert.assertTrue("path: " + blockPath, baseTestMeta.filePaths.contains(blockPath));
        }
        splitter.teardown();
    }

    /**
     * With a block size of 2, expects the number of emitted blocks to equal the sum over
     * the 12 emitted files of {@code ceil(fileLength / 2)}.
     */
    @Test
    public void testBlockMetadataWithSplit() throws InterruptedException {
        FileSplitterBase splitter = baseTestMeta.fileSplitter;
        splitter.setup(baseTestMeta.context);
        splitter.setBlockSize(2L);
        splitter.beginWindow(1L);
        for (String path : baseTestMeta.filePaths) {
            splitter.input.process(new AbstractFileSplitter.FileInfo((String) null, path));
        }
        Assert.assertEquals("Files", 12L, baseTestMeta.fileMetadataSink.collectedTuples.size());

        // Exactly 12 tuples are present at this point, so iterating the list visits the
        // same elements as indexing 0..11.
        int expectedBlocks = 0;
        for (AbstractFileSplitter.FileMetadata metadata : baseTestMeta.fileMetadataSink.collectedTuples) {
            File file = new File(baseTestMeta.dataDirectory, metadata.getFileName());
            expectedBlocks += (int) Math.ceil(file.length() / 2.0d);
        }

        Assert.assertEquals("Blocks", expectedBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size());
        splitter.teardown();
    }

    /**
     * With a block size of 2 and a blocks threshold of 10, expects 10 blocks after the first
     * window, and all files and blocks emitted after six further windows (ids 2 through 7)
     * in which only {@code handleIdleTime()} is invoked.
     */
    @Test
    public void testBlocksThreshold() throws InterruptedException {
        FileSplitterBase splitter = baseTestMeta.fileSplitter;
        splitter.setup(baseTestMeta.context);

        // Expected total: ceil(length / 2) for each of file0.txt .. file11.txt.
        int expectedBlocks = 0;
        for (int fileIndex = 0; fileIndex < 12; fileIndex++) {
            File file = new File(baseTestMeta.dataDirectory, "file" + fileIndex + ".txt");
            expectedBlocks += (int) Math.ceil(file.length() / 2.0d);
        }

        splitter.setBlockSize(2L);
        splitter.setBlocksThreshold(10);
        splitter.beginWindow(1L);
        for (String path : baseTestMeta.filePaths) {
            splitter.input.process(new AbstractFileSplitter.FileInfo((String) null, path));
        }
        splitter.endWindow();
        Assert.assertEquals("Blocks", 10L, baseTestMeta.blockMetadataSink.collectedTuples.size());

        for (long windowId = 2; windowId < 8; windowId++) {
            splitter.beginWindow(windowId);
            splitter.handleIdleTime();
            splitter.endWindow();
        }

        Assert.assertEquals("Files", 12L, baseTestMeta.fileMetadataSink.collectedTuples.size());
        Assert.assertEquals("Blocks", expectedBlocks, baseTestMeta.blockMetadataSink.collectedTuples.size());
        splitter.teardown();
    }

    /**
     * Runs {@link SplitterApp} in local mode and verifies that the receiver reports 12
     * file-metadata tuples.
     *
     * <p>The controller is shut down in a {@code finally} block so the asynchronously
     * running application is stopped even when the assertion fails or the wait is interrupted.
     *
     * @throws Exception if preparing or running the application fails, or the wait is interrupted
     */
    @Test
    public void testSplitterInApp() throws Exception {
        LocalMode localMode = LocalMode.newInstance();
        SplitterApp app = new SplitterApp();
        localMode.prepareDAG(app, new Configuration());
        // Result intentionally discarded; presumably this only verifies that the DAG can be
        // cloned (serialized) -- TODO confirm.
        localMode.cloneDAG();
        LocalMode.Controller controller = localMode.getController();
        controller.runAsync();
        try {
            // Released by MockReceiver.processStats once the reported count reaches 12.
            app.receiver.latch.await();
            Assert.assertEquals("no. of metadata", 12L, app.receiver.count);
        } finally {
            controller.shutdown();
        }
    }
}
