package org.apache.hadoop.tools.fedbalance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.protocol.proto.AclProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.OptionsParser;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/tools/fedbalance/DistCpProcedure.class */
public class DistCpProcedure extends BalanceProcedure {
    // Balance job description. Serialized by write() and rebuilt by readFields();
    // most of the fields below are copies of values it carries.
    private FedBalanceContext context;
    // Source directory, taken from context.getSrc().
    private Path src;
    // Destination directory, taken from context.getDst().
    private Path dst;
    // Configuration used for the file systems, the JobClient and the distcp jobs.
    private Configuration conf;
    // Value passed to distcp as the "-m" argument.
    private int mapNum;
    // Value passed to distcp as the "-bandwidth" argument.
    private int bandWidth;
    // Id of the distcp job currently being tracked; null when no job is outstanding.
    private String jobId;
    // Stage that the next call to execute() will run.
    private Stage stage;
    // When true, diffDistCpStageDone() does not check for open files under src.
    private boolean forceCloseOpenFiles;
    // Copied from the context; not otherwise read by the code in this class.
    private boolean useMountReadOnly;
    // The diff stage is considered done once the snapshot diff list size is
    // less than or equal to this value.
    private int diffThreshold;
    // Permission of src saved by disableWrite(); applied to dst by restorePermission().
    private FsPermission fPerm;
    // ACL status of src saved by disableWrite(); applied to dst by restorePermission().
    private AclStatus acl;
    // Client used to look up a submitted job by id when not running in test mode.
    private JobClient client;
    // File system of the source path.
    private DistributedFileSystem srcFs;
    // File system of the destination path.
    private DistributedFileSystem dstFs;

    // Last job returned by submitDistCpJob(); only assigned when enabledForTest is
    // true, and used by getCurrentLocalJob() to reach the job's cluster.
    @VisibleForTesting
    private Job localJob;
    public static final Logger LOG = LoggerFactory.getLogger(DistCpProcedure.class);

    // Test switch: when true, job status is resolved through localJob instead of
    // the JobClient. Toggled by enableForTest()/disableForTest().
    @VisibleForTesting
    static boolean enabledForTest = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/tools/fedbalance/DistCpProcedure$LocalJobStatus.class */
    /**
     * {@link RunningJobStatus} backed by a MapReduce {@link Job} handle.
     * Instances are created by {@code getCurrentLocalJob()}, which is only
     * reached when the test switch {@code enabledForTest} is on.
     */
    public static class LocalJobStatus implements RunningJobStatus {
        private final Job testJob;

        /**
         * @param job the job whose status is exposed.
         */
        LocalJobStatus(Job job) {
            this.testJob = job;
        }

        /** @return the string form of the wrapped job's id. */
        @Override
        public String getJobID() {
            return this.testJob.getJobID().toString();
        }

        /**
         * @return whether the wrapped job reports itself complete.
         * @throws IOException if the status cannot be obtained.
         */
        @Override
        public boolean isComplete() throws IOException {
            return this.testJob.isComplete();
        }

        /**
         * @return whether the wrapped job reports itself successful.
         * @throws IOException if the status cannot be obtained.
         */
        @Override
        public boolean isSuccessful() throws IOException {
            return this.testJob.isSuccessful();
        }

