/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.io;

import com.google.api.client.util.BackOff;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BoundedSource} view over an {@link UnboundedSource} that reads a single
 * "microbatch": reading stops once either a maximum number of records has been read or a
 * maximum read time has elapsed, whichever comes first.
 *
 * <p>Identity ({@link #equals(Object)}, {@link #hashCode()}, {@link #getId()}) is defined
 * solely by the pair (sourceId, splitId); the wrapped source and the read bounds do not take
 * part in it.
 *
 * @param <T> type of the elements read
 * @param <CheckpointMarkT> checkpoint mark type of the wrapped unbounded source
 */
public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
extends BoundedSource<T> {
    private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class);
    /** The wrapped unbounded source that actually produces the records. */
    private final UnboundedSource<T, CheckpointMarkT> source;
    /** Upper bound on the wall-clock time a single reader may spend reading. */
    private final Duration maxReadTime;
    /** Desired number of splits passed to the wrapped source when splitting. */
    private final int numInitialSplits;
    /** Upper bound on the number of records a single reader returns. */
    private final long maxNumRecords;
    /** Identifier of the originating source; first component of the identity. */
    private final int sourceId;
    /** Index of this split within its source; second component of the identity. */
    private final int splitId;

    /**
     * Creates a microbatch view over {@code source}.
     *
     * @param source the unbounded source to read from
     * @param maxReadTime maximum time a reader may spend on one microbatch
     * @param numInitialSplits desired number of splits handed to the wrapped source
     * @param maxNumRecords maximum number of records a reader returns in one microbatch
     * @param splitId index of this split
     * @param sourceId identifier of the originating source
     */
    MicrobatchSource(UnboundedSource<T, CheckpointMarkT> source, Duration maxReadTime, int numInitialSplits, long maxNumRecords, int splitId, int sourceId) {
        this.source = source;
        this.maxReadTime = maxReadTime;
        this.numInitialSplits = numInitialSplits;
        this.maxNumRecords = maxNumRecords;
        this.splitId = splitId;
        this.sourceId = sourceId;
    }

    /**
     * Divides {@code numRecords} as evenly as possible over {@code numSplits} slots. Every slot
     * gets {@code numRecords / numSplits}; the first {@code numRecords % numSplits} slots get
     * one extra record.
     *
     * @param numRecords total number of records to distribute
     * @param numSplits number of slots
     * @return an array of length {@code numSplits} holding the per-slot quota; empty when
     *     {@code numSplits} is zero
     */
    private static long[] splitNumRecords(long numRecords, int numSplits) {
        long[] quotas = new long[numSplits];
        if (numSplits == 0) {
            // Nothing to distribute over; also avoids dividing by zero below.
            return quotas;
        }
        long base = numRecords / numSplits;
        long remainder = numRecords % numSplits;
        for (int i = 0; i < numSplits; i++) {
            quotas[i] = i < remainder ? base + 1L : base;
        }
        return quotas;
    }

    /**
     * Splits the wrapped source via {@code generateInitialSplits} and wraps each resulting split
     * in its own {@code MicrobatchSource}. The record limit is distributed over the splits, the
     * read time limit is passed on unchanged, and each split is numbered by its position in the
     * list returned by the wrapped source.
     *
     * @param desiredBundleSizeBytes not used by this implementation
     * @param options pipeline options forwarded to the wrapped source
     * @return one microbatch source per split of the wrapped source; empty if the wrapped
     *     source produced no splits
     * @throws Exception if the wrapped source fails to split
     */
    public List<? extends BoundedSource<T>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
        List<? extends UnboundedSource<T, CheckpointMarkT>> splits = this.source.generateInitialSplits(this.numInitialSplits, options);
        int numSplits = splits.size();
        long[] numRecords = MicrobatchSource.splitNumRecords(this.maxNumRecords, numSplits);
        List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<MicrobatchSource<T, CheckpointMarkT>>(numSplits);
        for (int i = 0; i < numSplits; ++i) {
            // Each split is already a single partition, hence numInitialSplits = 1.
            result.add(new MicrobatchSource<T, CheckpointMarkT>(splits.get(i), this.maxReadTime, 1, numRecords[i], i, this.sourceId));
        }
        return result;
    }

    /**
     * Always reports {@code 0}; no size estimate is computed.
     *
     * @param options not used
     * @return {@code 0}
     */
    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
        return 0L;
    }

    /**
     * Always reports {@code false}.
     *
     * @param options not used
     * @return {@code false}
     */
    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
        return false;
    }

    /**
     * Creates a reader with no checkpoint mark to resume from.
     *
     * @param options pipeline options forwarded to the wrapped source
     * @return a bounded reader over the wrapped source
     * @throws IOException if the wrapped source fails to create its reader
     */
    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
        return this.createReader(options, null);
    }

    /**
     * Creates a reader over the wrapped source, handing it {@code checkpointMark}.
     *
     * @param options pipeline options forwarded to the wrapped source
     * @param checkpointMark checkpoint mark forwarded to the wrapped source; may be
     *     {@code null} (as passed by {@link #createReader(PipelineOptions)})
     * @return a bounded reader over the wrapped source
     * @throws IOException if the wrapped source fails to create its reader
     */
    public BoundedSource.BoundedReader<T> createReader(PipelineOptions options, CheckpointMarkT checkpointMark) throws IOException {
        return new Reader(this.source.createReader(options, checkpointMark));
    }

    /** Delegates validation to the wrapped source. */
    public void validate() {
        this.source.validate();
    }

    /** Returns the default output coder of the wrapped source. */
    public Coder<T> getDefaultOutputCoder() {
        return this.source.getDefaultOutputCoder();
    }

    /** Returns the checkpoint mark coder of the wrapped source. */
    public Coder<CheckpointMarkT> getCheckpointMarkCoder() {
        return this.source.getCheckpointMarkCoder();
    }

    /** Returns the identifier of this split, formatted as {@code <sourceId>_<splitId>}. */
    public String getId() {
        return this.sourceId + "_" + this.splitId;
    }

    /**
     * Two microbatch sources are equal when both their source id and split id match.
     *
     * @param o the object to compare with
     * @return {@code true} if {@code o} is a {@code MicrobatchSource} with the same ids
     */
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MicrobatchSource)) {
            return false;
        }
        MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
        return this.sourceId == that.sourceId && this.splitId == that.splitId;
    }

    /** Hash code derived from the source id and split id, consistent with {@link #equals}. */
    public int hashCode() {
        return 31 * this.sourceId + this.splitId;
    }

    /**
     * A bounded reader over an unbounded reader. It stops (returns {@code false} from
     * {@link #start()} / {@link #advance()}) when the record limit of the enclosing source is
     * reached, when the read deadline has passed, or when the backoff used while waiting for
     * data is exhausted. In each of those cases the current checkpoint mark of the underlying
     * reader is finalized before returning.
     */
    public class Reader
    extends BoundedSource.BoundedReader<T> {
        /** Number of records successfully read so far. */
        private long recordsRead = 0L;
        /** Deadline for reading; fixed at construction time as now + maxReadTime. */
        private final Instant endTime;
        /** Template for the backoff applied while the underlying reader has no data. */
        private final FluentBackoff backoffFactory;
        /** The underlying unbounded reader. */
        private final UnboundedSource.UnboundedReader<T> reader;

        private Reader(UnboundedSource.UnboundedReader<T> reader) {
            this.endTime = Instant.now().plus(MicrobatchSource.this.maxReadTime);
            this.reader = reader;
            // Both the longest single sleep and the total sleep are capped just below
            // maxReadTime.
            Duration backoffLimit = MicrobatchSource.this.maxReadTime.minus(1L);
            this.backoffFactory = FluentBackoff.DEFAULT
                .withInitialBackoff(Duration.millis(10L))
                .withMaxBackoff(backoffLimit)
                .withMaxCumulativeBackoff(backoffLimit);
        }

        /**
         * Starts the underlying reader. If it has no record available immediately, keeps
         * polling it with backoff.
         *
         * @return {@code true} if a record is available
         * @throws IOException if the underlying reader fails
         */
        public boolean start() throws IOException {
            LOG.debug("MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a max read time of {} msec, and max number of records {}.", new Object[]{MicrobatchSource.this.splitId, MicrobatchSource.this.maxReadTime.getMillis(), MicrobatchSource.this.maxNumRecords});
            // NOTE(review): unlike advance(), the record limit is not checked here, so a
            // record is consumed even when maxNumRecords is 0 - confirm this is intended.
            if (this.reader.start()) {
                ++this.recordsRead;
                return true;
            }
            return this.advanceWithBackoff();
        }

        /**
         * Advances to the next record unless the record limit has been reached, in which case
         * the checkpoint is finalized and reading ends.
         *
         * @return {@code true} if a record is available
         * @throws IOException if the underlying reader fails
         */
        public boolean advance() throws IOException {
            if (this.recordsRead >= MicrobatchSource.this.maxNumRecords) {
                this.finalizeCheckpoint();
                return false;
            }
            return this.advanceWithBackoff();
        }

        /**
         * Polls the underlying reader until it yields a record, sleeping between attempts
         * according to a fresh backoff. Gives up, finalizing the checkpoint, once the read
         * deadline has passed or the backoff signals that it is exhausted.
         *
         * @return {@code true} if a record was read, {@code false} if reading ended
         * @throws IOException if the underlying reader or the backoff fails
         */
        private boolean advanceWithBackoff() throws IOException {
            BackOff backoff = this.backoffFactory.backoff();
            long nextSleep = backoff.nextBackOffMillis();
            while (nextSleep != BackOff.STOP) {
                if (Instant.now().isAfter(this.endTime)) {
                    this.finalizeCheckpoint();
                    return false;
                }
                if (this.reader.advance()) {
                    ++this.recordsRead;
                    return true;
                }
                Uninterruptibles.sleepUninterruptibly(nextSleep, TimeUnit.MILLISECONDS);
                nextSleep = backoff.nextBackOffMillis();
            }
            this.finalizeCheckpoint();
            return false;
        }

        /** Finalizes the underlying reader's current checkpoint mark and logs the outcome. */
        private void finalizeCheckpoint() throws IOException {
            this.reader.getCheckpointMark().finalizeCheckpoint();
            LOG.debug("MicrobatchReader-{}: finalized CheckpointMark successfully after reading {} records.", MicrobatchSource.this.splitId, this.recordsRead);
        }

        /** Returns the current record of the underlying reader. */
        public T getCurrent() throws NoSuchElementException {
            return this.reader.getCurrent();
        }

        /** Returns the timestamp of the current record of the underlying reader. */
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return this.reader.getCurrentTimestamp();
        }

        /** Closes the underlying reader. */
        public void close() throws IOException {
            this.reader.close();
        }

        /** Returns the enclosing {@code MicrobatchSource}. */
        public BoundedSource<T> getCurrentSource() {
            return MicrobatchSource.this;
        }

        /** Returns the current checkpoint mark of the underlying reader. */
        public CheckpointMarkT getCheckpointMark() {
            return this.reader.getCheckpointMark();
        }

        /** Returns the number of records read so far by this reader. */
        public long getNumRecordsRead() {
            return this.recordsRead;
        }
    }
}

