/*
 * Decompiled with CFR 0.152.
 */
package com.blazebit.job.impl;

import com.blazebit.actor.ActorContext;
import com.blazebit.actor.ActorRunResult;
import com.blazebit.actor.ScheduledActor;
import com.blazebit.actor.spi.ClusterNodeInfo;
import com.blazebit.actor.spi.ClusterStateListener;
import com.blazebit.actor.spi.ClusterStateManager;
import com.blazebit.actor.spi.LockService;
import com.blazebit.actor.spi.Scheduler;
import com.blazebit.actor.spi.SchedulerFactory;
import com.blazebit.actor.spi.StateReturningEvent;
import com.blazebit.job.JobContext;
import com.blazebit.job.JobException;
import com.blazebit.job.JobInstance;
import com.blazebit.job.JobInstanceListener;
import com.blazebit.job.JobInstanceProcessingContext;
import com.blazebit.job.JobInstanceProcessor;
import com.blazebit.job.JobInstanceState;
import com.blazebit.job.JobManager;
import com.blazebit.job.JobRateLimitException;
import com.blazebit.job.JobTemporaryException;
import com.blazebit.job.PartitionKey;
import com.blazebit.job.ScheduleContext;
import com.blazebit.job.TimeFrame;
import com.blazebit.job.impl.JobSchedulerCancelEvent;
import com.blazebit.job.impl.JobSchedulerStatusEvent;
import com.blazebit.job.impl.JobSchedulerTraceEvent;
import com.blazebit.job.spi.JobScheduler;
import com.blazebit.job.spi.TransactionSupport;
import java.io.Serializable;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

