package org.apache.beam.runners.direct;

import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.runners.core.SplittableParDo;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.java.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypedPValue;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory.class */
public class ParDoMultiOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory<PCollection<? extends InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory$GbkThenStatefulParDo.class */
    public static class GbkThenStatefulParDo<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
        private final ParDo.MultiOutput<KV<K, InputT>, OutputT> underlyingParDo;

        public GbkThenStatefulParDo(ParDo.MultiOutput<KV<K, InputT>, OutputT> multiOutput) {
            this.underlyingParDo = multiOutput;
        }

        public PCollectionTuple expand(PCollection<KV<K, InputT>> pCollection) {
            WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
            Preconditions.checkState(pCollection.getCoder() instanceof KvCoder, "Input to a %s using state requires a %s, but the coder was %s", ParDo.class.getSimpleName(), KvCoder.class.getSimpleName(), pCollection.getCoder());
            KvCoder coder = pCollection.getCoder();
            Coder keyCoder = coder.getKeyCoder();
            Coder windowCoder = windowingStrategy.getWindowFn().windowCoder();
            return pCollection.apply("Reify timestamps", ParDo.of(new ReifyWindowedValueFn())).setCoder(KvCoder.of(keyCoder, WindowedValue.getFullCoder(coder, windowCoder))).apply(Window.configure().triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes().withAllowedLateness(windowingStrategy.getAllowedLateness()).withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())).apply("Group by key", GroupByKey.create()).apply("To KeyedWorkItem", ParDo.of(new ToKeyedWorkItem())).setCoder(KeyedWorkItemCoder.of(keyCoder, coder, windowCoder)).setWindowingStrategyInternal(windowingStrategy).apply("Stateful ParDo", new StatefulParDo(this.underlyingParDo, pCollection));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory$ReifyWindowedValueFn.class */
    /**
     * {@link DoFn} that re-emits each keyed element as a pair of its key and a
     * {@link WindowedValue} holding the whole element together with its timestamp, window and pane.
     *
     * @param <K> key type of the input elements
     * @param <V> value type of the input elements
     */
    public static class ReifyWindowedValueFn<K, V> extends DoFn<KV<K, V>, KV<K, WindowedValue<KV<K, V>>>> {
        ReifyWindowedValueFn() {
        }

        /**
         * Outputs {@code KV(key, WindowedValue(element, timestamp, window, pane))} for the current element.
         *
         * @param processContext context supplying the element, its timestamp and its pane
         * @param boundedWindow the window the current element is being processed in
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, V>, KV<K, WindowedValue<KV<K, V>>>>.ProcessContext processContext, BoundedWindow boundedWindow) {
            // Read the element through its declared type; a raw KV cast would erase the key to Object.
            KV<K, V> element = processContext.element();
            WindowedValue<KV<K, V>> reified =
                    WindowedValue.of(element, processContext.timestamp(), boundedWindow, processContext.pane());
            processContext.output(KV.of(element.getKey(), reified));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory$StatefulParDo.class */
    /**
     * Transform over {@link KeyedWorkItem}s that stands in for a keyed {@link ParDo.MultiOutput}.
     * Its expansion only declares the output tuple; no sub-transforms are applied here.
     *
     * @param <K> key type
     * @param <InputT> value type of the original keyed input
     * @param <OutputT> main output type of the wrapped ParDo
     */
    public static class StatefulParDo<K, InputT, OutputT> extends PTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> {
        // Both fields are transient, so they are skipped by Java serialization of this transform.
        private final transient ParDo.MultiOutput<KV<K, InputT>, OutputT> underlyingParDo;
        private final transient PCollection<KV<K, InputT>> originalInput;

        /**
         * @param multiOutput the wrapped ParDo that supplies output tags and default coders
         * @param pCollection the original keyed input, used when asking the wrapped ParDo for coders
         */
        public StatefulParDo(ParDo.MultiOutput<KV<K, InputT>, OutputT> multiOutput, PCollection<KV<K, InputT>> pCollection) {
            this.underlyingParDo = multiOutput;
            this.originalInput = pCollection;
        }

        /** Returns the wrapped ParDo. */
        public ParDo.MultiOutput<KV<K, InputT>, OutputT> getUnderlyingParDo() {
            return this.underlyingParDo;
        }

        /**
         * Delegates to the wrapped ParDo, passing the original keyed input rather than the
         * {@link KeyedWorkItem} collection this transform actually receives.
         */
        @Override
        public <T> Coder<T> getDefaultOutputCoder(PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> pCollection, TypedPValue<T> typedPValue) throws CannotProvideCoderException {
            return this.underlyingParDo.getDefaultOutputCoder(this.originalInput, typedPValue);
        }

        /**
         * Builds the output tuple from the wrapped ParDo's main output tag followed by all of its
         * side output tags, reusing the input's pipeline, windowing strategy and boundedness.
         */
        @Override
        public PCollectionTuple expand(PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> pCollection) {
            Pipeline pipeline = pCollection.getPipeline();
            TupleTagList outputTags =
                    TupleTagList.of(this.underlyingParDo.getMainOutputTag())
                            .and(this.underlyingParDo.getSideOutputTags().getAll());
            return PCollectionTuple.ofPrimitiveOutputsInternal(
                    pipeline,
                    outputTags,
                    pCollection.getWindowingStrategy(),
                    pCollection.isBounded());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ParDoMultiOverrideFactory$ToKeyedWorkItem.class */
    /**
     * {@link DoFn} that turns each grouped {@code KV(key, Iterable<WindowedValue<...>>)} into a
     * {@link KeyedWorkItem} carrying the same key and elements.
     *
     * @param <K> key type
     * @param <V> value type of the original keyed elements
     */
    public static class ToKeyedWorkItem<K, V> extends DoFn<KV<K, Iterable<WindowedValue<KV<K, V>>>>, KeyedWorkItem<K, KV<K, V>>> {
        ToKeyedWorkItem() {
        }

        /**
         * Outputs an elements-only work item built from the current grouped element.
         *
         * @param processContext context supplying the grouped element
         * @param boundedWindow the current window; not read here, but kept because it is part of the
         *     declared process-element signature
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, Iterable<WindowedValue<KV<K, V>>>>, KeyedWorkItem<K, KV<K, V>>>.ProcessContext processContext, BoundedWindow boundedWindow) {
            // Use the declared element type so the call below is fully type-checked (no raw casts).
            KV<K, Iterable<WindowedValue<KV<K, V>>>> grouped = processContext.element();
            processContext.output(KeyedWorkItems.elementsWorkItem(grouped.getKey(), grouped.getValue()));
        }
    }

    /**
     * Chooses the replacement for a multi-output ParDo from its {@link DoFn} signature:
     * a {@link SplittableParDo} when the process-element method is splittable, a
     * {@link GbkThenStatefulParDo} when the DoFn declares any state or timers, and otherwise the
     * transform itself, unchanged.
     *
     * @param multiOutput the ParDo being considered for replacement
     * @return the transform to use in its place
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(ParDo.MultiOutput<InputT, OutputT> multiOutput) {
        DoFnSignature signature = DoFnSignatures.getSignature(multiOutput.getFn().getClass());

        if (signature.processElement().isSplittable()) {
            return new SplittableParDo(multiOutput);
        }

        boolean declaresStateOrTimers =
                signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0;
        if (declaresStateOrTimers) {
            // Constructed raw: the key/value types of the input are not expressible from InputT here.
            return new GbkThenStatefulParDo(multiOutput);
        }

        return multiOutput;
    }

    /**
     * Returns the single input of the transform being replaced.
     *
     * @param list the tagged inputs; must contain exactly one entry, otherwise
     *     {@code Iterables.getOnlyElement} throws
     * @param pipeline the enclosing pipeline (not used here)
     * @return the value of the only tagged input, viewed as a {@link PCollection}
     */
    @Override
    @SuppressWarnings("unchecked")
    public PCollection<? extends InputT> getInput(List<TaggedPValue> list, Pipeline pipeline) {
        TaggedPValue onlyInput = Iterables.getOnlyElement(list);
        // getValue() is typed as PValue, so the narrowing to PCollection must be an explicit cast.
        return (PCollection<? extends InputT>) onlyInput.getValue();
    }

    /**
     * Maps the original outputs to the replacement's outputs by delegating to
     * {@link ReplacementOutputs#tagged}.
     *
     * @param list the tagged outputs of the original transform
     * @param pCollectionTuple the output tuple produced by the replacement transform
     * @return the mapping computed by {@code ReplacementOutputs.tagged}
     */
    @Override
    public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(List<TaggedPValue> list, PCollectionTuple pCollectionTuple) {
        Map<PValue, PTransformOverrideFactory.ReplacementOutput> replacements =
                ReplacementOutputs.tagged(list, pCollectionTuple);
        return replacements;
    }

    /**
     * Erased-signature variant of {@code mapOutputs}: casts its arguments and forwards to the
     * typed overload.
     */
    public /* bridge */ /* synthetic */ Map mapOutputs(List list, POutput pOutput) {
        List<TaggedPValue> taggedOutputs = (List<TaggedPValue>) list;
        PCollectionTuple replacementOutput = (PCollectionTuple) pOutput;
        return mapOutputs(taggedOutputs, replacementOutput);
    }

    /* renamed from: getInput, reason: collision with other method in class */
    /**
     * Erased-signature variant of {@code getInput}: casts the list and forwards to the typed
     * {@code getInput(List<TaggedPValue>, Pipeline)}.
     */
    public /* bridge */ /* synthetic */ PInput m17getInput(List list, Pipeline pipeline) {
        List<TaggedPValue> taggedInputs = (List<TaggedPValue>) list;
        return getInput(taggedInputs, pipeline);
    }
}
