package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.class */
public class PartitionRequestClientFactoryTest {
    private static final ResourceID RESOURCE_ID = ResourceID.generate();

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest$AwaitingNettyClient.class */
    private static class AwaitingNettyClient extends NettyClient {
        private volatile boolean awaitForInterrupts;
        private final NettyClient client;

        AwaitingNettyClient(NettyClient nettyClient) {
            super((NettyConfig) null);
            this.client = nettyClient;
        }

        ChannelFuture connect(InetSocketAddress inetSocketAddress) {
            if (this.awaitForInterrupts) {
                return new NeverCompletingChannelFuture();
            }
            try {
                return this.client.connect(inetSocketAddress);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest$ChannelInactiveFutureHandler.class */
    private static class ChannelInactiveFutureHandler extends CreditBasedPartitionRequestClientHandler {
        private final CompletableFuture<Void> inactiveFuture;

        private ChannelInactiveFutureHandler(CompletableFuture<Void> completableFuture) {
            this.inactiveFuture = completableFuture;
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            super.channelInactive(channelHandlerContext);
            this.inactiveFuture.complete(null);
        }

        public CompletableFuture<Void> getInactiveFuture() {
            return this.inactiveFuture;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest$FailingNettyClient.class */
    private static class FailingNettyClient extends NettyClient {
        FailingNettyClient() {
            super((NettyConfig) null);
        }

        ChannelFuture connect(InetSocketAddress inetSocketAddress) {
            throw new ChannelException("Simulate connect failure");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest$UnstableNettyClient.class */
    private static class UnstableNettyClient extends NettyClient {
        private final NettyClient nettyClient;
        private int retry;

        UnstableNettyClient(NettyClient nettyClient, int i) {
            super((NettyConfig) null);
            this.nettyClient = nettyClient;
            this.retry = i;
        }

        ChannelFuture connect(InetSocketAddress inetSocketAddress) {
            if (this.retry <= 0) {
                return this.nettyClient.connect(inetSocketAddress);
            }
            this.retry--;
            throw new ChannelException("Simulate connect failure");
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testInterruptsNotCached(boolean z) throws Exception {
        NettyTestUtil.NettyServerAndClient createNettyServerAndClient = createNettyServerAndClient();
        try {
            AwaitingNettyClient awaitingNettyClient = new AwaitingNettyClient(createNettyServerAndClient.client());
            PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(awaitingNettyClient, z);
            awaitingNettyClient.awaitForInterrupts = true;
            connectAndInterrupt(partitionRequestClientFactory, createNettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
            awaitingNettyClient.awaitForInterrupts = false;
            partitionRequestClientFactory.createPartitionRequestClient(createNettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
            createNettyServerAndClient.client().shutdown();
            createNettyServerAndClient.server().shutdown();
        } catch (Throwable th) {
            createNettyServerAndClient.client().shutdown();
            createNettyServerAndClient.server().shutdown();
            throw th;
        }
    }

    private void connectAndInterrupt(PartitionRequestClientFactory partitionRequestClientFactory, ConnectionID connectionID) throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        Thread thread = new Thread(() -> {
            try {
                completableFuture.complete(null);
                partitionRequestClientFactory.createPartitionRequestClient(connectionID);
            } catch (InterruptedException e) {
                completableFuture2.complete(null);
            } catch (Exception e2) {
                completableFuture2.completeExceptionally(e2);
            }
        });
        thread.start();
        completableFuture.get();
        thread.interrupt();
        completableFuture2.get();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testExceptionsAreNotCached(boolean z) throws Exception {
        NettyTestUtil.NettyServerAndClient createNettyServerAndClient = createNettyServerAndClient();
        try {
            PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(new UnstableNettyClient(createNettyServerAndClient.client(), 1), z);
            ConnectionID connectionID = createNettyServerAndClient.getConnectionID(RESOURCE_ID, 0);
            Assertions.assertThatThrownBy(() -> {
                partitionRequestClientFactory.createPartitionRequestClient(connectionID);
            }).withFailMessage("Expected the first request to fail.", new Object[0]).isInstanceOf(RemoteTransportException.class);
            partitionRequestClientFactory.createPartitionRequestClient(connectionID);
            createNettyServerAndClient.client().shutdown();
            createNettyServerAndClient.server().shutdown();
        } catch (Throwable th) {
            createNettyServerAndClient.client().shutdown();
            createNettyServerAndClient.server().shutdown();
            throw th;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testReuseNettyPartitionRequestClient(boolean z) throws Exception {
        NettyTestUtil.NettyServerAndClient createNettyServerAndClient = createNettyServerAndClient();
        try {
            checkReuseNettyPartitionRequestClient(z, createNettyServerAndClient, 1);
            checkReuseNettyPartitionRequestClient(z, createNettyServerAndClient, 2);
            checkReuseNettyPartitionRequestClient(z, createNettyServerAndClient, 5);
            checkReuseNettyPartitionRequestClient(z, createNettyServerAndClient, 10);
        } finally {
            createNettyServerAndClient.client().shutdown();
            createNettyServerAndClient.server().shutdown();
        }
    }

    private void checkReuseNettyPartitionRequestClient(boolean z, NettyTestUtil.NettyServerAndClient nettyServerAndClient, int i) throws Exception {
        HashSet hashSet = new HashSet();
        PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(nettyServerAndClient.client(), 0, i, z);
        for (int i2 = 0; i2 < Math.max(100, i); i2++) {
            hashSet.add(partitionRequestClientFactory.createPartitionRequestClient(nettyServerAndClient.getConnectionID(RESOURCE_ID, (int) (Math.random() * 2.147483647E9d))));
        }
        Assertions.assertThat(hashSet.size()).isLessThanOrEqualTo(i);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testConnectionReuseWhenRemoteCloseAndNoInputChannel(boolean z) throws Exception {
        final CompletableFuture completableFuture = new CompletableFuture();
        final CompletableFuture completableFuture2 = new CompletableFuture();
        NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(new NettyProtocol(null, null) { // from class: org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactoryTest.1
            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[]{new ChannelInboundHandlerAdapter() { // from class: org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactoryTest.1.1
                    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
                        super.channelRegistered(channelHandlerContext);
                        completableFuture2.complete(channelHandlerContext.channel());
                    }
                }};
            }

            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[]{new ChannelInactiveFutureHandler(completableFuture)};
            }
        });
        PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(initServerAndClient.client(), 2, 1, z);
        ConnectionID connectionID = initServerAndClient.getConnectionID(ResourceID.generate(), 0);
        NettyPartitionRequestClient createPartitionRequestClient = partitionRequestClientFactory.createPartitionRequestClient(connectionID);
        ((Channel) completableFuture2.get()).close();
        completableFuture.get();
        Assertions.assertThat(partitionRequestClientFactory.createPartitionRequestClient(connectionID)).as("Factory should create a new client.", new Object[0]).isNotSameAs(createPartitionRequestClient);
        NettyTestUtil.shutdown(initServerAndClient);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testNettyClientConnectRetry(boolean z) throws Exception {
        NettyTestUtil.NettyServerAndClient createNettyServerAndClient = createNettyServerAndClient();
        new PartitionRequestClientFactory(new UnstableNettyClient(createNettyServerAndClient.client(), 2), 2, 1, z).createPartitionRequestClient(createNettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
        createNettyServerAndClient.client().shutdown();
        createNettyServerAndClient.server().shutdown();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testFailureReportedToSubsequentRequests(boolean z) throws Exception {
        PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(new FailingNettyClient(), 2, 1, z);
        Assertions.assertThatThrownBy(() -> {
            partitionRequestClientFactory.createPartitionRequestClient(new ConnectionID(ResourceID.generate(), new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
        });
        Assertions.assertThatThrownBy(() -> {
            partitionRequestClientFactory.createPartitionRequestClient(new ConnectionID(ResourceID.generate(), new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
        }).isInstanceOf(IOException.class);
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testNettyClientConnectRetryFailure(boolean z) throws Exception {
        NettyTestUtil.NettyServerAndClient createNettyServerAndClient = createNettyServerAndClient();
        try {
            PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(new UnstableNettyClient(createNettyServerAndClient.client(), 3), 2, 1, z);
            Assertions.assertThatThrownBy(() -> {
                partitionRequestClientFactory.createPartitionRequestClient(createNettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
            }).isInstanceOf(IOException.class);
            createNettyServerAndClient.client().shutdown();
            createNettyServerAndClient.server().shutdown();
        } catch (Throwable th) {
            createNettyServerAndClient.client().shutdown();
            createNettyServerAndClient.server().shutdown();
            throw th;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testNettyClientConnectRetryMultipleThread(boolean z) throws Exception {
        NettyTestUtil.NettyServerAndClient createNettyServerAndClient = createNettyServerAndClient();
        PartitionRequestClientFactory partitionRequestClientFactory = new PartitionRequestClientFactory(new UnstableNettyClient(createNettyServerAndClient.client(), 2), 2, 1, z);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(newFixedThreadPool.submit(() -> {
                NettyPartitionRequestClient nettyPartitionRequestClient = null;
                try {
                    nettyPartitionRequestClient = partitionRequestClientFactory.createPartitionRequestClient(createNettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
                } catch (Exception e) {
                    Assertions.fail(e.getMessage());
                }
                return nettyPartitionRequestClient;
            }));
        }
        arrayList.forEach(future -> {
            try {
                Assertions.assertThat((NettyPartitionRequestClient) future.get()).isNotNull();
            } catch (Exception e) {
                System.out.println(e.getMessage());
                Assertions.fail(e.getMessage());
            }
        });
        newFixedThreadPool.shutdown();
        createNettyServerAndClient.client().shutdown();
        createNettyServerAndClient.server().shutdown();
    }

    private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
        return NettyTestUtil.initServerAndClient(new NettyProtocol(null, null) { // from class: org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactoryTest.2
            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[10];
            }

            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[]{(ChannelHandler) Mockito.mock(NetworkClientHandler.class)};
            }
        });
    }
}
