/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.outbound;

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.Lifecycle;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.kafka.support.KafkaSendFailureException;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
 * Message handler that publishes each request message to Kafka through a
 * {@link KafkaTemplate}.
 *
 * <p>When the supplied template is a {@link ReplyingKafkaTemplate} the handler behaves as
 * an outbound gateway: it adds a {@code kafka_replyTopic} record header, sends with
 * {@code sendAndReceive} and returns a {@link Future} that completes with the converted
 * reply message. Otherwise it behaves as a one-way outbound channel adapter and returns
 * {@code null} from {@link #handleRequestMessage(Message)}.
 *
 * @param <K> the Kafka record key type
 * @param <V> the Kafka record value type
 */
public class KafkaProducerMessageHandler<K, V>
        extends AbstractReplyProducingMessageHandler
        implements Lifecycle {

    /** Default time, in milliseconds, to wait for a send to complete when {@code sync} is enabled. */
    private static final long DEFAULT_SEND_TIMEOUT = 10000L;

    /** Reply topics (and their partitions) assigned to the reply container; filled lazily in gateway mode. */
    private final Map<String, Set<Integer>> replyTopicsAndPartitions = new HashMap<>();

    private final KafkaTemplate<K, V> kafkaTemplate;

    /** {@code true} when {@link #kafkaTemplate} is a {@link ReplyingKafkaTemplate}. */
    private final boolean isGateway;

    /** Snapshot of {@link KafkaTemplate#isTransactional()} taken at construction time. */
    private final boolean transactional;

    private final AtomicBoolean running = new AtomicBoolean();

    private EvaluationContext evaluationContext;

    private Expression topicExpression;

    private Expression messageKeyExpression;

    private Expression partitionIdExpression;

    private Expression timestampExpression;

    private boolean sync;

    private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);

    private KafkaHeaderMapper headerMapper;

    private RecordMessageConverter replyMessageConverter = new MessagingMessageConverter();

    private MessageChannel sendFailureChannel;

    private String sendFailureChannelName;

    private MessageChannel sendSuccessChannel;

    private String sendSuccessChannelName;

    private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();

    private Type replyPayloadType = Object.class;

    private volatile boolean noOutputChannel;

    /**
     * Creates a handler that sends through the given template.
     *
     * <p>If the template is a {@link ReplyingKafkaTemplate} the handler is switched to async
     * reply handling and the {@code kafka_topic}, {@code kafka_partitionId} and
     * {@code kafka_messageKey} headers are registered as not-propagated headers. The default
     * header mapper is a {@link DefaultKafkaHeaderMapper} when Jackson 2 is present and a
     * {@link SimpleKafkaHeaderMapper} otherwise.
     *
     * @param kafkaTemplate the template used for all sends; must not be {@code null}
     */
    public KafkaProducerMessageHandler(KafkaTemplate<K, V> kafkaTemplate) {
        Assert.notNull(kafkaTemplate, "kafkaTemplate cannot be null");
        this.kafkaTemplate = kafkaTemplate;
        this.isGateway = kafkaTemplate instanceof ReplyingKafkaTemplate;
        if (this.isGateway) {
            setAsync(true);
            updateNotPropagatedHeaders(
                    new String[] {"kafka_topic", "kafka_partitionId", "kafka_messageKey"}, false);
        }
        if (JacksonPresent.isJackson2Present()) {
            this.headerMapper = new DefaultKafkaHeaderMapper();
        } else {
            this.headerMapper = new SimpleKafkaHeaderMapper();
        }
        this.transactional = kafkaTemplate.isTransactional();
        if (this.transactional && this.isGateway) {
            this.logger.warn("The KafkaTemplate is transactional; this gateway will only work if the consumer is configured to read uncommitted records");
        }
    }

    /**
     * Sets the expression evaluated against each request message to obtain the target topic.
     * When unset, the {@code kafka_topic} message header is used.
     *
     * @param topicExpression the topic expression, or {@code null} to use the header
     */
    public void setTopicExpression(Expression topicExpression) {
        this.topicExpression = topicExpression;
    }

    /**
     * Sets the expression evaluated against each request message to obtain the record key.
     * When unset, the {@code kafka_messageKey} message header is used.
     *
     * @param messageKeyExpression the key expression, or {@code null} to use the header
     */
    public void setMessageKeyExpression(Expression messageKeyExpression) {
        this.messageKeyExpression = messageKeyExpression;
    }

    /**
     * Sets the expression evaluated against each request message to obtain the partition.
     * When unset, the {@code kafka_partitionId} message header is used.
     *
     * @param partitionIdExpression the partition expression, or {@code null} to use the header
     */
    public void setPartitionIdExpression(Expression partitionIdExpression) {
        this.partitionIdExpression = partitionIdExpression;
    }

    /**
     * Sets the expression evaluated against each request message to obtain the record timestamp.
     * When unset, the {@code kafka_timestamp} message header is used.
     *
     * @param timestampExpression the timestamp expression, or {@code null} to use the header
     */
    public void setTimestampExpression(Expression timestampExpression) {
        this.timestampExpression = timestampExpression;
    }

    /**
     * Sets the mapper that copies message headers into Kafka record headers.
     *
     * @param headerMapper the mapper; {@code null} disables header mapping
     */
    public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
        this.headerMapper = headerMapper;
    }

    /**
     * Returns the template this handler sends through.
     *
     * @return the template supplied at construction time
     */
    public KafkaTemplate<?, ?> getKafkaTemplate() {
        return this.kafkaTemplate;
    }

    /**
     * Controls whether the handler blocks until the send future completes.
     *
     * @param sync {@code true} to wait for the send result (subject to the send timeout)
     */
    public void setSync(boolean sync) {
        this.sync = sync;
    }

    /**
     * Sets a fixed send timeout, both on the superclass and as this handler's
     * send-timeout expression.
     *
     * @param sendTimeout the timeout in milliseconds
     */
    public void setSendTimeout(long sendTimeout) {
        super.setSendTimeout(sendTimeout);
        setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
    }

    /**
     * Sets the expression evaluated against each request message to obtain the time, in
     * milliseconds, to wait for the send to complete when {@code sync} is enabled. A
     * {@code null} or negative result means wait without a timeout.
     *
     * @param sendTimeoutExpression the expression; must not be {@code null}
     */
    public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
        Assert.notNull(sendTimeoutExpression, "'sendTimeoutExpression' must not be null");
        this.sendTimeoutExpression = sendTimeoutExpression;
    }

    /**
     * Sets the channel that receives an error message when a send fails.
     *
     * @param sendFailureChannel the failure channel
     */
    public void setSendFailureChannel(MessageChannel sendFailureChannel) {
        this.sendFailureChannel = sendFailureChannel;
    }

    /**
     * Sets the name of the failure channel; it is resolved on first use if no channel
     * instance was set.
     *
     * @param sendFailureChannelName the failure channel name
     */
    public void setSendFailureChannelName(String sendFailureChannelName) {
        this.sendFailureChannelName = sendFailureChannelName;
    }

    /**
     * Sets the channel that receives a copy of the request message, enriched with a
     * {@code kafka_recordMetadata} header, after a successful send.
     *
     * @param sendSuccessChannel the success channel
     */
    public void setSendSuccessChannel(MessageChannel sendSuccessChannel) {
        this.sendSuccessChannel = sendSuccessChannel;
    }

    /**
     * Sets the name of the success channel; it is resolved on first use if no channel
     * instance was set.
     *
     * @param sendSuccessChannelName the success channel name
     */
    public void setSendSuccessChannelName(String sendSuccessChannelName) {
        this.sendSuccessChannelName = sendSuccessChannelName;
    }

    /**
     * Sets the strategy used to build the error message sent to the failure channel.
     *
     * @param errorMessageStrategy the strategy; must not be {@code null}
     */
    public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
        Assert.notNull(errorMessageStrategy, "'errorMessageStrategy' cannot be null");
        this.errorMessageStrategy = errorMessageStrategy;
    }

    /**
     * Sets the converter used to turn a reply {@link ConsumerRecord} into a message.
     *
     * @param messageConverter the converter; must not be {@code null}
     */
    public void setReplyMessageConverter(RecordMessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' cannot be null");
        this.replyMessageConverter = messageConverter;
    }

    /**
     * Sets the payload type passed to the reply message converter.
     *
     * @param payloadType the reply payload type; must not be {@code null}
     */
    public void setReplyPayloadType(Type payloadType) {
        Assert.notNull(payloadType, "'payloadType' cannot be null");
        this.replyPayloadType = payloadType;
    }

    /**
     * Returns the component type name, which depends on whether this handler is a gateway.
     *
     * @return {@code "kafka:outbound-gateway"} or {@code "kafka:outbound-channel-adapter"}
     */
    public String getComponentType() {
        return this.isGateway ? "kafka:outbound-gateway" : "kafka:outbound-channel-adapter";
    }

    /**
     * Returns the failure channel, resolving it by name (and caching the result) when only
     * a name was configured.
     *
     * @return the failure channel, or {@code null} if neither a channel nor a name is set
     */
    protected MessageChannel getSendFailureChannel() {
        if (this.sendFailureChannel != null) {
            return this.sendFailureChannel;
        }
        if (this.sendFailureChannelName != null) {
            this.sendFailureChannel = getChannelResolver().resolveDestination(this.sendFailureChannelName);
            return this.sendFailureChannel;
        }
        return null;
    }

    /**
     * Returns the success channel, resolving it by name (and caching the result) when only
     * a name was configured.
     *
     * @return the success channel, or {@code null} if neither a channel nor a name is set
     */
    protected MessageChannel getSendSuccessChannel() {
        if (this.sendSuccessChannel != null) {
            return this.sendSuccessChannel;
        }
        if (this.sendSuccessChannelName != null) {
            this.sendSuccessChannel = getChannelResolver().resolveDestination(this.sendSuccessChannelName);
            return this.sendSuccessChannel;
        }
        return null;
    }

    /** Creates the evaluation context used for all configured expressions. */
    protected void doInit() {
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
    }

    /** Marks the handler as running. */
    public void start() {
        this.running.set(true);
    }

    /** Marks the handler as stopped and, if it was running, flushes the template. */
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            this.kafkaTemplate.flush();
        }
    }

    /**
     * Reports the lifecycle state.
     *
     * @return {@code true} between {@link #start()} and {@link #stop()}
     */
    public boolean isRunning() {
        return this.running.get();
    }

    /**
     * Builds a {@link ProducerRecord} from the request message and sends it.
     *
     * <p>Topic, partition, key and timestamp come from the corresponding expression when one
     * is configured, otherwise from the {@code kafka_topic}, {@code kafka_partitionId},
     * {@code kafka_messageKey} and {@code kafka_timestamp} headers. A {@link KafkaNull}
     * payload is sent as a {@code null} record value.
     *
     * @param message the request message
     * @return a {@link Future} for the converted reply in gateway mode, otherwise {@code null}
     * @throws IllegalStateException if no topic can be determined or, in gateway mode, the
     *         reply topic/partition headers do not match the reply container's assignment
     * @throws MessageHandlingException if waiting for the send result is interrupted or the
     *         send fails while {@code sync} is enabled
     */
    @SuppressWarnings("unchecked") // key/payload come from untyped messages; the template is typed <K, V>
    protected Object handleRequestMessage(Message<?> message) {
        MessageHeaders messageHeaders = message.getHeaders();

        String topic = this.topicExpression != null
                ? this.topicExpression.getValue(this.evaluationContext, message, String.class)
                : messageHeaders.get("kafka_topic", String.class);
        Assert.state(StringUtils.hasText(topic), "The 'topic' can not be empty or null");

        Integer partitionId = this.partitionIdExpression != null
                ? this.partitionIdExpression.getValue(this.evaluationContext, message, Integer.class)
                : messageHeaders.get("kafka_partitionId", Integer.class);

        Object messageKey = this.messageKeyExpression != null
                ? this.messageKeyExpression.getValue(this.evaluationContext, message)
                : messageHeaders.get("kafka_messageKey");

        Long timestamp = this.timestampExpression != null
                ? this.timestampExpression.getValue(this.evaluationContext, message, Long.class)
                : messageHeaders.get("kafka_timestamp", Long.class);

        Object payload = message.getPayload();
        if (payload instanceof KafkaNull) {
            payload = null;
        }

        Headers headers = null;
        if (this.headerMapper != null) {
            headers = new RecordHeaders();
            this.headerMapper.fromHeaders(messageHeaders, headers);
        }

        final ProducerRecord<K, V> producerRecord =
                new ProducerRecord<>(topic, partitionId, timestamp, (K) messageKey, (V) payload, headers);

        ListenableFuture<SendResult<K, V>> sendFuture;
        RequestReplyFuture<K, V, Object> gatewayFuture = null;
        if (this.isGateway) {
            producerRecord.headers().add(new RecordHeader("kafka_replyTopic", getReplyTopic(message)));
            gatewayFuture = ((ReplyingKafkaTemplate<K, V, Object>) this.kafkaTemplate).sendAndReceive(producerRecord);
            sendFuture = gatewayFuture.getSendFuture();
        } else if (this.transactional
                && TransactionSynchronizationManager.getResource(this.kafkaTemplate.getProducerFactory()) == null) {
            // Transactional template but no producer bound to this thread: run in a local transaction.
            sendFuture = this.kafkaTemplate.executeInTransaction(t -> t.send(producerRecord));
        } else {
            sendFuture = this.kafkaTemplate.send(producerRecord);
        }

        try {
            processSendResult(message, producerRecord, sendFuture, getSendSuccessChannel());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MessageHandlingException(message, e);
        }
        catch (ExecutionException e) {
            throw new MessageHandlingException(message, e.getCause());
        }
        return processReplyFuture(gatewayFuture);
    }

    /**
     * Determines the reply topic to put in the {@code kafka_replyTopic} record header.
     *
     * <p>The topic is taken from the message's {@code kafka_replyTopic} header (a
     * {@code String} or UTF-8 {@code byte[]}) and validated against the reply container's
     * assigned topics. Without the header, the single assigned reply topic is used. If a
     * {@code kafka_replyPartition} header is present it is validated against the partitions
     * assigned for that topic.
     *
     * @param message the request message
     * @return the reply topic name as UTF-8 bytes
     * @throws IllegalStateException if the header has an unsupported type, no default topic
     *         can be determined, or the topic/partition is not assigned to the reply container
     */
    private byte[] getReplyTopic(Message<?> message) {
        if (this.replyTopicsAndPartitions.isEmpty()) {
            determineValidReplyTopicsAndPartitions();
        }

        Object replyHeader = message.getHeaders().get("kafka_replyTopic");
        byte[] replyTopic = null;
        String topicToCheck = null;
        if (replyHeader instanceof String) {
            topicToCheck = (String) replyHeader;
            replyTopic = topicToCheck.getBytes(StandardCharsets.UTF_8);
        } else if (replyHeader instanceof byte[]) {
            replyTopic = (byte[]) replyHeader;
        } else if (replyHeader != null) {
            throw new IllegalStateException("kafka_replyTopic must be String or byte[]");
        }

        if (replyTopic == null) {
            if (this.replyTopicsAndPartitions.size() != 1) {
                throw new IllegalStateException("No reply topic header and no default reply topic can be determined");
            }
            replyTopic = this.replyTopicsAndPartitions.keySet().iterator().next().getBytes(StandardCharsets.UTF_8);
        } else {
            if (topicToCheck == null) {
                topicToCheck = new String(replyTopic, StandardCharsets.UTF_8);
            }
            if (!this.replyTopicsAndPartitions.containsKey(topicToCheck)) {
                throw new IllegalStateException("The reply topic header [" + topicToCheck + "] does not match any reply container topic: " + this.replyTopicsAndPartitions.keySet());
            }
        }

        Integer replyPartition = message.getHeaders().get("kafka_replyPartition", Integer.class);
        if (replyPartition != null) {
            if (topicToCheck == null) {
                // Only reached when the default (single) reply topic was selected above.
                topicToCheck = new String(replyTopic, StandardCharsets.UTF_8);
            }
            if (!this.replyTopicsAndPartitions.get(topicToCheck).contains(replyPartition)) {
                throw new IllegalStateException("The reply partition header [" + replyPartition + "] does not match any reply container partition for topic [" + topicToCheck + "]: " + this.replyTopicsAndPartitions.get(topicToCheck));
            }
        }
        return replyTopic;
    }

    /**
     * Records the topics and partitions currently assigned to the reply container of the
     * {@link ReplyingKafkaTemplate}. Leaves the map untouched when the template reports no
     * assignment ({@code null}).
     */
    private void determineValidReplyTopicsAndPartitions() {
        ReplyingKafkaTemplate<?, ?, ?> replyingTemplate = (ReplyingKafkaTemplate<?, ?, ?>) this.kafkaTemplate;
        Optional.ofNullable(replyingTemplate.getAssignedReplyTopicPartitions()).ifPresent(assigned -> {
            Map<String, Set<Integer>> topicsAndPartitions = new HashMap<>();
            assigned.forEach(tp ->
                    topicsAndPartitions.computeIfAbsent(tp.topic(), k -> new TreeSet<>()).add(tp.partition()));
            this.replyTopicsAndPartitions.putAll(topicsAndPartitions);
        });
    }

    /**
     * Wires the success/failure channels to the send future and, when {@code sync} is
     * enabled, blocks until the send completes.
     *
     * <p>On success, a copy of the request message with a {@code kafka_recordMetadata} header
     * is sent to {@code metadataChannel} (if not {@code null}). On failure, an error message
     * wrapping a {@link KafkaSendFailureException} is sent to the failure channel (if one is
     * configured).
     *
     * @param message the request message
     * @param producerRecord the record that was sent
     * @param future the future returned by the send
     * @param metadataChannel the success channel, or {@code null}
     * @throws InterruptedException if interrupted while waiting in sync mode
     * @throws ExecutionException if the send failed while waiting in sync mode
     * @throws MessageTimeoutException if the send does not complete within the evaluated timeout
     */
    public void processSendResult(final Message<?> message, final ProducerRecord<K, V> producerRecord,
            ListenableFuture<SendResult<K, V>> future, final MessageChannel metadataChannel)
            throws InterruptedException, ExecutionException {

        if (getSendFailureChannel() != null || metadataChannel != null) {
            future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {

                public void onSuccess(SendResult<K, V> result) {
                    if (metadataChannel != null) {
                        KafkaProducerMessageHandler.this.messagingTemplate.send(metadataChannel,
                                getMessageBuilderFactory().fromMessage(message)
                                        .setHeader("kafka_recordMetadata", result.getRecordMetadata())
                                        .build());
                    }
                }

                public void onFailure(Throwable ex) {
                    if (getSendFailureChannel() != null) {
                        KafkaProducerMessageHandler.this.messagingTemplate.send(getSendFailureChannel(),
                                KafkaProducerMessageHandler.this.errorMessageStrategy.buildErrorMessage(
                                        new KafkaSendFailureException(message, producerRecord, ex), null));
                    }
                }
            });
        }

        if (this.sync) {
            Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
            if (sendTimeout == null || sendTimeout < 0L) {
                // No usable timeout: wait until the send completes.
                future.get();
            } else {
                try {
                    future.get(sendTimeout, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException te) {
                    throw new MessageTimeoutException(message, "Timeout waiting for response from KafkaProducer", te);
                }
            }
        }
    }

    /**
     * Wraps the gateway reply future so that it completes with a converted message.
     *
     * @param future the request/reply future, or {@code null} when not in gateway mode
     * @return the converting future, or {@code null} if {@code future} is {@code null}
     */
    private Future<?> processReplyFuture(RequestReplyFuture<?, ?, Object> future) {
        if (future == null) {
            return null;
        }
        return new ConvertingReplyFuture(future);
    }

    /**
     * Future that completes with the reply {@link ConsumerRecord} converted to a message
     * (minus the request/reply correlation headers), or with the failure of the underlying
     * request/reply future or of the conversion.
     */
    private final class ConvertingReplyFuture
            extends SettableListenableFuture<Object> {

        ConvertingReplyFuture(RequestReplyFuture<?, ?, Object> future) {
            addCallback(future);
        }

        private void addCallback(RequestReplyFuture<?, ?, Object> future) {
            future.addCallback(new ListenableFutureCallback<ConsumerRecord<?, Object>>() {

                public void onSuccess(ConsumerRecord<?, Object> result) {
                    try {
                        ConvertingReplyFuture.this.set(dontLeakHeaders(
                                KafkaProducerMessageHandler.this.replyMessageConverter.toMessage(
                                        result, null, null, KafkaProducerMessageHandler.this.replyPayloadType)));
                    }
                    catch (Exception e) {
                        // A conversion failure must complete the future rather than escape the callback.
                        ConvertingReplyFuture.this.setException(e);
                    }
                }

                /**
                 * Removes the {@code kafka_correlationId}, {@code kafka_replyTopic} and
                 * {@code kafka_replyPartition} headers from the reply. Headers of type
                 * {@link KafkaMessageHeaders} are edited in place; otherwise a new message
                 * is built without them.
                 */
                private Message<?> dontLeakHeaders(Message<?> message) {
                    if (message.getHeaders() instanceof KafkaMessageHeaders) {
                        Map<String, Object> headers = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
                        headers.remove("kafka_correlationId");
                        headers.remove("kafka_replyTopic");
                        headers.remove("kafka_replyPartition");
                        return message;
                    }
                    return getMessageBuilderFactory().fromMessage(message)
                            .removeHeader("kafka_correlationId")
                            .removeHeader("kafka_replyTopic")
                            .removeHeader("kafka_replyPartition")
                            .build();
                }

                public void onFailure(Throwable ex) {
                    ConvertingReplyFuture.this.setException(ex);
                }
            });
        }
    }
}

