package org.apache.flink.runtime.leaderelection;

import java.util.UUID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.PendingCheckpointTest;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiConsumerWithException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.class */
public class ZooKeeperLeaderElectionConnectionHandlingTest extends TestLogger {
    // ZooKeeper path handed to the ZooKeeperLeaderElectionDriverFactory created by the test helper.
    private static final String PATH = "/path";

    // JUnit rule for the ZooKeeper test resource; the test helper restarts or stops it to
    // inject connection problems, and setup() reads its connect string.
    @Rule
    public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

    // JUnit rule whose fatal error handler is passed to the Curator framework and receives
    // errors reported through TestingContender#handleError.
    @Rule
    public final TestingFatalErrorHandlerResource fatalErrorHandlerResource = new TestingFatalErrorHandlerResource();
    // Per-instance configuration; setup() stores the ZooKeeper quorum in it and individual
    // tests add further high-availability options before running.
    private final Configuration configuration = new Configuration();

    /**
     * Compiler-generated lookup table (recovered by the decompiler) that maps
     * {@link Problem} ordinals to dense switch case numbers:
     * {@code SUSPENDED_CONNECTION -> 1}, {@code LOST_CONNECTION -> 2}. Slots that
     * are never assigned stay at {@code 0}.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$leaderelection$ZooKeeperLeaderElectionConnectionHandlingTest$Problem;

        static {
            // One slot per enum constant, indexed by ordinal.
            final int[] caseNumberByOrdinal = new int[Problem.values().length];
            try {
                caseNumberByOrdinal[Problem.SUSPENDED_CONNECTION.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: its slot simply keeps the value 0.
            }
            try {
                caseNumberByOrdinal[Problem.LOST_CONNECTION.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: its slot simply keeps the value 0.
            }
            $SwitchMap$org$apache$flink$runtime$leaderelection$ZooKeeperLeaderElectionConnectionHandlingTest$Problem = caseNumberByOrdinal;
        }
    }

    /** Kinds of ZooKeeper connection problems that the test helper can inject. */
    public enum Problem {

        /** Injected by stopping the ZooKeeper test resource. */
        LOST_CONNECTION,

        /** Injected by restarting the ZooKeeper test resource. */
        SUSPENDED_CONNECTION
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest$TestingConnectionStateListener.class */
    public static final class TestingConnectionStateListener implements ConnectionStateListener {
        private final OneShotLatch connectionSuspendedLatch = new OneShotLatch();
        private final OneShotLatch reconnectedLatch = new OneShotLatch();
        private final OneShotLatch connectionLostLatch = new OneShotLatch();

        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            if (connectionState == ConnectionState.SUSPENDED) {
                this.connectionSuspendedLatch.trigger();
            }
            if (connectionState == ConnectionState.RECONNECTED) {
                this.reconnectedLatch.trigger();
            }
            if (connectionState == ConnectionState.LOST) {
                this.connectionLostLatch.trigger();
            }
        }

        public void awaitSuspendedConnection() throws InterruptedException {
            this.connectionSuspendedLatch.await();
        }

        public void awaitReconnectedConnection() throws InterruptedException {
            this.reconnectedLatch.await();
        }

