/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyPartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.NeverCompletingChannelFuture;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(value=Parameterized.class)
public class PartitionRequestClientFactoryTest
extends TestLogger {
    @Parameterized.Parameter
    public boolean connectionReuseEnabled;

    @Parameterized.Parameters(name="connection reuse enabled = {0}")
    public static Object[] parameters() {
        return new Object[][]{{true}, {false}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testInterruptsNotCached() throws Exception {
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = this.createNettyServerAndClient();
        try {
            AwaitingNettyClient nettyClient = new AwaitingNettyClient(nettyServerAndClient.client());
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)nettyClient, this.connectionReuseEnabled);
            nettyClient.awaitForInterrupts = true;
            this.connectAndInterrupt(factory, nettyServerAndClient.getConnectionID(0));
            nettyClient.awaitForInterrupts = false;
            factory.createPartitionRequestClient(nettyServerAndClient.getConnectionID(0));
        }
        finally {
            nettyServerAndClient.client().shutdown();
            nettyServerAndClient.server().shutdown();
        }
    }

    private void connectAndInterrupt(PartitionRequestClientFactory factory, ConnectionID connectionId) throws Exception {
        CompletableFuture started = new CompletableFuture();
        CompletableFuture interrupted = new CompletableFuture();
        Thread thread = new Thread(() -> {
            try {
                started.complete(null);
                factory.createPartitionRequestClient(connectionId);
            }
            catch (InterruptedException e) {
                interrupted.complete(null);
            }
            catch (Exception e) {
                interrupted.completeExceptionally(e);
            }
        });
        thread.start();
        started.get();
        thread.interrupt();
        interrupted.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testExceptionsAreNotCached() throws Exception {
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = this.createNettyServerAndClient();
        try {
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)new UnstableNettyClient(nettyServerAndClient.client(), 1), this.connectionReuseEnabled);
            ConnectionID connectionID = nettyServerAndClient.getConnectionID(0);
            try {
                factory.createPartitionRequestClient(connectionID);
                Assert.fail((String)"Expected the first request to fail.");
            }
            catch (RemoteTransportException remoteTransportException) {
                // empty catch block
            }
            factory.createPartitionRequestClient(connectionID);
        }
        finally {
            nettyServerAndClient.client().shutdown();
            nettyServerAndClient.server().shutdown();
        }
    }

    @Test
    public void testReuseNettyPartitionRequestClient() throws Exception {
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = this.createNettyServerAndClient();
        try {
            this.checkReuseNettyPartitionRequestClient(nettyServerAndClient, 1);
            this.checkReuseNettyPartitionRequestClient(nettyServerAndClient, 2);
            this.checkReuseNettyPartitionRequestClient(nettyServerAndClient, 5);
            this.checkReuseNettyPartitionRequestClient(nettyServerAndClient, 10);
        }
        finally {
            nettyServerAndClient.client().shutdown();
            nettyServerAndClient.server().shutdown();
        }
    }

    private void checkReuseNettyPartitionRequestClient(NettyTestUtil.NettyServerAndClient nettyServerAndClient, int maxNumberOfConnections) throws Exception {
        HashSet<NettyPartitionRequestClient> set = new HashSet<NettyPartitionRequestClient>();
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory(nettyServerAndClient.client(), 0, maxNumberOfConnections, this.connectionReuseEnabled);
        for (int i = 0; i < Math.max(100, maxNumberOfConnections); ++i) {
            ConnectionID connectionID = nettyServerAndClient.getConnectionID((int)(Math.random() * 2.147483647E9));
            set.add(factory.createPartitionRequestClient(connectionID));
        }
        Assert.assertTrue((set.size() <= maxNumberOfConnections ? 1 : 0) != 0);
    }

    @Test
    public void testNettyClientConnectRetry() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = this.createNettyServerAndClient();
        UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2);
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)unstableNettyClient, 2, 1, this.connectionReuseEnabled);
        factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
        serverAndClient.client().shutdown();
        serverAndClient.server().shutdown();
    }

    @Test(expected=IOException.class)
    public void testFailureReportedToSubsequentRequests() throws Exception {
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)new FailingNettyClient(), 2, 1, this.connectionReuseEnabled);
        try {
            factory.createPartitionRequestClient(new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
        }
        catch (Exception exception) {
            // empty catch block
        }
        factory.createPartitionRequestClient(new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(expected=IOException.class)
    public void testNettyClientConnectRetryFailure() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = this.createNettyServerAndClient();
        UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 3);
        try {
            PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)unstableNettyClient, 2, 1, this.connectionReuseEnabled);
            factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
        }
        finally {
            serverAndClient.client().shutdown();
            serverAndClient.server().shutdown();
        }
    }

    @Test
    public void testNettyClientConnectRetryMultipleThread() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = this.createNettyServerAndClient();
        UnstableNettyClient unstableNettyClient = new UnstableNettyClient(serverAndClient.client(), 2);
        PartitionRequestClientFactory factory = new PartitionRequestClientFactory((NettyClient)unstableNettyClient, 2, 1, this.connectionReuseEnabled);
        ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10);
        ArrayList<Future<NettyPartitionRequestClient>> futures = new ArrayList<Future<NettyPartitionRequestClient>>();
        for (int i = 0; i < 10; ++i) {
            Future<NettyPartitionRequestClient> future = threadPoolExecutor.submit(() -> {
                NettyPartitionRequestClient client = null;
                try {
                    client = factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
                }
                catch (Exception e) {
                    Assert.fail((String)e.getMessage());
                }
                return client;
            });
            futures.add(future);
        }
        futures.forEach(runnableFuture -> {
            try {
                NettyPartitionRequestClient client = (NettyPartitionRequestClient)runnableFuture.get();
                Assert.assertNotNull((Object)client);
            }
            catch (Exception e) {
                System.out.println(e.getMessage());
                Assert.fail();
            }
        });
        threadPoolExecutor.shutdown();
        serverAndClient.client().shutdown();
        serverAndClient.server().shutdown();
    }

    private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() throws Exception {
        return NettyTestUtil.initServerAndClient(new NettyProtocol(null, null){

            public ChannelHandler[] getServerChannelHandlers() {
                return new ChannelHandler[10];
            }

            public ChannelHandler[] getClientChannelHandlers() {
                return new ChannelHandler[]{(ChannelHandler)Mockito.mock(NetworkClientHandler.class)};
            }
        });
    }

    private static class AwaitingNettyClient
    extends NettyClient {
        private volatile boolean awaitForInterrupts;
        private final NettyClient client;

        AwaitingNettyClient(NettyClient client) {
            super(null);
            this.client = client;
        }

        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            if (this.awaitForInterrupts) {
                return new NeverCompletingChannelFuture();
            }
            try {
                return this.client.connect(serverSocketAddress);
            }
            catch (Exception exception) {
                throw new RuntimeException(exception);
            }
        }
    }

    private static class FailingNettyClient
    extends NettyClient {
        FailingNettyClient() {
            super(null);
        }

        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            throw new ChannelException("Simulate connect failure");
        }
    }

    private static class UnstableNettyClient
    extends NettyClient {
        private final NettyClient nettyClient;
        private int retry;

        UnstableNettyClient(NettyClient nettyClient, int retry) {
            super(null);
            this.nettyClient = nettyClient;
            this.retry = retry;
        }

        ChannelFuture connect(InetSocketAddress serverSocketAddress) {
            if (this.retry > 0) {
                --this.retry;
                throw new ChannelException("Simulate connect failure");
            }
            return this.nettyClient.connect(serverSocketAddress);
        }
    }
}

