/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kafka;

import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.base.Joiner;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public abstract class AbstractKafkaPartitioner
implements Partitioner<AbstractKafkaInputOperator>,
StatsListener {
    private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaPartitioner.class);
    private static final String META_CONSUMER_GROUP_NAME = AbstractKafkaInputOperator.class.getName() + "META_GROUP";
    protected final String[] clusters;
    protected final String[] topics;
    protected final AbstractKafkaInputOperator prototypeOperator;
    private ArrayList<KafkaConsumer<byte[], byte[]>> metadataRefreshClients;
    private final List<Set<PartitionMeta>> currentPartitions = new LinkedList<Set<PartitionMeta>>();

    public AbstractKafkaPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator prototypeOperator) {
        this.clusters = clusters;
        this.topics = topics;
        this.prototypeOperator = prototypeOperator;
    }

    abstract List<Set<PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Collection<Partitioner.Partition<AbstractKafkaInputOperator>> definePartitions(Collection<Partitioner.Partition<AbstractKafkaInputOperator>> collection, Partitioner.PartitioningContext partitioningContext) {
        this.initMetadataClients();
        HashMap<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<String, Map<String, List<PartitionInfo>>>();
        try {
            for (int i = 0; i < this.clusters.length; ++i) {
                metadata.put(this.clusters[i], new HashMap());
                for (String topic : this.topics) {
                    int tryTime = 10;
                    while (tryTime-- > 0) {
                        try {
                            List ptis = this.metadataRefreshClients.get(i).partitionsFor(topic);
                            if (ptis != null) {
                                if (logger.isDebugEnabled()) {
                                    logger.debug("Partition metadata for topic {} : {}", (Object)topic, (Object)Joiner.on((char)';').join((Iterable)ptis));
                                }
                                ((Map)metadata.get(this.clusters[i])).put(topic, ptis);
                                break;
                            }
                            logger.warn("Partition metadata for topic {} is null. retrying...", (Object)topic);
                        }
                        catch (Exception e) {
                            logger.warn("Got Exception when trying get partition info for topic {}.", (Object)topic, (Object)e);
                        }
                        try {
                            Thread.sleep(100L);
                        }
                        catch (Exception exception) {}
                    }
                    if (tryTime != 0) continue;
                    throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic);
                }
            }
        }
        finally {
            this.closeClients();
        }
        List<Set<PartitionMeta>> parts = null;
        try {
            parts = this.assign(metadata);
        }
        catch (Exception e) {
            logger.error("assign() exception.", (Throwable)e);
            e.printStackTrace();
        }
        if (this.currentPartitions == parts || this.currentPartitions.equals(parts)) {
            logger.debug("No partition change found");
            return collection;
        }
        logger.info("Partition change detected: ");
        this.currentPartitions.clear();
        this.currentPartitions.addAll(parts);
        int i = 0;
        LinkedList<Partitioner.Partition<AbstractKafkaInputOperator>> result = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator>>();
        for (Partitioner.Partition<AbstractKafkaInputOperator> nextPartition : collection) {
            if (!parts.remove(((AbstractKafkaInputOperator)nextPartition.getPartitionedInstance()).assignment())) continue;
            if (logger.isInfoEnabled()) {
                logger.info("[Existing] Partition {} with assignment {} ", (Object)i, (Object)Joiner.on((char)';').join(((AbstractKafkaInputOperator)nextPartition.getPartitionedInstance()).assignment()));
            }
            result.add(nextPartition);
            ++i;
        }
        for (Set<PartitionMeta> partitionAssignment : parts) {
            if (logger.isInfoEnabled()) {
                logger.info("[New] Partition {} with assignment {} ", (Object)i, (Object)Joiner.on((char)';').join(partitionAssignment));
            }
            result.add(this.createPartition(partitionAssignment));
            ++i;
        }
        return result;
    }

    protected void closeClients() {
        for (KafkaConsumer<byte[], byte[]> consume : this.metadataRefreshClients) {
            consume.close();
        }
        this.metadataRefreshClients = null;
    }

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractKafkaInputOperator>> map) {
    }

    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
        StatsListener.Response response = new StatsListener.Response();
        response.repartitionRequired = true;
        return response;
    }

    protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(Set<PartitionMeta> partitionAssignment) {
        DefaultPartition p = new DefaultPartition(KryoCloneUtils.cloneObject((Object)this.prototypeOperator));
        ((AbstractKafkaInputOperator)p.getPartitionedInstance()).assign(partitionAssignment);
        return p;
    }

    private void initMetadataClients() {
        if (this.metadataRefreshClients != null && this.metadataRefreshClients.size() == this.clusters.length) {
            return;
        }
        if (this.clusters == null || this.clusters.length == 0) {
            throw new IllegalStateException("clusters can not be null");
        }
        this.metadataRefreshClients = new ArrayList(this.clusters.length);
        int index = 0;
        for (String c : this.clusters) {
            Properties prop = this.prototypeOperator.getConsumerProps();
            prop.put("group.id", META_CONSUMER_GROUP_NAME);
            prop.put("bootstrap.servers", c);
            prop.put("key.deserializer", ByteArrayDeserializer.class.getName());
            prop.put("value.deserializer", ByteArrayDeserializer.class.getName());
            prop.put("enable.auto.commit", "false");
            if (logger.isInfoEnabled()) {
                logger.info("Consumer Properties :  {} ", (Object)this.getPropertyAsString(prop));
            }
            this.metadataRefreshClients.add(index++, (KafkaConsumer<byte[], byte[]>)new KafkaConsumer(prop));
        }
    }

    private String getPropertyAsString(Properties prop) {
        StringWriter writer = new StringWriter();
        try {
            prop.store(writer, "");
        }
        catch (IOException e) {
            logger.error("Cannot retrieve consumer properties for Logging : {}", (Object)e.getMessage());
        }
        return writer.getBuffer().toString();
    }

    public static class PartitionMeta {
        private String cluster;
        private transient TopicPartition topicPartition;
        private String topic;
        private int partitionId;

        public PartitionMeta() {
        }

        public PartitionMeta(String cluster, String topic, int partitionId) {
            this.cluster = cluster;
            this.topic = topic;
            this.partitionId = partitionId;
            this.topicPartition = new TopicPartition(topic, partitionId);
        }

        public String getCluster() {
            return this.cluster;
        }

        public int getPartitionId() {
            return this.partitionId;
        }

        public String getTopic() {
            return this.topic;
        }

        public TopicPartition getTopicPartition() {
            if (this.topicPartition == null) {
                this.topicPartition = new TopicPartition(this.topic, this.partitionId);
            }
            return this.topicPartition;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            PartitionMeta that = (PartitionMeta)o;
            return Objects.equals(this.partitionId, that.partitionId) && Objects.equals(this.cluster, that.cluster) && Objects.equals(this.topic, that.topic);
        }

        public int hashCode() {
            return Objects.hash(this.cluster, this.topic, this.partitionId);
        }

        public String toString() {
            return "PartitionMeta{cluster='" + this.cluster + '\'' + ", topicPartition=" + this.getTopicPartition() + '}';
        }
    }
}

