/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypedPValue;
import org.joda.time.Instant;

class ParDoMultiOverrideFactory<InputT, OutputT>
implements PTransformOverrideFactory<PCollection<? extends InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> {
    /** Creates the factory. The class declares no fields, so there is nothing to initialize. */
    ParDoMultiOverrideFactory() {
    }

    /**
     * Chooses the transform to run in place of a multi-output {@link ParDo}.
     *
     * <p>The choice is driven by the {@link DoFnSignature} of the transform's {@link DoFn}:
     * <ul>
     *   <li>a splittable {@code processElement} yields a {@link SplittableParDo};</li>
     *   <li>otherwise, at least one state or timer declaration yields a
     *       {@link GbkThenStatefulParDo};</li>
     *   <li>otherwise the transform is returned as-is.</li>
     * </ul>
     *
     * @param transform the multi-output {@code ParDo} being considered for replacement
     * @return the replacement, or {@code transform} itself when neither case applies
     */
    public PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(ParDo.BoundMulti<InputT, OutputT> transform) {
        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());

        // The splittable check comes first and wins over the state/timer check.
        if (signature.processElement().isSplittable()) {
            return new SplittableParDo(transform);
        }

        boolean usesStateOrTimers = signature.stateDeclarations().size() > 0
                || signature.timerDeclarations().size() > 0;
        if (!usesStateOrTimers) {
            return transform;
        }

        // GbkThenStatefulParDo is declared over KV elements. It is constructed raw because
        // InputT is not statically known to be a KV here; its expand() verifies the KvCoder.
        return new GbkThenStatefulParDo(transform);
    }

    /**
     * Converts a grouped element (a key paired with the windowed values collected for it)
     * into a {@link KeyedWorkItem} that carries those values as its elements.
     *
     * @param <K> key type
     * @param <V> value type of the {@code KV} elements wrapped in each {@link WindowedValue}
     */
    static class ToKeyedWorkItem<K, V>
    extends DoFn<KV<K, Iterable<WindowedValue<KV<K, V>>>>, KeyedWorkItem<K, KV<K, V>>> {
        ToKeyedWorkItem() {
        }

        /**
         * Emits the work item that {@link KeyedWorkItems#elementsWorkItem} builds from the
         * input element's key and values.
         *
         * <p>The context type is written with its full type arguments so that
         * {@code element()} and {@code output(...)} are statically typed and no casts are needed.
         *
         * @param c context supplying the grouped input element and receiving the output
         * @param window declared but not read by this method; kept so the method's parameter
         *     list is unchanged
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, Iterable<WindowedValue<KV<K, V>>>>, KeyedWorkItem<K, KV<K, V>>>.ProcessContext c, BoundedWindow window) {
            KV<K, Iterable<WindowedValue<KV<K, V>>>> grouped = c.element();
            K key = grouped.getKey();
            c.output(KeyedWorkItems.elementsWorkItem(key, grouped.getValue()));
        }
    }

    /**
     * Re-keys each {@code KV} element by its own key and replaces the value with a
     * {@link WindowedValue} that wraps the whole original element together with its
     * timestamp, window and pane.
     *
     * @param <K> key type
     * @param <V> value type of the incoming {@code KV} elements
     */
    static class ReifyWindowedValueFn<K, V>
    extends DoFn<KV<K, V>, KV<K, WindowedValue<KV<K, V>>>> {
        ReifyWindowedValueFn() {
        }

        /**
         * Outputs {@code KV(key, WindowedValue(element, timestamp, window, pane))} for the
         * current element.
         *
         * <p>The context type is written with its full type arguments so that
         * {@code element()} and {@code output(...)} are statically typed and no casts are needed.
         *
         * @param c context supplying the element, its timestamp and pane, and receiving the output
         * @param window the window recorded in the emitted {@link WindowedValue}
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, V>, KV<K, WindowedValue<KV<K, V>>>>.ProcessContext c, BoundedWindow window) {
            KV<K, V> element = c.element();
            K key = element.getKey();
            WindowedValue<KV<K, V>> reified = WindowedValue.of(element, c.timestamp(), window, c.pane());
            c.output(KV.of(key, reified));
        }
    }

    /**
     * Transform over a collection of {@link KeyedWorkItem}s that stands in for a wrapped
     * multi-output {@link ParDo}.
     *
     * <p>{@link #expand} applies no sub-transforms; it only declares the output collections
     * through {@code PCollectionTuple.ofPrimitiveOutputsInternal}. How the transform is
     * executed is not visible in this class.
     *
     * @param <K> key type
     * @param <InputT> value type of the original {@code KV} input elements
     * @param <OutputT> main output element type of the wrapped {@code ParDo}
     */
    static class StatefulParDo<K, InputT, OutputT>
    extends PTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> {
        private final transient ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo;
        private final transient PCollection<KV<K, InputT>> originalInput;

        /**
         * @param underlyingParDo the wrapped {@code ParDo}, source of the output tags and coders
         * @param originalInput the {@code KV} collection passed to the wrapped {@code ParDo}
         *     when default output coders are requested
         */
        public StatefulParDo(ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo, PCollection<KV<K, InputT>> originalInput) {
            this.underlyingParDo = underlyingParDo;
            this.originalInput = originalInput;
        }

        /** Returns the wrapped {@code ParDo} given at construction. */
        public ParDo.BoundMulti<KV<K, InputT>, OutputT> getUnderlyingParDo() {
            return underlyingParDo;
        }

        /**
         * Delegates to the wrapped {@code ParDo}, passing the original {@code KV} input
         * instead of the keyed-work-item {@code input} this transform receives.
         *
         * @param input the keyed-work-item collection; not used for coder lookup
         * @param output the output whose default coder is requested
         * @return the coder reported by the wrapped {@code ParDo}
         * @throws CannotProvideCoderException if the wrapped {@code ParDo} cannot provide one
         */
        public <T> Coder<T> getDefaultOutputCoder(PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> input, TypedPValue<T> output) throws CannotProvideCoderException {
            return underlyingParDo.getDefaultOutputCoder(originalInput, output);
        }

        /**
         * Declares one output per tag of the wrapped {@code ParDo} (main tag first, then all
         * side output tags), using the windowing strategy and boundedness of {@code input}.
         *
         * @param input the keyed-work-item collection this transform is applied to
         * @return the tuple of declared outputs
         */
        public PCollectionTuple expand(PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> input) {
            Pipeline pipeline = input.getPipeline();
            TupleTagList outputTags = TupleTagList.of(underlyingParDo.getMainOutputTag())
                    .and(underlyingParDo.getSideOutputTags().getAll());
            return PCollectionTuple.ofPrimitiveOutputsInternal(
                    pipeline,
                    outputTags,
                    input.getWindowingStrategy(),
                    input.isBounded());
        }
    }

    /**
     * Composite that groups a {@code KV} collection by key, converts each group into a
     * {@link KeyedWorkItem}, and applies a {@link StatefulParDo} wrapping the given
     * multi-output {@link ParDo}.
     *
     * @param <K> key type
     * @param <InputT> value type of the {@code KV} input elements
     * @param <OutputT> main output element type of the wrapped {@code ParDo}
     */
    static class GbkThenStatefulParDo<K, InputT, OutputT>
    extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
        private final ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo;

        /** @param underlyingParDo the {@code ParDo} handed to the final {@link StatefulParDo} */
        public GbkThenStatefulParDo(ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo) {
            this.underlyingParDo = underlyingParDo;
        }

        /**
         * Builds the reify / re-window / group / to-work-item chain and applies the stateful
         * {@code ParDo} to its result.
         *
         * @param input the {@code KV} collection; its coder must be a {@link KvCoder}
         * @return the outputs of the final {@link StatefulParDo}
         * @throws IllegalStateException if the input coder is not a {@link KvCoder}
         */
        public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
            WindowingStrategy originalStrategy = input.getWindowingStrategy();

            // The input must be KV-coded; its key coder is reused for the coders set below.
            Preconditions.checkState(
                    input.getCoder() instanceof KvCoder,
                    "Input to a %s using state requires a %s, but the coder was %s",
                    ParDo.class.getSimpleName(),
                    KvCoder.class.getSimpleName(),
                    input.getCoder());
            KvCoder elementCoder = (KvCoder) input.getCoder();
            Coder keyCoder = elementCoder.getKeyCoder();
            Coder windowCoder = originalStrategy.getWindowFn().windowCoder();

            // Step 1: wrap every element with its timestamp, window and pane, keyed by its key.
            PCollection reified = (PCollection) input.apply(
                    "Reify timestamps", (PTransform) ParDo.of(new ReifyWindowedValueFn()));
            Coder reifiedCoder = KvCoder.of(
                    keyCoder, WindowedValue.getFullCoder(elementCoder, windowCoder));
            PCollection reifiedWithCoder = reified.setCoder(reifiedCoder);

            // Step 2: fire after every element, discard fired panes, keep the input's allowed
            // lateness, and output at the earliest input timestamp.
            PTransform windowing = (PTransform) Window
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes()
                    .withAllowedLateness(originalStrategy.getAllowedLateness())
                    .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
            PCollection rewindowed = (PCollection) reifiedWithCoder.apply(windowing);

            // Step 3: group by key.
            PCollection grouped = (PCollection) rewindowed.apply(
                    "Group by key", (PTransform) GroupByKey.create());

            // Step 4: turn each group into a KeyedWorkItem, then put the input's original
            // windowing strategy back on the resulting collection.
            PCollection workItems = (PCollection) grouped.apply(
                    "To KeyedWorkItem", (PTransform) ParDo.of(new ToKeyedWorkItem()));
            PCollection adjustedInput = workItems
                    .setCoder(KeyedWorkItemCoder.of(keyCoder, elementCoder, windowCoder))
                    .setWindowingStrategyInternal(originalStrategy);

            // Step 5: apply the stateful ParDo; it also receives the untouched original input.
            return (PCollectionTuple) adjustedInput.apply(
                    "Stateful ParDo",
                    new StatefulParDo<K, InputT, OutputT>(underlyingParDo, input));
        }
    }
}

