/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import com.gs.collections.api.block.procedure.Procedure;
import com.gs.collections.api.block.procedure.Procedure2;
import com.gs.collections.api.map.MutableMap;
import com.gs.collections.impl.factory.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.springframework.context.Lifecycle;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.listener.ErrorHandler;
import org.springframework.integration.kafka.listener.MessageListener;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;

class ConcurrentMessageListenerDispatcher
implements Lifecycle {
    public static final CustomizableThreadFactory THREAD_FACTORY = new CustomizableThreadFactory("dispatcher-");
    private static final StartDelegateProcedure startDelegateProcedure = new StartDelegateProcedure();
    private static final StopDelegateProcedure stopDelegateProcedure = new StopDelegateProcedure();
    private final Object lifecycleMonitor = new Object();
    private final Collection<Partition> partitions;
    private final int consumers;
    private volatile boolean running;
    private final MessageListener delegateListener;
    private final ErrorHandler errorHandler;
    private final OffsetManager offsetManager;
    private MutableMap<Partition, QueueingMessageListenerInvoker> delegates;
    private final int queueSize;
    private Executor taskExecutor;

    public ConcurrentMessageListenerDispatcher(MessageListener delegateListener, ErrorHandler errorHandler, Collection<Partition> partitions, OffsetManager offsetManager, int consumers, int queueSize) {
        Assert.notEmpty(partitions, (String)"A set of partitions must be provided");
        Assert.isTrue((consumers <= partitions.size() ? 1 : 0) != 0, (String)"The number of consumers must be smaller or equal to the number of partitions");
        Assert.notNull((Object)delegateListener, (String)"A delegate must be provided");
        this.delegateListener = delegateListener;
        this.errorHandler = errorHandler;
        this.partitions = partitions;
        this.offsetManager = offsetManager;
        this.consumers = consumers;
        this.queueSize = queueSize;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            if (!this.isRunning()) {
                this.initializeAndStartDispatching();
                this.running = true;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        Object object = this.lifecycleMonitor;
        synchronized (object) {
            if (this.isRunning()) {
                this.running = false;
                this.delegates.flip().keyBag().toSet().forEach((Procedure)stopDelegateProcedure);
            }
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public void dispatch(KafkaMessage message) {
        ((QueueingMessageListenerInvoker)this.delegates.get((Object)message.getMetadata().getPartition())).enqueue(message);
    }

    private void initializeAndStartDispatching() {
        int i;
        ArrayList<QueueingMessageListenerInvoker> delegateList = new ArrayList<QueueingMessageListenerInvoker>(this.consumers);
        for (i = 0; i < this.consumers; ++i) {
            QueueingMessageListenerInvoker blockingQueueMessageListenerInvoker = new QueueingMessageListenerInvoker(this.queueSize, this.offsetManager, this.delegateListener, this.errorHandler);
            delegateList.add(blockingQueueMessageListenerInvoker);
        }
        this.delegates = Maps.mutable.of();
        i = 0;
        for (Partition partition : this.partitions) {
            this.delegates.put((Object)partition, delegateList.get(i++ % this.consumers));
        }
        if (this.taskExecutor == null) {
            this.taskExecutor = Executors.newFixedThreadPool(this.consumers, (ThreadFactory)THREAD_FACTORY);
        }
        this.delegates.flip().keyBag().toSet().forEachWith((Procedure2)startDelegateProcedure, (Object)this.taskExecutor);
    }

    private static class StartDelegateProcedure
    implements Procedure2<QueueingMessageListenerInvoker, Executor> {
        private StartDelegateProcedure() {
        }

        public void value(QueueingMessageListenerInvoker delegate, Executor executor) {
            delegate.start();
            executor.execute(delegate);
        }
    }

    private static class StopDelegateProcedure
    implements Procedure<QueueingMessageListenerInvoker> {
        private StopDelegateProcedure() {
        }

        public void value(QueueingMessageListenerInvoker delegate) {
            delegate.stop();
        }
    }
}

