/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.registration;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.registration.TestRegistrationGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.LoggerFactory;

public class RetryingRegistrationTest
extends TestLogger {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSimpleSuccessfulRegistration() throws Exception {
        String testId = "laissez les bon temps roulez";
        String testEndpointAddress = "<test-address>";
        UUID leaderId = UUID.randomUUID();
        TestRegistrationGateway testGateway = new TestRegistrationGateway(new RegistrationResponse[]{new TestRegistrationSuccess("laissez les bon temps roulez")});
        TestingRpcService rpc = new TestingRpcService();
        try {
            rpc.registerGateway("<test-address>", testGateway);
            TestRetryingRegistration registration = new TestRetryingRegistration((RpcService)rpc, "<test-address>", leaderId);
            registration.startRegistration();
            Future future = registration.getFuture();
            Assert.assertNotNull((Object)future);
            Assert.assertEquals((Object)future, (Object)registration.getFuture());
            Tuple2 success = (Tuple2)future.get(10L, TimeUnit.SECONDS);
            Assert.assertEquals((Object)"laissez les bon temps roulez", (Object)((TestRegistrationSuccess)((Object)success.f1)).getCorrelationId());
            Assert.assertEquals((Object)leaderId, (Object)testGateway.getInvocations().take().leaderId());
        }
        finally {
            testGateway.stop();
            rpc.stopService();
        }
    }

    @Test
    public void testPropagateFailures() throws Exception {
        String testExceptionMessage = "testExceptionMessage";
        RpcService rpc = (RpcService)Mockito.mock(RpcService.class);
        Mockito.when((Object)rpc.connect(Mockito.anyString(), (Class)Mockito.any(Class.class))).thenThrow(new Throwable[]{new RuntimeException("testExceptionMessage")});
        TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
        registration.startRegistration();
        Future future = registration.getFuture();
        Assert.assertTrue((boolean)future.isDone());
        try {
            future.get();
            Assert.fail((String)"We expected an ExecutionException.");
        }
        catch (ExecutionException e) {
            Assert.assertEquals((Object)"testExceptionMessage", (Object)e.getCause().getMessage());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetryConnectOnFailure() throws Exception {
        String testId = "laissez les bon temps roulez";
        UUID leaderId = UUID.randomUUID();
        ExecutorService executor = Executors.newCachedThreadPool();
        TestRegistrationGateway testGateway = new TestRegistrationGateway(new RegistrationResponse[]{new TestRegistrationSuccess("laissez les bon temps roulez")});
        try {
            RpcService rpc = (RpcService)Mockito.mock(RpcService.class);
            Mockito.when((Object)rpc.connect(Mockito.anyString(), (Class)Mockito.any(Class.class))).thenReturn((Object)FlinkCompletableFuture.completedExceptionally((Throwable)new Exception("test connect failure")), (Object[])new Future[]{FlinkCompletableFuture.completed((Object)testGateway)});
            Mockito.when((Object)rpc.getExecutor()).thenReturn((Object)executor);
            TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId);
            registration.startRegistration();
            Tuple2 success = (Tuple2)registration.getFuture().get(10L, TimeUnit.SECONDS);
            Assert.assertEquals((Object)"laissez les bon temps roulez", (Object)((TestRegistrationSuccess)((Object)success.f1)).getCorrelationId());
            Assert.assertEquals((Object)leaderId, (Object)testGateway.getInvocations().take().leaderId());
        }
        finally {
            testGateway.stop();
            executor.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetriesOnTimeouts() throws Exception {
        String testId = "rien ne va plus";
        String testEndpointAddress = "<test-address>";
        UUID leaderId = UUID.randomUUID();
        TestRegistrationGateway testGateway = new TestRegistrationGateway(new RegistrationResponse[]{null, null, new TestRegistrationSuccess("rien ne va plus")});
        TestingRpcService rpc = new TestingRpcService();
        try {
            rpc.registerGateway("<test-address>", testGateway);
            TestRetryingRegistration registration = new TestRetryingRegistration((RpcService)rpc, "<test-address>", leaderId);
            long started = System.nanoTime();
            registration.startRegistration();
            Future future = registration.getFuture();
            Tuple2 success = (Tuple2)future.get(10L, TimeUnit.SECONDS);
            long finished = System.nanoTime();
            long elapsedMillis = (finished - started) / 1000000L;
            Assert.assertEquals((Object)"rien ne va plus", (Object)((TestRegistrationSuccess)((Object)success.f1)).getCorrelationId());
            Assert.assertEquals((Object)leaderId, (Object)testGateway.getInvocations().take().leaderId());
            Assert.assertTrue((String)"retries did not properly back off", (elapsedMillis >= 60L ? 1 : 0) != 0);
        }
        finally {
            rpc.stopService();
            testGateway.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDecline() throws Exception {
        String testId = "qui a coupe le fromage";
        String testEndpointAddress = "<test-address>";
        UUID leaderId = UUID.randomUUID();
        TestingRpcService rpc = new TestingRpcService();
        TestRegistrationGateway testGateway = new TestRegistrationGateway(new RegistrationResponse[]{null, new RegistrationResponse.Decline("no reason "), null, new TestRegistrationSuccess("qui a coupe le fromage")});
        try {
            rpc.registerGateway("<test-address>", testGateway);
            TestRetryingRegistration registration = new TestRetryingRegistration((RpcService)rpc, "<test-address>", leaderId);
            long started = System.nanoTime();
            registration.startRegistration();
            Future future = registration.getFuture();
            Tuple2 success = (Tuple2)future.get(10L, TimeUnit.SECONDS);
            long finished = System.nanoTime();
            long elapsedMillis = (finished - started) / 1000000L;
            Assert.assertEquals((Object)"qui a coupe le fromage", (Object)((TestRegistrationSuccess)((Object)success.f1)).getCorrelationId());
            Assert.assertEquals((Object)leaderId, (Object)testGateway.getInvocations().take().leaderId());
            Assert.assertTrue((String)"retries did not properly back off", (elapsedMillis >= 240L ? 1 : 0) != 0);
        }
        finally {
            testGateway.stop();
            rpc.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetryOnError() throws Exception {
        String testId = "Petit a petit, l'oiseau fait son nid";
        String testEndpointAddress = "<test-address>";
        UUID leaderId = UUID.randomUUID();
        TestingRpcService rpc = new TestingRpcService();
        try {
            TestRegistrationGateway testGateway = (TestRegistrationGateway)Mockito.mock(TestRegistrationGateway.class);
            Mockito.when(testGateway.registrationCall((UUID)Mockito.any(UUID.class), Mockito.anyLong())).thenReturn((Object)FlinkCompletableFuture.completedExceptionally((Throwable)new Exception("test exception")), (Object[])new Future[]{FlinkCompletableFuture.completed((Object)((Object)new TestRegistrationSuccess("Petit a petit, l'oiseau fait son nid")))});
            rpc.registerGateway("<test-address>", testGateway);
            TestRetryingRegistration registration = new TestRetryingRegistration((RpcService)rpc, "<test-address>", leaderId);
            long started = System.nanoTime();
            registration.startRegistration();
            Future future = registration.getFuture();
            Tuple2 success = (Tuple2)future.get(10L, TimeUnit.SECONDS);
            long finished = System.nanoTime();
            long elapsedMillis = (finished - started) / 1000000L;
            Assert.assertEquals((Object)"Petit a petit, l'oiseau fait son nid", (Object)((TestRegistrationSuccess)((Object)success.f1)).getCorrelationId());
            Assert.assertTrue((String)"retries did not properly back off", (elapsedMillis >= 200L ? 1 : 0) != 0);
        }
        finally {
            rpc.stopService();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCancellation() throws Exception {
        String testEndpointAddress = "my-test-address";
        UUID leaderId = UUID.randomUUID();
        TestingRpcService rpc = new TestingRpcService();
        try {
            FlinkCompletableFuture result = new FlinkCompletableFuture();
            TestRegistrationGateway testGateway = (TestRegistrationGateway)Mockito.mock(TestRegistrationGateway.class);
            Mockito.when(testGateway.registrationCall((UUID)Mockito.any(UUID.class), Mockito.anyLong())).thenReturn((Object)result);
            rpc.registerGateway("my-test-address", testGateway);
            TestRetryingRegistration registration = new TestRetryingRegistration((RpcService)rpc, "my-test-address", leaderId);
            registration.startRegistration();
            registration.cancel();
            result.completeExceptionally((Throwable)new TimeoutException());
            ((TestRegistrationGateway)Mockito.verify((Object)testGateway, (VerificationMode)Mockito.atMost((int)1))).registrationCall((UUID)Mockito.any(UUID.class), Mockito.anyLong());
        }
        finally {
            rpc.stopService();
        }
    }

    protected static class TestRetryingRegistration
    extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
        static final long INITIAL_TIMEOUT = 20L;
        static final long MAX_TIMEOUT = 200L;
        static final long DELAY_ON_ERROR = 200L;
        static final long DELAY_ON_DECLINE = 200L;

        public TestRetryingRegistration(RpcService rpc, String targetAddress, UUID leaderId) {
            super(LoggerFactory.getLogger(RetryingRegistrationTest.class), rpc, "TestEndpoint", TestRegistrationGateway.class, targetAddress, leaderId, 20L, 200L, 200L, 200L);
        }

        protected Future<RegistrationResponse> invokeRegistration(TestRegistrationGateway gateway, UUID leaderId, long timeoutMillis) {
            return gateway.registrationCall(leaderId, timeoutMillis);
        }
    }

    protected static class TestRegistrationSuccess
    extends RegistrationResponse.Success {
        private static final long serialVersionUID = 5542698790917150604L;
        private final String correlationId;

        public TestRegistrationSuccess(String correlationId) {
            this.correlationId = correlationId;
        }

        public String getCorrelationId() {
            return this.correlationId;
        }
    }
}

