package org.apache.beam.runners.spark.translation.streaming;

import com.google.common.collect.Iterables;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.runners.spark.translation.Dataset;
import org.apache.beam.runners.spark.translation.WindowingHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.class */
public class UnboundedDataset<T> implements Dataset {
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedDataset.class);

    @Nullable
    private transient JavaStreamingContext jssc;
    private Iterable<Iterable<T>> values;
    private Coder<T> coder;
    private JavaDStream<WindowedValue<T>> dStream;

    /* JADX INFO: Access modifiers changed from: package-private */
    public UnboundedDataset(JavaDStream<WindowedValue<T>> javaDStream) {
        this.dStream = javaDStream;
    }

    public UnboundedDataset(Iterable<Iterable<T>> iterable, JavaStreamingContext javaStreamingContext, Coder<T> coder) {
        this.values = iterable;
        this.jssc = javaStreamingContext;
        this.coder = coder;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JavaDStream<WindowedValue<T>> getDStream() {
        if (this.dStream == null) {
            WindowedValue.ValueOnlyWindowedValueCoder valueOnlyCoder = WindowedValue.getValueOnlyCoder(this.coder);
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            JavaRDD javaRDD = null;
            Iterator<Iterable<T>> it = this.values.iterator();
            while (it.hasNext()) {
                JavaRDD map = this.jssc.sc().parallelize(CoderHelpers.toByteArrays(Iterables.transform(it.next(), WindowingHelpers.windowValueFunction()), valueOnlyCoder)).map(CoderHelpers.fromByteFunction(valueOnlyCoder));
                linkedBlockingQueue.offer(map);
                javaRDD = map;
            }
            this.dStream = javaRDD != null ? this.jssc.queueStream(linkedBlockingQueue, true, javaRDD) : this.jssc.queueStream(linkedBlockingQueue, true);
        }
        return this.dStream;
    }

    public void cache() {
        this.dStream.cache();
    }

    @Override // org.apache.beam.runners.spark.translation.Dataset
    public void cache(String str) {
        LOG.warn("Provided StorageLevel ignored for stream, using default level");
        cache();
    }

    @Override // org.apache.beam.runners.spark.translation.Dataset
    public void action() {
        this.dStream.foreachRDD(new VoidFunction<JavaRDD<WindowedValue<T>>>() { // from class: org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.1
            public void call(JavaRDD<WindowedValue<T>> javaRDD) throws Exception {
                javaRDD.count();
            }
        });
    }

    @Override // org.apache.beam.runners.spark.translation.Dataset
    public void setName(String str) {
    }
}
