package org.apache.kafka.raft.internals;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.server.common.serialization.RecordSerde;

/* loaded from: input_file:org/apache/kafka/raft/internals/BatchAccumulator.class */
/**
 * Accumulates records of type {@code T} into batches backed by buffers taken from a
 * {@link MemoryPool}, and hands the completed batches out through {@link #drain()}.
 *
 * <p>Appends run under {@code appendLock}. {@code drain} only tries to acquire that lock and
 * otherwise relies on the volatile {@code drainStatus} handshake so that it never blocks behind
 * an in-progress append.
 */
public class BatchAccumulator<T> implements Closeable {
    // Epoch this accumulator accepts appends for; appends with any other epoch are rejected.
    private final int epoch;
    // Clock used to compute the linger deadline and passed (as milliseconds) to new batch builders.
    private final Time time;
    // Added to the current time to compute the linger deadline when the timer is started.
    private final int lingerMs;
    // Size of every buffer requested from the pool and upper bound on the bytes of a single append.
    private final int maxBatchSize;
    // append() fails once the number of completed, undrained batches reaches this value.
    private final int maxNumberOfBatches;
    // Passed to batch builders and to control-record creators.
    private final Compression compression;
    // Source of batch buffers; buffers go back to it through CompletedBatch.release().
    private final MemoryPool memoryPool;
    // Passed to batch builders to serialize records.
    private final RecordSerde<T> serde;
    // Linger deadline; a deadline of Long.MAX_VALUE means the timer is not running.
    private final SimpleTimer lingerTimer = new SimpleTimer();
    // Completed batches are drainable only if their last offset is below this value.
    // Long.MAX_VALUE means no restriction.
    private final AtomicLong drainOffset = new AtomicLong(Long.MAX_VALUE);
    // Completed batches waiting to be drained, in completion order.
    private final ConcurrentLinkedQueue<CompletedBatch<T>> completed = new ConcurrentLinkedQueue<>();
    // Handshake between drain() and the lock holder; see DrainStatus.
    private volatile DrainStatus drainStatus = DrainStatus.NONE;
    // Held while appending and while completing the current batch.
    private final ReentrantLock appendLock = new ReentrantLock();
    // Offset that will be assigned to the next appended record.
    private long nextOffset;
    // Batch currently being built, or null when none is allocated.
    private BatchBuilder<T> currentBatch;

    /* loaded from: input_file:org/apache/kafka/raft/internals/BatchAccumulator$CompletedBatch.class */
    /**
     * A finished batch together with the pooled buffer that backs it.
     *
     * <p>{@code records} is present for batches built from appended records and empty for
     * batches created from pre-built {@link MemoryRecords} (only the record count is known).
     */
    public static class CompletedBatch<T> {
        public final long baseOffset;
        public final int numRecords;
        public final Optional<List<T>> records;
        public final MemoryRecords data;
        private final MemoryPool pool;
        private final ByteBuffer initialBuffer;

        /** Creates a batch that retains the original record objects. */
        private CompletedBatch(
            long baseOffset,
            List<T> records,
            MemoryRecords data,
            MemoryPool pool,
            ByteBuffer initialBuffer
        ) {
            this(baseOffset, Optional.of(records), records.size(), data, pool, initialBuffer);
        }

        /** Creates a batch for which only the number of records is known. */
        private CompletedBatch(
            long baseOffset,
            int numRecords,
            MemoryRecords data,
            MemoryPool pool,
            ByteBuffer initialBuffer
        ) {
            this(baseOffset, Optional.empty(), numRecords, data, pool, initialBuffer);
        }

        /** Shared initialization for both public shapes of the batch. */
        private CompletedBatch(
            long baseOffset,
            Optional<List<T>> records,
            int numRecords,
            MemoryRecords data,
            MemoryPool pool,
            ByteBuffer initialBuffer
        ) {
            this.baseOffset = baseOffset;
            this.records = records;
            this.numRecords = numRecords;
            this.data = data;
            this.pool = pool;
            this.initialBuffer = initialBuffer;
            validateConstruction();
        }

