/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.sender.internals;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.kafka.sender.internals.DefaultKafkaSender;
import reactor.kafka.sender.internals.Response;
import reactor.util.context.Context;

class SendSubscriber<K, V, C>
implements CoreSubscriber<ProducerRecord<K, V>> {
    private final CoreSubscriber<? super SenderResult<C>> actual;
    private final Producer<K, V> producer;
    private final AtomicInteger inflight = new AtomicInteger();
    private final AtomicReference<Throwable> firstException = new AtomicReference();
    private final AtomicReference<State> state = new AtomicReference<State>(State.INIT);
    private final SenderOptions<K, V> senderOptions;

    SendSubscriber(SenderOptions<K, V> senderOptions, Producer<K, V> producer, CoreSubscriber<? super SenderResult<C>> actual) {
        this.senderOptions = senderOptions;
        this.producer = producer;
        this.actual = actual;
    }

    public Context currentContext() {
        return this.actual.currentContext();
    }

    public void onSubscribe(Subscription s) {
        this.state.set(State.ACTIVE);
        this.actual.onSubscribe(s);
    }

    public void onNext(ProducerRecord<K, V> record) {
        if (this.state.get() == State.COMPLETE) {
            Operators.onNextDropped(record, (Context)this.currentContext());
            return;
        }
        this.inflight.incrementAndGet();
        if (this.senderOptions.isTransactional()) {
            DefaultKafkaSender.log.trace("Transactional send initiated for producer {} in state {} inflight {}: {}", new Object[]{this.senderOptions.transactionalId(), this.state, this.inflight, record});
        }
        Object correlationMetadata = record instanceof SenderRecord ? ((SenderRecord)record).correlationMetadata() : null;
        Callback callback = (metadata, exception) -> {
            if (this.senderOptions.isTransactional()) {
                DefaultKafkaSender.log.trace("Transactional send completed for producer {} in state {} inflight {}: {}", new Object[]{this.senderOptions.transactionalId(), this.state, this.inflight, record});
            }
            if (this.state.get() == State.COMPLETE) {
                return;
            }
            if (exception != null) {
                DefaultKafkaSender.log.trace("Sender failed: ", (Throwable)exception);
                this.firstException.compareAndSet(null, exception);
                if (this.senderOptions.stopOnError() || this.senderOptions.fatalException(exception)) {
                    this.onError(exception);
                    return;
                }
            }
            this.actual.onNext(new Response<Object>(metadata, exception, correlationMetadata));
            if (this.inflight.decrementAndGet() == 0) {
                this.maybeComplete();
            }
        };
        try {
            this.producer.send(record, callback);
        }
        catch (Exception e) {
            callback.onCompletion(null, e);
        }
    }

    public void onError(Throwable t) {
        DefaultKafkaSender.log.trace("Sender failed with exception", t);
        if (this.state.getAndSet(State.COMPLETE) == State.COMPLETE) {
            Operators.onErrorDropped((Throwable)t, (Context)this.currentContext());
            return;
        }
        this.actual.onError(t);
    }

    public void onComplete() {
        if (this.state.compareAndSet(State.ACTIVE, State.INBOUND_DONE) && this.inflight.get() == 0) {
            this.maybeComplete();
        }
    }

    private void maybeComplete() {
        if (this.state.compareAndSet(State.INBOUND_DONE, State.COMPLETE)) {
            Throwable exception = this.firstException.get();
            if (exception != null) {
                this.actual.onError(exception);
            } else {
                this.actual.onComplete();
            }
        }
    }

    static enum State {
        INIT,
        ACTIVE,
        INBOUND_DONE,
        COMPLETE;

    }
}

