package org.apache.apex.malhar.kafka;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.kafka.AbstractKafkaPartitioner;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for input operators that pull records from Kafka through a {@link KafkaConsumerWrapper}
 * and hand each record to {@link #emitTuple(String, ConsumerRecord)}.
 *
 * <p>The operator tracks, per {@link AbstractKafkaPartitioner.PartitionMeta}, the offset following the
 * last emitted record. A snapshot of those offsets is taken at every {@link #endWindow()} and is
 * passed to the consumer wrapper for committing once the matching window is reported as committed.
 * When a {@link WindowDataManager} other than the no-op implementation is configured, the
 * start offset and record count of every window are saved so the window can be replayed.
 *
 * <p>Partitioning is delegated to an {@link AbstractKafkaPartitioner} selected by the configured
 * {@link PartitionStrategy}.
 */
@InterfaceStability.Evolving
public abstract class AbstractKafkaInputOperator implements InputOperator, Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointListener, Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback {
    private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);

    /** Cluster identifiers, one array element per cluster (see {@link #setClusters(String)}). */
    @NotNull
    private String[] clusters;

    /** Topic names (see {@link #setTopics(String)}). */
    @NotNull
    private String[] topics;

    /** Operator id read from the context in {@link #setup}; used as the key for saved window data. */
    private transient int operatorId;

    /** Consumer properties; not interpreted by this class. */
    private Properties consumerProps;

    /** Partitions handed to this instance through {@link #assign(Set)}. */
    private Set<AbstractKafkaPartitioner.PartitionMeta> assignment;

    /** Application name read from the DAG context in {@link #setup}. */
    private transient String applicationName;

    /** Lazily created by {@link #initPartitioner()}. */
    private transient AbstractKafkaPartitioner partitioner;

    /** Id of the window most recently passed to {@link #beginWindow(long)}. */
    private transient long currentWindowId;

    @AutoMetric
    private transient KafkaMetrics metrics;

    /** For each partition, the offset following the last emitted record. */
    private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();

    /** For each partition, the offset of the first record emitted in the current window. */
    private final transient Map<AbstractKafkaPartitioner.PartitionMeta, Long> windowStartOffset = new HashMap<>();

    private int initialPartitionCount = 1;

    /** Minimum time in milliseconds between two repartitions; a negative value disables the check. */
    private long repartitionInterval = 30000;

    /** Minimum time in milliseconds between two partitioner stats checks; negative disables the check. */
    private long repartitionCheckInterval = 5000;

    /** Upper bound on the number of records emitted between beginWindow and endWindow. */
    @Min(1)
    private int maxTuplesPerWindow = Integer.MAX_VALUE;

    private InitialOffset initialOffset = InitialOffset.APPLICATION_OR_LATEST;

    /** Passed to the {@link KafkaMetrics} constructor in {@link #setup}. */
    private long metricsRefreshInterval = 5000;

    // NOTE(review): consumerTimeout and holdingBufferSize are not read in this class; presumably the
    // consumer wrapper picks them up through the getters - confirm.
    private long consumerTimeout = 5000;
    private int holdingBufferSize = 1024;

    private final transient KafkaConsumerWrapper consumerWrapper = new KafkaConsumerWrapper();
    private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;

    /** Number of records emitted so far in the current window. */
    private transient int emitCount = 0;

    /** (window id, offset snapshot) pairs appended by {@link #endWindow()}, pruned by {@link #committed}. */
    private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>();

    /** Wall-clock time of the last partitioner stats check. */
    private transient long lastCheckTime = 0;

    /** Wall-clock time of the last {@link #partitioned(Map)} call. */
    private transient long lastRepartitionTime = 0;

    private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();

    /**
     * Where consumption starts.
     *
     * <p>Within this class the only distinction made is that offsets are not committed back through
     * the consumer wrapper for {@link #EARLIEST} and {@link #LATEST}.
     * NOTE(review): the APPLICATION_OR_* values presumably resume from offsets committed for the
     * application and fall back to earliest/latest - confirm against the consumer wrapper.
     */
    public enum InitialOffset {
        EARLIEST,
        LATEST,
        APPLICATION_OR_EARLIEST,
        APPLICATION_OR_LATEST
    }

    /** Starts the consumer wrapper, telling it whether this operator is idempotent. */
    @Override
    public void activate(Context.OperatorContext operatorContext) {
        consumerWrapper.start(isIdempotent());
    }

    /** Stops the consumer wrapper. */
    @Override
    public void deactivate() {
        consumerWrapper.stop();
    }

    /** No action is taken on checkpoint. */
    @Override
    public void checkpointed(long windowId) {
    }

    /**
     * Handles a committed window: commits the offsets recorded for exactly that window (unless the
     * initial offset is {@code EARLIEST} or {@code LATEST}), drops all history entries up to and
     * including it, and deletes saved window data up to it when the operator is idempotent.
     *
     * @param windowId id of the committed window
     */
    @Override
    public void committed(long windowId) {
        boolean commitToKafka = initialOffset != InitialOffset.LATEST && initialOffset != InitialOffset.EARLIEST;

        // History is pruned regardless of the initial offset mode: endWindow() appends a snapshot for
        // every window, so skipping the pruning would let the list grow without bound.
        Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> history = offsetHistory.iterator();
        while (history.hasNext()) {
            Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> snapshot = history.next();
            long snapshotWindowId = snapshot.getLeft();
            if (snapshotWindowId > windowId) {
                continue;
            }
            if (commitToKafka && snapshotWindowId == windowId) {
                consumerWrapper.commitOffsets(snapshot.getRight());
            }
            history.remove();
        }

        // Saved window data is likewise purged independently of the initial offset mode.
        if (isIdempotent()) {
            try {
                windowDataManager.deleteUpTo(operatorId, windowId);
            } catch (IOException e) {
                DTThrowable.rethrow(e);
            }
        }
    }

    /**
     * Emits the messages currently reported by the consumer wrapper, capped so that no more than
     * {@code maxTuplesPerWindow} records are emitted per window, and updates the offset tracking.
     */
    @Override
    public void emitTuples() {
        int count = consumerWrapper.messageSize();
        if (maxTuplesPerWindow > 0) {
            count = Math.min(count, maxTuplesPerWindow - emitCount);
        }
        boolean idempotent = isIdempotent();
        for (int i = 0; i < count; i++) {
            Pair<String, ConsumerRecord<byte[], byte[]>> message = consumerWrapper.pollMessage();
            String cluster = message.getLeft();
            ConsumerRecord<byte[], byte[]> record = message.getRight();
            emitTuple(cluster, record);

            AbstractKafkaPartitioner.PartitionMeta partition =
                    new AbstractKafkaPartitioner.PartitionMeta(cluster, record.topic(), record.partition());
            // Track the offset following this record.
            offsetTrack.put(partition, record.offset() + 1);
            // The first record seen for a partition in this window marks the window's start offset.
            if (idempotent && !windowStartOffset.containsKey(partition)) {
                windowStartOffset.put(partition, record.offset());
            }
        }
        emitCount += count;
    }

    /**
     * Emits one Kafka record downstream.
     *
     * @param str            the left element of the pair returned by the consumer wrapper; this class
     *                       uses it as the cluster of the record's partition
     * @param consumerRecord the record to emit
     */
    public abstract void emitTuple(String str, ConsumerRecord<byte[], byte[]> consumerRecord);

    /**
     * Resets the per-window state. If the operator is idempotent and the window is not newer than the
     * largest recovery window, the window is replayed from saved data; otherwise the consumer wrapper
     * is notified through {@code afterReplay()}.
     *
     * @param windowId id of the window that begins
     */
    @Override
    public void beginWindow(long windowId) {
        emitCount = 0;
        currentWindowId = windowId;
        windowStartOffset.clear();
        if (isIdempotent() && windowId <= windowDataManager.getLargestRecoveryWindow()) {
            replay(windowId);
        } else {
            consumerWrapper.afterReplay();
        }
    }

    /** Loads the data saved for the given window and passes it to the consumer wrapper. */
    private void replay(long windowId) {
        try {
            // endWindow() saves a map of this shape for every window.
            @SuppressWarnings("unchecked")
            Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData =
                    (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>) windowDataManager.load(operatorId, windowId);
            consumerWrapper.emitImmediately(windowData);
        } catch (IOException e) {
            DTThrowable.rethrow(e);
        }
    }

    /**
     * Records a snapshot of the tracked offsets for the current window, refreshes the metrics and,
     * when idempotent, saves (start offset, record count) per partition for the window.
     */
    @Override
    public void endWindow() {
        Map<AbstractKafkaPartitioner.PartitionMeta, Long> snapshot = new HashMap<>(offsetTrack);
        offsetHistory.add(Pair.of(currentWindowId, snapshot));

        metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics());

        if (!isIdempotent()) {
            return;
        }
        Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData = new HashMap<>();
        for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> start : windowStartOffset.entrySet()) {
            long startOffset = start.getValue();
            // offsetTrack holds the offset following the last emitted record, so the difference is
            // the span of offsets covered in this window.
            long span = offsetTrack.get(start.getKey()) - startOffset;
            Pair<Long, Long> range = new MutablePair<>(startOffset, span);
            windowData.put(start.getKey(), range);
        }
        try {
            windowDataManager.save(windowData, operatorId, currentWindowId);
        } catch (IOException e) {
            DTThrowable.rethrow(e);
        }
    }

    /**
     * Reads the application name and operator id from the context, creates the consumer, the metrics
     * holder and sets up the window data manager (if one is configured).
     */
    @Override
    public void setup(Context.OperatorContext operatorContext) {
        applicationName = operatorContext.getValue(Context.DAGContext.APPLICATION_NAME);
        consumerWrapper.create(this);
        metrics = new KafkaMetrics(metricsRefreshInterval);
        // isIdempotent() tolerates a null manager, so setup must tolerate it as well.
        if (windowDataManager != null) {
            windowDataManager.setup(operatorContext);
        }
        operatorId = operatorContext.getId();
    }

    /** Tears down the window data manager (if one is configured). */
    @Override
    public void teardown() {
        if (windowDataManager != null) {
            windowDataManager.teardown();
        }
    }

    /**
     * Creates the partitioner matching the configured strategy on first use.
     *
     * @throws UnsupportedOperationException for {@code ONE_TO_MANY_HEURISTIC}
     * @throws RuntimeException              for any other unhandled strategy
     */
    private void initPartitioner() {
        if (partitioner != null) {
            return;
        }
        logger.info("Initialize Partitioner");
        switch (strategy) {
            case ONE_TO_ONE:
                partitioner = new OneToOnePartitioner(clusters, topics, this);
                break;
            case ONE_TO_MANY:
                partitioner = new OneToManyPartitioner(clusters, topics, this);
                break;
            case ONE_TO_MANY_HEURISTIC:
                throw new UnsupportedOperationException("Not implemented yet");
            default:
                throw new RuntimeException("Invalid strategy");
        }
        logger.info("Actual Partitioner is {}", partitioner.getClass());
    }

    /**
     * Delegates to the partitioner unless repartitioning is disabled (a negative interval) or either
     * interval has not elapsed yet, in which case a response with {@code repartitionRequired == false}
     * is returned.
     */
    @Override
    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
        long now = System.currentTimeMillis();
        boolean disabled = repartitionInterval < 0 || repartitionCheckInterval < 0;
        boolean tooSoon = now - lastCheckTime < repartitionCheckInterval
                || now - lastRepartitionTime < repartitionInterval;
        if (disabled || tooSoon) {
            StatsListener.Response response = new StatsListener.Response();
            response.repartitionRequired = false;
            return response;
        }
        try {
            logger.debug("Process stats");
            initPartitioner();
            return partitioner.processStats(batchedOperatorStats);
        } finally {
            // The check time is updated whether or not the partitioner call succeeded.
            lastCheckTime = System.currentTimeMillis();
        }
    }

    /** Delegates to the partitioner, creating it first if necessary. */
    @Override
    public Collection<Partitioner.Partition<AbstractKafkaInputOperator>> definePartitions(Collection<Partitioner.Partition<AbstractKafkaInputOperator>> collection, Partitioner.PartitioningContext partitioningContext) {
        logger.debug("Define partitions");
        initPartitioner();
        return partitioner.definePartitions(collection, partitioningContext);
    }

    /** Records the repartition time and delegates to the partitioner. */
    @Override
    public void partitioned(Map<Integer, Partitioner.Partition<AbstractKafkaInputOperator>> map) {
        lastRepartitionTime = System.currentTimeMillis();
        initPartitioner();
        partitioner.partitioned(map);
    }

    /**
     * Offset commit callback: logs the committed offsets at debug level and logs a warning,
     * including the stack trace, when the commit reported an exception.
     */
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
        if (logger.isDebugEnabled()) {
            logger.debug("Commit offsets complete {} ", Joiner.on(';').withKeyValueSeparator("=").join(map));
        }
        if (exc != null) {
            // The exception is passed as the trailing throwable (no placeholder) so that SLF4J logs
            // its stack trace instead of consuming it as a message argument.
            logger.warn("Exceptions in committing offsets {}", Joiner.on(';').withKeyValueSeparator("=").join(map), exc);
        }
    }

    /** Stores the set of partitions this instance is responsible for. */
    public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> set) {
        this.assignment = set;
    }

    /** @return the partitions stored by {@link #assign(Set)} */
    public Set<AbstractKafkaPartitioner.PartitionMeta> assignment() {
        return assignment;
    }

    /** @return true when a window data manager other than the no-op implementation is configured */
    private boolean isIdempotent() {
        return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
    }

    public void setInitialPartitionCount(int initialPartitionCount) {
        this.initialPartitionCount = initialPartitionCount;
    }

    public int getInitialPartitionCount() {
        return initialPartitionCount;
    }

    /**
     * Sets the clusters from a {@code ';'}-separated list. Surrounding whitespace is trimmed and empty
     * entries are dropped, mirroring {@link #setTopics(String)}.
     */
    public void setClusters(String clusters) {
        this.clusters = Iterables.toArray(Splitter.on(';').trimResults().omitEmptyStrings().split(clusters), String.class);
    }

    /** @return the clusters joined with {@code ';'}, or null when none have been set */
    public String getClusters() {
        return clusters == null ? null : Joiner.on(';').join(clusters);
    }

    /**
     * Sets the topics from a {@code ','}-separated list; surrounding whitespace is trimmed and empty
     * entries are dropped.
     */
    public void setTopics(String topics) {
        this.topics = Iterables.toArray(Splitter.on(',').trimResults().omitEmptyStrings().split(topics), String.class);
    }

    /** @return the topics joined with {@code ", "}, or null when none have been set */
    public String getTopics() {
        return topics == null ? null : Joiner.on(", ").join(topics);
    }

    /**
     * Sets the partition strategy by (case-insensitive) enum constant name.
     *
     * @throws IllegalArgumentException when the name matches no {@link PartitionStrategy} constant
     */
    public void setStrategy(String strategy) {
        // Locale.ROOT keeps the conversion independent of the default locale (e.g. Turkish dotted I).
        this.strategy = PartitionStrategy.valueOf(strategy.toUpperCase(java.util.Locale.ROOT));
    }

    public String getStrategy() {
        return strategy.name();
    }

    /**
     * Sets the initial offset mode by (case-insensitive) enum constant name.
     *
     * @throws IllegalArgumentException when the name matches no {@link InitialOffset} constant
     */
    public void setInitialOffset(String initialOffset) {
        // Locale.ROOT keeps the conversion independent of the default locale (e.g. Turkish dotted I).
        this.initialOffset = InitialOffset.valueOf(initialOffset.toUpperCase(java.util.Locale.ROOT));
    }

    public String getInitialOffset() {
        return initialOffset.name();
    }

    /** @return the application name read during {@link #setup}; null before setup */
    public String getApplicationName() {
        return applicationName;
    }

    public void setConsumerProps(Properties consumerProps) {
        this.consumerProps = consumerProps;
    }

    public Properties getConsumerProps() {
        return consumerProps;
    }

    public void setMaxTuplesPerWindow(int maxTuplesPerWindow) {
        this.maxTuplesPerWindow = maxTuplesPerWindow;
    }

    public int getMaxTuplesPerWindow() {
        return maxTuplesPerWindow;
    }

    public long getConsumerTimeout() {
        return consumerTimeout;
    }

    public void setConsumerTimeout(long consumerTimeout) {
        this.consumerTimeout = consumerTimeout;
    }

    public int getHoldingBufferSize() {
        return holdingBufferSize;
    }

    public void setHoldingBufferSize(int holdingBufferSize) {
        this.holdingBufferSize = holdingBufferSize;
    }

    public long getMetricsRefreshInterval() {
        return metricsRefreshInterval;
    }

    public void setMetricsRefreshInterval(long metricsRefreshInterval) {
        this.metricsRefreshInterval = metricsRefreshInterval;
    }

    public void setRepartitionCheckInterval(long repartitionCheckInterval) {
        this.repartitionCheckInterval = repartitionCheckInterval;
    }

    public long getRepartitionCheckInterval() {
        return repartitionCheckInterval;
    }

    public void setRepartitionInterval(long repartitionInterval) {
        this.repartitionInterval = repartitionInterval;
    }

    public long getRepartitionInterval() {
        return repartitionInterval;
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    public WindowDataManager getWindowDataManager() {
        return windowDataManager;
    }

    /** @return the live (not copied) map of tracked offsets */
    public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack() {
        return offsetTrack;
    }
}