        /** Rejects batches whose data has no first batch or that hold no records. */
        private void validateConstruction() {
            Objects.requireNonNull(data.firstBatch(), "Expected memory records to contain one batch");
            if (numRecords < 1) {
                throw new IllegalArgumentException(
                    String.format("Completed batch must contain at least one record: %s", numRecords)
                );
            }
        }

        /** Returns the size in bytes of the batch data. */
        public int sizeInBytes() {
            return data.sizeInBytes();
        }

        /** Returns the backing buffer to the pool it was allocated from. */
        public void release() {
            pool.release(initialBuffer);
        }

        /** Returns the max timestamp of the first batch in the data. */
        public long appendTimestamp() {
            return data.firstBatch().maxTimestamp();
        }

        /** Returns true when the last offset of this batch is strictly below {@code j}. */
        public boolean drainable(long j) {
            long lastOffset = baseOffset + numRecords - 1;
            return lastOffset < j;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/internals/BatchAccumulator$DrainStatus.class */
    /**
     * State of the drain handshake stored in {@code drainStatus}.
     */
    public enum DrainStatus {
        // A drain was requested (by drain() or forceDrain()) but the current batch has not
        // been completed yet.
        STARTED,
        // maybeCompleteDrain() ran: any non-empty current batch was moved to the completed queue.
        FINISHED,
        // No drain is in progress; this is the initial state and the state drain() restores
        // after it observes FINISHED.
        NONE
    }

    /**
     * Callback used by {@code appendControlMessages} to build the records of a control batch.
     * It is invoked with the accumulator's next offset, its epoch, its compression setting and
     * a freshly allocated buffer, in that order.
     */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/kafka/raft/internals/BatchAccumulator$MemoryRecordsCreator.class */
    public interface MemoryRecordsCreator {
        MemoryRecords create(long baseOffset, int epoch, Compression compression, ByteBuffer buffer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/raft/internals/BatchAccumulator$SimpleTimer.class */
    /**
     * Minimal deadline holder. A deadline of {@code Long.MAX_VALUE} means "not running".
     */
    public static class SimpleTimer {
        private final AtomicLong deadlineMs = new AtomicLong(Long.MAX_VALUE);

        private SimpleTimer() {
        }

        /** Returns true when a deadline other than {@code Long.MAX_VALUE} is set. */
        boolean isRunning() {
            long deadline = deadlineMs.get();
            return deadline != Long.MAX_VALUE;
        }

        /** Replaces the deadline with the given absolute time in milliseconds. */
        void reset(long newDeadlineMs) {
            deadlineMs.set(newDeadlineMs);
        }

        /** Returns the milliseconds left until the deadline, never less than zero. */
        long remainingMs(long currentTimeMs) {
            long remaining = deadlineMs.get() - currentTimeMs;
            return remaining > 0L ? remaining : 0L;
        }
    }

    /**
     * Creates an accumulator for a single epoch.
     *
     * @param epoch              epoch that appends must match
     * @param baseOffset         offset assigned to the first appended record
     * @param lingerMs           milliseconds added to the current time when the linger timer starts
     * @param maxBatchSize       size of each buffer requested from the pool
     * @param maxNumberOfBatches limit on completed, undrained batches before append fails
     * @param memoryPool         pool that supplies and reclaims batch buffers
     * @param time               clock used for linger deadlines and batch builders
     * @param compression        compression passed to batch builders and control-record creators
     * @param recordSerde        serializer passed to batch builders
     */
    public BatchAccumulator(
        int epoch,
        long baseOffset,
        int lingerMs,
        int maxBatchSize,
        int maxNumberOfBatches,
        MemoryPool memoryPool,
        Time time,
        Compression compression,
        RecordSerde<T> recordSerde
    ) {
        this.epoch = epoch;
        this.nextOffset = baseOffset;
        this.lingerMs = lingerMs;
        this.maxBatchSize = maxBatchSize;
        this.maxNumberOfBatches = maxNumberOfBatches;
        this.memoryPool = memoryPool;
        this.time = time;
        this.compression = compression;
        this.serde = recordSerde;
    }

    /**
     * Appends all of the given records to the batch under construction, completing the current
     * batch and starting a new one when they do not fit.
     *
     * @param epoch      epoch of the caller; must equal this accumulator's epoch
     * @param records    records to append
     * @param delayDrain when true and no drain offset is set yet, the drain offset is set to the
     *                   offset of the first of these records; it stays set until
     *                   {@link #allowDrain()} is called
     * @return the offset of the last appended record
     * @throws NotLeaderException           if {@code epoch} is smaller than the accumulator's epoch
     * @throws IllegalArgumentException     if {@code epoch} is larger than the accumulator's epoch
     * @throws IllegalStateException        if the number of completed batches reached the limit
     * @throws BufferAllocationException    if no buffer could be allocated for the batch
     * @throws RecordBatchTooLargeException if the records need more than the maximum batch size
     */
    public long append(int epoch, List<T> records, boolean delayDrain) {
        // The queue size is sampled before the epoch checks and outside of the lock.
        int completedCount = completed.size();
        if (epoch < this.epoch) {
            throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. Current leader epoch = " + epoch());
        }
        if (epoch > this.epoch) {
            throw new IllegalArgumentException("Attempt to append from epoch " + epoch + " which is larger than the current epoch " + this.epoch);
        }
        if (completedCount >= maxNumberOfBatches) {
            throw new IllegalStateException(
                String.format("Attempting to append records when the number of batches %s reached %s", completedCount, maxNumberOfBatches)
            );
        }

        ObjectSerializationCache serializationCache = new ObjectSerializationCache();

        appendLock.lock();
        try {
            long lastOffset = nextOffset + records.size() - 1;
            // Finish a drain that was requested while no one held the lock.
            maybeCompleteDrain();

            BatchBuilder<T> batch = maybeAllocateBatch(records, serializationCache);
            if (batch == null) {
                throw new BufferAllocationException("Append failed because we failed to allocate memory to write the batch");
            }

            if (delayDrain) {
                // Only the first delayed append sets the offset; later ones leave it untouched.
                drainOffset.compareAndSet(Long.MAX_VALUE, nextOffset);
            }

            for (T record : records) {
                batch.appendRecord(record, serializationCache);
            }

            maybeResetLinger();
            nextOffset = lastOffset + 1;
            return lastOffset;
        } finally {
            appendLock.unlock();
        }
    }

    /** Starts the linger timer at now + lingerMs unless it is already running. */
    private void maybeResetLinger() {
        if (!lingerTimer.isRunning()) {
            lingerTimer.reset(time.milliseconds() + lingerMs);
        }
    }

    /**
     * Returns a batch builder with room for the given records, allocating one if needed.
     *
     * @return the builder to append to, or null when no buffer could be allocated
     * @throws RecordBatchTooLargeException if the current batch reports that the records need
     *                                      more bytes than the maximum batch size
     */
    private BatchBuilder<T> maybeAllocateBatch(Collection<T> collection, ObjectSerializationCache objectSerializationCache) {
        if (currentBatch == null) {
            startNewBatch();
        }
        if (currentBatch == null) {
            // Allocation failed; the caller decides how to report it.
            return null;
        }

        OptionalInt bytesNeeded = currentBatch.bytesNeeded(collection, objectSerializationCache);
        if (bytesNeeded.isPresent()) {
            int requiredBytes = bytesNeeded.getAsInt();
            if (requiredBytes > maxBatchSize) {
                throw new RecordBatchTooLargeException(
                    String.format("The total record(s) size of %d exceeds the maximum allowed batch size of %d", requiredBytes, maxBatchSize)
                );
            }
            // A present value that is within the limit: roll over to a fresh batch.
            completeCurrentBatch();
            startNewBatch();
        }
        return currentBatch;
    }

    /** Builds the current batch, queues it as completed and clears the current batch. */
    private void completeCurrentBatch() {
        BatchBuilder<T> builder = currentBatch;
        // Read in the same order as the constructor arguments: base offset, records, then build.
        long baseOffset = builder.baseOffset();
        List<T> records = builder.records();
        MemoryRecords data = builder.build();
        ByteBuffer buffer = builder.initialBuffer();
        completed.add(new CompletedBatch<>(baseOffset, records, data, memoryPool, buffer));
        currentBatch = null;
    }

    /** Clears the drain offset so that completed batches are no longer held back by it. */
    public void allowDrain() {
        drainOffset.set(Long.MAX_VALUE);
    }

    /**
     * Appends a control batch produced by the given creator as its own completed batch.
     * Any records accumulated so far are completed first so that ordering is preserved.
     *
     * @param memoryRecordsCreator builds the control batch at the accumulator's next offset
     * @return the offset of the last record in the control batch
     * @throws IllegalStateException    if no buffer could be allocated
     * @throws IllegalArgumentException if the created records fail validation
     */
    public long appendControlMessages(MemoryRecordsCreator memoryRecordsCreator) {
        appendLock.lock();
        try {
            ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSize);
            if (buffer == null) {
                throw new IllegalStateException("Could not allocate buffer for the control record");
            }

            try {
                forceDrain();
                MemoryRecords records = memoryRecordsCreator.create(nextOffset, epoch, compression, buffer);
                int recordCount = validateMemoryRecordsAndReturnCount(records);
                completed.add(new CompletedBatch<>(nextOffset, recordCount, records, memoryPool, buffer));
                nextOffset += recordCount;
                return nextOffset - 1;
            } catch (Exception e) {
                // The buffer never made it into the completed queue, so release it here.
                memoryPool.release(buffer);
                throw e;
            }
        } finally {
            appendLock.unlock();
        }
    }

    /**
     * Checks that the given records consist of exactly one control batch that starts at the
     * accumulator's next offset, carries the accumulator's epoch and has a record count of at
     * least one.
     *
     * @param memoryRecords records produced by a {@code MemoryRecordsCreator}
     * @return the number of records in the single batch
     * @throws IllegalArgumentException if any of the checks fails
     */
    private int validateMemoryRecordsAndReturnCount(MemoryRecords memoryRecords) {
        // Iterate record batches, not records of type T: the element type is MutableRecordBatch.
        Iterator<MutableRecordBatch> batches = memoryRecords.batches().iterator();
        if (!batches.hasNext()) {
            throw new IllegalArgumentException("valueCreator didn't create a batch");
        }

        MutableRecordBatch batch = batches.next();
        Integer count = batch.countOrNull();
        if (!batch.isControlBatch()) {
            throw new IllegalArgumentException("valueCreator didn't create a control batch");
        }
        if (batch.baseOffset() != nextOffset) {
            throw new IllegalArgumentException(
                String.format("Expected a base offset of %d but got %d", nextOffset, batch.baseOffset())
            );
        }
        if (batch.partitionLeaderEpoch() != epoch) {
            throw new IllegalArgumentException(
                String.format("Expected a partition leader epoch of %d but got %d", epoch, batch.partitionLeaderEpoch())
            );
        }
        if (count == null) {
            throw new IllegalArgumentException("valueCreator didn't create a batch with the count");
        }
        if (count < 1) {
            throw new IllegalArgumentException("valueCreator didn't create at least one control record");
        }
        if (batches.hasNext()) {
            throw new IllegalArgumentException("valueCreator created more than one batch");
        }
        return count;
    }

    /**
     * Appends a control batch built by {@code MemoryRecords.withVotersRecord}.
     *
     * @param votersRecord the record to append
     * @param j            forwarded as the second argument of {@code withVotersRecord}
     *                     (presumably the record timestamp; confirm against that API)
     * @return the offset of the last record in the appended control batch
     */
    public long appendVotersRecord(VotersRecord votersRecord, long j) {
        return appendControlMessages(
            (baseOffset, leaderEpoch, unusedCompression, buffer) ->
                MemoryRecords.withVotersRecord(baseOffset, j, leaderEpoch, buffer, votersRecord)
        );
    }

    /**
     * Appends a control batch built by {@code MemoryRecords.withLeaderChangeMessage}.
     *
     * @param leaderChangeMessage the message to append
     * @param j                   forwarded as the second argument of
     *                            {@code withLeaderChangeMessage} (presumably the record
     *                            timestamp; confirm against that API)
     */
    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long j) {
        appendControlMessages(
            (baseOffset, leaderEpoch, unusedCompression, buffer) ->
                MemoryRecords.withLeaderChangeMessage(baseOffset, j, leaderEpoch, buffer, leaderChangeMessage)
        );
    }

    /**
     * Appends a control batch built by {@code MemoryRecords.withSnapshotHeaderRecord}.
     *
     * @param snapshotHeaderRecord the record to append
     * @param j                    forwarded as the second argument of
     *                             {@code withSnapshotHeaderRecord} (presumably the record
     *                             timestamp; confirm against that API)
     */
    public void appendSnapshotHeaderRecord(SnapshotHeaderRecord snapshotHeaderRecord, long j) {
        appendControlMessages(
            (baseOffset, leaderEpoch, unusedCompression, buffer) ->
                MemoryRecords.withSnapshotHeaderRecord(baseOffset, j, leaderEpoch, buffer, snapshotHeaderRecord)
        );
    }

    /**
     * Appends a control batch built by {@code MemoryRecords.withSnapshotFooterRecord}.
     *
     * @param snapshotFooterRecord the record to append
     * @param j                    forwarded as the second argument of
     *                             {@code withSnapshotFooterRecord} (presumably the record
     *                             timestamp; confirm against that API)
     */
    public void appendSnapshotFooterRecord(SnapshotFooterRecord snapshotFooterRecord, long j) {
        appendControlMessages(
            (baseOffset, leaderEpoch, unusedCompression, buffer) ->
                MemoryRecords.withSnapshotFooterRecord(baseOffset, j, leaderEpoch, buffer, snapshotFooterRecord)
        );
    }

    /**
     * Marks a drain as started and completes it immediately while holding the append lock,
     * so any non-empty current batch is moved to the completed queue before this returns.
     */
    public void forceDrain() {
        appendLock.lock();
        try {
            drainStatus = DrainStatus.STARTED;
            maybeCompleteDrain();
        } finally {
            appendLock.unlock();
        }
    }

    /**
     * Finishes a started drain: completes the current batch if it holds records, stops the
     * linger timer and marks the drain as finished. Does nothing unless a drain was started.
     */
    private void maybeCompleteDrain() {
        if (drainStatus != DrainStatus.STARTED) {
            return;
        }

        boolean hasPendingRecords = currentBatch != null && currentBatch.nonEmpty();
        if (hasPendingRecords) {
            completeCurrentBatch();
        }

        // A deadline of Long.MAX_VALUE marks the timer as not running until the next append.
        lingerTimer.reset(Long.MAX_VALUE);
        drainStatus = DrainStatus.FINISHED;
    }

    /**
     * Tries to allocate a buffer and, on success, installs a new batch builder starting at the
     * next offset. Leaves {@code currentBatch} untouched when the pool has no buffer available.
     */
    private void startNewBatch() {
        ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSize);
        if (buffer == null) {
            return;
        }

        long nowMs = time.milliseconds();
        currentBatch = new BatchBuilder<>(buffer, serde, compression, nextOffset, nowMs, epoch, maxBatchSize);
    }

