/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

public class AkkaRpcServiceTest
extends TestLogger {
    private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
    private static AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, Time.milliseconds((long)10000L));

    @AfterClass
    public static void shutdown() {
        akkaRpcService.stopService();
        actorSystem.shutdown();
    }

    @Test
    public void testScheduleRunnable() throws Exception {
        final OneShotLatch latch = new OneShotLatch();
        long delay = 100L;
        long start = System.nanoTime();
        ScheduledFuture scheduledFuture = akkaRpcService.scheduleRunnable(new Runnable(){

            @Override
            public void run() {
                latch.trigger();
            }
        }, 100L, TimeUnit.MILLISECONDS);
        scheduledFuture.get();
        Assert.assertTrue((boolean)latch.isTriggered());
        long stop = System.nanoTime();
        Assert.assertTrue((String)"call was not properly delayed", ((stop - start) / 1000000L >= 100L ? 1 : 0) != 0);
    }

    @Test
    public void testExecuteRunnable() throws Exception {
        final OneShotLatch latch = new OneShotLatch();
        akkaRpcService.execute(new Runnable(){

            @Override
            public void run() {
                latch.trigger();
            }
        });
        latch.await(30L, TimeUnit.SECONDS);
    }

    @Test
    public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException {
        final OneShotLatch latch = new OneShotLatch();
        int expected = 42;
        Future result = akkaRpcService.execute((Callable)new Callable<Integer>(){

            @Override
            public Integer call() throws Exception {
                latch.trigger();
                return 42;
            }
        });
        int actual = (Integer)result.get(30L, TimeUnit.SECONDS);
        Assert.assertEquals((long)42L, (long)actual);
        Assert.assertTrue((boolean)latch.isTriggered());
    }

    @Test
    public void testGetAddress() {
        Assert.assertEquals((Object)AkkaUtils.getAddress((ActorSystem)actorSystem).host().get(), (Object)akkaRpcService.getAddress());
    }

    @Test(timeout=60000L)
    public void testTerminationFuture() throws ExecutionException, InterruptedException {
        ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
        final AkkaRpcService rpcService = new AkkaRpcService(actorSystem, Time.milliseconds((long)1000L));
        Future terminationFuture = rpcService.getTerminationFuture();
        Assert.assertFalse((boolean)terminationFuture.isDone());
        FlinkFuture.supplyAsync((Callable)new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                rpcService.stopService();
                return null;
            }
        }, (Executor)actorSystem.dispatcher());
        terminationFuture.get();
    }

    @Test(timeout=60000L)
    public void testScheduledExecutorServiceSimpleSchedule() throws ExecutionException, InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        final OneShotLatch latch = new OneShotLatch();
        ScheduledFuture future = scheduledExecutor.schedule(new Runnable(){

            @Override
            public void run() {
                latch.trigger();
            }
        }, 10L, TimeUnit.MILLISECONDS);
        future.get();
        Assert.assertTrue((boolean)latch.isTriggered());
    }

    @Test(timeout=60000L)
    public void testScheduledExecutorServicePeriodicSchedule() throws ExecutionException, InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        int tries = 4;
        long delay = 10L;
        final CountDownLatch countDownLatch = new CountDownLatch(4);
        long currentTime = System.nanoTime();
        ScheduledFuture future = scheduledExecutor.scheduleAtFixedRate(new Runnable(){

            @Override
            public void run() {
                countDownLatch.countDown();
            }
        }, 10L, 10L, TimeUnit.MILLISECONDS);
        Assert.assertTrue((!future.isDone() ? 1 : 0) != 0);
        countDownLatch.await();
        Assert.assertTrue((!future.isDone() ? 1 : 0) != 0);
        long finalTime = System.nanoTime() - currentTime;
        Assert.assertTrue((finalTime >= 40L ? 1 : 0) != 0);
        future.cancel(true);
    }

    @Test(timeout=60000L)
    public void testScheduledExecutorServiceWithFixedDelaySchedule() throws ExecutionException, InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        int tries = 4;
        long delay = 10L;
        final CountDownLatch countDownLatch = new CountDownLatch(4);
        long currentTime = System.nanoTime();
        ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                countDownLatch.countDown();
            }
        }, 10L, 10L, TimeUnit.MILLISECONDS);
        Assert.assertTrue((!future.isDone() ? 1 : 0) != 0);
        countDownLatch.await();
        Assert.assertTrue((!future.isDone() ? 1 : 0) != 0);
        long finalTime = System.nanoTime() - currentTime;
        Assert.assertTrue((finalTime >= 40L ? 1 : 0) != 0);
        future.cancel(true);
    }

    @Test
    public void testScheduledExecutorServiceCancelWithFixedDelay() throws InterruptedException {
        ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
        long delay = 10L;
        final OneShotLatch futureTask = new OneShotLatch();
        final OneShotLatch latch = new OneShotLatch();
        final OneShotLatch shouldNotBeTriggeredLatch = new OneShotLatch();
        ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                try {
                    if (!futureTask.isTriggered()) {
                        futureTask.trigger();
                        latch.await();
                    } else {
                        shouldNotBeTriggeredLatch.trigger();
                    }
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
        }, delay, delay, TimeUnit.MILLISECONDS);
        futureTask.await();
        future.cancel(false);
        latch.trigger();
        try {
            shouldNotBeTriggeredLatch.await(5L * delay, TimeUnit.MILLISECONDS);
            Assert.fail((String)"The shouldNotBeTriggeredLatch should never be triggered.");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
    }
}

