package org.apache.flink.runtime.leaderelection;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.test.TestingServer;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.class */
public class ZooKeeperLeaderRetrievalTest extends TestLogger {
    private TestingServer testingServer;

    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest$FindConnectingAddress.class */
    static class FindConnectingAddress implements Runnable {
        private final Configuration config;
        private final FiniteDuration timeout;
        private InetAddress result;
        private Exception exception;

        public FindConnectingAddress(Configuration configuration, FiniteDuration finiteDuration) {
            this.config = configuration;
            this.timeout = finiteDuration;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.result = LeaderRetrievalUtils.findConnectingAddress(LeaderRetrievalUtils.createLeaderRetrievalService(this.config), this.timeout);
            } catch (Exception e) {
                this.exception = e;
            }
        }

        public InetAddress getInetAddress() throws Exception {
            if (this.exception != null) {
                throw this.exception;
            }
            return this.result;
        }
    }

    @Before
    public void before() {
        try {
            this.testingServer = new TestingServer();
        } catch (Exception e) {
            throw new RuntimeException("Could not start ZooKeeper testing cluster.", e);
        }
    }

    @After
    public void after() {
        if (this.testingServer != null) {
            try {
                this.testingServer.stop();
                this.testingServer = null;
            } catch (IOException e) {
                throw new RuntimeException("Could not stop ZooKeeper testing cluster.", e);
            }
        }
    }

    @Test
    public void testConnectingAddressRetrievalWithDelayedLeaderElection() throws Exception {
        FiniteDuration finiteDuration = new FiniteDuration(1L, TimeUnit.MINUTES);
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.testingServer.getConnectString());
        LeaderElectionService leaderElectionService = null;
        CuratorFramework[] curatorFrameworkArr = new CuratorFramework[2];
        try {
            curatorFrameworkArr[0] = ZooKeeperUtils.startCuratorFramework(configuration);
            curatorFrameworkArr[1] = ZooKeeperUtils.startCuratorFramework(configuration);
            String remoteJobManagerAkkaURL = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(configuration), NetUtils.unresolvedHostAndPortToNormalizedString("1.1.1.1", 1234), Option.empty());
            try {
                try {
                    InetAddress localHost = InetAddress.getLocalHost();
                    InetSocketAddress inetSocketAddress = new InetSocketAddress(localHost, new ServerSocket(0, 50, localHost).getLocalPort());
                    String remoteJobManagerAkkaURL2 = JobManager.getRemoteJobManagerAkkaURL(AkkaUtils.getAkkaProtocol(configuration), NetUtils.unresolvedHostAndPortToNormalizedString(localHost.getHostName(), inetSocketAddress.getPort()), Option.empty());
                    ZooKeeperLeaderElectionService createLeaderElectionService = ZooKeeperUtils.createLeaderElectionService(curatorFrameworkArr[0], configuration);
                    createLeaderElectionService.start(new TestingContender(remoteJobManagerAkkaURL, createLeaderElectionService));
                    FindConnectingAddress findConnectingAddress = new FindConnectingAddress(configuration, finiteDuration);
                    Thread thread = new Thread(findConnectingAddress);
                    thread.start();
                    leaderElectionService = ZooKeeperUtils.createLeaderElectionService(curatorFrameworkArr[1], configuration);
                    TestingContender testingContender = new TestingContender(remoteJobManagerAkkaURL2, leaderElectionService);
                    Thread.sleep(1000L);
                    createLeaderElectionService.stop();
                    leaderElectionService.start(testingContender);
                    thread.join();
                    InetAddress inetAddress = findConnectingAddress.getInetAddress();
                    Socket socket = new Socket();
                    try {
                        socket.bind(new InetSocketAddress(inetAddress, 0));
                        socket.connect(inetSocketAddress, 1000);
                        socket.close();
                        if (leaderElectionService != null) {
                            leaderElectionService.stop();
                        }
                        if (curatorFrameworkArr[0] != null) {
                            curatorFrameworkArr[0].close();
                        }
                        if (curatorFrameworkArr[1] != null) {
                            curatorFrameworkArr[1].close();
                        }
                    } catch (Throwable th) {
                        socket.close();
                        throw th;
                    }
                } catch (IOException e) {
                    System.err.println("Skipping 'testNetworkInterfaceSelection' test.");
                    if (0 != 0) {
                        leaderElectionService.stop();
                    }
                    if (curatorFrameworkArr[0] != null) {
                        curatorFrameworkArr[0].close();
                    }
                    if (curatorFrameworkArr[1] != null) {
                        curatorFrameworkArr[1].close();
                    }
                }
            } catch (UnknownHostException e2) {
                System.err.println("Skipping 'testNetworkInterfaceSelection' test.");
                if (0 != 0) {
                    leaderElectionService.stop();
                }
                if (curatorFrameworkArr[0] != null) {
                    curatorFrameworkArr[0].close();
                }
                if (curatorFrameworkArr[1] != null) {
                    curatorFrameworkArr[1].close();
                }
            }
        } catch (Throwable th2) {
            if (leaderElectionService != null) {
                leaderElectionService.stop();
            }
            if (curatorFrameworkArr[0] != null) {
                curatorFrameworkArr[0].close();
            }
            if (curatorFrameworkArr[1] != null) {
                curatorFrameworkArr[1].close();
            }
            throw th2;
        }
    }

    @Test
    public void testTimeoutOfFindConnectingAddress() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.testingServer.getConnectString());
        Assert.assertEquals(InetAddress.getLocalHost(), LeaderRetrievalUtils.findConnectingAddress(LeaderRetrievalUtils.createLeaderRetrievalService(configuration), new FiniteDuration(10L, TimeUnit.SECONDS)));
    }
}
