/*
 * Decompiled with CFR 0.152.
 */
package io.camunda.client.impl.worker;

import io.camunda.client.api.CamundaFuture;
import io.camunda.client.api.command.FinalCommandStep;
import io.camunda.client.api.command.StreamJobsCommandStep1;
import io.camunda.client.api.response.ActivatedJob;
import io.camunda.client.api.response.StreamJobsResponse;
import io.camunda.client.api.worker.BackoffSupplier;
import io.camunda.client.api.worker.JobClient;
import io.camunda.client.impl.Loggers;
import io.camunda.client.impl.worker.JobStreamer;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;

@ThreadSafe
final class JobStreamerImpl
implements JobStreamer {
    private static final Logger LOGGER = Loggers.JOB_WORKER_LOGGER;
    private final JobClient jobClient;
    private final String jobType;
    private final String workerName;
    private final Duration timeout;
    private final List<String> fetchVariables;
    private final List<String> tenantIds;
    private final Duration streamTimeout;
    private final BackoffSupplier backoffSupplier;
    private final ScheduledExecutorService executor;
    private final Lock streamLock;
    @GuardedBy(value="streamLock")
    private CamundaFuture<StreamJobsResponse> streamControl;
    @GuardedBy(value="streamLock")
    private FinalCommandStep<StreamJobsResponse> command;
    @GuardedBy(value="streamLock")
    private boolean isClosed;
    @GuardedBy(value="streamLock")
    private long retryDelay;
    @GuardedBy(value="streamLock")
    private ScheduledFuture<?> scheduledRecreationTriggerTask;

    public JobStreamerImpl(JobClient jobClient, String jobType, String workerName, Duration timeout, List<String> fetchVariables, List<String> tenantIds, Duration streamTimeout, BackoffSupplier backoffSupplier, ScheduledExecutorService executor) {
        this.jobClient = jobClient;
        this.jobType = jobType;
        this.workerName = workerName;
        this.timeout = timeout;
        this.fetchVariables = fetchVariables;
        this.tenantIds = tenantIds;
        this.streamTimeout = streamTimeout;
        this.backoffSupplier = backoffSupplier;
        this.executor = executor;
        this.streamLock = new ReentrantLock();
    }

    @Override
    public void close() {
        try {
            this.streamLock.lockInterruptibly();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        try {
            this.lockedClose();
        }
        finally {
            this.streamLock.unlock();
        }
    }

    @Override
    public boolean isOpen() {
        return !this.isClosed;
    }

    @Override
    public void openStreamer(Consumer<ActivatedJob> jobConsumer) {
        FinalCommandStep<StreamJobsResponse> command = this.buildCommand(jobConsumer);
        this.open(command);
    }

    private void open(FinalCommandStep<StreamJobsResponse> command) {
        try {
            this.streamLock.lockInterruptibly();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        if (this.isClosed) {
            LOGGER.trace("Skip opening stream '{}' for worker '{}' because it's closed", (Object)this.jobType, (Object)this.workerName);
            return;
        }
        try {
            this.command = command;
            this.lockedOpen();
        }
        finally {
            this.streamLock.unlock();
        }
    }

    private void handleStreamComplete(Throwable error) {
        try {
            this.streamLock.lockInterruptibly();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        try {
            this.lockedHandleStreamComplete(error);
        }
        finally {
            this.streamLock.unlock();
        }
    }

    private FinalCommandStep<StreamJobsResponse> buildCommand(Consumer<ActivatedJob> jobConsumer) {
        StreamJobsCommandStep1.StreamJobsCommandStep3 command = ((StreamJobsCommandStep1.StreamJobsCommandStep3)this.jobClient.newStreamJobsCommand().jobType(this.jobType).consumer(jobConsumer).workerName(this.workerName).tenantIds(this.tenantIds)).timeout(this.timeout);
        if (this.fetchVariables != null) {
            command = command.fetchVariables(this.fetchVariables);
        }
        return command;
    }

    @GuardedBy(value="streamLock")
    private void lockedClose() {
        LOGGER.debug("Closing job stream for type '{}' and worker '{}'", (Object)this.jobType, (Object)this.workerName);
        this.isClosed = true;
        if (this.scheduledRecreationTriggerTask != null) {
            this.scheduledRecreationTriggerTask.cancel(true);
        }
        if (this.streamControl != null) {
            this.streamControl.cancel(true);
        }
        LOGGER.debug("Closed job stream for type '{}' and worker '{}'", (Object)this.jobType, (Object)this.workerName);
    }

    @GuardedBy(value="streamLock")
    private void lockedOpen() {
        if (this.streamControl != null) {
            this.streamControl.cancel(true);
            this.streamControl = null;
        }
        CamundaFuture<StreamJobsResponse> control = this.command.send();
        control.whenCompleteAsync((ignored, error) -> this.handleStreamComplete((Throwable)error), this.executor);
        this.streamControl = control;
        LOGGER.debug("Opened job stream of type '{}' for worker '{}'", (Object)this.jobType, (Object)this.workerName);
        if (this.streamTimeout != null) {
            LOGGER.debug("Scheduling recreation of the job stream of type '{}' for worker '{}' after '{}s'", new Object[]{this.jobType, this.workerName, this.streamTimeout.getSeconds()});
            if (this.scheduledRecreationTriggerTask != null && !this.scheduledRecreationTriggerTask.isDone()) {
                this.scheduledRecreationTriggerTask.cancel(true);
            }
            this.scheduledRecreationTriggerTask = this.executor.schedule(() -> this.triggerRecreation(this.streamControl), this.streamTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    private void triggerRecreation(CamundaFuture<StreamJobsResponse> streamControl) {
        try {
            this.streamLock.lockInterruptibly();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        try {
            if (!this.isClosed) {
                if (this.streamControl == streamControl) {
                    LOGGER.debug("Job streaming timeout reached for type '{}' and worker '{}'. Closing existing stream.", (Object)this.jobType, (Object)this.workerName);
                    this.streamControl.cancel(true, (Throwable)Status.CANCELLED.withCause((Throwable)new StreamingTimeoutException()).asException());
                    this.streamControl = null;
                } else if (!streamControl.isDone()) {
                    LOGGER.error("Job stream for type '{}' and worker '{}' is a different instance than for which the streaming timeout hit", (Object)this.jobType, (Object)this.workerName);
                }
            }
        }
        finally {
            this.streamLock.unlock();
        }
    }

    @GuardedBy(value="streamLock")
    private void lockedHandleStreamComplete(Throwable error) {
        if (this.isClosed) {
            LOGGER.trace("Skip re-opening job stream of type '{}' for worker '{}'", (Object)this.jobType, (Object)this.workerName);
            return;
        }
        if (error != null) {
            StatusRuntimeException statusError;
            if (error instanceof StatusRuntimeException && (statusError = (StatusRuntimeException)error).getStatus().getCode() == Status.CANCELLED.getCode() && statusError.getCause() instanceof StatusException && statusError.getCause().getCause() instanceof StreamingTimeoutException) {
                LOGGER.debug("Recreating job stream for type '{}' and worker '{}' after timeout", (Object)this.jobType, (Object)this.workerName);
                this.lockedOpen();
                return;
            }
            this.logStreamError(error);
            this.retryDelay = this.backoffSupplier.supplyRetryDelay(this.retryDelay);
            LOGGER.atDebug().addArgument((Object)this.jobType).addArgument((Object)this.workerName).addArgument(() -> Duration.ofMillis(this.retryDelay)).setMessage("Recreating closed stream of type '{}' and worker '{}' in {}").log();
            this.executor.schedule(() -> this.open(this.command), this.retryDelay, TimeUnit.MILLISECONDS);
        }
    }

    private void logStreamError(Throwable error) {
        StatusRuntimeException statusRuntimeException;
        String errorMsg = "Failed to stream jobs of type '{}' to worker '{}'";
        if (error instanceof StatusRuntimeException && (statusRuntimeException = (StatusRuntimeException)error).getStatus().getCode() == Status.RESOURCE_EXHAUSTED.getCode()) {
            LOGGER.trace("Failed to stream jobs of type '{}' to worker '{}'", new Object[]{this.jobType, this.workerName, error});
            return;
        }
        LOGGER.warn("Failed to stream jobs of type '{}' to worker '{}'", new Object[]{this.jobType, this.workerName, error});
    }

    private static final class StreamingTimeoutException
    extends RuntimeException {
        private StreamingTimeoutException() {
        }
    }
}

