/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.kafka;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaMetadataUtil;
import com.datatorrent.contrib.kafka.KafkaPartition;
import com.datatorrent.contrib.kafka.OffsetManager;
import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for input operators that pull messages from Kafka through a {@link KafkaConsumer}
 * and hand each one to {@link #emitTuple(Message)}.
 *
 * <p>Besides emitting tuples, the class records per-window recovery state in a
 * {@link WindowDataManager}, re-emits previously completed windows through {@code replay}, and
 * acts as its own {@link Partitioner} and {@link StatsListener}.
 *
 * @param <K> consumer type accepted by {@link #setConsumer}
 */
@OperatorAnnotation(partitionable=true)
public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer>
implements InputOperator,
Operator.ActivationListener<Context.OperatorContext>,
Operator.CheckpointNotificationListener,
Partitioner<AbstractKafkaInputOperator<K>>,
StatsListener {
    private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
    // Cap on the number of tuples emitTuples() will emit within one window.
    @Min(value=1L)
    private int maxTuplesPerWindow = Integer.MAX_VALUE;
    // Cap on the summed message size emitted within one window; emitTuples() holds back a message
    // that would exceed it unless nothing has been emitted in the window yet.
    @Min(value=1L)
    private long maxTotalMsgSizePerWindow = Long.MAX_VALUE;
    // Running totals for the current window; both are reset in beginWindow().
    private transient int emitCount = 0;
    private transient long emitTotalMsgSize = 0L;
    // Stores and retrieves per-window recovery state; the constructor installs a no-op manager.
    protected WindowDataManager windowDataManager;
    // Id of the window passed to the most recent beginWindow() call.
    protected transient long currentWindowId;
    // Taken from context.getId() in setup().
    protected transient int operatorId;
    // Per Kafka partition: (offset of the first message emitted in the current window, number of
    // messages emitted in the current window). Saved and then cleared in endWindow().
    protected final transient Map<KafkaPartition, MutablePair<Long, Integer>> currentWindowRecoveryState;
    // Offset of the last message emitted per Kafka partition. Not transient; setup() resumes a
    // SimpleKafkaConsumer from these values plus one.
    protected Map<KafkaPartition, Long> offsetStats = new HashMap<KafkaPartition, Long>();
    // (window id, copy of offsetStats) appended in endWindow() and pruned in committed().
    protected transient List<Pair<Long, Map<KafkaPartition, Long>>> offsetTrackHistory = new LinkedList<Pair<Long, Map<KafkaPartition, Long>>>();
    // Operator context captured in setup(); used by activate() and committed().
    private transient Context.OperatorContext context = null;
    // Strategy that definePartitions() switches on.
    public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
    // Deprecated rate bounds; within this file they are only touched by their accessors.
    @Deprecated
    private long msgRateUpperBound = Long.MAX_VALUE;
    @Deprecated
    private long byteRateUpperBound = Long.MAX_VALUE;
    // One entry is appended for every createPartition() call; read by isPartitionRequired().
    private transient List<PartitionInfo> currentPartitionInfo = Lists.newLinkedList();
    // Latest Kafka meter stats per operator id, filled in by isPartitionRequired().
    private transient Map<Integer, List<KafkaConsumer.KafkaMeterStats>> kafkaStatsHolder = new HashMap<Integer, List<KafkaConsumer.KafkaMeterStats>>();
    // Optional; when set it supplies initial offsets and receives offset updates.
    private OffsetManager offsetManager = null;
    // Minimum time between repartitions; a negative value makes isPartitionRequired() always
    // return false. Compared against System.currentTimeMillis() differences.
    private long repartitionInterval = 30000L;
    // Minimum time between Kafka metadata checks in isPartitionRequired().
    private long repartitionCheckInterval = 5000L;
    private transient long lastCheckTime = 0L;
    private transient long lastRepartitionTime = 0L;
    // Kafka partitions found by isPartitionRequired() that no operator partition covers yet;
    // consumed and cleared by definePartitions().
    private transient List<KafkaPartition> newWaitingPartition = new LinkedList<KafkaPartition>();
    // Message polled by emitTuples() but held back by the size cap; emitted first on the next call.
    private transient KafkaConsumer.KafkaMessage pendingMessage;
    // Number of buckets Kafka partitions are distributed over under ONE_TO_MANY.
    @Min(value=1L)
    private int initialPartitionCount = 1;
    // Consumer that messages are polled from; defaults to the simple consumer.
    @NotNull
    @Valid
    protected KafkaConsumer consumer = new SimpleKafkaConsumer();

    /**
     * Creates the operator with an empty per-window recovery map and a no-op
     * {@link WindowDataManager}.
     */
    public AbstractKafkaInputOperator() {
        currentWindowRecoveryState = new HashMap<KafkaPartition, MutablePair<Long, Integer>>();
        windowDataManager = new WindowDataManager.NoopWindowDataManager();
    }

    /**
     * Emits one Kafka message downstream. Called for every message the operator consumes or
     * replays.
     *
     * @param var1 the raw Kafka message to convert and emit
     */
    protected abstract void emitTuple(Message var1);

    /**
     * Emits a consumed message by forwarding its payload to {@link #emitTuple(Message)}.
     * Subclasses may override this to access the other fields of the wrapper.
     *
     * @param message wrapper whose {@code msg} field is forwarded
     */
    protected void emitTuple(KafkaConsumer.KafkaMessage message) {
        this.emitTuple(message.msg);
    }

    /**
     * @return the maximum number of tuples emitted per window
     */
    public int getMaxTuplesPerWindow() {
        return this.maxTuplesPerWindow;
    }

    /**
     * Sets the maximum number of tuples emitted per window.
     *
     * @param maxTuplesPerWindow the new cap; the field is annotated {@code @Min(1)}
     */
    public void setMaxTuplesPerWindow(int maxTuplesPerWindow) {
        this.maxTuplesPerWindow = maxTuplesPerWindow;
    }

    /**
     * @return the maximum summed message size emitted per window
     */
    public long getMaxTotalMsgSizePerWindow() {
        return this.maxTotalMsgSizePerWindow;
    }

    /**
     * Sets the maximum summed message size emitted per window.
     *
     * @param maxTotalMsgSizePerWindow the new cap; the field is annotated {@code @Min(1)}
     */
    public void setMaxTotalMsgSizePerWindow(long maxTotalMsgSizePerWindow) {
        this.maxTotalMsgSizePerWindow = maxTotalMsgSizePerWindow;
    }

    /**
     * Creates the consumer, restores its read position from {@code offsetStats} and sets up the
     * window data manager.
     *
     * @param context operator context; it is stored and its id becomes {@code operatorId}
     * @throws RuntimeException if a high-level consumer is combined with a window data manager
     *         other than the no-op one
     */
    public void setup(Context.OperatorContext context) {
        logger.debug("consumer {} topic {} cacheSize {}", new Object[]{consumer, consumer.getTopic(), consumer.getCacheSize()});
        consumer.create();

        // offsetStats holds the last emitted offset per partition, so reading resumes one past it.
        boolean hasSavedOffsets = !offsetStats.isEmpty();
        if (consumer instanceof SimpleKafkaConsumer && hasSavedOffsets) {
            Map<KafkaPartition, Long> resumeOffsets = new HashMap<KafkaPartition, Long>();
            for (Map.Entry<KafkaPartition, Long> saved : offsetStats.entrySet()) {
                resumeOffsets.put(saved.getKey(), saved.getValue() + 1L);
            }
            SimpleKafkaConsumer simpleConsumer = (SimpleKafkaConsumer)consumer;
            simpleConsumer.resetOffset(resumeOffsets);
        }

        this.context = context;
        operatorId = context.getId();

        boolean idempotencyRequested = !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
        if (consumer instanceof HighlevelKafkaConsumer && idempotencyRequested) {
            throw new RuntimeException("Idempotency is not supported for High Level Kafka Consumer");
        }
        windowDataManager.setup(context);
    }

    /**
     * Tears down the window data manager first, then the consumer.
     */
    public void teardown() {
        this.windowDataManager.teardown();
        this.consumer.teardown();
    }

    /**
     * Starts a window: records its id, replays it if the window data manager reports it as
     * already completed, and resets the per-window emit counters.
     *
     * @param windowId id of the window that is starting
     */
    public void beginWindow(long windowId) {
        currentWindowId = windowId;

        boolean alreadyCompleted = windowId <= windowDataManager.getLargestCompletedWindow();
        if (alreadyCompleted) {
            replay(windowId);
        }

        emitCount = 0;
        emitTotalMsgSize = 0L;
    }

    /**
     * Re-emits the messages that were emitted in a previously completed window.
     *
     * <p>The recovery state retrieved for {@code windowId} maps each Kafka partition to the offset
     * of the first message emitted in that window and the number of messages emitted. For each
     * partition the leader broker is looked up in the current topic metadata, messages are fetched
     * from the recorded offset, and up to the recorded count is emitted again. A partition whose
     * metadata cannot be found is skipped with a warning.
     *
     * <p>When {@code windowId} is the largest completed window, the consumer is repositioned one
     * past the last replayed offset of every partition and started, so regular consumption
     * continues from there.
     *
     * @param windowId id of the window to replay
     * @throws RuntimeException wrapping the {@link IOException} raised while retrieving the
     *         recovery state
     */
    protected void replay(long windowId) {
        try {
            @SuppressWarnings("unchecked")
            Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData =
                (Map<KafkaPartition, MutablePair<Long, Integer>>)this.windowDataManager.retrieve(windowId);
            Map<String, List<PartitionMetadata>> pms = null;
            if (recoveredData != null) {
                pms = KafkaMetadataUtil.getPartitionsForTopic(this.getConsumer().brokers, this.getConsumer().topic);
            }
            if (recoveredData != null && pms != null) {
                SimpleKafkaConsumer cons = (SimpleKafkaConsumer)this.getConsumer();
                FetchRequestBuilder frb = new FetchRequestBuilder().clientId(cons.getClientId());
                for (Map.Entry<KafkaPartition, MutablePair<Long, Integer>> rc : recoveredData.entrySet()) {
                    KafkaPartition kp = rc.getKey();

                    // Find this partition's metadata. A cluster with a missing or empty metadata
                    // list is treated like a partition that is absent from the list.
                    PartitionMetadata pm = null;
                    List<PartitionMetadata> pmsVal = pms.get(kp.getClusterId());
                    if (pmsVal != null) {
                        for (PartitionMetadata candidate : pmsVal) {
                            if (candidate.partitionId() == kp.getPartitionId()) {
                                pm = candidate;
                                break;
                            }
                        }
                    }
                    if (pm == null) {
                        logger.warn("No metadata for kafka partition {} while replaying window {}; skipping it", kp, windowId);
                        continue;
                    }

                    Broker bk = pm.leader();
                    frb.addFetch(this.consumer.topic, kp.getPartitionId(), rc.getValue().left.longValue(), cons.getBufferSize());
                    FetchRequest req = frb.build();
                    SimpleConsumer ksc = new SimpleConsumer(bk.host(), bk.port(), cons.getTimeout(), cons.getBufferSize(), cons.getClientId());
                    try {
                        FetchResponse fetchResponse = ksc.fetch(req);
                        Integer expectedCount = rc.getValue().right;
                        int count = 0;
                        for (MessageAndOffset msg : fetchResponse.messageSet(this.consumer.topic, kp.getPartitionId())) {
                            KafkaConsumer.KafkaMessage kafkaMessage = new KafkaConsumer.KafkaMessage(kp, msg.message(), msg.offset());
                            this.emitTuple(kafkaMessage);
                            this.offsetStats.put(kp, msg.offset());
                            // Stop once as many messages as the original window emitted are out.
                            if (Integer.valueOf(++count).equals(expectedCount)) {
                                break;
                            }
                        }
                    }
                    finally {
                        // One connection is opened per partition; release it so replay does not
                        // leak sockets.
                        ksc.close();
                    }
                }
            }
            if (windowId == this.windowDataManager.getLargestCompletedWindow()) {
                // Last window to replay: resume live consumption just past what was replayed.
                SimpleKafkaConsumer cons = (SimpleKafkaConsumer)this.getConsumer();
                Map<KafkaPartition, Long> currentOffsets = new HashMap<KafkaPartition, Long>(cons.getCurrentOffsets());
                for (Map.Entry<KafkaPartition, Long> e : this.offsetStats.entrySet()) {
                    currentOffsets.put(e.getKey(), e.getValue() + 1L);
                }
                cons.resetOffset(currentOffsets);
                cons.start();
            }
        }
        catch (IOException e) {
            throw new RuntimeException("replay", e);
        }
    }

    /**
     * Ends the current window. For a simple consumer a copy of {@code offsetStats} is appended to
     * the offset history under the current window id. If the window is newer than the largest
     * completed window, its recovery state is saved. The recovery state is always cleared
     * afterwards.
     *
     * @throws RuntimeException wrapping the {@link IOException} raised while saving
     */
    public void endWindow() {
        if (getConsumer() instanceof SimpleKafkaConsumer) {
            Map<KafkaPartition, Long> snapshot = new HashMap<KafkaPartition, Long>(offsetStats);
            offsetTrackHistory.add(Pair.<Long, Map<KafkaPartition, Long>>of(currentWindowId, snapshot));
        }

        boolean isNewWindow = currentWindowId > windowDataManager.getLargestCompletedWindow();
        if (isNewWindow) {
            try {
                windowDataManager.save(currentWindowRecoveryState, currentWindowId);
            }
            catch (IOException e) {
                throw new RuntimeException("saving recovery", e);
            }
        }

        currentWindowRecoveryState.clear();
    }

    /**
     * Asks the consumer to commit its offsets once a checkpoint has been taken.
     *
     * @param windowId id of the checkpointed window; not used here
     */
    public void checkpointed(long windowId) {
        this.getConsumer().commitOffset();
    }

    /**
     * Intentionally does nothing.
     *
     * @param windowId id of the window about to be checkpointed; not used here
     */
    public void beforeCheckpoint(long windowId) {
    }

    /**
     * Handles a committed window. For a simple consumer, history entries older than
     * {@code windowId} are dropped, and if the next entry belongs to {@code windowId} its offsets
     * are published as operator counters. The window data manager is then notified.
     *
     * @param windowId id of the committed window
     * @throws RuntimeException wrapping the {@link IOException} raised by the window data manager
     */
    public void committed(long windowId) {
        if (getConsumer() instanceof SimpleKafkaConsumer) {
            SimpleKafkaConsumer simpleConsumer = (SimpleKafkaConsumer)getConsumer();
            Iterator<Pair<Long, Map<KafkaPartition, Long>>> history = offsetTrackHistory.iterator();
            while (history.hasNext()) {
                Pair<Long, Map<KafkaPartition, Long>> snapshot = history.next();
                long snapshotWindow = snapshot.getLeft();
                if (snapshotWindow < windowId) {
                    // Older than the committed window: no longer needed.
                    history.remove();
                    continue;
                }
                if (snapshotWindow == windowId) {
                    Map<KafkaPartition, Long> offsets = snapshot.getRight();
                    if (logger.isDebugEnabled()) {
                        logger.debug("report offsets {} ", Joiner.on(';').withKeyValueSeparator("=").join(offsets));
                    }
                    context.setCounters(simpleConsumer.getConsumerStats(offsets));
                }
                // Either the matching entry was reported or a newer one was reached.
                break;
            }
        }

        try {
            windowDataManager.committed(windowId);
        }
        catch (IOException e) {
            throw new RuntimeException("deleting state", e);
        }
    }

    /**
     * Starts the consumer, unless the activation window id is set (not {@code -1}) and lies
     * before the largest completed window. In that case the consumer is started later by
     * {@code replay}.
     *
     * @param ctx activation context; the context stored by {@code setup} is read instead
     */
    public void activate(Context.OperatorContext ctx) {
        long activationWindowId = (Long)context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
        boolean replayPending = activationWindowId != -1L
            && activationWindowId < windowDataManager.getLargestCompletedWindow();
        if (!replayPending) {
            consumer.start();
        }
    }

    /**
     * Stops the consumer.
     */
    public void deactivate() {
        this.consumer.stop();
    }

    /**
     * Emits the messages currently buffered by the consumer, subject to the per-window tuple and
     * size caps. Nothing is emitted while the current window is one that has already been
     * completed.
     *
     * <p>A message that would push the window past {@code maxTotalMsgSizePerWindow} is parked in
     * {@code pendingMessage} and emitted first on a later call. The first message of a window is
     * always emitted regardless of its size. For every emitted message the last offset and the
     * per-window recovery state of its partition are updated.
     */
    public void emitTuples() {
        if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
            return;
        }

        int available = consumer.messageSize();
        if (pendingMessage != null) {
            available++;
        }
        int budget = available;
        if (maxTuplesPerWindow > 0) {
            budget = Math.min(available, maxTuplesPerWindow - emitCount);
        }

        for (int remaining = budget; remaining > 0; remaining--) {
            KafkaConsumer.KafkaMessage next;
            if (pendingMessage == null) {
                next = consumer.pollMessage();
            } else {
                next = pendingMessage;
                pendingMessage = null;
            }

            long size = next.msg.size();
            boolean exceedsSizeCap = maxTotalMsgSizePerWindow - emitTotalMsgSize < size;
            if (emitCount > 0 && exceedsSizeCap) {
                // Park the message; it goes out first once there is room again.
                pendingMessage = next;
                break;
            }

            emitTuple(next);
            emitCount++;
            emitTotalMsgSize += size;
            offsetStats.put(next.kafkaPart, next.offSet);

            MutablePair<Long, Integer> recovery = currentWindowRecoveryState.get(next.kafkaPart);
            if (recovery != null) {
                recovery.setRight(recovery.right + 1);
            } else {
                // First message of this partition in the window: remember its offset.
                currentWindowRecoveryState.put(next.kafkaPart, new MutablePair<Long, Integer>(next.offSet, 1));
            }
        }
    }

    /**
     * Replaces the consumer used to read from Kafka.
     *
     * @param consumer the consumer to use
     */
    public void setConsumer(K consumer) {
        this.consumer = consumer;
    }

    /**
     * @return the consumer used to read from Kafka
     */
    public KafkaConsumer getConsumer() {
        return this.consumer;
    }

    /**
     * Sets the topic on the underlying consumer.
     *
     * @param topic topic name passed to the consumer
     * @deprecated this only delegates to the consumer's own {@code setTopic}
     */
    @Deprecated
    public void setTopic(String topic) {
        this.consumer.setTopic(topic);
    }

    /**
     * Sets the ZooKeeper connection string on the underlying consumer.
     *
     * @param zookeeperString value passed to the consumer
     * @deprecated this only delegates to the consumer's own {@code setZookeeper}
     */
    @Deprecated
    public void setZookeeper(String zookeeperString) {
        this.consumer.setZookeeper(zookeeperString);
    }

    /**
     * Records the current time as the last repartition time.
     *
     * @param partitions the resulting partitions; not used here
     */
    public void partitioned(Map<Integer, Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions) {
        this.lastRepartitionTime = System.currentTimeMillis();
    }

    /**
     * Computes the operator partitions for the configured {@code strategy}.
     *
     * <ul>
     * <li>{@code ONE_TO_ONE}: on the initial call one operator partition is created per Kafka
     * partition; on later calls one operator partition is added per entry of
     * {@code newWaitingPartition}.</li>
     * <li>{@code ONE_TO_MANY}: on the initial call, or when new Kafka partitions are waiting, all
     * Kafka partitions are distributed round-robin over at most {@code initialPartitionCount}
     * new operator partitions and the ids of the existing ones are collected as deleted.</li>
     * <li>{@code ONE_TO_MANY_HEURISTIC}: not implemented.</li>
     * </ul>
     *
     * When the set of partitions changed, the window data manager is partitioned as well and each
     * resulting operator receives one of the new managers.
     *
     * @param partitions the current partitions
     * @param context partitioning context; not used here
     * @return {@code partitions} itself when nothing changed or entries were only added to it,
     *         otherwise a newly built collection
     * @throws UnsupportedOperationException for {@code ONE_TO_MANY} with a high-level consumer and
     *         for {@code ONE_TO_MANY_HEURISTIC}
     */
    public Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> definePartitions(Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions, Partitioner.PartitioningContext context) {
        this.getConsumer().initBrokers();
        // Treated as the initial partitioning when there are no partitions yet or the first one
        // carries no stats.
        boolean isInitialParitition = true;
        if (partitions.iterator().hasNext()) {
            isInitialParitition = partitions.iterator().next().getStats() == null;
        }
        AbstractList newPartitions = null;
        Map<KafkaPartition, Long> initOffset = null;
        // Initial offsets are only loaded on the initial partitioning and only with an offset manager.
        if (isInitialParitition && this.offsetManager != null) {
            initOffset = this.offsetManager.loadInitialOffsets();
            logger.info("Initial offsets: {} ", (Object)("{ " + Joiner.on((String)", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }"));
        }
        // Ids of operators being replaced; handed to the window data manager below.
        HashSet deletedOperators = Sets.newHashSet();
        Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> resultPartitions = partitions;
        boolean numPartitionsChanged = false;
        switch (this.strategy) {
            case ONE_TO_ONE: {
                Map<String, List<PartitionMetadata>> kafkaPartitions;
                if (isInitialParitition) {
                    this.lastRepartitionTime = System.currentTimeMillis();
                    logger.info("[ONE_TO_ONE]: Initializing partition(s)");
                    kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(this.getConsumer().brokers, this.getConsumer().getTopic());
                    newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>();
                    // One operator partition per (cluster, Kafka partition).
                    for (Map.Entry<String, List<PartitionMetadata>> kp : kafkaPartitions.entrySet()) {
                        String clusterId = kp.getKey();
                        for (PartitionMetadata partitionMetadata : kp.getValue()) {
                            logger.info("[ONE_TO_ONE]: Create operator partition for cluster {}, topic {}, kafka partition {} ", new Object[]{clusterId, this.getConsumer().topic, partitionMetadata.partitionId()});
                            newPartitions.add(this.createPartition(Sets.newHashSet((Object[])new KafkaPartition[]{new KafkaPartition(clusterId, this.consumer.topic, partitionMetadata.partitionId())}), initOffset));
                        }
                    }
                    resultPartitions = newPartitions;
                    numPartitionsChanged = true;
                    break;
                }
                // Not initial: nothing to do unless new Kafka partitions were discovered.
                if (this.newWaitingPartition.size() == 0) break;
                // Existing operator partitions are kept; one is added per waiting Kafka partition,
                // without initial offsets.
                for (KafkaPartition newPartition : this.newWaitingPartition) {
                    logger.info("[ONE_TO_ONE]: Add operator partition for cluster {}, topic {}, partition {}", new Object[]{newPartition.getClusterId(), this.getConsumer().topic, newPartition.getPartitionId()});
                    partitions.add(this.createPartition(Sets.newHashSet((Object[])new KafkaPartition[]{newPartition}), null));
                }
                this.newWaitingPartition.clear();
                resultPartitions = partitions;
                numPartitionsChanged = true;
                break;
            }
            case ONE_TO_MANY: {
                if (this.getConsumer() instanceof HighlevelKafkaConsumer) {
                    throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy.");
                }
                if (!isInitialParitition && this.newWaitingPartition.size() == 0) break;
                this.lastRepartitionTime = System.currentTimeMillis();
                logger.info("[ONE_TO_MANY]: Initializing partition(s)");
                Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(this.getConsumer().brokers, this.getConsumer().getTopic());
                int size = this.initialPartitionCount;
                Set[] kps = (Set[])Array.newInstance(new HashSet().getClass(), size);
                int i = 0;
                // Distribute every Kafka partition round-robin over `size` buckets.
                for (Map.Entry<String, List<PartitionMetadata>> entry : kafkaPartitions.entrySet()) {
                    String clusterId = entry.getKey();
                    for (PartitionMetadata pm : entry.getValue()) {
                        if (kps[i % size] == null) {
                            kps[i % size] = new HashSet();
                        }
                        kps[i % size].add(new KafkaPartition(clusterId, this.consumer.topic, pm.partitionId()));
                        ++i;
                    }
                }
                // With fewer Kafka partitions than buckets, only the filled buckets are used.
                size = i > size ? size : i;
                newPartitions = new ArrayList(size);
                for (i = 0; i < size; ++i) {
                    logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): {} ", (Object)StringUtils.join((Iterable)kps[i], (String)", "));
                    newPartitions.add(this.createPartition(kps[i], initOffset));
                }
                // All existing operator partitions are replaced by the newly created ones.
                for (Partitioner.Partition partition : partitions) {
                    deletedOperators.add(((AbstractKafkaInputOperator)partition.getPartitionedInstance()).operatorId);
                }
                this.newWaitingPartition.clear();
                resultPartitions = newPartitions;
                numPartitionsChanged = true;
                break;
            }
            case ONE_TO_MANY_HEURISTIC: {
                throw new UnsupportedOperationException("[ONE_TO_MANY_HEURISTIC]: Not implemented yet");
            }
        }
        if (numPartitionsChanged) {
            // Split the window data manager and assign the resulting managers to the operators in
            // iteration order.
            List managers = this.windowDataManager.partition(resultPartitions.size(), (Set)deletedOperators);
            int i = 0;
            for (Partitioner.Partition<AbstractKafkaInputOperator<K>> partition : resultPartitions) {
                ((AbstractKafkaInputOperator)partition.getPartitionedInstance()).setWindowDataManager((WindowDataManager)managers.get(i++));
            }
        }
        return resultPartitions;
    }

    /**
     * Creates an operator partition for the given Kafka partitions.
     *
     * @param pIds Kafka partitions the new operator partition consumes
     * @param initOffsets initial offsets, may be {@code null}
     * @param newManagers ignored
     * @return the new partition
     * @deprecated {@code newManagers} is ignored; use
     *             {@link #createPartition(Set, Map)} instead
     */
    @Deprecated
    protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<WindowDataManager> newManagers) {
        return this.createPartition(pIds, initOffsets);
    }

    /**
     * Creates an operator partition by cloning this operator. If the clone uses a simple consumer,
     * the clone is pointed at the given Kafka partitions and offsets. When initial offsets were
     * supplied, the consumer's resulting current offsets are copied into the clone's
     * {@code offsetStats}. The Kafka partitions are also recorded in {@code currentPartitionInfo}.
     *
     * @param pIds Kafka partitions the new operator partition consumes
     * @param initOffsets initial offsets, may be {@code null}
     * @return the new partition wrapping the clone
     */
    protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets) {
        DefaultPartition<AbstractKafkaInputOperator<K>> partition =
            new DefaultPartition<AbstractKafkaInputOperator<K>>(KryoCloneUtils.cloneObject(this));
        AbstractKafkaInputOperator<K> instance = partition.getPartitionedInstance();

        KafkaConsumer instanceConsumer = instance.getConsumer();
        if (instanceConsumer instanceof SimpleKafkaConsumer) {
            instanceConsumer.resetPartitionsAndOffset(pIds, initOffsets);
            if (initOffsets != null) {
                // Only the offsets of this clone's own partitions are taken over.
                instance.offsetStats.putAll(instance.getConsumer().getCurrentOffsets());
            }
        }

        PartitionInfo info = new PartitionInfo();
        info.kpids = pIds;
        currentPartitionInfo.add(info);
        return partition;
    }

    /**
     * Processes a batch of operator stats and reports whether a repartition is required.
     *
     * @param stats batched stats of one operator partition
     * @return a response whose {@code repartitionRequired} flag is the result of
     *         {@code isPartitionRequired}
     */
    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats) {
        StatsListener.Response response = new StatsListener.Response();
        List<KafkaConsumer.KafkaMeterStats> meterStats = extractKafkaStats(stats);
        int operatorIdOfStats = stats.getOperatorId();
        response.repartitionRequired = isPartitionRequired(operatorIdOfStats, meterStats);
        return response;
    }

    /**
     * Passes the offsets contained in the given stats to the offset manager. Does nothing when
     * there is no offset manager or the stats carry no offsets.
     *
     * @param kstats Kafka meter stats to extract the offsets from
     */
    private void updateOffsets(List<KafkaConsumer.KafkaMeterStats> kstats) {
        if (offsetManager == null) {
            return;
        }
        Map<KafkaPartition, Long> latestOffsets = KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions(kstats);
        if (latestOffsets.size() > 0) {
            logger.debug("Passing offset updates to offset manager");
            offsetManager.updateOffsets(latestOffsets);
        }
    }

    /**
     * Collects the Kafka meter stats found in the counters of the last windowed stats.
     *
     * @param stats batched stats of one operator partition
     * @return the counters that are {@code KafkaMeterStats}, in iteration order; entries that are
     *         {@code null} or carry other counters are skipped
     */
    private List<KafkaConsumer.KafkaMeterStats> extractKafkaStats(StatsListener.BatchedOperatorStats stats) {
        LinkedList<KafkaConsumer.KafkaMeterStats> collected = new LinkedList<KafkaConsumer.KafkaMeterStats>();
        for (Stats.OperatorStats windowStats : stats.getLastWindowedStats()) {
            if (windowStats != null && windowStats.counters instanceof KafkaConsumer.KafkaMeterStats) {
                KafkaConsumer.KafkaMeterStats meterStats = (KafkaConsumer.KafkaMeterStats)windowStats.counters;
                collected.add(meterStats);
            }
        }
        return collected;
    }

    /**
     * Decides whether the operator has to be repartitioned because the topic has Kafka partitions
     * that no operator partition consumes yet.
     *
     * <p>Non-empty stats are always forwarded to the offset manager first. The metadata check
     * itself is skipped (returning {@code false}) when the last check or the last repartition is
     * more recent than the configured interval, when {@code repartitionInterval} is negative, or
     * until stats have been seen from as many operators as there are known partitions.
     *
     * <p>Uncovered Kafka partitions are queued in {@code newWaitingPartition}. A partition that is
     * already queued is not queued again, so repeated checks before the repartition takes place
     * cannot produce duplicate entries.
     *
     * @param opid id of the operator the stats belong to
     * @param kstats Kafka meter stats reported by that operator
     * @return {@code true} if at least one Kafka partition is waiting to be assigned
     */
    private boolean isPartitionRequired(int opid, List<KafkaConsumer.KafkaMeterStats> kstats) {
        long t = System.currentTimeMillis();
        if (kstats.size() > 0) {
            logger.debug("Checking offset updates for offset manager");
            this.updateOffsets(kstats);
        }
        if (t - this.lastCheckTime < this.repartitionCheckInterval) {
            return false;
        }
        if (this.repartitionInterval < 0L) {
            return false;
        }
        if (t - this.lastRepartitionTime < this.repartitionInterval) {
            return false;
        }
        this.kafkaStatsHolder.put(opid, kstats);
        if (this.kafkaStatsHolder.size() != this.currentPartitionInfo.size() || this.currentPartitionInfo.size() == 0) {
            return false;
        }
        try {
            // Everything that is already consumed or already queued counts as known.
            Set<KafkaPartition> knownIds = new HashSet<KafkaPartition>();
            for (PartitionInfo pio : this.currentPartitionInfo) {
                knownIds.addAll(pio.kpids);
            }
            knownIds.addAll(this.newWaitingPartition);

            Map<String, List<PartitionMetadata>> partitionsMeta = KafkaMetadataUtil.getPartitionsForTopic(this.consumer.brokers, this.consumer.getTopic());
            if (partitionsMeta == null) {
                return false;
            }
            for (Map.Entry<String, List<PartitionMetadata>> en : partitionsMeta.entrySet()) {
                if (en.getValue() == null) {
                    continue;
                }
                for (PartitionMetadata pm : en.getValue()) {
                    KafkaPartition pa = new KafkaPartition(en.getKey(), this.consumer.topic, pm.partitionId());
                    // add() returns false for a known partition, which keeps the queue free of
                    // duplicates.
                    if (knownIds.add(pa)) {
                        this.newWaitingPartition.add(pa);
                    }
                }
            }
            if (this.newWaitingPartition.isEmpty()) {
                return false;
            }
            this.lastRepartitionTime = t;
            return true;
        }
        finally {
            // Stamp the check time however the metadata check ended.
            this.lastCheckTime = System.currentTimeMillis();
        }
    }

    /**
     * @return the window data manager used to store per-window recovery state
     */
    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    /**
     * Sets the window data manager used to store per-window recovery state.
     *
     * @param windowDataManager the manager to use
     */
    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    /**
     * Sets the number of operator partitions used by the {@code ONE_TO_MANY} strategy.
     *
     * @param partitionCount the new count; the field is annotated {@code @Min(1)}
     */
    public void setInitialPartitionCount(int partitionCount) {
        this.initialPartitionCount = partitionCount;
    }

    /**
     * @return the number of operator partitions used by the {@code ONE_TO_MANY} strategy
     */
    public int getInitialPartitionCount() {
        return this.initialPartitionCount;
    }

    /**
     * @return the message rate upper bound; the backing field is deprecated
     */
    public long getMsgRateUpperBound() {
        return this.msgRateUpperBound;
    }

    /**
     * Sets the message rate upper bound; the backing field is deprecated.
     *
     * @param msgRateUpperBound the new bound
     */
    public void setMsgRateUpperBound(long msgRateUpperBound) {
        this.msgRateUpperBound = msgRateUpperBound;
    }

    /**
     * @return the byte rate upper bound; the backing field is deprecated
     */
    public long getByteRateUpperBound() {
        return this.byteRateUpperBound;
    }

    /**
     * Sets the byte rate upper bound; the backing field is deprecated.
     *
     * @param byteRateUpperBound the new bound
     */
    public void setByteRateUpperBound(long byteRateUpperBound) {
        this.byteRateUpperBound = byteRateUpperBound;
    }

    /**
     * Sets the {@code initialOffset} field of the underlying consumer.
     *
     * @param initialOffset value stored on the consumer as-is
     */
    public void setInitialOffset(String initialOffset) {
        this.consumer.initialOffset = initialOffset;
    }

    /**
     * Sets the offset manager that supplies initial offsets and receives offset updates.
     *
     * @param offsetManager the manager to use, or {@code null} for none
     */
    public void setOffsetManager(OffsetManager offsetManager) {
        this.offsetManager = offsetManager;
    }

    /**
     * @return the offset manager, or {@code null} if none is set
     */
    public OffsetManager getOffsetManager() {
        return this.offsetManager;
    }

    /**
     * Sets the minimum time between two Kafka metadata checks.
     *
     * @param repartitionCheckInterval interval compared against
     *        {@code System.currentTimeMillis()} differences
     */
    public void setRepartitionCheckInterval(long repartitionCheckInterval) {
        this.repartitionCheckInterval = repartitionCheckInterval;
    }

    /**
     * @return the minimum time between two Kafka metadata checks
     */
    public long getRepartitionCheckInterval() {
        return this.repartitionCheckInterval;
    }

    /**
     * Sets the minimum time between two repartitions.
     *
     * @param repartitionInterval interval compared against
     *        {@code System.currentTimeMillis()} differences; a negative value disables the
     *        repartition check
     */
    public void setRepartitionInterval(long repartitionInterval) {
        this.repartitionInterval = repartitionInterval;
    }

    /**
     * @return the minimum time between two repartitions
     */
    public long getRepartitionInterval() {
        return this.repartitionInterval;
    }

    /**
     * Sets the partition strategy from its name, ignoring case.
     *
     * <p>The name is upper-cased with {@link java.util.Locale#ROOT} so the result does not depend
     * on the JVM's default locale. Under a Turkish default locale, for instance, a plain
     * {@code toUpperCase()} turns {@code "i"} into a dotted capital I, and
     * {@code "one_to_many_heuristic"} no longer matches the enum constant.
     *
     * @param policy name of a {@link PartitionStrategy} constant in any case
     * @throws IllegalArgumentException if no constant has that name
     * @throws NullPointerException if {@code policy} is {@code null}
     */
    public void setStrategy(String policy) {
        this.strategy = PartitionStrategy.valueOf(policy.toUpperCase(java.util.Locale.ROOT));
    }

    /**
     * Bookkeeping record for one operator partition created by this operator.
     */
    static class PartitionInfo {
        /** Kafka partitions assigned to the operator partition. */
        Set<KafkaPartition> kpids;
        /** Not read or written anywhere in this file. */
        long msgRateLeft;
        /** Not read or written anywhere in this file. */
        long byteRateLeft;
    }

    /**
     * How Kafka partitions are mapped to operator partitions.
     */
    public enum PartitionStrategy {
        /** One operator partition per Kafka partition. */
        ONE_TO_ONE,
        /** Several Kafka partitions may share one operator partition. */
        ONE_TO_MANY,
        /** Declared but rejected as not implemented by {@code definePartitions}. */
        ONE_TO_MANY_HEURISTIC
    }
}

