package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.db.jdbc.AbstractJdbcNonTransactionableBatchOutputOperator;
import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.regex.Pattern;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperator.class */
public abstract class AbstractFileInputOperator<T> implements InputOperator, Partitioner<AbstractFileInputOperator<T>>, StatsListener, Operator.CheckpointListener {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class);

    @NotNull
    protected String directory;
    protected int offset;
    protected String currentFile;
    private transient Context.OperatorContext context;
    protected transient long currentWindowId;
    protected int operatorId;
    protected transient FileSystem fs;
    protected transient Configuration configuration;
    protected transient long lastScanMillis;
    protected transient Path filePath;
    protected transient InputStream inputStream;

    @NotNull
    protected DirectoryScanner scanner = new DirectoryScanner();
    protected int scanIntervalMillis = 5000;
    protected Set<String> processedFiles = new HashSet();
    protected int emitBatchSize = AbstractJdbcNonTransactionableBatchOutputOperator.DEFAULT_BATCH_SIZE;
    protected int currentPartitions = 1;
    protected int partitionCount = 1;
    private int retryCount = 0;
    private int maxRetryCount = 5;
    protected transient int skipCount = 0;
    private final BasicCounters<MutableLong> fileCounters = new BasicCounters<>(MutableLong.class);
    protected MutableLong globalNumberOfFailures = new MutableLong();
    protected MutableLong localNumberOfFailures = new MutableLong();
    protected MutableLong globalNumberOfRetries = new MutableLong();
    protected MutableLong localNumberOfRetries = new MutableLong();
    protected transient MutableLong globalProcessedFileCount = new MutableLong();
    protected transient MutableLong localProcessedFileCount = new MutableLong();
    protected transient MutableLong pendingFileCount = new MutableLong();

    @NotNull
    protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager();
    protected final transient LinkedList<RecoveryEntry> currentWindowRecoveryState = Lists.newLinkedList();
    protected long lastRepartition = 0;
    protected Queue<FailedFile> unfinishedFiles = new LinkedList();
    protected Queue<FailedFile> failedFiles = new LinkedList();
    protected Set<String> pendingFiles = new LinkedHashSet();

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperator$AggregatedFileCounters.class */
    public enum AggregatedFileCounters {
        PROCESSED_FILES,
        PENDING_FILES,
        NUMBER_OF_ERRORS,
        NUMBER_OF_RETRIES
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperator$DirectoryScanner.class */
    public static class DirectoryScanner implements Serializable {
        private static final long serialVersionUID = 4535844463258899929L;
        private String filePatternRegexp;
        private int partitionIndex;
        private int partitionCount;
        private transient Pattern regex = null;
        protected final transient HashSet<String> ignoredFiles = new HashSet<>();

        public String getFilePatternRegexp() {
            return this.filePatternRegexp;
        }

        public void setFilePatternRegexp(String str) {
            this.filePatternRegexp = str;
            this.regex = null;
        }

        public int getPartitionCount() {
            return this.partitionCount;
        }

        public int getPartitionIndex() {
            return this.partitionIndex;
        }

        protected Pattern getRegex() {
            if (this.regex == null && this.filePatternRegexp != null) {
                this.regex = Pattern.compile(this.filePatternRegexp);
            }
            return this.regex;
        }

        public LinkedHashSet<Path> scan(FileSystem fileSystem, Path path, Set<String> set) {
            LinkedHashSet<Path> newLinkedHashSet = Sets.newLinkedHashSet();
            try {
                AbstractFileInputOperator.LOG.debug("Scanning {} with pattern {}", path, this.filePatternRegexp);
                for (FileStatus fileStatus : fileSystem.listStatus(path)) {
                    Path path2 = fileStatus.getPath();
                    String path3 = path2.toString();
                    if (!set.contains(path3) && !this.ignoredFiles.contains(path3)) {
                        if (acceptFile(path3)) {
                            AbstractFileInputOperator.LOG.debug("Found {}", path3);
                            newLinkedHashSet.add(path2);
                        } else {
                            this.ignoredFiles.add(path3);
                        }
                    }
                }
            } catch (FileNotFoundException e) {
                AbstractFileInputOperator.LOG.warn("Failed to list directory {}", path, e);
            } catch (IOException e2) {
                throw new RuntimeException(e2);
            }
            return newLinkedHashSet;
        }

        protected boolean acceptFile(String str) {
            if (this.partitionCount > 1) {
                int hashCode = str.hashCode();
                int i = hashCode % this.partitionCount;
                if (i < 0) {
                    i += this.partitionCount;
                }
                AbstractFileInputOperator.LOG.debug("partition {} {} {} {}", new Object[]{Integer.valueOf(this.partitionIndex), str, Integer.valueOf(hashCode), Integer.valueOf(i)});
                if (i != this.partitionIndex) {
                    return false;
                }
            }
            Pattern regex = getRegex();
            return regex == null || regex.matcher(str).matches();
        }

        public List<DirectoryScanner> partition(int i) {
            ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(i);
            for (int i2 = 0; i2 < i; i2++) {
                newArrayListWithExpectedSize.add(createPartition(i2, i));
            }
            return newArrayListWithExpectedSize;
        }

        public List<DirectoryScanner> partition(int i, Collection<DirectoryScanner> collection) {
            return partition(i);
        }

        protected DirectoryScanner createPartition(int i, int i2) {
            DirectoryScanner directoryScanner = new DirectoryScanner();
            directoryScanner.filePatternRegexp = this.filePatternRegexp;
            directoryScanner.regex = this.regex;
            directoryScanner.partitionIndex = i;
            directoryScanner.partitionCount = i2;
            return directoryScanner;
        }

        public String toString() {
            return "DirectoryScanner [filePatternRegexp=" + this.filePatternRegexp + " partitionIndex=" + this.partitionIndex + " partitionCount=" + this.partitionCount + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperator$FailedFile.class */
    public static class FailedFile {
        String path;
        int offset;
        int retryCount;
        long lastFailedTime;

        protected FailedFile() {
        }

        protected FailedFile(String str, int i) {
            this.path = str;
            this.offset = i;
            this.retryCount = 0;
        }

        protected FailedFile(String str, int i, int i2) {
            this.path = str;
            this.offset = i;
            this.retryCount = i2;
        }

        public String toString() {
            return "FailedFile[path='" + this.path + "', offset=" + this.offset + ", retryCount=" + this.retryCount + ", lastFailedTime=" + this.lastFailedTime + ']';
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperator$FileCounters.class */
    public enum FileCounters {
        GLOBAL_PROCESSED_FILES,
        LOCAL_PROCESSED_FILES,
        GLOBAL_NUMBER_OF_FAILURES,
        LOCAL_NUMBER_OF_FAILURES,
        GLOBAL_NUMBER_OF_RETRIES,
        LOCAL_NUMBER_OF_RETRIES,
        PENDING_FILES
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperator$FileCountersAggregator.class */
    public static final class FileCountersAggregator implements Context.CountersAggregator, Serializable {
        private static final long serialVersionUID = 201409041428L;
        MutableLong totalLocalProcessedFiles = new MutableLong();
        MutableLong pendingFiles = new MutableLong();
        MutableLong totalLocalNumberOfFailures = new MutableLong();
        MutableLong totalLocalNumberOfRetries = new MutableLong();

        public Object aggregate(Collection<?> collection) {
            if (collection.isEmpty()) {
                return null;
            }
            BasicCounters basicCounters = (BasicCounters) collection.iterator().next();
            MutableLong counter = basicCounters.getCounter(FileCounters.GLOBAL_PROCESSED_FILES);
            MutableLong counter2 = basicCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES);
            MutableLong counter3 = basicCounters.getCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES);
            this.totalLocalProcessedFiles.setValue(0L);
            this.pendingFiles.setValue(0L);
            this.totalLocalNumberOfFailures.setValue(0L);
            this.totalLocalNumberOfRetries.setValue(0L);
            Iterator<?> it = collection.iterator();
            while (it.hasNext()) {
                BasicCounters basicCounters2 = (BasicCounters) it.next();
                this.totalLocalProcessedFiles.add(basicCounters2.getCounter(FileCounters.LOCAL_PROCESSED_FILES));
                this.pendingFiles.add(basicCounters2.getCounter(FileCounters.PENDING_FILES));
                this.totalLocalNumberOfFailures.add(basicCounters2.getCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES));
                this.totalLocalNumberOfRetries.add(basicCounters2.getCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES));
            }
            counter.add(this.totalLocalProcessedFiles);
            counter.subtract(this.pendingFiles);
            counter2.add(this.totalLocalNumberOfFailures);
            counter3.add(this.totalLocalNumberOfRetries);
            BasicCounters basicCounters3 = new BasicCounters(MutableLong.class);
            basicCounters3.setCounter(AggregatedFileCounters.PROCESSED_FILES, counter);
            basicCounters3.setCounter(AggregatedFileCounters.PENDING_FILES, this.pendingFiles);
            basicCounters3.setCounter(AggregatedFileCounters.NUMBER_OF_ERRORS, this.totalLocalNumberOfFailures);
            basicCounters3.setCounter(AggregatedFileCounters.NUMBER_OF_RETRIES, this.totalLocalNumberOfRetries);
            return basicCounters3;
        }
    }

    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperator$FileLineInputOperator.class */
    public static class FileLineInputOperator extends AbstractFileInputOperator<String> {
        public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
        protected transient BufferedReader br;

        @Override // com.datatorrent.lib.io.fs.AbstractFileInputOperator
        protected InputStream openFile(Path path) throws IOException {
            InputStream openFile = super.openFile(path);
            this.br = new BufferedReader(new InputStreamReader(openFile));
            return openFile;
        }

        @Override // com.datatorrent.lib.io.fs.AbstractFileInputOperator
        protected void closeFile(InputStream inputStream) throws IOException {
            super.closeFile(inputStream);
            this.br.close();
            this.br = null;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // com.datatorrent.lib.io.fs.AbstractFileInputOperator
        public String readEntity() throws IOException {
            return this.br.readLine();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.datatorrent.lib.io.fs.AbstractFileInputOperator
        public void emit(String str) {
            this.output.emit(str);
        }

        @Override // com.datatorrent.lib.io.fs.AbstractFileInputOperator
        public /* bridge */ /* synthetic */ void setup(Context context) {
            super.setup((Context.OperatorContext) context);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/datatorrent/lib/io/fs/AbstractFileInputOperator$RecoveryEntry.class */
    public static class RecoveryEntry {
        final String file;
        final int startOffset;
        final int endOffset;

        private RecoveryEntry() {
            this.file = null;
            this.startOffset = -1;
            this.endOffset = -1;
        }

        RecoveryEntry(String str, int i, int i2) {
            this.file = (String) Preconditions.checkNotNull(str, "file");
            this.startOffset = i;
            this.endOffset = i2;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof RecoveryEntry)) {
                return false;
            }
            RecoveryEntry recoveryEntry = (RecoveryEntry) obj;
            if (this.endOffset == recoveryEntry.endOffset && this.startOffset == recoveryEntry.startOffset) {
                return this.file.equals(recoveryEntry.file);
            }
            return false;
        }

        public int hashCode() {
            return (31 * ((31 * this.file.hashCode()) + this.startOffset)) + this.endOffset;
        }
    }

    public String getDirectory() {
        return this.directory;
    }

    public void setDirectory(String str) {
        this.directory = str;
    }

    public DirectoryScanner getScanner() {
        return this.scanner;
    }

    public void setScanner(DirectoryScanner directoryScanner) {
        this.scanner = directoryScanner;
    }

    public int getScanIntervalMillis() {
        return this.scanIntervalMillis;
    }

    public void setScanIntervalMillis(int i) {
        this.scanIntervalMillis = i;
    }

    public int getEmitBatchSize() {
        return this.emitBatchSize;
    }

    public void setEmitBatchSize(int i) {
        this.emitBatchSize = i;
    }

    public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) {
        this.idempotentStorageManager = idempotentStorageManager;
    }

    public IdempotentStorageManager getIdempotentStorageManager() {
        return this.idempotentStorageManager;
    }

    public int getPartitionCount() {
        return this.partitionCount;
    }

    public void setPartitionCount(int i) {
        this.partitionCount = i;
    }

    public int getCurrentPartitions() {
        return this.currentPartitions;
    }

    @Override // 
    public void setup(Context.OperatorContext operatorContext) {
        this.operatorId = operatorContext.getId();
        this.globalProcessedFileCount.setValue(this.processedFiles.size());
        LOG.debug("Setup processed file count: {}", this.globalProcessedFileCount);
        this.context = operatorContext;
        try {
            this.filePath = new Path(this.directory);
            this.configuration = new Configuration();
            this.fs = getFSInstance();
        } catch (IOException e) {
            failureHandling(e);
        }
        this.fileCounters.setCounter(FileCounters.GLOBAL_PROCESSED_FILES, this.globalProcessedFileCount);
        this.fileCounters.setCounter(FileCounters.LOCAL_PROCESSED_FILES, this.localProcessedFileCount);
        this.fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES, this.globalNumberOfFailures);
        this.fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES, this.localNumberOfFailures);
        this.fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES, this.globalNumberOfRetries);
        this.fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES, this.localNumberOfRetries);
        this.fileCounters.setCounter(FileCounters.PENDING_FILES, this.pendingFileCount);
        this.idempotentStorageManager.setup(operatorContext);
        if (((Long) operatorContext.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID)).longValue() < this.idempotentStorageManager.getLargestRecoveryWindow()) {
            this.currentFile = null;
            this.offset = 0;
        }
    }

    protected FileSystem getFSInstance() throws IOException {
        return FileSystem.newInstance(this.filePath.toUri(), this.configuration);
    }

    public void teardown() {
        String str;
        IOException iOException = null;
        boolean z = false;
        try {
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException e) {
            iOException = e;
            z = true;
        }
        boolean z2 = false;
        try {
            this.fs.close();
        } catch (IOException e2) {
            iOException = e2;
            z2 = true;
        }
        if (iOException == null) {
            this.idempotentStorageManager.teardown();
            return;
        }
        str = "";
        str = z ? str + "Failed to close " + this.currentFile + ". " : "";
        if (z2) {
            str = str + "Failed to close filesystem.";
        }
        throw new RuntimeException(str, iOException);
    }

    public void beginWindow(long j) {
        this.currentWindowId = j;
        if (j <= this.idempotentStorageManager.getLargestRecoveryWindow()) {
            replay(j);
        }
    }

    public void endWindow() {
        if (this.currentWindowId > this.idempotentStorageManager.getLargestRecoveryWindow()) {
            try {
                this.idempotentStorageManager.save(this.currentWindowRecoveryState, this.operatorId, this.currentWindowId);
            } catch (IOException e) {
                throw new RuntimeException("saving recovery", e);
            }
        }
        this.currentWindowRecoveryState.clear();
        if (this.context != null) {
            this.pendingFileCount.setValue(this.pendingFiles.size() + this.failedFiles.size() + this.unfinishedFiles.size());
            if (this.currentFile != null) {
                this.pendingFileCount.increment();
            }
            this.context.setCounters(this.fileCounters);
        }
    }

    protected void replay(long j) {
        try {
            Iterator<Object> it = this.idempotentStorageManager.load(j).values().iterator();
            while (it.hasNext()) {
                Iterator it2 = ((LinkedList) it.next()).iterator();
                while (it2.hasNext()) {
                    RecoveryEntry recoveryEntry = (RecoveryEntry) it2.next();
                    if (this.scanner.acceptFile(recoveryEntry.file)) {
                        if (this.currentFile != null && this.currentFile.equals(recoveryEntry.file) && this.offset == recoveryEntry.startOffset) {
                            while (this.offset < recoveryEntry.endOffset) {
                                T readEntity = readEntity();
                                this.offset++;
                                emit(readEntity);
                            }
                        } else {
                            if (this.inputStream != null) {
                                closeFile(this.inputStream);
                            }
                            this.processedFiles.add(recoveryEntry.file);
                            Iterator<FailedFile> it3 = this.failedFiles.iterator();
                            while (true) {
                                if (!it3.hasNext()) {
                                    break;
                                }
                                FailedFile next = it3.next();
                                if (next.path.equals(recoveryEntry.file) && next.offset == recoveryEntry.startOffset) {
                                    it3.remove();
                                    break;
                                }
                            }
                            Iterator<FailedFile> it4 = this.unfinishedFiles.iterator();
                            while (true) {
                                if (!it4.hasNext()) {
                                    break;
                                }
                                FailedFile next2 = it4.next();
                                if (next2.path.equals(recoveryEntry.file) && next2.offset == recoveryEntry.startOffset) {
                                    it4.remove();
                                    break;
                                }
                            }
                            if (this.pendingFiles.contains(recoveryEntry.file)) {
                                this.pendingFiles.remove(recoveryEntry.file);
                            }
                            this.inputStream = retryFailedFile(new FailedFile(recoveryEntry.file, recoveryEntry.startOffset));
                            while (this.offset < recoveryEntry.endOffset) {
                                T readEntity2 = readEntity();
                                this.offset++;
                                emit(readEntity2);
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("replay", e);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:24:0x0137, code lost:
    
        com.datatorrent.lib.io.fs.AbstractFileInputOperator.LOG.info("done reading file ({} entries).", java.lang.Integer.valueOf(r7.offset));
        closeFile(r7.inputStream);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void emitTuples() {
        /*
            Method dump skipped, instructions count: 417
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.datatorrent.lib.io.fs.AbstractFileInputOperator.emitTuples():void");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void scanDirectory() {
        if (System.currentTimeMillis() - this.scanIntervalMillis >= this.lastScanMillis) {
            Iterator<Path> it = this.scanner.scan(this.fs, this.filePath, this.processedFiles).iterator();
            while (it.hasNext()) {
                String path = it.next().toString();
                this.pendingFiles.add(path);
                this.processedFiles.add(path);
                this.localProcessedFileCount.increment();
            }
            this.lastScanMillis = System.currentTimeMillis();
        }
    }

    private void failureHandling(Exception exc) {
        this.localNumberOfFailures.increment();
        if (this.maxRetryCount <= 0) {
            throw new RuntimeException(exc);
        }
        LOG.error("FS reader error", exc);
        addToFailedList();
    }

    protected void addToFailedList() {
        FailedFile failedFile = new FailedFile(this.currentFile, this.offset, this.retryCount);
        try {
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException e) {
            this.localNumberOfFailures.increment();
            LOG.error("Could not close input stream on: " + this.currentFile);
        }
        failedFile.retryCount++;
        failedFile.lastFailedTime = System.currentTimeMillis();
        failedFile.offset = this.offset;
        this.currentFile = null;
        this.inputStream = null;
        if (failedFile.retryCount > this.maxRetryCount) {
            return;
        }
        this.localNumberOfRetries.increment();
        LOG.info("adding to failed list path {} offset {} retry {}", new Object[]{failedFile.path, Integer.valueOf(failedFile.offset), Integer.valueOf(failedFile.retryCount)});
        this.failedFiles.add(failedFile);
    }

    protected InputStream retryFailedFile(FailedFile failedFile) throws IOException {
        LOG.info("retrying failed file {} offset {} retry {}", new Object[]{failedFile.path, Integer.valueOf(failedFile.offset), Integer.valueOf(failedFile.retryCount)});
        String str = failedFile.path;
        if (!this.fs.exists(new Path(str))) {
            return null;
        }
        this.inputStream = openFile(new Path(str));
        this.offset = failedFile.offset;
        this.retryCount = failedFile.retryCount;
        this.skipCount = failedFile.offset;
        return this.inputStream;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public InputStream openFile(Path path) throws IOException {
        this.currentFile = path.toString();
        this.offset = 0;
        this.retryCount = 0;
        this.skipCount = 0;
        LOG.info("opening file {}", path);
        return this.fs.open(path);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void closeFile(InputStream inputStream) throws IOException {
        LOG.info("closing file {} offset {}", this.currentFile, Integer.valueOf(this.offset));
        if (inputStream != null) {
            inputStream.close();
        }
        this.currentFile = null;
        this.inputStream = null;
    }

    public Collection<Partitioner.Partition<AbstractFileInputOperator<T>>> definePartitions(Collection<Partitioner.Partition<AbstractFileInputOperator<T>>> collection, Partitioner.PartitioningContext partitioningContext) {
        this.lastRepartition = System.currentTimeMillis();
        int newPartitionCount = getNewPartitionCount(collection, partitioningContext);
        LOG.debug("Computed new partitions: {}", Integer.valueOf(newPartitionCount));
        if (newPartitionCount == collection.size()) {
            return collection;
        }
        AbstractFileInputOperator abstractFileInputOperator = (AbstractFileInputOperator) collection.iterator().next().getPartitionedInstance();
        MutableLong mutableLong = abstractFileInputOperator.globalNumberOfRetries;
        MutableLong mutableLong2 = abstractFileInputOperator.globalNumberOfRetries;
        HashSet newHashSet = Sets.newHashSet();
        HashSet newHashSet2 = Sets.newHashSet();
        LinkedList newLinkedList = Lists.newLinkedList();
        LinkedList newLinkedList2 = Lists.newLinkedList();
        LinkedList newLinkedList3 = Lists.newLinkedList();
        HashSet newHashSet3 = Sets.newHashSet();
        Iterator<Partitioner.Partition<AbstractFileInputOperator<T>>> it = collection.iterator();
        while (it.hasNext()) {
            AbstractFileInputOperator abstractFileInputOperator2 = (AbstractFileInputOperator) it.next().getPartitionedInstance();
            newHashSet.addAll(abstractFileInputOperator2.processedFiles);
            newLinkedList2.addAll(abstractFileInputOperator2.failedFiles);
            newLinkedList3.addAll(abstractFileInputOperator2.pendingFiles);
            newHashSet2.addAll(this.unfinishedFiles);
            mutableLong.add(abstractFileInputOperator2.localNumberOfRetries);
            mutableLong2.add(abstractFileInputOperator2.localNumberOfFailures);
            if (abstractFileInputOperator2.currentFile != null) {
                newHashSet2.add(new FailedFile(abstractFileInputOperator2.currentFile, abstractFileInputOperator2.offset));
            }
            newLinkedList.add(abstractFileInputOperator2.getScanner());
            newHashSet3.add(Integer.valueOf(abstractFileInputOperator2.operatorId));
        }
        List<DirectoryScanner> partition = this.scanner.partition(newPartitionCount, newLinkedList);
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(newPartitionCount);
        ArrayList newArrayListWithExpectedSize2 = Lists.newArrayListWithExpectedSize(newPartitionCount);
        KryoCloneUtils createCloneUtils = KryoCloneUtils.createCloneUtils(this);
        for (int i = 0; i < partition.size(); i++) {
            AbstractFileInputOperator abstractFileInputOperator3 = (AbstractFileInputOperator) createCloneUtils.getClone();
            DirectoryScanner directoryScanner = partition.get(i);
            abstractFileInputOperator3.setScanner(directoryScanner);
            abstractFileInputOperator3.processedFiles.addAll(newHashSet);
            abstractFileInputOperator3.globalNumberOfFailures = mutableLong;
            abstractFileInputOperator3.localNumberOfFailures.setValue(0L);
            abstractFileInputOperator3.globalNumberOfRetries = mutableLong2;
            abstractFileInputOperator3.localNumberOfRetries.setValue(0L);
            abstractFileInputOperator3.unfinishedFiles.clear();
            abstractFileInputOperator3.currentFile = null;
            abstractFileInputOperator3.offset = 0;
            Iterator it2 = newHashSet2.iterator();
            while (it2.hasNext()) {
                FailedFile failedFile = (FailedFile) it2.next();
                if (directoryScanner.acceptFile(failedFile.path)) {
                    abstractFileInputOperator3.unfinishedFiles.add(failedFile);
                    it2.remove();
                }
            }
            abstractFileInputOperator3.failedFiles.clear();
            Iterator it3 = newLinkedList2.iterator();
            while (it3.hasNext()) {
                FailedFile failedFile2 = (FailedFile) it3.next();
                if (directoryScanner.acceptFile(failedFile2.path)) {
                    abstractFileInputOperator3.failedFiles.add(failedFile2);
                    it3.remove();
                }
            }
            abstractFileInputOperator3.pendingFiles.clear();
            Iterator it4 = newLinkedList3.iterator();
            while (it4.hasNext()) {
                String str = (String) it4.next();
                if (directoryScanner.acceptFile(str)) {
                    abstractFileInputOperator3.pendingFiles.add(str);
                    it4.remove();
                }
            }
            newArrayListWithExpectedSize.add(new DefaultPartition(abstractFileInputOperator3));
            newArrayListWithExpectedSize2.add(abstractFileInputOperator3.idempotentStorageManager);
        }
        this.idempotentStorageManager.partitioned(newArrayListWithExpectedSize2, newHashSet3);
        LOG.info("definePartitions called returning {} partitions", Integer.valueOf(newArrayListWithExpectedSize.size()));
        return newArrayListWithExpectedSize;
    }

    protected int getNewPartitionCount(Collection<Partitioner.Partition<AbstractFileInputOperator<T>>> collection, Partitioner.PartitioningContext partitioningContext) {
        return DefaultPartition.getRequiredPartitionCount(partitioningContext, this.partitionCount);
    }

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractFileInputOperator<T>>> map) {
        this.currentPartitions = map.size();
    }

    public void checkpointed(long j) {
    }

    public void committed(long j) {
        try {
            this.idempotentStorageManager.deleteUpTo(this.operatorId, j);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected abstract T readEntity() throws IOException;

    protected abstract void emit(T t);

    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
        StatsListener.Response response = new StatsListener.Response();
        response.repartitionRequired = false;
        if (this.currentPartitions != this.partitionCount) {
            LOG.info("processStats: trying repartition of input operator current {} required {}", Integer.valueOf(this.currentPartitions), Integer.valueOf(this.partitionCount));
            response.repartitionRequired = true;
        }
        return response;
    }

    public int getMaxRetryCount() {
        return this.maxRetryCount;
    }

    public void setMaxRetryCount(int i) {
        this.maxRetryCount = i;
    }
}
