/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.mob.mapreduce;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.NoTagsKeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.mapreduce.MobFilePathHashPartitioner;
import org.apache.hadoop.hbase.mob.mapreduce.SweepMapper;
import org.apache.hadoop.hbase.mob.mapreduce.SweepReducer;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.KeeperException;

@InterfaceAudience.Private
public class SweepJob {
    // File system used for every listing, create and delete this job performs.
    private final FileSystem fs;
    // Caller-supplied configuration; sweep() works on a copy of it.
    private final Configuration conf;
    private static final Log LOG = LogFactory.getLog(SweepJob.class);
    // Keys that sweep() sets on the job configuration: the generated job id, the local
    // server name, and the ZooKeeper node of the table lock.
    static final String SWEEP_JOB_ID = "hbase.mob.sweep.job.id";
    static final String SWEEP_JOB_SERVERNAME = "hbase.mob.sweep.job.servername";
    static final String SWEEP_JOB_TABLE_NODE = "hbase.mob.sweep.job.table.node";
    // Keys that submit() sets on the job configuration: the working directory, the
    // sequence file listing all candidate file names, and the directory of visited names.
    static final String WORKING_DIR_KEY = "hbase.mob.sweep.job.dir";
    static final String WORKING_ALLNAMES_FILE_KEY = "hbase.mob.sweep.job.all.file";
    static final String WORKING_VISITED_DIR_KEY = "hbase.mob.sweep.job.visited.dir";
    // Child names that submit() creates under "<working dir>/names".
    static final String WORKING_ALLNAMES_DIR = "all";
    static final String WORKING_VISITED_DIR = "visited";
    // Key under which submit() stores "<working dir>/files".
    // NOTE(review): unlike the other keys this one has no "hbase." prefix; it is public,
    // so the value must not be changed without checking every reader.
    public static final String WORKING_FILES_DIR_KEY = "mob.sweep.job.files.dir";
    // Key read by submit(): minimum age a file must have, relative to compactionStartTime,
    // to be listed as a sweep candidate.
    public static final String MOB_SWEEP_JOB_DELAY = "hbase.mob.sweep.job.delay";
    // Same value submit() uses as the default for MOB_SWEEP_JOB_DELAY.
    // Presumably the number of milliseconds in one day - the delay is compared against a
    // difference between EnvironmentEdgeManager.currentTime() and a file modification time.
    protected static final long ONE_DAY = 86400000L;
    // Captured once when this instance is constructed; passed to the job and used by
    // submit() as the reference time for the age check.
    private long compactionStartTime = EnvironmentEdgeManager.currentTime();
    // Key read by prepareJob(): local path of a token storage file whose credentials are
    // added to the job when the value is non-empty.
    public static final String CREDENTIALS_LOCATION = "credentials_location";
    // Built in the constructor from a configuration copy whose "hfile.block.cache.size"
    // is 0; handed to the StoreFile objects created in removeUnusedFiles().
    private CacheConfig cacheConfig;
    // Same value sweep() passes to Scan.setCaching().
    static final int SCAN_CACHING = 10000;
    // Assigned in sweep() once the ZooKeeper watcher is available.
    private TableLockManager tableLockManager;

    /**
     * Creates a sweep job bound to the given configuration and file system.
     * <p>
     * The cache configuration kept by this instance is derived from a private copy of
     * {@code conf} in which {@code hfile.block.cache.size} is forced to 0, so the
     * caller's configuration object is left untouched.
     *
     * @param conf the base configuration; copied, never modified here
     * @param fs the file system all job file operations go through
     */
    public SweepJob(Configuration conf, FileSystem fs) {
        this.fs = fs;
        this.conf = conf;
        Configuration noBlockCacheConf = new Configuration(conf);
        noBlockCacheConf.setFloat("hfile.block.cache.size", 0.0f);
        this.cacheConfig = new CacheConfig(noBlockCacheConf);
    }

