package org.apache.beam.runners.apex.translation;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.apex.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/apex/translation/TranslationContext.class */
/**
 * Mutable bookkeeping used while a pipeline is translated into a {@link DAG}.
 *
 * <p>The context remembers the transform that is currently being translated, every operator
 * registered for a transform (keyed by a unique name), and for every produced
 * {@link PCollection} the output port that emits it together with the input ports that were
 * attached to it. {@link #populateDAG(DAG)} finally copies the collected operators and
 * streams into a {@link DAG}.
 *
 * <p>The class uses plain {@link HashMap}s and performs no synchronization of its own.
 */
public class TranslationContext {
    private final ApexPipelineOptions pipelineOptions;

    /** Transform whose translation is in progress; {@code null} until first set. */
    private AppliedPTransform<?, ?, ?> currentTransform;

    /** Produced collection -> (producing output port, input ports consuming the collection). */
    private final Map<PCollection, Pair<OutputPortInfo, List<InputPortInfo>>> streams = new HashMap<>();

    /** Unique operator name -> operator. */
    private final Map<String, Operator> operators = new HashMap<>();

    /** View -> the main input of the transform that was current when the view was added. */
    private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>();

    /** Alias -> aliased input; chains are followed by {@link #addStream}. */
    private final Map<PInput, PInput> aliasCollections = new HashMap<>();

    /** Associates an operator input port with the transform that was current when it was attached. */
    public static class InputPortInfo {
        Operator.InputPort port;
        AppliedPTransform transform;

        public InputPortInfo(Operator.InputPort inputPort, AppliedPTransform appliedPTransform) {
            this.port = inputPort;
            this.transform = appliedPTransform;
        }
    }

    /** Associates an operator output port with the transform that was current when it was registered. */
    public static class OutputPortInfo {
        Operator.OutputPort port;
        AppliedPTransform transform;

        public OutputPortInfo(Operator.OutputPort outputPort, AppliedPTransform appliedPTransform) {
            this.port = outputPort;
            this.transform = appliedPTransform;
        }
    }

    /**
     * Creates an empty context.
     *
     * @param apexPipelineOptions options later returned by {@link #getPipelineOptions()} and
     *     consulted by {@link #populateDAG(DAG)}
     */
    public TranslationContext(ApexPipelineOptions apexPipelineOptions) {
        this.pipelineOptions = apexPipelineOptions;
    }

    /**
     * Records the main input of the current transform as the input of the given view.
     *
     * @param pCollectionView view to register
     * @throws IllegalArgumentException if no current transform is set
     */
    public void addView(PCollectionView<?> pCollectionView) {
        this.viewInputs.put(pCollectionView, getInput());
    }

    /**
     * Looks up the input recorded for a view by {@link #addView}.
     *
     * @param pCollectionView view to look up
     * @return the recorded input, cast (unchecked) to the caller's expected type
     * @throws IllegalArgumentException if the view was never registered
     */
    @SuppressWarnings("unchecked")
    public <InputT extends PInput> InputT getViewInput(PCollectionView<?> pCollectionView) {
        PInput input = this.viewInputs.get(pCollectionView);
        Preconditions.checkArgument(input != null, "unknown view " + pCollectionView.getName());
        return (InputT) input;
    }

    /** Sets the transform that subsequent calls on this context refer to. */
    public void setCurrentTransform(AppliedPTransform<?, ?, ?> appliedPTransform) {
        this.currentTransform = appliedPTransform;
    }

    /** Returns the options passed to the constructor. */
    public ApexPipelineOptions getPipelineOptions() {
        return this.pipelineOptions;
    }

    /**
     * Returns the full name of the current transform.
     *
     * @throws IllegalArgumentException if no current transform is set
     */
    public String getFullName() {
        return getCurrentTransform().getFullName();
    }

    /**
     * Returns all inputs of the current transform, keyed by tag.
     *
     * @throws IllegalArgumentException if no current transform is set
     */
    public Map<TupleTag<?>, PValue> getInputs() {
        return getCurrentTransform().getInputs();
    }

    /**
     * Returns the single non-additional input of the current transform, cast (unchecked) to
     * the caller's expected type. {@code Iterables.getOnlyElement} is used, so exactly one
     * such input is required.
     *
     * @throws IllegalArgumentException if no current transform is set
     */
    @SuppressWarnings("unchecked")
    public <InputT extends PValue> InputT getInput() {
        return (InputT) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
    }

    /**
     * Returns all outputs of the current transform, keyed by tag.
     *
     * @throws IllegalArgumentException if no current transform is set
     */
    public Map<TupleTag<?>, PValue> getOutputs() {
        return getCurrentTransform().getOutputs();
    }

