package org.apache.apex.malhar.lib.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Module;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.FileSplitterInput;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import org.apache.apex.malhar.lib.fs.FSRecordReader;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/apex/malhar/lib/fs/FSRecordReaderModule.class */
public class FSRecordReaderModule implements Module {

    @NotNull
    @Size(min = 1)
    private String files;
    private String filePatternRegularExp;

    @Min(1)
    private long scanIntervalMillis = 5000;
    private boolean recursive = true;
    private boolean sequentialFileRead = false;

    @Min(1)
    private int readersCount = 1;

    @Min(1)
    protected int blocksThreshold = 1;
    public final transient Module.ProxyOutputPort<byte[]> records = new Module.ProxyOutputPort<>();
    private FSRecordReader.RECORD_READER_MODE mode = FSRecordReader.RECORD_READER_MODE.DELIMITED_RECORD;
    private int recordLength;

    /* loaded from: input_file:org/apache/apex/malhar/lib/fs/FSRecordReaderModule$SequentialFileBlockMetadataCodec.class */
    public static class SequentialFileBlockMetadataCodec extends KryoSerializableStreamCodec<BlockMetadata.FileBlockMetadata> {
        @Override // com.datatorrent.lib.codec.KryoSerializableStreamCodec
        public int getPartition(BlockMetadata.FileBlockMetadata fileBlockMetadata) {
            return fileBlockMetadata.hashCode();
        }
    }

    public FileSplitterInput createFileSplitter() {
        return new FileSplitterInput();
    }

    public FSRecordReader createRecordReader() {
        FSRecordReader fSRecordReader = new FSRecordReader();
        fSRecordReader.setMode(this.mode);
        fSRecordReader.setRecordLength(this.recordLength);
        return fSRecordReader;
    }

    public void populateDAG(DAG dag, Configuration configuration) {
        FileSplitterInput addOperator = dag.addOperator("FileSplitter", createFileSplitter());
        FSRecordReader addOperator2 = dag.addOperator("BlockReader", createRecordReader());
        dag.addStream("BlockMetadata", addOperator.blocksMetadataOutput, addOperator2.blocksMetadataInput);
        if (this.sequentialFileRead) {
            dag.setInputPortAttribute(addOperator2.blocksMetadataInput, Context.PortContext.STREAM_CODEC, new SequentialFileBlockMetadataCodec());
        }
        FileSplitterInput.TimeBasedDirectoryScanner scanner = addOperator.getScanner();
        scanner.setFiles(this.files);
        if (this.scanIntervalMillis != 0) {
            scanner.setScanIntervalMillis(this.scanIntervalMillis);
        }
        scanner.setRecursive(this.recursive);
        if (this.filePatternRegularExp != null) {
            addOperator.getScanner().setFilePatternRegularExp(this.filePatternRegularExp);
        }
        addOperator2.setBasePath(this.files);
        if (this.readersCount != 0) {
            dag.setAttribute(addOperator2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner(this.readersCount));
        }
        addOperator.setBlocksThreshold(this.blocksThreshold);
        this.records.set(addOperator2.records);
    }

    public void setFiles(String str) {
        this.files = str;
    }

    public String getFiles() {
        return this.files;
    }

    public String getFilePatternRegularExp() {
        return this.filePatternRegularExp;
    }

    public void setFilePatternRegularExp(String str) {
        this.filePatternRegularExp = str;
    }

    public long getScanIntervalMillis() {
        return this.scanIntervalMillis;
    }

    public void setScanIntervalMillis(long j) {
        this.scanIntervalMillis = j;
    }

    public boolean isRecursive() {
        return this.recursive;
    }

    public void setRecursive(boolean z) {
        this.recursive = z;
    }

    public int getReadersCount() {
        return this.readersCount;
    }

    public void setReadersCount(int i) {
        this.readersCount = i;
    }

    public boolean isSequentialFileRead() {
        return this.sequentialFileRead;
    }

    public void setSequentialFileRead(boolean z) {
        this.sequentialFileRead = z;
    }

    public void setBlocksThreshold(int i) {
        this.blocksThreshold = i;
    }

    public int getBlocksThreshold() {
        return this.blocksThreshold;
    }

    public FSRecordReader.RECORD_READER_MODE getMode() {
        return this.mode;
    }

    public void setMode(FSRecordReader.RECORD_READER_MODE record_reader_mode) {
        this.mode = record_reader_mode;
    }

    public int getRecordLength() {
        return this.recordLength;
    }

    public void setRecordLength(int i) {
        this.recordLength = i;
    }
}