    /**
     * Builds a {@link ServerName} for the local host from region server settings.
     * <p>
     * The host is {@code hbase.regionserver.ipc.address} when set; otherwise it is the
     * default host resolved through {@code hbase.regionserver.dns.interface} and
     * {@code hbase.regionserver.dns.nameserver}. The DNS lookup is performed in either
     * case because its result is passed as the default value. The port comes from
     * {@code hbase.regionserver.port} (default 16020) and the start code is the current time.
     *
     * @param conf the configuration to read the address settings from
     * @return the server name of the resolved host and port
     * @throws IOException if resolving the default host fails
     * @throws IllegalArgumentException if the host cannot be resolved to an address
     */
    static ServerName getCurrentServerName(Configuration conf) throws IOException {
        String dnsInterface = conf.get("hbase.regionserver.dns.interface", "default");
        String dnsNameserver = conf.get("hbase.regionserver.dns.nameserver", "default");
        String resolvedHost = Strings.domainNamePointerToHostName(DNS.getDefaultHost(dnsInterface, dnsNameserver));
        String hostname = conf.get("hbase.regionserver.ipc.address", resolvedHost);
        InetSocketAddress address = new InetSocketAddress(hostname, conf.getInt("hbase.regionserver.port", 16020));
        if (address.getAddress() == null) {
            throw new IllegalArgumentException("Failed resolve of " + address);
        }
        return ServerName.valueOf(address.getHostName(), address.getPort(), EnvironmentEdgeManager.currentTime());
    }

    /**
     * Runs the sweep MapReduce job for one column family of a table.
     * <p>
     * The current user must own the first entry listed under {@code hbase.rootdir}.
     * The method then takes the table write lock named by
     * {@link MobUtils#getTableLockName(TableName)}, configures and submits the job, waits
     * for it to finish and, on success, removes the unused MOB files. Once the lock has
     * been acquired, the working directory is cleaned up and the lock is released on
     * every exit path. The ZooKeeper watcher is closed exactly once by the
     * try-with-resources statement.
     *
     * @param tn the table to sweep
     * @param family the column family to sweep
     * @return 0 if the job succeeded, 3 if the table lock could not be acquired, 4 if the
     *         MapReduce job did not complete successfully
     * @throws IOException if the root directory is empty, the current user does not own
     *         it, or a file system or job operation fails
     * @throws ClassNotFoundException propagated from waiting on the MapReduce job
     * @throws InterruptedException propagated from waiting on the MapReduce job
     * @throws KeeperException declared for ZooKeeper failures raised by the called helpers
     */
    public int sweep(TableName tn, HColumnDescriptor family) throws IOException, ClassNotFoundException, InterruptedException, KeeperException {
        // Work on a copy so job-specific settings never leak into the shared configuration.
        Configuration jobConf = new Configuration(this.conf);
        String currentUserName = UserGroupInformation.getCurrentUser().getShortUserName();
        FileStatus[] hbaseRootFileStat = this.fs.listStatus(new Path(jobConf.get("hbase.rootdir")));
        if (hbaseRootFileStat.length == 0) {
            LOG.error("The target HBase doesn't exist");
            throw new IOException("The target HBase doesn't exist");
        }
        String owner = hbaseRootFileStat[0].getOwner();
        if (!owner.equals(currentUserName)) {
            String errorMsg = "The current user[" + currentUserName + "] doesn't have hbase root credentials." + " Please make sure the user is the root of the target HBase";
            LOG.error(errorMsg);
            throw new IOException(errorMsg);
        }

        String familyName = family.getNameAsString();
        String id = "SweepJob" + UUID.randomUUID().toString().replace("-", "");
        try (ZooKeeperWatcher zkw = new ZooKeeperWatcher(jobConf, id, new DummyMobAbortable())) {
            ServerName serverName = SweepJob.getCurrentServerName(jobConf);
            this.tableLockManager = TableLockManager.createTableLockManager(jobConf, zkw, serverName);
            TableName lockName = MobUtils.getTableLockName(tn);
            TableLockManager.TableLock lock = this.tableLockManager.writeLock(lockName, "Run sweep tool");
            String tableName = tn.getNameAsString();
            try {
                lock.acquire();
            } catch (Exception e) {
                // Keep the cause in the log; the watcher is closed by try-with-resources,
                // so no explicit close() is needed (it used to be closed twice here).
                LOG.warn("Can not lock the table " + tableName + ". The major compaction in HBase may be in-progress or another sweep job is running." + " Please re-run the job.", e);
                return 3;
            }

            Job job = null;
            try {
                Scan scan = new Scan();
                scan.addFamily(family.getName());
                scan.setAttribute("hbase.mob.scan.raw", Bytes.toBytes(true));
                scan.setAttribute("hbase.mob.scan.ref.only", Bytes.toBytes(true));
                scan.setCaching(10000);
                scan.setCacheBlocks(false);
                scan.setMaxVersions(family.getMaxVersions());
                jobConf.set("io.serializations", JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());
                jobConf.set(SWEEP_JOB_ID, id);
                jobConf.set(SWEEP_JOB_SERVERNAME, serverName.toString());
                String tableLockNode = ZKUtil.joinZNode(zkw.tableLockZNode, lockName.getNameAsString());
                jobConf.set(SWEEP_JOB_TABLE_NODE, tableLockNode);
                job = this.prepareJob(tn, familyName, scan, jobConf);
                job.getConfiguration().set("hbase.mapreduce.scan.column.family", familyName);
                job.getConfiguration().setLong("hbase.mob.sweep.tool.compaction.start.date", this.compactionStartTime);
                job.setPartitionerClass(MobFilePathHashPartitioner.class);
                this.submit(job, tn, familyName);
                if (!job.waitForCompletion(true)) {
                    System.err.println("Job was not successful");
                    return 4;
                }
                this.removeUnusedFiles(job, tn, family);
            } finally {
                // Runs on success, on the failure return above and on any exception.
                try {
                    this.cleanup(job, tn, familyName);
                } finally {
                    try {
                        lock.release();
                    } catch (IOException e) {
                        LOG.error("Failed to release the table lock " + tableName, e);
                    }
                }
            }
        }
        return 0;
    }

