/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the asynchronous execution facilities of {@link RpcEndpoint}
 * ({@code runAsync}, {@code callAsync} and {@code scheduleRunAsync}).
 *
 * <p>Each test shares a {@link ReentrantLock} between the endpoint's RPC methods and the
 * asynchronously submitted work. Every piece of work probes the lock with
 * {@link ReentrantLock#tryLock()}; a failed probe means another thread was holding the lock
 * at that moment and is recorded as concurrent access.
 */
public class AsyncCallsTest extends TestLogger {

    /** Delay, in milliseconds, used for the scheduled call in {@link #testScheduleWithDelay()}. */
    private static final long DELAY_MILLIS = 100L;

    /** Failure message used by all concurrent-access assertions. */
    private static final String CONCURRENT_ACCESS_MESSAGE = "Rpc Endpoint had concurrent access";

    private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
    private static final AkkaRpcService akkaRpcService =
            new AkkaRpcService(actorSystem, Time.milliseconds(10000L));

    /** Stops the shared RPC service and the actor system once all tests have run. */
    @AfterClass
    public static void shutdown() {
        akkaRpcService.stopService();
        actorSystem.shutdown();
    }

    /**
     * Issues RPC calls through the gateway, submits 10000 runnables via {@code runAsync} and one
     * callable via {@code callAsync}, then asserts that the callable's result is returned and
     * that none of the executed work detected concurrent access.
     *
     * @throws Exception if waiting for the asynchronous result fails or times out
     */
    @Test
    public void testScheduleWithNoDelay() throws Exception {
        final ReentrantLock lock = new ReentrantLock();
        final AtomicBoolean concurrentAccess = new AtomicBoolean(false);

        TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
        try {
            testEndpoint.start();

            TestGateway gateway = testEndpoint.getSelf();
            gateway.someCall();
            gateway.anotherCall();
            gateway.someCall();

            // The probe keeps no state of its own, so a single instance can be submitted repeatedly.
            Runnable probe = lockProbe(lock, concurrentAccess);
            for (int i = 0; i < 10000; ++i) {
                testEndpoint.runAsync(probe);
            }

            Future<String> result = testEndpoint.callAsync(new Callable<String>() {

                @Override
                public String call() throws Exception {
                    probeLock(lock, concurrentAccess);
                    return "test";
                }
            }, Time.seconds(30L));

            String str = result.get(30L, TimeUnit.SECONDS);
            Assert.assertEquals("test", str);

            Assert.assertFalse(CONCURRENT_ACCESS_MESSAGE, testEndpoint.hasConcurrentAccess());
            Assert.assertFalse(CONCURRENT_ACCESS_MESSAGE, concurrentAccess.get());
        } finally {
            // Stop the endpoint even when an assertion fails so it does not outlive the test.
            akkaRpcService.stopServer(testEndpoint.getSelf());
        }
    }

    /**
     * Submits one runnable via {@code runAsync} and one via {@code scheduleRunAsync} with a delay
     * of {@link #DELAY_MILLIS} milliseconds, then asserts that no concurrent access was detected
     * and that at least the requested delay elapsed before the scheduled runnable ran.
     *
     * @throws Exception if waiting for the scheduled runnable is interrupted
     */
    @Test
    public void testScheduleWithDelay() throws Exception {
        final ReentrantLock lock = new ReentrantLock();
        final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
        final OneShotLatch latch = new OneShotLatch();

        TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock);
        try {
            testEndpoint.start();

            testEndpoint.runAsync(lockProbe(lock, concurrentAccess));

            final long start = System.nanoTime();

            testEndpoint.scheduleRunAsync(new Runnable() {

                @Override
                public void run() {
                    probeLock(lock, concurrentAccess);
                    latch.trigger();
                }
            }, DELAY_MILLIS, TimeUnit.MILLISECONDS);

            latch.await();
            final long stop = System.nanoTime();

            Assert.assertFalse(CONCURRENT_ACCESS_MESSAGE, testEndpoint.hasConcurrentAccess());
            Assert.assertFalse(CONCURRENT_ACCESS_MESSAGE, concurrentAccess.get());

            // Truncating conversion, identical to dividing the nanosecond difference by 1,000,000.
            Assert.assertTrue(
                    "call was not properly delayed",
                    TimeUnit.NANOSECONDS.toMillis(stop - start) >= DELAY_MILLIS);
        } finally {
            // Mirror testScheduleWithNoDelay: do not leave the started endpoint behind.
            akkaRpcService.stopServer(testEndpoint.getSelf());
        }
    }

    /**
     * Probes {@code lock} once and sets {@code concurrentAccess} to {@code true} when the lock
     * could not be acquired, i.e. when another thread was holding it.
     *
     * @param lock the lock shared with the endpoint under test
     * @param concurrentAccess flag that is raised when the probe fails
     */
    private static void probeLock(ReentrantLock lock, AtomicBoolean concurrentAccess) {
        if (lock.tryLock()) {
            // Acquired without contention; release immediately.
            lock.unlock();
        } else {
            concurrentAccess.set(true);
        }
    }

    /**
     * Creates a runnable that performs a single {@link #probeLock(ReentrantLock, AtomicBoolean)}.
     *
     * @param lock the lock shared with the endpoint under test
     * @param concurrentAccess flag that is raised when the probe fails
     * @return a runnable suitable for {@code runAsync}
     */
    private static Runnable lockProbe(final ReentrantLock lock, final AtomicBoolean concurrentAccess) {
        return new Runnable() {

            @Override
            public void run() {
                probeLock(lock, concurrentAccess);
            }
        };
    }

    /**
     * Endpoint whose RPC methods probe a shared lock and remember whether any probe failed.
     */
    public static class TestEndpoint extends RpcEndpoint<TestGateway> {
        private final ReentrantLock lock;

        /** Set to {@code true} once an RPC method failed to acquire {@link #lock}. */
        private volatile boolean concurrentAccess;

        /**
         * @param rpcService the RPC service passed on to {@link RpcEndpoint}
         * @param lock the lock probed by every RPC method of this endpoint
         */
        public TestEndpoint(RpcService rpcService, ReentrantLock lock) {
            super(rpcService);
            this.lock = lock;
        }

        /** RPC method that probes the shared lock. */
        @RpcMethod
        public void someCall() {
            probeLock();
        }

        /** Second RPC method; performs the same lock probe as {@link #someCall()}. */
        @RpcMethod
        public void anotherCall() {
            probeLock();
        }

        /**
         * @return {@code true} if any RPC method of this endpoint failed to acquire the lock
         */
        public boolean hasConcurrentAccess() {
            return this.concurrentAccess;
        }

        /** Tries the lock once; records concurrent access when it is held by another thread. */
        private void probeLock() {
            if (this.lock.tryLock()) {
                this.lock.unlock();
            } else {
                this.concurrentAccess = true;
            }
        }
    }

    /** Gateway exposing the RPC methods of {@link TestEndpoint}. */
    public static interface TestGateway extends RpcGateway {
        public void someCall();

        public void anotherCall();
    }
}

