/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.Closeable;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.channels.AsynchronousCloseException;
import java.util.HashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataXceiver;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;

/**
 * Accept loop for the DataNode's data-transfer endpoint.
 *
 * <p>{@link #run()} accepts {@link Peer} connections from the {@link PeerServer} and starts a
 * {@link DataXceiver} on a new {@link Daemon} thread for each of them. The class also keeps the
 * registry of currently registered peers (guarded by {@code lock}) and holds the throttlers
 * used for balancing, transfer, write and read traffic.
 */
class DataXceiverServer implements Runnable {
    public static final Logger LOG = DataNode.LOG;

    /** Default wait, in seconds, when lowering the number of concurrent balancer movers. */
    private static final int DEFAULT_RECONFIGURE_WAIT = 30;

    private final PeerServer peerServer;
    private final DataNode datanode;

    /** Registered peers mapped to the thread serving them. Guarded by {@code lock}. */
    private final HashMap<Peer, Thread> peers = new HashMap<>();

    /** Registered peers mapped to their xceiver. Guarded by {@code lock}. */
    private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<>();

    private final Lock lock = new ReentrantLock();

    /** Signalled (under {@code lock}) whenever the peer registry becomes empty. */
    private final Condition noPeers = this.lock.newCondition();

    /** Set once the peer server has been closed successfully. Guarded by {@code lock}. */
    private boolean closed = false;

    private int maxReconfigureWaitTime = DEFAULT_RECONFIGURE_WAIT;

    /** Upper bound on the DataNode's xceiver count before new connections are rejected. */
    volatile int maxXceiverCount;

    final BlockBalanceThrottler balanceThrottler;

    /** Initially {@code null} when the corresponding bandwidth setting is not positive. */
    private volatile DataTransferThrottler transferThrottler;
    private volatile DataTransferThrottler writeThrottler;
    private volatile DataTransferThrottler readThrottler;

    /** Value of {@code dfs.blocksize} (default 128 MiB) read at construction time. */
    final long estimateBlockSize;

    /**
     * Creates the server and reads its limits and throttler settings from {@code conf}.
     *
     * @param peerServer source of incoming peer connections
     * @param conf configuration supplying thread, block-size and bandwidth settings
     * @param datanode the owning DataNode
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument}, when
     *         {@code dfs.datanode.max.transfer.threads} is less than 1
     */
    DataXceiverServer(PeerServer peerServer, Configuration conf, DataNode datanode) {
        this.peerServer = peerServer;
        this.datanode = datanode;
        this.maxXceiverCount = conf.getInt("dfs.datanode.max.transfer.threads", 4096);
        Preconditions.checkArgument(this.maxXceiverCount >= 1,
                "dfs.datanode.max.transfer.threads should not be less than 1.");
        this.estimateBlockSize = conf.getLongBytes("dfs.blocksize", 128L * 1024L * 1024L);
        long balanceBandwidth =
                conf.getLongBytes("dfs.datanode.balance.bandwidthPerSec", 100L * 1024L * 1024L);
        int maxConcurrentMoves = conf.getInt("dfs.datanode.balance.max.concurrent.moves", 100);
        this.balanceThrottler = new BlockBalanceThrottler(balanceBandwidth, maxConcurrentMoves);
        this.initBandwidthPerSec(conf);
    }

    /** Creates the transfer/write/read throttlers; a non-positive bandwidth means no throttler. */
    private void initBandwidthPerSec(Configuration conf) {
        this.transferThrottler =
                newThrottlerOrNull(conf.getLongBytes("dfs.datanode.data.transfer.bandwidthPerSec", 0L));
        this.writeThrottler =
                newThrottlerOrNull(conf.getLongBytes("dfs.datanode.data.write.bandwidthPerSec", 0L));
        this.readThrottler =
                newThrottlerOrNull(conf.getLongBytes("dfs.datanode.data.read.bandwidthPerSec", 0L));
    }

    /** Returns a throttler for {@code bandwidthPerSec}, or {@code null} if it is not positive. */
    private static DataTransferThrottler newThrottlerOrNull(long bandwidthPerSec) {
        return bandwidthPerSec > 0L ? new DataTransferThrottler(bandwidthPerSec) : null;
    }

