package org.apache.flink.runtime.rpc;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest.class */
public class AsyncCallsTest extends TestLogger {
    private static final Duration timeout = Duration.ofSeconds(10);
    private static RpcService rpcService;

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$FencedTestEndpoint.class */
    public static class FencedTestEndpoint extends FencedRpcEndpoint<UUID> implements FencedTestGateway {
        private final ReentrantLock lock;
        private final AtomicBoolean concurrentAccess;
        private final OneShotLatch enteringSetNewFencingToken;
        private final OneShotLatch triggerSetNewFencingToken;

        protected FencedTestEndpoint(RpcService rpcService, ReentrantLock reentrantLock, AtomicBoolean atomicBoolean) {
            this(rpcService, reentrantLock, atomicBoolean, UUID.randomUUID(), new OneShotLatch(), new OneShotLatch());
        }

        protected FencedTestEndpoint(RpcService rpcService, UUID uuid, OneShotLatch oneShotLatch, OneShotLatch oneShotLatch2) {
            this(rpcService, new ReentrantLock(), new AtomicBoolean(false), uuid, oneShotLatch, oneShotLatch2);
        }

        private FencedTestEndpoint(RpcService rpcService, ReentrantLock reentrantLock, AtomicBoolean atomicBoolean, UUID uuid, OneShotLatch oneShotLatch, OneShotLatch oneShotLatch2) {
            super(rpcService, uuid);
            this.lock = reentrantLock;
            this.concurrentAccess = atomicBoolean;
            this.enteringSetNewFencingToken = oneShotLatch;
            this.triggerSetNewFencingToken = oneShotLatch2;
        }

        @Override // org.apache.flink.runtime.rpc.AsyncCallsTest.FencedTestGateway
        public CompletableFuture<Acknowledge> setNewFencingToken(UUID uuid, Time time) {
            this.enteringSetNewFencingToken.trigger();
            try {
                this.triggerSetNewFencingToken.await();
                setFencingToken(uuid);
                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (InterruptedException e) {
                throw new RuntimeException("TriggerSetNewFencingToken OneShotLatch was interrupted.");
            }
        }

        @Override // org.apache.flink.runtime.rpc.AsyncCallsTest.TestGateway
        public void someCall() {
            if (this.lock.tryLock()) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }

        @Override // org.apache.flink.runtime.rpc.AsyncCallsTest.TestGateway
        public void anotherCall() {
            if (this.lock.tryLock()) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$FencedTestGateway.class */
    public interface FencedTestGateway extends FencedRpcGateway<UUID>, TestGateway {
        CompletableFuture<Acknowledge> setNewFencingToken(UUID uuid, @RpcTimeout Time time);
    }

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$RpcEndpointFactory.class */
    public interface RpcEndpointFactory {
        RpcEndpoint create(RpcService rpcService, ReentrantLock reentrantLock, AtomicBoolean atomicBoolean);
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$TestEndpoint.class */
    private static class TestEndpoint extends RpcEndpoint implements TestGateway {
        private final ReentrantLock lock;
        private final AtomicBoolean concurrentAccess;

        TestEndpoint(RpcService rpcService, ReentrantLock reentrantLock, AtomicBoolean atomicBoolean) {
            super(rpcService);
            this.lock = reentrantLock;
            this.concurrentAccess = atomicBoolean;
        }

        @Override // org.apache.flink.runtime.rpc.AsyncCallsTest.TestGateway
        public void someCall() {
            if (this.lock.tryLock()) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }

        @Override // org.apache.flink.runtime.rpc.AsyncCallsTest.TestGateway
        public void anotherCall() {
            if (this.lock.tryLock()) {
                this.lock.unlock();
            } else {
                this.concurrentAccess.set(true);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rpc/AsyncCallsTest$TestGateway.class */
    public interface TestGateway extends RpcGateway {
        void someCall();

        void anotherCall();
    }

    @BeforeClass
    public static void setup() throws Exception {
        rpcService = RpcSystem.load().localServiceBuilder(new Configuration()).createAndStart();
    }

    @AfterClass
    public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        rpcService.stopService().get();
    }

    @Test
    public void testScheduleWithNoDelay() throws Exception {
        runScheduleWithNoDelayTest(TestEndpoint::new);
    }

    @Test
    public void testFencedScheduleWithNoDelay() throws Exception {
        runScheduleWithNoDelayTest(FencedTestEndpoint::new);
    }

    private void runScheduleWithNoDelayTest(RpcEndpointFactory rpcEndpointFactory) throws Exception {
        ReentrantLock reentrantLock = new ReentrantLock();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        RpcEndpoint create = rpcEndpointFactory.create(rpcService, reentrantLock, atomicBoolean);
        create.start();
        try {
            TestGateway testGateway = (TestGateway) create.getSelfGateway(TestGateway.class);
            testGateway.someCall();
            testGateway.anotherCall();
            testGateway.someCall();
            for (int i = 0; i < 10000; i++) {
                create.runAsync(() -> {
                    if (reentrantLock.tryLock()) {
                        reentrantLock.unlock();
                    } else {
                        atomicBoolean.set(true);
                    }
                });
            }
            Assert.assertEquals("test", (String) create.callAsync(() -> {
                if (reentrantLock.tryLock()) {
                    reentrantLock.unlock();
                    return "test";
                }
                atomicBoolean.set(true);
                return "test";
            }, Duration.ofSeconds(30L)).get(30L, TimeUnit.SECONDS));
            Assert.assertFalse("Rpc Endpoint had concurrent access", atomicBoolean.get());
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{create});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{create});
            throw th;
        }
    }

    @Test
    public void testScheduleWithDelay() throws Exception {
        runScheduleWithDelayTest(TestEndpoint::new);
    }

    @Test
    public void testFencedScheduleWithDelay() throws Exception {
        runScheduleWithDelayTest(FencedTestEndpoint::new);
    }

    private void runScheduleWithDelayTest(RpcEndpointFactory rpcEndpointFactory) throws Exception {
        ReentrantLock reentrantLock = new ReentrantLock();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        OneShotLatch oneShotLatch = new OneShotLatch();
        RpcEndpoint create = rpcEndpointFactory.create(rpcService, reentrantLock, atomicBoolean);
        create.start();
        try {
            create.runAsync(() -> {
                if (reentrantLock.tryLock()) {
                    reentrantLock.unlock();
                } else {
                    atomicBoolean.set(true);
                }
            });
            long nanoTime = System.nanoTime();
            create.scheduleRunAsync(() -> {
                if (reentrantLock.tryLock()) {
                    reentrantLock.unlock();
                } else {
                    atomicBoolean.set(true);
                }
                oneShotLatch.trigger();
            }, 10L, TimeUnit.MILLISECONDS);
            oneShotLatch.await();
            long nanoTime2 = System.nanoTime();
            Assert.assertFalse("Rpc Endpoint had concurrent access", atomicBoolean.get());
            Assert.assertTrue("call was not properly delayed", (nanoTime2 - nanoTime) / 1000000 >= 10);
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{create});
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{create});
            throw th;
        }
    }

    @Test
    public void testRunAsyncWithFencing() throws Exception {
        Duration ofMillis = Duration.ofMillis(100L);
        UUID randomUUID = UUID.randomUUID();
        CompletableFuture completableFuture = new CompletableFuture();
        testRunAsync(fencedTestEndpoint -> {
            fencedTestEndpoint.runAsync(() -> {
                completableFuture.complete(fencedTestEndpoint.getFencingToken());
            });
            return completableFuture;
        }, randomUUID);
        try {
            completableFuture.get(ofMillis.toMillis(), TimeUnit.MILLISECONDS);
            Assert.fail("The async run operation should not complete since it is filtered out due to the changed fencing token.");
        } catch (TimeoutException e) {
        }
    }

    @Test
    public void testRunAsyncWithoutFencing() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        UUID randomUUID = UUID.randomUUID();
        testRunAsync(fencedTestEndpoint -> {
            fencedTestEndpoint.runAsyncWithoutFencing(() -> {
                completableFuture.complete(fencedTestEndpoint.getFencingToken());
            });
            return completableFuture;
        }, randomUUID);
        Assert.assertEquals(randomUUID, completableFuture.get());
    }

