package org.apache.beam.runners.spark.translation.streaming;

import com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.translation.EvaluationContext;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.TransformTranslator;
import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.class */
public class SparkRunnerStreamingContextFactory implements JavaStreamingContextFactory {
    private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerStreamingContextFactory.class);
    private static final String KNOWN_RELIABLE_FS_PATTERN = "^(hdfs|s3|gs)";
    private final Pipeline pipeline;
    private final SparkPipelineOptions options;
    private EvaluationContext ctxt;

    public SparkRunnerStreamingContextFactory(Pipeline pipeline, SparkPipelineOptions sparkPipelineOptions) {
        this.pipeline = pipeline;
        this.options = sparkPipelineOptions;
    }

    public JavaStreamingContext create() {
        LOG.info("Creating a new Spark Streaming Context");
        Preconditions.checkArgument(this.options.getMinReadTimeMillis().longValue() < this.options.getBatchIntervalMillis().longValue(), "Minimum read time has to be less than batch time.");
        Preconditions.checkArgument(this.options.getReadTimePercentage().doubleValue() > 0.0d && this.options.getReadTimePercentage().doubleValue() < 1.0d, "Read time percentage is bound to (0, 1).");
        StreamingTransformTranslator.Translator translator = new StreamingTransformTranslator.Translator(new TransformTranslator.Translator());
        Duration duration = new Duration(this.options.getBatchIntervalMillis().longValue());
        LOG.info("Setting Spark streaming batchDuration to {} msec", Long.valueOf(duration.milliseconds()));
        JavaSparkContext sparkContext = SparkContextFactory.getSparkContext(this.options);
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkContext, duration);
        this.ctxt = new EvaluationContext(sparkContext, this.pipeline, javaStreamingContext);
        this.pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, this.ctxt));
        this.ctxt.computeOutputs();
        String checkpointDir = this.options.getCheckpointDir();
        if (!checkpointDir.matches(KNOWN_RELIABLE_FS_PATTERN)) {
            LOG.warn("The specified checkpoint dir {} does not match a reliable filesystem so in case of failures this job may not recover properly or even at all.", checkpointDir);
        }
        LOG.info("Checkpoint dir set to: {}", checkpointDir);
        javaStreamingContext.checkpoint(checkpointDir);
        for (JavaStreamingListener javaStreamingListener : ((SparkContextOptions) this.options.as(SparkContextOptions.class)).getListeners()) {
            LOG.info("Registered listener {}." + javaStreamingListener.getClass().getSimpleName());
            javaStreamingContext.addStreamingListener(new JavaStreamingListenerWrapper(javaStreamingListener));
        }
        return javaStreamingContext;
    }

    public EvaluationContext getCtxt() {
        return this.ctxt;
    }
}
