package org.apache.beam.runners.spark.io;

import com.google.api.client.util.BackOff;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.spark.relocated.com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/io/MicrobatchSource.class */
/**
 * A {@link BoundedSource} that wraps an {@link UnboundedSource} and reads a bounded
 * "microbatch" from it.
 *
 * <p>A read performed by {@link Reader} ends when any of the following happens: the reader has
 * returned {@code maxNumRecords} records, the wall-clock deadline derived from
 * {@code maxReadTime} has passed, or the backoff used while waiting for new input is exhausted.
 *
 * <p>Two instances are considered equal when they share the same source id and split id; see
 * {@link #equals(Object)} and {@link #getId()}.
 *
 * @param <T> type of the elements read
 * @param <CheckpointMarkT> checkpoint mark type of the wrapped unbounded source
 */
public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends BoundedSource<T> {
    private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class);
    private final UnboundedSource<T, CheckpointMarkT> source;
    private final Duration maxReadTime;
    private final int numInitialSplits;
    private final long maxNumRecords;
    private final int sourceId;
    private final int splitId;

    /**
     * A bounded reader that delegates to an {@link UnboundedSource.UnboundedReader} and stops
     * once the record limit, the read deadline, or the backoff budget is reached.
     */
    public class Reader extends BoundedSource.BoundedReader<T> {
        /** Number of records successfully read so far by this reader. */
        private long recordsRead;
        /** Wall-clock instant after which no further reads are attempted. */
        private final Instant endTime;
        private final FluentBackoff backoffFactory;
        private final UnboundedSource.UnboundedReader<T> reader;

        private Reader(UnboundedSource.UnboundedReader<T> unboundedReader) {
            this.recordsRead = 0L;
            this.endTime = Instant.now().plus(MicrobatchSource.this.maxReadTime);
            this.reader = unboundedReader;
            // Both the per-sleep cap and the cumulative cap are kept just below maxReadTime.
            Duration backoffLimit = MicrobatchSource.this.maxReadTime.minus(1L);
            this.backoffFactory = FluentBackoff.DEFAULT
                    .withInitialBackoff(Duration.millis(10L))
                    .withMaxBackoff(backoffLimit)
                    .withMaxCumulativeBackoff(backoffLimit);
        }

        /**
         * Starts the underlying reader. If it has no element available right away, keeps
         * retrying through {@link #advanceWithBackoff()}.
         *
         * @return {@code true} if an element is available, {@code false} if the read ended
         * @throws IOException if the underlying reader fails
         */
        public boolean start() throws IOException {
            // The message says "msec", so log the duration as milliseconds rather than
            // relying on Duration's own string form.
            MicrobatchSource.LOG.debug("MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a max read time of {} msec, and max number of records {}.", MicrobatchSource.this.splitId, MicrobatchSource.this.maxReadTime.getMillis(), MicrobatchSource.this.maxNumRecords);
            if (this.reader.start()) {
                this.recordsRead++;
                return true;
            }
            return advanceWithBackoff();
        }

        /**
         * Advances to the next element unless the record limit has already been reached, in
         * which case the checkpoint is finalized and the read ends.
         *
         * @return {@code true} if an element is available, {@code false} if the read ended
         * @throws IOException if the underlying reader or checkpoint finalization fails
         */
        public boolean advance() throws IOException {
            if (this.recordsRead >= MicrobatchSource.this.maxNumRecords) {
                finalizeCheckpoint();
                return false;
            }
            return advanceWithBackoff();
        }

        /**
         * Polls the underlying reader, sleeping between attempts according to a fresh backoff
         * sequence. Gives up, finalizing the checkpoint, once the backoff is exhausted or the
         * read deadline has passed.
         */
        private boolean advanceWithBackoff() throws IOException {
            BackOff backoff = this.backoffFactory.backoff();
            long sleepMillis = backoff.nextBackOffMillis();
            while (sleepMillis != BackOff.STOP) {
                // endTime is final and always assigned in the constructor; no null check needed.
                if (Instant.now().isAfter(this.endTime)) {
                    break;
                }
                if (this.reader.advance()) {
                    this.recordsRead++;
                    return true;
                }
                Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
                sleepMillis = backoff.nextBackOffMillis();
            }
            finalizeCheckpoint();
            return false;
        }

        /** Finalizes the underlying reader's current checkpoint mark and logs the record count. */
        private void finalizeCheckpoint() throws IOException {
            this.reader.getCheckpointMark().finalizeCheckpoint();
            MicrobatchSource.LOG.debug("MicrobatchReader-{}: finalized CheckpointMark successfully after reading {} records.", MicrobatchSource.this.splitId, this.recordsRead);
        }

        /** Returns the current element of the underlying reader. */
        public T getCurrent() throws NoSuchElementException {
            return this.reader.getCurrent();
        }

        /** Returns the timestamp of the current element of the underlying reader. */
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return this.reader.getCurrentTimestamp();
        }

        /** Closes the underlying reader. */
        public void close() throws IOException {
            this.reader.close();
        }

        /**
         * Returns the enclosing {@link MicrobatchSource}.
         *
         * <p>The decompiler renamed this method to {@link #m16getCurrentSource()}; the original
         * name is restored here so the reader provides the {@code getCurrentSource()} method
         * expected of a {@code BoundedReader}.
         */
        public BoundedSource<T> getCurrentSource() {
            return MicrobatchSource.this;
        }

        /**
         * Decompiler-generated alias of {@link #getCurrentSource()}, kept so that existing
         * callers of the renamed method keep working.
         */
        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
        public BoundedSource<T> m16getCurrentSource() {
            return getCurrentSource();
        }

        /** Returns the underlying reader's current checkpoint mark. */
        @SuppressWarnings("unchecked") // the wrapped reader is created from a source typed with CheckpointMarkT
        public CheckpointMarkT getCheckpointMark() {
            return (CheckpointMarkT) this.reader.getCheckpointMark();
        }

        /** Returns the underlying reader's current watermark. */
        public Instant getWatermark() {
            return this.reader.getWatermark();
        }
    }

    /**
     * Creates a microbatch view over an unbounded source.
     *
     * @param source the unbounded source to read from
     * @param maxReadTime upper bound on the duration of a single read
     * @param numInitialSplits number of splits requested from {@code source} when splitting
     * @param maxNumRecords upper bound on the number of records in a single read
     * @param splitId id of this split
     * @param sourceId id of the source this split belongs to
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public MicrobatchSource(UnboundedSource<T, CheckpointMarkT> source, Duration maxReadTime, int numInitialSplits, long maxNumRecords, int splitId, int sourceId) {
        this.source = source;
        this.maxReadTime = maxReadTime;
        this.numInitialSplits = numInitialSplits;
        this.maxNumRecords = maxNumRecords;
        this.splitId = splitId;
        this.sourceId = sourceId;
    }

    /**
     * Distributes {@code numRecords} over {@code numSplits} slots as evenly as possible; the
     * first {@code numRecords % numSplits} slots receive one extra record.
     *
     * @return an array of length {@code numSplits}; empty when {@code numSplits} is zero
     */
    private static long[] splitNumRecords(long numRecords, int numSplits) {
        long[] perSplit = new long[numSplits];
        if (numSplits == 0) {
            // Nothing to distribute; also avoids dividing by zero below.
            return perSplit;
        }
        long base = numRecords / numSplits;
        long remainder = numRecords % numSplits;
        for (int i = 0; i < numSplits; i++) {
            perSplit[i] = (i < remainder) ? base + 1 : base;
        }
        return perSplit;
    }

    /**
     * Splits the wrapped source into its initial splits and wraps each one in its own
     * {@link MicrobatchSource}, dividing the record limit between them. Each resulting source
     * uses its list index as split id and inherits this source's id and max read time.
     *
     * @param desiredBundleSizeBytes ignored by this implementation
     * @param pipelineOptions options forwarded to the wrapped source
     * @return one microbatch source per split of the wrapped source; empty if it yields none
     */
    public List<? extends BoundedSource<T>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions pipelineOptions) throws Exception {
        List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
                this.source.generateInitialSplits(this.numInitialSplits, pipelineOptions);
        int numSplits = splits.size();
        long[] recordsPerSplit = splitNumRecords(this.maxNumRecords, numSplits);
        List<MicrobatchSource<T, CheckpointMarkT>> bundles = new ArrayList<>(numSplits);
        for (int i = 0; i < numSplits; i++) {
            bundles.add(new MicrobatchSource<>(splits.get(i), this.maxReadTime, 1, recordsPerSplit[i], i, this.sourceId));
        }
        return bundles;
    }

    /** Always reports {@code 0}; no size estimate is computed for the wrapped source. */
    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
        return 0L;
    }

    /** Creates a reader that starts without a checkpoint mark. */
    public BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions) throws IOException {
        return createReader(pipelineOptions, null);
    }

    /**
     * Creates a reader over the wrapped source, passing it the given checkpoint mark.
     *
     * @param pipelineOptions options forwarded to the wrapped source
     * @param checkpointmarkt checkpoint mark handed to the wrapped source; may be {@code null}
     */
    public BoundedSource.BoundedReader<T> createReader(PipelineOptions pipelineOptions, CheckpointMarkT checkpointmarkt) throws IOException {
        return new Reader(this.source.createReader(pipelineOptions, checkpointmarkt));
    }

    /** Delegates validation to the wrapped source. */
    public void validate() {
        this.source.validate();
    }

    /** Returns the wrapped source's default output coder. */
    public Coder<T> getDefaultOutputCoder() {
        return this.source.getDefaultOutputCoder();
    }

    /** Returns the wrapped source's checkpoint mark coder. */
    public Coder<CheckpointMarkT> getCheckpointMarkCoder() {
        return this.source.getCheckpointMarkCoder();
    }

    /** Returns an identifier of the form {@code <sourceId>_<splitId>}. */
    public String getId() {
        return this.sourceId + "_" + this.splitId;
    }

    /** Two microbatch sources are equal when both their source id and split id match. */
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MicrobatchSource)) {
            return false;
        }
        MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) obj;
        return this.sourceId == that.sourceId && this.splitId == that.splitId;
    }

    /** Hash code consistent with {@link #equals(Object)}: derived from source id and split id. */
    public int hashCode() {
        return (31 * this.sourceId) + this.splitId;
    }
}
