/*
 * Decompiled with CFR 0.152.
 */
package org.deeplearning4j.scaleout.actor.core.actor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.japi.Function;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.canova.api.conf.Configuration;
import org.deeplearning4j.nn.conf.DeepLearningConfigurable;
import org.deeplearning4j.scaleout.actor.core.actor.MasterActor;
import org.deeplearning4j.scaleout.actor.core.actor.WorkerState;
import org.deeplearning4j.scaleout.actor.core.protocol.Ack;
import org.deeplearning4j.scaleout.actor.core.protocol.ClearWorker;
import org.deeplearning4j.scaleout.api.statetracker.StateTracker;
import org.deeplearning4j.scaleout.job.Job;
import org.deeplearning4j.scaleout.perform.WorkerPerformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;

/**
 * Akka actor that represents one worker in a scaled-out training cluster.
 *
 * <p>On construction the worker registers itself with the distributed pub/sub
 * mediator and with the {@link StateTracker}, then starts a one-second
 * heartbeat. Each heartbeat tick re-announces the worker to the tracker,
 * looks up the job the tracker holds for this worker id and, when one is
 * present, runs it through the configured {@link WorkerPerformer} and reports
 * the result back to the tracker.
 *
 * <p>The heartbeat runs on the actor system's dispatcher rather than inside
 * {@link #onReceive(Object)}, which is why the current job is only accessed
 * through synchronized accessors.
 */