    /**
     * Accepts connections until the DataNode stops running or shuts down for upgrade, then closes
     * the peer server and all registered peers.
     */
    @Override
    public void run() {
        while (this.datanode.shouldRun && !this.datanode.shutdownForUpgrade) {
            // Scoped per iteration: if accept() itself fails, the error handlers below must not
            // close a peer that an earlier iteration already handed over to a running DataXceiver.
            Peer peer = null;
            try {
                peer = this.peerServer.accept();
                int curXceiverCount = this.datanode.getXceiverCount();
                if (curXceiverCount > this.maxXceiverCount) {
                    throw new IOException("Xceiver count " + curXceiverCount + " exceeds the limit of concurrent xceivers: " + this.maxXceiverCount);
                }
                new Daemon(this.datanode.threadGroup, DataXceiver.create(peer, this.datanode, this)).start();
            } catch (SocketTimeoutException ignored) {
                // Nothing to do: loop around and re-check the run flags.
            } catch (AsynchronousCloseException ace) {
                // Only worth a warning when we are not already stopping or restarting.
                if (this.datanode.shouldRun && !this.datanode.shutdownForUpgrade) {
                    LOG.warn("{}:DataXceiverServer", this.datanode.getDisplayName(), ace);
                }
            } catch (IOException ie) {
                IOUtils.closeStream(peer);
                LOG.warn("{}:DataXceiverServer", this.datanode.getDisplayName(), ie);
            } catch (OutOfMemoryError oom) {
                IOUtils.closeStream(peer);
                LOG.error("DataNode is out of memory. Will retry in 30 seconds.", oom);
                try {
                    Thread.sleep(TimeUnit.SECONDS.toMillis(30L));
                } catch (InterruptedException ignored) {
                    // Best effort back-off: the loop condition decides whether to keep going.
                }
            } catch (Throwable te) {
                LOG.error("{}:DataXceiverServer: Exiting.", this.datanode.getDisplayName(), te);
                // Clearing the flag terminates the accept loop.
                this.datanode.shouldRun = false;
            }
        }

        this.closePeerServer("{}:DataXceiverServer: close exception");

        if (this.datanode.shutdownForUpgrade) {
            this.restartNotifyPeers();
            LOG.info("Shutting down DataXceiverServer before restart");
            // Give interrupted peers a short grace period before closing them forcibly.
            this.waitAllPeers(2L, TimeUnit.SECONDS);
        }
        this.closeAllPeers();
    }

    /** Closes the peer server; expected to be called only once the DataNode is stopping. */
    void kill() {
        assert (!this.datanode.shouldRun || this.datanode.shutdownForUpgrade) : "shoudRun should be set to false or restarting should be true before killing";
        this.closePeerServer("{}:DataXceiverServer.kill()");
    }

