/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.BitSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.Preconditions;

public class TestingSerialRpcService
implements RpcService {
    private final DirectExecutorService executorService = new DirectExecutorService();
    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    private final ConcurrentHashMap<String, RpcGateway> registeredConnections = new ConcurrentHashMap(16);
    private final CompletableFuture<Void> terminationFuture = new FlinkCompletableFuture();
    private final ScheduledExecutor scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(this.scheduledExecutorService);

    public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
        try {
            unit.sleep(delay);
            runnable.run();
            return new DoneScheduledFuture(null);
        }
        catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    public void execute(Runnable runnable) {
        runnable.run();
    }

    public <T> Future<T> execute(Callable<T> callable) {
        try {
            T result = callable.call();
            return FlinkCompletableFuture.completed(result);
        }
        catch (Exception e) {
            return FlinkCompletableFuture.completedExceptionally((Throwable)e);
        }
    }

    public Executor getExecutor() {
        return this.executorService;
    }

    public ScheduledExecutor getScheduledExecutor() {
        return this.scheduledExecutorServiceAdapter;
    }

    public void stopService() {
        this.executorService.shutdown();
        this.scheduledExecutorService.shutdown();
        boolean terminated = false;
        try {
            terminated = this.scheduledExecutorService.awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (!terminated) {
            List<Runnable> runnables = this.scheduledExecutorService.shutdownNow();
            for (Runnable runnable : runnables) {
                runnable.run();
            }
        }
        this.registeredConnections.clear();
        this.terminationFuture.complete(null);
    }

    public Future<Void> getTerminationFuture() {
        return this.terminationFuture;
    }

    public void stopServer(RpcGateway selfGateway) {
        this.registeredConnections.remove(selfGateway.getAddress());
    }

    public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
        String address = UUID.randomUUID().toString();
        TestingSerialInvocationHandler akkaInvocationHandler = new TestingSerialInvocationHandler(address, (RpcEndpoint)rpcEndpoint, null);
        ClassLoader classLoader = this.getClass().getClassLoader();
        RpcGateway self = (RpcGateway)Proxy.newProxyInstance(classLoader, new Class[]{rpcEndpoint.getSelfGatewayType(), MainThreadExecutable.class, StartStoppable.class, RpcGateway.class}, akkaInvocationHandler);
        this.registeredConnections.putIfAbsent(self.getAddress(), self);
        return (C)self;
    }

    public String getAddress() {
        return "";
    }

    public <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz) {
        RpcGateway gateway = this.registeredConnections.get(address);
        if (gateway != null) {
            if (clazz.isAssignableFrom(gateway.getClass())) {
                RpcGateway typedGateway = gateway;
                return FlinkCompletableFuture.completed((Object)typedGateway);
            }
            return FlinkCompletableFuture.completedExceptionally((Throwable)new Exception("Gateway registered under " + address + " is not of type " + clazz));
        }
        return FlinkCompletableFuture.completedExceptionally((Throwable)new Exception("No gateway registered under that name"));
    }

    public void registerGateway(String address, RpcGateway gateway) {
        Preconditions.checkNotNull((Object)address);
        Preconditions.checkNotNull((Object)gateway);
        if (this.registeredConnections.putIfAbsent(address, gateway) != null) {
            throw new IllegalStateException("a gateway is already registered under " + address);
        }
    }

    public void clearGateways() {
        this.registeredConnections.clear();
    }

    private static class DoneScheduledFuture<V>
    implements ScheduledFuture<V> {
        private final V value;

        private DoneScheduledFuture(V value) {
            this.value = value;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return 0L;
        }

        @Override
        public int compareTo(Delayed o) {
            return 0;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return true;
        }

        @Override
        public V get() throws InterruptedException, ExecutionException {
            return this.value;
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return this.value;
        }
    }

    private static final class TestingSerialInvocationHandler<C extends RpcGateway, T extends RpcEndpoint<C>>
    implements InvocationHandler,
    RpcGateway,
    MainThreadExecutable,
    StartStoppable {
        private final T rpcEndpoint;
        private final Time timeout;
        private final String address;

        private TestingSerialInvocationHandler(String address, T rpcEndpoint) {
            this(address, rpcEndpoint, Time.seconds((long)10L));
        }

        private TestingSerialInvocationHandler(String address, T rpcEndpoint, Time timeout) {
            this.rpcEndpoint = rpcEndpoint;
            this.timeout = timeout;
            this.address = address;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Class<?> declaringClass = method.getDeclaringClass();
            if (declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(RpcGateway.class)) {
                return method.invoke((Object)this, args);
            }
            String methodName = method.getName();
            Class<?>[] parameterTypes = method.getParameterTypes();
            Annotation[][] parameterAnnotations = method.getParameterAnnotations();
            Time futureTimeout = TestingSerialInvocationHandler.extractRpcTimeout(parameterAnnotations, args, this.timeout);
            Tuple2<Class<?>[], Object[]> filteredArguments = TestingSerialInvocationHandler.filterArguments(parameterTypes, parameterAnnotations, args);
            Class<?> returnType = method.getReturnType();
            if (returnType.equals(Future.class)) {
                try {
                    Object result = this.handleRpcInvocationSync(methodName, (Class[])filteredArguments.f0, (Object[])filteredArguments.f1, futureTimeout);
                    return FlinkCompletableFuture.completed((Object)result);
                }
                catch (Throwable e) {
                    return FlinkCompletableFuture.completedExceptionally((Throwable)e);
                }
            }
            return this.handleRpcInvocationSync(methodName, (Class[])filteredArguments.f0, (Object[])filteredArguments.f1, futureTimeout);
        }

        private Object handleRpcInvocationSync(String methodName, Class<?>[] parameterTypes, Object[] args, Time futureTimeout) throws Exception {
            Method rpcMethod = this.lookupRpcMethod(methodName, parameterTypes);
            Object result = rpcMethod.invoke(this.rpcEndpoint, args);
            if (result instanceof Future) {
                Future future = (Future)result;
                return future.get(futureTimeout.getSize(), futureTimeout.getUnit());
            }
            return result;
        }

        public void runAsync(Runnable runnable) {
            runnable.run();
        }

        public <V> Future<V> callAsync(Callable<V> callable, Time callTimeout) {
            try {
                return FlinkCompletableFuture.completed(callable.call());
            }
            catch (Throwable e) {
                return FlinkCompletableFuture.completedExceptionally((Throwable)e);
            }
        }

        public void scheduleRunAsync(Runnable runnable, long delay) {
            try {
                TimeUnit.MILLISECONDS.sleep(delay);
                runnable.run();
            }
            catch (Throwable e) {
                throw new RuntimeException(e);
            }
        }

        public String getAddress() {
            return this.address;
        }

        public String getHostname() {
            return this.address;
        }

        public void start() {
        }

        public void stop() {
        }

        private Method lookupRpcMethod(String methodName, Class<?>[] parameterTypes) throws NoSuchMethodException {
            return this.rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
        }

        private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) {
            if (args != null) {
                Preconditions.checkArgument((parameterAnnotations.length == args.length ? 1 : 0) != 0);
                for (int i = 0; i < parameterAnnotations.length; ++i) {
                    if (!TestingSerialInvocationHandler.isRpcTimeout(parameterAnnotations[i])) continue;
                    if (args[i] instanceof Time) {
                        return (Time)args[i];
                    }
                    throw new RuntimeException("The rpc timeout parameter must be of type " + Time.class.getName() + ". The type " + args[i].getClass().getName() + " is not supported.");
                }
            }
            return defaultTimeout;
        }

        private static Tuple2<Class<?>[], Object[]> filterArguments(Class<?>[] parameterTypes, Annotation[][] parameterAnnotations, Object[] args) {
            Object[] filteredArgs;
            Class<?>[] filteredParameterTypes;
            if (args == null) {
                filteredParameterTypes = parameterTypes;
                filteredArgs = null;
            } else {
                Preconditions.checkArgument((parameterTypes.length == parameterAnnotations.length ? 1 : 0) != 0);
                Preconditions.checkArgument((parameterAnnotations.length == args.length ? 1 : 0) != 0);
                BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length);
                int numberRpcParameters = parameterTypes.length;
                for (int i = 0; i < parameterTypes.length; ++i) {
                    if (!TestingSerialInvocationHandler.isRpcTimeout(parameterAnnotations[i])) continue;
                    isRpcTimeoutParameter.set(i);
                    --numberRpcParameters;
                }
                if (numberRpcParameters == parameterTypes.length) {
                    filteredParameterTypes = parameterTypes;
                    filteredArgs = args;
                } else {
                    filteredParameterTypes = new Class[numberRpcParameters];
                    filteredArgs = new Object[numberRpcParameters];
                    int counter = 0;
                    for (int i = 0; i < parameterTypes.length; ++i) {
                        if (isRpcTimeoutParameter.get(i)) continue;
                        filteredParameterTypes[counter] = parameterTypes[i];
                        filteredArgs[counter] = args[i];
                        ++counter;
                    }
                }
            }
            return Tuple2.of(filteredParameterTypes, filteredArgs);
        }

        private static boolean isRpcTimeout(Annotation[] annotations) {
            for (Annotation annotation : annotations) {
                if (!annotation.annotationType().equals(RpcTimeout.class)) continue;
                return true;
            }
            return false;
        }

        /* synthetic */ TestingSerialInvocationHandler(String x0, RpcEndpoint x1, 1 x2) {
            this(x0, x1);
        }
    }
}

