/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.springframework.context.Lifecycle;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.listener.AcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.DefaultAcknowledgment;
import org.springframework.integration.kafka.listener.ErrorHandler;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.OffsetManager;

class QueueingMessageListenerInvoker
implements Runnable,
Lifecycle {
    private BlockingQueue<KafkaMessage> messages;
    private volatile boolean running = false;
    private final MessageListener messageListener;
    private final AcknowledgingMessageListener acknowledgingMessageListener;
    private final OffsetManager offsetManager;
    private final ErrorHandler errorHandler;

    public QueueingMessageListenerInvoker(int capacity, OffsetManager offsetManager, Object delegate, ErrorHandler errorHandler) {
        if (delegate instanceof MessageListener) {
            this.messageListener = (MessageListener)delegate;
            this.acknowledgingMessageListener = null;
        } else if (delegate instanceof AcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (AcknowledgingMessageListener)delegate;
            this.messageListener = null;
        } else {
            throw new IllegalArgumentException("Either a " + MessageListener.class.getName() + " or a " + AcknowledgingMessageListener.class.getName() + " must be provided");
        }
        this.offsetManager = offsetManager;
        this.errorHandler = errorHandler;
        this.messages = new ArrayBlockingQueue<KafkaMessage>(capacity);
    }

    public void enqueue(KafkaMessage message) {
        boolean wasInterruptedWhileRunning = false;
        if (this.running) {
            boolean added = false;
            while (!added && this.running) {
                try {
                    this.messages.put(message);
                    added = true;
                }
                catch (InterruptedException e) {
                    wasInterruptedWhileRunning = true;
                }
            }
        }
        if (wasInterruptedWhileRunning) {
            Thread.currentThread().interrupt();
        }
    }

    public void start() {
        this.running = true;
    }

    public void stop() {
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        boolean wasInterrupted = false;
        while (this.running) {
            try {
                KafkaMessage message = this.messages.take();
                if (!this.isRunning()) continue;
                try {
                    if (this.messageListener != null) {
                        this.messageListener.onMessage(message);
                        continue;
                    }
                    this.acknowledgingMessageListener.onMessage(message, new DefaultAcknowledgment(this.offsetManager, message));
                }
                catch (Exception e) {
                    if (this.errorHandler == null) continue;
                    this.errorHandler.handle(e, message);
                }
                finally {
                    if (this.messageListener == null) continue;
                    this.offsetManager.updateOffset(message.getMetadata().getPartition(), message.getMetadata().getNextOffset());
                }
            }
            catch (InterruptedException e) {
                wasInterrupted = true;
            }
        }
        if (wasInterrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

