package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PartitionRequestClient} that sends its requests over a single Netty {@link Channel}.
 *
 * <p>The client keeps a reference counter: {@link #validateClientAndIncrementReferenceCounter()}
 * increments it and {@link #close(RemoteInputChannel)} decrements it (never below zero). Once the
 * counter is back at zero and the connection cannot be reused, the connection is closed and the
 * client is removed from its {@link PartitionRequestClientFactory}.
 */
public class NettyPartitionRequestClient implements PartitionRequestClient {
    private static final Logger LOG = LoggerFactory.getLogger(NettyPartitionRequestClient.class);

    /** Channel that every outbound message of this client is written to. */
    private final Channel tcpChannel;

    /** Handler that tracks the input channels registered with this client. */
    private final NetworkClientHandler clientHandler;

    /** Identifies the remote end; used for error messages and for removal from the factory. */
    private final ConnectionID connectionId;

    /** Factory that is asked to destroy this client once the connection is closed. */
    private final PartitionRequestClientFactory clientFactory;

    /** Incremented on validation, decremented (floored at zero) on {@link #close}. */
    private final AtomicInteger closeReferenceCounter = new AtomicInteger();

    /** Flipped to {@code true} by the first call of {@link #closeConnection()} that proceeds. */
    private final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Creates the client and registers the connection id with the given handler.
     *
     * <p>Package-private in the original class file; widened to public by the decompiler.
     *
     * @param channel channel to write to, must not be null
     * @param networkClientHandler handler of that channel, must not be null
     * @param connectionID id of the connection, must not be null
     * @param partitionRequestClientFactory factory owning this client, must not be null
     */
    public NettyPartitionRequestClient(Channel channel, NetworkClientHandler networkClientHandler, ConnectionID connectionID, PartitionRequestClientFactory partitionRequestClientFactory) {
        this.tcpChannel = Preconditions.checkNotNull(channel);
        this.clientHandler = Preconditions.checkNotNull(networkClientHandler);
        this.connectionId = Preconditions.checkNotNull(connectionID);
        this.clientFactory = Preconditions.checkNotNull(partitionRequestClientFactory);
        this.clientHandler.setConnectionId(this.connectionId);
    }

    /**
     * Tells whether the connection may be torn down.
     *
     * <p>Package-private in the original class file; widened to public by the decompiler.
     *
     * @return {@code true} if the reference counter is zero and the connection cannot be reused
     */
    public boolean canBeDisposed() {
        if (closeReferenceCounter.get() != 0) {
            return false;
        }
        return !canBeReused();
    }

    /**
     * Increments the reference counter unless the handler reports a channel error.
     *
     * <p>Package-private in the original class file; widened to public by the decompiler.
     *
     * @return {@code false} if the handler has a channel error (the counter is left untouched),
     *     otherwise whether the incremented counter is positive
     */
    public boolean validateClientAndIncrementReferenceCounter() {
        if (clientHandler.hasChannelError()) {
            return false;
        }
        return closeReferenceCounter.incrementAndGet() > 0;
    }

    /**
     * Registers the input channel with the handler and sends a partition request for it, either
     * right away or after the given delay on the channel's event loop.
     *
     * <p>If the write fails, the input channel is removed from the handler again, notified via
     * {@code onError}, and a {@link ConnectionErrorMessage} is fired into the pipeline.
     *
     * @param resultPartitionID partition to request
     * @param i index of the subpartition to request
     * @param remoteInputChannel input channel that consumes the subpartition
     * @param i2 delay in milliseconds before the request is written; {@code 0} writes immediately
     * @throws IOException (a {@link LocalTransportException}) if this client is already closed
     */
    @Override
    public void requestSubpartition(ResultPartitionID resultPartitionID, int i, final RemoteInputChannel remoteInputChannel, int i2) throws IOException {
        checkNotClosed();
        LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.", i, resultPartitionID, i2);

        clientHandler.addInputChannel(remoteInputChannel);

        final NettyMessage.PartitionRequest request =
                new NettyMessage.PartitionRequest(
                        resultPartitionID,
                        i,
                        remoteInputChannel.getInputChannelId(),
                        remoteInputChannel.getInitialCredit());

        final ChannelFutureListener onWriteDone =
                future -> {
                    if (!future.isSuccess()) {
                        // Undo the registration before reporting the failure.
                        clientHandler.removeInputChannel(remoteInputChannel);
                        remoteInputChannel.onError(
                                newSendFailure(
                                        "Sending the partition request to '%s [%s] (#%d)' failed.",
                                        future));
                        announceConnectionError(future, "Cannot send partition request.");
                    }
                };

        if (i2 == 0) {
            tcpChannel.writeAndFlush(request).addListener(onWriteDone);
            return;
        }

        // Block-bodied on purpose: keeps the lambda void-compatible so it is scheduled as a
        // Runnable, exactly like the anonymous class it replaces.
        tcpChannel
                .eventLoop()
                .schedule(
                        () -> {
                            tcpChannel.writeAndFlush(request).addListener(onWriteDone);
                        },
                        i2,
                        TimeUnit.MILLISECONDS);
    }

    /**
     * Writes a task event request for the given partition and input channel.
     *
     * <p>If the write fails, the input channel is notified via {@code onError} and a {@link
     * ConnectionErrorMessage} is fired into the pipeline.
     *
     * @param resultPartitionID partition the event is addressed to
     * @param taskEvent event to send
     * @param remoteInputChannel input channel on whose behalf the event is sent
     * @throws IOException (a {@link LocalTransportException}) if this client is already closed
     */
    @Override
    public void sendTaskEvent(ResultPartitionID resultPartitionID, TaskEvent taskEvent, final RemoteInputChannel remoteInputChannel) throws IOException {
        checkNotClosed();

        final ChannelFuture writeFuture =
                tcpChannel.writeAndFlush(
                        new NettyMessage.TaskEventRequest(
                                taskEvent, resultPartitionID, remoteInputChannel.getInputChannelId()));

        final ChannelFutureListener onWriteDone =
                future -> {
                    if (!future.isSuccess()) {
                        remoteInputChannel.onError(
                                newSendFailure(
                                        "Sending the task event to '%s [%s] (#%d)' failed.", future));
                        announceConnectionError(future, "Cannot send task event.");
                    }
                };
        writeFuture.addListener(onWriteDone);
    }

    /** Fires an {@link AddCreditMessage} for the given input channel into the pipeline. */
    @Override
    public void notifyCreditAvailable(RemoteInputChannel remoteInputChannel) {
        final AddCreditMessage message = new AddCreditMessage(remoteInputChannel);
        sendToChannel(message);
    }

    /**
     * Fires a {@link NewBufferSizeMessage} for the given input channel into the pipeline.
     *
     * @param remoteInputChannel input channel the buffer size belongs to
     * @param i the new buffer size
     */
    @Override
    public void notifyNewBufferSize(RemoteInputChannel remoteInputChannel, int i) {
        final NewBufferSizeMessage message = new NewBufferSizeMessage(remoteInputChannel, i);
        sendToChannel(message);
    }

    /** Fires a {@link ResumeConsumptionMessage} for the given input channel into the pipeline. */
    @Override
    public void resumeConsumption(RemoteInputChannel remoteInputChannel) {
        final ResumeConsumptionMessage message = new ResumeConsumptionMessage(remoteInputChannel);
        sendToChannel(message);
    }

    /**
     * Fires an {@link AcknowledgeAllRecordsProcessedMessage} for the given input channel into the
     * pipeline.
     */
    @Override
    public void acknowledgeAllRecordsProcessed(RemoteInputChannel remoteInputChannel) {
        final AcknowledgeAllRecordsProcessedMessage message =
                new AcknowledgeAllRecordsProcessedMessage(remoteInputChannel);
        sendToChannel(message);
    }

    /**
     * Hands the object to the channel's event loop, which fires it as a user event on the
     * channel's pipeline.
     *
     * <p>Private in the original class file; widened to public by the decompiler.
     *
     * @param obj user event to fire
     */
    public void sendToChannel(Object obj) {
        tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(obj));
    }

    /**
     * Releases the given input channel's use of this client.
     *
     * <p>The channel is removed from the handler and the reference counter is decremented (but
     * never below zero). If the counter reached zero and the connection cannot be reused, the
     * connection is closed; otherwise only the request of this input channel is cancelled.
     *
     * @param remoteInputChannel input channel that no longer needs this client
     */
    @Override
    public void close(RemoteInputChannel remoteInputChannel) throws IOException {
        clientHandler.removeInputChannel(remoteInputChannel);

        final int remainingReferences =
                closeReferenceCounter.updateAndGet(current -> Math.max(current - 1, 0));

        if (remainingReferences == 0 && !canBeReused()) {
            closeConnection();
        } else {
            clientHandler.cancelRequestFor(remoteInputChannel.getInputChannelId());
        }
    }

    /**
     * Closes the connection: writes a {@code CloseRequest} (with {@link
     * ChannelFutureListener#CLOSE_ON_FAILURE} attached) and asks the factory to destroy this
     * client. Only the first call that gets past the state check performs these steps; later
     * calls return without doing anything.
     *
     * <p>{@code Preconditions.checkState} rejects the call unless {@link #canBeDisposed()} holds.
     */
    public void closeConnection() {
        Preconditions.checkState(canBeDisposed(), "The connection should not be closed before disposed.");

        final boolean alreadyClosed = closed.getAndSet(true);
        if (!alreadyClosed) {
            tcpChannel
                    .writeAndFlush(new NettyMessage.CloseRequest())
                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            clientFactory.destroyPartitionRequestClient(connectionId, this);
        }
    }

    /** Reuse requires that the factory enables it and that the handler has no channel error. */
    private boolean canBeReused() {
        if (!clientFactory.isConnectionReuseEnabled()) {
            return false;
        }
        return !clientHandler.hasChannelError();
    }

    /** Throws a {@link LocalTransportException} if {@link #closeConnection()} already ran. */
    private void checkNotClosed() throws IOException {
        if (!closed.get()) {
            return;
        }
        final String message =
                String.format(
                        "Channel to '%s [%s]' closed.",
                        tcpChannel.remoteAddress(),
                        connectionId.getResourceID().getStringWithMetadata());
        throw new LocalTransportException(message, tcpChannel.localAddress());
    }

    /**
     * Builds the exception passed to an input channel after a write failed.
     *
     * @param messageFormat format string taking the connection's address, resource id string and
     *     connection index, in that order
     * @param failedFuture future of the failed write; supplies local address and cause
     */
    private LocalTransportException newSendFailure(String messageFormat, ChannelFuture failedFuture) {
        final String message =
                String.format(
                        messageFormat,
                        connectionId.getAddress(),
                        connectionId.getResourceID().getStringWithMetadata(),
                        connectionId.getConnectionIndex());
        return new LocalTransportException(
                message, failedFuture.channel().localAddress(), failedFuture.cause());
    }

    /**
     * Fires a {@link ConnectionErrorMessage} carrying the future's cause, or a {@link
     * RuntimeException} with the fallback text when the future has no cause.
     */
    private void announceConnectionError(ChannelFuture failedFuture, String fallbackMessage) {
        final Throwable cause = failedFuture.cause();
        sendToChannel(
                new ConnectionErrorMessage(
                        cause == null ? new RuntimeException(fallbackMessage) : cause));
    }

    /** Outbound message that builds a {@code NettyMessage.AckAllUserRecordsProcessed}. */
    private static class AcknowledgeAllRecordsProcessedMessage extends ClientOutboundMessage {
        private AcknowledgeAllRecordsProcessedMessage(RemoteInputChannel channel) {
            super(Preconditions.checkNotNull(channel));
        }

        /** Package-private in the original class file; widened to public by the decompiler. */
        @Override
        public Object buildMessage() {
            return new NettyMessage.AckAllUserRecordsProcessed(inputChannel.getInputChannelId());
        }
    }

    /** Outbound message that announces the input channel's unannounced credit. */
    private static class AddCreditMessage extends ClientOutboundMessage {
        private AddCreditMessage(RemoteInputChannel channel) {
            super(Preconditions.checkNotNull(channel));
        }

        /**
         * Reads and resets the channel's unannounced credit.
         *
         * <p>Package-private in the original class file; widened to public by the decompiler.
         *
         * @return a {@code NettyMessage.AddCredit} if the credit was positive, else {@code null}
         */
        @Override
        public Object buildMessage() {
            final int credit = inputChannel.getAndResetUnannouncedCredit();
            return credit > 0
                    ? new NettyMessage.AddCredit(credit, inputChannel.getInputChannelId())
                    : null;
        }
    }

    /** Outbound message that builds a {@code NettyMessage.NewBufferSize}. */
    private static class NewBufferSizeMessage extends ClientOutboundMessage {
        private final int bufferSize;

        private NewBufferSizeMessage(RemoteInputChannel channel, int newBufferSize) {
            super(Preconditions.checkNotNull(channel));
            this.bufferSize = newBufferSize;
        }

        /** Package-private in the original class file; widened to public by the decompiler. */
        @Override
        public Object buildMessage() {
            return new NettyMessage.NewBufferSize(bufferSize, inputChannel.getInputChannelId());
        }
    }

    /** Outbound message that builds a {@code NettyMessage.ResumeConsumption}. */
    private static class ResumeConsumptionMessage extends ClientOutboundMessage {
        private ResumeConsumptionMessage(RemoteInputChannel channel) {
            super(Preconditions.checkNotNull(channel));
        }

        /** Package-private in the original class file; widened to public by the decompiler. */
        @Override
        public Object buildMessage() {
            return new NettyMessage.ResumeConsumption(inputChannel.getInputChannelId());
        }
    }
}
