package tachyon.worker.netty;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.Constants;
import tachyon.StorageLevelAlias;
import tachyon.conf.TachyonConf;
import tachyon.exception.BlockDoesNotExistException;
import tachyon.exception.InvalidWorkerStateException;
import tachyon.network.protocol.RPCBlockReadRequest;
import tachyon.network.protocol.RPCBlockReadResponse;
import tachyon.network.protocol.RPCBlockWriteRequest;
import tachyon.network.protocol.RPCBlockWriteResponse;
import tachyon.network.protocol.RPCErrorResponse;
import tachyon.network.protocol.RPCMessage;
import tachyon.network.protocol.RPCResponse;
import tachyon.network.protocol.databuffer.DataBuffer;
import tachyon.network.protocol.databuffer.DataByteBuffer;
import tachyon.network.protocol.databuffer.DataFileChannel;
import tachyon.worker.block.BlockDataManager;
import tachyon.worker.block.io.BlockReader;
import tachyon.worker.block.io.BlockWriter;

@ChannelHandler.Sharable
/* loaded from: input_file:tachyon/worker/netty/DataServerHandler.class */
public final class DataServerHandler extends SimpleChannelInboundHandler<RPCMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
    private final BlockDataManager mDataManager;
    private final TachyonConf mTachyonConf;
    private final FileTransferType mTransferType;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: tachyon.worker.netty.DataServerHandler$1, reason: invalid class name */
    /* loaded from: input_file:tachyon/worker/netty/DataServerHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$tachyon$network$protocol$RPCMessage$Type;

        static {
            try {
                $SwitchMap$tachyon$worker$netty$FileTransferType[FileTransferType.MAPPED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$tachyon$worker$netty$FileTransferType[FileTransferType.TRANSFER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$tachyon$network$protocol$RPCMessage$Type = new int[RPCMessage.Type.values().length];
            try {
                $SwitchMap$tachyon$network$protocol$RPCMessage$Type[RPCMessage.Type.RPC_BLOCK_READ_REQUEST.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$tachyon$network$protocol$RPCMessage$Type[RPCMessage.Type.RPC_BLOCK_WRITE_REQUEST.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public DataServerHandler(BlockDataManager blockDataManager, TachyonConf tachyonConf) {
        this.mDataManager = (BlockDataManager) Preconditions.checkNotNull(blockDataManager);
        this.mTachyonConf = (TachyonConf) Preconditions.checkNotNull(tachyonConf);
        this.mTransferType = (FileTransferType) this.mTachyonConf.getEnum("tachyon.worker.network.netty.file.transfer", FileTransferType.class);
    }

    public void channelRead0(ChannelHandlerContext channelHandlerContext, RPCMessage rPCMessage) throws IOException {
        switch (AnonymousClass1.$SwitchMap$tachyon$network$protocol$RPCMessage$Type[rPCMessage.getType().ordinal()]) {
            case 1:
                handleBlockReadRequest(channelHandlerContext, (RPCBlockReadRequest) rPCMessage);
                return;
            case 2:
                handleBlockWriteRequest(channelHandlerContext, (RPCBlockWriteRequest) rPCMessage);
                return;
            default:
                channelHandlerContext.writeAndFlush(new RPCErrorResponse(RPCResponse.Status.UNKNOWN_MESSAGE_ERROR));
                throw new IllegalArgumentException("No handler implementation for rpc msg type: " + rPCMessage.getType());
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        LOG.warn("Exception thrown while processing request", th);
        channelHandlerContext.close();
    }

    private void handleBlockReadRequest(ChannelHandlerContext channelHandlerContext, RPCBlockReadRequest rPCBlockReadRequest) throws IOException {
        long blockId = rPCBlockReadRequest.getBlockId();
        long offset = rPCBlockReadRequest.getOffset();
        long length = rPCBlockReadRequest.getLength();
        try {
            long lockBlock = this.mDataManager.lockBlock(-1L, blockId);
            try {
                BlockReader readBlockRemote = this.mDataManager.readBlockRemote(-1L, blockId, lockBlock);
                try {
                    try {
                        rPCBlockReadRequest.validate();
                        long length2 = readBlockRemote.getLength();
                        validateBounds(rPCBlockReadRequest, length2);
                        long returnLength = returnLength(offset, length, length2);
                        ChannelFuture writeAndFlush = channelHandlerContext.writeAndFlush(new RPCBlockReadResponse(blockId, offset, returnLength, getDataBuffer(rPCBlockReadRequest, readBlockRemote, returnLength), RPCResponse.Status.SUCCESS));
                        writeAndFlush.addListener(ChannelFutureListener.CLOSE);
                        writeAndFlush.addListener(new ClosableResourceChannelListener(readBlockRemote));
                        this.mDataManager.accessBlock(-1L, blockId);
                        LOG.info("Preparation for responding to remote block request for: " + blockId + " done.");
                        try {
                            this.mDataManager.unlockBlock(lockBlock);
                        } catch (BlockDoesNotExistException e) {
                            throw new IOException((Throwable) e);
                        }
                    } catch (Throwable th) {
                        try {
                            this.mDataManager.unlockBlock(lockBlock);
                            throw th;
                        } catch (BlockDoesNotExistException e2) {
                            throw new IOException((Throwable) e2);
                        }
                    }
                } catch (Exception e3) {
                    LOG.error("The file is not here : " + e3.getMessage(), e3);
                    channelHandlerContext.writeAndFlush(RPCBlockReadResponse.createErrorResponse(rPCBlockReadRequest, RPCResponse.Status.FILE_DNE)).addListener(ChannelFutureListener.CLOSE);
                    if (readBlockRemote != null) {
                        readBlockRemote.close();
                    }
                    try {
                        this.mDataManager.unlockBlock(lockBlock);
                    } catch (BlockDoesNotExistException e4) {
                        throw new IOException((Throwable) e4);
                    }
                }
            } catch (BlockDoesNotExistException e5) {
                throw new IOException((Throwable) e5);
            } catch (InvalidWorkerStateException e6) {
                throw new IOException((Throwable) e6);
            }
        } catch (BlockDoesNotExistException e7) {
            LOG.error("Failed to lock block: " + blockId, e7);
            channelHandlerContext.writeAndFlush(RPCBlockReadResponse.createErrorResponse(rPCBlockReadRequest, RPCResponse.Status.BLOCK_LOCK_ERROR)).addListener(ChannelFutureListener.CLOSE);
        }
    }

    private void handleBlockWriteRequest(ChannelHandlerContext channelHandlerContext, RPCBlockWriteRequest rPCBlockWriteRequest) throws IOException {
        long sessionId = rPCBlockWriteRequest.getSessionId();
        long blockId = rPCBlockWriteRequest.getBlockId();
        long offset = rPCBlockWriteRequest.getOffset();
        long length = rPCBlockWriteRequest.getLength();
        DataBuffer payloadDataBuffer = rPCBlockWriteRequest.getPayloadDataBuffer();
        BlockWriter blockWriter = null;
        try {
            rPCBlockWriteRequest.validate();
            ByteBuffer readOnlyByteBuffer = payloadDataBuffer.getReadOnlyByteBuffer();
            if (offset == 0) {
                this.mDataManager.createBlockRemote(sessionId, blockId, StorageLevelAlias.MEM.getValue(), length);
            } else {
                this.mDataManager.requestSpace(sessionId, blockId, length);
            }
            blockWriter = this.mDataManager.getTempBlockWriterRemote(sessionId, blockId);
            blockWriter.append(readOnlyByteBuffer);
            ChannelFuture writeAndFlush = channelHandlerContext.writeAndFlush(new RPCBlockWriteResponse(sessionId, blockId, offset, length, RPCResponse.Status.SUCCESS));
            writeAndFlush.addListener(ChannelFutureListener.CLOSE);
            writeAndFlush.addListener(new ClosableResourceChannelListener(blockWriter));
        } catch (Exception e) {
            LOG.error("Error writing remote block : " + e.getMessage(), e);
            channelHandlerContext.writeAndFlush(RPCBlockWriteResponse.createErrorResponse(rPCBlockWriteRequest, RPCResponse.Status.WRITE_ERROR)).addListener(ChannelFutureListener.CLOSE);
            if (blockWriter != null) {
                blockWriter.close();
            }
        }
    }

    private long returnLength(long j, long j2, long j3) {
        return j2 == -1 ? j3 - j : j2;
    }

    private void validateBounds(RPCBlockReadRequest rPCBlockReadRequest, long j) {
        Preconditions.checkArgument(rPCBlockReadRequest.getOffset() <= j, "Offset(%s) is larger than file length(%s)", new Object[]{Long.valueOf(rPCBlockReadRequest.getOffset()), Long.valueOf(j)});
        Preconditions.checkArgument(rPCBlockReadRequest.getLength() == -1 || rPCBlockReadRequest.getOffset() + rPCBlockReadRequest.getLength() <= j, "Offset(%s) plus length(%s) is larger than file length(%s)", new Object[]{Long.valueOf(rPCBlockReadRequest.getOffset()), Long.valueOf(rPCBlockReadRequest.getLength()), Long.valueOf(j)});
    }

    private DataBuffer getDataBuffer(RPCBlockReadRequest rPCBlockReadRequest, BlockReader blockReader, long j) throws IOException, IllegalArgumentException {
        switch (this.mTransferType) {
            case MAPPED:
                return new DataByteBuffer(blockReader.read(rPCBlockReadRequest.getOffset(), (int) j), j);
            case TRANSFER:
            default:
                if (blockReader.getChannel() instanceof FileChannel) {
                    return new DataFileChannel((FileChannel) blockReader.getChannel(), rPCBlockReadRequest.getOffset(), j);
                }
                blockReader.close();
                throw new IllegalArgumentException("Only FileChannel is supported!");
        }
    }
}
