/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.springframework.context.Lifecycle;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.listener.ErrorHandler;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.OffsetManager;

class QueueingMessageListenerInvoker
implements Runnable,
Lifecycle {
    private BlockingQueue<KafkaMessage> messages;
    private volatile boolean running = false;
    private final MessageListener delegate;
    private final OffsetManager offsetManager;
    private final ErrorHandler errorHandler;

    public QueueingMessageListenerInvoker(int capacity, OffsetManager offsetManager, MessageListener delegate, ErrorHandler errorHandler) {
        this.offsetManager = offsetManager;
        this.delegate = delegate;
        this.errorHandler = errorHandler;
        this.messages = new ArrayBlockingQueue<KafkaMessage>(capacity, true);
    }

    public void enqueue(KafkaMessage message) {
        boolean wasInterruptedWhileRunning = false;
        if (this.running) {
            boolean added = false;
            while (!added && this.running) {
                try {
                    this.messages.put(message);
                    added = true;
                }
                catch (InterruptedException e) {
                    wasInterruptedWhileRunning = true;
                }
            }
        }
        if (wasInterruptedWhileRunning) {
            Thread.currentThread().interrupt();
        }
    }

    public void start() {
        this.running = true;
    }

    public void stop() {
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        boolean wasInterrupted = false;
        while (this.running) {
            try {
                KafkaMessage message = this.messages.take();
                try {
                    this.delegate.onMessage(message);
                }
                catch (Exception e) {
                    if (this.errorHandler == null) continue;
                    this.errorHandler.handle(e, message);
                }
                finally {
                    this.offsetManager.updateOffset(message.getMetadata().getPartition(), message.getMetadata().getNextOffset());
                }
            }
            catch (InterruptedException e) {
                wasInterrupted = true;
            }
        }
        if (wasInterrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

