/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.dispatch.OnComplete;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.QuarantineHandler;
import org.apache.flink.runtime.akka.QuarantineMonitor;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

public class QuarantineMonitorTest
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(QuarantineMonitorTest.class);
    private static final FiniteDuration zeroDelay = new FiniteDuration(0L, TimeUnit.SECONDS);
    private static ActorSystem actorSystem1;
    private ActorSystem actorSystem2;

    @BeforeClass
    public static void setup() {
        Properties properties = new Properties();
        properties.setProperty("akka.remote.watch-failure-detector.threshold", "0.00001");
        properties.setProperty("akka.remote.watch-failure-detector.heartbeat-interval", "1 ms");
        properties.setProperty("akka.remote.watch-failure-detector.acceptable-heartbeat-pause", "1 ms");
        Config deathWatch = ConfigFactory.parseProperties((Properties)properties);
        Config defaultConfig = AkkaUtils.getDefaultAkkaConfig();
        actorSystem1 = AkkaUtils.createActorSystem((Config)deathWatch.withFallback((ConfigMergeable)defaultConfig));
    }

    @AfterClass
    public static void tearDown() {
        if (actorSystem1 != null) {
            actorSystem1.shutdown();
            actorSystem1.awaitTermination();
        }
    }

    @Before
    public void setupTest() {
        this.actorSystem2 = AkkaUtils.createDefaultActorSystem();
    }

    @After
    public void tearDownTest() {
        if (this.actorSystem2 != null) {
            this.actorSystem2.shutdown();
            this.actorSystem2.awaitTermination();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=5000L)
    public void testWatcheeQuarantined() throws ExecutionException, InterruptedException {
        TestingQuarantineHandler quarantineHandler = new TestingQuarantineHandler();
        ActorRef watchee = null;
        ActorRef watcher = null;
        ActorRef monitor = null;
        FiniteDuration timeout = new FiniteDuration(5L, TimeUnit.SECONDS);
        FiniteDuration interval = new FiniteDuration(200L, TimeUnit.MILLISECONDS);
        try {
            monitor = this.actorSystem2.actorOf(QuarantineMonitorTest.getQuarantineMonitorProps(quarantineHandler), "quarantineMonitor");
            watchee = this.actorSystem2.actorOf(QuarantineMonitorTest.getWatcheeProps(timeout, interval, quarantineHandler), "watchee");
            watcher = actorSystem1.actorOf(QuarantineMonitorTest.getWatcherProps(timeout, interval, quarantineHandler), "watcher");
            Address actorSystem1Address = AkkaUtils.getAddress((ActorSystem)actorSystem1);
            String watcheeAddress = AkkaUtils.getAkkaURL((ActorSystem)this.actorSystem2, (ActorRef)watchee);
            String watcherAddress = AkkaUtils.getAkkaURL((ActorSystem)actorSystem1, (ActorRef)watcher);
            watchee.tell((Object)new Ping(watcherAddress), ActorRef.noSender());
            watcher.tell((Object)new Watch(watcheeAddress), ActorRef.noSender());
            Future<String> quarantineFuture = quarantineHandler.getWasQuarantinedByFuture();
            Assert.assertEquals((Object)actorSystem1Address.toString(), (Object)quarantineFuture.get());
        }
        catch (Throwable throwable) {
            TestingUtils.stopActor(watchee);
            TestingUtils.stopActor(watcher);
            TestingUtils.stopActor(monitor);
            throw throwable;
        }
        TestingUtils.stopActor(watchee);
        TestingUtils.stopActor(watcher);
        TestingUtils.stopActor(monitor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=5000L)
    public void testWatcherQuarantining() throws ExecutionException, InterruptedException {
        TestingQuarantineHandler quarantineHandler = new TestingQuarantineHandler();
        ActorRef watchee = null;
        ActorRef watcher = null;
        ActorRef monitor = null;
        FiniteDuration timeout = new FiniteDuration(5L, TimeUnit.SECONDS);
        FiniteDuration interval = new FiniteDuration(200L, TimeUnit.MILLISECONDS);
        try {
            monitor = actorSystem1.actorOf(QuarantineMonitorTest.getQuarantineMonitorProps(quarantineHandler), "quarantineMonitor");
            watchee = this.actorSystem2.actorOf(QuarantineMonitorTest.getWatcheeProps(timeout, interval, quarantineHandler), "watchee");
            watcher = actorSystem1.actorOf(QuarantineMonitorTest.getWatcherProps(timeout, interval, quarantineHandler), "watcher");
            Address actorSystem1Address = AkkaUtils.getAddress((ActorSystem)this.actorSystem2);
            String watcheeAddress = AkkaUtils.getAkkaURL((ActorSystem)this.actorSystem2, (ActorRef)watchee);
            String watcherAddress = AkkaUtils.getAkkaURL((ActorSystem)actorSystem1, (ActorRef)watcher);
            watchee.tell((Object)new Ping(watcherAddress), ActorRef.noSender());
            watcher.tell((Object)new Watch(watcheeAddress), ActorRef.noSender());
            Future<String> quarantineFuture = quarantineHandler.getHasQuarantinedFuture();
            Assert.assertEquals((Object)actorSystem1Address.toString(), (Object)quarantineFuture.get());
        }
        catch (Throwable throwable) {
            TestingUtils.stopActor(watchee);
            TestingUtils.stopActor(watcher);
            TestingUtils.stopActor(monitor);
            throw throwable;
        }
        TestingUtils.stopActor(watchee);
        TestingUtils.stopActor(watcher);
        TestingUtils.stopActor(monitor);
    }

    static Props getWatcheeProps(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
        return Props.create(Watchee.class, (Object[])new Object[]{timeout, interval, errorHandler});
    }

    static Props getWatcherProps(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
        return Props.create(Watcher.class, (Object[])new Object[]{timeout, interval, errorHandler});
    }

    static Props getQuarantineMonitorProps(QuarantineHandler handler) {
        return Props.create(QuarantineMonitor.class, (Object[])new Object[]{handler, LOG});
    }

    static class Ping {
        private final String target;

        Ping(String target) {
            this.target = target;
        }

        public String getTarget() {
            return this.target;
        }
    }

    static class Watch {
        private final String target;

        Watch(String target) {
            this.target = target;
        }

        public String getTarget() {
            return this.target;
        }
    }

    static class Watchee
    extends UntypedActor {
        private final FiniteDuration timeout;
        private final FiniteDuration interval;
        private final ErrorHandler errorHandler;

        Watchee(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
            this.timeout = (FiniteDuration)Preconditions.checkNotNull((Object)timeout);
            this.interval = (FiniteDuration)Preconditions.checkNotNull((Object)interval);
            this.errorHandler = (ErrorHandler)Preconditions.checkNotNull((Object)errorHandler);
        }

        public void onReceive(Object message) throws Exception {
            if (message instanceof Ping) {
                Ping ping = (Ping)message;
                this.getContext().actorSelection(ping.getTarget()).resolveOne(this.timeout).onComplete((Function1)new OnComplete<ActorRef>(){

                    public void onComplete(Throwable failure, ActorRef success) throws Throwable {
                        if (success != null) {
                            Watchee.this.getContext().system().scheduler().schedule(zeroDelay, Watchee.this.interval, success, (Object)"Watchee message", (ExecutionContext)Watchee.this.getContext().dispatcher(), Watchee.this.getSelf());
                        } else {
                            Watchee.this.errorHandler.handleError(failure);
                        }
                    }
                }, (ExecutionContext)this.getContext().dispatcher());
            }
        }
    }

    static class Watcher
    extends UntypedActor {
        private final FiniteDuration timeout;
        private final FiniteDuration interval;
        private final ErrorHandler errorHandler;

        Watcher(FiniteDuration timeout, FiniteDuration interval, ErrorHandler errorHandler) {
            this.timeout = (FiniteDuration)Preconditions.checkNotNull((Object)timeout);
            this.interval = (FiniteDuration)Preconditions.checkNotNull((Object)interval);
            this.errorHandler = (ErrorHandler)Preconditions.checkNotNull((Object)errorHandler);
        }

        public void onReceive(Object message) throws Exception {
            if (message instanceof Watch) {
                Watch watch = (Watch)message;
                this.getContext().actorSelection(watch.getTarget()).resolveOne(this.timeout).onComplete((Function1)new OnComplete<ActorRef>(){

                    public void onComplete(Throwable failure, ActorRef success) throws Throwable {
                        if (success != null) {
                            Watcher.this.getContext().watch(success);
                            Watcher.this.getContext().system().scheduler().schedule(zeroDelay, Watcher.this.interval, success, (Object)"Watcher message", (ExecutionContext)Watcher.this.getContext().dispatcher(), Watcher.this.getSelf());
                        } else {
                            Watcher.this.errorHandler.handleError(failure);
                        }
                    }
                }, (ExecutionContext)this.getContext().dispatcher());
            }
        }
    }

    private static interface ErrorHandler {
        public void handleError(Throwable var1);
    }

    private static class TestingQuarantineHandler
    implements QuarantineHandler,
    ErrorHandler {
        private final CompletableFuture<String> wasQuarantinedByFuture = new FlinkCompletableFuture();
        private final CompletableFuture<String> hasQuarantinedFuture = new FlinkCompletableFuture();

        public void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem) {
            this.wasQuarantinedByFuture.complete((Object)remoteSystem);
        }

        public void hasQuarantined(String remoteSystem, ActorSystem actorSystem) {
            this.hasQuarantinedFuture.complete((Object)remoteSystem);
        }

        public Future<String> getWasQuarantinedByFuture() {
            return this.wasQuarantinedByFuture;
        }

        public Future<String> getHasQuarantinedFuture() {
            return this.hasQuarantinedFuture;
        }

        @Override
        public void handleError(Throwable failure) {
            this.wasQuarantinedByFuture.completeExceptionally(failure);
            this.hasQuarantinedFuture.completeExceptionally(failure);
        }
    }
}

