package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OperatorAnnotation(checkpointableWithinAppWindow = false)
/* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterInput.class */
public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener {
    /** Files processed during the current window; saved in {@code endWindow()} and cleared afterwards. */
    protected transient LinkedList<ScannedFileInfo> currentWindowRecoveryState;
    /**
     * Modification times of processed files, as maintained by {@code updateReferenceTimes}: outer key is the
     * directory path (or the file path when the directory is null), inner key is the file path.
     * Not transient; lazily created in {@code setup} when null.
     */
    private Map<String, Map<String, Long>> referenceTimes;
    /** Sleep duration in milliseconds used by {@code emitTuples} when idle; read from SPIN_MILLIS in {@code setup}. */
    private transient long sleepMillis;
    private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);

    /** Stores and retrieves per-window recovery state; defaults to {@code NoopWindowDataManager}. */
    @NotNull
    private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();

    /** Background scanner that discovers the files this operator processes. */
    @NotNull
    @Valid
    private TimeBasedDirectoryScanner scanner = new TimeBasedDirectoryScanner();

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterInput$ScannedFileInfo.class */
    /**
     * File information produced by the scanner, extended with the modification time
     * that was observed when the file was discovered.
     */
    public static class ScannedFileInfo extends AbstractFileSplitter.FileInfo {
        /** Modification time given at construction; {@code -1} for instances built with the no-arg constructor. */
        protected final long modifiedTime;

        /** No-arg constructor; the modification time is set to {@code -1}. */
        protected ScannedFileInfo() {
            modifiedTime = -1L;
        }

        /**
         * @param str  forwarded to the super constructor as its first argument
         *             (presumably the directory path; may be null)
         * @param str2 forwarded to the super constructor as its second argument
         *             (presumably the file path)
         * @param j    modification time of the file
         */
        public ScannedFileInfo(@Nullable String str, @NotNull String str2, long j) {
            super(str, str2);
            modifiedTime = j;
        }

        /** @return the modification time recorded for this file */
        public long getModifiedTime() {
            return modifiedTime;
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/FileSplitterInput$TimeBasedDirectoryScanner.class */
    public static class TimeBasedDirectoryScanner implements Runnable, Component<Context.OperatorContext> {
        /** Default value of {@link #scanIntervalMillis}, in milliseconds. */
        private static final long DEF_SCAN_INTERVAL_MILLIS = 5000;
        /** Path suffix that {@code acceptFile} rejects when the file system scheme is "hdfs". */
        private static final String FILE_BEING_COPIED = "_COPYING_";
        /** When true, {@code run()} starts a scan without waiting for the scan interval to elapse. */
        private volatile transient boolean trigger;
        /** Regular expression a path must match to be accepted; null accepts every path. */
        private String filePatternRegularExp;
        /** Regular expression of paths to reject; null rejects nothing. */
        private String ignoreFilePatternRegularExp;
        /** Wall-clock time (ms) at which the last scan iteration completed. */
        protected transient long lastScanMillis;
        /** File system obtained from {@code getFSInstance()} during setup. */
        protected transient FileSystem fs;
        /** Files found by the scanner thread and not yet polled. */
        protected transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles;
        /** Single-thread executor that runs this scanner. */
        protected transient ExecutorService scanService;
        /** Holds the throwable that terminated the scanner thread, if any. */
        protected transient AtomicReference<Throwable> atomicThrowable;
        /** Loop flag of {@code run()}; cleared by {@code stopScanning()} or on failure. */
        private volatile transient boolean running;
        /** Paths that {@code acceptFile} rejected during scanning. */
        protected transient HashSet<String> ignoredFiles;
        /** Compiled form of {@link #filePatternRegularExp}; null when no pattern is set. */
        protected transient Pattern regex;
        /** Compiled form of {@link #ignoreFilePatternRegularExp}; null when no pattern is set. */
        private transient Pattern ignoreRegex;
        /** Sleep duration (ms) between polls of the scan condition; read from SPIN_MILLIS in setup. */
        protected transient long sleepMillis;
        /** Reference modification times handed to {@code startScanning}. */
        protected transient Map<String, Map<String, Long>> referenceTimes;
        /** Last file handed to {@code processDiscoveredFile} in the current iteration; null at iteration start. */
        private transient ScannedFileInfo lastScannedInfo;
        /** Number of files discovered in the current iteration. */
        private transient int numDiscoveredPerIteration;
        /** Whether sub-directories are descended into. */
        private boolean recursive = true;

        /** Input paths to scan, in insertion order. */
        @NotNull
        @Size(min = 1)
        private final Set<String> files = new LinkedHashSet<>();

        /** Minimum time (ms) between the end of one scan iteration and the start of the next. */
        @Min(0)
        private long scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;

        /**
         * Initialises the transient state of the scanner: executor, queue, error holder, ignored-file set,
         * spin sleep, compiled patterns and the file system.
         *
         * @param operatorContext context from which SPIN_MILLIS is read
         * @throws RuntimeException wrapping the IOException if the file system cannot be opened
         */
        public void setup(Context.OperatorContext operatorContext) {
            scanService = Executors.newSingleThreadExecutor();
            discoveredFiles = new LinkedBlockingDeque<>();
            atomicThrowable = new AtomicReference<>();
            ignoredFiles = Sets.newHashSet();

            Integer spinMillis = (Integer) operatorContext.getValue(Context.OperatorContext.SPIN_MILLIS);
            sleepMillis = spinMillis.intValue();

            // Patterns are optional; a null expression leaves the compiled pattern untouched.
            if (filePatternRegularExp != null) {
                regex = Pattern.compile(filePatternRegularExp);
            }
            if (ignoreFilePatternRegularExp != null) {
                ignoreRegex = Pattern.compile(ignoreFilePatternRegularExp);
            }

            try {
                fs = getFSInstance();
            } catch (IOException ioe) {
                throw new RuntimeException("opening fs", ioe);
            }
        }

        /**
         * Stores the reference times and submits this scanner to the scan executor.
         *
         * @param map reference modification times; must not be null
         */
        protected void startScanning(Map<String, Map<String, Long>> map) {
            Map<String, Map<String, Long>> checkedTimes = Preconditions.checkNotNull(map);
            referenceTimes = checkedTimes;
            scanService.submit(this);
        }

        /** Clears the {@code running} flag so that the loop in {@code run()} exits on its next check. */
        protected void stopScanning() {
            running = false;
        }

        /**
         * Stops the scan loop, shuts down the scan executor and closes the file system.
         *
         * <p>The executor and file system are only created in {@code setup}; both are null-checked so that
         * a teardown following a partially failed setup does not raise a NullPointerException.
         *
         * @throws RuntimeException wrapping the IOException if closing the file system fails
         */
        public void teardown() {
            stopScanning();
            if (this.scanService != null) {
                this.scanService.shutdownNow();
            }
            if (this.fs == null) {
                // setup() never got as far as opening the file system; nothing to close.
                return;
            }
            try {
                this.fs.close();
            } catch (IOException e) {
                throw new RuntimeException("closing fs", e);
            }
        }

        /**
         * Creates a new file system instance for the URI of the first configured input path,
         * using a fresh default {@code Configuration}.
         *
         * @return the new file system instance
         * @throws IOException if the file system cannot be created
         */
        protected FileSystem getFSInstance() throws IOException {
            String firstInput = files.iterator().next();
            Path firstPath = new Path(firstInput);
            return FileSystem.newInstance(firstPath.toUri(), new Configuration());
        }

        /**
         * Scan loop. While {@code running} is set, a scan iteration over all configured inputs starts when
         * either {@code trigger} is set or the scan interval has elapsed since the last iteration, and the
         * previous iteration is reported complete by {@code isIterationCompleted()}; otherwise the thread
         * sleeps for {@code sleepMillis}. Any throwable is logged, recorded in {@code atomicThrowable},
         * and rethrown after clearing {@code running}.
         */
        @Override // java.lang.Runnable
        public void run() {
            running = true;
            try {
                while (running) {
                    boolean scanDue = trigger || System.currentTimeMillis() - scanIntervalMillis >= lastScanMillis;
                    if (!scanDue || !isIterationCompleted()) {
                        Thread.sleep(sleepMillis);
                        continue;
                    }

                    // Reset the per-iteration state before walking the inputs.
                    trigger = false;
                    lastScannedInfo = null;
                    numDiscoveredPerIteration = 0;

                    for (String input : files) {
                        Path inputPath = new Path(input);
                        LOG.debug("Scan started for input {}", inputPath);
                        Map<String, Long> inputReferenceTimes = null;
                        if (fs.exists(inputPath)) {
                            String qualifiedPath = fs.getFileStatus(inputPath).getPath().toUri().getPath();
                            inputReferenceTimes = referenceTimes.get(qualifiedPath);
                        }
                        scan(inputPath, null, inputReferenceTimes);
                    }
                    scanIterationComplete();
                }
            } catch (Throwable failure) {
                LOG.error("service", failure);
                running = false;
                atomicThrowable.set(failure);
                DTThrowable.rethrow(failure);
            }
        }

        /**
         * Reports whether the previous scan iteration has been fully consumed, i.e. whether the last file
         * it discovered already has an entry in the reference times.
         *
         * <p>The outer lookup key mirrors {@code FileSplitterInput.updateReferenceTimes}: the directory path,
         * or the file path when the directory path is null. Looking up by the directory path alone never
         * matches for entries without a directory (a single-file input or an empty root directory), which
         * would keep this method returning false and stop all further scan iterations.
         *
         * @return true if no file has been discovered yet in this iteration cycle, or the last discovered
         *         file is present in the reference times
         */
        private boolean isIterationCompleted() {
            if (this.lastScannedInfo == null) {
                // Nothing discovered since the last reset: a new iteration may start.
                return true;
            }
            String directoryPath = this.lastScannedInfo.getDirectoryPath();
            String filePath = this.lastScannedInfo.getFilePath();
            String key = directoryPath == null ? filePath : directoryPath;
            Map<String, Long> map = this.referenceTimes.get(key);
            return map != null && map.get(filePath) != null;
        }

        /**
         * Called at the end of each scan iteration: logs the previous completion time together with the
         * number of files discovered, then records the current time as the new completion time.
         */
        protected void scanIterationComplete() {
            LOG.debug("scan complete {} {}", lastScanMillis, numDiscoveredPerIteration);
            lastScanMillis = System.currentTimeMillis();
        }

        /**
         * Scans {@code path}, using the reference times stored under the path component of its URI.
         *
         * @param path  path to scan
         * @param path2 root path of the scan, or null when {@code path} itself is the root
         */
        protected void scan(@NotNull Path path, Path path2) {
            String referenceKey = path.toUri().getPath();
            Map<String, Long> pathReferenceTimes = referenceTimes.get(referenceKey);
            scan(path, path2, pathReferenceTimes);
        }

        /**
         * Lists {@code path} and hands its entries to {@code addToDiscoveredFiles}; directories are
         * descended into when recursion is enabled, and rejected files are remembered in {@code ignoredFiles}.
         * A root path with no entries and no reference time is itself reported as discovered.
         *
         * @param path  path to list
         * @param path2 root path of the scan, or null when {@code path} is the root
         * @param map   reference modification times for this input, may be null
         * @throws RuntimeException wrapping any IOException other than FileNotFoundException,
         *                          which is only logged
         */
        private void scan(Path path, Path path2, Map<String, Long> map) {
            try {
                FileStatus parentStatus = fs.getFileStatus(path);
                String parentPathStr = path.toUri().getPath();
                LOG.debug("scan {}", parentPathStr);

                FileStatus[] children = fs.listStatus(path);
                if (children.length == 0 && path2 == null) {
                    // Empty root: report it once, unless a reference time already exists for it.
                    if (map == null || map.get(parentPathStr) == null) {
                        processDiscoveredFile(new ScannedFileInfo(null, path.toString(), parentStatus.getModificationTime()));
                    }
                }

                for (FileStatus child : children) {
                    Path childPath = child.getPath();
                    String childPathStr = childPath.toUri().getPath();

                    if (child.isDirectory() && isRecursive()) {
                        addToDiscoveredFiles(path2, parentStatus, child, map);
                        Path rootPath = path2 == null ? parentStatus.getPath() : path2;
                        scan(childPath, rootPath, map);
                    } else if (acceptFile(childPathStr)) {
                        addToDiscoveredFiles(path2, parentStatus, child, map);
                    } else {
                        ignoredFiles.add(childPathStr);
                    }
                }
            } catch (FileNotFoundException notFound) {
                LOG.warn("Failed to list directory {}", path, notFound);
            } catch (IOException ioe) {
                throw new RuntimeException("listing files", ioe);
            }
        }

        /**
         * Reports a listed entry as discovered unless it is unchanged or must be ignored.
         *
         * <p>The entry is dropped when {@code skipFile} says so (its modification time is not newer than
         * the reference time), when it is a directory that already has a reference time, or when its path
         * is in {@code ignoredFiles}.
         *
         * @param path        root path of the scan, or null when the parent is the root
         * @param fileStatus  status of the parent that was listed
         * @param fileStatus2 status of the listed entry
         * @param map         reference modification times for this input, may be null
         * @throws IOException if {@code skipFile} fails
         */
        private void addToDiscoveredFiles(Path path, FileStatus fileStatus, FileStatus fileStatus2, Map<String, Long> map) throws IOException {
            Path childPath = fileStatus2.getPath();
            String childPathStr = childPath.toUri().getPath();
            Long referenceTime = map == null ? null : map.get(childPathStr);

            if (skipFile(childPath, fileStatus2.getModificationTime(), referenceTime)) {
                return;
            }
            if ((!fileStatus2.isDirectory() || referenceTime == null) && !this.ignoredFiles.contains(childPathStr)) {
                ScannedFileInfo info = createScannedFileInfo(fileStatus.getPath(), fileStatus, childPath, fileStatus2, path);
                // Parameterized logging avoids building the message when debug is disabled.
                FileSplitterInput.LOG.debug("Processing file: {}", info.getFilePath());
                processDiscoveredFile(info);
            }
        }

        /**
         * Records a discovered file: bumps the per-iteration counter, remembers it as the last scanned
         * file and appends it to the queue of discovered files.
         *
         * @param scannedFileInfo the discovered file
         */
        protected void processDiscoveredFile(ScannedFileInfo scannedFileInfo) {
            numDiscoveredPerIteration += 1;
            lastScannedInfo = scannedFileInfo;
            discoveredFiles.add(scannedFileInfo);
        }

        /**
         * Builds the {@code ScannedFileInfo} for a listed entry.
         *
         * <ul>
         * <li>With a root path ({@code path3}): directory is the root, file path is the entry relative to it.</li>
         * <li>Without a root and a directory parent: directory is the parent, file path is the entry name.</li>
         * <li>Without a root and a non-directory parent: directory is null, file path is the entry's full path.</li>
         * </ul>
         *
         * @param path        path of the parent
         * @param fileStatus  status of the parent
         * @param path2       path of the entry
         * @param fileStatus2 status of the entry, source of the modification time
         * @param path3       root path of the scan, or null
         * @return the file info describing the entry
         */
        protected ScannedFileInfo createScannedFileInfo(Path path, FileStatus fileStatus, Path path2, FileStatus fileStatus2, Path path3) {
            if (path3 != null) {
                return new ScannedFileInfo(path3.toUri().getPath(), path3.toUri().relativize(path2.toUri()).getPath(), fileStatus2.getModificationTime());
            }
            if (fileStatus.isDirectory()) {
                return new ScannedFileInfo(path.toUri().getPath(), path2.getName(), fileStatus2.getModificationTime());
            }
            return new ScannedFileInfo(null, path2.toUri().getPath(), fileStatus2.getModificationTime());
        }

        /**
         * Decides whether an entry is unchanged and can be skipped.
         *
         * @param path path of the entry (not used by this implementation)
         * @param l    current modification time of the entry
         * @param l2   reference modification time, or null if none is known
         * @return true when a reference time exists and the current time is not newer than it
         * @throws IOException never thrown by this implementation
         */
        protected static boolean skipFile(@NotNull Path path, @NotNull Long l, Long l2) throws IOException {
            if (l2 == null) {
                return false;
            }
            return l <= l2;
        }

        /**
         * Decides whether a path should be processed.
         *
         * @param str path string of the candidate file
         * @return false if the file system scheme is "hdfs" and the path ends with the copy-in-progress
         *         suffix, if it does not match the include pattern, or if it matches the ignore pattern;
         *         true otherwise
         */
        protected boolean acceptFile(String str) {
            boolean onHdfs = fs.getScheme().equalsIgnoreCase("hdfs");
            if (onHdfs && str.endsWith(FILE_BEING_COPIED)) {
                return false;
            }
            if (regex != null && !regex.matcher(str).matches()) {
                return false;
            }
            if (ignoreRegex != null && ignoreRegex.matcher(str).matches()) {
                return false;
            }
            return true;
        }

        /** @return the next discovered file, or null if none is queued */
        public AbstractFileSplitter.FileInfo pollFile() {
            ScannedFileInfo next = discoveredFiles.poll();
            return next;
        }

        /** @return the number of files discovered so far in the current scan iteration */
        protected int getNumDiscoveredPerIteration() {
            return numDiscoveredPerIteration;
        }

        /** @return the regular expression a path must match to be accepted, or null */
        public String getFilePatternRegularExp() {
            return filePatternRegularExp;
        }

        /**
         * @param str regular expression a path must match to be accepted; compiled during {@code setup}
         */
        public void setFilePatternRegularExp(String str) {
            filePatternRegularExp = str;
        }

        /** @return the regular expression of paths to reject, or null */
        public String getIgnoreFilePatternRegularExp() {
            return ignoreFilePatternRegularExp;
        }

        /**
         * @param str regular expression of paths to reject; compiled during {@code setup}
         */
        public void setIgnoreFilePatternRegularExp(String str) {
            ignoreFilePatternRegularExp = str;
        }

        /**
         * Adds input paths to the existing set; entries already present are kept.
         *
         * @param str comma-separated list of paths; empty entries are dropped
         */
        public void setFiles(String str) {
            Iterable<String> entries = Splitter.on(",").omitEmptyStrings().split(str);
            Iterables.addAll(files, entries);
        }

        /** @return the configured input paths joined with commas, in insertion order */
        public String getFiles() {
            Joiner commaJoiner = Joiner.on(",");
            return commaJoiner.join(files);
        }

        /** @param z whether sub-directories are descended into while scanning */
        public void setRecursive(boolean z) {
            recursive = z;
        }

        /** @return whether sub-directories are descended into while scanning */
        public boolean isRecursive() {
            return recursive;
        }

        /** @param z true to let the scan loop start an iteration without waiting for the scan interval */
        public void setTrigger(boolean z) {
            trigger = z;
        }

        /** @return the current value of the trigger flag */
        public boolean isTrigger() {
            return trigger;
        }

        /** @return the scan interval in milliseconds */
        public long getScanIntervalMillis() {
            return scanIntervalMillis;
        }

        /** @param j the scan interval in milliseconds */
        public void setScanIntervalMillis(long j) {
            scanIntervalMillis = j;
        }
    }

    /**
     * Initialises recovery state, the scanner, the window data manager and the parent class, in that order.
     * Scanning starts immediately when there is no completed window to replay, or when the activation
     * window is already past the largest completed window.
     *
     * @param operatorContext the operator context
     */
    @Override // com.datatorrent.lib.io.fs.AbstractFileSplitter
    public void setup(Context.OperatorContext operatorContext) {
        currentWindowRecoveryState = Lists.newLinkedList();
        if (referenceTimes == null) {
            referenceTimes = Maps.newHashMap();
        }
        Integer spinMillis = (Integer) operatorContext.getValue(Context.OperatorContext.SPIN_MILLIS);
        sleepMillis = spinMillis.intValue();

        scanner.setup(operatorContext);
        windowDataManager.setup(operatorContext);
        super.setup(operatorContext);

        long largestCompletedWindow = windowDataManager.getLargestCompletedWindow();
        boolean startNow = largestCompletedWindow == -1;
        if (!startNow) {
            long activationWindowId = ((Long) operatorContext.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID)).longValue();
            startNow = activationWindowId > largestCompletedWindow;
        }
        if (startNow) {
            scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
        }
    }

    /**
     * Delegates to the parent and replays the window when it is not past the largest completed window.
     *
     * @param j id of the window that begins
     */
    @Override // com.datatorrent.lib.io.fs.AbstractFileSplitter
    public void beginWindow(long j) {
        super.beginWindow(j);
        boolean needsReplay = j <= windowDataManager.getLargestCompletedWindow();
        if (needsReplay) {
            replay(j);
        }
    }

    /**
     * Re-emits the files saved for window {@code j}. Any pending block metadata is emitted first; then,
     * for each saved file, the reference times are updated and its file and block metadata are emitted,
     * stopping early when {@code emitBlockMetadata()} returns false. Scanning is started once the largest
     * completed window has been replayed. Does nothing when no state was saved for the window.
     *
     * @param j id of the window to replay
     * @throws RuntimeException wrapping any IOException raised while retrieving or emitting
     */
    protected void replay(long j) {
        try {
            @SuppressWarnings("unchecked")
            LinkedList<ScannedFileInfo> recovered = (LinkedList<ScannedFileInfo>) windowDataManager.retrieve(j);
            if (recovered == null) {
                return;
            }
            if (blockMetadataIterator != null) {
                emitBlockMetadata();
            }
            for (ScannedFileInfo recoveredInfo : recovered) {
                updateReferenceTimes(recoveredInfo);
                AbstractFileSplitter.FileMetadata metadata = buildFileMetadata(recoveredInfo);
                filesMetadataOutput.emit(metadata);
                blockMetadataIterator = new AbstractFileSplitter.BlockMetadataIterator(this, metadata, this.blockSize.longValue());
                boolean allBlocksEmitted = emitBlockMetadata();
                if (!allBlocksEmitted) {
                    break;
                }
            }
            if (j == windowDataManager.getLargestCompletedWindow()) {
                scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
            }
        } catch (IOException ioe) {
            throw new RuntimeException("replay", ioe);
        }
    }

    /**
     * Emits tuples for the current window. Nothing is emitted while the window is still being replayed
     * (not past the largest completed window). A failure recorded by the scanner thread is rethrown here.
     * When there is no pending block metadata and no discovered file, the call sleeps for
     * {@code sleepMillis} before delegating to {@code process()}.
     *
     * @throws RuntimeException if the idle sleep is interrupted; the thread's interrupt status is
     *                          restored before throwing so callers can still observe the interruption
     */
    public void emitTuples() {
        if (this.currentWindowId <= this.windowDataManager.getLargestCompletedWindow()) {
            return;
        }
        Throwable th = this.scanner.atomicThrowable.get();
        if (th != null) {
            DTThrowable.rethrow(th);
        }
        if (this.blockMetadataIterator == null && this.scanner.discoveredFiles.isEmpty()) {
            try {
                Thread.sleep(this.sleepMillis);
            } catch (InterruptedException e) {
                // Catching InterruptedException clears the flag; set it again before propagating.
                Thread.currentThread().interrupt();
                throw new RuntimeException("waiting for work", e);
            }
        }
        process();
    }

    /** @return the next file discovered by the scanner, or null if none is queued */
    @Override // com.datatorrent.lib.io.fs.AbstractFileSplitter
    protected AbstractFileSplitter.FileInfo getFileInfo() {
        AbstractFileSplitter.FileInfo nextFile = scanner.pollFile();
        return nextFile;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records the file in the current window's recovery state and in the reference times, then
     * delegates to the parent implementation.
     *
     * @param fileInfo the file to process; must be a {@code ScannedFileInfo}
     * @return the result of the parent implementation
     */
    @Override // com.datatorrent.lib.io.fs.AbstractFileSplitter
    public boolean processFileInfo(AbstractFileSplitter.FileInfo fileInfo) {
        ScannedFileInfo scanned = (ScannedFileInfo) fileInfo;
        currentWindowRecoveryState.add(scanned);
        updateReferenceTimes(scanned);
        return super.processFileInfo(fileInfo);
    }

    /**
     * Stores the file's modification time in the reference times. The outer key is the file's directory
     * path, or its file path when the directory path is null; the inner key is the file path.
     *
     * @param scannedFileInfo the file whose modification time is recorded
     */
    protected void updateReferenceTimes(ScannedFileInfo scannedFileInfo) {
        String directoryPath = scannedFileInfo.getDirectoryPath();
        String key = directoryPath != null ? directoryPath : scannedFileInfo.getFilePath();

        Map<String, Long> timesForKey = referenceTimes.get(key);
        if (timesForKey == null) {
            timesForKey = Maps.newHashMap();
        }
        timesForKey.put(scannedFileInfo.getFilePath(), scannedFileInfo.modifiedTime);
        referenceTimes.put(key, timesForKey);
    }

    /**
     * Saves the current window's recovery state when the window is past the largest completed window,
     * then clears the state.
     *
     * @throws RuntimeException wrapping the IOException if saving fails
     */
    public void endWindow() {
        boolean pastCompletedWindows = currentWindowId > windowDataManager.getLargestCompletedWindow();
        if (pastCompletedWindows) {
            try {
                windowDataManager.save(currentWindowRecoveryState, currentWindowId);
            } catch (IOException ioe) {
                throw new RuntimeException("saving recovery", ioe);
            }
        }
        currentWindowRecoveryState.clear();
    }

    /** @return the scanner file system's default block size for the first configured input path */
    @Override // com.datatorrent.lib.io.fs.AbstractFileSplitter
    protected long getDefaultBlockSize() {
        String firstInput = scanner.files.iterator().next();
        return scanner.fs.getDefaultBlockSize(new Path(firstInput));
    }

    /**
     * @param path the path to look up
     * @return the status of {@code path} as reported by the scanner's file system
     * @throws IOException if the lookup fails
     */
    @Override // com.datatorrent.lib.io.fs.AbstractFileSplitter
    protected FileStatus getFileStatus(Path path) throws IOException {
        FileSystem scannerFs = scanner.fs;
        return scannerFs.getFileStatus(path);
    }

    /**
     * Checkpoint notification; intentionally does nothing.
     *
     * @param j id of the checkpointed window
     */
    public void checkpointed(long j) {
    }

    /**
     * Forwards the commit notification to the window data manager.
     *
     * @param j id of the committed window
     * @throws RuntimeException wrapping the IOException if the window data manager fails
     */
    public void committed(long j) {
        try {
            windowDataManager.committed(j);
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Releases the resources acquired in {@code setup}: tears down the scanner and then the window
     * data manager. The window data manager is torn down in a {@code finally} block so that it is
     * released even when the scanner teardown throws.
     */
    public void teardown() {
        try {
            this.scanner.teardown();
        } finally {
            // setup() calls windowDataManager.setup(); pair it with teardown so its resources are released.
            this.windowDataManager.teardown();
        }
    }

    /** @param windowDataManager the manager used to save and retrieve per-window recovery state */
    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    /** @return the manager used to save and retrieve per-window recovery state */
    public WindowDataManager getWindowDataManager() {
        return windowDataManager;
    }

    /** @param timeBasedDirectoryScanner the scanner that discovers input files */
    public void setScanner(TimeBasedDirectoryScanner timeBasedDirectoryScanner) {
        scanner = timeBasedDirectoryScanner;
    }

    /** @return the scanner that discovers input files */
    public TimeBasedDirectoryScanner getScanner() {
        return scanner;
    }
}
