package org.apache.beam.runners.spark;

import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.ValueWithRecordId;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;

/* loaded from: input_file:org/apache/beam/runners/spark/TestSparkRunner.class */
/**
 * A {@link PipelineRunner} for tests that hands the pipeline to a {@link SparkRunner} and then
 * asserts on the produced {@link SparkPipelineResult}.
 *
 * <p>While transforms are applied, the runner counts those that are instances of
 * {@code PAssert.OneSideInputAssert}, {@code PAssert.GroupThenAssert} or
 * {@code PAssert.GroupThenAssertForSingleton}. When the options report forced streaming, that count
 * is compared against the {@code "PAssertSuccess"} aggregator after the run, and the
 * {@code "PAssertFailure"} aggregator is required to be zero.
 */
public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
    /** Aggregator name queried for the number of successful assertions. */
    private static final String SUCCESS_AGGREGATOR = "PAssertSuccess";

    /** Aggregator name queried for the number of failed assertions. */
    private static final String FAILURE_AGGREGATOR = "PAssertFailure";

    /** Runner that actually applies transforms and executes the pipeline. */
    private final SparkRunner delegate;

    /** Value of {@code SparkPipelineOptions.isForceStreaming()} captured at construction. */
    private final boolean isForceStreaming;

    /** Number of PAssert-style transforms seen by {@link #apply} so far. */
    private int expectedNumberOfAssertions = 0;

    /**
     * Replacement transform used by {@link #apply} for a {@link BoundedReadFromUnboundedSource}
     * when streaming is forced: it applies an {@link UnboundedReadFromBoundedSource} built from the
     * wrapped transform's adapted source, followed by a "StripIds" {@link ParDo}.
     */
    /* loaded from: input_file:org/apache/beam/runners/spark/TestSparkRunner$AdaptedBoundedAsUnbounded.class */
    private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {
        private final BoundedReadFromUnboundedSource<T> source;

        AdaptedBoundedAsUnbounded(BoundedReadFromUnboundedSource<T> boundedReadFromUnboundedSource) {
            this.source = boundedReadFromUnboundedSource;
        }

        /**
         * Expands into the unbounded read followed by {@code ValueWithRecordId.StripIdsDoFn}.
         *
         * @param pBegin the pipeline start the read is applied to
         * @return the output of the "StripIds" step
         */
        public PCollection<T> expand(PBegin pBegin) {
            // NOTE(review): raw types are kept exactly as in the original expression; the generic
            // signatures of the collaborating classes are not visible from this file.
            return pBegin
                    .apply(new UnboundedReadFromBoundedSource(this.source.getAdaptedSource()))
                    .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn()));
        }
    }

    /**
     * Builds the runner around a {@link SparkRunner} created from the same options.
     *
     * @param sparkPipelineOptions validated options; also the source of the force-streaming flag
     */
    private TestSparkRunner(SparkPipelineOptions sparkPipelineOptions) {
        this.delegate = SparkRunner.fromOptions(sparkPipelineOptions);
        this.isForceStreaming = sparkPipelineOptions.isForceStreaming();
    }

    /**
     * Creates a runner from generic pipeline options after validating them as
     * {@code SparkPipelineOptions}.
     *
     * @param pipelineOptions options to validate and use
     * @return a new runner instance
     */
    public static TestSparkRunner fromOptions(PipelineOptions pipelineOptions) {
        return new TestSparkRunner((SparkPipelineOptions) PipelineOptionsValidator.validate(SparkPipelineOptions.class, pipelineOptions));
    }

    /**
     * Applies a transform through the delegate runner.
     *
     * <p>When streaming is forced and the transform is a {@link BoundedReadFromUnboundedSource},
     * an {@code AdaptedBoundedAsUnbounded} wrapper is applied in its place. Otherwise the transform
     * is passed through unchanged, and the expected-assertion counter is incremented first if the
     * transform is one of the three recognised PAssert types.
     *
     * @param pTransform the transform being applied
     * @param inputt the transform's input
     * @return whatever the delegate's {@code apply} returns, cast to {@code OutputT}
     */
    public <OutputT extends POutput, InputT extends PInput> OutputT apply(PTransform<InputT, OutputT> pTransform, InputT inputt) {
        if (this.isForceStreaming && (pTransform instanceof BoundedReadFromUnboundedSource)) {
            return (OutputT) this.delegate.apply(new AdaptedBoundedAsUnbounded((BoundedReadFromUnboundedSource) pTransform), inputt);
        }
        if ((pTransform instanceof PAssert.OneSideInputAssert) || (pTransform instanceof PAssert.GroupThenAssert) || (pTransform instanceof PAssert.GroupThenAssertForSingleton)) {
            this.expectedNumberOfAssertions++;
        }
        return (OutputT) this.delegate.apply(pTransform, inputt);
    }

    /**
     * Runs the pipeline on the delegate, waits for it to finish and asserts on the result.
     *
     * <p>Checks, in order: the final state is {@code DONE}; the result satisfies the on-create and
     * on-success matchers taken from {@link TestPipelineOptions}; and, only when streaming is
     * forced, the success aggregator equals the number of counted assertions and the failure
     * aggregator is zero.
     *
     * @param pipeline the pipeline to execute
     * @return the delegate's result once every check has passed
     * @throws AssertionError if any of the checks fails
     */
    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    public SparkPipelineResult m7run(Pipeline pipeline) {
        TestPipelineOptions testOptions = pipeline.getOptions().as(TestPipelineOptions.class);
        SparkPipelineResult result = this.delegate.m3run(pipeline);
        result.waitUntilFinish();

        PipelineResult.State state = result.getState();
        MatcherAssert.assertThat(String.format("Test pipeline result state was %s instead of %s", state, PipelineResult.State.DONE), state, Matchers.is(PipelineResult.State.DONE));
        MatcherAssert.assertThat(result, testOptions.getOnCreateMatcher());
        MatcherAssert.assertThat(result, testOptions.getOnSuccessMatcher());

        if (this.isForceStreaming) {
            int successCount = readAggregator(result, SUCCESS_AGGREGATOR);
            MatcherAssert.assertThat(String.format("Expected %d successful assertions, but found %d.", Integer.valueOf(this.expectedNumberOfAssertions), Integer.valueOf(successCount)), Integer.valueOf(successCount), Matchers.is(Integer.valueOf(this.expectedNumberOfAssertions)));

            int failureCount = readAggregator(result, FAILURE_AGGREGATOR);
            MatcherAssert.assertThat("Failure aggregator should be zero.", Integer.valueOf(failureCount), Matchers.is(0));
        }
        return result;
    }

    /**
     * Reads an integer aggregator from the result, failing with a descriptive assertion error
     * instead of a bare {@link NullPointerException} when no value is available.
     *
     * @param result the pipeline result to query
     * @param aggregatorName the aggregator to read
     * @return the aggregator's value
     * @throws AssertionError if the aggregator value is {@code null}
     */
    private static int readAggregator(SparkPipelineResult result, String aggregatorName) {
        Integer value = (Integer) result.getAggregatorValue(aggregatorName, Integer.class);
        // NOTE(review): whether getAggregatorValue can return null is not visible from this file;
        // the guard only ensures that, if it does, the failure names the aggregator.
        MatcherAssert.assertThat(String.format("Aggregator %s returned no value.", aggregatorName), value, Matchers.notNullValue());
        return value.intValue();
    }
}