    /**
     * Creates and configures the MapReduce job that performs the sweep.
     * <p>
     * The job scans {@code tn} with {@link SweepMapper}, reduces with
     * {@link SweepReducer}, and produces no output files. If {@link #CREDENTIALS_LOCATION}
     * is set to a non-empty value, the tokens stored in that local file are added to the
     * job credentials.
     *
     * @param tn the table to scan
     * @param familyName the column family name, used only for the job name
     * @param scan the scan handed to the table mapper
     * @param conf the configuration the job is created from
     * @return the configured, not yet submitted job
     * @throws IOException if the job cannot be created or the token file cannot be read
     */
    private Job prepareJob(TableName tn, String familyName, Scan scan, Configuration conf) throws IOException {
        Job sweepJob = Job.getInstance(conf);
        sweepJob.setJarByClass(SweepMapper.class);
        TableMapReduceUtil.initTableMapperJob(tn.getNameAsString(), scan, SweepMapper.class, Text.class, Writable.class, sweepJob);
        // The setters below intentionally run after initTableMapperJob so that they win.
        sweepJob.setInputFormatClass(TableInputFormat.class);
        sweepJob.setMapOutputKeyClass(Text.class);
        sweepJob.setMapOutputValueClass(NoTagsKeyValue.class);
        sweepJob.setReducerClass(SweepReducer.class);
        sweepJob.setOutputFormatClass(NullOutputFormat.class);
        sweepJob.setJobName(SweepJob.getCustomJobName(this.getClass().getSimpleName(), tn, familyName));

        String credentialsFile = conf.get(CREDENTIALS_LOCATION);
        if (StringUtils.isNotEmpty(credentialsFile)) {
            Credentials stored = Credentials.readTokenStorageFile(new File(credentialsFile), conf);
            sweepJob.getCredentials().addAll(stored);
        }
        return sweepJob;
    }

    /**
     * Builds the job name as six parts joined with {@code '-'}: the given class name,
     * the simple names of {@link SweepMapper} and {@link SweepReducer}, the table
     * namespace, the table qualifier and the family name.
     *
     * @param className the leading part of the name
     * @param tableName the table whose namespace and qualifier are embedded
     * @param familyName the trailing part of the name
     * @return the assembled job name
     */
    private static String getCustomJobName(String className, TableName tableName, String familyName) {
        return className
            + '-' + SweepMapper.class.getSimpleName()
            + '-' + SweepReducer.class.getSimpleName()
            + '-' + tableName.getNamespaceAsString()
            + '-' + tableName.getQualifierAsString()
            + '-' + familyName;
    }