    /**
     * Returns true when {@link #timeUntilDrain(long)} reports no remaining wait.
     *
     * @param currentTimeMs current time in milliseconds
     */
    public boolean needsDrain(long currentTimeMs) {
        long waitMs = timeUntilDrain(currentTimeMs);
        return waitMs <= 0;
    }

    /**
     * Returns how long to wait before draining.
     *
     * @param currentTimeMs current time in milliseconds
     * @return 0 when the oldest completed batch is drainable under the current drain offset,
     *         otherwise the time remaining on the linger timer
     */
    public long timeUntilDrain(long currentTimeMs) {
        CompletedBatch<T> oldest = completed.peek();
        if (oldest != null && oldest.drainable(drainOffset.get())) {
            return 0L;
        }
        return lingerTimer.remainingMs(currentTimeMs);
    }

    /** Returns the epoch this accumulator was created for. */
    public int epoch() {
        return epoch;
    }

    /**
     * Drains the completed batches allowed by the current drain offset.
     *
     * @return the drained batches in completion order; empty when the drain could not be
     *         finished by this call
     */
    public List<CompletedBatch<T>> drain() {
        long limit = drainOffset.get();
        return drain(limit);
    }

    /**
     * Requests a drain, finishes it if the append lock is free, and returns the completed
     * batches whose last offset is below {@code drainUpTo}.
     *
     * @return the drained batches, or an empty list when the drain has not finished yet
     */
    private List<CompletedBatch<T>> drain(long drainUpTo) {
        // Request a drain unless one is already pending or finished.
        if (drainStatus == DrainStatus.NONE) {
            drainStatus = DrainStatus.STARTED;
        }

        // Never block here: if the lock is held elsewhere, that holder may complete the drain
        // through its own maybeCompleteDrain() call.
        if (appendLock.tryLock()) {
            try {
                maybeCompleteDrain();
            } finally {
                appendLock.unlock();
            }
        }

        if (drainStatus == DrainStatus.FINISHED) {
            drainStatus = DrainStatus.NONE;
            return drainCompleted(drainUpTo);
        }
        return Collections.emptyList();
    }

