/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.io;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.repackaged.com.google.common.collect.Lists;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * A {@link PTransform} that is configured, batch by batch, with the contents of an unbounded
 * {@link PCollection}.
 *
 * <p>The instance works as a fluent builder. Each {@code nextBatch(...)} / {@link #emptyBatch()}
 * call appends one batch of timestamped values to an internal queue, and each
 * {@link #advanceWatermarkForNextBatch(Instant)} /
 * {@link #advanceNextBatchWatermarkToInfinity()} call appends one
 * {@link GlobalWatermarkHolder.SparkWatermarks} entry to a second queue. The synchronized
 * processing time of every entry is one {@code batchInterval} later than that of the previous
 * entry (or than the initial system time for the first entry).
 *
 * <p>The two queues are handed out as-is by {@link #getBatches()} and {@link #getTimes()}.
 * NOTE(review): they are presumably consumed by the Spark runner's streaming translator -
 * confirm against the caller before changing how they are exposed.
 *
 * <p>The class keeps mutable state and performs no synchronization.
 *
 * @param <T> the element type of the produced {@link PCollection}
 */
public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> {
    /** Amount by which the synchronized processing time grows on every watermark advance. */
    private final Duration batchInterval;
    /** Batches in the order they were added. */
    private final Queue<Iterable<TimestampedValue<T>>> batches = new LinkedList<Iterable<TimestampedValue<T>>>();
    /** Watermark entries in the order they were added; the tail is the most recent one. */
    private final Deque<GlobalWatermarkHolder.SparkWatermarks> times = new LinkedList<GlobalWatermarkHolder.SparkWatermarks>();
    /** Coder returned by {@link #getDefaultOutputCoder()}. */
    private final Coder<T> coder;
    /** Processing time used as the base for the first watermark entry. */
    private Instant initialSystemTime;
    /** The watermark passed to the most recent advance; starts at the minimum timestamp. */
    private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    private CreateStream(Duration batchInterval, Instant initialSystemTime, Coder<T> coder) {
        this.batchInterval = batchInterval;
        this.initialSystemTime = initialSystemTime;
        this.coder = coder;
    }

    /**
     * Creates an empty stream definition whose initial system time is {@code new Instant(0L)}.
     *
     * @param coder the coder reported for the output elements
     * @param batchInterval the step added to the synchronized processing time on each advance
     * @param <T> the element type
     * @return a new, empty {@code CreateStream}
     */
    public static <T> CreateStream<T> of(Coder<T> coder, Duration batchInterval) {
        return new CreateStream<T>(batchInterval, new Instant(0L), coder);
    }

    /**
     * Appends a batch made of the given timestamped elements.
     *
     * @param batchElements the elements of the batch; every timestamp must be strictly before
     *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
     * @return this instance, for chaining
     * @throws IllegalArgumentException if any element's timestamp is not before the maximum
     */
    @SafeVarargs
    public final CreateStream<T> nextBatch(TimestampedValue<T> ... batchElements) {
        for (TimestampedValue<T> element : batchElements) {
            Preconditions.checkArgument(
                element.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
                "Elements must have timestamps before %s. Got: %s",
                BoundedWindow.TIMESTAMP_MAX_VALUE,
                element.getTimestamp());
        }
        // Copy instead of wrapping: Arrays.asList is only a view over the caller's array, so a
        // later write to that array would otherwise alter a batch that is already queued.
        this.batches.offer(new ArrayList<TimestampedValue<T>>(Arrays.asList(batchElements)));
        return this;
    }

    /**
     * Appends a batch made of the given plain values, each wrapped with
     * {@link TimestampedValue#atMinimumTimestamp}.
     *
     * @param batchElements the values of the batch
     * @return this instance, for chaining
     */
    @SafeVarargs
    public final CreateStream<T> nextBatch(T ... batchElements) {
        ArrayList<TimestampedValue<T>> timestamped = Lists.newArrayListWithCapacity(batchElements.length);
        for (T element : batchElements) {
            timestamped.add(TimestampedValue.atMinimumTimestamp(element));
        }
        this.batches.offer(timestamped);
        return this;
    }

    /**
     * Appends a batch that contains no elements.
     *
     * @return this instance, for chaining
     */
    public CreateStream<T> emptyBatch() {
        this.batches.offer(Collections.<TimestampedValue<T>>emptyList());
        return this;
    }

    /**
     * Replaces the initial system time. The value is only read while no watermark entry has been
     * queued yet, so calling this after an advance does not change entries already recorded.
     *
     * @param initialSystemTime the processing time the first watermark entry is based on
     * @return this instance, for chaining
     */
    public CreateStream<T> initialSystemTimeAt(Instant initialSystemTime) {
        this.initialSystemTime = initialSystemTime;
        return this;
    }

    /**
     * Queues a watermark advance to {@code newWatermark}.
     *
     * @param newWatermark the new watermark; must not be before the watermark of the previous
     *     advance and must be strictly before {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
     * @return this instance, for chaining
     * @throws IllegalArgumentException if the watermark would decrease, if it is not before the
     *     maximum timestamp, or if the synchronized processing time would not advance
     */
    public CreateStream<T> advanceWatermarkForNextBatch(Instant newWatermark) {
        Preconditions.checkArgument(
            !newWatermark.isBefore(this.lowWatermark),
            "The watermark is not allowed to decrease!");
        Preconditions.checkArgument(
            newWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
            "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
            newWatermark,
            BoundedWindow.TIMESTAMP_MAX_VALUE);
        return this.advance(newWatermark);
    }

    /**
     * Queues a watermark advance to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. Unlike
     * {@link #advanceWatermarkForNextBatch(Instant)}, no watermark range checks are applied.
     *
     * @return this instance, for chaining
     * @throws IllegalArgumentException if the synchronized processing time would not advance
     */
    public CreateStream<T> advanceNextBatchWatermarkToInfinity() {
        return this.advance(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    /**
     * Appends a watermark entry built from the previous watermark, {@code newWatermark} and the
     * next synchronized processing time, then remembers {@code newWatermark} as the previous
     * watermark for the following call.
     */
    private CreateStream<T> advance(Instant newWatermark) {
        // The base is the processing time of the latest entry, or the initial system time when
        // nothing has been queued yet.
        GlobalWatermarkHolder.SparkWatermarks latest = this.times.peekLast();
        Instant currentSynchronizedProcessingTime =
            latest == null ? this.initialSystemTime : latest.getSynchronizedProcessingTime();
        Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus(this.batchInterval);
        // Fails for a batch interval that does not move the time forward (e.g. zero or negative).
        Preconditions.checkArgument(
            nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime),
            "Synchronized processing time must always advance.");
        this.times.offer(
            new GlobalWatermarkHolder.SparkWatermarks(
                this.lowWatermark, newWatermark, nextSynchronizedProcessingTime));
        this.lowWatermark = newWatermark;
        return this;
    }

    /**
     * Returns the internal queue of batches (not a copy).
     *
     * @return the live queue of batches, in insertion order
     */
    public Queue<Iterable<TimestampedValue<T>>> getBatches() {
        return this.batches;
    }

    /**
     * Returns the internal queue of watermark entries (not a copy).
     *
     * @return the live queue of watermark entries, in insertion order
     */
    public Queue<GlobalWatermarkHolder.SparkWatermarks> getTimes() {
        return this.times;
    }

    /**
     * Creates the primitive output of this transform: an unbounded {@link PCollection} in the
     * input's pipeline with the global default windowing strategy.
     *
     * @param input the pipeline start this transform is applied to
     * @return the unbounded output collection
     */
    public PCollection<T> expand(PBegin input) {
        return PCollection.createPrimitiveOutputInternal(
            input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
    }

    /**
     * Returns the coder supplied to {@link #of(Coder, Duration)}.
     *
     * @return the coder for the output elements
     * @throws CannotProvideCoderException declared for signature compatibility; this
     *     implementation does not throw it
     */
    protected Coder<T> getDefaultOutputCoder() throws CannotProvideCoderException {
        return this.coder;
    }
}

