package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hdfs/DFSStripedOutputStream.class */
public class DFSStripedOutputStream extends DFSOutputStream implements StreamCapabilities {
    private static final ByteBufferPool BUFFER_POOL;
    private final ExceptionLastSeen exceptionLastSeen;
    private final Coordinator coordinator;
    private final CellBuffers cellBuffers;
    private final ErasureCodingPolicy ecPolicy;
    private final RawErasureEncoder encoder;
    private final List<StripedDataStreamer> streamers;
    private final DFSPacket[] currentPackets;
    private final int cellSize;
    private final int numAllBlocks;
    private final int numDataBlocks;
    private ExtendedBlock currentBlockGroup;
    private ExtendedBlock prevBlockGroup4Append;
    private final String[] favoredNodes;
    private final List<StripedDataStreamer> failedStreamers;
    private final Map<Integer, Integer> corruptBlockCountMap;
    private ExecutorService flushAllExecutor;
    private CompletionService<Void> flushAllExecutorCompletionService;
    private int blockGroupIndex;
    private long datanodeRestartTimeout;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/DFSStripedOutputStream$CellBuffers.class */
    public class CellBuffers {
        private final ByteBuffer[] buffers;
        private final byte[][] checksumArrays;

        /* JADX WARN: Type inference failed for: r1v4, types: [byte[], byte[][]] */
        CellBuffers(int i) {
            if (DFSStripedOutputStream.this.cellSize % DFSStripedOutputStream.this.bytesPerChecksum != 0) {
                throw new HadoopIllegalArgumentException("Invalid values: dfs.bytes-per-checksum (=" + DFSStripedOutputStream.this.bytesPerChecksum + ") must divide cell size (=" + DFSStripedOutputStream.this.cellSize + ").");
            }
            this.checksumArrays = new byte[i];
            int checksumSize = DFSStripedOutputStream.this.getChecksumSize() * (DFSStripedOutputStream.this.cellSize / DFSStripedOutputStream.this.bytesPerChecksum);
            for (int i2 = 0; i2 < this.checksumArrays.length; i2++) {
                this.checksumArrays[i2] = new byte[checksumSize];
            }
            this.buffers = new ByteBuffer[DFSStripedOutputStream.this.numAllBlocks];
            for (int i3 = 0; i3 < this.buffers.length; i3++) {
                this.buffers[i3] = DFSStripedOutputStream.BUFFER_POOL.getBuffer(DFSStripedOutputStream.this.useDirectBuffer(), DFSStripedOutputStream.this.cellSize);
                this.buffers[i3].limit(DFSStripedOutputStream.this.cellSize);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ByteBuffer[] getBuffers() {
            return this.buffers;
        }

        byte[] getChecksumArray(int i) {
            return this.checksumArrays[i - DFSStripedOutputStream.this.numDataBlocks];
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int addTo(int i, byte[] bArr, int i2, int i3) {
            ByteBuffer byteBuffer = this.buffers[i];
            int position = byteBuffer.position() + i3;
            Preconditions.checkState(position <= DFSStripedOutputStream.this.cellSize);
            byteBuffer.put(bArr, i2, i3);
            return position;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void clear() {
            for (int i = 0; i < DFSStripedOutputStream.this.numAllBlocks; i++) {
                this.buffers[i].clear();
                this.buffers[i].limit(DFSStripedOutputStream.this.cellSize);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void release() {
            for (int i = 0; i < DFSStripedOutputStream.this.numAllBlocks; i++) {
                if (this.buffers[i] != null) {
                    DFSStripedOutputStream.BUFFER_POOL.putBuffer(this.buffers[i]);
                    this.buffers[i] = null;
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flipDataBuffers() {
            for (int i = 0; i < DFSStripedOutputStream.this.numDataBlocks; i++) {
                this.buffers[i].flip();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/DFSStripedOutputStream$Coordinator.class */
    public static class Coordinator {
        private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
        private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
        private final MultipleBlockingQueue<LocatedBlock> newBlocks;
        private final Map<StripedDataStreamer, Boolean> updateStreamerMap;
        private final MultipleBlockingQueue<Boolean> streamerUpdateResult;
        static final /* synthetic */ boolean $assertionsDisabled;

        Coordinator(int i) {
            this.followingBlocks = new MultipleBlockingQueue<>(i, 1);
            this.endBlocks = new MultipleBlockingQueue<>(i, 1);
            this.newBlocks = new MultipleBlockingQueue<>(i, 1);
            this.updateStreamerMap = new ConcurrentHashMap(i);
            this.streamerUpdateResult = new MultipleBlockingQueue<>(i, 1);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
            return this.followingBlocks;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
            return this.newBlocks;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void offerEndBlock(int i, ExtendedBlock extendedBlock) {
            this.endBlocks.offer(i, extendedBlock);
        }

        void offerStreamerUpdateResult(int i, boolean z) {
            this.streamerUpdateResult.offer(i, Boolean.valueOf(z));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean takeStreamerUpdateResult(int i) throws InterruptedIOException {
            return this.streamerUpdateResult.take(i).booleanValue();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void updateStreamer(StripedDataStreamer stripedDataStreamer, boolean z) {
            if (!$assertionsDisabled && this.updateStreamerMap.containsKey(stripedDataStreamer)) {
                throw new AssertionError();
            }
            this.updateStreamerMap.put(stripedDataStreamer, Boolean.valueOf(z));
        }

        void clearFailureStates() {
            this.newBlocks.clear();
            this.updateStreamerMap.clear();
            this.streamerUpdateResult.clear();
        }

        static {
            $assertionsDisabled = !DFSStripedOutputStream.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/DFSStripedOutputStream$MultipleBlockingQueue.class */
    public static class MultipleBlockingQueue<T> {
        private final List<BlockingQueue<T>> queues;

        MultipleBlockingQueue(int i, int i2) {
            this.queues = new ArrayList(i);
            for (int i3 = 0; i3 < i; i3++) {
                this.queues.add(new LinkedBlockingQueue(i2));
            }
        }

        void offer(int i, T t) {
            Preconditions.checkState(this.queues.get(i).offer(t), "Failed to offer " + t + " to queue, i=" + i);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public T take(int i) throws InterruptedIOException {
            try {
                return this.queues.get(i).take();
            } catch (InterruptedException e) {
                throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, e);
            }
        }

        T takeWithTimeout(int i) throws InterruptedIOException {
            try {
                return this.queues.get(i).poll(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw DFSUtilClient.toInterruptedIOException("take interrupted, i=" + i, e);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public T poll(int i) {
            return this.queues.get(i).poll();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public T peek(int i) {
            return this.queues.get(i).peek();
        }

        void clear() {
            Iterator<BlockingQueue<T>> it = this.queues.iterator();
            while (it.hasNext()) {
                it.next().clear();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DFSStripedOutputStream(DFSClient dFSClient, String str, HdfsFileStatus hdfsFileStatus, EnumSet<CreateFlag> enumSet, Progressable progressable, DataChecksum dataChecksum, String[] strArr) throws IOException {
        super(dFSClient, str, hdfsFileStatus, enumSet, progressable, dataChecksum, strArr, false);
        this.exceptionLastSeen = new ExceptionLastSeen();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating DFSStripedOutputStream for " + str);
        }
        this.ecPolicy = hdfsFileStatus.getErasureCodingPolicy();
        int numParityUnits = this.ecPolicy.getNumParityUnits();
        this.cellSize = this.ecPolicy.getCellSize();
        this.numDataBlocks = this.ecPolicy.getNumDataUnits();
        this.numAllBlocks = this.numDataBlocks + numParityUnits;
        this.favoredNodes = strArr;
        this.failedStreamers = new ArrayList();
        this.corruptBlockCountMap = new LinkedHashMap();
        this.flushAllExecutor = Executors.newFixedThreadPool(this.numAllBlocks);
        this.flushAllExecutorCompletionService = new ExecutorCompletionService(this.flushAllExecutor);
        this.encoder = CodecUtil.createRawEncoder(dFSClient.getConfiguration(), this.ecPolicy.getCodecName(), new ErasureCoderOptions(this.numDataBlocks, numParityUnits));
        this.coordinator = new Coordinator(this.numAllBlocks);
        this.cellBuffers = new CellBuffers(numParityUnits);
        this.streamers = new ArrayList(this.numAllBlocks);
        short s = 0;
        while (true) {
            short s2 = s;
            if (s2 >= this.numAllBlocks) {
                this.currentPackets = new DFSPacket[this.streamers.size()];
                this.datanodeRestartTimeout = dFSClient.getConf().getDatanodeRestartTimeout();
                setCurrentStreamer(0);
                return;
            } else {
                this.streamers.add(new StripedDataStreamer(hdfsFileStatus, dFSClient, str, progressable, dataChecksum, this.cachingStrategy, this.byteArrayManager, strArr, s2, this.coordinator, getAddBlockFlags()));
                s = (short) (s2 + 1);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DFSStripedOutputStream(DFSClient dFSClient, String str, EnumSet<CreateFlag> enumSet, Progressable progressable, LocatedBlock locatedBlock, HdfsFileStatus hdfsFileStatus, DataChecksum dataChecksum, String[] strArr) throws IOException {
        this(dFSClient, str, hdfsFileStatus, enumSet, progressable, dataChecksum, strArr);
        this.initialFileSize = hdfsFileStatus.getLen();
        this.prevBlockGroup4Append = locatedBlock != null ? locatedBlock.getBlock() : null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean useDirectBuffer() {
        return this.encoder.preferDirectBuffer();
    }

    StripedDataStreamer getStripedDataStreamer(int i) {
        return this.streamers.get(i);
    }

    int getCurrentIndex() {
        return getCurrentStreamer().getIndex();
    }

    private synchronized StripedDataStreamer getCurrentStreamer() {
        return (StripedDataStreamer) this.streamer;
    }

    private synchronized StripedDataStreamer setCurrentStreamer(int i) {
        int indexOf;
        if (this.streamer != null && (indexOf = this.streamers.indexOf(getCurrentStreamer())) >= 0) {
            this.currentPackets[indexOf] = this.currentPacket;
        }
        this.streamer = getStripedDataStreamer(i);
        this.currentPacket = this.currentPackets[i];
        adjustChunkBoundary();
        return getCurrentStreamer();
    }

    private static void encode(RawErasureEncoder rawErasureEncoder, int i, ByteBuffer[] byteBufferArr) throws IOException {
        ByteBuffer[] byteBufferArr2 = new ByteBuffer[i];
        ByteBuffer[] byteBufferArr3 = new ByteBuffer[byteBufferArr.length - i];
        System.arraycopy(byteBufferArr, 0, byteBufferArr2, 0, byteBufferArr2.length);
        System.arraycopy(byteBufferArr, i, byteBufferArr3, 0, byteBufferArr3.length);
        rawErasureEncoder.encode(byteBufferArr2, byteBufferArr3);
    }

    private Set<StripedDataStreamer> checkStreamers() throws IOException {
        HashSet hashSet = new HashSet();
        for (StripedDataStreamer stripedDataStreamer : this.streamers) {
            if (!stripedDataStreamer.isHealthy() && !this.failedStreamers.contains(stripedDataStreamer)) {
                hashSet.add(stripedDataStreamer);
            }
        }
        int size = this.failedStreamers.size() + hashSet.size();
        if (LOG.isDebugEnabled()) {
            LOG.debug("checkStreamers: " + this.streamers);
            LOG.debug("healthy streamer count=" + (this.numAllBlocks - size));
            LOG.debug("original failed streamers: " + this.failedStreamers);
            LOG.debug("newly failed streamers: " + hashSet);
        }
        if (size <= this.numAllBlocks - this.numDataBlocks) {
            return hashSet;
        }
        closeAllStreamers();
        throw new IOException("Failed: the number of failed blocks = " + size + " > the number of parity blocks = " + (this.numAllBlocks - this.numDataBlocks));
    }

    private void closeAllStreamers() {
        Iterator<StripedDataStreamer> it = this.streamers.iterator();
        while (it.hasNext()) {
            it.next().close(true);
        }
    }

    private void handleCurrentStreamerFailure(String str, Exception exc) throws IOException {
        this.currentPacket = null;
        handleStreamerFailure(str, exc, getCurrentStreamer());
    }

    private void handleStreamerFailure(String str, Exception exc, StripedDataStreamer stripedDataStreamer) throws IOException {
        LOG.warn("Failed: " + str + ", " + this, exc);
        stripedDataStreamer.getErrorState().setInternalError();
        stripedDataStreamer.close(true);
        checkStreamers();
        this.currentPackets[stripedDataStreamer.getIndex()] = null;
    }

    private void replaceFailedStreamers() {
        if (!$assertionsDisabled && this.streamers.size() != this.numAllBlocks) {
            throw new AssertionError();
        }
        int currentIndex = getCurrentIndex();
        if (!$assertionsDisabled && currentIndex != 0) {
            throw new AssertionError();
        }
        short s = 0;
        while (true) {
            short s2 = s;
            if (s2 >= this.numAllBlocks) {
                return;
            }
            StripedDataStreamer stripedDataStreamer = getStripedDataStreamer(s2);
            if (!stripedDataStreamer.isHealthy()) {
                LOG.info("replacing previously failed streamer " + stripedDataStreamer);
                StripedDataStreamer stripedDataStreamer2 = new StripedDataStreamer(stripedDataStreamer.stat, this.dfsClient, this.src, stripedDataStreamer.progress, stripedDataStreamer.checksum4WriteBlock, this.cachingStrategy, this.byteArrayManager, this.favoredNodes, s2, this.coordinator, getAddBlockFlags());
                this.streamers.set(s2, stripedDataStreamer2);
                this.currentPackets[s2] = null;
                if (s2 == currentIndex) {
                    this.streamer = stripedDataStreamer2;
                    this.currentPacket = null;
                }
                stripedDataStreamer2.start();
            }
            s = (short) (s2 + 1);
        }
    }

    private void waitEndBlocks(int i) throws IOException {
        while (getStripedDataStreamer(i).isHealthy()) {
            ExtendedBlock extendedBlock = (ExtendedBlock) this.coordinator.endBlocks.takeWithTimeout(i);
            if (extendedBlock != null) {
                StripedBlockUtil.checkBlocks(this.currentBlockGroup, i, extendedBlock);
                return;
            }
        }
    }

    private DatanodeInfo[] getExcludedNodes() {
        ArrayList arrayList = new ArrayList();
        Iterator<StripedDataStreamer> it = this.streamers.iterator();
        while (it.hasNext()) {
            for (DatanodeInfo datanodeInfo : it.next().getExcludedNodes()) {
                if (datanodeInfo != null) {
                    arrayList.add(datanodeInfo);
                }
            }
        }
        return (DatanodeInfo[]) arrayList.toArray(new DatanodeInfo[arrayList.size()]);
    }

    private void allocateNewBlock() throws IOException {
        if (this.currentBlockGroup != null) {
            for (int i = 0; i < this.numAllBlocks; i++) {
                waitEndBlocks(i);
            }
        }
        this.failedStreamers.clear();
        DatanodeInfo[] excludedNodes = getExcludedNodes();
        LOG.debug("Excluding DataNodes when allocating new block: " + Arrays.asList(excludedNodes));
        ExtendedBlock extendedBlock = this.currentBlockGroup;
        if (this.prevBlockGroup4Append != null) {
            extendedBlock = this.prevBlockGroup4Append;
            this.prevBlockGroup4Append = null;
        }
        replaceFailedStreamers();
        LOG.debug("Allocating new block group. The previous block group: " + extendedBlock);
        try {
            LocatedBlock addBlock = addBlock(excludedNodes, this.dfsClient, this.src, extendedBlock, this.fileId, this.favoredNodes, getAddBlockFlags());
            if (!$assertionsDisabled && !addBlock.isStriped()) {
                throw new AssertionError();
            }
            this.currentBlockGroup = addBlock.getBlock();
            this.blockGroupIndex++;
            LocatedBlock[] parseStripedBlockGroup = StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) addBlock, this.cellSize, this.numDataBlocks, this.numAllBlocks - this.numDataBlocks);
            for (int i2 = 0; i2 < parseStripedBlockGroup.length; i2++) {
                StripedDataStreamer stripedDataStreamer = getStripedDataStreamer(i2);
                if (!$assertionsDisabled && !stripedDataStreamer.isHealthy()) {
                    throw new AssertionError();
                }
                if (parseStripedBlockGroup[i2] != null) {
                    this.coordinator.getFollowingBlocks().offer(i2, parseStripedBlockGroup[i2]);
                } else {
                    if (!$assertionsDisabled && i2 < this.numDataBlocks) {
                        throw new AssertionError();
                    }
                    LOG.warn("Cannot allocate parity block(index={}, policy={}). Exclude nodes={}. There may not be enough datanodes or racks. You can check if the cluster topology supports the enabled erasure coding policies by running the command 'hdfs ec -verifyClusterSetup'.", new Object[]{Integer.valueOf(i2), this.ecPolicy.getName(), excludedNodes});
                    stripedDataStreamer.getLastException().set(new IOException("Failed to get parity block, index=" + i2));
                    stripedDataStreamer.getErrorState().setInternalError();
                    stripedDataStreamer.close(true);
                }
            }
        } catch (IOException e) {
            closeAllStreamers();
            throw e;
        }
    }

    private boolean shouldEndBlockGroup() {
        return this.currentBlockGroup != null && this.currentBlockGroup.getNumBytes() == this.blockSize * ((long) this.numDataBlocks);
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    protected synchronized void writeChunk(byte[] bArr, int i, int i2, byte[] bArr2, int i3, int i4) throws IOException {
        int currentIndex = getCurrentIndex();
        boolean z = this.cellBuffers.addTo(currentIndex, bArr, i, i2) == this.cellSize;
        if (this.currentBlockGroup == null || shouldEndBlockGroup()) {
            allocateNewBlock();
        }
        this.currentBlockGroup.setNumBytes(this.currentBlockGroup.getNumBytes() + i2);
        if (getCurrentStreamer().isHealthy()) {
            try {
                super.writeChunk(bArr, i, i2, bArr2, i3, i4);
            } catch (Exception e) {
                handleCurrentStreamerFailure("offset=" + i + ", length=" + i2, e);
            }
        }
        if (z) {
            int i5 = currentIndex + 1;
            if (i5 == this.numDataBlocks) {
                this.cellBuffers.flipDataBuffers();
                writeParityCells();
                i5 = 0;
                if (shouldEndBlockGroup()) {
                    flushAllInternals();
                    checkStreamerFailures(false);
                    for (int i6 = 0; i6 < this.numAllBlocks; i6++) {
                        if (setCurrentStreamer(i6).isHealthy()) {
                            try {
                                endBlock();
                            } catch (IOException e2) {
                            }
                        }
                    }
                } else {
                    checkStreamerFailures(true);
                }
            }
            setCurrentStreamer(i5);
        }
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    synchronized void enqueueCurrentPacketFull() throws IOException {
        LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={}, appendChunk={}, {}", new Object[]{this.currentPacket, this.src, Long.valueOf(getStreamer().getBytesCurBlock()), Long.valueOf(this.blockSize), Boolean.valueOf(getStreamer().getAppendChunk()), getStreamer()});
        enqueueCurrentPacket();
        adjustChunkBoundary();
    }

    private boolean isStreamerWriting(int i) {
        long numBytes = this.currentBlockGroup == null ? 0L : this.currentBlockGroup.getNumBytes();
        if (numBytes == 0) {
            return false;
        }
        return i >= this.numDataBlocks || i < ((int) (((numBytes - 1) / ((long) this.cellSize)) + 1));
    }

    private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
        HashSet hashSet = new HashSet();
        for (int i = 0; i < this.numAllBlocks; i++) {
            StripedDataStreamer stripedDataStreamer = getStripedDataStreamer(i);
            if (stripedDataStreamer.isHealthy() && isStreamerWriting(i)) {
                Preconditions.checkState(stripedDataStreamer.getStage() == BlockConstructionStage.DATA_STREAMING, "streamer: " + stripedDataStreamer);
                stripedDataStreamer.setExternalError();
                hashSet.add(stripedDataStreamer);
            } else if (!stripedDataStreamer.streamerClosed() && stripedDataStreamer.getErrorState().hasDatanodeError() && stripedDataStreamer.getErrorState().doWaitForRestart()) {
                hashSet.add(stripedDataStreamer);
                this.failedStreamers.remove(stripedDataStreamer);
            }
        }
        return hashSet;
    }

    private void checkStreamerFailures(boolean z) throws IOException {
        if (checkStreamers().size() == 0) {
            return;
        }
        if (z) {
            flushAllInternals();
        }
        Set<StripedDataStreamer> checkStreamers = checkStreamers();
        while (checkStreamers.size() > 0) {
            this.failedStreamers.addAll(checkStreamers);
            this.coordinator.clearFailureStates();
            this.corruptBlockCountMap.put(Integer.valueOf(this.blockGroupIndex), Integer.valueOf(this.failedStreamers.size()));
            Set<StripedDataStreamer> markExternalErrorOnStreamers = markExternalErrorOnStreamers();
            ExtendedBlock updateBlockForPipeline = updateBlockForPipeline(markExternalErrorOnStreamers);
            checkStreamers = waitCreatingStreamers(markExternalErrorOnStreamers);
            if (checkStreamers.size() + this.failedStreamers.size() > this.numAllBlocks - this.numDataBlocks) {
                closeAllStreamers();
                throw new IOException("Data streamers failed while creating new block streams: " + checkStreamers + ". There are not enough healthy streamers.");
            }
            for (StripedDataStreamer stripedDataStreamer : checkStreamers) {
                if (!$assertionsDisabled && stripedDataStreamer.isHealthy()) {
                    throw new AssertionError();
                }
            }
            if (checkStreamers.size() == 0) {
                for (StripedDataStreamer stripedDataStreamer2 : markExternalErrorOnStreamers) {
                    if (!$assertionsDisabled && !stripedDataStreamer2.isHealthy()) {
                        throw new AssertionError();
                    }
                    stripedDataStreamer2.getErrorState().reset();
                }
                updatePipeline(updateBlockForPipeline);
            }
            for (int i = 0; i < this.numAllBlocks; i++) {
                this.coordinator.offerStreamerUpdateResult(i, checkStreamers.size() == 0);
            }
            if (checkStreamers.size() != 0) {
                try {
                    Thread.sleep(this.datanodeRestartTimeout);
                } catch (InterruptedException e) {
                }
            }
        }
    }

    private int checkStreamerUpdates(Set<StripedDataStreamer> set, Set<StripedDataStreamer> set2) {
        for (StripedDataStreamer stripedDataStreamer : set2) {
            if (!this.coordinator.updateStreamerMap.containsKey(stripedDataStreamer) && !stripedDataStreamer.isHealthy() && this.coordinator.getNewBlocks().peek(stripedDataStreamer.getIndex()) != null) {
                set.add(stripedDataStreamer);
            }
        }
        return this.coordinator.updateStreamerMap.size() + set.size();
    }

    private Set<StripedDataStreamer> waitCreatingStreamers(Set<StripedDataStreamer> set) throws IOException {
        HashSet hashSet = new HashSet();
        int size = set.size();
        long socketTimeout = this.dfsClient.getConf().getSocketTimeout();
        long j = socketTimeout > 0 ? socketTimeout / 2 : Long.MAX_VALUE;
        synchronized (this.coordinator) {
            while (checkStreamerUpdates(hashSet, set) < size && j > 0) {
                try {
                    long monotonicNow = Time.monotonicNow();
                    this.coordinator.wait(1000L);
                    j -= Time.monotonicNow() - monotonicNow;
                } catch (InterruptedException e) {
                    throw DFSUtilClient.toInterruptedIOException("Interrupted when waiting for results of updating striped streamers", e);
                }
            }
        }
        synchronized (this.coordinator) {
            for (StripedDataStreamer stripedDataStreamer : set) {
                if (!this.coordinator.updateStreamerMap.containsKey(stripedDataStreamer)) {
                    LOG.info("close the slow stream " + stripedDataStreamer);
                    stripedDataStreamer.setStreamerAsClosed();
                    hashSet.add(stripedDataStreamer);
                }
            }
        }
        for (Map.Entry entry : this.coordinator.updateStreamerMap.entrySet()) {
            if (!((Boolean) entry.getValue()).booleanValue()) {
                hashSet.add(entry.getKey());
            }
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            set.remove((StripedDataStreamer) it.next());
        }
        return hashSet;
    }

    private ExtendedBlock updateBlockForPipeline(Set<StripedDataStreamer> set) throws IOException {
        LocatedBlock updateBlockForPipeline = this.dfsClient.namenode.updateBlockForPipeline(this.currentBlockGroup, this.dfsClient.clientName);
        long generationStamp = updateBlockForPipeline.getBlock().getGenerationStamp();
        ExtendedBlock extendedBlock = new ExtendedBlock(this.currentBlockGroup);
        extendedBlock.setGenerationStamp(generationStamp);
        LocatedBlock[] parseStripedBlockGroup = StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) updateBlockForPipeline, this.cellSize, this.numDataBlocks, this.numAllBlocks - this.numDataBlocks);
        for (int i = 0; i < this.numAllBlocks; i++) {
            if (set.contains(getStripedDataStreamer(i))) {
                LocatedBlock locatedBlock = new LocatedBlock(new ExtendedBlock(extendedBlock), (DatanodeInfoWithStorage[]) null, (String[]) null, (StorageType[]) null, -1L, updateBlockForPipeline.isCorrupt(), (DatanodeInfo[]) null);
                locatedBlock.setBlockToken(parseStripedBlockGroup[i].getBlockToken());
                this.coordinator.getNewBlocks().offer(i, locatedBlock);
            }
        }
        return extendedBlock;
    }

    private void updatePipeline(ExtendedBlock extendedBlock) throws IOException {
        DatanodeInfo[] datanodeInfoArr = new DatanodeInfo[this.numAllBlocks];
        String[] strArr = new String[this.numAllBlocks];
        for (int i = 0; i < this.numAllBlocks; i++) {
            StripedDataStreamer stripedDataStreamer = getStripedDataStreamer(i);
            DatanodeInfo[] nodes = stripedDataStreamer.getNodes();
            String[] storageIDs = stripedDataStreamer.getStorageIDs();
            if (!stripedDataStreamer.isHealthy() || nodes == null || storageIDs == null) {
                datanodeInfoArr[i] = new DatanodeInfo.DatanodeInfoBuilder().setNodeID(DatanodeID.EMPTY_DATANODE_ID).build();
                strArr[i] = "";
            } else {
                datanodeInfoArr[i] = nodes[0];
                strArr[i] = storageIDs[0];
            }
        }
        long numBytes = this.currentBlockGroup.getNumBytes();
        long ackedLength = getAckedLength();
        Preconditions.checkState(ackedLength <= numBytes, "Acked:" + ackedLength + ", Sent:" + numBytes);
        this.currentBlockGroup.setNumBytes(ackedLength);
        extendedBlock.setNumBytes(ackedLength);
        this.dfsClient.namenode.updatePipeline(this.dfsClient.clientName, this.currentBlockGroup, extendedBlock, datanodeInfoArr, strArr);
        this.currentBlockGroup = extendedBlock;
        this.currentBlockGroup.setNumBytes(numBytes);
    }

    private List<Long> getBlockLengths() {
        ArrayList arrayList = new ArrayList(this.numAllBlocks);
        for (int i = 0; i < this.numAllBlocks; i++) {
            StripedDataStreamer stripedDataStreamer = getStripedDataStreamer(i);
            long j = -1;
            if (stripedDataStreamer.isHealthy() && stripedDataStreamer.getBlock() != null) {
                j = stripedDataStreamer.getBlock().getNumBytes();
            }
            arrayList.add(Long.valueOf(j));
        }
        return arrayList;
    }

    private long getAckedLength() {
        long numBytes = this.currentBlockGroup.getNumBytes();
        long j = (numBytes / this.numDataBlocks) / this.cellSize;
        long j2 = j * this.numDataBlocks * this.cellSize;
        if (!$assertionsDisabled && j2 > numBytes) {
            throw new AssertionError("Full stripe length can't be greater than the block group length");
        }
        List unmodifiableList = Collections.unmodifiableList(getBlockLengths());
        ArrayList arrayList = new ArrayList(unmodifiableList);
        Collections.sort(arrayList);
        long longValue = j > 0 ? ((Long) arrayList.get(arrayList.size() - this.numDataBlocks)).longValue() * this.numDataBlocks : 0L;
        if (longValue >= j2 && longValue != numBytes) {
            int i = (int) ((numBytes - j2) / this.cellSize);
            int i2 = ((int) (numBytes - j2)) % this.cellSize;
            int i3 = i2 == 0 ? 0 : 1;
            int i4 = (this.numDataBlocks - i) - i3;
            int i5 = i > 0 ? this.cellSize : i2;
            long j3 = j2 / this.numDataBlocks;
            long[] jArr = new long[this.numAllBlocks];
            int i6 = 0;
            while (i6 < i) {
                jArr[i6] = j3 + this.cellSize;
                i6++;
            }
            while (i6 < i + i3) {
                jArr[i6] = j3 + i2;
                i6++;
            }
            while (i6 < i + i3 + i4) {
                jArr[i6] = j3;
                i6++;
            }
            while (i6 < this.numAllBlocks) {
                jArr[i6] = j3 + i5;
                i6++;
            }
            int i7 = 0;
            for (int i8 = 0; i8 < this.numAllBlocks; i8++) {
                if (((Long) unmodifiableList.get(i8)).longValue() == jArr[i8]) {
                    i7++;
                }
            }
            if (i7 >= this.numDataBlocks) {
                longValue = numBytes;
            }
            return longValue;
        }
        return longValue;
    }

    private int stripeDataSize() {
        return this.numDataBlocks * this.cellSize;
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    public boolean hasCapability(String str) {
        return false;
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    public void hflush() {
        LOG.debug("DFSStripedOutputStream does not support hflush. Caller should check StreamCapabilities before calling.");
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    public void hsync() {
        LOG.debug("DFSStripedOutputStream does not support hsync. Caller should check StreamCapabilities before calling.");
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    public void hsync(EnumSet<HdfsDataOutputStream.SyncFlag> enumSet) {
        LOG.debug("DFSStripedOutputStream does not support hsync {}. Caller should check StreamCapabilities before calling.", enumSet);
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    protected synchronized void start() {
        Iterator<StripedDataStreamer> it = this.streamers.iterator();
        while (it.hasNext()) {
            it.next().start();
        }
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    void abort() throws IOException {
        MultipleIOException.Builder builder = new MultipleIOException.Builder();
        synchronized (this) {
            if (isClosed()) {
                return;
            }
            this.exceptionLastSeen.set(new IOException("Lease timeout of " + (this.dfsClient.getConf().getHdfsTimeout() / 1000) + " seconds expired."));
            try {
                closeThreads(true);
            } catch (IOException e) {
                builder.add(e);
            }
            this.dfsClient.endFileLease(this.fileId);
            IOException build = builder.build();
            if (build != null) {
                throw build;
            }
        }
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    boolean isClosed() {
        if (this.closed) {
            return true;
        }
        Iterator<StripedDataStreamer> it = this.streamers.iterator();
        while (it.hasNext()) {
            if (!it.next().streamerClosed()) {
                return false;
            }
        }
        return true;
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    protected void closeThreads(boolean z) throws IOException {
        MultipleIOException.Builder builder = new MultipleIOException.Builder();
        try {
            Iterator<StripedDataStreamer> it = this.streamers.iterator();
            while (it.hasNext()) {
                StripedDataStreamer next = it.next();
                try {
                    try {
                        next.close(z);
                        next.join();
                        next.closeSocket();
                        next.setSocketToNull();
                    } catch (Exception e) {
                        try {
                            handleStreamerFailure("force=" + z, e, next);
                        } catch (IOException e2) {
                            builder.add(e2);
                        }
                        next.setSocketToNull();
                    }
                } finally {
                }
            }
            IOException build = builder.build();
            if (build != null) {
                throw build;
            }
        } finally {
            setClosed();
        }
    }

    private boolean generateParityCellsForLastStripe() {
        long numBytes = (this.currentBlockGroup == null ? 0L : this.currentBlockGroup.getNumBytes()) % stripeDataSize();
        if (numBytes == 0) {
            return false;
        }
        long j = numBytes < ((long) this.cellSize) ? numBytes : this.cellSize;
        ByteBuffer[] buffers = this.cellBuffers.getBuffers();
        for (int i = 0; i < this.numAllBlocks; i++) {
            int position = buffers[i].position();
            if (!$assertionsDisabled && position > j) {
                throw new AssertionError("If an internal block is smaller than parity block, then its last cell should be small than last parity cell");
            }
            for (int i2 = 0; i2 < j - position; i2++) {
                buffers[i].put((byte) 0);
            }
            buffers[i].flip();
        }
        return true;
    }

    void writeParityCells() throws IOException {
        ByteBuffer[] buffers = this.cellBuffers.getBuffers();
        if (checkAnyParityStreamerIsHealthy()) {
            encode(this.encoder, this.numDataBlocks, buffers);
            for (int i = this.numDataBlocks; i < this.numAllBlocks; i++) {
                writeParity(i, buffers[i], this.cellBuffers.getChecksumArray(i));
            }
            this.cellBuffers.clear();
        }
    }

    private boolean checkAnyParityStreamerIsHealthy() {
        for (int i = this.numDataBlocks; i < this.numAllBlocks; i++) {
            if (this.streamers.get(i).isHealthy()) {
                return true;
            }
        }
        if (!LOG.isDebugEnabled()) {
            return false;
        }
        LOG.debug("Skips encoding and writing parity cells as there are no healthy parity data streamers: " + this.streamers);
        return false;
    }

    void writeParity(int i, ByteBuffer byteBuffer, byte[] bArr) throws IOException {
        StripedDataStreamer currentStreamer = setCurrentStreamer(i);
        int limit = byteBuffer.limit();
        long bytesCurBlock = currentStreamer.getBytesCurBlock();
        if (currentStreamer.isHealthy()) {
            try {
                DataChecksum dataChecksum = getDataChecksum();
                if (byteBuffer.isDirect()) {
                    ByteBuffer buffer = BUFFER_POOL.getBuffer(true, bArr.length);
                    dataChecksum.calculateChunkedSums(byteBuffer, buffer);
                    buffer.get(bArr);
                    BUFFER_POOL.putBuffer(buffer);
                } else {
                    dataChecksum.calculateChunkedSums(byteBuffer.array(), 0, limit, bArr, 0);
                }
                for (int i2 = 0; i2 < limit; i2 += dataChecksum.getBytesPerChecksum()) {
                    super.writeChunk(byteBuffer, Math.min(dataChecksum.getBytesPerChecksum(), limit - i2), bArr, (i2 / dataChecksum.getBytesPerChecksum()) * getChecksumSize(), getChecksumSize());
                }
            } catch (Exception e) {
                handleCurrentStreamerFailure("oldBytes=" + bytesCurBlock + ", len=" + limit, e);
            }
        }
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    void setClosed() {
        super.setClosed();
        for (int i = 0; i < this.numAllBlocks; i++) {
            getStripedDataStreamer(i).release();
        }
        this.cellBuffers.release();
    }

    /* JADX WARN: Finally extract failed */
    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    protected synchronized void closeImpl() throws IOException {
        IOException build;
        boolean z = this.dfsClient.getConfiguration().getBoolean(HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, false);
        try {
            try {
                if (isClosed()) {
                    this.exceptionLastSeen.check(true);
                    int numDataUnits = this.ecPolicy.getNumDataUnits();
                    int i = 0;
                    MultipleIOException.Builder builder = new MultipleIOException.Builder();
                    Iterator<StripedDataStreamer> it = this.streamers.iterator();
                    while (it.hasNext()) {
                        try {
                            it.next().getLastException().check(true);
                            i++;
                        } catch (IOException e) {
                            builder.add(e);
                        }
                    }
                    if (i < numDataUnits && (build = builder.build()) != null) {
                        throw build;
                    }
                    setClosed();
                    this.flushAllExecutor.shutdownNow();
                    this.encoder.release();
                    return;
                }
                try {
                    flushBuffer();
                    if (generateParityCellsForLastStripe()) {
                        writeParityCells();
                    }
                    enqueueAllCurrentPackets();
                    flushAllInternals();
                    checkStreamerFailures(false);
                    for (int i2 = 0; i2 < this.numAllBlocks; i2++) {
                        StripedDataStreamer currentStreamer = setCurrentStreamer(i2);
                        if (currentStreamer.isHealthy()) {
                            try {
                                if (currentStreamer.getBytesCurBlock() > 0) {
                                    setCurrentPacketToEmpty();
                                }
                                flushInternal();
                            } catch (Exception e2) {
                            }
                        }
                    }
                    closeThreads(true);
                    TraceScope newScope = this.dfsClient.getTracer().newScope("completeFile");
                    Throwable th = null;
                    try {
                        try {
                            completeFile(this.currentBlockGroup);
                            if (newScope != null) {
                                if (0 != 0) {
                                    try {
                                        newScope.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    newScope.close();
                                }
                            }
                            logCorruptBlocks();
                            setClosed();
                            this.flushAllExecutor.shutdownNow();
                            this.encoder.release();
                        } catch (Throwable th3) {
                            th = th3;
                            throw th3;
                        }
                    } catch (Throwable th4) {
                        if (newScope != null) {
                            if (th != null) {
                                try {
                                    newScope.close();
                                } catch (Throwable th5) {
                                    th.addSuppressed(th5);
                                }
                            } else {
                                newScope.close();
                            }
                        }
                        throw th4;
                    }
                } catch (Throwable th6) {
                    closeThreads(true);
                    throw th6;
                }
            } catch (Throwable th7) {
                setClosed();
                this.flushAllExecutor.shutdownNow();
                this.encoder.release();
                throw th7;
            }
        } catch (ClosedChannelException e3) {
            setClosed();
            this.flushAllExecutor.shutdownNow();
            this.encoder.release();
        } catch (IOException e4) {
            recoverLease(z);
            throw e4;
        }
    }

    @VisibleForTesting
    void enqueueAllCurrentPackets() throws IOException {
        int indexOf = this.streamers.indexOf(getCurrentStreamer());
        for (int i = 0; i < this.streamers.size(); i++) {
            if (setCurrentStreamer(i).isHealthy() && this.currentPacket != null) {
                try {
                    enqueueCurrentPacket();
                } catch (IOException e) {
                    handleCurrentStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
                }
            }
        }
        setCurrentStreamer(indexOf);
    }

    void flushAllInternals() throws IOException {
        HashMap hashMap = new HashMap();
        Future<Void> future = null;
        int currentIndex = getCurrentIndex();
        for (int i = 0; i < this.numAllBlocks; i++) {
            final StripedDataStreamer currentStreamer = setCurrentStreamer(i);
            if (currentStreamer.isHealthy()) {
                try {
                    final long flushInternalWithoutWaitingAck = flushInternalWithoutWaitingAck();
                    future = this.flushAllExecutorCompletionService.submit(new Callable<Void>() { // from class: org.apache.hadoop.hdfs.DFSStripedOutputStream.1
                        /* JADX WARN: Can't rename method to resolve collision */
                        @Override // java.util.concurrent.Callable
                        public Void call() throws Exception {
                            currentStreamer.waitForAckedSeqno(flushInternalWithoutWaitingAck);
                            return null;
                        }
                    });
                    hashMap.put(future, Integer.valueOf(i));
                } catch (Exception e) {
                    handleCurrentStreamerFailure("flushInternal " + currentStreamer, e);
                }
            }
        }
        setCurrentStreamer(currentIndex);
        for (int i2 = 0; i2 < hashMap.size(); i2++) {
            try {
                future = this.flushAllExecutorCompletionService.take();
                future.get();
            } catch (InterruptedException e2) {
                throw DFSUtilClient.toInterruptedIOException("Interrupted during waiting all streamer flush, ", e2);
            } catch (ExecutionException e3) {
                LOG.warn("Caught ExecutionException while waiting all streamer flush, ", e3);
                StripedDataStreamer stripedDataStreamer = this.streamers.get(((Integer) hashMap.get(future)).intValue());
                handleStreamerFailure("flushInternal " + stripedDataStreamer, (Exception) e3.getCause(), stripedDataStreamer);
            }
        }
    }

    static void sleep(long j, String str) throws InterruptedIOException {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            throw DFSUtilClient.toInterruptedIOException("Sleep interrupted during " + str, e);
        }
    }

    private void logCorruptBlocks() {
        for (Map.Entry<Integer, Integer> entry : this.corruptBlockCountMap.entrySet()) {
            int intValue = entry.getKey().intValue();
            int intValue2 = entry.getValue().intValue();
            StringBuilder sb = new StringBuilder();
            sb.append("Block group <").append(intValue).append("> failed to write ").append(intValue2).append(" blocks.");
            if (intValue2 == this.numAllBlocks - this.numDataBlocks) {
                sb.append(" It's at high risk of losing data.");
            }
            LOG.warn(sb.toString());
        }
    }

    @Override // org.apache.hadoop.hdfs.DFSOutputStream
    ExtendedBlock getBlock() {
        return this.currentBlockGroup;
    }

    static {
        $assertionsDisabled = !DFSStripedOutputStream.class.desiredAssertionStatus();
        BUFFER_POOL = new ElasticByteBufferPool();
    }
}
