package org.dcache.xrootd.tpc;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import org.dcache.xrootd.core.XrootdException;
import org.dcache.xrootd.protocol.XrootdProtocol;
import org.dcache.xrootd.tpc.protocol.messages.AbstractXrootdInboundResponse;
import org.dcache.xrootd.tpc.protocol.messages.InboundAttnResponse;
import org.dcache.xrootd.tpc.protocol.messages.InboundChecksumResponse;
import org.dcache.xrootd.tpc.protocol.messages.InboundReadResponse;
import org.dcache.xrootd.tpc.protocol.messages.OutboundChecksumRequest;
import org.dcache.xrootd.tpc.protocol.messages.OutboundReadRequest;

/* loaded from: input_file:org/dcache/xrootd/tpc/TpcSourceReadHandler.class */
public abstract class TpcSourceReadHandler extends AbstractClientSourceHandler {
    @Override // org.dcache.xrootd.tpc.AbstractClientSourceHandler, org.dcache.xrootd.tpc.AbstractClientRequestHandler
    protected void doOnAsynResponse(ChannelHandlerContext channelHandlerContext, InboundAttnResponse inboundAttnResponse) throws XrootdException {
        switch (inboundAttnResponse.getRequestId()) {
            case 3001:
                sendChecksumRequest(channelHandlerContext);
                return;
            case 3013:
                sendReadRequest(channelHandlerContext);
                return;
            default:
                super.doOnAsynResponse(channelHandlerContext, inboundAttnResponse);
                return;
        }
    }

    @Override // org.dcache.xrootd.tpc.AbstractClientSourceHandler, org.dcache.xrootd.tpc.AbstractClientRequestHandler
    protected void doOnChecksumResponse(ChannelHandlerContext channelHandlerContext, InboundChecksumResponse inboundChecksumResponse) throws XrootdException {
        int status = inboundChecksumResponse.getStatus();
        XrootdTpcInfo info = this.client.getInfo();
        LOGGER.trace("Checksum query response for {} on {}, channel {}, stream {} received, status {}.", new Object[]{info.getLfn(), info.getSrc(), channelHandlerContext.channel().id(), Integer.valueOf(this.client.getStreamId()), Integer.valueOf(status)});
        if (status != 0) {
            handleTransferTerminated(status, String.format("Checksum query for %s failed.", info.getLfn()), channelHandlerContext);
        } else {
            validateChecksum(inboundChecksumResponse, channelHandlerContext);
        }
    }

    @Override // org.dcache.xrootd.tpc.AbstractClientSourceHandler, org.dcache.xrootd.tpc.AbstractClientRequestHandler
    protected void doOnReadResponse(ChannelHandlerContext channelHandlerContext, InboundReadResponse inboundReadResponse) {
        try {
            int status = inboundReadResponse.getStatus();
            XrootdTpcInfo info = this.client.getInfo();
            int dlen = inboundReadResponse.getDlen();
            LOGGER.trace("Read response received for {} on {}, channel {}, stream {}: status {}, got {} more bytes.", new Object[]{info.getLfn(), info.getSrc(), channelHandlerContext.channel().id(), Integer.valueOf(this.client.getStreamId()), Integer.valueOf(status), Integer.valueOf(dlen)});
            if (status != 0 && status != 4000) {
                handleTransferTerminated(XrootdProtocol.kXR_error, String.format("Read of %s failed with status %s.", info.getLfn(), Integer.valueOf(status)), channelHandlerContext);
                ReferenceCountUtil.release(inboundReadResponse);
                return;
            }
            long writeOffset = this.client.getWriteOffset();
            if (dlen > 0) {
                try {
                    inboundReadResponse.setWriteOffset(writeOffset);
                    this.client.getWriteHandler().write(inboundReadResponse);
                    writeOffset += dlen;
                    this.client.setWriteOffset(writeOffset);
                    LOGGER.trace("Read of {} on {}, channel {}, stream {}: wrote {}, so far {}, expected {}.", new Object[]{info.getLfn(), info.getSrc(), channelHandlerContext.channel().id(), Integer.valueOf(this.client.getStreamId()), Integer.valueOf(dlen), Long.valueOf(writeOffset), Long.valueOf(info.getAsize())});
                } catch (ClosedChannelException e) {
                    handleTransferTerminated(3012, "Channel " + channelHandlerContext.channel().id() + " was forcefully closed by the server.", channelHandlerContext);
                    ReferenceCountUtil.release(inboundReadResponse);
                    return;
                } catch (IOException e2) {
                    handleTransferTerminated(3007, e2.toString(), channelHandlerContext);
                    ReferenceCountUtil.release(inboundReadResponse);
                    return;
                }
            }
            if (status == 4000) {
                LOGGER.trace("Waiting for more data for {} on {}, channel {}, stream {}", new Object[]{info.getLfn(), info.getSrc(), channelHandlerContext.channel().id(), Integer.valueOf(this.client.getStreamId())});
                ReferenceCountUtil.release(inboundReadResponse);
                return;
            }
            if (writeOffset < info.getAsize()) {
                sendReadRequest(channelHandlerContext);
            } else if (info.getCks() != null) {
                sendChecksumRequest(channelHandlerContext);
            } else {
                LOGGER.trace("Read for {} on {}, channel {}, stream {}, completed without checksum verification.", new Object[]{info.getLfn(), info.getSrc(), channelHandlerContext.channel().id(), Integer.valueOf(this.client.getStreamId())});
                handleTransferTerminated(0, null, channelHandlerContext);
            }
            ReferenceCountUtil.release(inboundReadResponse);
        } catch (Throwable th) {
            ReferenceCountUtil.release(inboundReadResponse);
            throw th;
        }
    }

