package io.aeron.archive;

import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.archive.checksum.Checksum;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.ArchiveException;
import io.aeron.logbuffer.FrameDescriptor;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.protocol.DataHeaderFlyweight;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.EnumSet;
import org.agrona.BitUtil;
import org.agrona.CloseHelper;
import org.agrona.LangUtil;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.CountedErrorHandler;
import org.agrona.concurrent.UnsafeBuffer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/aeron/archive/ReplaySession.class */
public class ReplaySession implements Session, AutoCloseable {
    private static final EnumSet<StandardOpenOption> FILE_OPTIONS = EnumSet.of(StandardOpenOption.READ);
    private final long connectDeadlineMs;
    private final long correlationId;
    private final long sessionId;
    private final long recordingId;
    private final long startPosition;
    private long replayPosition;
    private long stopPosition;
    private long replayLimit;
    private volatile long segmentFileBasePosition;
    private int termBaseSegmentOffset;
    private int termOffset;
    private final int streamId;
    private final int termLength;
    private final int segmentLength;
    private final long replayBufferAddress;
    private final Checksum checksum;
    private final ExclusivePublication publication;
    private final ControlSession controlSession;
    private final CachedEpochClock epochClock;
    private final File archiveDir;
    private final Catalog catalog;
    private final Counter limitPosition;
    private final UnsafeBuffer replayBuffer;
    private final int mtuLength;
    private FileChannel fileChannel;
    private File segmentFile;
    private State state = State.INIT;
    private String errorMessage = null;
    private volatile boolean isAborted;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/aeron/archive/ReplaySession$State.class */
    public enum State {
        INIT,
        REPLAY,
        INACTIVE,
        DONE
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReplaySession(long j, long j2, long j3, long j4, long j5, ControlSession controlSession, ControlResponseProxy controlResponseProxy, UnsafeBuffer unsafeBuffer, Catalog catalog, File file, CachedEpochClock cachedEpochClock, ExclusivePublication exclusivePublication, RecordingSummary recordingSummary, Counter counter, Checksum checksum) {
        this.controlSession = controlSession;
        this.sessionId = j3;
        this.correlationId = j5;
        this.recordingId = recordingSummary.recordingId;
        this.segmentLength = recordingSummary.segmentFileLength;
        this.termLength = recordingSummary.termBufferLength;
        this.streamId = recordingSummary.streamId;
        this.mtuLength = recordingSummary.mtuLength;
        this.epochClock = cachedEpochClock;
        this.archiveDir = file;
        this.publication = exclusivePublication;
        this.limitPosition = counter;
        this.replayBuffer = unsafeBuffer;
        this.replayBufferAddress = unsafeBuffer.addressOffset();
        this.catalog = catalog;
        this.checksum = checksum;
        this.startPosition = recordingSummary.startPosition;
        this.stopPosition = null == this.limitPosition ? recordingSummary.stopPosition : this.limitPosition.get();
        long j6 = j == -1 ? this.startPosition : j;
        long j7 = null == this.limitPosition ? this.stopPosition - j6 : Long.MAX_VALUE - j6;
        long min = j2 == -1 ? j7 : Math.min(j2, j7);
        if (min < 0) {
            close();
            String str = "replay recording " + this.recordingId + " - length must be positive: " + min;
            controlSession.attemptErrorResponse(j5, str, controlResponseProxy);
            throw new ArchiveException(str);
        }
        if (null != this.limitPosition) {
            long j8 = this.limitPosition.get();
            if (j8 < j6) {
                close();
                String str2 = "replay recording " + this.recordingId + " - " + j6 + " after current position of " + j8;
                controlSession.attemptErrorResponse(j5, str2, controlResponseProxy);
                throw new ArchiveException(str2);
            }
        }
        this.segmentFileBasePosition = AeronArchive.segmentFileBasePosition(this.startPosition, j6, this.termLength, this.segmentLength);
        this.replayPosition = j6;
        this.replayLimit = j6 + min;
        this.segmentFile = new File(file, Archive.segmentFileName(this.recordingId, this.segmentFileBasePosition));
        controlSession.sendOkResponse(j5, j3, controlResponseProxy);
        this.connectDeadlineMs = cachedEpochClock.time() + j4;
    }

    @Override // io.aeron.archive.Session
    public void close() {
        CountedErrorHandler countedErrorHandler = this.controlSession.archiveConductor().context().countedErrorHandler();
        CloseHelper.close(countedErrorHandler, this.publication);
        CloseHelper.close(countedErrorHandler, this.fileChannel);
    }

    @Override // io.aeron.archive.Session
    public long sessionId() {
        return this.sessionId;
    }