        public void awaitLostConnection() throws InterruptedException {
            this.connectionLostLatch.await();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest$TestingContender.class */
    public final class TestingContender implements LeaderContender {
        private final OneShotLatch grantLeadershipLatch;
        private final OneShotLatch revokeLeadershipLatch;

        private TestingContender() {
            this.grantLeadershipLatch = new OneShotLatch();
            this.revokeLeadershipLatch = new OneShotLatch();
        }

        public void grantLeadership(UUID uuid) {
            this.grantLeadershipLatch.trigger();
        }

        public void revokeLeadership() {
            this.revokeLeadershipLatch.trigger();
        }

        public void handleError(Exception exc) {
            ZooKeeperLeaderElectionConnectionHandlingTest.this.fatalErrorHandlerResource.getFatalErrorHandler().onFatalError(exc);
        }

        public void awaitGrantLeadership() throws InterruptedException {
            this.grantLeadershipLatch.await();
        }

        public boolean hasRevokeLeadershipBeenTriggered() {
            return this.revokeLeadershipLatch.isTriggered();
        }

        public void awaitRevokeLeadership() throws InterruptedException {
            this.revokeLeadershipLatch.await();
        }

        /* synthetic */ TestingContender(ZooKeeperLeaderElectionConnectionHandlingTest zooKeeperLeaderElectionConnectionHandlingTest, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /** Points the shared configuration at the ZooKeeper test resource before each test. */
    @Before
    public void setup() {
        final String quorum = this.zooKeeperResource.getConnectString();
        this.configuration.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, quorum);
    }

    /**
     * With the configuration prepared by {@code setup()} only, a suspended connection must
     * be followed by a leadership revocation.
     */
    @Test
    public void testLoseLeadershipOnConnectionSuspended() throws Exception {
        final BiConsumerWithException<TestingConnectionStateListener, TestingContender, Exception> validation =
                (listener, contender) -> {
                    listener.awaitSuspendedConnection();
                    contender.awaitRevokeLeadership();
                };
        runTestWithBrieflySuspendedZooKeeperConnection(this.configuration, validation);
    }

    /**
     * With {@code ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS} enabled, leadership must not
     * have been revoked by the time the suspended connection has reconnected.
     */
    @Test
    public void testKeepLeadershipOnSuspendedConnectionIfTolerateSuspendedConnectionsIsEnabled() throws Exception {
        this.configuration.set(HighAvailabilityOptions.ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS, true);

        final BiConsumerWithException<TestingConnectionStateListener, TestingContender, Exception> validation =
                (listener, contender) -> {
                    // Wait for the full suspend/reconnect cycle before checking the contender.
                    listener.awaitSuspendedConnection();
                    listener.awaitReconnectedConnection();
                    Assert.assertFalse(contender.hasRevokeLeadershipBeenTriggered());
                };
        runTestWithBrieflySuspendedZooKeeperConnection(this.configuration, validation);
    }

    /**
     * Even with {@code ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS} enabled, a lost connection
     * must be followed by a leadership revocation. Session and connection timeouts are both
     * set to 1000.
     */
    @Test
    public void testLoseLeadershipOnLostConnectionIfTolerateSuspendedConnectionsIsEnabled() throws Exception {
        final Configuration conf = this.configuration;
        conf.set(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);
        conf.set(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
        conf.set(HighAvailabilityOptions.ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS, true);

        final BiConsumerWithException<TestingConnectionStateListener, TestingContender, Exception> validation =
                (listener, contender) -> {
                    listener.awaitLostConnection();
                    contender.awaitRevokeLeadership();
                };
        runTestWithLostZooKeeperConnection(conf, validation);
    }

    /**
     * Runs the given validation logic against a scenario with {@link Problem#LOST_CONNECTION}.
     *
     * @param configuration configuration used to start the Curator framework
     * @param biConsumerWithException validation logic invoked after the problem was injected
     */
    private void runTestWithLostZooKeeperConnection(
            Configuration configuration,
            BiConsumerWithException<TestingConnectionStateListener, TestingContender, Exception> biConsumerWithException)
            throws Exception {
        final Problem injectedProblem = Problem.LOST_CONNECTION;
        runTestWithZooKeeperConnectionProblem(configuration, biConsumerWithException, injectedProblem);
    }

    /**
     * Runs the given validation logic against a scenario with
     * {@link Problem#SUSPENDED_CONNECTION}.
     *
     * @param configuration configuration used to start the Curator framework
     * @param biConsumerWithException validation logic invoked after the problem was injected
     */
    private void runTestWithBrieflySuspendedZooKeeperConnection(
            Configuration configuration,
            BiConsumerWithException<TestingConnectionStateListener, TestingContender, Exception> biConsumerWithException)
            throws Exception {
        final Problem injectedProblem = Problem.SUSPENDED_CONNECTION;
        runTestWithZooKeeperConnectionProblem(configuration, biConsumerWithException, injectedProblem);
    }

    /**
     * Starts a Curator client and a leader election service, waits until the test contender
     * has been granted leadership, injects the requested connection problem and then runs
     * the supplied validation logic.
     *
     * <p>{@link Problem#SUSPENDED_CONNECTION} restarts the ZooKeeper test resource, while
     * {@link Problem#LOST_CONNECTION} stops it.
     *
     * <p>Cleanup runs exactly once in a {@code finally} chain: the election service is
     * stopped, then the Curator client is closed even if stopping failed, and finally, for
     * {@code LOST_CONNECTION}, the fatal error handler is cleared.
     *
     * @param configuration configuration used to start the Curator framework
     * @param biConsumerWithException validation logic invoked with the connection state
     *     listener and the contender after the problem was injected
     * @param problem the connection problem to inject
     * @throws IllegalArgumentException if {@code problem} is not a known problem type
     * @throws Exception if setup, problem injection, validation or cleanup fails
     */
    private void runTestWithZooKeeperConnectionProblem(Configuration configuration, BiConsumerWithException<TestingConnectionStateListener, TestingContender, Exception> biConsumerWithException, Problem problem) throws Exception {
        final CuratorFrameworkWithUnhandledErrorListener curatorWrapper = ZooKeeperUtils.startCuratorFramework(configuration, this.fatalErrorHandlerResource.getFatalErrorHandler());
        final CuratorFramework client = curatorWrapper.asCuratorFramework();
        final DefaultLeaderElectionService leaderElectionService = new DefaultLeaderElectionService(new ZooKeeperLeaderElectionDriverFactory(client, PATH));
        try {
            final TestingConnectionStateListener connectionStateListener = new TestingConnectionStateListener();
            client.getConnectionStateListenable().addListener(connectionStateListener);

            final TestingContender contender = new TestingContender();
            leaderElectionService.start(contender);
            contender.awaitGrantLeadership();

            // Switch on the enum itself rather than on an ordinal-indexed lookup table.
            switch (problem) {
                case SUSPENDED_CONNECTION:
                    this.zooKeeperResource.restart();
                    break;
                case LOST_CONNECTION:
                    this.zooKeeperResource.stop();
                    break;
                default:
                    throw new IllegalArgumentException(String.format("Unknown problem type %s.", problem));
            }

            biConsumerWithException.accept(connectionStateListener, contender);
        } finally {
            try {
                leaderElectionService.stop();
            } finally {
                try {
                    // Close the client even when stopping the election service failed.
                    curatorWrapper.close();
                } finally {
                    if (problem == Problem.LOST_CONNECTION) {
                        // NOTE(review): presumably errors reported while the connection is
                        // lost are expected here and must not fail the test - confirm.
                        this.fatalErrorHandlerResource.getFatalErrorHandler().clearError();
                    }
                }
            }
        }
    }
}
