/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

public class AkkaRpcActorTest
extends TestLogger {
    /** Actor system shared by every test in this class; shut down in {@link #shutdown()}. */
    private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();

    /** Timeout handed to the RPC service and used as the wait limit for blocking {@code get} calls in the tests. */
    private static final Time timeout = Time.milliseconds(10000L);

    /** RPC service under test, created on top of {@link #actorSystem}. */
    private static final AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, timeout);

    /**
     * Tears down the shared RPC service and actor system after all tests have run.
     *
     * <p>The actor system is shut down in a {@code finally} block so that it is
     * still terminated when stopping the RPC service throws.
     */
    @AfterClass
    public static void shutdown() {
        try {
            akkaRpcService.stopService();
        }
        finally {
            actorSystem.shutdown();
            actorSystem.awaitTermination();
        }
    }

    /**
     * Verifies that connecting to an endpoint's own address yields a gateway
     * that reports the same address as the endpoint.
     *
     * @throws Exception if the connection future fails or times out
     */
    @Test
    public void testAddressResolution() throws Exception {
        DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
        try {
            Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);
            DummyRpcGateway rpcGateway = futureRpcGateway.get(timeout.getSize(), timeout.getUnit());
            Assert.assertEquals(rpcEndpoint.getAddress(), rpcGateway.getAddress());
        }
        finally {
            // Always shut the endpoint down so it does not linger on the shared RPC service.
            rpcEndpoint.shutDown();
        }
    }

    /**
     * Checks that a connection attempt to the address {@code "foobar"} fails the
     * returned future with an {@link RpcConnectionException} as its cause.
     *
     * @throws Exception if waiting on the future fails in an unexpected way
     */
    @Test
    public void testFailingAddressResolution() throws Exception {
        final String bogusAddress = "foobar";
        Future<DummyRpcGateway> pendingGateway = akkaRpcService.connect(bogusAddress, DummyRpcGateway.class);
        try {
            pendingGateway.get(timeout.getSize(), timeout.getUnit());
            Assert.fail("The rpc connection resolution should have failed.");
        }
        catch (ExecutionException expected) {
            // The failure must be reported as a connection problem.
            Throwable cause = expected.getCause();
            Assert.assertTrue(cause instanceof RpcConnectionException);
        }
    }

    /**
     * Verifies that an RPC sent before the endpoint is started fails with an
     * {@link AkkaRpcException}, and that an RPC sent after {@code start()}
     * returns the value most recently set on the endpoint.
     *
     * @throws Exception if an RPC future fails in an unexpected way or times out
     */
    @Test
    public void testMessageDiscarding() throws Exception {
        int expectedValue = 1337;
        DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
        try {
            DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();

            // The endpoint has not been started yet, so this call is expected to fail.
            Future<Integer> result = rpcGateway.foobar();
            try {
                result.get(timeout.getSize(), timeout.getUnit());
                Assert.fail("Expected an AkkaRpcException.");
            }
            catch (ExecutionException ee) {
                Assert.assertTrue(ee.getCause() instanceof AkkaRpcException);
            }

            rpcEndpoint.setFoobar(expectedValue);
            rpcEndpoint.start();

            // Once started, the same RPC must be answered with the updated value.
            result = rpcGateway.foobar();
            Integer actualValue = result.get(timeout.getSize(), timeout.getUnit());
            Assert.assertThat("The new foobar value should have been returned.", actualValue, Is.is(expectedValue));
        }
        finally {
            // Shut down even when an assertion above fails, so the endpoint does not leak into later tests.
            rpcEndpoint.shutDown();
        }
    }

    /**
     * Verifies that calling an RPC through a gateway type that does not match
     * the endpoint fails the returned future with an {@link RpcConnectionException}.
     *
     * @throws Exception if connecting fails or a future times out
     */
    @Test
    public void testWrongGatewayEndpointConnection() throws Exception {
        DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
        rpcEndpoint.start();
        try {
            Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
            WrongRpcGateway gateway = futureGateway.get(timeout.getSize(), timeout.getUnit());

            // tell() returns void and has no counterpart on DummyRpcEndpoint; it must not break the call below.
            gateway.tell("foobar");

            // barfoo() is likewise not declared by DummyRpcEndpoint, but returns a future we can inspect.
            Future<Boolean> result = gateway.barfoo();
            try {
                result.get(timeout.getSize(), timeout.getUnit());
                Assert.fail("We expected a RpcConnectionException.");
            }
            catch (ExecutionException executionException) {
                Assert.assertTrue(executionException.getCause() instanceof RpcConnectionException);
            }
        }
        finally {
            // The endpoint was started above; shut it down so it does not linger on the shared RPC service.
            rpcEndpoint.shutDown();
        }
    }

    /**
     * Checks that an endpoint's termination future is not done while the
     * endpoint is running and completes once {@code shutDown()} has been
     * invoked from a task submitted to the actor system's dispatcher.
     *
     * <p>The JUnit timeout bounds the otherwise unbounded final {@code get()}.
     *
     * @throws Exception if waiting on the termination future fails
     */
    @Test(timeout=5000L)
    public void testRpcEndpointTerminationFuture() throws Exception {
        final DummyRpcEndpoint endpoint = new DummyRpcEndpoint(akkaRpcService);
        endpoint.start();

        Future<?> termination = endpoint.getTerminationFuture();
        Assert.assertFalse(termination.isDone());

        // Trigger the shutdown asynchronously rather than from the test thread.
        Callable<Void> shutDownTask = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                endpoint.shutDown();
                return null;
            }
        };
        Executor dispatcher = actorSystem.dispatcher();
        FlinkFuture.supplyAsync(shutDownTask, dispatcher);

        termination.get();
    }

    /**
     * Verifies that a {@link RuntimeException} thrown directly by an RPC method
     * reaches the caller as the cause of the future's failure, with its exact
     * class and message intact.
     *
     * @throws Exception if the RPC future times out or fails in an unexpected way
     */
    @Test
    public void testExceptionPropagation() throws Exception {
        ExceptionalEndpoint rpcEndpoint = new ExceptionalEndpoint(akkaRpcService);
        rpcEndpoint.start();
        try {
            ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
            Future<Integer> result = rpcGateway.doStuff();
            try {
                result.get(timeout.getSize(), timeout.getUnit());
                Assert.fail("this should fail with an exception");
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                Assert.assertEquals(RuntimeException.class, cause.getClass());
                Assert.assertEquals("my super specific test exception", cause.getMessage());
            }
        }
        finally {
            // The endpoint was started above; shut it down so it does not linger on the shared RPC service.
            rpcEndpoint.shutDown();
        }
    }

    /**
     * Verifies that when an RPC method returns a future that is later completed
     * exceptionally, the caller's future fails with that same exception class
     * and message.
     *
     * @throws Exception if the RPC future times out or fails in an unexpected way
     */
    @Test
    public void testExceptionPropagationFuturePiping() throws Exception {
        ExceptionalFutureEndpoint rpcEndpoint = new ExceptionalFutureEndpoint(akkaRpcService);
        rpcEndpoint.start();
        try {
            ExceptionalGateway rpcGateway = rpcEndpoint.getSelf();
            Future<Integer> result = rpcGateway.doStuff();
            try {
                result.get(timeout.getSize(), timeout.getUnit());
                Assert.fail("this should fail with an exception");
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                Assert.assertEquals(Exception.class, cause.getClass());
                Assert.assertEquals("some test", cause.getMessage());
            }
        }
        finally {
            // The endpoint was started above; shut it down so it does not linger on the shared RPC service.
            rpcEndpoint.shutDown();
        }
    }

    /**
     * Test endpoint whose {@code doStuff} RPC returns a future that a separate
     * thread completes exceptionally after a 10 ms sleep.
     */
    private static class ExceptionalFutureEndpoint
    extends RpcEndpoint<ExceptionalGateway> {
        protected ExceptionalFutureEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Starts a thread that fails the returned future with
         * {@code new Exception("some test")} and returns the future immediately.
         *
         * @return a future that is completed exceptionally by the spawned thread
         */
        @RpcMethod
        public Future<Integer> doStuff() {
            final FlinkCompletableFuture<Integer> future = new FlinkCompletableFuture<Integer>();
            new Thread(){

                @Override
                public void run() {
                    try {
                        Thread.sleep(10L);
                    }
                    catch (InterruptedException interrupted) {
                        // Preserve the interrupt status instead of dropping it; the future is still failed below.
                        Thread.currentThread().interrupt();
                    }
                    future.completeExceptionally(new Exception("some test"));
                }
            }.start();
            return future;
        }
    }

    /**
     * Test endpoint whose {@code doStuff} RPC always throws a
     * {@link RuntimeException} with a fixed message.
     */
    private static class ExceptionalEndpoint extends RpcEndpoint<ExceptionalGateway> {

        protected ExceptionalEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Never returns normally.
         *
         * @return nothing; the method always throws
         * @throws RuntimeException always, with the message the test asserts on
         */
        @RpcMethod
        public int doStuff() {
            final String failureMessage = "my super specific test exception";
            throw new RuntimeException(failureMessage);
        }
    }

    private static interface ExceptionalGateway
    extends RpcGateway {
        public Future<Integer> doStuff();
    }

    /**
     * Minimal test endpoint exposing a single RPC, {@code foobar}, that returns
     * an integer which tests can change through {@link #setFoobar(int)}.
     */
    private static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> {

        // Value returned by foobar(); volatile, presumably because it is set from the
        // test thread and read wherever the RPC is executed — confirm against RpcEndpoint.
        private volatile int foobarValue = 42;

        protected DummyRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * @return the currently configured value (42 unless changed)
         */
        @RpcMethod
        public int foobar() {
            return foobarValue;
        }

        /**
         * Replaces the value returned by subsequent {@code foobar} calls.
         *
         * @param value the new value to return
         */
        public void setFoobar(int value) {
            foobarValue = value;
        }
    }

    private static interface WrongRpcGateway
    extends RpcGateway {
        public Future<Boolean> barfoo();

        public void tell(String var1);
    }

    private static interface DummyRpcGateway
    extends RpcGateway {
        public Future<Integer> foobar();
    }
}

