/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.inbound;

import java.util.HashMap;
import java.util.Map;
import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.kafka.core.KafkaMessageMetadata;
import org.springframework.integration.kafka.listener.AbstractDecodingAcknowledgingMessageListener;
import org.springframework.integration.kafka.listener.AbstractDecodingMessageListener;
import org.springframework.integration.kafka.listener.Acknowledgment;
import org.springframework.integration.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.MutableMessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;

/**
 * Message producer that installs a listener on a {@link KafkaMessageListenerContainer}
 * and forwards every decoded Kafka message through {@code sendMessage(...)}.
 *
 * <p>The adapter owns the container's lifecycle: the container's auto-startup is
 * switched off in the constructor and the container is started and stopped from
 * {@link #doStart()} / {@link #doStop()}.
 *
 * <p>All setters are meant to be called before {@link #onInit()}: the listener
 * (and the decoders handed to it) is created there, so later changes to the
 * decoders or to {@code autoCommitOffset} do not replace the installed listener.
 */
public class KafkaMessageDrivenChannelAdapter extends MessageProducerSupport
        implements OrderlyShutdownCapable {

    private final KafkaMessageListenerContainer messageListenerContainer;

    private Decoder<?> keyDecoder = new DefaultDecoder(null);

    private Decoder<?> payloadDecoder = new DefaultDecoder(null);

    private boolean generateMessageId = false;

    private boolean generateTimestamp = false;

    private boolean useMessageBuilderFactory = false;

    private boolean autoCommitOffset = true;

    /**
     * Creates an adapter driven by the given container.
     *
     * @param messageListenerContainer the container to attach to; must not be
     *        {@code null} and must not already have a message listener
     * @throws IllegalArgumentException if either precondition is violated
     */
    public KafkaMessageDrivenChannelAdapter(KafkaMessageListenerContainer messageListenerContainer) {
        Assert.notNull(messageListenerContainer, "'messageListenerContainer' must not be null");
        Assert.isNull(messageListenerContainer.getMessageListener(),
                "'messageListenerContainer' must not already have a message listener");
        this.messageListenerContainer = messageListenerContainer;
        // The adapter starts the container itself in doStart().
        this.messageListenerContainer.setAutoStartup(false);
    }

    /**
     * Sets the decoder passed to the listener for message keys.
     *
     * @param keyDecoder the key decoder; read when the listener is created in {@link #onInit()}
     */
    public void setKeyDecoder(Decoder<?> keyDecoder) {
        this.keyDecoder = keyDecoder;
    }

    /**
     * Sets the decoder passed to the listener for message payloads.
     *
     * @param payloadDecoder the payload decoder; read when the listener is created in {@link #onInit()}
     */
    public void setPayloadDecoder(Decoder<?> payloadDecoder) {
        this.payloadDecoder = payloadDecoder;
    }

    /**
     * Selects the listener flavour installed by {@link #onInit()}.
     *
     * @param autoCommitOffset when {@code true} (the default) the auto-acknowledging
     *        listener is used; when {@code false} the acknowledging listener is used and
     *        the {@link Acknowledgment} is added to each message under the
     *        {@code kafka_acknowledgment} header
     */
    public void setAutoCommitOffset(boolean autoCommitOffset) {
        this.autoCommitOffset = autoCommitOffset;
    }

    /**
     * @param generateMessageId when {@code false} (the default) the {@code id} header of
     *        produced messages is set to {@link MessageHeaders#ID_VALUE_NONE}
     */
    public void setGenerateMessageId(boolean generateMessageId) {
        this.generateMessageId = generateMessageId;
    }

    /**
     * @param generateTimestamp when {@code false} (the default) the {@code timestamp}
     *        header of produced messages is set to {@code -1}
     */
    public void setGenerateTimestamp(boolean generateTimestamp) {
        this.generateTimestamp = generateTimestamp;
    }

    /**
     * @param useMessageBuilderFactory when {@code true} messages are built through
     *        {@code getMessageBuilderFactory()}; when {@code false} (the default) the
     *        adapter's own lightweight {@code Message} implementation is used
     */
    public void setUseMessageBuilderFactory(boolean useMessageBuilderFactory) {
        this.useMessageBuilderFactory = useMessageBuilderFactory;
    }

    /**
     * Installs the listener on the container and, when neither an id nor a timestamp
     * is to be generated and the current factory is a {@link DefaultMessageBuilderFactory},
     * replaces it with a {@link MutableMessageBuilderFactory} before delegating to the
     * superclass.
     */
    protected void onInit() {
        if (this.autoCommitOffset) {
            this.messageListenerContainer.setMessageListener(
                    new AutoAcknowledgingChannelForwardingMessageListener());
        }
        else {
            this.messageListenerContainer.setMessageListener(
                    new AcknowledgingChannelForwardingMessageListener());
        }
        boolean noGeneratedHeaders = !this.generateMessageId && !this.generateTimestamp;
        if (noGeneratedHeaders && this.getMessageBuilderFactory() instanceof DefaultMessageBuilderFactory) {
            // NOTE(review): presumably the default factory cannot omit id/timestamp - confirm.
            this.setMessageBuilderFactory(new MutableMessageBuilderFactory());
        }
        super.onInit();
    }

    /** Starts the underlying listener container. */
    protected void doStart() {
        this.messageListenerContainer.start();
    }

    /** Stops the underlying listener container. */
    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    /** @return the fixed component type string {@code kafka:message-driven-channel-adapter} */
    public String getComponentType() {
        return "kafka:message-driven-channel-adapter";
    }

    /**
     * Stops the listener container ahead of shutdown.
     *
     * @return the value of {@code getPhase()}
     */
    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return this.getPhase();
    }

    /**
     * Performs no work.
     *
     * @return the value of {@code getPhase()}
     */
    public int afterShutdown() {
        return this.getPhase();
    }

    /**
     * Builds the outbound message for one Kafka record.
     *
     * @param key the decoded key, stored under {@code kafka_messageKey}
     * @param payload the decoded payload
     * @param metadata source of the topic, partition id, offset and next offset headers
     * @param acknowledgment stored under {@code kafka_acknowledgment} only when
     *        {@code autoCommitOffset} is {@code false}; may be {@code null} otherwise
     * @return the message to send
     */
    private Message<Object> toMessage(Object key, Object payload, KafkaMessageMetadata metadata,
            Acknowledgment acknowledgment) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("kafka_messageKey", key);
        headers.put("kafka_topic", metadata.getPartition().getTopic());
        headers.put("kafka_partitionId", metadata.getPartition().getId());
        headers.put("kafka_offset", metadata.getOffset());
        headers.put("kafka_nextOffset", metadata.getNextOffset());
        if (!this.generateMessageId) {
            headers.put("id", MessageHeaders.ID_VALUE_NONE);
        }
        if (!this.generateTimestamp) {
            headers.put("timestamp", -1L);
        }
        if (!this.autoCommitOffset) {
            headers.put("kafka_acknowledgment", acknowledgment);
        }
        if (this.useMessageBuilderFactory) {
            return this.getMessageBuilderFactory().withPayload(payload).copyHeaders(headers).build();
        }
        return new KafkaMessage(payload, headers, this.generateMessageId, this.generateTimestamp);
    }

    /**
     * Headers for {@link KafkaMessage}. Passes {@code null} to the superclass constructor
     * for an id/timestamp that should be generated, and {@code ID_VALUE_NONE} / {@code -1}
     * for one that should not.
     *
     * <p>Declared {@code static} so that header instances travelling downstream do not
     * hold a reference to the adapter.
     */
    private static class KafkaMessageHeaders extends MessageHeaders {

        public KafkaMessageHeaders(Map<String, Object> headers, boolean generateId, boolean generateTimestamp) {
            super(headers, generateId ? null : ID_VALUE_NONE, generateTimestamp ? null : Long.valueOf(-1L));
        }
    }

    /**
     * Minimal {@link Message} implementation holding a payload and its headers.
     *
     * <p>Declared {@code static}, with the id/timestamp flags passed in explicitly, so
     * that messages sent downstream do not keep the adapter reachable.
     */
    private static class KafkaMessage implements Message<Object> {

        private final Object payload;

        private final MessageHeaders messageHeaders;

        public KafkaMessage(Object payload, Map<String, Object> headers, boolean generateId,
                boolean generateTimestamp) {
            this.payload = payload;
            this.messageHeaders = new KafkaMessageHeaders(headers, generateId, generateTimestamp);
        }

        public Object getPayload() {
            return this.payload;
        }

        public MessageHeaders getHeaders() {
            return this.messageHeaders;
        }

        /** Renders byte-array payloads as {@code byte[length]} rather than dumping their content. */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder(this.getClass().getSimpleName());
            sb.append(" [payload=");
            if (this.payload instanceof byte[]) {
                sb.append("byte[").append(((byte[]) this.payload).length).append("]");
            }
            else {
                sb.append(this.payload);
            }
            sb.append(", headers=").append(this.messageHeaders).append("]");
            return sb.toString();
        }
    }

    /**
     * Listener used when {@code autoCommitOffset} is {@code false}: forwards each record
     * together with its {@link Acknowledgment}.
     */
    private class AcknowledgingChannelForwardingMessageListener
            extends AbstractDecodingAcknowledgingMessageListener {

        public AcknowledgingChannelForwardingMessageListener() {
            super(KafkaMessageDrivenChannelAdapter.this.keyDecoder,
                    KafkaMessageDrivenChannelAdapter.this.payloadDecoder);
        }

        public void doOnMessage(Object key, Object payload, KafkaMessageMetadata metadata,
                Acknowledgment acknowledgment) {
            Message<Object> message =
                    KafkaMessageDrivenChannelAdapter.this.toMessage(key, payload, metadata, acknowledgment);
            KafkaMessageDrivenChannelAdapter.this.sendMessage(message);
        }
    }

    /**
     * Listener used when {@code autoCommitOffset} is {@code true}: forwards each record
     * without an {@link Acknowledgment}.
     */
    private class AutoAcknowledgingChannelForwardingMessageListener
            extends AbstractDecodingMessageListener {

        public AutoAcknowledgingChannelForwardingMessageListener() {
            super(KafkaMessageDrivenChannelAdapter.this.keyDecoder,
                    KafkaMessageDrivenChannelAdapter.this.payloadDecoder);
        }

        public void doOnMessage(Object key, Object payload, KafkaMessageMetadata metadata) {
            Message<Object> message =
                    KafkaMessageDrivenChannelAdapter.this.toMessage(key, payload, metadata, null);
            KafkaMessageDrivenChannelAdapter.this.sendMessage(message);
        }
    }
}