    /**
     * Returns the single output of the current transform, cast (unchecked) to the caller's
     * expected type. {@code Iterables.getOnlyElement} is used, so exactly one output is
     * required.
     *
     * @throws IllegalArgumentException if no current transform is set
     */
    @SuppressWarnings("unchecked")
    public <OutputT extends PValue> OutputT getOutput() {
        return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs().values());
    }

    /** Returns the current transform, rejecting the call if none has been set yet. */
    private AppliedPTransform<?, ?, ?> getCurrentTransform() {
        Preconditions.checkArgument(this.currentTransform != null, "current transform not set");
        return this.currentTransform;
    }

    /**
     * Registers an operator whose output port produces the single output of the current
     * transform.
     *
     * @param operator operator to register
     * @param outputPort port emitting the transform's only output
     */
    public void addOperator(Operator operator, Operator.OutputPort outputPort) {
        PCollection output = getOutput();
        addOperator(operator, outputPort, output);
    }

    /**
     * Registers an operator that produces several collections.
     *
     * <p>The operator itself is registered once, together with the first map entry; every
     * further entry only registers its output port as the source of its collection. If the
     * map is empty nothing is registered at all.
     *
     * @param operator operator to register
     * @param map produced collection -> output port emitting it
     */
    public void addOperator(Operator operator, Map<PCollection<?>, Operator.OutputPort<?>> map) {
        boolean operatorRegistered = false;
        for (Map.Entry<PCollection<?>, Operator.OutputPort<?>> portEntry : map.entrySet()) {
            if (operatorRegistered) {
                this.streams.put(portEntry.getKey(), newStream(portEntry.getValue()));
            } else {
                addOperator(operator, portEntry.getValue(), portEntry.getKey());
                operatorRegistered = true;
            }
        }
    }

    /**
     * Registers an operator under a unique name and records its output port as the source of
     * the given collection.
     *
     * <p>The name is the current transform's full name; if that name is already taken, the
     * suffixes {@code 1, 2, ...} are tried until an unused name is found.
     *
     * @param operator operator to register
     * @param outputPort port emitting {@code pCollection}
     * @param pCollection collection produced on {@code outputPort}
     * @throws IllegalArgumentException if no current transform is set
     */
    public void addOperator(Operator operator, Operator.OutputPort outputPort, PCollection pCollection) {
        String baseName = getCurrentTransform().getFullName();
        String uniqueName = baseName;
        for (int suffix = 1; this.operators.containsKey(uniqueName); suffix++) {
            uniqueName = baseName + suffix;
        }
        this.operators.put(uniqueName, operator);
        this.streams.put(pCollection, newStream(outputPort));
    }

    /** Creates a stream record for the given source port of the current transform, with no sinks yet. */
    private Pair<OutputPortInfo, List<InputPortInfo>> newStream(Operator.OutputPort outputPort) {
        List<InputPortInfo> sinks = new ArrayList<>();
        return new ImmutablePair<>(new OutputPortInfo(outputPort, getCurrentTransform()), sinks);
    }

    /**
     * Attaches an input port of the current transform's operator to the stream that carries
     * the given input. Aliases registered through {@link #addAlias} are followed first.
     *
     * @param pInput input (or alias of an input) to consume
     * @param inputPort port that consumes it
     * @throws IllegalArgumentException if no operator output was registered for the resolved
     *     input, or if no current transform is set
     */
    public void addStream(PInput pInput, Operator.InputPort inputPort) {
        PInput source = pInput;
        while (this.aliasCollections.containsKey(source)) {
            source = this.aliasCollections.get(source);
        }
        Pair<OutputPortInfo, List<InputPortInfo>> stream = this.streams.get(source);
        Preconditions.checkArgument(stream != null, "no upstream operator defined for " + source);
        stream.getRight().add(new InputPortInfo(inputPort, getCurrentTransform()));
    }

    /**
     * Declares that {@code pValue} should be treated as {@code pInput} when streams are
     * attached.
     *
     * @param pValue the alias
     * @param pInput the input the alias resolves to
     */
    public void addAlias(PValue pValue, PInput pInput) {
        this.aliasCollections.put(pValue, pInput);
    }

    /**
     * Copies everything collected so far into the given DAG.
     *
     * <p>All operators are added first. Then, for every collection that has at least one
     * attached input port, a stream named {@code stream<index>} is added from the producing
     * port to all consuming ports; collections without consumers are skipped and do not use
     * up an index. When ParDo fusion is enabled in the pipeline options the stream locality
     * is set as well. Finally each consuming port gets a stream codec derived from the
     * collection's coder.
     *
     * @param dag DAG to populate
     */
    public void populateDAG(DAG dag) {
        for (Map.Entry<String, Operator> namedOperator : this.operators.entrySet()) {
            dag.addOperator(namedOperator.getKey(), namedOperator.getValue());
        }

        int streamIndex = 0;
        for (Map.Entry<PCollection, Pair<OutputPortInfo, List<InputPortInfo>>> streamEntry : this.streams.entrySet()) {
            List<InputPortInfo> sinkInfos = streamEntry.getValue().getRight();
            if (sinkInfos.isEmpty()) {
                // Nothing consumes this collection, so there is no stream to create.
                continue;
            }

            Operator.InputPort[] sinkPorts = new Operator.InputPort[sinkInfos.size()];
            for (int i = 0; i < sinkPorts.length; i++) {
                sinkPorts[i] = sinkInfos.get(i).port;
            }

            DAG.StreamMeta streamMeta =
                    dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft().port, sinkPorts);
            if (this.pipelineOptions.isParDoFusionEnabled()) {
                optimizeStreams(streamMeta, streamEntry);
            }

            // A separate codec instance is created for every consuming port.
            for (Operator.InputPort sinkPort : sinkPorts) {
                dag.setInputPortAttribute(
                        sinkPort, Context.PortContext.STREAM_CODEC, newStreamCodec(streamEntry.getKey()));
            }
        }
    }

    /**
     * Builds the stream codec for a collection: the collection's coder, wrapped into a
     * {@code FullWindowedValueCoder} when the collection has a windowing strategy, and then
     * wrapped into an {@code ApexStreamTupleCoder}.
     */
    private static CoderAdapterStreamCodec newStreamCodec(PCollection collection) {
        if (collection.getWindowingStrategy() == null) {
            return new CoderAdapterStreamCodec(ApexStreamTuple.ApexStreamTupleCoder.of(collection.getCoder()));
        }
        return new CoderAdapterStreamCodec(
                ApexStreamTuple.ApexStreamTupleCoder.of(
                        WindowedValue.FullWindowedValueCoder.of(
                                collection.getCoder(),
                                collection.getWindowingStrategy().getWindowFn().windowCoder())));
    }

    /**
     * Chooses the locality of a stream from the transforms at both ends and applies it.
     *
     * <p>A locality is only considered when the producing transform is a
     * {@code ParDo.MultiOutput} or a {@code Window.Assign}. The consumers are then inspected
     * in order:
     * <ul>
     *   <li>a {@code ParDo.MultiOutput} with side inputs selects {@code CONTAINER_LOCAL} and
     *       ends the inspection;</li>
     *   <li>a {@code ParDo.MultiOutput} without side inputs, or a {@code Window.Assign},
     *       selects {@code THREAD_LOCAL} and the inspection continues;</li>
     *   <li>any other consumer resets the locality to {@code null} and ends the
     *       inspection.</li>
     * </ul>
     * The resulting value (possibly {@code null}) is always passed to
     * {@code streamMeta.setLocality}.
     */
    private void optimizeStreams(DAG.StreamMeta streamMeta, Map.Entry<PCollection, Pair<OutputPortInfo, List<InputPortInfo>>> entry) {
        DAG.Locality locality = null;
        PTransform sourceTransform = entry.getValue().getLeft().transform.getTransform();

        if ((sourceTransform instanceof ParDo.MultiOutput) || (sourceTransform instanceof Window.Assign)) {
            for (InputPortInfo sink : entry.getValue().getRight()) {
                PTransform sinkTransform = sink.transform.getTransform();
                if (sinkTransform instanceof ParDo.MultiOutput) {
                    if (((ParDo.MultiOutput) sinkTransform).getSideInputs().size() > 0) {
                        locality = DAG.Locality.CONTAINER_LOCAL;
                        break;
                    }
                    locality = DAG.Locality.THREAD_LOCAL;
                } else if (sinkTransform instanceof Window.Assign) {
                    locality = DAG.Locality.THREAD_LOCAL;
                } else {
                    // Any other kind of consumer rules out a locality for the whole stream.
                    locality = null;
                    break;
                }
            }
        }

        streamMeta.setLocality(locality);
    }

    /** Returns a newly created {@code ApexStateBackend}; a fresh instance is created on every call. */
    public ApexStateInternals.ApexStateBackend getStateBackend() {
        return new ApexStateInternals.ApexStateBackend();
    }
}
