/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.kafka;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.AbstractKafkaPartitioner;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.common.PartitionInfo;

/**
 * Partitioner that deals the Kafka partitions found in the supplied metadata
 * out to a bounded number of assignment sets in round-robin order.
 *
 * <p>The upper bound on the number of sets is the prototype operator's
 * {@code getInitialPartitionCount()}. Sets are created lazily, so when the
 * metadata holds fewer Kafka partitions than that bound, the result contains
 * only as many sets as there are Kafka partitions.
 */
@InterfaceStability.Evolving
public class OneToManyPartitioner
extends AbstractKafkaPartitioner {
    /**
     * Creates the partitioner by forwarding all arguments unchanged to
     * {@link AbstractKafkaPartitioner}.
     *
     * @param clusters forwarded to the superclass (presumably the Kafka clusters to read from; confirm against the superclass)
     * @param topics forwarded to the superclass (presumably the topics to read from; confirm against the superclass)
     * @param protoTypeOperator forwarded to the superclass (presumably the operator exposed as {@code prototypeOperator}; confirm against the superclass)
     */
    public OneToManyPartitioner(String[] clusters, String[] topics, AbstractKafkaInputOperator protoTypeOperator) {
        super(clusters, topics, protoTypeOperator);
    }

    /**
     * Distributes every {@link PartitionInfo} in {@code metadata} across at most
     * {@code getInitialPartitionCount()} sets, cycling through the sets in the
     * iteration order of the supplied maps and lists.
     *
     * @param metadata two-level map whose innermost lists hold the Kafka partitions to assign;
     *                 the outer and inner keys are passed to {@code PartitionMeta} as its first
     *                 and second constructor arguments (presumably cluster and topic)
     * @return one set of partition descriptors per assignment slot actually used; empty when
     *         the metadata contains no partitions
     * @throws IllegalArgumentException if the configured initial partition count is not positive
     */
    @Override
    List<Set<AbstractKafkaPartitioner.PartitionMeta>> assign(Map<String, Map<String, List<PartitionInfo>>> metadata) {
        // Read the configured count once so validation, modulus and capacity all agree.
        int partitionCount = this.prototypeOperator.getInitialPartitionCount();
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("Num of partitions should be greater or equal to 1");
        }
        List<Set<AbstractKafkaPartitioner.PartitionMeta>> eachPartitionAssignment = new ArrayList<Set<AbstractKafkaPartitioner.PartitionMeta>>(partitionCount);
        // Running count of Kafka partitions seen so far; drives the round-robin slot choice.
        int i = 0;
        for (Map.Entry<String, Map<String, List<PartitionInfo>>> clusterMap : metadata.entrySet()) {
            String cluster = clusterMap.getKey();
            for (Map.Entry<String, List<PartitionInfo>> topicPartition : clusterMap.getValue().entrySet()) {
                String topic = topicPartition.getKey();
                for (PartitionInfo pif : topicPartition.getValue()) {
                    int index = i++ % partitionCount;
                    // Slots are visited in order 0, 1, 2, ... on the first pass, so a slot that
                    // does not exist yet is always the next one to append.
                    if (index >= eachPartitionAssignment.size()) {
                        eachPartitionAssignment.add(new HashSet<AbstractKafkaPartitioner.PartitionMeta>());
                    }
                    eachPartitionAssignment.get(index).add(new AbstractKafkaPartitioner.PartitionMeta(cluster, topic, pif.partition()));
                }
            }
        }
        return eachPartitionAssignment;
    }
}

