package org.apache.apex.malhar.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.class */
public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator<String, T> implements Operator.CheckpointNotificationListener {
    private transient String key;
    private transient String appName;
    private transient Integer operatorId;
    private transient Long windowId;
    private transient KafkaConsumer consumer;
    private static final Logger logger = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
    private transient Map<T, Integer> partialWindowTuples = new HashMap();
    private WindowDataManager windowDataManager = new FSWindowDataManager();
    private final int KAFKA_CONNECT_ATTEMPT = 10;
    private final String KEY_SEPARATOR = "#";
    private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() { // from class: org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.1
        public void process(T t) {
            KafkaSinglePortExactlyOnceOutputOperator.this.sendTuple(t);
        }
    };

    @Override // org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
        this.operatorId = Integer.valueOf(operatorContext.getId());
        this.windowDataManager.setup(operatorContext);
        this.appName = (String) operatorContext.getValue(Context.DAGContext.APPLICATION_NAME);
        this.key = this.appName + "#" + new Integer(this.operatorId.intValue());
        this.consumer = KafkaConsumerInit();
    }

    @Override // org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
    public void beginWindow(long j) {
        this.windowId = Long.valueOf(j);
        if (j == this.windowDataManager.getLargestCompletedWindow()) {
            rebuildPartialWindow();
        }
    }

    public void checkpointed(long j) {
    }

    public void committed(long j) {
        try {
            this.windowDataManager.committed(j);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void beforeCheckpoint(long j) {
    }

    @Override // org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
    public void teardown() {
        this.consumer.close();
        super.teardown();
    }

    @Override // org.apache.apex.malhar.kafka.AbstractKafkaOutputOperator
    public void endWindow() {
        if (!this.partialWindowTuples.isEmpty() && this.windowId.longValue() > this.windowDataManager.getLargestCompletedWindow()) {
            throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
        }
        getProducer().flush();
        try {
            this.windowDataManager.save(getPartitionsAndOffsets(true), this.windowId.longValue());
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    private boolean doesKeyBelongsToThisInstance(int i, String str) {
        String[] split = str.split("#");
        return split.length == 2 && Integer.parseInt(split[1]) == i && split[0].equals(this.appName);
    }

    private boolean alreadyInKafka(T t) {
        if (this.windowId.longValue() <= this.windowDataManager.getLargestCompletedWindow()) {
            return true;
        }
        if (!this.partialWindowTuples.containsKey(t)) {
            return false;
        }
        Integer num = this.partialWindowTuples.get(t);
        if (num.intValue() == 0) {
            return false;
        }
        if (num.intValue() == 1) {
            this.partialWindowTuples.remove(t);
            return true;
        }
        this.partialWindowTuples.put(t, Integer.valueOf(num.intValue() - 1));
        return true;
    }

    private Map<Integer, Long> getPartitionsAndOffsets(boolean z) throws ExecutionException, InterruptedException {
        List<PartitionInfo> partitionsFor = this.consumer.partitionsFor(getTopic());
        ArrayList arrayList = new ArrayList();
        Iterator it = partitionsFor.iterator();
        while (it.hasNext()) {
            arrayList.add(new TopicPartition(getTopic(), ((PartitionInfo) it.next()).partition()));
        }
        HashMap hashMap = new HashMap();
        this.consumer.assign(arrayList);
        for (PartitionInfo partitionInfo : partitionsFor) {
            try {
                TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
                if (z) {
                    this.consumer.seekToEnd(new TopicPartition[]{topicPartition});
                } else {
                    this.consumer.seekToBeginning(new TopicPartition[]{topicPartition});
                }
                hashMap.put(Integer.valueOf(partitionInfo.partition()), Long.valueOf(this.consumer.position(topicPartition)));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return hashMap;
    }

    private void rebuildPartialWindow() {
        boolean z;
        logger.info("Rebuild the partial window after " + this.windowDataManager.getLargestCompletedWindow());
        try {
            Map<Integer, Long> map = (Map) this.windowDataManager.retrieve(this.windowId.longValue());
            Map<Integer, Long> partitionsAndOffsets = getPartitionsAndOffsets(true);
            if (partitionsAndOffsets == null) {
                logger.debug("No tuples found while building partial window " + this.windowDataManager.getLargestCompletedWindow());
                return;
            }
            if (map == null) {
                logger.debug("Stored offset not available, seeking to the beginning of the Kafka Partition.");
                try {
                    map = getPartitionsAndOffsets(false);
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
            ArrayList arrayList = new ArrayList();
            Iterator<Map.Entry<Integer, Long>> it = partitionsAndOffsets.entrySet().iterator();
            while (it.hasNext()) {
                arrayList.add(new TopicPartition(getTopic(), it.next().getKey().intValue()));
            }
            this.consumer.assign(arrayList);
            for (Map.Entry<Integer, Long> entry : partitionsAndOffsets.entrySet()) {
                Integer key = entry.getKey();
                Long value = entry.getValue();
                Long l = map.containsKey(key) ? map.get(key) : 0L;
                if (l.longValue() < value.longValue()) {
                    try {
                        this.consumer.seek(new TopicPartition(getTopic(), key.intValue()), l.longValue());
                        int i = 0;
                        do {
                            ConsumerRecords poll = this.consumer.poll(100L);
                            if (poll.count() == 0) {
                                int i2 = i;
                                i++;
                                if (i2 == 10) {
                                    break;
                                }
                            } else {
                                i = 0;
                            }
                            z = false;
                            Iterator it2 = poll.iterator();
                            while (true) {
                                if (!it2.hasNext()) {
                                    break;
                                }
                                ConsumerRecord consumerRecord = (ConsumerRecord) it2.next();
                                if (doesKeyBelongsToThisInstance(this.operatorId.intValue(), (String) consumerRecord.key())) {
                                    Object value2 = consumerRecord.value();
                                    if (this.partialWindowTuples.containsKey(value2)) {
                                        this.partialWindowTuples.put(value2, Integer.valueOf(this.partialWindowTuples.get(value2).intValue() + 1));
                                    } else {
                                        this.partialWindowTuples.put(value2, 1);
                                    }
                                    if (consumerRecord.offset() >= value.longValue()) {
                                        z = true;
                                        break;
                                    }
                                }
                            }
                        } while (!z);
                    } catch (Exception e2) {
                        logger.info("Rebuilding of the partial window is not complete, exactly once recovery is not possible.");
                        throw new RuntimeException(e2);
                    }
                }
            }
        } catch (IOException | InterruptedException | ExecutionException e3) {
            throw new RuntimeException(e3);
        }
    }

    private KafkaConsumer KafkaConsumerInit() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", getProperties().get("bootstrap.servers"));
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer(properties);
    }

    protected void sendTuple(T t) {
        if (alreadyInKafka(t)) {
            return;
        }
        getProducer().send(new ProducerRecord(getTopic(), this.key, t), new Callback() { // from class: org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.2
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc != null) {
                    KafkaSinglePortExactlyOnceOutputOperator.logger.info("Wrting to Kafka failed with an exception {}" + exc.getMessage());
                    throw new RuntimeException(exc);
                }
            }
        });
    }
}
