/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.translation.streaming;

import com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.apache.spark.streaming.scheduler.StreamingListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkRunnerStreamingContextFactory
implements JavaStreamingContextFactory {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
    private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
    private final Pipeline pipeline;
    private final SparkPipelineOptions options;
    private EvaluationContext ctxt;

    public SparkRunnerStreamingContextFactory(Pipeline pipeline, SparkPipelineOptions options) {
        this.pipeline = pipeline;
        this.options = options;
    }

    public JavaStreamingContext create() {
        LOG.info("Creating a new Spark Streaming Context");
        Preconditions.checkArgument((this.options.getMinReadTimeMillis() < this.options.getBatchIntervalMillis() ? 1 : 0) != 0, (Object)"Minimum read time has to be less than batch time.");
        Preconditions.checkArgument((this.options.getReadTimePercentage() > 0.0 && this.options.getReadTimePercentage() < 1.0 ? 1 : 0) != 0, (Object)"Read time percentage is bound to (0, 1).");
        StreamingTransformTranslator.Translator translator = new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
        Duration batchDuration = new Duration(this.options.getBatchIntervalMillis().longValue());
        LOG.info("Setting Spark streaming batchDuration to {} msec", (Object)batchDuration.milliseconds());
        JavaSparkContext jsc = SparkContextFactory.getSparkContext(this.options);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, batchDuration);
        this.ctxt = new EvaluationContext(jsc, this.pipeline, jssc);
        this.pipeline.traverseTopologically((Pipeline.PipelineVisitor)new SparkRunner.Evaluator(translator, this.ctxt));
        this.ctxt.computeOutputs();
        String checkpointDir = this.options.getCheckpointDir();
        if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
            LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case of failures this job may not recover properly or even at all.", (Object)checkpointDir);
        }
        LOG.info("Checkpoint dir set to: {}", (Object)checkpointDir);
        jssc.checkpoint(checkpointDir);
        for (JavaStreamingListener listener : ((SparkContextOptions)this.options.as(SparkContextOptions.class)).getListeners()) {
            LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
            jssc.addStreamingListener((StreamingListener)new JavaStreamingListenerWrapper(listener));
        }
        return jssc;
    }

    public EvaluationContext getCtxt() {
        return this.ctxt;
    }
}