    /**
     * Prepares the working directory of the sweep job and records the candidate files.
     * <p>
     * A fresh working directory is created below {@code <mob home>/.tmp/mobcompaction}
     * and its sub-paths are published through the {@code WORKING_*} configuration keys.
     * Every regular file in the MOB family directory that is not an HFile link and is
     * older than the configured delay (relative to {@code compactionStartTime}) is
     * written, in sorted order, as a key of the "all names" sequence file.
     *
     * @param job the job whose configuration receives the working paths
     * @param tn the table being swept
     * @param familyName the column family being swept
     * @throws IOException if any file system operation fails
     */
    private void submit(Job job, TableName tn, String familyName) throws IOException {
        Path tmpRoot = new Path(MobUtils.getMobHome(job.getConfiguration()), ".tmp");
        Path compactionTmp = new Path(tmpRoot, "mobcompaction");
        Path workDir = MobUtils.getCompactionWorkingPath(compactionTmp, job.getJobName());
        job.getConfiguration().set(WORKING_DIR_KEY, workDir.toString());
        // Start from an empty working directory.
        this.fs.delete(workDir, true);
        this.fs.mkdirs(workDir);

        Path filesDir = new Path(workDir, "files");
        Path namesDir = new Path(workDir, "names");
        job.getConfiguration().set(WORKING_FILES_DIR_KEY, filesDir.toString());
        Path allNamesFile = new Path(namesDir, WORKING_ALLNAMES_DIR);
        job.getConfiguration().set(WORKING_ALLNAMES_FILE_KEY, allNamesFile.toString());
        Path visitedNamesDir = new Path(namesDir, WORKING_VISITED_DIR);
        job.getConfiguration().set(WORKING_VISITED_DIR_KEY, visitedNamesDir.toString());
        this.fs.mkdirs(visitedNamesDir);

        Path mobStorePath = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, familyName);
        FileStatus[] storeEntries = this.fs.listStatus(mobStorePath);
        // A sorted set, so the names are appended to the sequence file in sorted order.
        TreeSet<String> candidateNames = new TreeSet<String>();
        long minAge = job.getConfiguration().getLong(MOB_SWEEP_JOB_DELAY, 86400000L);
        for (FileStatus entry : storeEntries) {
            if (entry.isFile() && !HFileLink.isHFileLink(entry.getPath()) && this.compactionStartTime - entry.getModificationTime() > minAge) {
                candidateNames.add(entry.getPath().getName());
            }
        }