    /**
     * Closes the peer server under the lock unless it has already been closed.
     *
     * @param warnFormat log format (one {@code {}} placeholder for the DataNode display name)
     *        used if closing fails; on failure {@code closed} stays {@code false}
     */
    private void closePeerServer(String warnFormat) {
        this.lock.lock();
        try {
            if (!this.closed) {
                this.peerServer.close();
                this.closed = true;
            }
        } catch (IOException ie) {
            LOG.warn(warnFormat, this.datanode.getDisplayName(), ie);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Registers a peer together with its serving thread and xceiver and bumps the active count.
     *
     * @throws IOException if the server has already been closed
     */
    void addPeer(Peer peer, Thread t, DataXceiver xceiver) throws IOException {
        this.lock.lock();
        try {
            if (this.closed) {
                throw new IOException("Server closed.");
            }
            this.peers.put(peer, t);
            this.peersXceiver.put(peer, xceiver);
            this.datanode.metrics.incrDataNodeActiveXceiversCount();
        } finally {
            this.lock.unlock();
        }
    }

    /** Unregisters and closes {@code peer}, signalling waiters when no peers remain. */
    void closePeer(Peer peer) {
        this.lock.lock();
        try {
            this.unregisterPeer(peer);
            // The stream is closed even if the peer was never registered.
            IOUtils.closeStream(peer);
            if (this.peers.isEmpty()) {
                this.noPeers.signalAll();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Removes {@code peer} from both registries. Must be called with {@code lock} held.
     *
     * <p>The active-xceiver metric is decremented only if the peer was actually registered, so a
     * peer whose {@link #addPeer} failed, or one already dropped by {@link #closeAllPeers()},
     * cannot push the count below the number of registered peers.
     */
    private void unregisterPeer(Peer peer) {
        boolean wasRegistered = this.peers.containsKey(peer);
        this.peers.remove(peer);
        this.peersXceiver.remove(peer);
        if (wasRegistered) {
            this.datanode.metrics.decrDataNodeActiveXceiversCount();
        }
    }

    /** Asks every registered xceiver to send an OOB message; a no-op unless shutting down for upgrade. */
    public void sendOOBToPeers() {
        this.lock.lock();
        try {
            if (!this.datanode.shutdownForUpgrade) {
                return;
            }
            for (Peer p : this.peers.keySet()) {
                try {
                    this.peersXceiver.get(p).sendOOB();
                } catch (IOException e) {
                    LOG.warn("Got error when sending OOB message.", e);
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted when sending OOB message.");
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    /** Calls {@code stopWriter()} on the xceiver of every registered peer. */
    public void stopWriters() {
        this.lock.lock();
        try {
            for (Peer p : this.peers.keySet()) {
                this.peersXceiver.get(p).stopWriter();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /** Interrupts the serving thread of every registered peer. */
    void restartNotifyPeers() {
        assert (this.datanode.shouldRun && this.datanode.shutdownForUpgrade);
        this.lock.lock();
        try {
            this.peers.values().forEach(Thread::interrupt);
        } finally {
            this.lock.unlock();
        }
    }

    /** Closes every registered peer, empties the registries and resets the active count to 0. */
    void closeAllPeers() {
        LOG.info("Closing all peers.");
        this.lock.lock();
        try {
            this.peers.keySet().forEach(IOUtils::closeStream);
            this.peers.clear();
            this.peersXceiver.clear();
            this.datanode.metrics.setDataNodeActiveXceiversCount(0);
            this.noPeers.signalAll();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Waits until the peer registry is empty or the timeout elapses.
     *
     * @return {@code true} if no peers remain; {@code false} on timeout or interruption
     */
    private boolean waitAllPeers(long timeout, TimeUnit unit) {
        long remainingNanos = unit.toNanos(timeout);
        this.lock.lock();
        try {
            while (!this.peers.isEmpty()) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                // awaitNanos returns the time left, so spurious wake-ups do not extend the wait.
                remainingNanos = this.noPeers.awaitNanos(remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            LOG.debug("Interrupted waiting for peers to close");
            return false;
        } finally {
            this.lock.unlock();
        }
    }

    /** Returns the number of registered peers. */
    int getNumPeers() {
        this.lock.lock();
        try {
            return this.peers.size();
        } finally {
            this.lock.unlock();
        }
    }

    /** Returns the number of registered peer-to-xceiver entries. */
    @VisibleForTesting
    int getNumPeersXceiver() {
        this.lock.lock();
        try {
            return this.peersXceiver.size();
        } finally {
            this.lock.unlock();
        }
    }

    @VisibleForTesting
    PeerServer getPeerServer() {
        return this.peerServer;
    }

    /** Returns the transfer throttler, which may be {@code null}. */
    public DataTransferThrottler getTransferThrottler() {
        return this.transferThrottler;
    }

    /** Returns the write throttler, which may be {@code null}. */
    public DataTransferThrottler getWriteThrottler() {
        return this.writeThrottler;
    }

    /** Returns the read throttler, which may be {@code null}. */
    public DataTransferThrottler getReadThrottler() {
        return this.readThrottler;
    }

    /** Unregisters {@code peer} without closing it. */
    void releasePeer(Peer peer) {
        this.lock.lock();
        try {
            this.unregisterPeer(peer);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Changes the number of concurrent balancer movers, waiting up to the configured
     * reconfigure wait time when the limit is lowered.
     *
     * @return {@code true} if the new limit took effect
     */
    public boolean updateBalancerMaxConcurrentMovers(int movers) {
        return this.balanceThrottler.setMaxConcurrentMovers(movers, this.maxReconfigureWaitTime);
    }

    /** Sets the wait, in seconds, used by {@link #updateBalancerMaxConcurrentMovers(int)}. */
    @VisibleForTesting
    void setMaxReconfigureWaitTime(int max) {
        this.maxReconfigureWaitTime = max;
    }

    /** Sets the xceiver limit; {@code xceiverCount} must be larger than 0. */
    public void setMaxXceiverCount(int xceiverCount) {
        Preconditions.checkArgument(xceiverCount > 0,
                "dfs.datanode.max.transfer.threads should be larger than 0");
        this.maxXceiverCount = xceiverCount;
    }

    @VisibleForTesting
    public int getMaxXceiverCount() {
        return this.maxXceiverCount;
    }

    public void setTransferThrottler(DataTransferThrottler transferThrottler) {
        this.transferThrottler = transferThrottler;
    }

    public void setWriteThrottler(DataTransferThrottler writeThrottler) {
        this.writeThrottler = writeThrottler;
    }

    public void setReadThrottler(DataTransferThrottler readThrottler) {
        this.readThrottler = readThrottler;
    }

    /**
     * Bandwidth throttler that additionally caps the number of concurrent balancing threads
     * with a fair {@link Semaphore}.
     */
    static class BlockBalanceThrottler extends DataTransferThrottler {
        private final Semaphore semaphore;
        private int maxThreads;

        private BlockBalanceThrottler(long bandwidth, int maxThreads) {
            super(bandwidth);
            this.semaphore = new Semaphore(maxThreads, true);
            this.maxThreads = maxThreads;
            LOG.info("Balancing bandwidth is " + bandwidth + " bytes/s");
            LOG.info("Number threads for balancing is " + maxThreads);
        }

        /**
         * Adjusts the permit count to {@code newMaxThreads}.
         *
         * <p>Raising the limit releases the extra permits immediately. Lowering it has to acquire
         * the surplus permits and therefore waits up to {@code duration} seconds for them.
         *
         * @param newMaxThreads the new limit; must be positive
         * @param duration maximum wait in seconds when lowering the limit
         * @return {@code true} if the limit now equals {@code newMaxThreads}; {@code false} if
         *         the permits could not be acquired in time or the wait was interrupted
         */
        private boolean setMaxConcurrentMovers(int newMaxThreads, int duration) {
            Preconditions.checkArgument(newMaxThreads > 0);
            int delta = newMaxThreads - this.maxThreads;
            LOG.debug("Change concurrent thread count to {} from {}", newMaxThreads, this.maxThreads);
            if (delta == 0) {
                return true;
            }
            if (delta > 0) {
                LOG.debug("Adding thread capacity: {}", delta);
                this.semaphore.release(delta);
                this.maxThreads = newMaxThreads;
                return true;
            }
            try {
                LOG.debug("Removing thread capacity: {}. Max wait: {}", delta, duration);
                boolean acquired = this.semaphore.tryAcquire(Math.abs(delta), duration, TimeUnit.SECONDS);
                if (acquired) {
                    this.maxThreads = newMaxThreads;
                } else {
                    LOG.warn("Could not lower thread count to {} from {}. Too busy.", newMaxThreads, this.maxThreads);
                }
                return acquired;
            } catch (InterruptedException e) {
                LOG.warn("Interrupted before adjusting thread count: {}", delta);
                return false;
            }
        }

        @VisibleForTesting
        int getMaxConcurrentMovers() {
            return this.maxThreads;
        }

        /** Tries to take one permit without blocking; returns whether it was obtained. */
        boolean acquire() {
            return this.semaphore.tryAcquire();
        }

        /** Returns one permit to the semaphore. */
        void release() {
            this.semaphore.release();
        }
    }
}

