/*
 * Decompiled with CFR 0.152.
 */
package kafka.common;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import kafka.common.InterBrokerSendThread$;
import kafka.common.RequestAndCompletionHandler;
import kafka.common.UnsentRequests;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0001\u0005-c!B\u0001\u0003\u0003\u00039!!F%oi\u0016\u0014(I]8lKJ\u001cVM\u001c3UQJ,\u0017\r\u001a\u0006\u0003\u0007\u0011\taaY8n[>t'\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0003\u0017\u0011\tQ!\u001e;jYNL!!\u0004\u0006\u0003%MCW\u000f\u001e3po:\f'\r\\3UQJ,\u0017\r\u001a\u0005\n\u001f\u0001\u0011\t\u0011)A\u0005!u\tAA\\1nKB\u0011\u0011C\u0007\b\u0003%a\u0001\"a\u0005\f\u000e\u0003QQ!!\u0006\u0004\u0002\rq\u0012xn\u001c;?\u0015\u00059\u0012!B:dC2\f\u0017BA\r\u0017\u0003\u0019\u0001&/\u001a3fM&\u00111\u0004\b\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005e1\u0012BA\b\r\u0011!y\u0002A!A!\u0002\u0013\u0001\u0013!\u00048fi^|'o[\"mS\u0016tG\u000f\u0005\u0002\"S5\t!E\u0003\u0002$I\u000591\r\\5f]R\u001c(BA\u0003&\u0015\t1s%\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002Q\u0005\u0019qN]4\n\u0005)\u0012#!\u0004(fi^|'o[\"mS\u0016tG\u000f\u0003\u0005-\u0001\t\u0005\t\u0015!\u0003.\u0003\u0011!\u0018.\\3\u0011\u00059\nT\"A\u0018\u000b\u0005-\u0001$BA\u0002%\u0013\t\u0011tF\u0001\u0003US6,\u0007\"\u0003\u001b\u0001\u0005\u0003\u0005\u000b\u0011B\u001b:\u0003=I7/\u00138uKJ\u0014X\u000f\u001d;jE2,\u0007C\u0001\u001c8\u001b\u00051\u0012B\u0001\u001d\u0017\u0005\u001d\u0011un\u001c7fC:L!\u0001\u000e\u0007\t\u000bm\u0002A\u0011\u0001\u001f\u0002\rqJg.\u001b;?)\u0015it\bQ!C!\tq\u0004!D\u0001\u0003\u0011\u0015y!\b1\u0001\u0011\u0011\u0015y\"\b1\u0001!\u0011\u0015a#\b1\u0001.\u0011\u001d!$\b%AA\u0002UBQ\u0001\u0012\u0001\u0007\u0002\u0015\u000b\u0001cZ3oKJ\fG/\u001a*fcV,7\u000f^:\u0015\u0003\u0019\u00032a\u0012'P\u001d\tA%J\u0004\u0002\u0014\u0013&\tq#\u0003\u0002L-\u00059\u0001/Y2lC\u001e,\u0017BA'O\u0005!IE/\u001a:bE2,'BA&\u0017!\tq\u0004+\u0003\u0002R\u0005\tY\"+Z9vKN$\u0018I\u001c3D_6\u0004H.\u001a;j_:D\u0015M\u001c3mKJDQa\u0015\u0001\u0007\u0002Q\u000b\u0001C]3rk\u0016\u001cH\u000fV5nK>,H/T:\u0016\u0003U\u0003\"A\u000e,\n\u0005]3\"aA%oi\"9\u0011\f\u0001b\u0001\n\u0013Q\u0016AD;og\u0016tGOU3rk\u0016\u001cHo]\u000b\u00027B\u0011a\bX\u0005\u0003;\n\u0011a\"\u00168tK:$(+Z9vKN$8\u000f\u0003\u0004`\u0001\u0001\u0006IaW\u0001\u0010k:\u001cXM\u001c;SKF,Xm\u001d;tA!)\u0011\r\u0001C\u0001E\u0006\t\u0002.Y:V]N,g\u000e\u001e*fcV,7\u000f^:\u0016\u0003UBQ\u0001\u001a\u0001\u0005B\u0015\f\u0001b\u001d5vi\u0012|wO\u001c\u000b\u0002MB\u0011agZ\u0005\u0003QZ\u0011A!\u00168ji\")!\u000e\u0001C!K\u00061Am\\,pe.DQ\u0001\u001c\u0001\u0005\n5\fAb]3oIJ+\u0017/^3tiN$\"A\\9\u0011\u0005Yz\u0017B\u00019\u0017\u0005\u0011auN\\4\t\u000bI\\\u0007\u0019\u00018\u0002\u00079|w\u000fC\u0003u\u0001\u0011%Q/\u0001\tdQ\u0016\u001c7\u000eR5tG>tg.Z2ugR\u0011aM\u001e\u0005\u0006eN\u0004\rA\u001c\u0005\u0006q\u0002!I!_\u0001\u0014M\u0006LG.\u0012=qSJ,GMU3rk\u0016\u001cHo\u001d\u000b\u0003MjDQA]<A\u00029DQ\u0001 \u0001\u0005\u0002u\facY8na2,G/Z,ji\"$\u0015n]2p]:,7\r\u001e\u000b\u0007Mz\f9!!\u0003\t\r}\\\b\u0019AA\u0001\u0003\u001d\u0011X-];fgR\u00042!IA\u0002\u0013\r\t)A\t\u0002\u000e\u00072LWM\u001c;SKF,Xm\u001d;\t\u000bI\\\b\u0019\u00018\t\u000f\u0005-1\u00101\u0001\u0002\u000e\u00059\u0012-\u001e;iK:$\u0018nY1uS>tW\t_2faRLwN\u001c\t\u0005\u0003\u001f\t)\"\u0004\u0002\u0002\u0012)\u0019\u00111\u0003\u0019\u0002\r\u0015\u0014(o\u001c:t\u0013\u0011\t9\"!\u0005\u0003/\u0005+H\u000f[3oi&\u001c\u0017\r^5p]\u0016C8-\u001a9uS>t\u0007BBA\u000e\u0001\u0011\u0005Q-\u0001\u0004xC.,W\u000f]\u0004\n\u0003?\u0011\u0011\u0011!E\u0001\u0003C\tQ#\u00138uKJ\u0014%o\\6feN+g\u000e\u001a+ie\u0016\fG\rE\u0002?\u0003G1\u0001\"\u0001\u0002\u0002\u0002#\u0005\u0011QE\n\u0005\u0003G\t9\u0003E\u00027\u0003SI1!a\u000b\u0017\u0005\u0019\te.\u001f*fM\"91(a\t\u0005\u0002\u0005=BCAA\u0011\u0011)\t\u0019$a\t\u0012\u0002\u0013\u0005\u0011QG\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000f\n\u001b\u0016\u0005\u0005]\"fA\u001b\u0002:-\u0012\u00111\b\t\u0005\u0003{\t9%\u0004\u0002\u0002@)!\u0011\u0011IA\"\u0003%)hn\u00195fG.,GMC\u0002\u0002FY\t!\"\u00198o_R\fG/[8o\u0013\u0011\tI%a\u0010\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW\r")
public abstract class InterBrokerSendThread
extends ShutdownableThread {
    private final NetworkClient networkClient;
    private final Time time;
    private final UnsentRequests unsentRequests;

    public static boolean $lessinit$greater$default$4() {
        return InterBrokerSendThread$.MODULE$.$lessinit$greater$default$4();
    }

    public abstract Iterable<RequestAndCompletionHandler> generateRequests();

    public abstract int requestTimeoutMs();

    private UnsentRequests unsentRequests() {
        return this.unsentRequests;
    }

    public boolean hasUnsentRequests() {
        return this.unsentRequests().iterator().hasNext();
    }

    @Override
    public void shutdown() {
        this.initiateShutdown();
        this.networkClient.wakeup();
        this.awaitShutdown();
    }

    @Override
    public void doWork() {
        LongRef now = LongRef.create((long)this.time.milliseconds());
        this.generateRequests().foreach((Function1 & Serializable & scala.Serializable)request -> {
            InterBrokerSendThread.$anonfun$doWork$1(this, now, request);
            return BoxedUnit.UNIT;
        });
        try {
            long timeout = this.sendRequests(now.elem);
            this.networkClient.poll(timeout, now.elem);
            now.elem = this.time.milliseconds();
            this.checkDisconnects(now.elem);
            this.failExpiredRequests(now.elem);
            this.unsentRequests().clean();
        }
        catch (FatalExitError e) {
            throw e;
        }
        catch (Throwable t) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "unhandled exception caught in InterBrokerSendThread", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> t);
            throw new FatalExitError();
        }
    }

    private long sendRequests(long now) {
        LongRef pollTimeout = LongRef.create((long)Long.MAX_VALUE);
        ((IterableLike)JavaConverters$.MODULE$.asScalaSetConverter(this.unsentRequests().nodes()).asScala()).foreach((Function1 & Serializable & scala.Serializable)node -> {
            InterBrokerSendThread.$anonfun$sendRequests$1(this, now, pollTimeout, node);
            return BoxedUnit.UNIT;
        });
        return pollTimeout.elem;
    }

    private void checkDisconnects(long now) {
        Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator = this.unsentRequests().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Node, ArrayDeque<ClientRequest>> entry = iterator.next();
            Tuple2 tuple2 = new Tuple2((Object)entry.getKey(), entry.getValue());
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            Node node = (Node)tuple2._1();
            ArrayDeque requests = (ArrayDeque)tuple2._2();
            Tuple2 tuple22 = new Tuple2((Object)node, (Object)requests);
            Tuple2 tuple23 = tuple22;
            Node node2 = (Node)tuple23._1();
            ArrayDeque requests2 = (ArrayDeque)tuple23._2();
            if (requests2.isEmpty() || !this.networkClient.connectionFailed(node2)) continue;
            iterator.remove();
            ((IterableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)requests2).asScala()).foreach((Function1 & Serializable & scala.Serializable)request -> {
                InterBrokerSendThread.$anonfun$checkDisconnects$1(this, now, node2, request);
                return BoxedUnit.UNIT;
            });
        }
    }

    private void failExpiredRequests(long now) {
        Collection<ClientRequest> timedOutRequests = this.unsentRequests().removeAllTimedOut(now);
        ((IterableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(timedOutRequests).asScala()).foreach((Function1 & Serializable & scala.Serializable)request -> {
            InterBrokerSendThread.$anonfun$failExpiredRequests$1(this, now, request);
            return BoxedUnit.UNIT;
        });
    }

    public void completeWithDisconnect(ClientRequest request, long now, AuthenticationException authenticationException) {
        RequestCompletionHandler handler = request.callback();
        handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()), handler, request.destination(), now, now, true, null, authenticationException, null));
    }

    public void wakeup() {
        this.networkClient.wakeup();
    }

    public static final /* synthetic */ void $anonfun$doWork$1(InterBrokerSendThread $this, LongRef now$1, RequestAndCompletionHandler request) {
        RequestCompletionHandler completionHandler = request.handler();
        $this.unsentRequests().put(request.destination(), $this.networkClient.newClientRequest(request.destination().idString(), request.request(), now$1.elem, true, $this.requestTimeoutMs(), completionHandler));
    }

    public static final /* synthetic */ void $anonfun$sendRequests$1(InterBrokerSendThread $this, long now$2, LongRef pollTimeout$1, Node node) {
        Iterator<ClientRequest> requestIterator = $this.unsentRequests().requestIterator(node);
        while (requestIterator.hasNext()) {
            ClientRequest request = requestIterator.next();
            if ($this.networkClient.ready(node, now$2)) {
                $this.networkClient.send(request, now$2);
                requestIterator.remove();
                continue;
            }
            pollTimeout$1.elem = Math.min(pollTimeout$1.elem, $this.networkClient.connectionDelay(node, now$2));
        }
    }

    public static final /* synthetic */ void $anonfun$checkDisconnects$1(InterBrokerSendThread $this, long now$3, Node node$1, ClientRequest request) {
        AuthenticationException authenticationException = $this.networkClient.authenticationException(node$1);
        if (authenticationException != null) {
            $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(66).append("Failed to send the following request due to authentication error: ").append(request).toString());
        }
        $this.completeWithDisconnect(request, now$3, authenticationException);
    }

    public static final /* synthetic */ void $anonfun$failExpiredRequests$1(InterBrokerSendThread $this, long now$4, ClientRequest request) {
        $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(48).append("Failed to send the following request after ").append(request.requestTimeoutMs()).append(" ms: ").append(request).toString());
        $this.completeWithDisconnect(request, now$4, null);
    }

    public InterBrokerSendThread(String name, NetworkClient networkClient, Time time, boolean isInterruptible) {
        this.networkClient = networkClient;
        this.time = time;
        super(name, isInterruptible);
        this.unsentRequests = new UnsentRequests();
    }
}

