/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.hints;

import com.google.common.util.concurrent.RateLimiter;
import java.io.File;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.hints.EncodedHintMessage;
import org.apache.cassandra.hints.Hint;
import org.apache.cassandra.hints.HintMessage;
import org.apache.cassandra.hints.HintsReader;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.SimpleCondition;

final class HintsDispatcher
implements AutoCloseable {
    private final HintsReader reader;
    private final UUID hostId;
    private final InetAddress address;
    private final int messagingVersion;
    private final AtomicBoolean isPaused;
    private long currentPageOffset = 0L;

    private HintsDispatcher(HintsReader reader, UUID hostId, InetAddress address, int messagingVersion, AtomicBoolean isPaused) {
        this.reader = reader;
        this.hostId = hostId;
        this.address = address;
        this.messagingVersion = messagingVersion;
        this.isPaused = isPaused;
    }

    static HintsDispatcher create(File file, RateLimiter rateLimiter, UUID hostId, UUID hintFor, AtomicBoolean isPaused) {
        InetAddress address = StorageService.instance.getEndpointForHostId(hostId);
        int messagingVersion = MessagingService.instance().getVersion(address);
        return new HintsDispatcher(HintsReader.open(file, rateLimiter), hintFor, address, messagingVersion, isPaused);
    }

    @Override
    public void close() {
        this.reader.close();
    }

    void seek(long bytes) {
        this.reader.seek(bytes);
        this.currentPageOffset = 0L;
    }

    boolean dispatch() {
        for (HintsReader.Page page : this.reader) {
            this.currentPageOffset = page.offset;
            if (this.dispatch(page) == Action.CONTINUE) continue;
            return false;
        }
        return true;
    }

    long dispatchOffset() {
        return this.currentPageOffset;
    }

    private boolean isHostAlive() {
        return FailureDetector.instance.isAlive(this.address);
    }

    private boolean isPaused() {
        return this.isPaused.get();
    }

    private Action dispatch(HintsReader.Page page) {
        Action action = this.sendHintsAndAwait(page);
        return action == Action.RETRY ? this.dispatch(page) : action;
    }

    private Action sendHintsAndAwait(HintsReader.Page page) {
        Action action;
        ArrayList<Callback> callbacks = new ArrayList<Callback>();
        Action action2 = action = this.reader.descriptor().messagingVersion() == this.messagingVersion ? this.sendHints(page.buffersIterator(), callbacks, this::sendEncodedHint) : this.sendHints(page.hintsIterator(), callbacks, this::sendHint);
        if (action == Action.ABORT) {
            return action;
        }
        for (Callback cb : callbacks) {
            if (cb.await() == Callback.Outcome.SUCCESS) continue;
            return Action.RETRY;
        }
        return Action.CONTINUE;
    }

    private <T> Action sendHints(Iterator<T> hints, Collection<Callback> callbacks, Function<T, Callback> sendFunction) {
        while (hints.hasNext()) {
            if (!this.isHostAlive() || this.isPaused()) {
                return Action.ABORT;
            }
            callbacks.add(sendFunction.apply(hints.next()));
        }
        return Action.CONTINUE;
    }

    private Callback sendHint(Hint hint) {
        Callback callback = new Callback();
        HintMessage message = new HintMessage(this.hostId, hint);
        MessagingService.instance().sendRRWithFailure(message.createMessageOut(), this.address, callback);
        return callback;
    }

    private Callback sendEncodedHint(ByteBuffer hint) {
        Callback callback = new Callback();
        EncodedHintMessage message = new EncodedHintMessage(this.hostId, hint, this.messagingVersion);
        MessagingService.instance().sendRRWithFailure(message.createMessageOut(), this.address, callback);
        return callback;
    }

    private static final class Callback
    implements IAsyncCallbackWithFailure {
        private final long start = System.nanoTime();
        private final SimpleCondition condition = new SimpleCondition();
        private volatile Outcome outcome;

        private Callback() {
        }

        Outcome await() {
            boolean timedOut;
            long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getTimeout(MessagingService.Verb.HINT)) - (System.nanoTime() - this.start);
            try {
                timedOut = !this.condition.await(timeout, TimeUnit.NANOSECONDS);
            }
            catch (InterruptedException e) {
                throw new AssertionError((Object)e);
            }
            return timedOut ? Outcome.TIMEOUT : this.outcome;
        }

        @Override
        public void onFailure(InetAddress from) {
            this.outcome = Outcome.FAILURE;
            this.condition.signalAll();
        }

        @Override
        public void response(MessageIn msg) {
            this.outcome = Outcome.SUCCESS;
            this.condition.signalAll();
        }

        @Override
        public boolean isLatencyForSnitch() {
            return false;
        }

        static enum Outcome {
            SUCCESS,
            TIMEOUT,
            FAILURE;

        }
    }

    private static enum Action {
        CONTINUE,
        ABORT,
        RETRY;

    }
}