        /**
         * @return the failure info string from the wrapped job's status.
         * @throws IOException if the status cannot be obtained, or if the calling
         *         thread is interrupted while fetching it (the interrupt flag is
         *         restored before the exception is thrown).
         */
        @Override
        public String getFailureInfo() throws IOException {
            try {
                return this.testJob.getStatus().getFailureInfo();
            } catch (InterruptedException e) {
                // Converting to IOException would otherwise erase the interrupt;
                // re-assert it so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/tools/fedbalance/DistCpProcedure$RunningJobStatus.class */
    /**
     * Minimal view of a submitted distcp job used by the stage handlers.
     * Implemented in this file by {@code LocalJobStatus} and
     * {@code YarnRunningJobStatus}.
     */
    public interface RunningJobStatus {
        /** @return the job id as a string. */
        String getJobID();

        /**
         * @return true once the job has completed.
         * @throws IOException if the status cannot be obtained.
         */
        boolean isComplete() throws IOException;

        /**
         * @return true if the job succeeded.
         * @throws IOException if the status cannot be obtained.
         */
        boolean isSuccessful() throws IOException;

        /**
         * @return failure information reported for the job.
         * @throws IOException if the information cannot be obtained.
         */
        String getFailureInfo() throws IOException;
    }

    /* loaded from: input_file:org/apache/hadoop/tools/fedbalance/DistCpProcedure$Stage.class */
    /**
     * Stages of the procedure, dispatched by {@code execute()}.
     *
     * <p>The declaration order must not change: {@code write()} persists
     * {@code stage.ordinal()} and {@code readFields()} restores it with
     * {@code Stage.values()[i]}.
     */
    public enum Stage {
        // Handled by preCheck().
        PRE_CHECK,
        // Handled by initDistCp().
        INIT_DISTCP,
        // Handled by diffDistCp().
        DIFF_DISTCP,
        // Handled by disableWrite().
        DISABLE_WRITE,
        // Handled by finalDistCp().
        FINAL_DISTCP,
        // Handled by finish(); the only stage after which execute() returns true.
        FINISH
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/tools/fedbalance/DistCpProcedure$YarnRunningJobStatus.class */
    /**
     * {@link RunningJobStatus} that forwards every query to a {@link RunningJob}
     * obtained from the {@code JobClient}.
     */
    public static class YarnRunningJobStatus implements RunningJobStatus {
        private final RunningJob delegate;

        /**
         * @param runningJob the job handle that all queries are forwarded to.
         */
        YarnRunningJobStatus(RunningJob runningJob) {
            delegate = runningJob;
        }

        /** @return the string form of the delegate's job id. */
        @Override
        public String getJobID() {
            return delegate.getID().toString();
        }

        /** @return the delegate's completion flag. */
        @Override
        public boolean isComplete() throws IOException {
            return delegate.isComplete();
        }

        /** @return the delegate's success flag. */
        @Override
        public boolean isSuccessful() throws IOException {
            return delegate.isSuccessful();
        }

        /** @return the delegate's failure information. */
        @Override
        public String getFailureInfo() throws IOException {
            return delegate.getFailureInfo();
        }
    }

    /** Turns the test switch on so job status is resolved through the local job handle. */
    public static void enableForTest() {
        DistCpProcedure.enabledForTest = true;
    }

    /** Turns the test switch off so job status is resolved through the JobClient. */
    public static void disableForTest() {
        DistCpProcedure.enabledForTest = false;
    }

    /**
     * Creates an empty procedure with no fields populated.
     * NOTE(review): presumably intended to be followed by
     * {@link #readFields(DataInput)} - confirm against the framework.
     */
    public DistCpProcedure() {
        super();
    }

    /**
     * Creates a procedure that starts in {@link Stage#PRE_CHECK} and copies its
     * settings from the given context.
     *
     * @param str forwarded to the {@code BalanceProcedure} constructor
     *        (presumably the procedure name - confirm against the superclass).
     * @param str2 forwarded to the {@code BalanceProcedure} constructor
     *        (presumably the next procedure's name - confirm).
     * @param j forwarded to the {@code BalanceProcedure} constructor
     *        (presumably a delay duration - confirm).
     * @param fedBalanceContext source of paths, configuration and tuning values.
     * @throws IOException if the job client or either file system cannot be created.
     * @throws ClassCastException if the source or destination path does not
     *         resolve to a {@link DistributedFileSystem}.
     */
    public DistCpProcedure(String str, String str2, long j, FedBalanceContext fedBalanceContext) throws IOException {
        super(str, str2, j);
        this.context = fedBalanceContext;
        this.src = fedBalanceContext.getSrc();
        this.dst = fedBalanceContext.getDst();
        this.conf = fedBalanceContext.getConf();
        this.client = new JobClient(this.conf);
        this.stage = Stage.PRE_CHECK;
        this.mapNum = fedBalanceContext.getMapNum();
        this.bandWidth = fedBalanceContext.getBandwidthLimit();
        this.forceCloseOpenFiles = fedBalanceContext.getForceCloseOpenFiles();
        this.useMountReadOnly = fedBalanceContext.getUseMountReadOnly();
        this.diffThreshold = fedBalanceContext.getDiffThreshold();
        // Path.getFileSystem is declared to return FileSystem, so the narrowing to
        // DistributedFileSystem has to be explicit for the assignment to compile.
        this.srcFs = (DistributedFileSystem) fedBalanceContext.getSrc().getFileSystem(this.conf);
        this.dstFs = (DistributedFileSystem) fedBalanceContext.getDst().getFileSystem(this.conf);
    }

    /**
     * Runs the handler that belongs to the current stage.
     *
     * @return {@code true} only after the {@link Stage#FINISH} handler has run;
     *         {@code false} after any other stage.
     * @throws BalanceProcedure.RetryException propagated from a stage handler
     *         (thrown in this class while a distcp job is still running or open
     *         files remain).
     * @throws IOException if a stage handler fails or the stage is not recognised.
     */
    @Override
    public boolean execute() throws BalanceProcedure.RetryException, IOException {
        LOG.info("Stage={}", this.stage.name());
        boolean finished = false;
        switch (this.stage) {
            case PRE_CHECK:
                preCheck();
                break;
            case INIT_DISTCP:
                initDistCp();
                break;
            case DIFF_DISTCP:
                diffDistCp();
                break;
            case DISABLE_WRITE:
                disableWrite(this.context);
                break;
            case FINAL_DISTCP:
                finalDistCp();
                break;
            case FINISH:
                finish();
                finished = true;
                break;
            default:
                throw new IOException("Unexpected stage=" + this.stage);
        }
        return finished;
    }

    /**
     * Validates the source and destination before any copy is started, then
     * advances to {@link Stage#INIT_DISTCP}.
     *
     * @throws IOException if src is not a directory, dst already exists, or src
     *         already has a {@code .snapshot} path.
     */
    void preCheck() throws IOException {
        final boolean srcIsDirectory = this.srcFs.getFileStatus(this.src).isDirectory();
        if (!srcIsDirectory) {
            throw new IOException(this.src + " should be a directory.");
        }

        final boolean dstPresent = this.dstFs.exists(this.dst);
        if (dstPresent) {
            throw new IOException(this.dst + " already exists.");
        }

        final Path srcSnapshotRoot = new Path(this.src, ".snapshot");
        if (this.srcFs.exists(srcSnapshotRoot)) {
            throw new IOException(this.src + " shouldn't enable snapshot.");
        }

        updateStage(Stage.INIT_DISTCP);
    }

    /**
     * Submits the initial distcp from a fresh source snapshot, or checks on the
     * one already submitted.
     *
     * <p>With no tracked job: runs the pre-submit path check, snapshots src and
     * submits a non-diff distcp from that snapshot to dst. With a tracked job:
     * retries while it is running; once complete the job id is cleared and, on
     * success, the stage moves to {@link Stage#DIFF_DISTCP}. A failed job is only
     * logged and the stage is left unchanged.
     *
     * @throws BalanceProcedure.RetryException while the tracked job is still running.
     * @throws IOException on file system or submission failure.
     */
    void initDistCp() throws IOException, BalanceProcedure.RetryException {
        final RunningJobStatus job = getCurrentJob();

        if (job != null) {
            if (!job.isComplete()) {
                throw new BalanceProcedure.RetryException();
            }
            // The job is finished either way, so stop tracking it.
            this.jobId = null;
            if (!job.isSuccessful()) {
                LOG.warn("DistCp failed. Failure={}", job.getFailureInfo());
                return;
            }
            updateStage(Stage.DIFF_DISTCP);
            return;
        }

        pathCheckBeforeInitDistcp();
        this.srcFs.createSnapshot(this.src, FedBalanceConfigs.CURRENT_SNAPSHOT_NAME);
        final String snapshotSource =
                this.src.toString() + "/.snapshot/" + FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;
        this.jobId = submitDistCpJob(snapshotSource, this.dst.toString(), false);
    }

    /**
     * Drives the incremental (snapshot-diff) copy rounds.
     *
     * <p>With no tracked job: moves to {@link Stage#DISABLE_WRITE} when
     * {@code diffDistCpStageDone()} says so, otherwise submits another diff
     * distcp. With a tracked job: retries while it is running, clears the job id
     * once complete, and fails if the job was unsuccessful.
     *
     * @throws BalanceProcedure.RetryException while the tracked job is still
     *         running, or when propagated from {@code diffDistCpStageDone()}.
     * @throws IOException if the finished job failed, or on file system error.
     */
    void diffDistCp() throws IOException, BalanceProcedure.RetryException {
        final RunningJobStatus job = getCurrentJob();

        if (job != null) {
            if (!job.isComplete()) {
                throw new BalanceProcedure.RetryException();
            }
            this.jobId = null;
            if (job.isSuccessful()) {
                LOG.info("DistCp succeeded. jobId={}", job.getJobID());
                return;
            }
            throw new IOException("DistCp failed. jobId=" + job.getJobID() + " failure=" + job.getFailureInfo());
        }

        if (diffDistCpStageDone()) {
            updateStage(Stage.DISABLE_WRITE);
        } else {
            submitDiffDistCp();
        }
    }

    /**
     * Remembers the permission and ACL status of src, sets the permission of src
     * to 0, and advances to {@link Stage#FINAL_DISTCP}.
     *
     * @param fedBalanceContext not used by this implementation.
     * @throws IOException on file system error.
     */
    protected void disableWrite(FedBalanceContext fedBalanceContext) throws IOException {
        final FileStatus srcStatus = this.srcFs.getFileStatus(this.src);
        this.fPerm = srcStatus.getPermission();
        this.acl = this.srcFs.getAclStatus(this.src);

        final FsPermission noAccess = FsPermission.createImmutable((short) 0);
        this.srcFs.setPermission(this.src, noAccess);

        updateStage(Stage.FINAL_DISTCP);
    }

    /**
     * Re-enables access by delegating to {@link #restorePermission()}.
     *
     * @throws IOException on file system error.
     */
    protected void enableWrite() throws IOException {
        this.restorePermission();
    }

    /**
     * Applies the saved source ACL entries and permission to dst, after first
     * removing whatever ACL dst currently has. Saved values that are
     * {@code null} are skipped.
     *
     * @throws IOException on file system error.
     */
    void restorePermission() throws IOException {
        this.dstFs.removeAcl(this.dst);

        final AclStatus savedAcl = this.acl;
        if (savedAcl != null) {
            this.dstFs.modifyAclEntries(this.dst, savedAcl.getEntries());
        }

        final FsPermission savedPermission = this.fPerm;
        if (savedPermission != null) {
            this.dstFs.setPermission(this.dst, savedPermission);
        }
    }

    /**
     * Closes open files under src and then runs (or checks on) the last diff
     * distcp. On success the stage moves to {@link Stage#FINISH}.
     *
     * @throws BalanceProcedure.RetryException while the tracked job is still running.
     * @throws IOException if the finished job failed, or on file system error.
     */
    void finalDistCp() throws IOException, BalanceProcedure.RetryException {
        // Done on every invocation, before looking at the job.
        closeAllOpenFiles(this.srcFs, this.src);

        final RunningJobStatus job = getCurrentJob();
        if (job == null) {
            submitDiffDistCp();
            return;
        }

        if (!job.isComplete()) {
            throw new BalanceProcedure.RetryException();
        }
        this.jobId = null;
        if (!job.isSuccessful()) {
            throw new IOException("Final DistCp failed. Failure: " + job.getFailureInfo());
        }
        updateStage(Stage.FINISH);
    }

    /**
     * Final stage: re-enables writes, then removes snapshots from src and dst
     * for whichever of the two paths still exists.
     *
     * @throws IOException on file system error.
     */
    void finish() throws IOException {
        enableWrite();

        final boolean srcPresent = this.srcFs.exists(this.src);
        if (srcPresent) {
            cleanupSnapshot(this.srcFs, this.src);
        }

        final boolean dstPresent = this.dstFs.exists(this.dst);
        if (dstPresent) {
            cleanupSnapshot(this.dstFs, this.dst);
        }
    }

    /** @return the stage that the next call to {@code execute()} will run. */
    @VisibleForTesting
    Stage getStage() {
        return stage;
    }

    /**
     * Logs the transition and replaces the current stage.
     *
     * @param stage the new stage; {@code null} is logged as "null".
     */
    @VisibleForTesting
    protected void updateStage(Stage stage) {
        final String previousName = (this.stage != null) ? this.stage.name() : "null";
        final String nextName = (stage != null) ? stage.name() : "null";
        LOG.info("Stage updated from {} to {}.", previousName, nextName);
        this.stage = stage;
    }

    /**
     * Rotates the snapshots on both sides and submits a diff distcp.
     *
     * <p>The steps run in a fixed order: make dst snapshottable, drop any
     * existing "last" snapshot on src and dst, snapshot dst as "last", rename
     * the src "current" snapshot to "last", take a new src "current" snapshot,
     * then submit the job with the diff option enabled.
     *
     * @throws IOException on file system or submission failure.
     */
    private void submitDiffDistCp() throws IOException {
        final String lastName = FedBalanceConfigs.LAST_SNAPSHOT_NAME;
        final String currentName = FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;

        enableSnapshot(this.dstFs, this.dst);

        deleteSnapshot(this.srcFs, this.src, lastName);
        deleteSnapshot(this.dstFs, this.dst, lastName);

        this.dstFs.createSnapshot(this.dst, lastName);
        this.srcFs.renameSnapshot(this.src, currentName, lastName);
        this.srcFs.createSnapshot(this.src, currentName);

        this.jobId = submitDistCpJob(this.src.toString(), this.dst.toString(), true);
    }

    /**
     * Repeatedly lists the open files under {@code path} and asks the file
     * system to recover the lease of each one, until a listing comes back empty.
     *
     * <p>Failures on an individual file are best-effort: they are logged and the
     * file is picked up again by the next listing. The loop has no upper bound,
     * so it only ends when no open files are reported.
     *
     * @param distributedFileSystem file system used both to list open files and
     *        to recover their leases.
     * @param path root under which open files are looked for; only its URI path
     *        component is used as the listing filter.
     * @throws IOException if listing the open files fails.
     */
    private void closeAllOpenFiles(DistributedFileSystem distributedFileSystem, Path path) throws IOException {
        final String pathStr = path.toUri().getPath();
        while (true) {
            final RemoteIterator<OpenFileEntry> openFiles = distributedFileSystem.listOpenFiles(
                    EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), pathStr);
            if (!openFiles.hasNext()) {
                return;
            }
            while (openFiles.hasNext()) {
                String filePath = null;
                try {
                    filePath = openFiles.next().getFilePath();
                    distributedFileSystem.recoverLease(new Path(filePath));
                } catch (IOException e) {
                    // Deliberately not rethrown: the outer loop lists again and retries.
                    LOG.warn("Failed to recover lease for open file {} under {}; it will be retried.",
                            filePath, pathStr, e);
                }
            }
        }
    }

    /**
     * Decides whether the incremental copy rounds can stop.
     *
     * @return {@code false} when the snapshot diff size is above
     *         {@code diffThreshold}; {@code true} when it is not and either
     *         force-close is enabled or no open files remain under src.
     * @throws BalanceProcedure.RetryException when the diff is small enough but
     *         open files remain and force-close is disabled.
     * @throws IOException on file system error.
     */
    @VisibleForTesting
    boolean diffDistCpStageDone() throws IOException, BalanceProcedure.RetryException {
        final boolean diffTooLarge = getDiffSize() > this.diffThreshold;
        if (diffTooLarge) {
            return false;
        }
        // The open-file check is only made when force-close is off.
        if (!this.forceCloseOpenFiles && verifyOpenFiles()) {
            throw new BalanceProcedure.RetryException();
        }
        return true;
    }

    /**
     * @return the number of entries in the snapshot diff report requested for
     *         src between the "current" snapshot and the empty snapshot name.
     * @throws IOException on file system error.
     */
    private int getDiffSize() throws IOException {
        final String fromSnapshot = FedBalanceConfigs.CURRENT_SNAPSHOT_NAME;
        final String toSnapshot = "";
        return this.srcFs
                .getSnapshotDiffReport(this.src, fromSnapshot, toSnapshot)
                .getDiffList()
                .size();
    }

    /**
     * @return {@code true} if the open-file listing for src has at least one entry.
     * @throws IOException on file system error.
     */
    private boolean verifyOpenFiles() throws IOException {
        final RemoteIterator<OpenFileEntry> openFiles = this.srcFs.listOpenFiles(
                EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES), this.src.toString());
        return openFiles.hasNext();
    }

    /**
     * Resolves the tracked job id to a status handle.
     *
     * @return {@code null} when no job id is tracked or the job cannot be found;
     *         otherwise a status wrapper (the local one when the test switch is on).
     * @throws IOException if the lookup fails.
     */
    private RunningJobStatus getCurrentJob() throws IOException {
        if (this.jobId == null) {
            return null;
        }
        if (enabledForTest) {
            return getCurrentLocalJob();
        }
        final RunningJob running = this.client.getJob(JobID.forName(this.jobId));
        return (running != null) ? new YarnRunningJobStatus(running) : null;
    }

    /**
     * Looks the tracked job id up through the cluster of the locally held job.
     *
     * @return {@code null} when no local job handle is held or the cluster does
     *         not know the job; otherwise a {@link LocalJobStatus} for it.
     * @throws IOException if the lookup fails, or if the calling thread is
     *         interrupted during the lookup (the interrupt flag is restored
     *         before the exception is thrown).
     */
    private LocalJobStatus getCurrentLocalJob() throws IOException {
        if (this.localJob == null) {
            return null;
        }
        try {
            Job job = this.localJob.getCluster().getJob(JobID.forName(this.jobId));
            if (job == null) {
                return null;
            }
            return new LocalJobStatus(job);
        } catch (InterruptedException e) {
            // Converting to IOException would otherwise erase the interrupt;
            // re-assert it so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Last check before the initial distcp is submitted; on success src is made
     * snapshottable.
     *
     * @throws IOException if dst already exists, if the snapshot path checked
     *         below already exists under src, or on file system error.
     */
    private void pathCheckBeforeInitDistcp() throws IOException {
        final boolean dstPresent = this.dstFs.exists(this.dst);
        if (dstPresent) {
            throw new IOException("The dst path=" + this.dst + " already exists. The admin should delete it before submitting the initial distcp job.");
        }

        // NOTE(review): the snapshot name is a literal here; presumably it matches
        // FedBalanceConfigs.CURRENT_SNAPSHOT_NAME - verify.
        final Path srcSnapshot = new Path(this.src, ".snapshot/DISTCP-BALANCE-NEXT");
        final boolean snapshotPresent = this.srcFs.exists(srcSnapshot);
        if (snapshotPresent) {
            throw new IOException("The src snapshot=" + srcSnapshot + " already exists. The admin should delete the snapshot before submitting the initial distcp.");
        }

        this.srcFs.allowSnapshot(this.src);
    }

    /**
     * Builds the distcp command line and submits the job.
     *
     * <p>The arguments are always "-async -update -append -pruxgpcab", followed
     * by "-diff LAST CURRENT" when {@code z} is set, then "-m mapNum",
     * "-bandwidth bandWidth", the source and the destination.
     *
     * @param str source path argument.
     * @param str2 destination path argument.
     * @param z whether to add the "-diff" option with the last/current snapshot names.
     * @return the id of the submitted job, as a string.
     * @throws IOException wrapping any exception raised while parsing the
     *         options or creating and submitting the job.
     */
    private String submitDistCpJob(String str, String str2, boolean z) throws IOException {
        final ArrayList<String> args =
                new ArrayList<String>(Arrays.asList("-async", "-update", "-append", "-pruxgpcab"));
        if (z) {
            args.add("-diff");
            args.add(FedBalanceConfigs.LAST_SNAPSHOT_NAME);
            args.add(FedBalanceConfigs.CURRENT_SNAPSHOT_NAME);
        }
        args.add("-m");
        args.add(String.valueOf(this.mapNum));
        args.add("-bandwidth");
        args.add(String.valueOf(this.bandWidth));
        args.add(str);
        args.add(str2);
        try {
            // The job gets its own copy of the configuration.
            final DistCp distCp =
                    new DistCp(new Configuration(this.conf), OptionsParser.parse(args.toArray(new String[0])));
            final Job submitted = distCp.createAndSubmitJob();
            LOG.info("Submit distcp job={}", submitted);
            if (enabledForTest) {
                // Kept so getCurrentLocalJob() can query the job's cluster later.
                this.localJob = submitted;
            }
            return submitted.getJobID().toString();
        } catch (Exception e) {
            throw new IOException("Submit job failed.", e);
        }
    }

    /**
     * Serializes the procedure. After the superclass fields and the context, the
     * layout is: presence flag + job id, stage ordinal (int), presence flag +
     * permission (short), presence flag + length-prefixed ACL protobuf bytes.
     *
     * @param dataOutput the sink to write to.
     * @throws IOException if writing fails.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        super.write(dataOutput);
        this.context.write(dataOutput);

        final boolean hasJobId = this.jobId != null;
        dataOutput.writeBoolean(hasJobId);
        if (hasJobId) {
            Text.writeString(dataOutput, this.jobId);
        }

        dataOutput.writeInt(this.stage.ordinal());

        final boolean hasPermission = this.fPerm != null;
        dataOutput.writeBoolean(hasPermission);
        if (hasPermission) {
            dataOutput.writeShort(this.fPerm.toShort());
        }

        final boolean hasAcl = this.acl != null;
        dataOutput.writeBoolean(hasAcl);
        if (hasAcl) {
            // The ACL is written as a delimited protobuf into a buffer first so
            // that its byte length can precede it in the output.
            final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            PBHelperClient.convert(this.acl).writeDelimitedTo(buffer);
            final byte[] aclBytes = buffer.toByteArray();
            dataOutput.writeInt(aclBytes.length);
            dataOutput.write(aclBytes);
        }
    }

    /**
     * Restores the procedure from the layout produced by
     * {@link #write(DataOutput)}, then re-derives every context-backed field,
     * the file systems and the job client.
     *
     * @param dataInput the source to read from.
     * @throws IOException if reading fails or a file system or job client cannot
     *         be created.
     * @throws ClassCastException if the source or destination path does not
     *         resolve to a {@link DistributedFileSystem}.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        super.readFields(dataInput);
        this.context = new FedBalanceContext();
        this.context.readFields(dataInput);
        this.src = this.context.getSrc();
        this.dst = this.context.getDst();
        this.conf = this.context.getConf();
        if (dataInput.readBoolean()) {
            this.jobId = Text.readString(dataInput);
        }
        // The stage is stored as its ordinal.
        this.stage = Stage.values()[dataInput.readInt()];
        if (dataInput.readBoolean()) {
            this.fPerm = FsPermission.read(dataInput);
        }
        if (dataInput.readBoolean()) {
            byte[] bArr = new byte[dataInput.readInt()];
            dataInput.readFully(bArr);
            this.acl = PBHelperClient.convert(AclProtos.GetAclStatusResponseProto.parseDelimitedFrom(new ByteArrayInputStream(bArr)));
        }
        // Path.getFileSystem is declared to return FileSystem, so the narrowing to
        // DistributedFileSystem has to be explicit for the assignment to compile.
        this.srcFs = (DistributedFileSystem) this.context.getSrc().getFileSystem(this.conf);
        this.dstFs = (DistributedFileSystem) this.context.getDst().getFileSystem(this.conf);
        this.mapNum = this.context.getMapNum();
        this.bandWidth = this.context.getBandwidthLimit();
        this.forceCloseOpenFiles = this.context.getForceCloseOpenFiles();
        this.useMountReadOnly = this.context.getUseMountReadOnly();
        // Mirror the constructor: without this a restored procedure would fall
        // back to the field default of 0 when deciding whether the diff stage is done.
        this.diffThreshold = this.context.getDiffThreshold();
        this.client = new JobClient(this.conf);
    }

    /**
     * Allows snapshots on {@code path} unless a {@code .snapshot} path already
     * exists beneath it.
     *
     * @param distributedFileSystem file system that owns the path.
     * @param path directory to make snapshottable.
     * @throws IOException on file system error.
     */
    private static void enableSnapshot(DistributedFileSystem distributedFileSystem, Path path) throws IOException {
        final Path snapshotRoot = new Path(path, ".snapshot");
        if (!distributedFileSystem.exists(snapshotRoot)) {
            distributedFileSystem.allowSnapshot(path);
        }
    }

    /**
     * Deletes the named snapshot of {@code path} if its {@code .snapshot/<name>}
     * path exists; otherwise does nothing.
     *
     * @param distributedFileSystem file system that owns the path.
     * @param path snapshottable directory.
     * @param str snapshot name.
     * @throws IOException on file system error.
     */
    static void deleteSnapshot(DistributedFileSystem distributedFileSystem, Path path, String str) throws IOException {
        final Path snapshotPath = new Path(path, ".snapshot/" + str);
        if (!distributedFileSystem.exists(snapshotPath)) {
            return;
        }
        distributedFileSystem.deleteSnapshot(path, str);
    }

    /**
     * Deletes every snapshot listed under {@code path/.snapshot} and then
     * disallows snapshots on {@code path}. Does nothing when the
     * {@code .snapshot} path does not exist.
     *
     * @param distributedFileSystem file system that owns the path.
     * @param path directory to clean up.
     * @throws IOException on file system error.
     */
    static void cleanupSnapshot(DistributedFileSystem distributedFileSystem, Path path) throws IOException {
        final Path snapshotRoot = new Path(path, ".snapshot");
        if (!distributedFileSystem.exists(snapshotRoot)) {
            return;
        }
        final FileStatus[] snapshots = distributedFileSystem.listStatus(new Path(path, ".snapshot"));
        for (FileStatus snapshot : snapshots) {
            final String snapshotName = snapshot.getPath().getName();
            deleteSnapshot(distributedFileSystem, path, snapshotName);
        }
        distributedFileSystem.disallowSnapshot(path);
    }
}