        FSDataOutputStream out = null;
        SequenceFile.Writer namesWriter = null;
        try {
            out = this.fs.create(allNamesFile, true);
            namesWriter = SequenceFile.createWriter(job.getConfiguration(), out, String.class, String.class, SequenceFile.CompressionType.NONE, null);
            for (String name : candidateNames) {
                namesWriter.append(name, "");
            }
            namesWriter.hflush();
        } finally {
            if (namesWriter != null) {
                IOUtils.closeStream(namesWriter);
            }
            if (out != null) {
                IOUtils.closeStream(out);
            }
        }
    }

    /**
     * Returns the names that appear in the "all names" file but not among the visited names.
     * <p>
     * Both inputs are consumed as sorted streams and compared in a single merge pass:
     * a name from the "all" stream that sorts before the current visited name (or that
     * remains after the visited stream is exhausted) was never visited and is collected.
     *
     * @param conf the configuration holding {@link #WORKING_ALLNAMES_FILE_KEY} and
     *        {@link #WORKING_VISITED_DIR_KEY}
     * @return the unvisited names, in the order they were read
     * @throws IOException if either input cannot be opened or read
     */
    List<String> getUnusedFiles(Configuration conf) throws IOException {
        Path allNamesFile = new Path(conf.get(WORKING_ALLNAMES_FILE_KEY));
        List<String> unused = new ArrayList<String>();
        SequenceFile.Reader allReader = null;
        MergeSortReader visitedReader = null;
        try {
            allReader = new SequenceFile.Reader(this.fs, allNamesFile, conf);
            visitedReader = new MergeSortReader(this.fs, conf, new Path(conf.get(WORKING_VISITED_DIR_KEY)));
            String candidate = (String) allReader.next((Object) null);
            String visited = visitedReader.next();
            while (candidate != null) {
                // An exhausted visited stream behaves like a name that sorts after everything.
                int order = visited == null ? -1 : candidate.compareTo(visited);
                if (order < 0) {
                    unused.add(candidate);
                    candidate = (String) allReader.next((Object) null);
                } else if (order > 0) {
                    visited = visitedReader.next();
                } else {
                    candidate = (String) allReader.next((Object) null);
                    visited = visitedReader.next();
                }
            }
        } finally {
            if (allReader != null) {
                IOUtils.closeStream(allReader);
            }
            if (visitedReader != null) {
                visitedReader.close();
            }
        }
        return unused;
    }

    /**
     * Removes the MOB files that {@link #getUnusedFiles(Configuration)} reports as unused.
     * <p>
     * Removal is best effort: a failure inside {@code MobUtils.removeMobFiles} is logged
     * and not propagated. Failures while computing the list or creating the store file
     * objects are propagated.
     *
     * @param job the finished job; its configuration locates the name files
     * @param tn the table that was swept
     * @param hcd the column family that was swept
     * @throws IOException if the unused names cannot be read or a store file cannot be created
     */
    private void removeUnusedFiles(Job job, TableName tn, HColumnDescriptor hcd) throws IOException {
        List<String> unusedNames = this.getUnusedFiles(job.getConfiguration());
        Path familyDir = MobUtils.getMobFamilyPath(job.getConfiguration(), tn, hcd.getNameAsString());
        ArrayList<StoreFile> unusedFiles = new ArrayList<StoreFile>();
        for (String name : unusedNames) {
            unusedFiles.add(new StoreFile(this.fs, new Path(familyDir, name), job.getConfiguration(), this.cacheConfig, BloomType.NONE));
        }
        if (unusedFiles.isEmpty()) {
            return;
        }
        try {
            Path mobTableDir = FSUtils.getTableDir(MobUtils.getMobHome(this.conf), tn);
            MobUtils.removeMobFiles(job.getConfiguration(), this.fs, tn, mobTableDir, hcd.getName(), unusedFiles);
            LOG.info(unusedFiles.size() + " unused MOB files are removed");
        } catch (Exception e) {
            LOG.error("Failed to archive the store files " + unusedFiles, e);
        }
    }

    /**
     * Deletes the job's working directory; never throws.
     * <p>
     * This runs inside a {@code finally} block of {@link #sweep}, so it must not raise
     * an exception of its own. When the job exists but no working directory has been
     * recorded yet (the submit step failed before setting {@link #WORKING_DIR_KEY}),
     * there is nothing to delete. Building a {@code Path} from the missing value would
     * throw {@code IllegalArgumentException} and hide the original failure, so that case
     * is skipped.
     *
     * @param job the job whose working directory is removed; may be {@code null}
     * @param tn the table being swept, used in the log message
     * @param familyName the column family being swept, used in the log message
     */
    private void cleanup(Job job, TableName tn, String familyName) {
        if (job == null) {
            return;
        }
        String workingDir = job.getConfiguration().get(WORKING_DIR_KEY);
        if (workingDir == null || workingDir.isEmpty()) {
            // No working directory was ever configured, so none was created.
            return;
        }
        try {
            this.fs.delete(new Path(workingDir), true);
        } catch (IOException e) {
            LOG.warn("Failed to delete the working directory after sweeping store " + familyName + " in the table " + tn.getNameAsString(), e);
        }
    }

    /**
     * Minimal {@link Abortable} that only remembers whether an abort was requested.
     * The reason and cause passed to {@link #abort(String, Throwable)} are discarded.
     */
    public static class DummyMobAbortable
    implements Abortable {
        /** Set to {@code true} by the first abort request and never reset. */
        private boolean aborted = false;

        /**
         * Records the abort request.
         *
         * @param why ignored
         * @param e ignored
         */
        public void abort(String why, Throwable e) {
            this.aborted = true;
        }

        /**
         * @return {@code true} once {@link #abort(String, Throwable)} has been called
         */
        public boolean isAborted() {
            return this.aborted;
        }
    }

    /**
     * Counter identifiers for the sweep job.
     * <p>
     * NOTE(review): nothing in this class reads or increments these; presumably they are
     * updated by the sweep mapper/reducer - confirm against those classes before relying
     * on the meaning suggested by the names.
     */
    public static enum SweepCounter {
        INPUT_FILE_COUNT,
        FILE_TO_BE_MERGE_OR_CLEAN,
        FILE_AFTER_MERGE_OR_CLEAN,
        RECORDS_UPDATED;

    }

    /**
     * Merges the keys of all sequence files in one directory into a single ascending stream.
     * <p>
     * One reader is kept open per non-empty file, and a priority queue holds the current
     * head key of each reader. Every reader this class opens is either tracked, and so
     * released by {@link #close()}, or closed immediately. That covers readers of empty
     * files and readers opened before a failed construction.
     */
    private static class MergeSortReader {
        /** Open readers; the list position is the index stored in the queued results. */
        private final List<SequenceFile.Reader> readers = new ArrayList<SequenceFile.Reader>();
        /** Current head key of every reader that still has data. */
        private final PriorityQueue<IndexedResult> results = new PriorityQueue<IndexedResult>();

        /**
         * Opens a reader on every regular file directly under {@code path}.
         * A path that does not exist yields an empty stream.
         *
         * @param fs the file system to read from
         * @param conf the configuration handed to each sequence file reader
         * @param path the directory holding the sequence files
         * @throws IOException if listing, opening or reading a file fails; readers opened
         *         before the failure are closed first
         */
        public MergeSortReader(FileSystem fs, Configuration conf, Path path) throws IOException {
            if (!fs.exists(path)) {
                return;
            }
            boolean initialized = false;
            try {
                for (FileStatus file : fs.listStatus(path)) {
                    if (file.isFile()) {
                        this.register(new SequenceFile.Reader(fs, file.getPath(), conf));
                    }
                }
                initialized = true;
            } finally {
                if (!initialized) {
                    // Construction failed part-way: release what was already opened.
                    this.close();
                }
            }
        }

        /**
         * Tracks {@code reader} if it has a first key; otherwise closes it right away.
         * The reader is also closed if reading the first key throws.
         */
        private void register(SequenceFile.Reader reader) throws IOException {
            boolean tracked = false;
            try {
                String firstKey = (String) reader.next((Object) null);
                if (firstKey != null) {
                    // The index is the position the reader is about to take in the list.
                    this.results.add(new IndexedResult(this.readers.size(), firstKey));
                    this.readers.add(reader);
                    tracked = true;
                }
            } finally {
                if (!tracked) {
                    IOUtils.closeStream(reader);
                }
            }
        }

        /**
         * Returns the smallest remaining key and refills the queue from the reader it came from.
         *
         * @return the next key in ascending order, or {@code null} when all readers are exhausted
         * @throws IOException if reading the follow-up key fails
         */
        public String next() throws IOException {
            IndexedResult head = this.results.poll();
            if (head == null) {
                return null;
            }
            SequenceFile.Reader source = this.readers.get(head.getIndex());
            String following = (String) source.next((Object) null);
            if (following != null) {
                this.results.add(new IndexedResult(head.getIndex(), following));
            }
            return head.getValue();
        }

        /** Closes every tracked reader, ignoring close failures. */
        public void close() {
            for (SequenceFile.Reader reader : this.readers) {
                if (reader != null) {
                    IOUtils.closeStream(reader);
                }
            }
        }
    }

    /**
     * A key together with the index of the reader it was read from.
     * <p>
     * Ordering, equality and hash code depend on the value only; the index is ignored.
     * A {@code null} value is allowed, sorts before every non-null value, and hashes to 0.
     */
    private static class IndexedResult
    implements Comparable<IndexedResult> {
        private final int index;
        private final String value;

        /**
         * @param index position of the originating reader
         * @param value the key; may be {@code null}
         */
        public IndexedResult(int index, String value) {
            this.index = index;
            this.value = value;
        }

        /** @return position of the originating reader */
        public int getIndex() {
            return this.index;
        }

        /** @return the key; may be {@code null} */
        public String getValue() {
            return this.value;
        }

        /**
         * Compares by value, with {@code null} ordered first.
         *
         * @param o the result to compare with; must not be {@code null}
         * @return a negative number, zero or a positive number as this value sorts
         *         before, equal to or after the other value
         */
        @Override
        public int compareTo(IndexedResult o) {
            String other = o.getValue();
            if (this.value == null) {
                return other == null ? 0 : -1;
            }
            if (other == null) {
                return 1;
            }
            return this.value.compareTo(other);
        }

        /** Two results are equal when their values compare as equal. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof IndexedResult)) {
                return false;
            }
            return this.compareTo((IndexedResult) obj) == 0;
        }

        /**
         * Hash of the value, or 0 for a {@code null} value, so that it stays consistent
         * with {@link #equals(Object)} instead of throwing.
         */
        @Override
        public int hashCode() {
            return this.value == null ? 0 : this.value.hashCode();
        }
    }
}

