/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.plan.physical;

import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.math.IntMath;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks, for a single logical stream, the upstream physical outputs that feed it and the
 * unifier operators (final, cascading and sliding) inserted between those outputs and the
 * physical partitions of the stream's sinks.
 *
 * <p>The mapping is rebuilt by {@link #setSources(Collection)}, which records the upstream
 * outputs belonging to this stream and then re-links the downstream operators.
 */
public class StreamMapping implements Serializable {
    private static final long serialVersionUID = 8572852828117485193L;
    private static final Logger LOG = LoggerFactory.getLogger(StreamMapping.class);
    private final LogicalPlan.StreamMeta streamMeta;
    private final PhysicalPlan plan;
    /** Unifier shared by downstream operators that do not need their own; {@code null} when none is in use. */
    PTOperator finalUnifier;
    /** Intermediate unifiers created (or reused) by {@link #setupCascadingUnifiers}. */
    final Set<PTOperator> cascadingUnifiers = Sets.newHashSet();
    /** One sliding unifier per upstream output, populated by {@link #addSlidingUnifiers()}. */
    final Set<PTOperator> slidingUnifiers = Sets.newHashSet();
    /** Outputs currently feeding this stream; replaced by sliding-unifier outputs when those are inserted. */
    private final List<PTOperator.PTOutput> upstream = Lists.newArrayList();

    /**
     * @param streamMeta the logical stream this mapping describes
     * @param plan the physical plan in which operators are created and removed
     */
    public StreamMapping(LogicalPlan.StreamMeta streamMeta, PhysicalPlan plan) {
        this.streamMeta = streamMeta;
        this.plan = plan;
    }

    /**
     * Adds every unifier currently held by this mapping (final, cascading, sliding) to {@code opers}.
     *
     * @param opers collection receiving the unifier operators
     */
    void addTo(Collection<PTOperator> opers) {
        if (this.finalUnifier != null) {
            opers.add(this.finalUnifier);
        }
        opers.addAll(this.cascadingUnifiers);
        opers.addAll(this.slidingUnifiers);
    }

    /**
     * Replaces the recorded upstream outputs with those outputs of {@code partitions} whose
     * logical stream is this mapping's stream, then rebuilds the downstream links.
     *
     * @param partitions physical operators whose outputs are inspected
     */
    public void setSources(Collection<PTOperator> partitions) {
        this.upstream.clear();
        for (PTOperator uoper : partitions) {
            for (PTOperator.PTOutput source : uoper.outputs) {
                // identity comparison: only outputs bound to this exact stream instance count
                if (source.logicalStream == this.streamMeta) {
                    this.upstream.add(source);
                }
            }
        }
        this.redoMapping();
    }

    /**
     * Creates a physical sliding unifier for the source port of {@code streamMeta}.
     *
     * <p>Both window counts are reduced by their greatest common divisor before being handed
     * to the source port's {@code getSlidingUnifier}; the divisor itself is passed as the
     * second argument.
     *
     * @param streamMeta the logical stream being unified
     * @param plan the physical plan that owns the new operator
     * @param operatorApplicationWindowCount application window count of the source operator
     * @param slidingWindowCount slide-by window count of the source operator
     * @return the newly created unifier, carrying a single output bound to {@code streamMeta}
     * @throws AssertionError if the unifier does not expose exactly one output port
     */
    public static PTOperator createSlidingUnifier(LogicalPlan.StreamMeta streamMeta, PhysicalPlan plan, int operatorApplicationWindowCount, int slidingWindowCount) {
        int gcd = IntMath.gcd(operatorApplicationWindowCount, slidingWindowCount);
        LogicalPlan.OperatorMeta um = streamMeta.getSource().getSlidingUnifier(operatorApplicationWindowCount / gcd, gcd, slidingWindowCount / gcd);
        return newUnifierOperator(streamMeta, plan, um);
    }

    /**
     * Creates a physical unifier from the unifier meta of the source port of {@code streamMeta}.
     *
     * @param streamMeta the logical stream being unified
     * @param plan the physical plan that owns the new operator
     * @return the newly created unifier, carrying a single output bound to {@code streamMeta}
     * @throws AssertionError if the unifier does not expose exactly one output port
     */
    public static PTOperator createUnifier(LogicalPlan.StreamMeta streamMeta, PhysicalPlan plan) {
        LogicalPlan.OperatorMeta um = streamMeta.getSource().getUnifierMeta();
        return newUnifierOperator(streamMeta, plan, um);
    }

    /**
     * Shared construction path for regular and sliding unifiers: allocates the physical
     * operator in {@code plan}, verifies the single-output-port requirement, attaches that
     * output to {@code streamMeta} and registers the operator in {@code plan.newOpers}.
     */
    private static PTOperator newUnifierOperator(LogicalPlan.StreamMeta streamMeta, PhysicalPlan plan, LogicalPlan.OperatorMeta unifierMeta) {
        PTOperator pu = plan.newOperator(unifierMeta, unifierMeta.getName());
        Operator unifier = unifierMeta.getOperator();
        Operators.PortMappingDescriptor mergeDesc = new Operators.PortMappingDescriptor();
        Operators.describe(unifier, mergeDesc);
        if (mergeDesc.outputPorts.size() != 1) {
            throw new AssertionError("Unifier must have a single output port, instead found : " + mergeDesc.outputPorts);
        }
        pu.unifiedOperatorMeta = streamMeta.getSource().getOperatorMeta();
        pu.outputs.add(new PTOperator.PTOutput(mergeDesc.outputPorts.keySet().iterator().next(), streamMeta, pu));
        plan.newOpers.put(pu, unifier);
        return pu;
    }

    /**
     * Inserts one sliding unifier behind every upstream output when the source operator has
     * {@code SLIDE_BY_WINDOW_COUNT} set to a value smaller than {@code APPLICATION_WINDOW_COUNT}.
     *
     * <p>Existing sliding unifiers are added to {@code plan.undeployOpers} and dropped, and
     * {@link #upstream} is replaced by the outputs of the new sliding unifiers. When the slide
     * count is not smaller than the application window count only a warning is logged.
     */
    private void addSlidingUnifiers() {
        LogicalPlan.OperatorMeta sourceOM = this.streamMeta.getSource().getOperatorMeta();
        if (!sourceOM.getAttributes().contains(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT)) {
            return;
        }
        int slideByCount = (Integer)sourceOM.getValue(Context.OperatorContext.SLIDE_BY_WINDOW_COUNT);
        int appWindowCount = (Integer)sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT);
        if (slideByCount >= appWindowCount) {
            LOG.warn("Sliding Window Count {} should be less than APPLICATION WINDOW COUNT {}", slideByCount, appWindowCount);
            return;
        }
        this.plan.undeployOpers.addAll(this.slidingUnifiers);
        this.slidingUnifiers.clear();
        List<PTOperator.PTOutput> newUpstream = Lists.newArrayList();
        for (PTOperator.PTOutput source : this.upstream) {
            PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(this.streamMeta, this.plan, appWindowCount, slideByCount);
            StreamMapping.addInput(slidingUnifier, source, null);
            this.slidingUnifiers.add(slidingUnifier);
            newUpstream.add(slidingUnifier.outputs.get(0));
        }
        this.upstream.clear();
        this.upstream.addAll(newUpstream);
    }

    /**
     * Groups {@code upstream} outputs into batches of {@code limit}, each batch feeding one
     * unifier, and repeats on the resulting outputs until at most {@code limit} remain.
     *
     * <p>A trailing group with fewer than {@code limit} outputs is not given a unifier; its
     * outputs are passed through to the next level unchanged. Unifiers are taken from
     * {@code pooledUnifiers} (removed from the front) while available, otherwise newly created,
     * and every unifier used is recorded in {@link #cascadingUnifiers}.
     *
     * @param upstream outputs to be merged at this level
     * @param pooledUnifiers reusable unifiers; consumed entries are removed from this list
     * @param limit maximum number of inputs per unifier
     * @param level recursion depth; not used by the logic
     * @return the outputs remaining after cascading, at most {@code limit} in number
     */
    private List<PTOperator.PTOutput> setupCascadingUnifiers(List<PTOperator.PTOutput> upstream, List<PTOperator> pooledUnifiers, int limit, int level) {
        List<PTOperator.PTOutput> nextLevel = Lists.newArrayList();
        PTOperator pu = null;
        for (int i = 0; i < upstream.size(); ++i) {
            if (i % limit == 0) {
                if (upstream.size() - i < limit) {
                    // incomplete trailing group: forward the remaining outputs as-is and stop
                    nextLevel.addAll(upstream.subList(i, upstream.size()));
                    break;
                }
                pu = !pooledUnifiers.isEmpty() ? pooledUnifiers.remove(0) : StreamMapping.createUnifier(this.streamMeta, this.plan);
                assert (pu.outputs.size() == 1) : "unifier has single output";
                nextLevel.addAll(pu.outputs);
                this.cascadingUnifiers.add(pu);
            }
            StreamMapping.addInput(pu, upstream.get(i), null);
        }
        if (nextLevel.size() > limit) {
            return this.setupCascadingUnifiers(nextLevel, pooledUnifiers, limit, level + 1);
        }
        return nextLevel;
    }

    /**
     * Rebuilds the links between {@link #upstream} and the physical partitions of every sink
     * that is not {@code PARTITION_PARALLEL} and already has a mapping in the plan.
     *
     * <p>Does nothing when no such downstream partition exists. Otherwise existing cascading
     * and final unifiers are detached, sliding and cascading unifiers are set up again, and
     * each downstream partition is connected either directly (single upstream output), through
     * the shared final unifier, or through its own per-port unifier.
     */
    private void redoMapping() {
        Set<Pair<PTOperator, LogicalPlan.InputPortMeta>> downstreamOpers = Sets.newHashSet();
        for (LogicalPlan.InputPortMeta ipm : this.streamMeta.getSinks()) {
            boolean parallel = ((Boolean)ipm.getValue(Context.PortContext.PARTITION_PARALLEL)).booleanValue();
            if (parallel || !this.plan.hasMapping(ipm.getOperatorWrapper())) {
                continue;
            }
            for (PTOperator doper : this.plan.getOperators(ipm.getOperatorWrapper())) {
                downstreamOpers.add(new Pair<PTOperator, LogicalPlan.InputPortMeta>(doper, ipm));
            }
        }
        if (downstreamOpers.isEmpty()) {
            return;
        }

        for (PTOperator unifier : this.cascadingUnifiers) {
            this.detachUnifier(unifier);
        }
        if (this.finalUnifier != null) {
            this.detachUnifier(this.finalUnifier);
        }
        // previous cascading unifiers form a pool that setupCascadingUnifiers may draw from
        List<PTOperator> currentUnifiers = Lists.newArrayList(this.cascadingUnifiers);
        this.cascadingUnifiers.clear();
        this.plan.undeployOpers.addAll(currentUnifiers);
        this.addSlidingUnifiers();
        int limit = (Integer)this.streamMeta.getSource().getValue(Context.PortContext.UNIFIER_LIMIT);
        boolean separateUnifiers = this.sinksUseDistinctCodecs();

        List<PTOperator.PTOutput> unifierSources = this.upstream;
        Map<StreamCodec<?>, List<PTOperator.PTOutput>> cascadeUnifierSourcesMap = Maps.newHashMap();
        if (limit > 1 && this.upstream.size() > limit) {
            if (!separateUnifiers) {
                unifierSources = this.setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
            } else {
                // one cascade per distinct stream codec
                for (LogicalPlan.InputPortMeta ipm : this.streamMeta.getSinks()) {
                    StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
                    if (cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) {
                        continue;
                    }
                    unifierSources = this.setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
                    cascadeUnifierSourcesMap.put(streamCodecInfo, unifierSources);
                }
            }
        }
        // whatever is still in the pool was not reused
        for (PTOperator oper : currentUnifiers) {
            this.plan.removePTOperator(oper);
        }

        // read straight from the attribute map so that "not set" (null) can be told apart from a value
        Boolean sourceSingleFinal = (Boolean)this.streamMeta.getSource().getAttributes().get(Context.PortContext.UNIFIER_SINGLE_FINAL);
        for (Pair<PTOperator, LogicalPlan.InputPortMeta> doperEntry : downstreamOpers) {
            PTOperator doper = doperEntry.first;
            LogicalPlan.InputPortMeta ipm = doperEntry.second;
            Map<LogicalPlan.InputPortMeta, Partitioner.PartitionKeys> partKeys = doper.partitionKeys;
            Partitioner.PartitionKeys pks = partKeys != null ? partKeys.get(ipm) : null;
            Boolean sinkSingleFinal = (Boolean)ipm.getAttributes().get(Context.PortContext.UNIFIER_SINGLE_FINAL);

            // precedence: sink port setting, then source port setting, then the attribute default
            boolean lastSingle;
            if (sinkSingleFinal != null) {
                lastSingle = sinkSingleFinal.booleanValue();
            } else if (sourceSingleFinal != null) {
                lastSingle = sourceSingleFinal.booleanValue();
            } else {
                lastSingle = ((Boolean)Context.PortContext.UNIFIER_SINGLE_FINAL.defaultValue).booleanValue();
            }

            if (this.upstream.size() <= 1) {
                // single upstream output: no unifier needed, connect directly
                PTOperator obsolete = doper.upstreamMerge.remove(ipm);
                if (obsolete != null) {
                    this.plan.removePTOperator(obsolete);
                }
                this.setInput(doper, ipm, this.upstream.get(0).source, pks);
                continue;
            }

            boolean unpartitioned = pks == null || pks.mask == 0;
            if (!separateUnifiers && (unpartitioned || lastSingle)) {
                // shared final unifier
                if (this.finalUnifier == null) {
                    this.finalUnifier = StreamMapping.createUnifier(this.streamMeta, this.plan);
                }
                this.setInput(doper, ipm, this.finalUnifier, unpartitioned ? null : pks);
                if (this.finalUnifier.inputs.isEmpty()) {
                    for (PTOperator.PTOutput out : unifierSources) {
                        StreamMapping.addInput(this.finalUnifier, out, null);
                    }
                }
                continue;
            }

            // dedicated unifier for this downstream partition and port
            LOG.debug("MxN unifier for {} {} {}", new Object[]{doper, ipm.getPortName(), pks});
            PTOperator unifier = doper.upstreamMerge.get(ipm);
            if (unifier == null) {
                unifier = StreamMapping.createUnifier(this.streamMeta, this.plan);
                doper.upstreamMerge.put(ipm, unifier);
                this.setInput(doper, ipm, unifier, null);
            }
            StreamMapping.detachInputs(unifier);
            List<PTOperator.PTOutput> doperUnifierSources = unifierSources;
            if (separateUnifiers) {
                List<PTOperator.PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(StreamingContainerAgent.getStreamCodec(ipm));
                if (cascadeSources != null) {
                    doperUnifierSources = cascadeSources;
                }
            }
            for (PTOperator.PTOutput out : doperUnifierSources) {
                StreamMapping.addInput(unifier, out, pks);
            }
        }

        if (this.finalUnifier != null && this.finalUnifier.inputs.isEmpty()) {
            this.plan.removePTOperator(this.finalUnifier);
            this.finalUnifier = null;
        }
    }

    /**
     * Reports whether the sinks of this stream resolve to more than one stream codec
     * identifier, as returned by {@code plan.getStreamCodecIdentifier}.
     */
    private boolean sinksUseDistinctCodecs() {
        Integer lastId = null;
        for (LogicalPlan.InputPortMeta ipm : this.streamMeta.getSinks()) {
            StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
            Integer id = this.plan.getStreamCodecIdentifier(streamCodecInfo);
            if (lastId == null) {
                lastId = id;
            } else if (!id.equals(lastId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Connects port {@code ipm} of {@code oper} to every output of {@code sourceOper} that
     * belongs to this stream, unless {@code oper} already has an input for this stream and
     * port coming from {@code sourceOper}.
     *
     * @param oper downstream operator receiving the input
     * @param ipm logical input port being connected
     * @param sourceOper operator whose outputs are the source
     * @param pks partition keys to attach to the new input, may be {@code null}
     */
    private void setInput(PTOperator oper, LogicalPlan.InputPortMeta ipm, PTOperator sourceOper, Partitioner.PartitionKeys pks) {
        for (PTOperator.PTInput in : oper.inputs) {
            if (in.source.source == sourceOper && in.logicalStream == this.streamMeta && ipm.getPortName().equals(in.portName)) {
                // link already present
                return;
            }
        }
        for (PTOperator.PTOutput upstreamOut : sourceOper.outputs) {
            if (upstreamOut.logicalStream != this.streamMeta) {
                continue;
            }
            PTOperator.PTInput input = new PTOperator.PTInput(ipm.getPortName(), this.streamMeta, oper, pks, upstreamOut, ipm.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
            oper.inputs.add(input);
        }
    }

    /**
     * Adds a merge input to {@code target} that reads from {@code upstreamOut}. The input's
     * port name is {@code "<merge#" + sourcePortName + ">"}, where the source port is that of
     * the logical stream of {@code upstreamOut}.
     *
     * @param target operator (typically a unifier) receiving the input
     * @param upstreamOut output to read from
     * @param pks partition keys to attach to the input, may be {@code null}
     */
    public static void addInput(PTOperator target, PTOperator.PTOutput upstreamOut, Partitioner.PartitionKeys pks) {
        LogicalPlan.StreamMeta lStreamMeta = upstreamOut.logicalStream;
        String mergePortName = "<merge#" + lStreamMeta.getSource().getPortName() + ">";
        target.inputs.add(new PTOperator.PTInput(mergePortName, lStreamMeta, target, pks, upstreamOut, false));
    }

    /**
     * Disconnects {@code unifier} on both sides: removes the inputs that its outputs feed from
     * their target operators, clears the outputs' sink lists, then drops the unifier's own inputs.
     */
    private void detachUnifier(PTOperator unifier) {
        for (PTOperator.PTOutput out : unifier.outputs) {
            for (PTOperator.PTInput input : out.sinks) {
                input.target.inputs.remove(input);
            }
            out.sinks.clear();
        }
        StreamMapping.detachInputs(unifier);
    }

    /**
     * Removes every input of {@code oper} from the sink list of the output it reads from and
     * then clears {@code oper.inputs}.
     */
    private static void detachInputs(PTOperator oper) {
        for (PTOperator.PTInput in : oper.inputs) {
            in.source.sinks.remove(in);
        }
        oper.inputs.clear();
    }
}