    /**
     * Removes completed batches from the head of the queue for as long as they are drainable
     * with respect to {@code j}, stopping at the first batch that is not.
     *
     * @param j offset limit passed to {@code CompletedBatch.drainable}
     * @return the removed batches in queue order; never null
     */
    private List<CompletedBatch<T>> drainCompleted(long j) {
        // Parameterized list instead of a raw ArrayList, so no unchecked conversion on return.
        List<CompletedBatch<T>> drained = new ArrayList<>();
        for (CompletedBatch<T> head = completed.peek();
                head != null && head.drainable(j);
                head = completed.peek()) {
            completed.poll();
            drained.add(head);
        }
        return drained;
    }

    /**
     * Reports emptiness from the linger timer alone, without taking the append lock: the timer
     * is started by an append and stopped when a drain completes.
     */
    public boolean isEmpty() {
        boolean lingering = lingerTimer.isRunning();
        return !lingering;
    }

    /**
     * Releases every buffer still owned by this accumulator: the buffers of all completed
     * batches and the buffer of a batch that is still under construction.
     *
     * <p>The append lock is held for the whole operation so that the internal drain is
     * guaranteed to be able to complete the current batch.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        appendLock.lock();
        try {
            drain(Long.MAX_VALUE).forEach(CompletedBatch::release);

            // The drain only completes a current batch that holds records, and only when the
            // drain was not already marked finished. Whatever is left would otherwise keep its
            // pooled buffer forever, so return it here.
            if (currentBatch != null) {
                memoryPool.release(currentBatch.initialBuffer());
                currentBatch = null;
            }
        } finally {
            appendLock.unlock();
        }
    }
}
