package com.datatorrent.lib.io.fs;

import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.block.AbstractBlockReader;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.lib.stream.DevNull;
import com.datatorrent.netlet.util.Slice;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/FSInputModuleAppTest.class */
public class FSInputModuleAppTest {
    private String inputDir;
    static String outputDir;
    private StreamingApplication app;
    private static final String FILE_1 = "file1.txt";
    private static final String FILE_2 = "file2.txt";
    private static final String FILE_1_DATA = "File one data";
    private static final String FILE_2_DATA = "File two data. This has more data hence more blocks.";
    static final String OUT_DATA_FILE = "fileData.txt";
    static final String OUT_METADATA_FILE = "fileMetaData.txt";

    @Rule
    public TestMeta testMeta = new TestMeta();
    private static Logger LOG = LoggerFactory.getLogger(FSInputModuleAppTest.class);

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FSInputModuleAppTest$Application.class */
    private static class Application implements StreamingApplication {
        private Application() {
        }

        public void populateDAG(DAG dag, Configuration configuration) {
            FSInputModule addModule = dag.addModule("hdfsInputModule", FSInputModule.class);
            MetadataWriter metadataWriter = new MetadataWriter(FSInputModuleAppTest.OUT_METADATA_FILE);
            metadataWriter.setFilePath(FSInputModuleAppTest.outputDir);
            dag.addOperator("FileMetadataWriter", metadataWriter);
            HDFSFileWriter hDFSFileWriter = new HDFSFileWriter(FSInputModuleAppTest.OUT_DATA_FILE);
            hDFSFileWriter.setFilePath(FSInputModuleAppTest.outputDir);
            dag.addOperator("FileDataWriter", hDFSFileWriter);
            DevNull addOperator = dag.addOperator("devNull", DevNull.class);
            dag.addStream("FileMetaData", addModule.filesMetadataOutput, ((AbstractFileOutputOperator) metadataWriter).input);
            dag.addStream("data", addModule.messages, ((AbstractFileOutputOperator) hDFSFileWriter).input);
            dag.addStream("blockMetadata", addModule.blocksMetadataOutput, addOperator.data);
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FSInputModuleAppTest$HDFSFileWriter.class */
    private static class HDFSFileWriter extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>> {
        String fileName;

        private HDFSFileWriter() {
        }

        public HDFSFileWriter(String str) {
            this.fileName = str;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public String getFileName(AbstractBlockReader.ReaderRecord<Slice> readerRecord) {
            return this.fileName;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<Slice> readerRecord) {
            return ((Slice) readerRecord.getRecord()).buffer;
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FSInputModuleAppTest$MetadataWriter.class */
    private static class MetadataWriter extends AbstractFileOutputOperator<AbstractFileSplitter.FileMetadata> {
        String fileName;

        private MetadataWriter() {
        }

        public MetadataWriter(String str) {
            this.fileName = str;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public String getFileName(AbstractFileSplitter.FileMetadata fileMetadata) {
            return this.fileName;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public byte[] getBytesForTuple(AbstractFileSplitter.FileMetadata fileMetadata) {
            return fileMetadata.toString().getBytes();
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FSInputModuleAppTest$TestMeta.class */
    public static class TestMeta extends TestWatcher {
        public String baseDirectory;

        protected void starting(Description description) {
            this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName();
        }
    }

    @Before
    public void setup() throws Exception {
        this.inputDir = this.testMeta.baseDirectory + File.separator + "input";
        outputDir = this.testMeta.baseDirectory + File.separator + "output";
        FileUtils.writeStringToFile(new File(this.inputDir + File.separator + FILE_1), FILE_1_DATA);
        FileUtils.writeStringToFile(new File(this.inputDir + File.separator + FILE_2), FILE_2_DATA);
        FileUtils.forceMkdir(new File(this.inputDir + File.separator + "dir"));
        FileUtils.writeStringToFile(new File(this.inputDir + File.separator + "dir/inner.txt"), FILE_1_DATA);
    }

    @After
    public void tearDown() throws IOException {
        FileUtils.deleteDirectory(new File(this.inputDir));
    }

    @Test
    public void testApplication() throws Exception {
        this.app = new Application();
        Configuration configuration = new Configuration(false);
        configuration.set("dt.operator.hdfsInputModule.prop.files", this.inputDir);
        configuration.set("dt.operator.hdfsInputModule.prop.blockSize", "10");
        configuration.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000");
        LocalMode newInstance = LocalMode.newInstance();
        newInstance.prepareDAG(this.app, configuration);
        LocalMode.Controller controller = newInstance.getController();
        controller.setHeartbeatMonitoringEnabled(true);
        controller.runAsync();
        long currentTimeMillis = System.currentTimeMillis();
        Path path = new Path("file://" + new File(outputDir).getAbsolutePath());
        FileSystem newInstance2 = FileSystem.newInstance(path.toUri(), new Configuration());
        while (!newInstance2.exists(path) && System.currentTimeMillis() - currentTimeMillis < 20000) {
            Thread.sleep(500L);
            LOG.debug("Waiting for {}", path);
        }
        Thread.sleep(10000L);
        controller.shutdown();
        Assert.assertTrue("output dir does not exist", newInstance2.exists(path));
        File file = new File(outputDir);
        WildcardFileFilter wildcardFileFilter = new WildcardFileFilter("fileMetaData.txt*");
        verifyFileContents(file.listFiles((FileFilter) wildcardFileFilter), "[fileName=file1.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/file1.txt]");
        verifyFileContents(file.listFiles((FileFilter) wildcardFileFilter), "[fileName=file2.txt, numberOfBlocks=6, isDirectory=false, relativePath=input/file2.txt]");
        verifyFileContents(file.listFiles((FileFilter) wildcardFileFilter), "[fileName=dir, numberOfBlocks=0, isDirectory=true, relativePath=input/dir]");
        verifyFileContents(file.listFiles((FileFilter) wildcardFileFilter), "[fileName=inner.txt, numberOfBlocks=2, isDirectory=false, relativePath=input/dir/inner.txt]");
        WildcardFileFilter wildcardFileFilter2 = new WildcardFileFilter("fileData.txt*");
        verifyFileContents(file.listFiles((FileFilter) wildcardFileFilter2), FILE_1_DATA);
        verifyFileContents(file.listFiles((FileFilter) wildcardFileFilter2), FILE_2_DATA);
    }

    private void verifyFileContents(File[] fileArr, String str) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (File file : fileArr) {
            sb.append(FileUtils.readFileToString(file));
        }
        Assert.assertTrue("File data doesn't contain expected text", sb.indexOf(str) > -1);
    }
}
