package org.apache.flink.runtime.rpc;

import akka.actor.ActorSystem;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest.class */
public class AsyncCallsTest extends TestLogger {
    private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
    private static final AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, Time.milliseconds(10000));

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$TestEndpoint.class */
    public static class TestEndpoint extends RpcEndpoint<TestGateway> {
        private final ReentrantLock lock;
        private volatile boolean concurrentAccess;

        public TestEndpoint(RpcService rpcService, ReentrantLock reentrantLock) {
            super(rpcService);
            this.lock = reentrantLock;
        }

        @RpcMethod
        public void someCall() {
            if (this.lock.tryLock()) {
                this.lock.unlock();
            } else {
                this.concurrentAccess = true;
            }
        }

        @RpcMethod
        public void anotherCall() {
            if (this.lock.tryLock()) {
                this.lock.unlock();
            } else {
                this.concurrentAccess = true;
            }
        }

        public boolean hasConcurrentAccess() {
            return this.concurrentAccess;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$TestGateway.class */
    public interface TestGateway extends RpcGateway {
        void someCall();

        void anotherCall();
    }

    @AfterClass
    public static void shutdown() {
        akkaRpcService.stopService();
        actorSystem.shutdown();
    }

    @Test
    public void testScheduleWithNoDelay() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, reentrantLock);
        testEndpoint.start();
        TestGateway testGateway = (TestGateway) testEndpoint.getSelf();
        testGateway.someCall();
        testGateway.anotherCall();
        testGateway.someCall();
        for (int i = 0; i < 10000; i++) {
            testEndpoint.runAsync(new Runnable() { // from class: org.apache.flink.runtime.rpc.AsyncCallsTest.1
                @Override // java.lang.Runnable
                public void run() {
                    if (reentrantLock.tryLock()) {
                        reentrantLock.unlock();
                    } else {
                        atomicBoolean.set(true);
                    }
                }
            });
        }
        Assert.assertEquals("test", (String) testEndpoint.callAsync(new Callable<String>() { // from class: org.apache.flink.runtime.rpc.AsyncCallsTest.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public String call() throws Exception {
                if (reentrantLock.tryLock()) {
                    reentrantLock.unlock();
                    return "test";
                }
                atomicBoolean.set(true);
                return "test";
            }
        }, Time.seconds(30L)).get(30L, TimeUnit.SECONDS));
        Assert.assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
        Assert.assertFalse("Rpc Endpoint had concurrent access", atomicBoolean.get());
        akkaRpcService.stopServer(testEndpoint.getSelf());
    }

    @Test
    public void testScheduleWithDelay() throws Exception {
        final ReentrantLock reentrantLock = new ReentrantLock();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final OneShotLatch oneShotLatch = new OneShotLatch();
        TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, reentrantLock);
        testEndpoint.start();
        testEndpoint.runAsync(new Runnable() { // from class: org.apache.flink.runtime.rpc.AsyncCallsTest.3
            @Override // java.lang.Runnable
            public void run() {
                if (reentrantLock.tryLock()) {
                    reentrantLock.unlock();
                } else {
                    atomicBoolean.set(true);
                }
            }
        });
        long nanoTime = System.nanoTime();
        testEndpoint.scheduleRunAsync(new Runnable() { // from class: org.apache.flink.runtime.rpc.AsyncCallsTest.4
            @Override // java.lang.Runnable
            public void run() {
                if (reentrantLock.tryLock()) {
                    reentrantLock.unlock();
                } else {
                    atomicBoolean.set(true);
                }
                oneShotLatch.trigger();
            }
        }, 100L, TimeUnit.MILLISECONDS);
        oneShotLatch.await();
        long nanoTime2 = System.nanoTime();
        Assert.assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess());
        Assert.assertFalse("Rpc Endpoint had concurrent access", atomicBoolean.get());
        Assert.assertTrue("call was not properly delayed", (nanoTime2 - nanoTime) / 1000000 >= 100);
    }
}