    @Override // org.dcache.xrootd.tpc.AbstractClientSourceHandler, org.dcache.xrootd.tpc.AbstractClientRequestHandler
    protected void doOnWaitResponse(ChannelHandlerContext channelHandlerContext, AbstractXrootdInboundResponse abstractXrootdInboundResponse) throws XrootdException {
        switch (abstractXrootdInboundResponse.getRequestId()) {
            case 3001:
                this.client.getExecutor().schedule(() -> {
                    sendChecksumRequest(channelHandlerContext);
                }, getWaitInSeconds(abstractXrootdInboundResponse), TimeUnit.SECONDS);
                return;
            case 3013:
                this.client.getExecutor().schedule(() -> {
                    sendReadRequest(channelHandlerContext);
                }, getWaitInSeconds(abstractXrootdInboundResponse), TimeUnit.SECONDS);
                return;
            default:
                super.doOnWaitResponse(channelHandlerContext, abstractXrootdInboundResponse);
                return;
        }
    }

    protected void handleTransferTerminated(int i, String str, ChannelHandlerContext channelHandlerContext) {
        this.client.getWriteHandler().fireDelayedSync(i, str);
        LOGGER.trace("handleTransferTerminated called fire delayed sync, calling client shutdown");
        try {
            this.client.shutDown(channelHandlerContext);
        } catch (InterruptedException e) {
            LOGGER.warn("Client shutdown interrupted.");
        }
    }

    @Override // org.dcache.xrootd.tpc.AbstractClientSourceHandler, org.dcache.xrootd.tpc.AbstractClientRequestHandler
    protected void sendReadRequest(ChannelHandlerContext channelHandlerContext) {
        LOGGER.trace("sendReadRequest to {}, channel {}, stream {}, fhandle {}, offset {}, chunksize {}.", new Object[]{this.client.getInfo().getSrc(), channelHandlerContext.channel().id(), Integer.valueOf(this.client.getStreamId()), Integer.valueOf(this.client.getFhandle()), Long.valueOf(this.client.getWriteOffset()), Integer.valueOf(getChunkSize())});
        this.client.setExpectedResponse(3013);
        channelHandlerContext.writeAndFlush(new OutboundReadRequest(this.client.getStreamId(), this.client.getFhandle(), this.client.getWriteOffset(), getChunkSize()), channelHandlerContext.newPromise()).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
    }

    @Override // org.dcache.xrootd.tpc.AbstractClientSourceHandler, org.dcache.xrootd.tpc.AbstractClientRequestHandler
    protected void sendChecksumRequest(ChannelHandlerContext channelHandlerContext) {
        XrootdTpcInfo info = this.client.getInfo();
        LOGGER.trace("sendChecksumRequest to {}, channel {}, stream {}, fhandle {}.", new Object[]{info.getSrc(), channelHandlerContext.channel().id(), Integer.valueOf(this.client.getStreamId()), Integer.valueOf(this.client.getFhandle())});
        this.client.setExpectedResponse(3001);
        channelHandlerContext.writeAndFlush(new OutboundChecksumRequest(this.client.getStreamId(), info.getLfn()), channelHandlerContext.newPromise()).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
    }

    protected abstract void validateChecksum(InboundChecksumResponse inboundChecksumResponse, ChannelHandlerContext channelHandlerContext) throws XrootdException;

    protected abstract int getChunkSize();
}
