/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.listener;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import kafka.common.ErrorMapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Connection;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.ConsumerException;
import org.springframework.integration.kafka.core.KafkaConsumerDefaults;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.PartitionNotFoundException;
import org.springframework.integration.kafka.core.Result;
import org.springframework.integration.kafka.listener.OffsetManager;
import org.springframework.util.Assert;

/**
 * Base class for {@link OffsetManager} implementations.
 *
 * <p>Offset resolution for a partition is handled here, in this order: the offset held by
 * the concrete store ({@link #doGetOffset(Partition)}), then the caller-supplied initial
 * offsets, and finally a query to the partition's leader broker. Storage itself is left to
 * subclasses through the {@code doXxx} template methods.
 *
 * <p>The public offset operations are {@code synchronized} on this instance.
 */
public abstract class AbstractOffsetManager
implements OffsetManager,
DisposableBean {
    /** Logger named after the concrete subclass. */
    protected final Log log = LogFactory.getLog(this.getClass());

    /** Consumer identifier; defaults to {@code "groupid"}. */
    protected String consumerId = "groupid";

    /**
     * Reference timestamp handed to {@link Connection#fetchInitialOffset} when an offset has
     * to be looked up on the broker. Defaults to
     * {@link KafkaConsumerDefaults#DEFAULT_OFFSET_RESET}.
     */
    protected long referenceTimestamp = KafkaConsumerDefaults.DEFAULT_OFFSET_RESET;

    /** Factory used to locate and connect to partition leaders. */
    protected ConnectionFactory connectionFactory;

    /**
     * Offsets to use for partitions that have no stored offset. This is the map instance
     * passed to the constructor (not a copy); {@link #resetOffsets(Collection)} removes
     * entries from it.
     */
    protected Map<Partition, Long> initialOffsets;

    /**
     * Creates a manager with no initial offsets.
     *
     * @param connectionFactory the connection factory; must not be {@code null}
     */
    public AbstractOffsetManager(ConnectionFactory connectionFactory) {
        this(connectionFactory, new HashMap<Partition, Long>());
    }

    /**
     * Creates a manager with the given initial offsets.
     *
     * @param connectionFactory the connection factory; must not be {@code null}
     * @param initialOffsets offsets to fall back on when nothing is stored for a partition;
     *        must not be {@code null}. The map is kept by reference and may be modified by
     *        {@link #resetOffsets(Collection)}.
     */
    public AbstractOffsetManager(ConnectionFactory connectionFactory, Map<Partition, Long> initialOffsets) {
        Assert.notNull(connectionFactory, "A 'connectionFactory' can't be null");
        Assert.notNull(initialOffsets, "An initialOffsets can't be null");
        this.connectionFactory = connectionFactory;
        this.initialOffsets = initialOffsets;
    }

    /**
     * @return the consumer identifier
     */
    public String getConsumerId() {
        return this.consumerId;
    }

    /**
     * @param consumerId the consumer identifier to use
     */
    public void setConsumerId(String consumerId) {
        this.consumerId = consumerId;
    }

    /**
     * @param referenceTimestamp the reference timestamp used when the initial offset has to
     *        be fetched from the broker
     */
    public void setReferenceTimestamp(long referenceTimestamp) {
        this.referenceTimestamp = referenceTimestamp;
    }

    /**
     * Flushes and then closes this manager.
     *
     * <p>An {@link IOException} raised by either step is logged and not propagated.
     * {@code close()} runs from a {@code finally} block so that it is still attempted when
     * {@code flush()} fails with an unchecked exception; that exception then propagates to
     * the caller once the close attempt has completed.
     *
     * @throws Exception declared by {@link DisposableBean#destroy()}; kept so that
     *         overriding subclasses remain source-compatible
     */
    @Override
    public void destroy() throws Exception {
        try {
            this.flush();
        }
        catch (IOException e) {
            this.log.error("Error while flushing the OffsetManager", e);
        }
        finally {
            try {
                this.close();
            }
            catch (IOException e) {
                this.log.error("Error while closing the OffsetManager", e);
            }
        }
    }

    /**
     * Records {@code offset} for {@code partition} by delegating to
     * {@link #doUpdateOffset(Partition, long)}.
     *
     * @param partition the partition whose offset is updated
     * @param offset the new offset
     */
    @Override
    public final synchronized void updateOffset(Partition partition, long offset) {
        this.doUpdateOffset(partition, offset);
    }

    /**
     * Resolves the offset for {@code partition}.
     *
     * <p>Returns the stored offset if {@link #doGetOffset(Partition)} yields one, otherwise
     * the non-null entry from the initial offsets, otherwise the offset reported by the
     * partition's leader for the configured reference timestamp.
     *
     * @param partition the partition to look up
     * @return the resolved offset
     * @throws PartitionNotFoundException if the connection factory knows no leader for the
     *         partition
     * @throws ConsumerException if the broker reports an error for the offset request
     * @throws IllegalStateException if the broker response carries no value for the partition
     */
    @Override
    public final synchronized long getOffset(Partition partition) {
        Long storedOffset = this.doGetOffset(partition);
        if (storedOffset != null) {
            return storedOffset;
        }
        // One lookup instead of containsKey + get: a key mapped to null is treated like a
        // missing key rather than failing with a NullPointerException on unboxing.
        Long initialOffset = this.initialOffsets.get(partition);
        if (initialOffset != null) {
            return initialOffset;
        }
        return this.fetchOffsetFromLeader(partition);
    }

    /**
     * Removes the stored offset and the initial offset of every given partition.
     *
     * @param partitionsToReset the partitions to reset
     */
    @Override
    public synchronized void resetOffsets(Collection<Partition> partitionsToReset) {
        for (Partition partition : partitionsToReset) {
            this.doRemoveOffset(partition);
            this.initialOffsets.remove(partition);
        }
    }

    /**
     * Removes the stored offset of {@code partition}. Unlike
     * {@link #resetOffsets(Collection)}, the initial offsets are left untouched.
     *
     * @param partition the partition whose stored offset is removed
     */
    @Override
    public synchronized void deleteOffset(Partition partition) {
        this.doRemoveOffset(partition);
    }

    /**
     * Asks the leader of {@code partition} for the offset matching the reference timestamp.
     * Only called from {@link #getOffset(Partition)}, i.e. while holding this instance's
     * monitor.
     */
    private long fetchOffsetFromLeader(Partition partition) {
        BrokerAddress leader = this.connectionFactory.getLeader(partition);
        if (leader == null) {
            throw new PartitionNotFoundException(partition);
        }
        Connection connection = this.connectionFactory.connect(leader);
        Result<Long> offsetResult = connection.fetchInitialOffset(this.referenceTimestamp, partition);
        if (offsetResult.getErrors().size() > 0) {
            throw new ConsumerException(ErrorMapping.exceptionFor((short)offsetResult.getError(partition)));
        }
        if (!offsetResult.getResults().containsKey(partition)) {
            throw new IllegalStateException("Result does not contain an expected value");
        }
        return offsetResult.getResult(partition);
    }

    /**
     * Stores {@code offset} for {@code partition}.
     *
     * @param partition the partition whose offset is stored
     * @param offset the offset to store
     */
    protected abstract void doUpdateOffset(Partition partition, long offset);

    /**
     * Removes whatever offset is stored for {@code partition}.
     *
     * @param partition the partition whose offset is removed
     */
    protected abstract void doRemoveOffset(Partition partition);

    /**
     * Reads the stored offset for {@code partition}.
     *
     * @param partition the partition to look up
     * @return the stored offset, or {@code null} if none is stored
     */
    protected abstract Long doGetOffset(Partition partition);
}

