package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.lib.io.fs.FileStitcher;
import com.datatorrent.lib.io.fs.Synchronizer;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:com/datatorrent/lib/io/fs/HDFSFileMerger.class */
public class HDFSFileMerger extends FileMerger {
    private transient boolean fastMergeActive;
    private transient long defaultBlockSize;
    private transient FastMergerDecisionMaker fastMergerDecisionMaker;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSFileMerger.class);

    /* loaded from: input_file:com/datatorrent/lib/io/fs/HDFSFileMerger$FastMergerDecisionMaker.class */
    public static class FastMergerDecisionMaker {
        private String blocksDir;
        private FileSystem appFS;
        private long defaultBlockSize;

        public FastMergerDecisionMaker(String str, FileSystem fileSystem, long j) {
            this.blocksDir = str;
            this.appFS = fileSystem;
            this.defaultBlockSize = j;
        }

        public boolean isFastMergePossible(Synchronizer.OutputFileMetadata outputFileMetadata) throws IOException, FileStitcher.BlockNotFoundException {
            short s = 0;
            boolean z = true;
            boolean z2 = true;
            int numberOfBlocks = outputFileMetadata.getNumberOfBlocks();
            HDFSFileMerger.LOG.debug("fileMetadata.getNumberOfBlocks(): {}", Integer.valueOf(outputFileMetadata.getNumberOfBlocks()));
            long[] blockIds = outputFileMetadata.getBlockIds();
            HDFSFileMerger.LOG.debug("fileMetadata.getBlockIds().len: {}", Integer.valueOf(outputFileMetadata.getBlockIds().length));
            for (int i = 0; i < numberOfBlocks && z && z2; i++) {
                Path path = new Path(this.blocksDir + "/" + blockIds[i]);
                if (!this.appFS.exists(path)) {
                    throw new FileStitcher.BlockNotFoundException(path);
                }
                FileStatus fileStatus = this.appFS.getFileStatus(new Path(this.blocksDir + "/" + blockIds[i]));
                if (i == 0) {
                    s = fileStatus.getReplication();
                    HDFSFileMerger.LOG.debug("replicationFactor: {}", Short.valueOf(s));
                } else {
                    z = s == fileStatus.getReplication();
                    HDFSFileMerger.LOG.debug("sameReplicationFactor: {}", Boolean.valueOf(z));
                }
                if (i != numberOfBlocks - 1) {
                    z2 = fileStatus.getLen() % this.defaultBlockSize == 0;
                    HDFSFileMerger.LOG.debug("multipleOfBlockSize: {}", Boolean.valueOf(z2));
                }
            }
            return z && z2;
        }
    }

    @Override // com.datatorrent.lib.io.fs.FileStitcher, com.datatorrent.lib.io.fs.AbstractReconciler
    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
        this.fastMergeActive = this.outputFS.getConf().getBoolean("dfs.support.append", true) && this.appFS.getUri().equals(this.outputFS.getUri());
        LOG.debug("appFS.getUri():{}", this.appFS.getUri());
        LOG.debug("outputFS.getUri():{}", this.outputFS.getUri());
        this.defaultBlockSize = this.outputFS.getDefaultBlockSize(new Path(this.filePath));
        this.fastMergerDecisionMaker = new FastMergerDecisionMaker(this.blocksDirectoryPath, this.appFS, this.defaultBlockSize);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.lib.io.fs.FileStitcher
    public void mergeBlocks(Synchronizer.OutputFileMetadata outputFileMetadata) throws IOException {
        try {
            LOG.debug("fastMergeActive: {}", Boolean.valueOf(this.fastMergeActive));
            if (this.fastMergeActive && this.fastMergerDecisionMaker.isFastMergePossible(outputFileMetadata) && outputFileMetadata.getNumberOfBlocks() > 0) {
                LOG.debug("Using fast merge on HDFS.");
                concatBlocks(outputFileMetadata);
            } else {
                LOG.debug("Falling back to slow merge on HDFS.");
                super.mergeBlocks((HDFSFileMerger) outputFileMetadata);
            }
        } catch (FileStitcher.BlockNotFoundException e) {
            if (!recover(outputFileMetadata)) {
                this.failedFiles.add(outputFileMetadata);
            } else {
                LOG.debug("Recovery attempt successful.");
                this.successfulFiles.add(outputFileMetadata);
            }
        }
    }

    private void concatBlocks(Synchronizer.OutputFileMetadata outputFileMetadata) throws IOException {
        Path path = new Path(this.filePath, outputFileMetadata.getRelativePath());
        int numberOfBlocks = outputFileMetadata.getNumberOfBlocks();
        long[] blockIds = outputFileMetadata.getBlockIds();
        Path path2 = new Path(this.blocksDirectoryPath, Long.toString(blockIds[0]));
        if (numberOfBlocks > 1) {
            Path[] pathArr = new Path[numberOfBlocks - 1];
            for (int i = 1; i < numberOfBlocks; i++) {
                pathArr[i - 1] = new Path(this.blocksDirectoryPath, Long.toString(blockIds[i]));
            }
            this.outputFS.concat(path2, pathArr);
        }
        moveToFinalFile(path2, path);
    }

    @VisibleForTesting
    protected boolean recover(Synchronizer.OutputFileMetadata outputFileMetadata) throws IOException {
        Path path = new Path(this.blocksDirectoryPath + "/" + outputFileMetadata.getBlockIds()[0]);
        Path path2 = new Path(this.filePath, outputFileMetadata.getRelativePath());
        if (this.appFS.exists(path)) {
            if (this.appFS.getFileStatus(path).getLen() == outputFileMetadata.getFileLength()) {
                moveToFinalFile(path, path2);
                return true;
            }
            LOG.error("Unable to recover in FileMerger for file: {}", path2);
            return false;
        }
        if (this.outputFS.exists(path2)) {
            LOG.debug("Output file already present at the destination, nothing to recover.");
            return true;
        }
        LOG.error("Unable to recover in FileMerger for file: {}", path2);
        return false;
    }
}
