package org.apache.beam.runners.apex;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.apex.api.EmbeddedAppLauncher;
import org.apache.apex.api.Launcher;
import org.apache.beam.runners.apex.repackaged.com.google.common.base.Throwables;
import org.apache.beam.runners.apex.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.apex.translation.ApexPipelineTranslator;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PrimitiveCreate;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.hadoop.conf.Configuration;

/* loaded from: input_file:org/apache/beam/runners/apex/ApexRunner.class */
public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
    /** Options this runner was constructed with; m1run reads the config file, execution mode and application name from it. */
    private final ApexPipelineOptions options;
    /** URI scheme that makes m1run load the configured properties file as a classpath resource instead of a file/URL. */
    public static final String CLASSPATH_SCHEME = "classpath";
    /** When true (embedded execution only), m1run prepares the DAG and returns a result without launching the application. */
    protected boolean translateOnly = false;
    /** Shared holder for an assertion failure; m1run resets it to null right before an embedded launch. Writers of a non-null value are not visible in this file. */
    public static final AtomicReference<AssertionError> ASSERTION_ERROR = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/apex/ApexRunner$Concatenate.class */
    /**
     * Combine function that collects every input element into a single {@link List}.
     *
     * <p>The accumulator and the output are the same list type, so extracting the output
     * simply hands back the accumulator. Method names prefixed with {@code m<digit>} were
     * introduced by the decompiler and are kept so existing references keep resolving.
     *
     * @param <T> element type being collected
     */
    public static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
        private static final long serialVersionUID = 1;

        private Concatenate() {
        }

        /**
         * Creates a new, empty, mutable accumulator.
         *
         * @return an empty list
         */
        /* renamed from: createAccumulator, reason: merged with bridge method [inline-methods] */
        public List<T> m3createAccumulator() {
            return new ArrayList<>();
        }

        /**
         * Appends one element to the accumulator in place.
         *
         * @param list accumulator to extend
         * @param t element to append
         * @return the same {@code list} instance that was passed in
         */
        public List<T> addInput(List<T> list, T t) {
            list.add(t);
            return list;
        }

        /**
         * Concatenates all given accumulators, in iteration order, into a fresh list.
         *
         * @param iterable accumulators to merge
         * @return a new list holding the elements of every accumulator
         */
        /* renamed from: mergeAccumulators, reason: merged with bridge method [inline-methods] */
        public List<T> m2mergeAccumulators(Iterable<List<T>> iterable) {
            List<T> merged = m3createAccumulator();
            for (List<T> accumulator : iterable) {
                merged.addAll(accumulator);
            }
            return merged;
        }

        /**
         * Returns the accumulator itself as the combined output.
         *
         * @param list final accumulator
         * @return {@code list}, unchanged
         */
        public List<T> extractOutput(List<T> list) {
            return list;
        }

        /**
         * @param coderRegistry unused
         * @param coder coder for the individual elements
         * @return a list coder wrapping {@code coder}
         */
        public Coder<List<T>> getAccumulatorCoder(CoderRegistry coderRegistry, Coder<T> coder) {
            return ListCoder.of(coder);
        }

        /**
         * @param coderRegistry unused
         * @param coder coder for the individual elements
         * @return a list coder wrapping {@code coder}
         */
        public Coder<List<T>> getDefaultOutputCoder(CoderRegistry coderRegistry, Coder<T> coder) {
            return ListCoder.of(coder);
        }

        /**
         * Erased bridge for {@link #addInput(List, Object)}.
         *
         * <p>The arguments are cast to {@code List<T>} and {@code T} so the call binds to the
         * typed overload. Casting to {@code List<List<T>>}/{@code List<T>} makes the typed
         * overload inapplicable, so the call would bind to this method again and recurse
         * without end.
         */
        @SuppressWarnings("unchecked")
        public /* bridge */ /* synthetic */ Object addInput(Object obj, Object obj2) {
            return addInput((List<T>) obj, (T) obj2);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/apex/ApexRunner$CreateApexPCollectionView.class */
    /**
     * Primitive transform that associates a {@link PCollectionView} with a collection.
     *
     * <p>Expansion produces a new primitive output collection that mirrors the input's
     * pipeline, windowing strategy, boundedness and coder; the view is only carried along
     * and exposed through {@link #getView()}.
     *
     * @param <ElemT> element type of the input and output collection
     * @param <ViewT> type exposed by the view
     */
    public static class CreateApexPCollectionView<ElemT, ViewT> extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
        private static final long serialVersionUID = 1;
        private PCollectionView<ViewT> view;

        private CreateApexPCollectionView(PCollectionView<ViewT> pCollectionView) {
            this.view = pCollectionView;
        }

        /**
         * Static factory for this transform.
         *
         * @param pCollectionView view to carry
         * @return a new transform holding {@code pCollectionView}
         */
        public static <ElemT, ViewT> CreateApexPCollectionView<ElemT, ViewT> of(PCollectionView<ViewT> pCollectionView) {
            return new CreateApexPCollectionView<ElemT, ViewT>(pCollectionView);
        }

        /**
         * Creates the primitive output for this transform.
         *
         * @param pCollection input collection
         * @return a new primitive collection with the input's properties and coder
         */
        public PCollection<ElemT> expand(PCollection<ElemT> pCollection) {
            PCollection<ElemT> primitiveOutput =
                    PCollection.createPrimitiveOutputInternal(
                            pCollection.getPipeline(),
                            pCollection.getWindowingStrategy(),
                            pCollection.isBounded());
            return primitiveOutput.setCoder(pCollection.getCoder());
        }

        /** @return the view passed in at construction time */
        public PCollectionView<ViewT> getView() {
            return this.view;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/apex/ApexRunner$SplittableParDoOverrideFactory.class */
    /**
     * Override factory that swaps a multi-output {@link ParDo} for a {@link SplittableParDo}
     * built from the same transform.
     *
     * @param <InputT> element type of the main input
     * @param <OutputT> element type of the main output
     */
    public static class SplittableParDoOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
        SplittableParDoOverrideFactory() {
        }

        /**
         * Builds the replacement for the applied ParDo.
         *
         * @param appliedPTransform the application being replaced
         * @return replacement pairing the singleton main input with a new SplittableParDo
         */
        public PTransformOverrideFactory.PTransformReplacement<PCollection<InputT>, PCollectionTuple> getReplacementTransform(AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> appliedPTransform) {
            PCollection<InputT> mainInput = PTransformReplacements.getSingletonMainInput(appliedPTransform);
            SplittableParDo splittable = new SplittableParDo(appliedPTransform.getTransform());
            return PTransformOverrideFactory.PTransformReplacement.of(mainInput, splittable);
        }

        /**
         * Maps the original tagged outputs onto the replacement's outputs by tag.
         *
         * @param map original outputs keyed by tag
         * @param pCollectionTuple outputs produced by the replacement
         * @return mapping produced by {@link ReplacementOutputs#tagged}
         */
        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> map, PCollectionTuple pCollectionTuple) {
            Map<PValue, PTransformOverrideFactory.ReplacementOutput> byTag = ReplacementOutputs.tagged(map, pCollectionTuple);
            return byTag;
        }

        /** Erased bridge that forwards to the typed {@code mapOutputs} overload. */
        public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
            return mapOutputs((Map<TupleTag<?>, PValue>) map, (PCollectionTuple) pOutput);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/apex/ApexRunner$StreamingViewAsIterable.class */
    /**
     * Replacement transform for iterable views: combines the whole input into one list and
     * hands that to {@link CreateApexPCollectionView} together with the target view.
     *
     * @param <T> element type of the input collection
     */
    private static class StreamingViewAsIterable<T> extends PTransform<PCollection<T>, PCollection<T>> {
        private static final long serialVersionUID = 1;
        private final PCollectionView<Iterable<T>> view;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/runners/apex/ApexRunner$StreamingViewAsIterable$Factory.class */
        /** Override factory producing a {@link StreamingViewAsIterable} for the replaced view creation. */
        public static class Factory<T> extends SingleInputOutputOverrideFactory<PCollection<T>, PCollection<T>, View.CreatePCollectionView<T, Iterable<T>>> {
            Factory() {
            }

            /**
             * @param appliedPTransform the view-creating application being replaced
             * @return replacement pairing the singleton main input with the new transform
             */
            public PTransformOverrideFactory.PTransformReplacement<PCollection<T>, PCollection<T>> getReplacementTransform(AppliedPTransform<PCollection<T>, PCollection<T>, View.CreatePCollectionView<T, Iterable<T>>> appliedPTransform) {
                PCollection<T> mainInput = PTransformReplacements.getSingletonMainInput(appliedPTransform);
                PCollectionView<Iterable<T>> targetView = appliedPTransform.getTransform().getView();
                return PTransformOverrideFactory.PTransformReplacement.of(mainInput, new StreamingViewAsIterable<T>(targetView));
            }
        }

        private StreamingViewAsIterable(PCollectionView<Iterable<T>> pCollectionView) {
            this.view = pCollectionView;
        }

        /**
         * Applies a global {@link Concatenate} combine (without defaults) followed by the
         * Apex view-creation primitive for the stored view.
         *
         * @param pCollection input collection
         * @return output of the view-creation primitive
         */
        public PCollection<T> expand(PCollection<T> pCollection) {
            return pCollection
                    .apply(Combine.globally(new Concatenate()).withoutDefaults())
                    .apply(CreateApexPCollectionView.of(this.view));
        }

        /** @return the fixed display name of this transform kind */
        protected String getKindString() {
            final String kind = "StreamingViewAsIterable";
            return kind;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/apex/ApexRunner$StreamingWrapSingletonInList.class */
    /**
     * Replacement transform for singleton views: wraps each element in a one-element list
     * and feeds that into {@link CreateApexPCollectionView}, while returning the original
     * input collection unchanged.
     *
     * @param <T> element type of the input collection
     */
    private static class StreamingWrapSingletonInList<T> extends PTransform<PCollection<T>, PCollection<T>> {
        private static final long serialVersionUID = 1;
        View.CreatePCollectionView<T, T> transform;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/runners/apex/ApexRunner$StreamingWrapSingletonInList$Factory.class */
        /** Override factory producing a {@link StreamingWrapSingletonInList} for the replaced view creation. */
        public static class Factory<T> extends SingleInputOutputOverrideFactory<PCollection<T>, PCollection<T>, View.CreatePCollectionView<T, T>> {
            Factory() {
            }

            /**
             * @param appliedPTransform the view-creating application being replaced
             * @return replacement pairing the singleton main input with the new transform
             */
            public PTransformOverrideFactory.PTransformReplacement<PCollection<T>, PCollection<T>> getReplacementTransform(AppliedPTransform<PCollection<T>, PCollection<T>, View.CreatePCollectionView<T, T>> appliedPTransform) {
                PCollection<T> mainInput = PTransformReplacements.getSingletonMainInput(appliedPTransform);
                View.CreatePCollectionView<T, T> replaced = appliedPTransform.getTransform();
                return PTransformOverrideFactory.PTransformReplacement.of(mainInput, new StreamingWrapSingletonInList<T>(replaced));
            }
        }

        private StreamingWrapSingletonInList(View.CreatePCollectionView<T, T> createPCollectionView) {
            this.transform = createPCollectionView;
        }

        /**
         * Builds the list-wrapping side branch and returns the input as the output.
         *
         * @param pCollection input collection
         * @return {@code pCollection} itself; the wrapped branch's result is not returned
         */
        public PCollection<T> expand(PCollection<T> pCollection) {
            PCollection<List<T>> wrapped = pCollection.apply(ParDo.of(new WrapAsList<T>()));
            wrapped.apply(CreateApexPCollectionView.<List<T>, T>of(this.transform.getView()));
            return pCollection;
        }

        /** @return the fixed display name of this transform kind */
        protected String getKindString() {
            final String kind = "StreamingWrapSingletonInList";
            return kind;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/apex/ApexRunner$WrapAsList.class */
    /**
     * DoFn that emits each input element wrapped in an immutable single-element list.
     *
     * @param <T> element type
     */
    public static class WrapAsList<T> extends DoFn<T, List<T>> {
        private WrapAsList() {
        }

        /**
         * Outputs a singleton list containing the current element.
         *
         * @param processContext context providing the element and receiving the output
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<T, List<T>>.ProcessContext processContext) {
            T element = processContext.element();
            List<T> wrapped = Collections.singletonList(element);
            processContext.output(wrapped);
        }
    }

    /**
     * Creates a runner bound to the given options.
     *
     * @param apexPipelineOptions options stored on this runner as-is; no validation happens here
     */
    public ApexRunner(ApexPipelineOptions apexPipelineOptions) {
        this.options = apexPipelineOptions;
    }

    /**
     * Creates a runner from generic pipeline options.
     *
     * @param pipelineOptions options to validate as {@link ApexPipelineOptions}
     * @return a runner holding the validated options
     */
    public static ApexRunner fromOptions(PipelineOptions pipelineOptions) {
        ApexPipelineOptions validated =
                (ApexPipelineOptions) PipelineOptionsValidator.validate(ApexPipelineOptions.class, pipelineOptions);
        return new ApexRunner(validated);
    }

    /**
     * Lists the transform overrides this runner applies before translation, in application
     * order: primitive Create, iterable views, singleton views, splittable multi-output ParDo,
     * and keyed-work-item processing of splittable elements.
     *
     * <p>The builder is explicitly typed and the overrides are added without casts; casting a
     * {@link PTransformOverride} to {@code ImmutableList.Builder} is not a valid conversion.
     *
     * @return immutable list of overrides
     */
    private List<PTransformOverride> getOverrides() {
        return ImmutableList.<PTransformOverride>builder()
                .add(PTransformOverride.of(
                        PTransformMatchers.classEqualTo(Create.Values.class),
                        new PrimitiveCreate.Factory()))
                .add(PTransformOverride.of(
                        PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
                        new StreamingViewAsIterable.Factory()))
                .add(PTransformOverride.of(
                        PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
                        new StreamingWrapSingletonInList.Factory()))
                .add(PTransformOverride.of(
                        PTransformMatchers.splittableParDoMulti(),
                        new SplittableParDoOverrideFactory()))
                .add(PTransformOverride.of(
                        PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
                        new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
                .build();
    }

    /* renamed from: run, reason: merged with bridge method [inline-methods] */
    /**
     * Applies the runner's overrides to the pipeline, wraps its translation in a
     * {@link StreamingApplication}, and either launches it (on YARN or embedded) or, when
     * {@code translateOnly} is set in embedded mode, only prepares the DAG.
     *
     * @param pipeline pipeline to run
     * @return result holding the DAG and, when launched, the application handle
     * @throws RuntimeException if the properties file cannot be loaded, the YARN launch fails
     *     with an I/O error, or the embedded launcher throws
     */
    public ApexRunnerResult m1run(final Pipeline pipeline) {
        pipeline.replaceAll(getOverrides());
        final ApexPipelineTranslator apexPipelineTranslator = new ApexPipelineTranslator(this.options);
        // Filled in by populateDAG, so it is only meaningful after the launcher has invoked the application.
        final AtomicReference<DAG> dagReference = new AtomicReference<>();
        StreamingApplication streamingApplication = new StreamingApplication() { // from class: org.apache.beam.runners.apex.ApexRunner.1
            @Override
            public void populateDAG(DAG dag, Configuration configuration) {
                dagReference.set(dag);
                dag.setAttribute(Context.DAGContext.APPLICATION_NAME, ApexRunner.this.options.getApplicationName());
                apexPipelineTranslator.translate(pipeline, dag);
            }
        };
        try {
            Properties properties = loadConfigProperties();
            if (!this.options.isEmbeddedExecution()) {
                try {
                    // Launch first: reading the DAG reference before launching would capture its pre-launch value.
                    Launcher.AppHandle yarnHandle = new ApexYarnLauncher().launchApp(streamingApplication, properties);
                    return new ApexRunnerResult(dagReference.get(), yarnHandle);
                } catch (IOException e) {
                    throw new RuntimeException("Failed to launch the application on YARN.", e);
                }
            }
            EmbeddedAppLauncher launcher = Launcher.getLauncher(Launcher.LaunchMode.EMBEDDED);
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap.put(EmbeddedAppLauncher.RUN_ASYNC, true);
            if (this.options.isEmbeddedExecutionDebugMode()) {
                defaultAttributeMap.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false);
            }
            Configuration configuration = new Configuration(false);
            ApexYarnLauncher.addProperties(configuration, properties);
            try {
                if (this.translateOnly) {
                    launcher.prepareDAG(streamingApplication, configuration);
                    return new ApexRunnerResult(launcher.getDAG(), null);
                }
                ASSERTION_ERROR.set(null);
                // Same ordering concern as the YARN path: launch, then read the DAG.
                Launcher.AppHandle embeddedHandle = launcher.launchApp(streamingApplication, configuration, defaultAttributeMap);
                return new ApexRunnerResult(dagReference.get(), embeddedHandle);
            } catch (Exception e2) {
                Throwables.throwIfUnchecked(e2);
                throw new RuntimeException(e2);
            }
        } catch (IOException | URISyntaxException e3) {
            throw new RuntimeException("Error loading properties", e3);
        }
    }

    /**
     * Loads the properties file named by the options' config file setting, if any.
     *
     * <p>A {@code classpath:} URI is resolved as a classpath resource relative to this class; a
     * missing resource is skipped and yields empty properties. Any other value is opened as a
     * URL, with non-absolute values treated as local file names. The stream is closed even
     * when loading fails.
     *
     * @return the loaded properties, empty when no config file is configured
     * @throws IOException if the file cannot be opened or read
     * @throws URISyntaxException if the configured value is not a valid URI
     */
    private Properties loadConfigProperties() throws IOException, URISyntaxException {
        Properties properties = new Properties();
        String configFile = this.options.getConfigFile();
        if (configFile == null) {
            return properties;
        }
        URI uri = new URI(configFile);
        if (CLASSPATH_SCHEME.equals(uri.getScheme())) {
            // try-with-resources accepts a null resource, so a missing resource is simply skipped.
            try (InputStream resourceStream = getClass().getResourceAsStream(uri.getPath())) {
                if (resourceStream != null) {
                    properties.load(resourceStream);
                }
            }
        } else {
            if (!uri.isAbsolute()) {
                uri = new File(configFile).toURI();
            }
            try (InputStream urlStream = uri.toURL().openStream()) {
                properties.load(urlStream);
            }
        }
        return properties;
    }
}
