/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.kafka.core.KafkaMessageMetadata;
import org.springframework.integration.kafka.listener.AbstractDecodingMessageListener;
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

public class KafkaMessageDrivenChannelAdapter
extends MessageProducerSupport
implements OrderlyShutdownCapable {
    private KafkaMessageListenerContainer messageListenerContainer;
    private Decoder<?> keyDecoder = new DefaultDecoder(null);
    private Decoder<?> payloadDecoder = new DefaultDecoder(null);

    public KafkaMessageDrivenChannelAdapter(KafkaMessageListenerContainer messageListenerContainer) {
        Assert.notNull((Object)messageListenerContainer);
        Assert.isNull((Object)messageListenerContainer.getMessageListener());
        this.messageListenerContainer = messageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
    }

    public void setKeyDecoder(Decoder<?> keyDecoder) {
        this.keyDecoder = keyDecoder;
    }

    public void setPayloadDecoder(Decoder<?> payloadDecoder) {
        this.payloadDecoder = payloadDecoder;
    }

    protected void onInit() {
        this.messageListenerContainer.setMessageListener(new ChannelForwardingMessageListener());
        super.onInit();
    }

    protected void doStart() {
        this.messageListenerContainer.start();
    }

    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    public String getComponentType() {
        return "kafka:message-driven-channel-adapter";
    }

    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return this.getPhase();
    }

    public int afterShutdown() {
        return this.getPhase();
    }

    private class ChannelForwardingMessageListener
    extends AbstractDecodingMessageListener {
        public ChannelForwardingMessageListener() {
            super(KafkaMessageDrivenChannelAdapter.this.keyDecoder, KafkaMessageDrivenChannelAdapter.this.payloadDecoder);
        }

        public void doOnMessage(Object key, Object payload, KafkaMessageMetadata metadata) {
            Message message = KafkaMessageDrivenChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).setHeader("kafka_messageKey", key).setHeader("kafka_topic", (Object)metadata.getPartition().getTopic()).setHeader("kafka__partitionId", (Object)metadata.getPartition().getId()).setHeader("kafka__offset", (Object)metadata.getOffset()).build();
            KafkaMessageDrivenChannelAdapter.this.sendMessage(message);
        }
    }
}

