/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class KafkaSinglePortExactlyOnceOutputOperator<T>
extends AbstractKafkaOutputOperator<String, T>
implements Operator.CheckpointNotificationListener {
    private transient String key;
    private transient String appName;
    private transient Integer operatorId;
    private transient Long windowId;
    private transient Map<T, Integer> partialWindowTuples = new HashMap<T, Integer>();
    private transient KafkaConsumer consumer;
    private WindowDataManager windowDataManager = new FSWindowDataManager();
    private final int KAFKA_CONNECT_ATTEMPT = 10;
    private final String KEY_SEPARATOR = "#";
    public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
    public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>(){

        public void process(T tuple) {
            KafkaSinglePortExactlyOnceOutputOperator.this.sendTuple(tuple);
        }
    };
    private static final Logger logger = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);

    public void setup(Context.OperatorContext context) {
        this.setProperty("key.serializer", KEY_SERIALIZER);
        if (this.getProperties().getProperty("value.deserializer") == null) {
            throw new IllegalArgumentException("Value deserializer needs to be set for the operator, as it is used during recovery.");
        }
        super.setup(context);
        this.operatorId = context.getId();
        this.windowDataManager.setup((Context)context);
        this.appName = (String)context.getValue(Context.DAGContext.APPLICATION_NAME);
        this.key = this.appName + "#" + new Integer(this.operatorId);
        this.consumer = this.KafkaConsumerInit();
    }

    public void beginWindow(long windowId) {
        this.windowId = windowId;
        if (windowId == this.windowDataManager.getLargestCompletedWindow()) {
            this.rebuildPartialWindow();
        }
    }

    public void checkpointed(long windowId) {
    }

    public void committed(long windowId) {
        try {
            this.windowDataManager.committed(windowId);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void beforeCheckpoint(long windowId) {
    }

    public void teardown() {
        this.consumer.close();
        super.teardown();
    }

    public void endWindow() {
        if (this.windowId <= this.windowDataManager.getLargestCompletedWindow()) {
            return;
        }
        if (!this.partialWindowTuples.isEmpty()) {
            throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
        }
        this.getProducer().flush();
        try {
            this.windowDataManager.save(this.getPartitionsAndOffsets(true), this.windowId.longValue());
        }
        catch (IOException | InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    private boolean doesKeyBelongsToThisInstance(int operatorId, String key) {
        String[] split = key.split("#");
        if (split.length != 2) {
            return false;
        }
        return Integer.parseInt(split[1]) == operatorId && split[0].equals(this.appName);
    }

    private boolean alreadyInKafka(T message) {
        if (this.windowId <= this.windowDataManager.getLargestCompletedWindow()) {
            return true;
        }
        if (this.partialWindowTuples.containsKey(message)) {
            Integer val = this.partialWindowTuples.get(message);
            if (val == 0) {
                return false;
            }
            if (val == 1) {
                this.partialWindowTuples.remove(message);
            } else {
                this.partialWindowTuples.put(message, val - 1);
            }
            return true;
        }
        return false;
    }

    private Map<Integer, Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException {
        List partitionInfoList = this.consumer.partitionsFor(this.getTopic());
        ArrayList<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();
        for (PartitionInfo partitionInfo : partitionInfoList) {
            topicPartitionList.add(new TopicPartition(this.getTopic(), partitionInfo.partition()));
        }
        HashMap<Integer, Long> parttionsAndOffset = new HashMap<Integer, Long>();
        this.consumer.assign(topicPartitionList);
        for (PartitionInfo partitionInfo : partitionInfoList) {
            try {
                TopicPartition topicPartition = new TopicPartition(this.getTopic(), partitionInfo.partition());
                if (latest) {
                    this.consumer.seekToEnd(new TopicPartition[]{topicPartition});
                } else {
                    this.consumer.seekToBeginning(new TopicPartition[]{topicPartition});
                }
                parttionsAndOffset.put(partitionInfo.partition(), this.consumer.position(topicPartition));
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
        return parttionsAndOffset;
    }

    private void rebuildPartialWindow() {
        Map<Integer, Long> currentOffsets;
        Map<Integer, Long> storedOffsets;
        logger.info("Rebuild the partial window after " + this.windowDataManager.getLargestCompletedWindow());
        try {
            storedOffsets = (Map<Integer, Long>)this.windowDataManager.retrieve(this.windowId.longValue());
            currentOffsets = this.getPartitionsAndOffsets(true);
        }
        catch (IOException | InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        if (currentOffsets == null) {
            logger.info("No tuples found while building partial window " + this.windowDataManager.getLargestCompletedWindow());
            return;
        }
        if (storedOffsets == null) {
            logger.info("Stored offset not available, seeking to the beginning of the Kafka Partition.");
            try {
                storedOffsets = this.getPartitionsAndOffsets(false);
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
            topicPartitions.add(new TopicPartition(this.getTopic(), entry.getKey().intValue()));
        }
        this.consumer.assign(topicPartitions);
        block7: for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
            boolean crossedBoundary;
            Long storedOffset = 0L;
            Integer currentPartition = entry.getKey();
            Long currentOffset = entry.getValue();
            if (storedOffsets.containsKey(currentPartition)) {
                storedOffset = storedOffsets.get(currentPartition);
            }
            if (storedOffset >= currentOffset) continue;
            try {
                this.consumer.seek(new TopicPartition(this.getTopic(), currentPartition.intValue()), storedOffset.longValue());
            }
            catch (Exception ex) {
                logger.info("Rebuilding of the partial window is not complete, exactly once recovery is not possible.");
                throw new RuntimeException(ex);
            }
            int kafkaAttempt = 0;
            block8: do {
                ConsumerRecords consumerRecords;
                if ((consumerRecords = this.consumer.poll(100L)).count() == 0) {
                    if (kafkaAttempt++ == 10) {
                        continue block7;
                    }
                } else {
                    kafkaAttempt = 0;
                }
                crossedBoundary = false;
                for (ConsumerRecord consumerRecord : consumerRecords) {
                    if (consumerRecord.offset() >= currentOffset) {
                        crossedBoundary = true;
                        continue block8;
                    }
                    if (!this.doesKeyBelongsToThisInstance(this.operatorId, (String)consumerRecord.key())) continue;
                    Object value = consumerRecord.value();
                    if (this.partialWindowTuples.containsKey(value)) {
                        Integer count = this.partialWindowTuples.get(value);
                        this.partialWindowTuples.put(value, count + 1);
                        continue;
                    }
                    this.partialWindowTuples.put(value, 1);
                }
            } while (!crossedBoundary);
        }
    }

    private KafkaConsumer KafkaConsumerInit() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.getProperties().get("bootstrap.servers"));
        props.put("key.deserializer", KEY_DESERIALIZER);
        props.put("value.deserializer", this.getProperties().get("value.deserializer"));
        return new KafkaConsumer(props);
    }

    protected void sendTuple(T tuple) {
        if (this.alreadyInKafka(tuple)) {
            return;
        }
        this.getProducer().send(new ProducerRecord(this.getTopic(), (Object)this.key, tuple), new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    logger.info("Wrting to Kafka failed with an exception {}" + e.getMessage());
                    throw new RuntimeException(e);
                }
            }
        });
    }
}

