/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.PTransformOverrideFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypedPValue;

/**
 * A {@link PTransformOverrideFactory} that chooses a replacement for a
 * {@link ParDo.BoundMulti} based on the {@link DoFnSignature} of its {@link DoFn}.
 *
 * <ul>
 *   <li>Splittable {@code processElement}: replaced by a {@link SplittableParDo}.</li>
 *   <li>Timer declarations present: rejected with {@link UnsupportedOperationException}.</li>
 *   <li>State declarations present: replaced by a {@link GbkThenStatefulParDo}.</li>
 *   <li>Otherwise: the transform is returned unchanged.</li>
 * </ul>
 *
 * @param <InputT> element type of the main input of the overridden {@code ParDo}
 * @param <OutputT> main output element type of the overridden {@code ParDo}
 */
class ParDoMultiOverrideFactory<InputT, OutputT>
        implements PTransformOverrideFactory<
                PCollection<? extends InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> {

    ParDoMultiOverrideFactory() {
    }

    /**
     * Returns the transform that should be applied in place of {@code transform}.
     *
     * <p>The checks run in a fixed order: splittable first, then timers, then state. A
     * {@code DoFn} that is both splittable and declares timers is therefore handed to
     * {@link SplittableParDo} without hitting the timer rejection.
     *
     * @param transform the multi-output {@code ParDo} being considered for replacement
     * @return a replacement transform, or {@code transform} itself when no override applies
     * @throws UnsupportedOperationException if the (non-splittable) {@code DoFn} declares timers
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public PTransform<PCollection<? extends InputT>, PCollectionTuple> override(
            ParDo.BoundMulti<InputT, OutputT> transform) {
        DoFn<?, ?> fn = transform.getNewFn();
        DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());

        if (signature.processElement().isSplittable()) {
            // Raw construction is deliberate: SplittableParDo has type parameters that cannot
            // be named from InputT/OutputT here, so the result is adapted by unchecked conversion.
            return new SplittableParDo(transform);
        }

        if (!signature.timerDeclarations().isEmpty()) {
            throw new UnsupportedOperationException(String.format("Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", DoFn.TimerId.class.getSimpleName(), fn.getClass().getName(), DoFn.class.getSimpleName(), DirectRunner.class.getSimpleName()));
        }

        if (!signature.stateDeclarations().isEmpty()) {
            // GbkThenStatefulParDo expects a ParDo over KV elements. InputT is not statically
            // known to be a KV, so the construction is raw.
            // NOTE(review): presumably a stateful DoFn is guaranteed to have KV input by
            // signature validation elsewhere - confirm.
            return new GbkThenStatefulParDo(transform);
        }

        return transform;
    }

    /**
     * A transform over already-grouped input ({@code KV<K, Iterable<InputT>>}) that stands in
     * for a {@link ParDo.BoundMulti} over the ungrouped {@code KV<K, InputT>} elements.
     *
     * <p>Its {@link #expand} only declares the output {@link PCollectionTuple}; no processing
     * logic is defined in this class.
     *
     * @param <K> key type of the input elements
     * @param <InputT> value type of the ungrouped input elements
     * @param <OutputT> main output element type of the underlying {@code ParDo}
     */
    static class StatefulParDo<K, InputT, OutputT>
            extends PTransform<PCollection<? extends KV<K, Iterable<InputT>>>, PCollectionTuple> {
        // Both fields are transient, so Java serialization of this transform skips them.
        private final transient ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo;
        private final transient PCollection<KV<K, InputT>> originalInput;

        /**
         * @param underlyingParDo the {@code ParDo} this transform stands in for
         * @param originalInput the ungrouped input, retained for output coder inference
         */
        public StatefulParDo(
                ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo,
                PCollection<KV<K, InputT>> originalInput) {
            this.underlyingParDo = underlyingParDo;
            this.originalInput = originalInput;
        }

        /** Returns the {@code ParDo} this transform was created from. */
        public ParDo.BoundMulti<KV<K, InputT>, OutputT> getUnderlyingParDo() {
            return this.underlyingParDo;
        }

        /**
         * Delegates coder inference to the underlying {@code ParDo}, passing the original
         * (ungrouped) input instead of the grouped {@code input} this transform receives.
         *
         * @param input the grouped input; not consulted
         * @param output the output value whose coder is requested
         * @return the coder reported by the underlying {@code ParDo}
         * @throws CannotProvideCoderException if the underlying {@code ParDo} cannot provide one
         */
        @Override
        public <T> Coder<T> getDefaultOutputCoder(
                PCollection<? extends KV<K, Iterable<InputT>>> input, TypedPValue<T> output)
                throws CannotProvideCoderException {
            return this.underlyingParDo.getDefaultOutputCoder(this.originalInput, output);
        }

        /**
         * Declares the outputs of this transform: the underlying {@code ParDo}'s main output tag
         * followed by all of its side output tags, using the pipeline, windowing strategy and
         * boundedness of {@code input}.
         *
         * @param input the grouped input collection
         * @return a tuple of primitive outputs, one per output tag
         */
        @Override
        public PCollectionTuple expand(PCollection<? extends KV<K, Iterable<InputT>>> input) {
            TupleTagList outputTags =
                    TupleTagList.of(this.underlyingParDo.getMainOutputTag())
                            .and(this.underlyingParDo.getSideOutputTags().getAll());
            return PCollectionTuple.ofPrimitiveOutputsInternal(
                    input.getPipeline(),
                    outputTags,
                    input.getWindowingStrategy(),
                    input.isBounded());
        }
    }

    /**
     * A composite transform that applies a {@link GroupByKey} to keyed input and then a
     * {@link StatefulParDo} wrapping the given {@code ParDo}.
     *
     * @param <K> key type of the input elements
     * @param <InputT> value type of the input elements
     * @param <OutputT> main output element type of the underlying {@code ParDo}
     */
    static class GbkThenStatefulParDo<K, InputT, OutputT>
            extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
        private final ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo;

        /**
         * @param underlyingParDo the {@code ParDo} to run after grouping by key
         */
        public GbkThenStatefulParDo(ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo) {
            this.underlyingParDo = underlyingParDo;
        }

        /**
         * Applies "Group by key" followed by "Stateful ParDo" to {@code input}.
         *
         * @param input the keyed, ungrouped input collection
         * @return the outputs produced by the {@link StatefulParDo} step
         */
        @Override
        public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
            PCollection<KV<K, Iterable<InputT>>> grouped =
                    input.apply("Group by key", GroupByKey.<K, InputT>create());
            // The ungrouped input is handed along so StatefulParDo can use it for coder inference.
            return grouped.apply(
                    "Stateful ParDo",
                    new StatefulParDo<K, InputT, OutputT>(this.underlyingParDo, input));
        }
    }
}