public class WorkerActor
extends UntypedActor
implements DeepLearningConfigurable {
    /** Configuration key holding the master actor path. */
    private static final String MASTER_PATH_KEY = "org.deeplearning4j.scaleout.masterpath";
    /** Configuration key holding the URL of the cluster node to join. */
    private static final String MASTER_URL_KEY = "org.deeplearning4j.scaleout.masterurl";

    /** Job currently held by this worker, or {@code null} when idle; use the synchronized accessors. */
    protected Job currentJob;
    /** Worker id produced by {@link #generateId()}; also used as a pub/sub topic name. */
    protected String id;
    Cluster cluster = Cluster.get(this.getContext().system());
    /** Optional client reference handed to the constructor; may be {@code null}. */
    protected ActorRef clusterClient;
    protected String masterPath;
    protected StateTracker tracker;
    /** Not modified anywhere in this class; only declared here for subclasses. */
    protected AtomicBoolean isWorking = new AtomicBoolean(false);
    protected Configuration conf;
    protected ActorRef mediator = DistributedPubSubExtension.get(this.getContext().system()).mediator();
    /** Handle of the scheduled heartbeat task; {@code null} until {@link #heartbeat()} has run. */
    protected Cancellable heartbeat;
    protected static final Logger log = LoggerFactory.getLogger(WorkerActor.class);
    protected WorkerPerformer workerPerformer;

    /**
     * Creates a worker without a cluster client reference.
     *
     * @param conf            configuration read for the master path and master URL
     * @param tracker         state tracker used to exchange jobs and updates
     * @param workerPerformer performer that executes each job
     * @throws Exception if registration or heartbeat scheduling fails
     */
    public WorkerActor(Configuration conf, StateTracker tracker, WorkerPerformer workerPerformer) throws Exception {
        this(conf, null, tracker, workerPerformer);
    }

    /**
     * Creates a worker, registers it with the mediator and the tracker, and
     * starts the heartbeat.
     *
     * @param conf            configuration read for the master path and master URL
     * @param client          optional cluster client reference, stored as-is (may be {@code null})
     * @param tracker         state tracker used to exchange jobs and updates
     * @param workerPerformer performer that executes each job
     * @throws Exception if registration or heartbeat scheduling fails
     */
    public WorkerActor(Configuration conf, ActorRef client, StateTracker tracker, WorkerPerformer workerPerformer) throws Exception {
        this.tracker = tracker;
        this.workerPerformer = workerPerformer;
        this.registerWithMediator();
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(MasterActor.SHUTDOWN, this.getSelf()), this.getSelf());
        this.id = this.generateId();
        this.mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER, this.register()), this.getSelf());
        this.clusterClient = client;
        tracker.availableForWork(this.id);
        this.masterPath = conf.get(MASTER_PATH_KEY, "");
        log.info("Registered with master {} at master {}", this.id, conf.get(MASTER_PATH_KEY));
        tracker.addWorker(this.id);
        this.setup(conf);
        // setup(conf) may have reassigned the mediator reference, so register again
        // and additionally subscribe to the topic named after this worker's id.
        this.registerWithMediator();
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(this.id, this.getSelf()), this.getSelf());
        // Schedule exactly one heartbeat, and only once everything above succeeded.
        // Scheduling it twice left a second recurring task that was never cancelled.
        this.heartbeat();
    }

    /** Registers this actor with the mediator and subscribes it to the broadcast topic. */
    private void registerWithMediator() {
        this.mediator.tell(new DistributedPubSubMediator.Put(this.getSelf()), this.getSelf());
        this.mediator.tell(new DistributedPubSubMediator.Subscribe(MasterActor.BROADCAST, this.getSelf()), this.getSelf());
    }

    /**
     * Builds the {@link Props} used to create this actor through the
     * three-argument constructor.
     *
     * @param conf      configuration passed to the constructor
     * @param tracker   state tracker passed to the constructor
     * @param performer performer passed to the constructor
     * @return props describing a {@code WorkerActor}
     */
    public static Props propsFor(Configuration conf, StateTracker tracker, WorkerPerformer performer) {
        return Props.create(WorkerActor.class, conf, tracker, performer);
    }

    /**
     * Handles mediator acknowledgements and master {@link Ack} messages; every
     * other message is reported as unhandled.
     *
     * @param message the received message
     * @throws Exception declared by the actor contract
     */
    public void onReceive(Object message) throws Exception {
        if (message instanceof DistributedPubSubMediator.SubscribeAck) {
            this.mediator.tell(new DistributedPubSubMediator.Publish("topics", message), this.getSelf());
            log.info("Subscribed to {}", message);
        } else if (message instanceof DistributedPubSubMediator.UnsubscribeAck) {
            // Handled in its own branch: casting an UnsubscribeAck to SubscribeAck
            // would fail with a ClassCastException.
            this.mediator.tell(new DistributedPubSubMediator.Publish("topics", message), this.getSelf());
            log.info("Unsubscribed from {}", message);
        } else if (message instanceof Ack) {
            log.info("Ack from master on worker {}", this.id);
        } else {
            this.unhandled(message);
        }
    }

    /**
     * Stops the heartbeat, runs the regular stop hooks and asks the master to
     * clear this worker.
     */
    public void aroundPostStop() {
        // Cancel first so that a late tick cannot re-add the worker to the tracker
        // after postStop() removed it. The handle is null if scheduling never happened.
        if (this.heartbeat != null) {
            this.heartbeat.cancel();
        }
        super.aroundPostStop();
        this.mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER, new ClearWorker(this.id)), this.getSelf());
    }

    /**
     * Schedules the recurring heartbeat: first tick after one second, then one
     * tick per second, on the actor system's dispatcher. Any heartbeat that was
     * scheduled earlier is cancelled so that at most one is ever active.
     *
     * @throws Exception if scheduling fails
     */
    protected void heartbeat() throws Exception {
        if (this.heartbeat != null) {
            this.heartbeat.cancel();
        }
        this.heartbeat = this.context().system().scheduler().schedule(Duration.apply(1L, TimeUnit.SECONDS), Duration.apply(1L, TimeUnit.SECONDS), new Runnable(){

            @Override
            public void run() {
                if (!WorkerActor.this.tracker.isDone()) {
                    WorkerActor.this.tracker.addWorker(WorkerActor.this.id);
                }
                try {
                    WorkerActor.this.checkJobAvailable();
                    Job job = WorkerActor.this.getCurrentJob();
                    if (job != null) {
                        WorkerActor.this.performJob(job);
                    } else if (WorkerActor.this.tracker.jobFor(WorkerActor.this.id) != null) {
                        // The tracker still lists a job although none is held locally.
                        WorkerActor.this.tracker.clearJob(WorkerActor.this.id);
                        log.info("Clearing stale job... {}", WorkerActor.this.id);
                    }
                }
                catch (HazelcastInstanceNotActiveException e1) {
                    log.warn("Hazel cast shut down...exiting");
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }, this.context().dispatcher());
    }

    /**
     * Runs one job through the performer and reports the result to the tracker.
     * A job without work is discarded and the worker is re-enabled.
     */
    private void performJob(Job job) throws Exception {
        if (job.getWork() == null) {
            this.tracker.clearJob(this.id);
            this.tracker.enableWorker(this.id);
            // Drop the local reference as well; checkJobAvailable() only assigns a
            // new job while the current one is null, so keeping it blocks the worker.
            this.setCurrentJob(null);
            log.warn("Work for worker {} was null", this.id);
            return;
        }
        String workerId = job.workerId();
        if (workerId == null || workerId.isEmpty()) {
            // Fall back to this actor's id so the tracker is never keyed by null/empty.
            workerId = this.id;
            job.setWorkerId(workerId);
        }
        log.info("Confirmation from {} on work", job.workerId());
        long start = System.currentTimeMillis();
        this.workerPerformer.perform(job);
        long elapsed = Math.abs(System.currentTimeMillis() - start);
        log.info("Job took {} milliseconds", elapsed);
        this.tracker.addUpdate(workerId, job);
        this.tracker.clearJob(workerId);
        this.setCurrentJob(null);
    }

    /**
     * Sets the job this worker currently holds.
     *
     * @param j the new current job, or {@code null} to mark the worker idle
     */
    public synchronized void setCurrentJob(Job j) {
        this.currentJob = j;
    }

    /**
     * @return the job this worker currently holds, or {@code null} when idle
     */
    public synchronized Job getCurrentJob() {
        return this.currentJob;
    }

    /**
     * @return a new {@link WorkerState} carrying this worker's id
     */
    public WorkerState register() {
        return new WorkerState(this.id);
    }

    /**
     * Generates a worker id of the form {@code <host>-<random UUID>}, where the
     * host is the {@code akka.remote.netty.tcp.hostname} system property or
     * {@code localhost} when that property is not set.
     *
     * @return a fresh worker id
     */
    public String generateId() {
        String host = System.getProperty("akka.remote.netty.tcp.hostname", "localhost");
        return host + "-" + UUID.randomUUID().toString();
    }

    /**
     * Removes the worker from the tracker (best effort) and unsubscribes from
     * cluster events.
     *
     * @throws Exception if the superclass hook fails
     */
    public void postStop() throws Exception {
        super.postStop();
        try {
            this.tracker.removeWorker(this.id);
        }
        catch (Exception e) {
            // Best effort: failure to deregister is only logged.
            log.info("Tracker already shut down");
        }
        log.info("Post stop on worker actor");
        this.cluster.unsubscribe(this.getSelf());
    }

    /**
     * Subscribes this actor to cluster member events.
     *
     * @throws Exception if the superclass hook fails
     */
    public void preStart() throws Exception {
        super.preStart();
        this.cluster.subscribe(this.getSelf(), ClusterEvent.MemberEvent.class);
        log.info("Pre start on worker");
    }

    /**
     * Looks up the job the tracker holds for this worker and adopts it as the
     * current job when the worker is idle. When the tracker reports that this
     * worker needs replication, the tracker's current value is adopted first
     * and replication is acknowledged.
     *
     * @throws Exception if the tracker lookup fails
     */
    protected void checkJobAvailable() throws Exception {
        Job pending = this.tracker.jobFor(this.id);
        if (pending == null) {
            return;
        }
        if (this.tracker.needsReplicate(this.id)) {
            try {
                log.info("Updating worker {}", this.id);
                this.setCurrentJob((Job)this.tracker.getCurrent());
                this.tracker.doneReplicating(this.id);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (this.getCurrentJob() == null) {
            log.info("Assigning job for worker {}", this.id);
            this.setCurrentJob(pending);
        }
    }

    /**
     * Stores the configuration and, when a master URL is configured, joins the
     * cluster at that address and refreshes the mediator reference.
     *
     * @param conf configuration to read the master URL and master path from
     */
    public void setup(Configuration conf) {
        this.conf = conf;
        String url = conf.get(MASTER_URL_KEY);
        if (url != null) {
            this.masterPath = conf.get(MASTER_PATH_KEY);
            Address masterAddress = AddressFromURIString.apply(url);
            Cluster.get(this.context().system()).join(masterAddress);
            this.mediator = DistributedPubSubExtension.get(this.getContext().system()).mediator();
        }
    }

    /**
     * Supervises children one-for-one: every failure is logged, the master is
     * asked to clear this worker, and a restart directive is returned.
     *
     * @return the supervisor strategy for this actor's children
     */
    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(0, Duration.Zero(), new Function<Throwable, SupervisorStrategy.Directive>(){

            public SupervisorStrategy.Directive apply(Throwable cause) {
                log.error("Problem with processing", cause);
                WorkerActor.this.mediator.tell(new DistributedPubSubMediator.Publish(MasterActor.MASTER, new ClearWorker(WorkerActor.this.id)), WorkerActor.this.getSelf());
                return SupervisorStrategy.restart();
            }
        });
    }
}