    @Override // io.aeron.archive.Session
    public int doWork() {
        int i = 0;
        if (this.isAborted) {
            state(State.INACTIVE);
        }
        try {
            if (State.INIT == this.state) {
                i = 0 + init();
            }
            if (State.REPLAY == this.state) {
                i += replay();
            }
        } catch (IOException e) {
            onError("IOException - " + e.getMessage() + " - " + this.segmentFile.getName());
            LangUtil.rethrowUnchecked(e);
        }
        if (State.INACTIVE == this.state) {
            closeRecordingSegment();
            state(State.DONE);
        }
        return i;
    }

    @Override // io.aeron.archive.Session
    public void abort() {
        this.isAborted = true;
    }

    @Override // io.aeron.archive.Session
    public boolean isDone() {
        return this.state == State.DONE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long recordingId() {
        return this.recordingId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public State state() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String replayChannel() {
        return this.publication.channel();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int replayStreamId() {
        return this.publication.streamId();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long segmentFileBasePosition() {
        return this.segmentFileBasePosition;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Counter limitPosition() {
        return this.limitPosition;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendPendingError(ControlResponseProxy controlResponseProxy) {
        if (null == this.errorMessage || this.controlSession.isDone()) {
            return;
        }
        onPendingError(this.sessionId, this.recordingId, this.errorMessage);
        this.controlSession.attemptErrorResponse(this.correlationId, this.errorMessage, controlResponseProxy);
    }

    void onPendingError(long j, long j2, String str) {
    }

    private int init() throws IOException {
        if (this.replayBuffer.capacity() < this.mtuLength) {
            onError("fileIoMaxLength is less than mtuLength");
            return 0;
        }
        if (null == this.fileChannel) {
            if (!this.segmentFile.exists()) {
                if (this.epochClock.time() <= this.connectDeadlineMs) {
                    return 0;
                }
                onError("recording segment not found " + this.segmentFile);
                return 0;
            }
            int i = (int) ((this.replayPosition - (this.startPosition - (this.startPosition & (this.termLength - 1)))) & (this.segmentLength - 1));
            int computeTermIdFromPosition = LogBufferDescriptor.computeTermIdFromPosition(this.replayPosition, this.publication.positionBitsToShift(), this.publication.initialTermId());
            openRecordingSegment();
            this.termOffset = (int) (this.replayPosition & (this.termLength - 1));
            this.termBaseSegmentOffset = i - this.termOffset;
            if (this.replayPosition > this.startPosition && this.replayPosition != this.stopPosition && notHeaderAligned(this.fileChannel, this.replayBuffer, i, this.termOffset, computeTermIdFromPosition, this.streamId)) {
                onError(this.replayPosition + " position not aligned to a data header");
                return 0;
            }
        }
        if (this.publication.isConnected()) {
            state(State.REPLAY);
            return 1;
        }
        if (this.epochClock.time() <= this.connectDeadlineMs) {
            return 0;
        }
        onError("no connection established for replay");
        return 0;
    }

    private int replay() throws IOException {
        if (!this.publication.isConnected()) {
            state(State.INACTIVE);
            return 0;
        }
        if (this.replayPosition >= this.stopPosition && null != this.limitPosition && noNewData(this.replayPosition, this.stopPosition)) {
            return 0;
        }
        if (this.termOffset == this.termLength) {
            nextTerm();
        }
        int i = 0;
        int readRecording = readRecording(this.stopPosition - this.replayPosition);
        if (readRecording > 0) {
            int i2 = 0;
            int i3 = 0;
            int sessionId = this.publication.sessionId();
            int streamId = this.publication.streamId();
            long j = this.replayLimit - this.replayPosition;
            Checksum checksum = this.checksum;
            UnsafeBuffer unsafeBuffer = this.replayBuffer;
            while (true) {
                if (i2 >= readRecording || i2 >= j) {
                    break;
                }
                int frameLength = FrameDescriptor.frameLength(unsafeBuffer, i2);
                if (frameLength <= 0) {
                    raiseError(frameLength, readRecording, i2, j);
                }
                int frameType = FrameDescriptor.frameType(unsafeBuffer, i2);
                int align = BitUtil.align(frameLength, 32);
                if (1 == frameType) {
                    if (i2 + align > readRecording) {
                        break;
                    }
                    if (null != checksum) {
                        verifyChecksum(checksum, i2, align);
                    }
                    unsafeBuffer.putInt(i2 + 12, sessionId, ByteOrder.LITTLE_ENDIAN);
                    unsafeBuffer.putInt(i2 + 16, streamId, ByteOrder.LITTLE_ENDIAN);
                    i2 += align;
                } else if (0 == frameType) {
                    i3 = frameLength;
                    break;
                }
            }
            if (i2 > 0) {
                if (hasPublicationAdvanced(this.publication.offerBlock(unsafeBuffer, 0, i2), i2)) {
                    i = 0 + 1;
                } else {
                    i3 = 0;
                }
            }
            if (i3 > 0 && hasPublicationAdvanced(this.publication.appendPadding(i3 - 32), BitUtil.align(i3, 32))) {
                i++;
            }
        }
        return i;
    }

    private void raiseError(int i, int i2, int i3, long j) {
        throw new IllegalStateException("unexpected end of recording " + this.recordingId + " frameLength=" + i + " replayPosition=" + this.replayPosition + " remaining=" + j + " limitPosition=" + this.limitPosition + " batchOffset=" + i3 + " bytesRead=" + i2);
    }

    private boolean hasPublicationAdvanced(long j, int i) {
        if (j <= 0) {
            if (-4 != j && -1 != j) {
                return false;
            }
            onError("stream closed before replay complete");
            return false;
        }
        this.termOffset += i;
        this.replayPosition += i;
        if (this.replayPosition < this.replayLimit) {
            return true;
        }
        state(State.INACTIVE);
        return true;
    }

    private void verifyChecksum(Checksum checksum, int i, int i2) {
        int compute = checksum.compute(this.replayBufferAddress, i + 32, i2 - 32);
        int frameSessionId = FrameDescriptor.frameSessionId(this.replayBuffer, i);
        if (compute != frameSessionId) {
            throw new ArchiveException("CRC checksum mismatch at offset=" + i + ": recorded checksum=" + frameSessionId + ", computed checksum=" + compute);
        }
    }

    private int readRecording(long j) throws IOException {
        if (this.publication.availableWindow() <= 0) {
            return 0;
        }
        int min = Math.min((int) Math.min(j, this.replayBuffer.capacity()), this.termLength - this.termOffset);
        ByteBuffer byteBuffer = this.replayBuffer.byteBuffer();
        byteBuffer.clear().limit(min);
        int i = this.termBaseSegmentOffset + this.termOffset;
        do {
            int read = this.fileChannel.read(byteBuffer, i);
            if (read <= 0) {
                break;
            }
            i += read;
        } while (byteBuffer.remaining() > 0);
        return byteBuffer.limit();
    }

    private void onError(String str) {
        this.errorMessage = str + ", recordingId=" + this.recordingId + ", sessionId=" + this.sessionId;
        state(State.INACTIVE);
    }

    private boolean noNewData(long j, long j2) {
        Counter counter = this.limitPosition;
        long j3 = counter.get();
        boolean isClosed = counter.isClosed();
        long stopPosition = isClosed ? this.catalog.stopPosition(this.recordingId) : j3;
        if (isClosed) {
            if (-1 == stopPosition) {
                this.replayLimit = j2;
            } else if (stopPosition < this.replayLimit) {
                this.replayLimit = stopPosition;
            }
        }
        if (j >= this.replayLimit) {
            state(State.INACTIVE);
            return true;
        }
        if (stopPosition <= j2) {
            return true;
        }
        this.stopPosition = stopPosition;
        return false;
    }

    private void nextTerm() throws IOException {
        this.termOffset = 0;
        this.termBaseSegmentOffset += this.termLength;
        if (this.termBaseSegmentOffset == this.segmentLength) {
            closeRecordingSegment();
            this.segmentFileBasePosition += this.segmentLength;
            openRecordingSegment();
            this.termBaseSegmentOffset = 0;
        }
    }

    private void closeRecordingSegment() {
        CloseHelper.close(this.fileChannel);
        this.fileChannel = null;
        this.segmentFile = null;
    }

    private void openRecordingSegment() throws IOException {
        if (null == this.segmentFile) {
            String segmentFileName = Archive.segmentFileName(this.recordingId, this.segmentFileBasePosition);
            this.segmentFile = new File(this.archiveDir, segmentFileName);
            if (!this.segmentFile.exists()) {
                String str = "recording segment not found " + segmentFileName;
                onError(str);
                throw new ArchiveException(str);
            }
        }
        this.fileChannel = FileChannel.open(this.segmentFile.toPath(), FILE_OPTIONS, new FileAttribute[0]);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean notHeaderAligned(FileChannel fileChannel, UnsafeBuffer unsafeBuffer, int i, int i2, int i3, int i4) throws IOException {
        ByteBuffer byteBuffer = unsafeBuffer.byteBuffer();
        byteBuffer.clear().limit(32);
        if (32 != fileChannel.read(byteBuffer, i)) {
            throw new ArchiveException("failed to read fragment header");
        }
        return isInvalidHeader(unsafeBuffer, i4, i3, i2);
    }

    private void state(State state) {
        this.state = state;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isInvalidHeader(UnsafeBuffer unsafeBuffer, int i, int i2, int i3) {
        return (DataHeaderFlyweight.termOffset(unsafeBuffer, 0) == i3 && DataHeaderFlyweight.termId(unsafeBuffer, 0) == i2 && DataHeaderFlyweight.streamId(unsafeBuffer, 0) == i) ? false : true;
    }
}
