package org.apache.beam.runners.spark;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.spark.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.spark.repackaged.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.beam.runners.spark.stateful.SparkTimerInternals;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.commons.io.FileUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/TestSparkRunner.class */
public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
    private static final Logger LOG = LoggerFactory.getLogger(TestSparkRunner.class);
    private final TestSparkPipelineOptions testSparkPipelineOptions;
    private SparkRunner delegate;
    private boolean isForceStreaming;

    /* loaded from: input_file:org/apache/beam/runners/spark/TestSparkRunner$AdaptedBoundedAsUnbounded.class */
    private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {
        private final BoundedReadFromUnboundedSource<T> source;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/runners/spark/TestSparkRunner$AdaptedBoundedAsUnbounded$Factory.class */
        public static class Factory<T> implements PTransformOverrideFactory<PBegin, PCollection<T>, BoundedReadFromUnboundedSource<T>> {
            Factory() {
            }

            public PTransformOverrideFactory.PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(AppliedPTransform<PBegin, PCollection<T>, BoundedReadFromUnboundedSource<T>> appliedPTransform) {
                return PTransformOverrideFactory.PTransformReplacement.of(appliedPTransform.getPipeline().begin(), new AdaptedBoundedAsUnbounded(appliedPTransform.getTransform()));
            }

            public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> map, PCollection<T> pCollection) {
                return ReplacementOutputs.singleton(map, pCollection);
            }

            public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
                return mapOutputs((Map<TupleTag<?>, PValue>) map, (PCollection) pOutput);
            }
        }

        AdaptedBoundedAsUnbounded(BoundedReadFromUnboundedSource<T> boundedReadFromUnboundedSource) {
            this.source = boundedReadFromUnboundedSource;
        }

        public PCollection<T> expand(PBegin pBegin) {
            return pBegin.apply(new UnboundedReadFromBoundedSource(this.source.getAdaptedSource())).apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn()));
        }
    }

    private TestSparkRunner(TestSparkPipelineOptions testSparkPipelineOptions) {
        this.delegate = SparkRunner.fromOptions(testSparkPipelineOptions);
        this.isForceStreaming = testSparkPipelineOptions.isForceStreaming();
        this.testSparkPipelineOptions = testSparkPipelineOptions;
    }

    public static TestSparkRunner fromOptions(PipelineOptions pipelineOptions) {
        return new TestSparkRunner((TestSparkPipelineOptions) PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, pipelineOptions));
    }

    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    public SparkPipelineResult m12run(Pipeline pipeline) {
        SparkPipelineResult m4run;
        if (this.isForceStreaming) {
            adaptBoundedReads(pipeline);
        }
        AggregatorsAccumulator.clear();
        MetricsAccumulator.clear();
        GlobalWatermarkHolder.clear();
        LOG.info("About to run test pipeline " + this.testSparkPipelineOptions.getJobName());
        if (this.isForceStreaming) {
            try {
                m4run = this.delegate.m4run(pipeline);
                awaitWatermarksOrTimeout(this.testSparkPipelineOptions, m4run);
                m4run.stop();
                PipelineResult.State state = m4run.getState();
                MatcherAssert.assertThat(String.format("Finish state %s is not allowed.", state), state, Matchers.isOneOf(new PipelineResult.State[]{PipelineResult.State.STOPPED, PipelineResult.State.DONE}));
                try {
                    FileUtils.deleteDirectory(new File(this.testSparkPipelineOptions.getCheckpointDir()));
                } catch (IOException e) {
                    throw new RuntimeException("Failed to clear checkpoint tmp dir.", e);
                }
            } catch (Throwable th) {
                try {
                    FileUtils.deleteDirectory(new File(this.testSparkPipelineOptions.getCheckpointDir()));
                    throw th;
                } catch (IOException e2) {
                    throw new RuntimeException("Failed to clear checkpoint tmp dir.", e2);
                }
            }
        } else {
            m4run = this.delegate.m4run(pipeline);
            m4run.waitUntilFinish();
            m4run.stop();
            PipelineResult.State state2 = m4run.getState();
            MatcherAssert.assertThat(String.format("Finish state %s is not allowed.", state2), state2, Matchers.is(PipelineResult.State.DONE));
            MatcherAssert.assertThat(m4run, this.testSparkPipelineOptions.getOnCreateMatcher());
            MatcherAssert.assertThat(m4run, this.testSparkPipelineOptions.getOnSuccessMatcher());
        }
        return m4run;
    }

    private static void awaitWatermarksOrTimeout(TestSparkPipelineOptions testSparkPipelineOptions, SparkPipelineResult sparkPipelineResult) {
        Instant currentInputWatermarkTime;
        Long valueOf = Long.valueOf(Duration.standardSeconds(((Long) Preconditions.checkNotNull(testSparkPipelineOptions.getTestTimeoutSeconds())).longValue()).getMillis());
        Long batchIntervalMillis = testSparkPipelineOptions.getBatchIntervalMillis();
        Instant instant = new Instant(testSparkPipelineOptions.getStopPipelineWatermark());
        sparkPipelineResult.waitUntilFinish(Duration.millis(batchIntervalMillis.longValue()));
        do {
            SparkTimerInternals global = SparkTimerInternals.global(GlobalWatermarkHolder.get());
            global.advanceWatermark();
            currentInputWatermarkTime = global.currentInputWatermarkTime();
            Uninterruptibles.sleepUninterruptibly(batchIntervalMillis.longValue(), TimeUnit.MILLISECONDS);
            Long valueOf2 = Long.valueOf(valueOf.longValue() - batchIntervalMillis.longValue());
            valueOf = valueOf2;
            if (valueOf2.longValue() <= 0) {
                return;
            }
        } while (currentInputWatermarkTime.isBefore(instant));
    }

    @VisibleForTesting
    void adaptBoundedReads(Pipeline pipeline) {
        pipeline.replaceAll(Collections.singletonList(PTransformOverride.of(PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class), new AdaptedBoundedAsUnbounded.Factory())));
    }
}