    @Test
    public void testCallAsyncWithFencing() throws Exception {
        try {
            testRunAsync(fencedTestEndpoint -> {
                return fencedTestEndpoint.callAsync(() -> {
                    return true;
                }, timeout);
            }, UUID.randomUUID()).get();
            Assert.fail("The async call operation should fail due to the changed fencing token.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
        }
    }

    @Test
    public void testCallAsyncWithoutFencing() throws Exception {
        Assert.assertTrue(((Boolean) testRunAsync(fencedTestEndpoint -> {
            return fencedTestEndpoint.callAsyncWithoutFencing(() -> {
                return true;
            }, timeout);
        }, UUID.randomUUID()).get()).booleanValue());
    }

    @Test
    public void testUnfencedMainThreadExecutor() throws Exception {
        Assert.assertThat(testRunAsync(fencedTestEndpoint -> {
            return CompletableFuture.supplyAsync(() -> {
                return true;
            }, fencedTestEndpoint.getUnfencedMainThreadExecutor());
        }, UUID.randomUUID()).get(), Matchers.is(true));
    }

    private static <T> CompletableFuture<T> testRunAsync(Function<FencedTestEndpoint, CompletableFuture<T>> function, UUID uuid) throws Exception {
        UUID randomUUID = UUID.randomUUID();
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        RpcEndpoint fencedTestEndpoint = new FencedTestEndpoint(rpcService, randomUUID, oneShotLatch, oneShotLatch2);
        FencedTestGateway fencedTestGateway = (FencedTestGateway) fencedTestEndpoint.getSelfGateway(FencedTestGateway.class);
        try {
            fencedTestEndpoint.start();
            CompletableFuture<Acknowledge> newFencingToken = fencedTestGateway.setNewFencingToken(uuid, Time.fromDuration(timeout));
            Assert.assertFalse(newFencingToken.isDone());
            Assert.assertEquals(randomUUID, fencedTestEndpoint.getFencingToken());
            CompletableFuture<T> apply = function.apply(fencedTestEndpoint);
            oneShotLatch.await();
            oneShotLatch2.trigger();
            newFencingToken.get();
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{fencedTestEndpoint});
            return apply;
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[]{fencedTestEndpoint});
            throw th;
        }
    }
}
