package org.apache.beam.runners.apex.translation;

import com.datatorrent.api.Operator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.apex.repackaged.com.google.common.collect.Maps;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/apex/translation/ParDoTranslator.class */
/**
 * Translates a multi-output {@link ParDo} into an {@link ApexParDoOperator} and registers the
 * operator, its input stream, its output ports and its side inputs with the
 * {@link TranslationContext}.
 *
 * <p>{@link DoFn}s that are splittable, or that declare state or timers, are rejected with an
 * {@link UnsupportedOperationException}.
 *
 * @param <InputT> element type consumed by the translated {@link DoFn}
 * @param <OutputT> main output element type of the translated {@link DoFn}
 */
public class ParDoTranslator<InputT, OutputT> implements TransformTranslator<ParDo.MultiOutput<InputT, OutputT>> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ParDoTranslator.class);

    /**
     * Translates {@link SplittableParDoViaKeyedWorkItems.ProcessElements} using the same operator
     * wiring as the enclosing translator; the {@link DoFn} handed to the operator is the one
     * returned by {@code processElements.newProcessFn(processElements.getFn())}.
     */
    static class SplittableProcessElementsTranslator<InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> implements TransformTranslator<SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, TrackerT>> {
        /**
         * Builds the operator for {@code processElements} and wires its input, outputs and side
         * inputs into {@code translationContext}.
         *
         * @param processElements the transform being translated
         * @param translationContext supplies the input, outputs, options and state backend, and
         *     receives the resulting operator and streams
         * @throws IllegalArgumentException if any output of the transform is not a
         *     {@link PCollection}
         */
        @Override
        // The input PCollection and the operator are handled as raw types: their element types
        // are not needed for wiring and are not expressed by the context accessors used here.
        @SuppressWarnings({"rawtypes", "unchecked"})
        public void translate(SplittableParDoViaKeyedWorkItems.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> processElements, TranslationContext translationContext) {
            Map<TupleTag<?>, PValue> outputs = translationContext.getOutputs();
            PCollection input = translationContext.getInput();
            List<PCollectionView<?>> sideInputs = processElements.getSideInputs();
            ApexParDoOperator operator = new ApexParDoOperator(
                    translationContext.getPipelineOptions(),
                    processElements.newProcessFn(processElements.getFn()),
                    processElements.getMainOutputTag(),
                    processElements.getAdditionalOutputTags().getAll(),
                    input.getWindowingStrategy(),
                    sideInputs,
                    WindowedValue.FullWindowedValueCoder.of(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
                    translationContext.getStateBackend());
            Map<PCollection<?>, Operator.OutputPort<?>> ports = mapOutputPorts(
                    outputs,
                    processElements.getMainOutputTag(),
                    processElements.getAdditionalOutputTags().getAll(),
                    operator.output,
                    operator.additionalOutputPorts,
                    translationContext);
            translationContext.addOperator(operator, ports);
            translationContext.addStream(input, operator.input);
            if (!sideInputs.isEmpty()) {
                ParDoTranslator.addSideInputs(operator.sideInput1, sideInputs, translationContext);
            }
        }
    }

    /**
     * Builds the operator for {@code multiOutput} and wires its input, outputs and side inputs
     * into {@code translationContext}.
     *
     * @param multiOutput the transform being translated
     * @param translationContext supplies the input, outputs, options and state backend, and
     *     receives the resulting operator and streams
     * @throws UnsupportedOperationException if the {@link DoFn} is splittable or declares state
     *     or timers
     * @throws IllegalArgumentException if any output of the transform is not a
     *     {@link PCollection}
     */
    @Override
    // The input PCollection and the operator are handled as raw types: their element types are
    // not needed for wiring and are not expressed by the context accessors used here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void translate(ParDo.MultiOutput<InputT, OutputT> multiOutput, TranslationContext translationContext) {
        DoFn<InputT, OutputT> fn = multiOutput.getFn();
        DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
        if (signature.processElement().isSplittable()) {
            throw new UnsupportedOperationException(String.format("%s does not support splittable DoFn: %s", ApexRunner.class.getSimpleName(), fn));
        }
        if (!signature.stateDeclarations().isEmpty()) {
            throw new UnsupportedOperationException(String.format("Found %s annotations on %s, but %s cannot yet be used with state in the %s.", DoFn.StateId.class.getSimpleName(), fn.getClass().getName(), DoFn.class.getSimpleName(), ApexRunner.class.getSimpleName()));
        }
        if (!signature.timerDeclarations().isEmpty()) {
            throw new UnsupportedOperationException(String.format("Found %s annotations on %s, but %s cannot yet be used with timers in the %s.", DoFn.TimerId.class.getSimpleName(), fn.getClass().getName(), DoFn.class.getSimpleName(), ApexRunner.class.getSimpleName()));
        }
        Map<TupleTag<?>, PValue> outputs = translationContext.getOutputs();
        PCollection input = translationContext.getInput();
        List<PCollectionView<?>> sideInputs = multiOutput.getSideInputs();
        ApexParDoOperator operator = new ApexParDoOperator(
                translationContext.getPipelineOptions(),
                fn,
                multiOutput.getMainOutputTag(),
                multiOutput.getAdditionalOutputTags().getAll(),
                input.getWindowingStrategy(),
                sideInputs,
                WindowedValue.FullWindowedValueCoder.of(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
                translationContext.getStateBackend());
        Map<PCollection<?>, Operator.OutputPort<?>> ports = mapOutputPorts(
                outputs,
                multiOutput.getMainOutputTag(),
                multiOutput.getAdditionalOutputTags().getAll(),
                operator.output,
                operator.additionalOutputPorts,
                translationContext);
        translationContext.addOperator(operator, ports);
        translationContext.addStream(input, operator.input);
        if (!sideInputs.isEmpty()) {
            addSideInputs(operator.sideInput1, sideInputs, translationContext);
        }
    }

    /**
     * Maps every output {@link PCollection} of a transform to the operator port that feeds it.
     *
     * <p>The output tagged with {@code mainOutputTag} is mapped to {@code mainPort}. Any other
     * output is mapped to {@code additionalPorts[i]}, where {@code i} is the position of its tag
     * in {@code additionalOutputTags}. An output whose tag matches neither the main tag nor any
     * additional tag receives no mapping.
     *
     * @param outputs the transform's outputs keyed by tag
     * @param mainOutputTag tag identifying the main output
     * @param additionalOutputTags tags of the additional outputs, in port order
     * @param mainPort operator port for the main output
     * @param additionalPorts operator ports for the additional outputs, indexed like
     *     {@code additionalOutputTags}
     * @param translationContext used only to name the transform in the error message
     * @return a new map from output collection to operator port
     * @throws IllegalArgumentException if an output value is not a {@link PCollection}
     */
    private static Map<PCollection<?>, Operator.OutputPort<?>> mapOutputPorts(
            Map<TupleTag<?>, PValue> outputs,
            TupleTag<?> mainOutputTag,
            List<TupleTag<?>> additionalOutputTags,
            Operator.OutputPort<?> mainPort,
            Operator.OutputPort<?>[] additionalPorts,
            TranslationContext translationContext) {
        Map<PCollection<?>, Operator.OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
        for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
            PValue value = entry.getValue();
            Preconditions.checkArgument(value instanceof PCollection, "%s %s outputs non-PCollection %s of type %s", ParDo.MultiOutput.class.getSimpleName(), translationContext.getFullName(), value, value.getClass().getSimpleName());
            PCollection<?> collection = (PCollection<?>) value;
            TupleTag<?> tag = entry.getKey();
            if (tag.equals(mainOutputTag)) {
                ports.put(collection, mainPort);
                continue;
            }
            // Only the first matching additional tag determines the port.
            for (int i = 0; i < additionalOutputTags.size(); i++) {
                if (additionalOutputTags.get(i).equals(tag)) {
                    ports.put(collection, additionalPorts[i]);
                    break;
                }
            }
        }
        return ports;
    }

    /**
     * Connects the given side inputs to the operator's side-input port.
     *
     * <p>Exactly one port is available. When there are more side inputs than ports, their source
     * collections are first combined by {@link #unionSideInputs} and the combined collection is
     * streamed to the port; otherwise each side input's source collection is streamed to the port
     * at the same index.
     *
     * @param inputPort the operator's side-input port
     * @param list the side-input views to connect
     * @param translationContext resolves views to their source collections and receives the
     *     streams
     */
    static void addSideInputs(Operator.InputPort<?> inputPort, List<PCollectionView<?>> list, TranslationContext translationContext) {
        Operator.InputPort<?>[] sideInputPorts = {inputPort};
        if (list.size() > sideInputPorts.length) {
            PCollection<?> unionCollection = unionSideInputs(list, translationContext);
            translationContext.addStream(unionCollection, sideInputPorts[0]);
            return;
        }
        // Each port takes a single stream, so inputs and ports are paired by index.
        for (int i = 0; i < list.size(); i++) {
            translationContext.addStream(translationContext.getViewInput(list.get(i)), sideInputPorts[i]);
        }
    }

    /**
     * Combines the source collections of several side inputs into one intermediate collection,
     * recording each source collection's index in {@code list} as its union tag.
     *
     * <p>All source collections must share the coder of the first one. Differing windowing
     * strategies are only logged as a warning.
     *
     * @param list the side-input views to combine; must contain more than one element
     * @param translationContext resolves views to their source collections and is passed on to
     *     {@link FlattenPCollectionTranslator#flattenCollections}
     * @return the intermediate collection created by
     *     {@link FlattenPCollectionTranslator#intermediateCollection} from the first source
     *     collection and its coder
     * @throws IllegalArgumentException if {@code list} has fewer than two elements
     * @throws UnsupportedOperationException if a source collection's coder differs from the
     *     first one's
     */
    // The collections are passed to FlattenPCollectionTranslator as raw types because the element
    // type of a view's source collection is not known here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static PCollection<?> unionSideInputs(List<PCollectionView<?>> list, TranslationContext translationContext) {
        Preconditions.checkArgument(list.size() > 1, "requires multiple side inputs");
        List sourceCollections = new ArrayList();
        Map<PCollection<?>, Integer> unionTags = new HashMap<>();
        PCollection first = translationContext.getViewInput(list.get(0));
        for (int i = 0; i < list.size(); i++) {
            PCollection current = translationContext.getViewInput(list.get(i));
            if (!current.getWindowingStrategy().equals(first.getWindowingStrategy())) {
                LOG.warn("Side inputs union with different windowing strategies {} {}", first.getWindowingStrategy(), current.getWindowingStrategy());
            }
            if (!current.getCoder().equals(first.getCoder())) {
                throw new UnsupportedOperationException("Multiple side inputs with different coders.");
            }
            sourceCollections.add(current);
            unionTags.put(current, Integer.valueOf(i));
        }
        PCollection<?> intermediateCollection = FlattenPCollectionTranslator.intermediateCollection(first, first.getCoder());
        FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, intermediateCollection, translationContext);
        return intermediateCollection;
    }
}