public class JobSchedulerImpl
implements JobScheduler,
ClusterStateListener {
    private static final Logger LOG = Logger.getLogger(JobSchedulerImpl.class.getName());
    private static final long COMPLETION_TX_TIMEOUT = 10000L;
    private final JobContext jobContext;
    private final ActorContext actorContext;
    private final Clock clock;
    private final Scheduler scheduler;
    private final JobManager jobManager;
    private final JobInstanceRunner runner;
    private final String actorName;
    private final PartitionKey partitionKey;
    private final int processCount;
    private final long transactionTimeout;
    private final long temporaryErrorDeferSeconds;
    private final long rateLimitDeferSeconds;
    private final AtomicLong earliestKnownSchedule = new AtomicLong(Long.MAX_VALUE);
    private final ConcurrentMap<JobInstance<?>, Boolean> jobInstancesToSchedule = new ConcurrentHashMap();
    private final ConcurrentMap<Object, JobInstanceExecution> longRunningJobInstances = new ConcurrentHashMap<Object, JobInstanceExecution>();
    private volatile ClusterNodeInfo clusterNodeInfo;
    private volatile boolean closed;

    public JobSchedulerImpl(JobContext jobContext, ActorContext actorContext, SchedulerFactory schedulerFactory, String actorName, int processCount, PartitionKey partitionKey) {
        this.jobContext = jobContext;
        this.actorContext = actorContext;
        this.clock = jobContext.getService(Clock.class) == null ? Clock.systemUTC() : (Clock)jobContext.getService(Clock.class);
        this.scheduler = schedulerFactory.createScheduler(actorContext, actorName + "/processor");
        this.jobManager = jobContext.getJobManager();
        this.runner = new JobInstanceRunner();
        this.actorName = actorName;
        this.processCount = processCount;
        this.partitionKey = partitionKey;
        this.transactionTimeout = partitionKey.getTransactionTimeoutMillis() < 0 ? (long)jobContext.getTransactionTimeoutMillis() : (long)partitionKey.getTransactionTimeoutMillis();
        this.temporaryErrorDeferSeconds = partitionKey.getTemporaryErrorBackoffSeconds() < 0 ? (long)jobContext.getTemporaryErrorBackoffSeconds() : (long)partitionKey.getTemporaryErrorBackoffSeconds();
        this.rateLimitDeferSeconds = partitionKey.getRateLimitBackoffSeconds() < 0 ? (long)jobContext.getRateLimitBackoffSeconds() : (long)partitionKey.getRateLimitBackoffSeconds();
    }

    public void start() {
        this.actorContext.getActorManager().registerSuspendedActor(this.actorName, (ScheduledActor)this.runner);
        ClusterStateManager clusterStateManager = (ClusterStateManager)this.actorContext.getService(ClusterStateManager.class);
        clusterStateManager.registerListener((ClusterStateListener)this);
        clusterStateManager.registerListener(JobSchedulerCancelEvent.class, e -> {
            JobInstanceExecution execution = (JobInstanceExecution)this.longRunningJobInstances.get(e.getJobInstanceId());
            if (execution != null) {
                execution.future.cancel(true);
            }
        });
        clusterStateManager.registerListener(JobSchedulerStatusEvent.class, e -> {
            Serializable[] jobInstanceIds = e.getJobInstanceIds();
            int[] clusterPositions = new int[jobInstanceIds.length];
            int clusterPosition = this.clusterNodeInfo.getClusterPosition();
            for (int i = 0; i < jobInstanceIds.length; ++i) {
                clusterPositions[i] = this.longRunningJobInstances.containsKey(jobInstanceIds[i]) ? clusterPosition : -1;
            }
            e.setClusterPositions(clusterPositions);
        });
        clusterStateManager.registerListener(JobSchedulerTraceEvent.class, e -> {
            Thread thread;
            JobInstanceExecution execution = (JobInstanceExecution)this.longRunningJobInstances.get(e.getJobInstanceId());
            if (execution != null && (thread = execution.thread) != null) {
                StackTraceElement[] stackTrace = thread.getStackTrace();
                StringBuilder sb = new StringBuilder();
                for (StackTraceElement stackTraceElement : stackTrace) {
                    sb.append(stackTraceElement).append('\n');
                }
                e.setTrace(sb.toString());
            }
        });
    }

    public void onClusterStateChanged(ClusterNodeInfo clusterNodeInfo) {
        block12: {
            this.clusterNodeInfo = clusterNodeInfo;
            if (this.closed) break block12;
            Instant nextSchedule = this.jobManager.getNextSchedule(clusterNodeInfo.getClusterPosition(), clusterNodeInfo.getClusterSize(), this.partitionKey, this.jobContext.isScheduleRefreshedOnly() ? this.jobInstancesToSchedule.keySet() : null);
            if (nextSchedule == null) {
                this.resetEarliestKnownSchedule();
            } else {
                this.refreshSchedules(nextSchedule.toEpochMilli());
            }
            List runningJobInstances = this.jobManager.getRunningJobInstances(clusterNodeInfo.getClusterPosition(), clusterNodeInfo.getClusterSize(), this.partitionKey);
            ClusterStateManager clusterStateManager = (ClusterStateManager)this.actorContext.getService(ClusterStateManager.class);
            ArrayList<Serializable> jobInstanceIds = new ArrayList<Serializable>(runningJobInstances.size());
            Iterator iterator = runningJobInstances.iterator();
            while (iterator.hasNext()) {
                JobInstance runningJobInstance = (JobInstance)iterator.next();
                JobInstanceExecution execution = (JobInstanceExecution)this.longRunningJobInstances.get(runningJobInstance.getId());
                if (execution == null) {
                    jobInstanceIds.add((Serializable)runningJobInstance.getId());
                    continue;
                }
                iterator.remove();
            }
            if (!jobInstanceIds.isEmpty()) {
                LockService lockService = clusterStateManager.getLockService();
                JobSchedulerStatusEvent statusEvent = new JobSchedulerStatusEvent(jobInstanceIds.toArray(new Serializable[0]));
                Map futureMap = clusterStateManager.fireEventExcludeSelf((StateReturningEvent)statusEvent);
                if (futureMap.isEmpty()) {
                    for (JobInstance runningJobInstance : runningJobInstances) {
                        this.scheduleLongRunning(lockService, runningJobInstance);
                    }
                } else {
                    try {
                        for (Future future : futureMap.values()) {
                            int[] clusterPositions = (int[])future.get();
                            for (int i = 0; i < clusterPositions.length; ++i) {
                                if (clusterPositions[i] != -1) continue;
                                this.scheduleLongRunning(lockService, (JobInstance)runningJobInstances.get(i));
                            }
                        }
                    }
                    catch (Exception ex) {
                        throw new JobException("Could not get the cluster position state for running job instances.", (Throwable)ex);
                    }
                }
            }
        }
    }

    private void scheduleLongRunning(LockService lockService, JobInstance<?> jobInstance) {
        Instant now = this.clock.instant();
        MutableJobInstanceProcessingContext jobProcessingContext = new MutableJobInstanceProcessingContext(this.jobContext, this.partitionKey, this.processCount);
        jobProcessingContext.setPartitionCount(this.clusterNodeInfo.getClusterSize());
        jobProcessingContext.setPartitionId(this.clusterNodeInfo.getClusterPosition());
        jobProcessingContext.setLastProcessed(jobInstance.getLastProcessed());
        MutableScheduleContext scheduleContext = new MutableScheduleContext();
        Instant lastExecutionTime = jobInstance.getLastExecutionTime();
        if (lastExecutionTime == null) {
            lastExecutionTime = now;
        }
        scheduleContext.setLastScheduleTime(jobInstance.getScheduleTime().toEpochMilli());
        scheduleContext.setLastExecutionTime(lastExecutionTime.toEpochMilli());
        JobInstanceProcessor jobInstanceProcessor = this.jobContext.getJobInstanceProcessor(jobInstance);
        JobInstanceExecution execution = new JobInstanceExecution(jobInstance, jobInstance.getDeferCount(), scheduleContext, jobProcessingContext, null);
        jobInstance.setLastExecutionTime(Instant.now());
        jobInstance.markRunning((JobInstanceProcessingContext)jobProcessingContext);
        this.jobManager.updateJobInstance(jobInstance);
        this.longRunningJobInstances.put(jobInstance.getId(), execution);
        Lock lock = lockService.getLock("jobInstance/" + jobInstance.getId());
        execution.future = this.scheduler.submit((Callable)new NotifyingSpecialThrowingCallable(jobInstanceProcessor, execution, lock));
    }

    public void refreshSchedules(long earliestNewSchedule) {
        long delayMillis = this.rescan(earliestNewSchedule);
        if (delayMillis != -1L) {
            this.actorContext.getActorManager().rescheduleActor(this.actorName, delayMillis);
        }
    }

    public void reschedule(JobInstance<?> jobInstance) {
        if (this.jobContext.isScheduleRefreshedOnly()) {
            this.jobInstancesToSchedule.put(jobInstance, Boolean.TRUE);
        }
        this.actorContext.getActorManager().rescheduleActor(this.actorName, 0L);
    }

    private long rescan(long earliestNewSchedule) {
        if (!this.closed) {
            long earliestKnownSchedule;
            if (earliestNewSchedule == 0L) {
                ClusterNodeInfo clusterNodeInfo = this.clusterNodeInfo;
                Instant nextSchedule = this.jobManager.getNextSchedule(clusterNodeInfo.getClusterPosition(), clusterNodeInfo.getClusterSize(), this.partitionKey, this.jobContext.isScheduleRefreshedOnly() ? this.jobInstancesToSchedule.keySet() : null);
                if (nextSchedule == null) {
                    this.resetEarliestKnownSchedule();
                    return -1L;
                }
                earliestNewSchedule = nextSchedule.toEpochMilli();
            }
            if (earliestNewSchedule <= (earliestKnownSchedule = this.earliestKnownSchedule.get())) {
                if (!this.updateEarliestKnownSchedule(earliestKnownSchedule, earliestNewSchedule)) {
                    return -1L;
                }
                long delayMillis = earliestNewSchedule - this.clock.millis();
                delayMillis = delayMillis < 0L ? 0L : delayMillis;
                return delayMillis;
            }
        }
        return -1L;
    }

    private boolean updateEarliestKnownSchedule(long oldValue, long newValue) {
        do {
            if (!this.earliestKnownSchedule.compareAndSet(oldValue, newValue)) continue;
            return true;
        } while ((oldValue = this.earliestKnownSchedule.get()) <= newValue);
        return false;
    }

    private boolean updateEarliestKnownSchedule(long newValue) {
        long oldValue = this.earliestKnownSchedule.get();
        while (oldValue <= newValue) {
            if (this.earliestKnownSchedule.compareAndSet(oldValue, newValue)) {
                return true;
            }
            oldValue = this.earliestKnownSchedule.get();
        }
        return false;
    }

    private void resetEarliestKnownSchedule() {
        long earliestKnownSchedule = this.earliestKnownSchedule.get();
        if (earliestKnownSchedule < this.clock.millis()) {
            this.updateEarliestKnownSchedule(earliestKnownSchedule, Long.MAX_VALUE);
        }
    }

    public int getClusterPosition(JobInstance<?> jobInstance) {
        if (!jobInstance.isLongRunning()) {
            return -1;
        }
        JobInstanceExecution execution = (JobInstanceExecution)this.longRunningJobInstances.get(jobInstance.getId());
        if (execution == null) {
            JobSchedulerStatusEvent event = new JobSchedulerStatusEvent(new Serializable[]{(Serializable)jobInstance.getId()});
            Map result = ((ClusterStateManager)this.actorContext.getService(ClusterStateManager.class)).fireEventExcludeSelf((StateReturningEvent)event);
            try {
                for (Map.Entry entry : result.entrySet()) {
                    int position = ((int[])((Future)entry.getValue()).get())[0];
                    if (position == -1) continue;
                    return position;
                }
            }
            catch (Exception e) {
                throw new JobException("Could not retrieve cluster position for job instance: " + jobInstance, (Throwable)e);
            }
            return -1;
        }
        return this.clusterNodeInfo.getClusterPosition();
    }

    public String getTrace(JobInstance<?> jobInstance) {
        if (!jobInstance.isLongRunning()) {
            return null;
        }
        JobInstanceExecution execution = (JobInstanceExecution)this.longRunningJobInstances.get(jobInstance.getId());
        if (execution == null) {
            JobSchedulerTraceEvent event = new JobSchedulerTraceEvent((Serializable)jobInstance.getId());
            Map result = ((ClusterStateManager)this.actorContext.getService(ClusterStateManager.class)).fireEventExcludeSelf((StateReturningEvent)event);
            try {
                for (Map.Entry entry : result.entrySet()) {
                    String trace = (String)((Future)entry.getValue()).get();
                    if (trace == null) continue;
                    return trace;
                }
            }
            catch (Exception e) {
                throw new JobException("Could not retrieve trace for job instance: " + jobInstance, (Throwable)e);
            }
            return null;
        }
        Thread thread = execution.thread;
        if (thread != null) {
            StackTraceElement[] stackTrace = thread.getStackTrace();
            StringBuilder sb = new StringBuilder();
            for (StackTraceElement stackTraceElement : stackTrace) {
                sb.append(stackTraceElement).append('\n');
            }
            return sb.toString();
        }
        return null;
    }

    public void cancel(JobInstance<?> jobInstance) {
        if (!jobInstance.isLongRunning()) {
            return;
        }
        JobInstanceExecution execution = (JobInstanceExecution)this.longRunningJobInstances.get(jobInstance.getId());
        if (execution == null) {
            JobSchedulerCancelEvent event = new JobSchedulerCancelEvent((Serializable)jobInstance.getId());
            ((ClusterStateManager)this.actorContext.getService(ClusterStateManager.class)).fireEventExcludeSelf((Serializable)event, false);
        } else {
            execution.future.cancel(true);
        }
    }

    public void stop() {
        this.closed = true;
        this.actorContext.stop();
    }

    public void stop(long timeout, TimeUnit unit) throws InterruptedException {
        this.closed = true;
        this.actorContext.stop(timeout, unit);
    }

    static <T extends Throwable> void sneakyThrow(Throwable e) throws T {
        throw e;
    }

    private static long getWaitTime(long maximum, long base, long attempt) {
        long expWait = (long)Math.pow(2.0, attempt) * base;
        return expWait <= 0L ? maximum : Math.min(maximum, expWait);
    }

    private static class MutableJobInstanceProcessingContext
    implements JobInstanceProcessingContext<Object> {
        private final JobContext jobContext;
        private final PartitionKey partitionKey;
        private final int processCount;
        private int partitionId;
        private int partitionCount;
        private Object lastProcessed;

        public MutableJobInstanceProcessingContext(JobContext jobContext, PartitionKey partitionKey, int processCount) {
            this.jobContext = jobContext;
            this.partitionKey = partitionKey;
            this.processCount = processCount;
        }

        public JobContext getJobContext() {
            return this.jobContext;
        }

        public PartitionKey getPartitionKey() {
            return this.partitionKey;
        }

        public int getProcessCount() {
            return this.processCount;
        }

        public int getPartitionId() {
            return this.partitionId;
        }

        public void setPartitionId(int partitionId) {
            this.partitionId = partitionId;
        }

        public int getPartitionCount() {
            return this.partitionCount;
        }

        public void setPartitionCount(int partitionCount) {
            this.partitionCount = partitionCount;
        }

        public Object getLastProcessed() {
            return this.lastProcessed;
        }

        public void setLastProcessed(Object lastProcessed) {
            this.lastProcessed = lastProcessed;
        }
    }

    private static class MutableScheduleContext
    implements ScheduleContext {
        private long lastScheduleTime;
        private long lastExecutionTime;
        private long lastCompletionTime;

        private MutableScheduleContext() {
        }

        public long getLastScheduleTime() {
            return this.lastScheduleTime;
        }

        public void setLastScheduleTime(long lastScheduleTime) {
            this.lastScheduleTime = lastScheduleTime;
        }

        public long getLastExecutionTime() {
            return this.lastExecutionTime;
        }

        public void setLastExecutionTime(long lastExecutionTime) {
            this.lastExecutionTime = lastExecutionTime;
        }

        public long getLastCompletionTime() {
            return this.lastCompletionTime;
        }

        public void setLastCompletionTime(long lastCompletionTime) {
            this.lastCompletionTime = lastCompletionTime;
        }
    }

    private static class JobInstanceExecution {
        private final JobInstance<?> jobInstance;
        private final int deferCount;
        private final MutableScheduleContext scheduleContext;
        private final MutableJobInstanceProcessingContext jobProcessingContext;
        private volatile Thread thread;
        private Future<Object> future;

        public JobInstanceExecution(JobInstance<?> jobInstance, int deferCount, MutableScheduleContext scheduleContext, MutableJobInstanceProcessingContext jobProcessingContext, Future<Object> future) {
            this.jobInstance = jobInstance;
            this.deferCount = deferCount;
            this.scheduleContext = scheduleContext;
            this.jobProcessingContext = jobProcessingContext;
            this.future = future;
        }
    }

    private static class SyncJobInstanceProcessorFuture
    implements Future<Object> {
        private final JobInstanceProcessor jobInstanceProcessor;
        private final JobInstance<?> jobInstance;
        private final JobInstanceProcessingContext<?> processingContext;
        private boolean done;
        private Object result;
        private Exception exception;

        public SyncJobInstanceProcessorFuture(JobInstanceProcessor jobInstanceProcessor, JobInstance<?> jobInstance, JobInstanceProcessingContext<?> processingContext) {
            this.jobInstanceProcessor = jobInstanceProcessor;
            this.jobInstance = jobInstance;
            this.processingContext = processingContext;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return this.done;
        }

        @Override
        public Object get() throws InterruptedException, ExecutionException {
            if (this.done) {
                if (this.exception == null) {
                    return this.result;
                }
                throw new ExecutionException(this.exception);
            }
            this.done = true;
            try {
                this.jobInstance.setLastExecutionTime(Instant.now());
                this.result = this.jobInstanceProcessor.process(this.jobInstance, this.processingContext);
                return this.result;
            }
            catch (Exception e) {
                this.exception = e;
                throw new ExecutionException(this.exception);
            }
        }

        @Override
        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return this.get();
        }
    }

    private static class JobInstanceErrorListenerConsumer
    implements Consumer<JobInstanceListener> {
        private final JobInstance<?> jobInstance;
        private final MutableJobInstanceProcessingContext jobProcessingContext;

        public JobInstanceErrorListenerConsumer(JobInstance<?> jobInstance, MutableJobInstanceProcessingContext jobProcessingContext) {
            this.jobInstance = jobInstance;
            this.jobProcessingContext = jobProcessingContext;
        }

        @Override
        public void accept(JobInstanceListener listener) {
            listener.onJobInstanceError(this.jobInstance, (JobInstanceProcessingContext)this.jobProcessingContext);
        }
    }

    private static class JobInstanceSuccessListenerConsumer
    implements Consumer<JobInstanceListener> {
        private final JobInstance<?> jobInstance;
        private final MutableJobInstanceProcessingContext jobProcessingContext;

        public JobInstanceSuccessListenerConsumer(JobInstance<?> jobInstance, MutableJobInstanceProcessingContext jobProcessingContext) {
            this.jobInstance = jobInstance;
            this.jobProcessingContext = jobProcessingContext;
        }

        @Override
        public void accept(JobInstanceListener listener) {
            listener.onJobInstanceSuccess(this.jobInstance, (JobInstanceProcessingContext)this.jobProcessingContext);
        }
    }

    private static class JobInstanceChunkSuccessListenerConsumer
    implements Consumer<JobInstanceListener> {
        private final JobInstance<?> jobInstance;
        private final MutableJobInstanceProcessingContext jobProcessingContext;

        public JobInstanceChunkSuccessListenerConsumer(JobInstance<?> jobInstance, MutableJobInstanceProcessingContext jobProcessingContext) {
            this.jobInstance = jobInstance;
            this.jobProcessingContext = jobProcessingContext;
        }

        @Override
        public void accept(JobInstanceListener listener) {
            listener.onJobInstanceChunkSuccess(this.jobInstance, (JobInstanceProcessingContext)this.jobProcessingContext);
        }
    }

    private static class CallableThrowable
    extends Throwable {
        public CallableThrowable(Throwable cause) {
            super(cause);
        }

        private static <T extends Throwable> void doThrow(Throwable e) throws T {
            throw new CallableThrowable(e);
        }
    }

    private class NotifyingSpecialThrowingCallable
    implements Callable<Object> {
        private final JobInstanceProcessor jobInstanceProcessor;
        private final JobInstanceExecution execution;
        private final Lock lock;

        public NotifyingSpecialThrowingCallable(JobInstanceProcessor jobInstanceProcessor, JobInstanceExecution execution, Lock lock) {
            this.jobInstanceProcessor = jobInstanceProcessor;
            this.execution = execution;
            this.lock = lock;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Object call() throws Exception {
            JobInstance jobInstance = this.execution.jobInstance;
            if (!this.lock.tryLock()) {
                JobSchedulerImpl.this.longRunningJobInstances.remove(jobInstance.getId());
                return null;
            }
            this.execution.thread = Thread.currentThread();
            MutableScheduleContext scheduleContext = this.execution.scheduleContext;
            int deferCount = this.execution.deferCount;
            MutableJobInstanceProcessingContext jobProcessingContext = this.execution.jobProcessingContext;
            try {
                Object lastProcessed = this.jobInstanceProcessor.process(jobInstance, (JobInstanceProcessingContext)jobProcessingContext);
                TransactionSupport transactionSupport = JobSchedulerImpl.this.jobContext.getTransactionSupport();
                transactionSupport.transactional(JobSchedulerImpl.this.jobContext, 10000L, false, () -> {
                    try {
                        Instant nextSchedule;
                        jobProcessingContext.setLastProcessed(lastProcessed);
                        scheduleContext.setLastCompletionTime(JobSchedulerImpl.this.clock.millis());
                        if (jobInstance.getState() == JobInstanceState.NEW) {
                            nextSchedule = jobInstance.nextSchedule(JobSchedulerImpl.this.jobContext, (ScheduleContext)scheduleContext);
                            if (nextSchedule.toEpochMilli() != scheduleContext.getLastScheduleTime()) {
                                if (jobInstance.getDeferCount() == deferCount) {
                                    jobInstance.onChunkSuccess((JobInstanceProcessingContext)jobProcessingContext);
                                    JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceChunkSuccessListenerConsumer(jobInstance, jobProcessingContext));
                                }
                                jobInstance.setScheduleTime(nextSchedule);
                                JobSchedulerImpl.this.updateEarliestKnownSchedule(jobInstance.getScheduleTime().toEpochMilli());
                                Object var7_8 = null;
                                return var7_8;
                            }
                            if (lastProcessed != null) {
                                if (jobInstance.getDeferCount() == deferCount) {
                                    jobInstance.onChunkSuccess((JobInstanceProcessingContext)jobProcessingContext);
                                    JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceChunkSuccessListenerConsumer(jobInstance, jobProcessingContext));
                                }
                                JobSchedulerImpl.this.updateEarliestKnownSchedule(jobInstance.getScheduleTime().toEpochMilli());
                                Object var7_9 = null;
                                return var7_9;
                            }
                        }
                        if (jobInstance.getState() == JobInstanceState.NEW) {
                            jobInstance.markDone((JobInstanceProcessingContext)jobProcessingContext, lastProcessed);
                            JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceSuccessListenerConsumer(jobInstance, jobProcessingContext));
                        } else if (jobInstance.getState() == JobInstanceState.DONE) {
                            JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceSuccessListenerConsumer(jobInstance, jobProcessingContext));
                        }
                        nextSchedule = null;
                        return nextSchedule;
                    }
                    catch (Throwable t) {
                        jobInstance.markFailed((JobInstanceProcessingContext)jobProcessingContext, t);
                        JobSchedulerImpl.sneakyThrow(t);
                        Object var7_10 = null;
                        return var7_10;
                    }
                    finally {
                        if (JobSchedulerImpl.this.jobContext.isScheduleRefreshedOnly() && jobInstance.getState() != JobInstanceState.NEW) {
                            JobSchedulerImpl.this.jobInstancesToSchedule.remove(jobInstance);
                        }
                        JobSchedulerImpl.this.jobManager.updateJobInstance(jobInstance);
                    }
                }, t2 -> LOG.log(Level.SEVERE, "An error occurred in the long running job instance completion handler", (Throwable)t2));
                Object object = lastProcessed;
                return object;
            }
            catch (Throwable t) {
                TransactionSupport transactionSupport = JobSchedulerImpl.this.jobContext.getTransactionSupport();
                transactionSupport.transactional(JobSchedulerImpl.this.jobContext, 10000L, false, () -> {
                    if (t instanceof JobRateLimitException) {
                        JobRateLimitException e = (JobRateLimitException)t;
                        LOG.log(Level.FINEST, "Deferring job instance due to rate limit", (Throwable)e);
                        Instant rescheduleRateLimitTime = e.getDeferMillis() != -1L ? JobSchedulerImpl.this.clock.instant().plus(e.getDeferMillis(), ChronoUnit.MILLIS) : JobSchedulerImpl.this.clock.instant().plus(JobSchedulerImpl.this.rateLimitDeferSeconds, ChronoUnit.SECONDS);
                        jobInstance.setScheduleTime(rescheduleRateLimitTime);
                        JobSchedulerImpl.this.updateEarliestKnownSchedule(jobInstance.getScheduleTime().toEpochMilli());
                    } else if (t instanceof JobTemporaryException) {
                        JobTemporaryException e = (JobTemporaryException)t;
                        LOG.log(Level.FINEST, "Deferring job instance due to temporary error", (Throwable)e);
                        if (e.getDeferMillis() != -1L) {
                            jobInstance.setScheduleTime(JobSchedulerImpl.this.clock.instant().plus(e.getDeferMillis(), ChronoUnit.MILLIS));
                        } else {
                            jobInstance.setScheduleTime(JobSchedulerImpl.this.clock.instant().plus(JobSchedulerImpl.this.temporaryErrorDeferSeconds, ChronoUnit.SECONDS));
                        }
                        JobSchedulerImpl.this.updateEarliestKnownSchedule(jobInstance.getScheduleTime().toEpochMilli());
                    } else {
                        LOG.log(Level.SEVERE, "An error occurred in the job instance processor", t);
                        JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceErrorListenerConsumer(jobInstance, jobProcessingContext));
                        jobInstance.markFailed((JobInstanceProcessingContext)jobProcessingContext, t);
                        JobSchedulerImpl.this.jobManager.updateJobInstance(jobInstance);
                    }
                    return null;
                }, t2 -> LOG.log(Level.SEVERE, "An error occurred in the long running job instance error handler", (Throwable)t2));
                CallableThrowable.doThrow(t);
                Object var7_10 = null;
                return var7_10;
            }
            finally {
                JobSchedulerImpl.this.longRunningJobInstances.remove(jobInstance.getId());
                this.execution.thread = null;
                this.lock.unlock();
            }
        }
    }

    private static class SpecialThrowingCallable
    implements Callable<Object> {
        final JobInstanceProcessor jobInstanceProcessor;
        final JobInstance<?> jobInstance;
        final MutableJobInstanceProcessingContext jobProcessingContext;

        public SpecialThrowingCallable(JobInstanceProcessor jobInstanceProcessor, JobInstance<?> jobInstance, MutableJobInstanceProcessingContext jobProcessingContext) {
            this.jobInstanceProcessor = jobInstanceProcessor;
            this.jobInstance = jobInstance;
            this.jobProcessingContext = jobProcessingContext;
        }

        @Override
        public Object call() throws Exception {
            try {
                return this.jobInstanceProcessor.process(this.jobInstance, (JobInstanceProcessingContext)this.jobProcessingContext);
            }
            catch (Exception ex) {
                CallableThrowable.doThrow(ex);
                return null;
            }
        }
    }

    private class JobInstanceRunner
    implements ScheduledActor,
    Callable<ActorRunResult> {
        private final long maxBackOff = 10000L;
        private final long baseBackOff = 1000L;
        private int retryAttempt;

        private JobInstanceRunner() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public ActorRunResult call() throws Exception {
            JobManager jobManager = JobSchedulerImpl.this.jobContext.getJobManager();
            LockService lockService = ((ClusterStateManager)JobSchedulerImpl.this.actorContext.getService(ClusterStateManager.class)).getLockService();
            ClusterNodeInfo clusterNodeInfo = JobSchedulerImpl.this.clusterNodeInfo;
            List jobInstancesToProcess = jobManager.getJobInstancesToProcess(clusterNodeInfo.getClusterPosition(), clusterNodeInfo.getClusterSize(), JobSchedulerImpl.this.processCount, JobSchedulerImpl.this.partitionKey, JobSchedulerImpl.this.jobContext.isScheduleRefreshedOnly() ? JobSchedulerImpl.this.jobInstancesToSchedule.keySet() : null);
            int size = jobInstancesToProcess.size();
            if (size == 0) {
                return ActorRunResult.suspend();
            }
            Instant earliestNewSchedule = Instant.MAX;
            ArrayList<JobInstanceExecution> jobInstanceExecutions = new ArrayList<JobInstanceExecution>(size);
            for (int i = 0; i < size; ++i) {
                JobInstance jobInstance = (JobInstance)jobInstancesToProcess.get(i);
                Instant now = JobSchedulerImpl.this.clock.instant();
                MutableJobInstanceProcessingContext jobProcessingContext = new MutableJobInstanceProcessingContext(JobSchedulerImpl.this.jobContext, JobSchedulerImpl.this.partitionKey, JobSchedulerImpl.this.processCount);
                jobProcessingContext.setPartitionCount(clusterNodeInfo.getClusterSize());
                jobProcessingContext.setPartitionId(clusterNodeInfo.getClusterPosition());
                jobProcessingContext.setLastProcessed(jobInstance.getLastProcessed());
                MutableScheduleContext scheduleContext = new MutableScheduleContext();
                boolean future = false;
                Instant lastExecutionTime = jobInstance.getLastExecutionTime();
                if (lastExecutionTime == null) {
                    lastExecutionTime = now;
                }
                scheduleContext.setLastScheduleTime(jobInstance.getScheduleTime().toEpochMilli());
                scheduleContext.setLastExecutionTime(lastExecutionTime.toEpochMilli());
                try {
                    Instant deadline = jobInstance.getJobConfiguration().getDeadline();
                    if (deadline != null && deadline.compareTo(now) <= 0) {
                        jobInstance.markDeadlineReached((JobInstanceProcessingContext)jobProcessingContext);
                        JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceErrorListenerConsumer(jobInstance, jobProcessingContext));
                        continue;
                    }
                    Set executionTimeFrames = jobInstance.getJobConfiguration().getExecutionTimeFrames();
                    if (TimeFrame.isContained((Set)executionTimeFrames, (Instant)now)) {
                        Future<Object> f;
                        int deferCount = jobInstance.getDeferCount();
                        JobInstanceProcessor jobInstanceProcessor = JobSchedulerImpl.this.jobContext.getJobInstanceProcessor(jobInstance);
                        if (jobInstanceProcessor.isTransactional()) {
                            f = new SyncJobInstanceProcessorFuture(jobInstanceProcessor, jobInstance, jobProcessingContext);
                            jobInstanceExecutions.add(new JobInstanceExecution(jobInstance, deferCount, scheduleContext, jobProcessingContext, f));
                        } else {
                            jobInstance.setLastExecutionTime(Instant.now());
                            if (jobInstance.isLongRunning()) {
                                jobInstance.markRunning((JobInstanceProcessingContext)jobProcessingContext);
                                jobManager.updateJobInstance(jobInstance);
                                JobInstanceExecution execution = new JobInstanceExecution(jobInstance, deferCount, scheduleContext, jobProcessingContext, null);
                                JobSchedulerImpl.this.longRunningJobInstances.put(jobInstance.getId(), execution);
                                Lock lock = lockService.getLock("jobInstance/" + jobInstance.getId());
                                execution.future = JobSchedulerImpl.this.scheduler.submit((Callable)new NotifyingSpecialThrowingCallable(jobInstanceProcessor, execution, lock));
                            } else {
                                f = JobSchedulerImpl.this.scheduler.submit((Callable)new SpecialThrowingCallable(jobInstanceProcessor, jobInstance, jobProcessingContext));
                                jobInstanceExecutions.add(new JobInstanceExecution(jobInstance, deferCount, scheduleContext, jobProcessingContext, f));
                            }
                        }
                        future = true;
                        continue;
                    }
                    Instant nextSchedule = TimeFrame.getNearestTimeFrameSchedule((Set)executionTimeFrames, (Instant)now);
                    if (nextSchedule == Instant.MAX) {
                        if (LOG.isLoggable(Level.FINEST)) {
                            LOG.log(Level.FINEST, "Dropping job instance: " + jobInstance);
                        }
                        jobInstance.markDropped((JobInstanceProcessingContext)jobProcessingContext);
                        JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceErrorListenerConsumer(jobInstance, jobProcessingContext));
                        continue;
                    }
                    if (LOG.isLoggable(Level.FINEST)) {
                        LOG.log(Level.FINEST, "Deferring job instance to " + nextSchedule);
                    }
                    jobInstance.markDeferred((JobInstanceProcessingContext)jobProcessingContext, nextSchedule);
                    if (jobInstance.getState() == JobInstanceState.DROPPED) {
                        JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceErrorListenerConsumer(jobInstance, jobProcessingContext));
                    }
                    if (!jobInstance.getScheduleTime().isBefore(earliestNewSchedule)) continue;
                    earliestNewSchedule = jobInstance.getScheduleTime();
                    continue;
                }
                catch (Throwable t) {
                    LOG.log(Level.SEVERE, "An error occurred in the job scheduler", t);
                    jobInstance.markFailed((JobInstanceProcessingContext)jobProcessingContext, t);
                    if (JobSchedulerImpl.this.jobContext.isScheduleRefreshedOnly()) {
                        JobSchedulerImpl.this.jobInstancesToSchedule.remove(jobInstance);
                    }
                    JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceErrorListenerConsumer(jobInstance, jobProcessingContext));
                    continue;
                }
                finally {
                    if (!future) {
                        if (JobSchedulerImpl.this.jobContext.isScheduleRefreshedOnly() && jobInstance.getState() != JobInstanceState.NEW) {
                            JobSchedulerImpl.this.jobInstancesToSchedule.remove(jobInstance);
                        }
                        jobManager.updateJobInstance(jobInstance);
                    }
                }
            }
            Instant rescheduleRateLimitTime = null;
            for (int i = 0; i < jobInstanceExecutions.size(); ++i) {
                JobInstanceExecution execution = (JobInstanceExecution)jobInstanceExecutions.get(i);
                JobInstance jobInstance = execution.jobInstance;
                MutableJobInstanceProcessingContext jobProcessingContext = execution.jobProcessingContext;
                MutableScheduleContext scheduleContext = execution.scheduleContext;
                int deferCount = execution.deferCount;
                Future future = execution.future;
                boolean success = true;
                try {
                    Object lastProcessed = future.get();
                    jobProcessingContext.setLastProcessed(lastProcessed);
                    scheduleContext.setLastCompletionTime(JobSchedulerImpl.this.clock.millis());
                    if (jobInstance.getState() == JobInstanceState.NEW) {
                        Instant nextSchedule = jobInstance.nextSchedule(JobSchedulerImpl.this.jobContext, (ScheduleContext)scheduleContext);
                        if (nextSchedule.toEpochMilli() != scheduleContext.getLastScheduleTime()) {
                            if (jobInstance.getDeferCount() == deferCount) {
                                jobInstance.onChunkSuccess((JobInstanceProcessingContext)jobProcessingContext);
                                JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceChunkSuccessListenerConsumer(jobInstance, jobProcessingContext));
                            }
                            jobInstance.setScheduleTime(nextSchedule);
                            if (!jobInstance.getScheduleTime().isBefore(earliestNewSchedule)) continue;
                            earliestNewSchedule = jobInstance.getScheduleTime();
                            continue;
                        }
                        if (lastProcessed != null) {
                            if (jobInstance.getDeferCount() == deferCount) {
                                jobInstance.onChunkSuccess((JobInstanceProcessingContext)jobProcessingContext);
                                JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceChunkSuccessListenerConsumer(jobInstance, jobProcessingContext));
                            }
                            if (!jobInstance.getScheduleTime().isBefore(earliestNewSchedule)) continue;
                            earliestNewSchedule = jobInstance.getScheduleTime();
                            continue;
                        }
                    }
                    if (jobInstance.getState() == JobInstanceState.NEW) {
                        jobInstance.markDone((JobInstanceProcessingContext)jobProcessingContext, lastProcessed);
                        JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceSuccessListenerConsumer(jobInstance, jobProcessingContext));
                        continue;
                    }
                    if (jobInstance.getState() != JobInstanceState.DONE) continue;
                    JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceSuccessListenerConsumer(jobInstance, jobProcessingContext));
                    continue;
                }
                catch (ExecutionException ex) {
                    JobRateLimitException e;
                    Throwable t = ex.getCause() instanceof CallableThrowable ? ex.getCause().getCause() : ex.getCause();
                    if (t instanceof JobRateLimitException) {
                        e = (JobRateLimitException)t;
                        LOG.log(Level.FINEST, "Deferring job instance due to rate limit", (Throwable)e);
                        if (rescheduleRateLimitTime == null) {
                            rescheduleRateLimitTime = e.getDeferMillis() != -1L ? JobSchedulerImpl.this.clock.instant().plus(e.getDeferMillis(), ChronoUnit.MILLIS) : JobSchedulerImpl.this.clock.instant().plus(JobSchedulerImpl.this.rateLimitDeferSeconds, ChronoUnit.SECONDS);
                        }
                        jobInstance.setScheduleTime(rescheduleRateLimitTime);
                        if (!jobInstance.getScheduleTime().isBefore(earliestNewSchedule)) continue;
                        earliestNewSchedule = jobInstance.getScheduleTime();
                        continue;
                    }
                    if (t instanceof JobTemporaryException) {
                        e = (JobTemporaryException)t;
                        LOG.log(Level.FINEST, "Deferring job instance due to temporary error", (Throwable)e);
                        if (e.getDeferMillis() != -1L) {
                            jobInstance.setScheduleTime(JobSchedulerImpl.this.clock.instant().plus(e.getDeferMillis(), ChronoUnit.MILLIS));
                        } else {
                            jobInstance.setScheduleTime(JobSchedulerImpl.this.clock.instant().plus(JobSchedulerImpl.this.temporaryErrorDeferSeconds, ChronoUnit.SECONDS));
                        }
                        if (!jobInstance.getScheduleTime().isBefore(earliestNewSchedule)) continue;
                        earliestNewSchedule = jobInstance.getScheduleTime();
                        continue;
                    }
                    LOG.log(Level.SEVERE, "An error occurred in the job instance processor", t);
                    success = false;
                    TransactionSupport transactionSupport = JobSchedulerImpl.this.jobContext.getTransactionSupport();
                    transactionSupport.transactional(JobSchedulerImpl.this.jobContext, 10000L, true, () -> {
                        JobSchedulerImpl.this.jobContext.forEachJobInstanceListeners((Consumer)new JobInstanceErrorListenerConsumer(jobInstance, jobProcessingContext));
                        jobInstance.markFailed((JobInstanceProcessingContext)jobProcessingContext, t);
                        jobManager.updateJobInstance(jobInstance);
                        return null;
                    }, t2 -> LOG.log(Level.SEVERE, "An error occurred in the job instance error handler", (Throwable)t2));
                    continue;
                }
                finally {
                    if (JobSchedulerImpl.this.jobContext.isScheduleRefreshedOnly() && jobInstance.getState() != JobInstanceState.NEW) {
                        JobSchedulerImpl.this.jobInstancesToSchedule.remove(jobInstance);
                    }
                    if (success) {
                        jobManager.updateJobInstance(jobInstance);
                    }
                }
            }
            if (earliestNewSchedule == Instant.MAX) {
                return ActorRunResult.suspend();
            }
            long delay = earliestNewSchedule.toEpochMilli() - JobSchedulerImpl.this.clock.millis();
            if (LOG.isLoggable(Level.FINEST)) {
                LOG.log(Level.FINEST, "Rescheduling in: {0}", delay);
            }
            return ActorRunResult.rescheduleIn((long)delay);
        }

        public ActorRunResult work() {
            if (JobSchedulerImpl.this.closed) {
                return ActorRunResult.done();
            }
            TransactionSupport transactionSupport = JobSchedulerImpl.this.jobContext.getTransactionSupport();
            long earliestKnownNotificationSchedule = JobSchedulerImpl.this.earliestKnownSchedule.get();
            ActorRunResult result = (ActorRunResult)transactionSupport.transactional(JobSchedulerImpl.this.jobContext, JobSchedulerImpl.this.transactionTimeout, false, (Callable)this, t -> LOG.log(Level.SEVERE, "An error occurred in the job scheduler", (Throwable)t));
            if (JobSchedulerImpl.this.closed) {
                return ActorRunResult.done();
            }
            if (result == null) {
                long delay = JobSchedulerImpl.getWaitTime(10000L, 1000L, this.retryAttempt++);
                LOG.log(Level.INFO, "Rescheduling due to error in: {0}", delay);
                return ActorRunResult.rescheduleIn((long)delay);
            }
            this.retryAttempt = 0;
            if (result.isSuspend()) {
                long delayMillis;
                JobSchedulerImpl.this.updateEarliestKnownSchedule(earliestKnownNotificationSchedule, Long.MAX_VALUE);
                if (LOG.isLoggable(Level.FINEST)) {
                    LOG.finest("Rescan due to suspend");
                }
                if ((delayMillis = JobSchedulerImpl.this.rescan(0L)) != -1L) {
                    if (LOG.isLoggable(Level.FINEST)) {
                        LOG.log(Level.FINEST, "Rescheduling after suspend in: {0}", delayMillis);
                    }
                    return ActorRunResult.rescheduleIn((long)delayMillis);
                }
            } else {
                JobSchedulerImpl.this.updateEarliestKnownSchedule(earliestKnownNotificationSchedule, JobSchedulerImpl.this.clock.millis() + result.getDelayMillis());
            }
            return result;
        }
    }
}

