/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.Pair;
import com.datatorrent.contrib.kafka.AbstractKafkaOutputOperator;
import com.datatorrent.contrib.kafka.KafkaMetadataUtil;
import com.google.common.collect.Sets;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractExactlyOnceKafkaOutputOperator<T, K, V>
extends AbstractKafkaOutputOperator<K, V> {
    private Map<Integer, Pair<byte[], byte[]>> lastMsgs;
    private transient Partitioner partitioner;
    private transient int partitionNum = 1;
    public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>(){

        public void process(T tuple) {
            Pair lastMsg;
            Pair keyValue = AbstractExactlyOnceKafkaOutputOperator.this.tupleToKeyValue(tuple);
            int pid = 0;
            if (AbstractExactlyOnceKafkaOutputOperator.this.partitioner != null) {
                pid = AbstractExactlyOnceKafkaOutputOperator.this.partitioner.partition(keyValue.first, AbstractExactlyOnceKafkaOutputOperator.this.partitionNum);
            }
            if ((lastMsg = (Pair)AbstractExactlyOnceKafkaOutputOperator.this.lastMsgs.get(pid)) == null || AbstractExactlyOnceKafkaOutputOperator.this.compareToLastMsg(keyValue, (Pair<byte[], byte[]>)lastMsg) > 0) {
                AbstractExactlyOnceKafkaOutputOperator.this.getProducer().send(new KeyedMessage(AbstractExactlyOnceKafkaOutputOperator.this.getTopic(), keyValue.first, keyValue.second));
                ++AbstractExactlyOnceKafkaOutputOperator.this.sendCount;
            } else {
                logger.debug("Ingore tuple " + tuple);
                return;
            }
        }
    };
    private static final Logger logger = LoggerFactory.getLogger(AbstractExactlyOnceKafkaOutputOperator.class);

    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        try {
            String className = (String)this.getConfigProperties().get("partitioner.class");
            if (className != null) {
                this.partitioner = (Partitioner)Class.forName(className).newInstance();
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to initialize partitioner", e);
        }
        this.initializeLastProcessingOffset();
    }

    private void initializeLastProcessingOffset() {
        TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(Sets.newHashSet((Object[])new String[]{(String)this.getConfigProperties().get("metadata.broker.list")}), this.getTopic());
        if (tm == null) {
            throw new RuntimeException("Failed to retrieve topic metadata");
        }
        this.partitionNum = tm.partitionsMetadata().size();
        this.lastMsgs = new HashMap<Integer, Pair<byte[], byte[]>>(this.partitionNum);
        for (PartitionMetadata pm : tm.partitionsMetadata()) {
            String leadBroker = pm.leader().host();
            int port = pm.leader().port();
            String clientName = this.getClass().getName().replace('$', '.') + "_Client_" + tm.topic() + "_" + pm.partitionId();
            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 65536, clientName);
            long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), OffsetRequest.LatestTime(), clientName);
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(tm.topic(), pm.partitionId(), readOffset - 1L, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) {
                Message m = messageAndOffset.message();
                ByteBuffer payload = m.payload();
                ByteBuffer key = m.key();
                byte[] valueBytes = new byte[payload.limit()];
                byte[] keyBytes = new byte[key.limit()];
                payload.get(valueBytes);
                key.get(keyBytes);
                this.lastMsgs.put(pm.partitionId(), (Pair<byte[], byte[]>)new Pair((Object)keyBytes, (Object)valueBytes));
            }
        }
    }

    protected abstract int compareToLastMsg(Pair<K, V> var1, Pair<byte[], byte[]> var2);

    protected abstract Pair<K, V> tupleToKeyValue(T var1);
}

