/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FsDatasetAsyncDiskService {
    public static final Logger LOG = LoggerFactory.getLogger(FsDatasetAsyncDiskService.class);
    private static final int CORE_THREADS_PER_VOLUME = 1;
    private final int maxNumThreadsPerVolume;
    private static final long THREADS_KEEP_ALIVE_SECONDS = 60L;
    private final DataNode datanode;
    private final FsDatasetImpl fsdatasetImpl;
    private Map<String, ThreadPoolExecutor> executors = new HashMap<String, ThreadPoolExecutor>();
    private Map<String, Set<Long>> deletedBlockIds = new HashMap<String, Set<Long>>();
    private static final int MAX_DELETED_BLOCKS = 64;
    private int numDeletedBlocks = 0;

    FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl) {
        this.datanode = datanode;
        this.fsdatasetImpl = fsdatasetImpl;
        this.maxNumThreadsPerVolume = datanode.getConf().getInt("dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume", 4);
        Preconditions.checkArgument((this.maxNumThreadsPerVolume > 0 ? 1 : 0) != 0, (Object)"dfs.datanode.fsdatasetasyncdisk.max.threads.per.volume must be a positive integer.");
    }

    private void addExecutorForVolume(final FsVolumeImpl volume) {
        ThreadFactory threadFactory = new ThreadFactory(){
            int counter = 0;

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Thread newThread(Runnable r) {
                int thisIndex;
                1 var3_2 = this;
                synchronized (var3_2) {
                    thisIndex = this.counter++;
                }
                Thread t = new Thread(r);
                t.setName("Async disk worker #" + thisIndex + " for volume " + volume);
                return t;
            }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, this.maxNumThreadsPerVolume, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        executor.allowCoreThreadTimeOut(true);
        this.executors.put(volume.getStorageID(), executor);
    }

    synchronized void addVolume(FsVolumeImpl volume) {
        if (this.executors == null) {
            throw new RuntimeException("AsyncDiskService is already shutdown");
        }
        if (volume == null) {
            throw new RuntimeException("Attempt to add a null volume");
        }
        ThreadPoolExecutor executor = this.executors.get(volume.getStorageID());
        if (executor != null) {
            throw new RuntimeException("Volume " + volume + " is already existed.");
        }
        this.addExecutorForVolume(volume);
    }

    synchronized void removeVolume(String storageId) {
        if (this.executors == null) {
            throw new RuntimeException("AsyncDiskService is already shutdown");
        }
        ThreadPoolExecutor executor = this.executors.get(storageId);
        if (executor == null) {
            throw new RuntimeException("Can not find volume with storageId " + storageId + " to remove.");
        }
        executor.shutdown();
        this.executors.remove(storageId);
    }

    synchronized long countPendingDeletions() {
        long count = 0L;
        for (ThreadPoolExecutor exec : this.executors.values()) {
            count += exec.getTaskCount() - exec.getCompletedTaskCount();
        }
        return count;
    }

    synchronized void execute(FsVolumeImpl volume, Runnable task) {
        try {
            if (this.executors == null) {
                throw new RuntimeException("AsyncDiskService is already shutdown");
            }
            if (volume == null) {
                throw new RuntimeException("A null volume does not have a executor");
            }
            ThreadPoolExecutor executor = this.executors.get(volume.getStorageID());
            if (executor == null) {
                throw new RuntimeException("Cannot find volume " + volume + " for execution of task " + task);
            }
            executor.execute(task);
        }
        catch (RuntimeException re) {
            if (task instanceof ReplicaFileDeleteTask) {
                IOUtils.cleanupWithLogger(null, (Closeable[])new Closeable[]{((ReplicaFileDeleteTask)task).volumeRef});
            }
            throw re;
        }
    }

    synchronized void shutdown() {
        if (this.executors == null) {
            LOG.warn("AsyncDiskService has already shut down.");
        } else {
            LOG.info("Shutting down all async disk service threads");
            for (Map.Entry<String, ThreadPoolExecutor> e : this.executors.entrySet()) {
                e.getValue().shutdown();
            }
            this.executors = null;
            LOG.info("All async disk service threads have been shut down");
        }
    }

    public void submitSyncFileRangeRequest(FsVolumeImpl volume, ReplicaOutputStreams streams, long offset, long nbytes, int flags) {
        this.execute(volume, () -> {
            try {
                streams.syncFileRangeIfPossible(offset, nbytes, flags);
            }
            catch (NativeIOException e) {
                try {
                    LOG.warn("sync_file_range error. Volume: {}, Capacity: {}, Available space: {}, File range offset: {}, length: {}, flags: {}", new Object[]{volume, volume.getCapacity(), volume.getAvailable(), offset, nbytes, flags, e});
                }
                catch (IOException ioe) {
                    LOG.warn("sync_file_range error. Volume: {}, Capacity: {}, File range offset: {}, length: {}, flags: {}", new Object[]{volume, volume.getCapacity(), offset, nbytes, flags, e});
                }
            }
        });
    }

    void deleteAsync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete, ExtendedBlock block, String trashDirectory) {
        LOG.info("Scheduling " + block.getLocalBlock() + " replica " + replicaToDelete + " on volume " + replicaToDelete.getVolume() + " for deletion");
        ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(volumeRef, replicaToDelete, block, trashDirectory);
        this.execute((FsVolumeImpl)volumeRef.getVolume(), deletionTask);
    }

    void deleteSync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete, ExtendedBlock block, String trashDirectory) {
        LOG.info("Deleting " + block.getLocalBlock() + " replica " + replicaToDelete);
        ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(volumeRef, replicaToDelete, block, trashDirectory);
        deletionTask.run();
    }

    private synchronized void updateDeletedBlockId(ExtendedBlock block) {
        Set<Long> blockIds = this.deletedBlockIds.get(block.getBlockPoolId());
        if (blockIds == null) {
            blockIds = new HashSet<Long>();
            this.deletedBlockIds.put(block.getBlockPoolId(), blockIds);
        }
        blockIds.add(block.getBlockId());
        ++this.numDeletedBlocks;
        if (this.numDeletedBlocks == 64) {
            for (Map.Entry<String, Set<Long>> e : this.deletedBlockIds.entrySet()) {
                String bpid = e.getKey();
                Set<Long> bs = e.getValue();
                this.fsdatasetImpl.removeDeletedBlocks(bpid, bs);
                bs.clear();
            }
            this.numDeletedBlocks = 0;
        }
    }

    class ReplicaFileDeleteTask
    implements Runnable {
        private final FsVolumeReference volumeRef;
        private final FsVolumeImpl volume;
        private final ReplicaInfo replicaToDelete;
        private final ExtendedBlock block;
        private final String trashDirectory;

        ReplicaFileDeleteTask(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete, ExtendedBlock block, String trashDirectory) {
            this.volumeRef = volumeRef;
            this.volume = (FsVolumeImpl)volumeRef.getVolume();
            this.replicaToDelete = replicaToDelete;
            this.block = block;
            this.trashDirectory = trashDirectory;
        }

        public String toString() {
            return "deletion of block " + this.block.getBlockPoolId() + " " + this.block.getLocalBlock() + " with block file " + this.replicaToDelete.getBlockURI() + " and meta file " + this.replicaToDelete.getMetadataURI() + " from volume " + this.volume;
        }

        private boolean deleteFiles() {
            return this.replicaToDelete.deleteBlockData() && (this.replicaToDelete.deleteMetadata() || !this.replicaToDelete.metadataExists());
        }

        private boolean moveFiles() {
            if (this.trashDirectory == null) {
                LOG.error("Trash dir for replica " + this.replicaToDelete + " is null");
                return false;
            }
            File trashDirFile = new File(this.trashDirectory);
            try {
                this.volume.getFileIoProvider().mkdirsWithExistsCheck(this.volume, trashDirFile);
            }
            catch (IOException e) {
                return false;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Moving files " + this.replicaToDelete.getBlockURI() + " and " + this.replicaToDelete.getMetadataURI() + " to trash.");
            }
            String blockName = this.replicaToDelete.getBlockName();
            long genstamp = this.replicaToDelete.getGenerationStamp();
            File newBlockFile = new File(this.trashDirectory, blockName);
            File newMetaFile = new File(this.trashDirectory, DatanodeUtil.getMetaName(blockName, genstamp));
            try {
                return this.replicaToDelete.renameData(newBlockFile.toURI()) && this.replicaToDelete.renameMeta(newMetaFile.toURI());
            }
            catch (IOException e) {
                LOG.error("Error moving files to trash: " + this.replicaToDelete, (Throwable)e);
                return false;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                boolean result;
                long blockLength = this.replicaToDelete.getBlockDataLength();
                long metaLength = this.replicaToDelete.getMetadataLength();
                boolean bl = result = this.trashDirectory == null ? this.deleteFiles() : this.moveFiles();
                if (!result) {
                    LOG.warn("Unexpected error trying to " + (this.trashDirectory == null ? "delete" : "move") + " block " + this.block.getBlockPoolId() + " " + this.block.getLocalBlock() + " at file " + this.replicaToDelete.getBlockURI() + ". Ignored.");
                } else {
                    if (this.block.getLocalBlock().getNumBytes() != Long.MAX_VALUE) {
                        FsDatasetAsyncDiskService.this.datanode.notifyNamenodeDeletedBlock(this.block, this.volume.getStorageID());
                    }
                    this.volume.onBlockFileDeletion(this.block.getBlockPoolId(), blockLength);
                    this.volume.onMetaFileDeletion(this.block.getBlockPoolId(), metaLength);
                    LOG.info("Deleted " + this.block.getBlockPoolId() + " " + this.block.getLocalBlock() + " URI " + this.replicaToDelete.getBlockURI());
                }
                FsDatasetAsyncDiskService.this.updateDeletedBlockId(this.block);
            }
            catch (Throwable throwable) {
                IOUtils.cleanupWithLogger(null, (Closeable[])new Closeable[]{this.volumeRef});
                throw throwable;
            }
            IOUtils.cleanupWithLogger(null, (Closeable[])new Closeable[]{this.volumeRef});
        }
    }
}

