/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.util;

import com.google.common.base.Supplier;
import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.CompletionHandler;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.util.DataChecksum;

/**
 * An asynchronous output stream that writes a single HDFS block to several datanodes at once.
 * <p>
 * Bytes passed to {@link #write(byte[], int, int)} are accumulated in an in-memory buffer. A
 * {@link #flush(Object, CompletionHandler, boolean)} turns the buffered bytes into one packet
 * (header, checksums, data) and writes that same packet to every channel in the datanode list.
 * The flush completes once every channel has acknowledged the packet.
 * <p>
 * The mutable state ({@code buf}, {@code state}, {@code waitingAckQueue} and the packet
 * counters) is not synchronized. The public methods hop onto {@code eventLoop} before touching
 * it. NOTE(review): the ack handler callbacks also touch this state, so the datanode channels are
 * presumably registered with the same {@code eventLoop} - confirm against the code that creates
 * this object.
 */
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements Closeable {
    /**
     * Sequence number of the packets sent when a channel's writer goes idle. Acks carrying this
     * sequence number are ignored by the ack handler.
     */
    private static final long HEART_BEAT_SEQNO = -1L;

    private final Configuration conf;
    private final FSUtils fsUtils;
    private final DistributedFileSystem dfs;
    private final DFSClient client;
    private final ClientProtocol namenode;
    private final String clientName;
    private final String src;
    private final long fileId;
    private final LocatedBlock locatedBlock;
    private final EventLoop eventLoop;
    /** One channel per datanode; every packet is written to all of them. */
    private final List<Channel> datanodeList;
    private final DataChecksum summer;
    private final ByteBufAllocator alloc;
    /** Flush/close requests that were sent but are not yet acknowledged by all channels, oldest first. */
    private final Deque<Callback> waitingAckQueue = new ArrayDeque<Callback>();
    /** Block offset at which the next data packet starts. */
    private long nextPacketOffsetInBlock = 0L;
    private long nextPacketSeqno = 0L;
    /** Bytes written but not yet sent; released and set to {@code null} by {@link #endBlock}. */
    private ByteBuf buf;
    private State state;

    /**
     * Records a successful ack from {@code channel}.
     * <p>
     * The ack is credited to the oldest queued callback that is still waiting for this channel.
     * When that callback has no channel left to wait for, its promise is completed, and so are
     * the promises of the directly following callbacks that have the same acked length.
     */
    private void completed(Channel channel) {
        if (waitingAckQueue.isEmpty()) {
            return;
        }
        for (Callback c : waitingAckQueue) {
            if (!c.unfinishedReplicas.remove(channel)) {
                continue;
            }
            if (c.unfinishedReplicas.isEmpty()) {
                c.promise.trySuccess(null);
                // Acks are always credited to the oldest callback still waiting for a channel,
                // so a callback can only run out of channels while it is the head of the queue.
                waitingAckQueue.removeFirst();
                Callback cb;
                while ((cb = waitingAckQueue.peekFirst()) != null && cb.ackedLength == c.ackedLength) {
                    cb.promise.trySuccess(null);
                    waitingAckQueue.removeFirst();
                }
            }
            // The iterator is not used again after the queue was modified.
            return;
        }
    }

    /**
     * Marks the stream as broken: fails every pending promise with the supplied error and closes
     * all datanode channels. Does nothing if the stream is already broken or closed, or if it is
     * closing and the head of the queue is no longer waiting for {@code channel}.
     *
     * @param channel the channel on which the failure was observed
     * @param errorSupplier creates the error lazily, so nothing is built when the call is a no-op
     */
    private void failed(Channel channel, Supplier<Throwable> errorSupplier) {
        if (state == State.BROKEN || state == State.CLOSED) {
            return;
        }
        if (state == State.CLOSING) {
            Callback head = waitingAckQueue.peekFirst();
            if (head == null || !head.unfinishedReplicas.contains(channel)) {
                // Nothing is waiting on this channel any more, so there is nothing to fail.
                return;
            }
        }
        state = State.BROKEN;
        Throwable error = errorSupplier.get();
        for (Callback c : waitingAckQueue) {
            c.promise.tryFailure(error);
        }
        waitingAckQueue.clear();
        for (Channel ch : datanodeList) {
            ch.close();
        }
    }

    /**
     * Serializes {@code header} into a newly allocated buffer whose readable bytes are exactly
     * the serialized header. The caller owns the returned buffer.
     */
    private ByteBuf serializeHeader(PacketHeader header) {
        int headerLen = header.getSerializedSize();
        ByteBuf headerBuf = alloc.buffer(headerLen);
        header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
        // putInBuffer wrote through an NIO view, so the writer index has to be moved by hand.
        headerBuf.writerIndex(headerLen);
        return headerBuf;
    }

    /**
     * Installs the ack-processing pipeline (idle detection, varint32 frame decoder, protobuf
     * decoder, ack handler) on every datanode channel and enables auto read.
     *
     * @param timeoutMs read-idle timeout in milliseconds; half of it is used as write-idle time,
     *          after which a heartbeat packet is sent
     */
    private void setupReceiver(final int timeoutMs) {
        SimpleChannelInboundHandler<DataTransferProtos.PipelineAckProto> ackHandler = new SimpleChannelInboundHandler<DataTransferProtos.PipelineAckProto>() {

            @Override
            public boolean isSharable() {
                // The same handler instance is added to the pipeline of every datanode channel.
                return true;
            }

            @Override
            protected void channelRead0(final ChannelHandlerContext ctx, DataTransferProtos.PipelineAckProto ack) throws Exception {
                final DataTransferProtos.Status reply = FanOutOneBlockAsyncDFSOutputHelper.getStatus(ack);
                // The restart check has to come before the generic non-SUCCESS check, otherwise
                // it can never be reached and the more specific error is never reported.
                if (reply != null && PipelineAck.isRestartOOBStatus(reply)) {
                    failed(ctx.channel(), new Supplier<Throwable>() {

                        @Override
                        public Throwable get() {
                            return new IOException("Restart response " + reply + " for block " + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
                        }
                    });
                    return;
                }
                if (reply != DataTransferProtos.Status.SUCCESS) {
                    failed(ctx.channel(), new Supplier<Throwable>() {

                        @Override
                        public Throwable get() {
                            return new IOException("Bad response " + reply + " for block " + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
                        }
                    });
                    return;
                }
                if (ack.getSeqno() == HEART_BEAT_SEQNO) {
                    // Ack for a heartbeat, no flush is waiting for it.
                    return;
                }
                completed(ctx.channel());
            }

            @Override
            public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
                failed(ctx.channel(), new Supplier<Throwable>() {

                    @Override
                    public Throwable get() {
                        return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed");
                    }
                });
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
                failed(ctx.channel(), new Supplier<Throwable>() {

                    @Override
                    public Throwable get() {
                        return cause;
                    }
                });
            }

            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (!(evt instanceof IdleStateEvent)) {
                    super.userEventTriggered(ctx, evt);
                    return;
                }
                IdleStateEvent e = (IdleStateEvent) evt;
                if (e.state() == IdleState.READER_IDLE) {
                    failed(ctx.channel(), new Supplier<Throwable>() {

                        @Override
                        public Throwable get() {
                            return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
                        }
                    });
                } else if (e.state() == IdleState.WRITER_IDLE) {
                    // Nothing was written for a while: send an empty heartbeat packet.
                    PacketHeader heartbeat = new PacketHeader(4, 0L, HEART_BEAT_SEQNO, false, 0, false);
                    ctx.channel().writeAndFlush(serializeHeader(heartbeat));
                }
            }
        };
        for (Channel ch : datanodeList) {
            ch.pipeline().addLast(
                new IdleStateHandler(timeoutMs, timeoutMs / 2, 0L, TimeUnit.MILLISECONDS),
                new ProtobufVarint32FrameDecoder(),
                new ProtobufDecoder(DataTransferProtos.PipelineAckProto.getDefaultInstance()),
                ackHandler);
            ch.config().setAutoRead(true);
        }
    }

    /**
     * Creates the stream in {@code STREAMING} state, allocates the write buffer and installs the
     * ack handlers on the given channels. The read timeout is taken from the
     * {@code dfs.client.socket-timeout} setting (60000 ms if unset).
     */
    FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs, DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, LocatedBlock locatedBlock, EventLoop eventLoop, List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) {
        this.conf = conf;
        this.fsUtils = fsUtils;
        this.dfs = dfs;
        this.client = client;
        this.namenode = namenode;
        this.fileId = fileId;
        this.clientName = clientName;
        this.src = src;
        this.locatedBlock = locatedBlock;
        this.eventLoop = eventLoop;
        this.datanodeList = datanodeList;
        this.summer = summer;
        this.alloc = alloc;
        this.buf = alloc.directBuffer();
        this.state = State.STREAMING;
        setupReceiver(conf.getInt("dfs.client.socket-timeout", 60000));
    }

    /**
     * Buffers the whole array; equivalent to {@code write(b, 0, b.length)}.
     *
     * @param b the bytes to buffer
     */
    public void write(byte[] b) {
        write(b, 0, b.length);
    }

    /**
     * Copies {@code len} bytes of {@code b}, starting at {@code off}, into the write buffer.
     * Nothing is sent until a flush is requested. When called outside the event loop the copy is
     * submitted to the event loop and this method blocks until it has run.
     *
     * @param b source array
     * @param off index of the first byte to copy
     * @param len number of bytes to copy
     */
    public void write(final byte[] b, final int off, final int len) {
        if (eventLoop.inEventLoop()) {
            buf.ensureWritable(len).writeBytes(b, off, len);
        } else {
            eventLoop.submit(new Runnable() {

                @Override
                public void run() {
                    buf.ensureWritable(len).writeBytes(b, off, len);
                }
            }).syncUninterruptibly();
        }
    }

    /**
     * Returns the number of readable bytes currently held in the write buffer. When called
     * outside the event loop the value is read on the event loop and this method blocks for it.
     *
     * @return number of bytes currently in the write buffer
     */
    public int buffered() {
        if (eventLoop.inEventLoop()) {
            return buf.readableBytes();
        }
        return eventLoop.submit(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                return buf.readableBytes();
            }
        }).syncUninterruptibly().getNow();
    }

    /**
     * @return the locations recorded in the located block this stream writes to
     */
    public DatanodeInfo[] getPipeline() {
        return locatedBlock.getLocations();
    }

    /**
     * Sends the buffered bytes as one packet to every datanode and registers a callback that
     * completes {@code handler} once all of them acked it. Must run on the event loop.
     */
    private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler, boolean syncBlock) {
        if (state != State.STREAMING) {
            handler.failed(new IOException("stream already broken"), attachment);
            return;
        }
        int dataLen = buf.readableBytes();
        final long ackedLength = nextPacketOffsetInBlock + (long) dataLen;
        if (ackedLength == locatedBlock.getBlock().getNumBytes()) {
            // Everything buffered is already acknowledged, there is nothing to send.
            handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
            return;
        }
        Promise<Void> promise = eventLoop.newPromise();
        promise.addListener(new FutureListener<Void>() {

            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                if (future.isSuccess()) {
                    locatedBlock.getBlock().setNumBytes(ackedLength);
                    handler.completed(ackedLength, attachment);
                } else {
                    handler.failed(future.cause(), attachment);
                }
            }
        });
        Callback last = waitingAckQueue.peekLast();
        if (last != null && ackedLength == last.ackedLength) {
            // A packet covering exactly these bytes is already in flight. Queue a callback with
            // no replicas; it is completed together with the preceding one in completed().
            waitingAckQueue.addLast(new Callback(promise, ackedLength, Collections.<Channel>emptyList()));
            return;
        }
        int chunkLen = summer.getBytesPerChecksum();
        int trailingPartialChunkLen = dataLen % chunkLen;
        int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
        int checksumLen = numChecks * summer.getChecksumSize();
        ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
        summer.calculateChunkedSums(buf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
        checksumBuf.writerIndex(checksumLen);
        PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock, nextPacketSeqno, false, dataLen, syncBlock);
        ByteBuf headerBuf = serializeHeader(header);
        waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList));
        for (Channel ch : datanodeList) {
            // Each channel gets its own retained view, so the buffers stay valid until every
            // channel has finished with them.
            ch.write(headerBuf.duplicate().retain());
            ch.write(checksumBuf.duplicate().retain());
            ch.writeAndFlush(buf.duplicate().retain());
        }
        checksumBuf.release();
        headerBuf.release();
        // Keep the trailing partial chunk buffered and advance the offset only by whole chunks,
        // so the next packet starts at a chunk boundary and carries the partial chunk again.
        ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen);
        if (trailingPartialChunkLen != 0) {
            buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
        }
        buf.release();
        buf = newBuf;
        nextPacketOffsetInBlock += (long) (dataLen - trailingPartialChunkLen);
        ++nextPacketSeqno;
    }

    /**
     * Asynchronously sends all buffered bytes to the datanodes.
     *
     * @param attachment object handed back to {@code handler} unchanged
     * @param handler completed with the acknowledged length of the block once every datanode
     *          acked the data, or failed with the error that broke the stream
     * @param syncBlock passed through as the sync-block flag of the packet header
     */
    public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler, final boolean syncBlock) {
        if (eventLoop.inEventLoop()) {
            flush0(attachment, handler, syncBlock);
        } else {
            eventLoop.execute(new Runnable() {

                @Override
                public void run() {
                    flush0(attachment, handler, syncBlock);
                }
            });
        }
    }

    /**
     * Sends the empty last-packet-in-block packet to every datanode and releases the write
     * buffer. {@code promise} completes when all datanodes acked that packet. It is failed
     * immediately if the stream is not in {@code STREAMING} state or if flushes are still
     * pending. Must run on the event loop.
     *
     * @param promise completed or failed with the outcome of ending the block
     * @param size offset written into the final packet header and recorded as acked length
     */
    private void endBlock(Promise<Void> promise, long size) {
        if (state != State.STREAMING) {
            promise.tryFailure(new IOException("stream already broken"));
            return;
        }
        if (!waitingAckQueue.isEmpty()) {
            promise.tryFailure(new IllegalStateException("should call flush first before calling close"));
            return;
        }
        state = State.CLOSING;
        PacketHeader header = new PacketHeader(4, size, nextPacketSeqno, true, 0, false);
        buf.release();
        buf = null;
        ByteBuf headerBuf = serializeHeader(header);
        waitingAckQueue.add(new Callback(promise, size, datanodeList));
        for (Channel ch : datanodeList) {
            ch.writeAndFlush(headerBuf.duplicate().retain());
        }
        headerBuf.release();
    }

    /**
     * Abandons the stream: closes all datanode channels, waits until they are closed, then ends
     * the file lease and runs lease recovery on the file. Must not be called from the event loop.
     *
     * @param reporter progress callback for lease recovery; when {@code null} a
     *          {@code CancelOnClose} bound to the DFS client is used instead
     * @throws IOException if ending or recovering the lease fails
     */
    public void recoverAndClose(CancelableProgressable reporter) throws IOException {
        assert (!eventLoop.inEventLoop());
        // Close the channels ourselves. Only waiting for closeFuture() would block until
        // something else closes them, which never happens while the stream is healthy.
        for (Channel ch : datanodeList) {
            ch.close();
        }
        for (Channel ch : datanodeList) {
            ch.closeFuture().awaitUninterruptibly();
        }
        FanOutOneBlockAsyncDFSOutputHelper.endFileLease(client, src, fileId);
        fsUtils.recoverFileLease((FileSystem) dfs, new Path(src), conf, reporter == null ? new FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose(client) : reporter);
    }

    /**
     * Ends the block, waits for the datanodes to acknowledge it, closes all channels and
     * completes the file. All data must have been flushed and acknowledged before this is called.
     * Must not be called from the event loop.
     * <p>
     * If the block cannot be ended (stream broken, already closed, or flushes still pending) the
     * failure recorded on the promise is rethrown by {@code syncUninterruptibly()}.
     */
    @Override
    public void close() throws IOException {
        assert (!eventLoop.inEventLoop());
        final Promise<Void> promise = eventLoop.newPromise();
        eventLoop.execute(new Runnable() {

            @Override
            public void run() {
                // endBlock() sets buf to null. Without this guard a repeated close() would throw
                // inside this task, leaving the promise incomplete and the caller blocked forever.
                ByteBuf pending = buf;
                long unsent = pending == null ? 0L : (long) pending.readableBytes();
                endBlock(promise, nextPacketOffsetInBlock + unsent);
            }
        });
        promise.addListener(new FutureListener<Void>() {

            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                for (Channel ch : datanodeList) {
                    ch.close();
                }
            }
        }).syncUninterruptibly();
        for (Channel ch : datanodeList) {
            ch.closeFuture().awaitUninterruptibly();
        }
        FanOutOneBlockAsyncDFSOutputHelper.completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
    }

    /** Lifecycle of the stream. */
    private static enum State {
        /** Initial state; the only state in which flush and end-of-block requests are accepted. */
        STREAMING,
        /** Set by {@code endBlock} once the final packet of the block is being sent. */
        CLOSING,
        /** Set by {@code failed}; all pending requests were failed and the channels closed. */
        BROKEN,
        /** Checked by {@code failed} but never assigned anywhere in this class. */
        CLOSED;
    }

    /** A pending flush or end-of-block request together with the channels that still owe an ack. */
    private static final class Callback {
        public final Promise<Void> promise;
        public final long ackedLength;
        /** Channels that have not acked yet; compared by identity. */
        public final Set<Channel> unfinishedReplicas;

        /**
         * @param promise completed when all replicas acked, failed when the stream breaks
         * @param ackedLength block length that is acknowledged once this callback completes
         * @param replicas channels to wait for; may be empty for a request that piggybacks on
         *          the callback queued right before it
         */
        public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) {
            this.promise = promise;
            this.ackedLength = ackedLength;
            if (replicas.isEmpty()) {
                this.unfinishedReplicas = Collections.emptySet();
            } else {
                this.unfinishedReplicas = Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
                this.unfinishedReplicas.addAll(replicas);
            }
        }
    }
}

