package org.apache.flink.changelog.fs;

import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/changelog/fs/RetryingExecutor.class */
public class RetryingExecutor implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
    private final ScheduledExecutorService timer;
    private final ScheduledExecutorService blockingExecutor;
    private final Histogram attemptsPerTaskHistogram;
    private final Histogram totalAttemptsPerTaskHistogram;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/changelog/fs/RetryingExecutor$RetriableAction.class */
    public interface RetriableAction<Result> {
        Result tryExecute() throws Exception;

        void completeWithResult(Result result);

        void discardResult(Result result) throws Exception;

        void handleFailure(Throwable th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/changelog/fs/RetryingExecutor$RetriableActionAttempt.class */
    public static final class RetriableActionAttempt<Result> implements Runnable {
        private final RetriableAction<Result> action;
        private final ScheduledExecutorService blockingExecutor;
        private final ScheduledExecutorService timer;
        private final int attemptNumber;
        private final RetryPolicy retryPolicy;
        private final AtomicBoolean actionCompleted;
        private final AtomicBoolean attemptCompleted = new AtomicBoolean(false);
        private final AtomicInteger activeAttempts;
        private final AtomicInteger totalAttempts;
        private final Histogram attemptsPerTaskHistogram;
        private final Histogram totalAttemptsPerTaskHistogram;

        private RetriableActionAttempt(int i, AtomicBoolean atomicBoolean, RetriableAction<Result> retriableAction, RetryPolicy retryPolicy, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, AtomicInteger atomicInteger, AtomicInteger atomicInteger2, Histogram histogram, Histogram histogram2) {
            this.attemptNumber = i;
            this.action = retriableAction;
            this.retryPolicy = retryPolicy;
            this.blockingExecutor = scheduledExecutorService;
            this.actionCompleted = atomicBoolean;
            this.attemptsPerTaskHistogram = histogram;
            this.totalAttemptsPerTaskHistogram = histogram2;
            this.timer = scheduledExecutorService2;
            this.activeAttempts = atomicInteger;
            this.totalAttempts = atomicInteger2;
        }

        @Override // java.lang.Runnable
        public void run() {
            RetryingExecutor.LOG.debug("starting attempt {}", Integer.valueOf(this.attemptNumber));
            if (this.actionCompleted.get()) {
                return;
            }
            Optional<ScheduledFuture<?>> scheduleTimeout = scheduleTimeout();
            try {
                try {
                    Result tryExecute = this.action.tryExecute();
                    if (this.actionCompleted.compareAndSet(false, true)) {
                        RetryingExecutor.LOG.debug("succeeded with {} attempts", Integer.valueOf(this.attemptNumber));
                        this.action.completeWithResult(tryExecute);
                        this.attemptsPerTaskHistogram.update(this.attemptNumber);
                        this.totalAttemptsPerTaskHistogram.update(this.totalAttempts.get());
                    } else {
                        RetryingExecutor.LOG.debug("discard unnecessarily uploaded state, attempt {}", Integer.valueOf(this.attemptNumber));
                        try {
                            this.action.discardResult(tryExecute);
                        } catch (Exception e) {
                            RetryingExecutor.LOG.warn("unable to discard execution attempt result", e);
                        }
                    }
                    scheduleTimeout.ifPresent(scheduledFuture -> {
                        scheduledFuture.cancel(true);
                    });
                } catch (Exception e2) {
                    handleError(e2);
                    scheduleTimeout.ifPresent(scheduledFuture2 -> {
                        scheduledFuture2.cancel(true);
                    });
                }
            } catch (Throwable th) {
                scheduleTimeout.ifPresent(scheduledFuture22 -> {
                    scheduledFuture22.cancel(true);
                });
                throw th;
            }
        }

        private void handleError(Exception exc) {
            if (!this.attemptCompleted.compareAndSet(false, true) || this.actionCompleted.get()) {
                return;
            }
            RetryingExecutor.LOG.debug("execution attempt {} failed: {}", Integer.valueOf(this.attemptNumber), exc.getMessage());
            long retryAfter = this.retryPolicy.retryAfter(this.attemptNumber, exc);
            if (retryAfter >= 0) {
                this.activeAttempts.incrementAndGet();
                this.totalAttempts.incrementAndGet();
                scheduleNext(retryAfter, next());
            }
            if (this.activeAttempts.decrementAndGet() == 0 && this.actionCompleted.compareAndSet(false, true)) {
                RetryingExecutor.LOG.info("failed with {} attempts: {}", Integer.valueOf(this.attemptNumber), exc.getMessage());
                this.action.handleFailure(exc);
            }
        }

        private void scheduleNext(long j, RetriableActionAttempt<Result> retriableActionAttempt) {
            if (j == 0) {
                this.blockingExecutor.submit(retriableActionAttempt);
            } else if (j > 0) {
                this.blockingExecutor.schedule(retriableActionAttempt, j, TimeUnit.MILLISECONDS);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static <T> RetriableActionAttempt<T> initialize(RetriableAction<T> retriableAction, RetryPolicy retryPolicy, ScheduledExecutorService scheduledExecutorService, Histogram histogram, Histogram histogram2, ScheduledExecutorService scheduledExecutorService2) {
            return new RetriableActionAttempt<>(1, new AtomicBoolean(false), retriableAction, retryPolicy, scheduledExecutorService, scheduledExecutorService2, new AtomicInteger(1), new AtomicInteger(1), histogram, histogram2);
        }

        private RetriableActionAttempt<Result> next() {
            return new RetriableActionAttempt<>(this.attemptNumber + 1, this.actionCompleted, this.action, this.retryPolicy, this.blockingExecutor, this.timer, this.activeAttempts, this.totalAttempts, this.attemptsPerTaskHistogram, this.totalAttemptsPerTaskHistogram);
        }

        private Optional<ScheduledFuture<?>> scheduleTimeout() {
            long timeoutFor = this.retryPolicy.timeoutFor(this.attemptNumber);
            return timeoutFor <= 0 ? Optional.empty() : Optional.of(this.timer.schedule(() -> {
                handleError(fmtError(timeoutFor));
            }, timeoutFor, TimeUnit.MILLISECONDS));
        }

        private TimeoutException fmtError(long j) {
            return new TimeoutException(String.format("Attempt %d timed out after %dms", Integer.valueOf(this.attemptNumber), Long.valueOf(j)));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RetryingExecutor(int i, Histogram histogram, Histogram histogram2) {
        this(SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG), SchedulerFactory.create(i, "ChangelogBlockingExecutor", LOG), histogram, histogram2);
    }

    @VisibleForTesting
    RetryingExecutor(ScheduledExecutorService scheduledExecutorService, Histogram histogram, Histogram histogram2) {
        this(scheduledExecutorService, scheduledExecutorService, histogram, histogram2);
    }

    RetryingExecutor(ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, Histogram histogram, Histogram histogram2) {
        this.timer = scheduledExecutorService;
        this.blockingExecutor = scheduledExecutorService2;
        this.attemptsPerTaskHistogram = histogram;
        this.totalAttemptsPerTaskHistogram = histogram2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <T> void execute(RetryPolicy retryPolicy, RetriableAction<T> retriableAction) {
        LOG.debug("execute with retryPolicy: {}", retryPolicy);
        this.blockingExecutor.submit(RetriableActionAttempt.initialize(retriableAction, retryPolicy, this.blockingExecutor, this.attemptsPerTaskHistogram, this.totalAttemptsPerTaskHistogram, this.timer));
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOG.debug("close");
        Exception exc = null;
        try {
            this.timer.shutdownNow();
        } catch (Exception e) {
            exc = e;
        }
        try {
            this.blockingExecutor.shutdownNow();
        } catch (Exception e2) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
        }
        if (!this.timer.awaitTermination(1L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to cleanly shutdown scheduler in 1s");
        }
        if (!this.blockingExecutor.awaitTermination(1L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to cleanly shutdown blockingExecutor in 1s");
        }
        if (exc != null) {
            throw exc;
        }
    }
}
