/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.plan.logical;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Module;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.StringCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.experimental.AppData;
import com.datatorrent.common.metric.MetricsAggregator;
import com.datatorrent.common.metric.SingleMetricAggregator;
import com.datatorrent.common.metric.sum.DoubleSumAggregator;
import com.datatorrent.common.metric.sum.LongSumAggregator;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.engine.DefaultUnifier;
import com.datatorrent.stram.engine.Slider;
import com.datatorrent.stram.plan.logical.LogicalOperatorStatus;
import com.datatorrent.stram.plan.logical.MetricAggregatorMeta;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Sets;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
import javax.validation.Validation;
import javax.validation.ValidationException;
import javax.validation.Validator;
import javax.validation.ValidatorFactory;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogicalPlan
implements Serializable,
DAG {
    public static final Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute((Object)false);
    private static final long serialVersionUID = -2099729915606048704L;
    private static final Logger LOG = LoggerFactory.getLogger(LogicalPlan.class);
    public static final String SER_FILE_NAME = "dt-conf.ser";
    public static final String LAUNCH_CONFIG_FILE_NAME = "dt-launch-config.xml";
    private static final transient AtomicInteger logicalOperatorSequencer = new AtomicInteger();
    public static final String MODULE_NAMESPACE_SEPARATOR = "$";
    public static String SUBDIR_CHECKPOINTS = "checkpoints";
    public static String SUBDIR_STATS = "stats";
    public static String SUBDIR_EVENTS = "events";
    public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute((Object)false);
    public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute((Object)604800000L);
    public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute((Object)604800000L);
    public static Attribute<String> KEY_TAB_FILE = new Attribute((Object)null, (StringCodec)new StringCodec.String2String());
    public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute((Object)0.7);
    public static Attribute<String> LIBRARY_JARS = new Attribute((StringCodec)new StringCodec.String2String());
    public static Attribute<String> ARCHIVES = new Attribute((StringCodec)new StringCodec.String2String());
    public static Attribute<String> FILES = new Attribute((StringCodec)new StringCodec.String2String());
    public static Attribute<Integer> CONTAINERS_MAX_COUNT = new Attribute((Object)Integer.MAX_VALUE);
    public static Attribute<Integer> APPLICATION_ATTEMPT_ID = new Attribute((Object)1);
    private final Map<String, StreamMeta> streams = new HashMap<String, StreamMeta>();
    private final Map<String, OperatorMeta> operators = new HashMap<String, OperatorMeta>();
    public final Map<String, ModuleMeta> modules = new LinkedHashMap<String, ModuleMeta>();
    private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>();
    private final Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    private transient Map<String, ArrayListMultimap<Operator.OutputPort<?>, Operator.InputPort<?>>> streamLinks = new HashMap();

    public Attribute.AttributeMap getAttributes() {
        return this.attributes;
    }

    public <T> T getValue(Attribute<T> key) {
        Object val = this.attributes.get(key);
        if (val == null) {
            return (T)key.defaultValue;
        }
        return (T)val;
    }

    public void setCounters(Object counters) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public void sendMetrics(Collection<String> metricNames) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public <T extends Operator> T addOperator(String name, Class<T> clazz) {
        Operator instance;
        try {
            instance = (Operator)clazz.newInstance();
        }
        catch (Exception ex) {
            throw new IllegalArgumentException(ex);
        }
        this.addOperator(name, instance);
        return (T)instance;
    }

    public <T extends Operator> T addOperator(String name, T operator) {
        if (this.operators.containsKey(name)) {
            if (this.operators.get(name).operator == operator) {
                return operator;
            }
            throw new IllegalArgumentException("duplicate operator id: " + this.operators.get(name));
        }
        if (this.modules.containsKey(name)) {
            throw new IllegalArgumentException("duplicate operator id: " + this.operators.get(name));
        }
        OperatorMeta decl = new OperatorMeta(name, (Operator)operator);
        this.rootOperators.add(decl);
        this.operators.put(name, decl);
        return operator;
    }

    public <T extends Module> T addModule(String name, T module) {
        if (this.modules.containsKey(name)) {
            if (this.modules.get(name).module == module) {
                return module;
            }
            throw new IllegalArgumentException("duplicate module is: " + this.modules.get(name));
        }
        if (this.operators.containsKey(name)) {
            throw new IllegalArgumentException("duplicate module is: " + this.modules.get(name));
        }
        ModuleMeta meta = new ModuleMeta(name, module);
        this.modules.put(name, meta);
        return module;
    }

    public <T extends Module> T addModule(String name, Class<T> clazz) {
        Module instance;
        try {
            instance = (Module)clazz.newInstance();
        }
        catch (Exception ex) {
            throw new IllegalArgumentException(ex);
        }
        this.addModule(name, instance);
        return (T)instance;
    }

    public void removeOperator(Operator operator) {
        OperatorMeta om = this.getMeta(operator);
        if (om == null) {
            return;
        }
        Map<InputPortMeta, StreamMeta> inputStreams = om.getInputStreams();
        for (Map.Entry<InputPortMeta, StreamMeta> e : inputStreams.entrySet()) {
            StreamMeta stream = e.getValue();
            if (e.getKey().getOperatorWrapper() == om) {
                stream.sinks.remove(e.getKey());
            }
            stream.resetStreamPersistanceOnSinkRemoval(e.getKey());
        }
        this.operators.remove(om.getName());
        this.rootOperators.remove(om);
    }

    public StreamMeta addStream(String id) {
        StreamMeta s = new StreamMeta(id);
        StreamMeta o = this.streams.put(id, s);
        if (o == null) {
            return s;
        }
        throw new IllegalArgumentException("duplicate stream id: " + o);
    }

    public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> ... sinks) {
        StreamMeta s = this.addStream(id);
        id = s.id;
        ArrayListMultimap streamMap = ArrayListMultimap.create();
        if (!(source instanceof Module.ProxyOutputPort)) {
            s.setSource((Operator.OutputPort)source);
        }
        for (Operator.InputPort<? super T> sink : sinks) {
            if (source instanceof Module.ProxyOutputPort || sink instanceof Module.ProxyInputPort) {
                streamMap.put(source, sink);
                this.streamLinks.put(id, streamMap);
                continue;
            }
            if (s.getSource() == null) {
                s.setSource((Operator.OutputPort)source);
            }
            s.addSink((Operator.InputPort)sink);
        }
        return s;
    }

    public void applyStreamLinks() {
        for (String id : this.streamLinks.keySet()) {
            StreamMeta s = this.getStream(id);
            for (Map.Entry pair : this.streamLinks.get(id).entries()) {
                if (s.getSource() == null) {
                    Operator.OutputPort outputPort = (Operator.OutputPort)pair.getKey();
                    while (outputPort instanceof Module.ProxyOutputPort) {
                        outputPort = ((Module.ProxyOutputPort)outputPort).get();
                    }
                    s.setSource(outputPort);
                }
                Operator.InputPort inputPort = (Operator.InputPort)pair.getValue();
                while (inputPort instanceof Module.ProxyInputPort) {
                    inputPort = ((Module.ProxyInputPort)inputPort).get();
                }
                s.addSink(inputPort);
            }
        }
    }

    private void addDAGToCurrentDAG(ModuleMeta moduleMeta) {
        String name;
        LogicalPlan subDag = moduleMeta.getDag();
        String subDAGName = moduleMeta.getName();
        for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
            name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
            Operator op = this.addOperator(name, operatorMeta.getOperator());
            OperatorMeta operatorMetaNew = this.getMeta(op);
            operatorMetaNew.copyAttributesFrom(operatorMeta);
            operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName : subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName());
        }
        for (StreamMeta streamMeta : subDag.getAllStreams()) {
            OutputPortMeta sourceMeta = streamMeta.getSource();
            LinkedList ports = new LinkedList();
            for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
                ports.add(inputPortMeta.getPortObject());
            }
            Operator.InputPort[] inputPorts = ports.toArray(new Operator.InputPort[0]);
            name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
            DAG.StreamMeta streamMetaNew = this.addStream(name, (Operator.OutputPort)sourceMeta.getPortObject(), inputPorts);
            streamMetaNew.setLocality(streamMeta.getLocality());
        }
    }

    public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1) {
        Operator.InputPort[] ports = new Operator.InputPort[]{sink1};
        return this.addStream(id, (Operator.OutputPort)source, ports);
    }

    public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2) {
        Operator.InputPort[] ports = new Operator.InputPort[]{sink1, sink2};
        return this.addStream(id, (Operator.OutputPort)source, ports);
    }

    public StreamMeta getStream(String id) {
        return this.streams.get(id);
    }

    public Attribute.AttributeMap getContextAttributes(Operator operator) {
        return this.getMeta(operator).attributes;
    }

    public <T> void setAttribute(Attribute<T> key, T value) {
        this.getAttributes().put(key, value);
    }

    public <T> void setAttribute(Operator operator, Attribute<T> key, T value) {
        this.getMeta(operator).attributes.put(key, value);
    }

    private OutputPortMeta assertGetPortMeta(Operator.OutputPort<?> port) {
        for (OperatorMeta o : this.getAllOperators()) {
            OutputPortMeta opm = (OutputPortMeta)o.getPortMapping().outPortMap.get(port);
            if (opm == null) continue;
            return opm;
        }
        throw new IllegalArgumentException("Port is not associated to any operator in the DAG: " + port);
    }

    private InputPortMeta assertGetPortMeta(Operator.InputPort<?> port) {
        for (OperatorMeta o : this.getAllOperators()) {
            InputPortMeta opm = (InputPortMeta)o.getPortMapping().inPortMap.get(port);
            if (opm == null) continue;
            return opm;
        }
        throw new IllegalArgumentException("Port is not associated to any operator in the DAG: " + port);
    }

    public <T> void setOutputPortAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value) {
        this.assertGetPortMeta(port).attributes.put(key, value);
    }

    public <T> void setUnifierAttribute(Operator.OutputPort<?> port, Attribute<T> key, T value) {
        this.assertGetPortMeta(port).getUnifierMeta().attributes.put(key, value);
    }

    public <T> void setInputPortAttribute(Operator.InputPort<?> port, Attribute<T> key, T value) {
        this.assertGetPortMeta(port).attributes.put(key, value);
    }

    public List<OperatorMeta> getRootOperators() {
        return Collections.unmodifiableList(this.rootOperators);
    }

    public Collection<OperatorMeta> getAllOperators() {
        return Collections.unmodifiableCollection(this.operators.values());
    }

    public Collection<ModuleMeta> getAllModules() {
        return Collections.unmodifiableCollection(this.modules.values());
    }

    public Collection<StreamMeta> getAllStreams() {
        return Collections.unmodifiableCollection(this.streams.values());
    }

    public OperatorMeta getOperatorMeta(String operatorName) {
        return this.operators.get(operatorName);
    }

    public ModuleMeta getModuleMeta(String moduleName) {
        return this.modules.get(moduleName);
    }

    public OperatorMeta getMeta(Operator operator) {
        for (OperatorMeta o : this.getAllOperators()) {
            if (o.operator != operator) continue;
            return o;
        }
        throw new IllegalArgumentException("Operator not associated with the DAG: " + operator);
    }

    public ModuleMeta getMeta(Module module) {
        for (ModuleMeta m : this.getAllModules()) {
            if (m.module != module) continue;
            return m;
        }
        throw new IllegalArgumentException("Module not associated with the DAG: " + module);
    }

    public int getMaxContainerCount() {
        return this.getValue(CONTAINERS_MAX_COUNT);
    }

    public boolean isDebug() {
        return (Boolean)this.getValue(DEBUG);
    }

    public int getMasterMemoryMB() {
        return (Integer)this.getValue(MASTER_MEMORY_MB);
    }

    public String getMasterJVMOptions() {
        return (String)this.getValue(CONTAINER_JVM_OPTIONS);
    }

    public String assertAppPath() {
        String path = (String)this.getAttributes().get(APPLICATION_PATH);
        if (path == null) {
            throw new AssertionError((Object)("Missing " + APPLICATION_PATH));
        }
        return path;
    }

    public Set<String> getClassNames() {
        HashSet<String> classNames = new HashSet<String>();
        for (OperatorMeta operatorMeta : this.operators.values()) {
            String className = operatorMeta.getOperator().getClass().getName();
            if (className == null) continue;
            classNames.add(className);
        }
        for (StreamMeta streamMeta : this.streams.values()) {
            for (InputPortMeta sink : streamMeta.getSinks()) {
                StreamCodec streamCodec = (StreamCodec)sink.getValue(Context.PortContext.STREAM_CODEC);
                if (streamCodec != null) {
                    classNames.add(streamCodec.getClass().getName());
                    continue;
                }
                StreamCodec codec = sink.getPortObject().getStreamCodec();
                if (codec == null) continue;
                classNames.add(codec.getClass().getName());
            }
        }
        return classNames;
    }

    public void resetNIndex() {
        for (OperatorMeta om : this.getAllOperators()) {
            om.lowlink = null;
            om.nindex = null;
        }
    }

    public void validate() throws ConstraintViolationException {
        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
        Validator validator = factory.getValidator();
        this.checkAttributeValueSerializable(this.getAttributes(), DAG.class.getName());
        for (OperatorMeta n : this.operators.values()) {
            n.oioRoot = null;
        }
        for (OperatorMeta n : this.operators.values()) {
            n.nindex = null;
            n.lowlink = null;
            Set constraintViolations = validator.validate((Object)n.getOperator(), new Class[0]);
            if (!constraintViolations.isEmpty()) {
                HashSet<ConstraintViolation> copySet = new HashSet<ConstraintViolation>(constraintViolations.size());
                for (ConstraintViolation cv : constraintViolations) {
                    copySet.add(cv);
                }
                throw new ConstraintViolationException("Operator " + n.getName() + " violates constraints " + copySet, copySet);
            }
            OperatorMeta.PortMapping portMapping = n.getPortMapping();
            this.checkAttributeValueSerializable(n.getAttributes(), n.getName());
            if (n.operatorAnnotation != null) {
                if (!n.operatorAnnotation.partitionable()) {
                    for (InputPortMeta pm : portMapping.inPortMap.values()) {
                        Boolean paralellPartition = (Boolean)pm.getValue(Context.PortContext.PARTITION_PARALLEL);
                        if (!paralellPartition.booleanValue()) continue;
                        throw new ValidationException("Operator " + n.getName() + " is not partitionable but PARTITION_PARALLEL attribute is set");
                    }
                    if (n.getValue(Context.OperatorContext.PARTITIONER) != null || n.attributes != null && !n.attributes.contains(Context.OperatorContext.PARTITIONER) && Partitioner.class.isAssignableFrom(n.getOperator().getClass())) {
                        throw new ValidationException("Operator " + n.getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!");
                    }
                }
                if (!n.operatorAnnotation.checkpointableWithinAppWindow() && (Integer)n.getValue(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT) % (Integer)n.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) != 0) {
                    throw new ValidationException("Operator " + n.getName() + " cannot be check-pointed between an application window " + "but the checkpoint-window-count " + n.getValue(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT) + " is not a multiple application-window-count " + n.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT));
                }
            }
            for (InputPortMeta pm : portMapping.inPortMap.values()) {
                this.checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName());
                StreamMeta sm = (StreamMeta)n.inputStreams.get(pm);
                if (sm == null) {
                    if (pm.portAnnotation != null && pm.portAnnotation.optional() || pm.classDeclaringHiddenPort != null) continue;
                    throw new ValidationException("Input port connection required: " + n.name + "." + pm.getPortName());
                }
                if (pm.classDeclaringHiddenPort != null) {
                    throw new ValidationException(String.format("Invalid port connected: %s.%s is hidden by %s.%s", pm.classDeclaringHiddenPort.getName(), pm.getPortName(), pm.operatorMeta.getOperator().getClass().getName(), pm.getPortName()));
                }
                DAG.Locality locality = sm.getLocality();
                if (locality == DAG.Locality.THREAD_LOCAL && n.inputStreams.size() > 1) {
                    this.validateThreadLocal(n);
                }
                if (pm.portAnnotation == null || !pm.portAnnotation.schemaRequired() || pm.attributes.get(Context.PortContext.TUPLE_CLASS) != null) continue;
                throw new ValidationException("Attribute " + Context.PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName());
            }
            boolean allPortsOptional = true;
            for (OutputPortMeta pm : portMapping.outPortMap.values()) {
                this.checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName());
                if (!n.outputStreams.containsKey(pm)) {
                    if (pm.portAnnotation != null && !pm.portAnnotation.optional() && pm.classDeclaringHiddenPort == null) {
                        throw new ValidationException("Output port connection required: " + n.name + "." + pm.getPortName());
                    }
                } else {
                    if (pm.classDeclaringHiddenPort != null) {
                        throw new ValidationException(String.format("Invalid port connected: %s.%s is hidden by %s.%s", pm.classDeclaringHiddenPort.getName(), pm.getPortName(), pm.operatorMeta.getOperator().getClass().getName(), pm.getPortName()));
                    }
                    if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired() && pm.attributes.get(Context.PortContext.TUPLE_CLASS) == null) {
                        throw new ValidationException("Attribute " + Context.PortContext.TUPLE_CLASS.getName() + " missing on port : " + n.name + "." + pm.getPortName());
                    }
                }
                allPortsOptional &= pm.portAnnotation != null && pm.portAnnotation.optional();
            }
            if (allPortsOptional || !n.outputStreams.isEmpty()) continue;
            throw new ValidationException("At least one output port must be connected: " + n.name);
        }
        ValidationContext validatonContext = new ValidationContext();
        for (OperatorMeta n : this.operators.values()) {
            if (n.nindex != null) continue;
            this.findStronglyConnected(n, validatonContext);
        }
        if (!validatonContext.invalidCycles.isEmpty()) {
            throw new ValidationException("Loops in graph: " + validatonContext.invalidCycles);
        }
        ArrayList<List<String>> invalidDelays = new ArrayList<List<String>>();
        for (OperatorMeta n : this.rootOperators) {
            this.findInvalidDelays(n, invalidDelays, new Stack<OperatorMeta>());
        }
        if (!invalidDelays.isEmpty()) {
            throw new ValidationException("Invalid delays in graph: " + invalidDelays);
        }
        for (StreamMeta s : this.streams.values()) {
            if (s.source == null) {
                throw new ValidationException("Stream source not connected: " + s.getName());
            }
            if (!s.sinks.isEmpty()) continue;
            throw new ValidationException("Stream sink not connected: " + s.getName());
        }
        for (OperatorMeta om : this.rootOperators) {
            if (om.getOperator() instanceof InputOperator) continue;
            throw new ValidationException(String.format("Root operator: %s is not a Input operator", om.getName()));
        }
        HashSet visited = Sets.newHashSet();
        for (OperatorMeta om : this.rootOperators) {
            this.validateProcessingMode(om, visited);
        }
    }

    private void checkAttributeValueSerializable(Attribute.AttributeMap attributes, String context) {
        StringBuilder sb = new StringBuilder();
        String delim = "";
        for (Map.Entry entry : attributes.entrySet()) {
            if (entry.getValue() == null || entry.getValue() instanceof Serializable) continue;
            sb.append(delim).append(((Attribute)entry.getKey()).getSimpleName());
            delim = ", ";
        }
        if (sb.length() > 0) {
            throw new ValidationException("Attribute value(s) for " + sb.toString() + " in " + context + " are not serializable");
        }
    }

    private void validateThreadLocal(OperatorMeta om) {
        Integer oioRoot = null;
        if (om.oioRoot != null) {
            return;
        }
        if (om.getOperator() instanceof Operator.DelayOperator) {
            String msg = String.format("Locality %s invalid for delay operator %s", DAG.Locality.THREAD_LOCAL, om);
            throw new ValidationException(msg);
        }
        for (StreamMeta sm : om.inputStreams.values()) {
            String msg;
            if (sm.locality != DAG.Locality.THREAD_LOCAL) {
                msg = String.format("Locality %s invalid for operator %s with multiple input streams as at least one of the input streams is not %s", DAG.Locality.THREAD_LOCAL, om, DAG.Locality.THREAD_LOCAL);
                throw new ValidationException(msg);
            }
            if (sm.source.operatorMeta.getOperator() instanceof Operator.DelayOperator) {
                msg = String.format("Locality %s invalid for delay operator %s", DAG.Locality.THREAD_LOCAL, sm.source.operatorMeta);
                throw new ValidationException(msg);
            }
            Integer oioStreamRoot = this.getOioRoot(sm.source.operatorMeta);
            if (om.oioRoot != null && oioStreamRoot != om.oioRoot) {
                String msg2 = String.format("Locality %s invalid for operator %s with multiple input streams as at least one of the input streams is not originating from common OIO owner node", DAG.Locality.THREAD_LOCAL, om, DAG.Locality.THREAD_LOCAL);
                throw new ValidationException(msg2);
            }
            if (oioRoot == null) {
                oioRoot = oioStreamRoot;
                continue;
            }
            if (oioRoot.intValue() == oioStreamRoot.intValue()) continue;
            String msg3 = String.format("Locality %s invalid for operator %s with multiple input streams as they origin from different owner OIO operators", sm.locality, om);
            throw new ValidationException(msg3);
        }
        om.oioRoot = oioRoot;
    }

    private Integer getOioRoot(OperatorMeta om) {
        if (om.oioRoot != null) {
            return om.oioRoot;
        }
        switch (om.inputStreams.size()) {
            case 1: {
                StreamMeta sm = (StreamMeta)om.inputStreams.values().iterator().next();
                if (sm.locality == DAG.Locality.THREAD_LOCAL) {
                    om.oioRoot = this.getOioRoot(sm.source.operatorMeta);
                    break;
                }
                om.oioRoot = om.hashCode();
                break;
            }
            case 0: {
                om.oioRoot = om.hashCode();
                break;
            }
            default: {
                this.validateThreadLocal(om);
            }
        }
        return om.oioRoot;
    }

    public void findStronglyConnected(OperatorMeta om, ValidationContext ctx) {
        om.nindex = ctx.nodeIndex;
        om.lowlink = ctx.nodeIndex;
        ++ctx.nodeIndex;
        ctx.stack.push(om);
        ctx.path.push(om);
        for (StreamMeta downStream : om.outputStreams.values()) {
            for (InputPortMeta sink : downStream.sinks) {
                OperatorMeta successor = sink.getOperatorWrapper();
                if (successor == null) continue;
                if (om == successor) {
                    ctx.invalidCycles.add(Collections.singleton(om));
                }
                if (successor.nindex == null) {
                    this.findStronglyConnected(successor, ctx);
                    om.lowlink = Math.min(om.lowlink, successor.lowlink);
                    continue;
                }
                if (!ctx.stack.contains(successor)) continue;
                om.lowlink = Math.min(om.lowlink, successor.nindex);
                boolean isDelayLoop = false;
                for (int i = ctx.path.size(); i > 0; --i) {
                    OperatorMeta om2 = (OperatorMeta)ctx.path.get(i - 1);
                    if (om2.getOperator() instanceof Operator.DelayOperator) {
                        isDelayLoop = true;
                    }
                    if (om2 == successor) break;
                }
                if (isDelayLoop) continue;
                ctx.invalidLoopAt = successor;
            }
        }
        if (om.lowlink.equals(om.nindex)) {
            LinkedHashSet<OperatorMeta> connectedSet = new LinkedHashSet<OperatorMeta>(ctx.stack.size());
            while (!ctx.stack.isEmpty()) {
                OperatorMeta n2 = ctx.stack.pop();
                connectedSet.add(n2);
                if (n2 != om) continue;
                break;
            }
            if (connectedSet.size() > 1) {
                ctx.stronglyConnected.add(connectedSet);
                if (connectedSet.contains(ctx.invalidLoopAt)) {
                    ctx.invalidCycles.add(connectedSet);
                }
            }
        }
        ctx.path.pop();
    }

    public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays, Stack<OperatorMeta> stack) {
        stack.push(om);
        boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
        if (isDelayOperator && (Integer)om.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
            LOG.debug("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
            invalidDelays.add(Collections.singletonList(om.getName()));
        }
        for (StreamMeta downStream : om.outputStreams.values()) {
            for (InputPortMeta sink : downStream.sinks) {
                OperatorMeta successor = sink.getOperatorWrapper();
                if (isDelayOperator) {
                    sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, (Object)true);
                    if (successor == null || stack.contains(successor)) continue;
                    LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}", new Object[]{om.getName(), downStream.getSource().getPortName(), successor.getName(), sink.getPortName()});
                    invalidDelays.add(Arrays.asList(om.getName(), successor.getName()));
                    continue;
                }
                this.findInvalidDelays(successor, invalidDelays, stack);
            }
        }
        stack.pop();
    }

    private void validateProcessingMode(OperatorMeta om, Set<OperatorMeta> visited) {
        for (StreamMeta is : om.getInputStreams().values()) {
            if (visited.contains(is.getSource().getOperatorMeta())) continue;
            return;
        }
        visited.add(om);
        Operator.ProcessingMode pm = (Operator.ProcessingMode)om.getValue(Context.OperatorContext.PROCESSING_MODE);
        for (StreamMeta os : om.outputStreams.values()) {
            for (InputPortMeta sink : os.sinks) {
                Operator.ProcessingMode sinkPm;
                OperatorMeta sinkOm = sink.getOperatorWrapper();
                Operator.ProcessingMode processingMode = sinkPm = sinkOm.attributes == null ? null : (Operator.ProcessingMode)sinkOm.attributes.get(Context.OperatorContext.PROCESSING_MODE);
                if (sinkPm == null) {
                    if (Operator.ProcessingMode.AT_MOST_ONCE.equals((Object)pm)) {
                        LOG.warn("Setting processing mode for operator {} to {}", (Object)sinkOm.getName(), (Object)pm);
                        sinkOm.getAttributes().put(Context.OperatorContext.PROCESSING_MODE, (Object)pm);
                    } else if (Operator.ProcessingMode.EXACTLY_ONCE.equals((Object)pm)) {
                        String msg = String.format("Processing mode for %s should be AT_MOST_ONCE for source %s/%s", sinkOm.getName(), om.getName(), pm);
                        throw new ValidationException(msg);
                    }
                } else if (Operator.ProcessingMode.AT_MOST_ONCE.equals((Object)pm) && sinkPm != pm || Operator.ProcessingMode.EXACTLY_ONCE.equals((Object)pm) && !Operator.ProcessingMode.AT_MOST_ONCE.equals((Object)sinkPm)) {
                    String msg = String.format("Processing mode %s/%s not valid for source %s/%s", sinkOm.getName(), sinkPm, om.getName(), pm);
                    throw new ValidationException(msg);
                }
                this.validateProcessingMode(sinkOm, visited);
            }
        }
    }

    public static void write(DAG dag, OutputStream os) throws IOException {
        ObjectOutputStream oos = new ObjectOutputStream(os);
        oos.writeObject(dag);
    }

    public static LogicalPlan read(InputStream is) throws IOException, ClassNotFoundException {
        return (LogicalPlan)new ObjectInputStream(is).readObject();
    }

    public static Type getPortType(Field f) {
        if (f.getGenericType() instanceof ParameterizedType) {
            ParameterizedType t = (ParameterizedType)f.getGenericType();
            Type typeArgument = t.getActualTypeArguments()[0];
            if (typeArgument instanceof Class) {
                return typeArgument;
            }
            if (typeArgument instanceof TypeVariable) {
                TypeVariable tv = (TypeVariable)typeArgument;
                LOG.debug("bounds: " + Arrays.asList(tv.getBounds()));
                return tv.getBounds()[0];
            }
            if (typeArgument instanceof GenericArrayType) {
                LOG.debug("type {} is of GenericArrayType", (Object)typeArgument);
                return typeArgument;
            }
            if (typeArgument instanceof WildcardType) {
                LOG.debug("type {} is of WildcardType", (Object)typeArgument);
                return typeArgument;
            }
            if (typeArgument instanceof ParameterizedType) {
                return typeArgument;
            }
            LOG.error("Type argument is of expected type {}", (Object)typeArgument);
            return null;
        }
        LOG.error("No type variable: {}, typeParameters: {}", f.getType(), Arrays.asList(f.getClass().getTypeParameters()));
        return null;
    }

    public String toString() {
        return new ToStringBuilder((Object)this, ToStringStyle.SHORT_PREFIX_STYLE).append("operators", this.operators).append("streams", this.streams).append("properties", (Object)this.attributes).toString();
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LogicalPlan)) {
            return false;
        }
        LogicalPlan that = (LogicalPlan)o;
        if (this.attributes != null ? !this.attributes.equals(that.attributes) : that.attributes != null) {
            return false;
        }
        return !(this.streams == null ? that.streams != null : !this.streams.equals(that.streams));
    }

    public int hashCode() {
        int result = this.streams != null ? this.streams.hashCode() : 0;
        result = 31 * result + (this.attributes != null ? this.attributes.hashCode() : 0);
        return result;
    }

    static {
        Attribute.AttributeMap.AttributeInitializer.initialize(LogicalPlan.class);
    }

    public static class ValidationContext {
        public int nodeIndex = 0;
        public Stack<OperatorMeta> stack = new Stack();
        public Stack<OperatorMeta> path = new Stack();
        public List<Set<OperatorMeta>> stronglyConnected = new ArrayList<Set<OperatorMeta>>();
        public OperatorMeta invalidLoopAt;
        public List<Set<OperatorMeta>> invalidCycles = new ArrayList<Set<OperatorMeta>>();
    }

    public final class ModuleMeta
    implements DAG.ModuleMeta,
    Serializable {
        private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap();
        private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap();
        private final Attribute.AttributeMap attributes;
        @NotNull
        private String name;
        private transient Module module;
        private ModuleMeta parent;
        private LogicalPlan dag = null;
        private transient String fullName;
        private static final long serialVersionUID = 7562277769188329223L;

        private ModuleMeta(String name, Module module) {
            LOG.debug("Initializing {} as {}", (Object)name, (Object)module.getClass().getName());
            this.name = name;
            this.module = module;
            this.attributes = new Attribute.AttributeMap.DefaultAttributeMap();
            this.dag = new LogicalPlan();
        }

        public String getName() {
            return this.name;
        }

        public Module getModule() {
            return this.module;
        }

        public Attribute.AttributeMap getAttributes() {
            return this.attributes;
        }

        public <T> T getValue(Attribute<T> key) {
            return (T)this.attributes.get(key);
        }

        public void setCounters(Object counters) {
        }

        public void sendMetrics(Collection<String> metricNames) {
        }

        public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams() {
            return this.inputStreams;
        }

        public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams() {
            return this.outputStreams;
        }

        public LogicalPlan getDag() {
            return this.dag;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            FSStorageAgent.store((OutputStream)out, (Object)this.module);
        }

        private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
            input.defaultReadObject();
            this.module = (Module)FSStorageAgent.retrieve((InputStream)input);
        }

        public void flattenModule(LogicalPlan parentDAG, Configuration conf) {
            this.module.populateDAG((DAG)this.dag, conf);
            for (ModuleMeta subModuleMeta : this.dag.getAllModules()) {
                subModuleMeta.setParent(this);
                subModuleMeta.flattenModule(this.dag, conf);
            }
            this.dag.applyStreamLinks();
            parentDAG.addDAGToCurrentDAG(this);
        }

        public String getFullName() {
            if (this.fullName != null) {
                return this.fullName;
            }
            this.fullName = this.parent == null ? this.name : this.parent.getFullName() + LogicalPlan.MODULE_NAMESPACE_SEPARATOR + this.name;
            return this.fullName;
        }

        private void setParent(ModuleMeta meta) {
            this.parent = meta;
        }
    }

    public final class OperatorMeta
    implements DAG.OperatorMeta,
    Serializable {
        private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap();
        private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap();
        private final Attribute.AttributeMap attributes;
        private final int id;
        @NotNull
        private final String name;
        private final OperatorAnnotation operatorAnnotation;
        private final LogicalOperatorStatus status;
        private transient Integer nindex;
        private transient Integer lowlink;
        private transient Operator operator;
        private MetricAggregatorMeta metricAggregatorMeta;
        private String moduleName;
        private transient Integer oioRoot = null;
        private transient PortMapping portMapping = null;
        private static final long serialVersionUID = 201401091635L;

        private OperatorMeta(String name, Operator operator) {
            this(name, operator, (Attribute.AttributeMap)new Attribute.AttributeMap.DefaultAttributeMap());
        }

        private OperatorMeta(String name, Operator operator, Attribute.AttributeMap attributeMap) {
            LOG.debug("Initializing {} as {}", (Object)name, (Object)operator.getClass().getName());
            this.operatorAnnotation = operator.getClass().getAnnotation(OperatorAnnotation.class);
            this.name = name;
            this.operator = operator;
            this.id = logicalOperatorSequencer.decrementAndGet();
            this.status = new LogicalOperatorStatus(name);
            this.attributes = attributeMap;
        }

        public String getName() {
            return this.name;
        }

        public Attribute.AttributeMap getAttributes() {
            return this.attributes;
        }

        public <T> T getValue(Attribute<T> key) {
            Object attr = this.attributes.get(key);
            if (attr == null) {
                attr = LogicalPlan.this.getValue(key);
            }
            if (attr == null) {
                return (T)key.defaultValue;
            }
            return (T)attr;
        }

        public LogicalOperatorStatus getStatus() {
            return this.status;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            FSStorageAgent.store((OutputStream)out, (Object)this.operator);
        }

        private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
            input.defaultReadObject();
            this.operator = (Operator)FSStorageAgent.retrieve((InputStream)input);
        }

        public void setCounters(Object counters) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public void sendMetrics(Collection<String> metricNames) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public MetricAggregatorMeta getMetricAggregatorMeta() {
            return this.metricAggregatorMeta;
        }

        public String getModuleName() {
            return this.moduleName;
        }

        public void setModuleName(String moduleName) {
            this.moduleName = moduleName;
        }

        protected void populateAggregatorMeta() {
            AutoMetric.Aggregator aggregator = (AutoMetric.Aggregator)this.getValue(Context.OperatorContext.METRICS_AGGREGATOR);
            if (aggregator == null && this.operator instanceof AutoMetric.Aggregator) {
                aggregator = new MetricAggregatorMeta.MetricsAggregatorProxy(this);
            }
            if (aggregator == null) {
                MetricsAggregator defAggregator = null;
                HashSet metricNames = Sets.newHashSet();
                for (Field field : ReflectionUtils.getDeclaredFieldsIncludingInherited(this.operator.getClass())) {
                    if (!field.isAnnotationPresent(AutoMetric.class)) continue;
                    metricNames.add(field.getName());
                    if (field.getType() == Integer.TYPE || field.getType() == Integer.class || field.getType() == Long.TYPE || field.getType() == Long.class) {
                        if (defAggregator == null) {
                            defAggregator = new MetricsAggregator();
                        }
                        defAggregator.addAggregators(field.getName(), new SingleMetricAggregator[]{new LongSumAggregator()});
                        continue;
                    }
                    if (field.getType() != Float.TYPE && field.getType() != Float.class && field.getType() != Double.TYPE && field.getType() != Double.class) continue;
                    if (defAggregator == null) {
                        defAggregator = new MetricsAggregator();
                    }
                    defAggregator.addAggregators(field.getName(), new SingleMetricAggregator[]{new DoubleSumAggregator()});
                }
                try {
                    for (PropertyDescriptor pd : Introspector.getBeanInfo(this.operator.getClass()).getPropertyDescriptors()) {
                        String propName;
                        AutoMetric rfa;
                        Method readMethod = pd.getReadMethod();
                        if (readMethod == null || (rfa = readMethod.getAnnotation(AutoMetric.class)) == null || metricNames.contains(propName = pd.getName())) continue;
                        if (readMethod.getReturnType() == Integer.TYPE || readMethod.getReturnType() == Integer.class || readMethod.getReturnType() == Long.TYPE || readMethod.getReturnType() == Long.class) {
                            if (defAggregator == null) {
                                defAggregator = new MetricsAggregator();
                            }
                            defAggregator.addAggregators(propName, new SingleMetricAggregator[]{new LongSumAggregator()});
                            continue;
                        }
                        if (readMethod.getReturnType() != Float.TYPE && readMethod.getReturnType() != Float.class && readMethod.getReturnType() != Double.TYPE && readMethod.getReturnType() != Double.class) continue;
                        if (defAggregator == null) {
                            defAggregator = new MetricsAggregator();
                        }
                        defAggregator.addAggregators(propName, new SingleMetricAggregator[]{new DoubleSumAggregator()});
                    }
                }
                catch (IntrospectionException e) {
                    throw new RuntimeException("finding methods", e);
                }
                if (defAggregator != null) {
                    aggregator = defAggregator;
                }
            }
            this.metricAggregatorMeta = new MetricAggregatorMeta(aggregator, (AutoMetric.DimensionsScheme)this.getValue(Context.OperatorContext.METRICS_DIMENSIONS_SCHEME));
        }

        private void copyAttributes(Attribute.AttributeMap dest, Attribute.AttributeMap source) {
            for (Map.Entry a : source.entrySet()) {
                dest.put((Attribute)a.getKey(), a.getValue());
            }
        }

        private void copyAttributesFrom(OperatorMeta operatorMeta) {
            if (this.operator != operatorMeta.getOperator()) {
                throw new IllegalArgumentException("Operator meta is not for the same operator ");
            }
            this.copyAttributes(this.attributes, operatorMeta.getAttributes());
            for (Map.Entry entry : operatorMeta.getPortMapping().inPortMap.entrySet()) {
                this.copyAttributes(((InputPortMeta)this.getPortMapping().inPortMap.get(entry.getKey())).attributes, ((InputPortMeta)entry.getValue()).attributes);
            }
            for (Map.Entry entry : operatorMeta.getPortMapping().outPortMap.entrySet()) {
                this.copyAttributes((Attribute.AttributeMap)((OutputPortMeta)this.getPortMapping().outPortMap.get(entry.getKey())).attributes, (Attribute.AttributeMap)((OutputPortMeta)entry.getValue()).attributes);
            }
        }

        private PortMapping getPortMapping() {
            if (this.portMapping == null) {
                this.portMapping = new PortMapping();
                Operators.describe(this.getOperator(), this.portMapping);
            }
            return this.portMapping;
        }

        public OutputPortMeta getMeta(Operator.OutputPort<?> port) {
            return (OutputPortMeta)this.getPortMapping().outPortMap.get(port);
        }

        public InputPortMeta getMeta(Operator.InputPort<?> port) {
            return (InputPortMeta)this.getPortMapping().inPortMap.get(port);
        }

        public Map<InputPortMeta, StreamMeta> getInputStreams() {
            return this.inputStreams;
        }

        public Map<OutputPortMeta, StreamMeta> getOutputStreams() {
            return this.outputStreams;
        }

        public Operator getOperator() {
            return this.operator;
        }

        public LogicalPlan getDAG() {
            return LogicalPlan.this;
        }

        public String toString() {
            return "OperatorMeta{name=" + this.name + ", operator=" + this.operator + ", attributes=" + this.attributes + '}';
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof OperatorMeta)) {
                return false;
            }
            OperatorMeta that = (OperatorMeta)o;
            if (this.attributes != null ? !this.attributes.equals(that.attributes) : that.attributes != null) {
                return false;
            }
            if (!this.name.equals(that.name)) {
                return false;
            }
            if (this.operatorAnnotation != null ? !this.operatorAnnotation.equals(that.operatorAnnotation) : that.operatorAnnotation != null) {
                return false;
            }
            return !(this.operator == null ? that.operator != null : !this.operator.equals(that.operator));
        }

        public int hashCode() {
            return this.name.hashCode();
        }

        private class PortMapping
        implements Operators.OperatorDescriptor {
            private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap = new HashMap();
            private final Map<Operator.OutputPort<?>, OutputPortMeta> outPortMap = new HashMap();
            private final Map<String, Object> portNameMap = new HashMap<String, Object>();

            private PortMapping() {
            }

            @Override
            public void addInputPort(Operator.InputPort<?> portObject, Field field, InputPortFieldAnnotation portAnnotation, AppData.QueryPort adqAnnotation) {
                if (!OperatorMeta.this.inputStreams.isEmpty()) {
                    for (Map.Entry e : OperatorMeta.this.inputStreams.entrySet()) {
                        InputPortMeta pm = (InputPortMeta)e.getKey();
                        if (pm.operatorMeta != OperatorMeta.this || !pm.fieldName.equals(field.getName())) continue;
                        this.inPortMap.put(portObject, pm);
                        this.markInputPortIfHidden(pm.getPortName(), pm, field.getDeclaringClass());
                        return;
                    }
                }
                InputPortMeta metaPort = new InputPortMeta();
                metaPort.operatorMeta = OperatorMeta.this;
                metaPort.fieldName = field.getName();
                metaPort.portAnnotation = portAnnotation;
                metaPort.adqAnnotation = adqAnnotation;
                this.inPortMap.put(portObject, metaPort);
                this.markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
            }

            @Override
            public void addOutputPort(Operator.OutputPort<?> portObject, Field field, OutputPortFieldAnnotation portAnnotation, AppData.ResultPort adrAnnotation) {
                if (!OperatorMeta.this.outputStreams.isEmpty()) {
                    for (Map.Entry e : OperatorMeta.this.outputStreams.entrySet()) {
                        OutputPortMeta pm = (OutputPortMeta)e.getKey();
                        if (pm.operatorMeta != OperatorMeta.this || !pm.fieldName.equals(field.getName())) continue;
                        this.outPortMap.put(portObject, pm);
                        this.markOutputPortIfHidden(pm.getPortName(), pm, field.getDeclaringClass());
                        return;
                    }
                }
                OutputPortMeta metaPort = new OutputPortMeta();
                metaPort.operatorMeta = OperatorMeta.this;
                metaPort.fieldName = field.getName();
                metaPort.portAnnotation = portAnnotation;
                metaPort.adrAnnotation = adrAnnotation;
                this.outPortMap.put(portObject, metaPort);
                this.markOutputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
            }

            private void markOutputPortIfHidden(String portName, OutputPortMeta portMeta, Class<?> declaringClass) {
                if (!this.portNameMap.containsKey(portName)) {
                    this.portNameMap.put(portName, portMeta);
                } else {
                    portMeta.classDeclaringHiddenPort = declaringClass;
                }
            }

            private void markInputPortIfHidden(String portName, InputPortMeta portMeta, Class<?> declaringClass) {
                if (!this.portNameMap.containsKey(portName)) {
                    this.portNameMap.put(portName, portMeta);
                } else {
                    portMeta.classDeclaringHiddenPort = declaringClass;
                }
            }
        }
    }

    public final class StreamMeta
    implements DAG.StreamMeta,
    Serializable {
        private static final long serialVersionUID = 1L;
        private DAG.Locality locality;
        private final List<InputPortMeta> sinks = new ArrayList<InputPortMeta>();
        private OutputPortMeta source;
        private final String id;
        private OperatorMeta persistOperatorForStream;
        private InputPortMeta persistOperatorInputPort;
        private Set<InputPortMeta> enableSinksForPersisting;
        private String persistOperatorName;
        public Map<InputPortMeta, OperatorMeta> sinkSpecificPersistOperatorMap;
        public Map<InputPortMeta, InputPortMeta> sinkSpecificPersistInputPortMap;

        private StreamMeta(String id) {
            this.id = id;
            this.enableSinksForPersisting = new HashSet<InputPortMeta>();
            this.sinkSpecificPersistOperatorMap = new HashMap<InputPortMeta, OperatorMeta>();
            this.sinkSpecificPersistInputPortMap = new HashMap<InputPortMeta, InputPortMeta>();
        }

        public String getName() {
            return this.id;
        }

        public DAG.Locality getLocality() {
            return this.locality;
        }

        public StreamMeta setLocality(DAG.Locality locality) {
            this.locality = locality;
            return this;
        }

        public OutputPortMeta getSource() {
            return this.source;
        }

        public StreamMeta setSource(Operator.OutputPort<?> port) {
            OutputPortMeta portMeta = LogicalPlan.this.assertGetPortMeta(port);
            OperatorMeta om = portMeta.getOperatorMeta();
            if (om.outputStreams.containsKey(portMeta)) {
                String msg = String.format("Operator %s already connected to %s", om.name, ((StreamMeta)((OperatorMeta)om).outputStreams.get((Object)portMeta)).id);
                throw new IllegalArgumentException(msg);
            }
            this.source = portMeta;
            om.outputStreams.put(portMeta, this);
            return this;
        }

        public List<InputPortMeta> getSinks() {
            return this.sinks;
        }

        public StreamMeta addSink(Operator.InputPort<?> port) {
            InputPortMeta portMeta = LogicalPlan.this.assertGetPortMeta(port);
            OperatorMeta om = portMeta.getOperatorWrapper();
            String portName = portMeta.getPortName();
            if (om.inputStreams.containsKey(portMeta)) {
                throw new IllegalArgumentException(String.format("Port %s already connected to stream %s", portName, om.inputStreams.get(portMeta)));
            }
            this.sinks.add(portMeta);
            om.inputStreams.put(portMeta, this);
            LogicalPlan.this.rootOperators.remove(portMeta.operatorMeta);
            return this;
        }

        public void remove() {
            for (InputPortMeta inputPortMeta : this.sinks) {
                inputPortMeta.getOperatorWrapper().inputStreams.remove(inputPortMeta);
                if (!inputPortMeta.getOperatorWrapper().inputStreams.isEmpty()) continue;
                LogicalPlan.this.rootOperators.add(inputPortMeta.getOperatorWrapper());
            }
            if (this.getPersistOperator() != null) {
                LogicalPlan.this.removeOperator(this.getPersistOperator().getOperator());
            }
            if (!this.sinkSpecificPersistOperatorMap.isEmpty()) {
                for (Map.Entry entry : this.sinkSpecificPersistOperatorMap.entrySet()) {
                    LogicalPlan.this.removeOperator(((OperatorMeta)entry.getValue()).getOperator());
                }
                this.sinkSpecificPersistOperatorMap.clear();
                this.sinkSpecificPersistInputPortMap.clear();
            }
            this.sinks.clear();
            if (this.source != null) {
                this.source.getOperatorMeta().outputStreams.remove(this.source);
            }
            this.source = null;
            LogicalPlan.this.streams.remove(this.id);
        }

        public String toString() {
            return new ToStringBuilder((Object)this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", (Object)this.id).toString();
        }

        public int hashCode() {
            int hash = 7;
            hash = 31 * hash + (this.locality != null ? this.locality.hashCode() : 0);
            hash = 31 * hash + (this.source != null ? this.source.hashCode() : 0);
            hash = 31 * hash + (this.id != null ? this.id.hashCode() : 0);
            return hash;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            StreamMeta other = (StreamMeta)obj;
            if (this.locality != other.locality) {
                return false;
            }
            if (!(this.sinks == other.sinks || this.sinks != null && this.sinks.equals(other.sinks))) {
                return false;
            }
            if (!(this.source == other.source || this.source != null && this.source.equals(other.source))) {
                return false;
            }
            return !(this.id != null ? !this.id.equals(other.id) : other.id != null);
        }

        public StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> port) {
            this.persistOperatorName = name;
            this.enablePersistingForSinksAddedSoFar(persistOperator);
            OperatorMeta persistOpMeta = this.createPersistOperatorMeta(persistOperator);
            if (!persistOpMeta.getPortMapping().inPortMap.containsKey(port)) {
                String msg = String.format("Port argument %s does not belong to persist operator passed %s", port, persistOperator);
                throw new IllegalArgumentException(msg);
            }
            this.setPersistOperatorInputPort((InputPortMeta)persistOpMeta.getPortMapping().inPortMap.get(port));
            return this;
        }

        public StreamMeta persistUsing(String name, Operator persistOperator) {
            this.persistOperatorName = name;
            this.enablePersistingForSinksAddedSoFar(persistOperator);
            OperatorMeta persistOpMeta = this.createPersistOperatorMeta(persistOperator);
            InputPortMeta port = (InputPortMeta)persistOpMeta.getPortMapping().inPortMap.values().iterator().next();
            this.setPersistOperatorInputPort(port);
            return this;
        }

        private void enablePersistingForSinksAddedSoFar(Operator persistOperator) {
            for (InputPortMeta portMeta : this.getSinks()) {
                this.enableSinksForPersisting.add(portMeta);
            }
        }

        private OperatorMeta createPersistOperatorMeta(Operator persistOperator) {
            LogicalPlan.this.addOperator(this.persistOperatorName, persistOperator);
            OperatorMeta persistOpMeta = LogicalPlan.this.getOperatorMeta(this.persistOperatorName);
            this.setPersistOperator(persistOpMeta);
            if (persistOpMeta.getPortMapping().inPortMap.isEmpty()) {
                String msg = String.format("Persist operator passed %s has no input ports to connect", persistOperator);
                throw new IllegalArgumentException(msg);
            }
            Map inputPortMap = persistOpMeta.getPortMapping().inPortMap;
            int nonOptionalInputPortCount = 0;
            for (InputPortMeta inputPort : inputPortMap.values()) {
                if (inputPort.portAnnotation != null && inputPort.portAnnotation.optional()) continue;
                ++nonOptionalInputPortCount;
            }
            if (nonOptionalInputPortCount > 1) {
                String msg = String.format("Persist operator %s has more than 1 non optional input port", persistOperator);
                throw new IllegalArgumentException(msg);
            }
            Map outputPortMap = persistOpMeta.getPortMapping().outPortMap;
            for (OutputPortMeta outPort : outputPortMap.values()) {
                if (outPort.portAnnotation == null || outPort.portAnnotation.optional()) continue;
                String msg = String.format("Persist operator %s has non optional output port %s", persistOperator, outPort.fieldName);
                throw new IllegalArgumentException(msg);
            }
            return persistOpMeta;
        }

        public OperatorMeta getPersistOperator() {
            return this.persistOperatorForStream;
        }

        private void setPersistOperator(OperatorMeta persistOperator) {
            this.persistOperatorForStream = persistOperator;
        }

        public InputPortMeta getPersistOperatorInputPort() {
            return this.persistOperatorInputPort;
        }

        private void setPersistOperatorInputPort(InputPortMeta inport) {
            this.addSink((Operator.InputPort)inport.getPortObject());
            this.persistOperatorInputPort = inport;
        }

        public Set<InputPortMeta> getSinksToPersist() {
            return this.enableSinksForPersisting;
        }

        private String getPersistOperatorName(Operator operator) {
            return this.id + "_persister";
        }

        private String getPersistOperatorName(Operator.InputPort<?> sinkToPersist) {
            InputPortMeta portMeta = LogicalPlan.this.assertGetPortMeta(sinkToPersist);
            OperatorMeta operatorMeta = portMeta.getOperatorWrapper();
            return this.id + "_" + operatorMeta.getName() + "_persister";
        }

        public StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> port, Operator.InputPort<?> sinkToPersist) {
            String persistOperatorName = name;
            LogicalPlan.this.addOperator(persistOperatorName, persistOperator);
            this.addSink((Operator.InputPort)port);
            InputPortMeta sinkPortMeta = LogicalPlan.this.assertGetPortMeta(sinkToPersist);
            this.addStreamCodec(sinkPortMeta, port);
            this.updateSinkSpecificPersistOperatorMap(sinkPortMeta, persistOperatorName, port);
            return this;
        }

        private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, Operator.InputPort<?> port) {
            StreamCodec inputStreamCodec;
            StreamCodec streamCodec = inputStreamCodec = sinkToPersistPortMeta.getValue(Context.PortContext.STREAM_CODEC) != null ? (StreamCodec)sinkToPersistPortMeta.getValue(Context.PortContext.STREAM_CODEC) : sinkToPersistPortMeta.getPortObject().getStreamCodec();
            if (inputStreamCodec != null) {
                HashMap<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
                codecs.put(sinkToPersistPortMeta, inputStreamCodec);
                InputPortMeta persistOperatorPortMeta = LogicalPlan.this.assertGetPortMeta(port);
                StreamCodec specifiedCodecForPersistOperator = persistOperatorPortMeta.getValue(Context.PortContext.STREAM_CODEC) != null ? (StreamCodec)persistOperatorPortMeta.getValue(Context.PortContext.STREAM_CODEC) : port.getStreamCodec();
                StreamCodecWrapperForPersistance codec = new StreamCodecWrapperForPersistance(codecs, (StreamCodec<Object>)specifiedCodecForPersistOperator);
                LogicalPlan.this.setInputPortAttribute(port, Context.PortContext.STREAM_CODEC, codec);
            }
        }

        private void updateSinkSpecificPersistOperatorMap(InputPortMeta sinkToPersistPortMeta, String persistOperatorName, Operator.InputPort<?> persistOperatorInPort) {
            OperatorMeta persistOpMeta = (OperatorMeta)LogicalPlan.this.operators.get(persistOperatorName);
            this.sinkSpecificPersistOperatorMap.put(sinkToPersistPortMeta, persistOpMeta);
            this.sinkSpecificPersistInputPortMap.put(sinkToPersistPortMeta, (InputPortMeta)persistOpMeta.getMeta((Operator.InputPort)persistOperatorInPort));
        }

        public void resetStreamPersistanceOnSinkRemoval(InputPortMeta sinkBeingRemoved) {
            if (this.enableSinksForPersisting.contains(sinkBeingRemoved)) {
                this.enableSinksForPersisting.remove(sinkBeingRemoved);
                if (this.enableSinksForPersisting.isEmpty()) {
                    LogicalPlan.this.removeOperator(this.getPersistOperator().getOperator());
                    this.setPersistOperator(null);
                }
            }
            if (this.sinkSpecificPersistInputPortMap.containsKey(sinkBeingRemoved)) {
                this.sinkSpecificPersistInputPortMap.remove(sinkBeingRemoved);
            }
            if (this.sinkSpecificPersistOperatorMap.containsKey(sinkBeingRemoved)) {
                OperatorMeta persistOpMeta = this.sinkSpecificPersistOperatorMap.get(sinkBeingRemoved);
                this.sinkSpecificPersistOperatorMap.remove(sinkBeingRemoved);
                LogicalPlan.this.removeOperator(persistOpMeta.getOperator());
            }
        }
    }

    public final class OutputPortMeta
    implements DAG.OutputPortMeta,
    Serializable {
        private static final long serialVersionUID = 201412091633L;
        private OperatorMeta operatorMeta;
        private OperatorMeta unifierMeta;
        private OperatorMeta sliderMeta;
        private String fieldName;
        private OutputPortFieldAnnotation portAnnotation;
        private AppData.ResultPort adrAnnotation;
        private final Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
        private Class<?> classDeclaringHiddenPort;

        public OperatorMeta getOperatorMeta() {
            return this.operatorMeta;
        }

        public OperatorMeta getUnifierMeta() {
            if (this.unifierMeta == null) {
                this.unifierMeta = new OperatorMeta(this.operatorMeta.getName() + '.' + this.fieldName + "#unifier", (Operator)this.getUnifier());
            }
            return this.unifierMeta;
        }

        public OperatorMeta getSlidingUnifier(int numberOfBuckets, int slidingApplicationWindowCount, int numberOfSlidingWindows) {
            if (this.sliderMeta == null) {
                Slider slider = new Slider(this.getUnifier(), numberOfBuckets, numberOfSlidingWindows);
                try {
                    this.sliderMeta = new OperatorMeta(this.operatorMeta.getName() + '.' + this.fieldName + "#slider", (Operator)slider, this.getUnifierMeta().attributes.clone());
                }
                catch (CloneNotSupportedException ex) {
                    throw new RuntimeException(ex);
                }
                this.sliderMeta.getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, (Object)slidingApplicationWindowCount);
            }
            return this.sliderMeta;
        }

        public String getPortName() {
            return this.fieldName;
        }

        public Operator.OutputPort<?> getPortObject() {
            for (Map.Entry e : this.operatorMeta.getPortMapping().outPortMap.entrySet()) {
                if (e.getValue() != this) continue;
                return (Operator.OutputPort)e.getKey();
            }
            throw new AssertionError((Object)("Cannot find the port object for " + this));
        }

        public Operator.Unifier<?> getUnifier() {
            for (Map.Entry e : this.operatorMeta.getPortMapping().outPortMap.entrySet()) {
                if (e.getValue() != this) continue;
                Operator.Unifier unifier = ((Operator.OutputPort)e.getKey()).getUnifier();
                if (unifier == null) break;
                LOG.debug("User supplied unifier is {}", (Object)unifier);
                return unifier;
            }
            LOG.debug("Using default unifier for {}", (Object)this);
            return new DefaultUnifier();
        }

        public Attribute.AttributeMap getAttributes() {
            return this.attributes;
        }

        public <T> T getValue(Attribute<T> key) {
            Object attr = this.attributes.get(key);
            if (attr == null) {
                return (T)key.defaultValue;
            }
            return (T)attr;
        }

        public boolean isAppDataResultPort() {
            return this.adrAnnotation != null;
        }

        public String toString() {
            return new ToStringBuilder((Object)this, ToStringStyle.SHORT_PREFIX_STYLE).append("operator", (Object)this.operatorMeta).append("portAnnotation", (Object)this.portAnnotation).append("adrAnnotation", (Object)this.adrAnnotation).append("field", (Object)this.fieldName).toString();
        }

        public void setCounters(Object counters) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public void sendMetrics(Collection<String> metricNames) {
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }

    public final class InputPortMeta
    implements DAG.InputPortMeta,
    Serializable {
        private static final long serialVersionUID = 1L;
        private OperatorMeta operatorMeta;
        private String fieldName;
        private InputPortFieldAnnotation portAnnotation;
        private AppData.QueryPort adqAnnotation;
        private final Attribute.AttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
        private Class<?> classDeclaringHiddenPort;

        public OperatorMeta getOperatorWrapper() {
            return this.operatorMeta;
        }

        public String getPortName() {
            return this.fieldName;
        }

        public Operator.InputPort<?> getPortObject() {
            for (Map.Entry e : this.operatorMeta.getPortMapping().inPortMap.entrySet()) {
                if (e.getValue() != this) continue;
                return (Operator.InputPort)e.getKey();
            }
            throw new AssertionError((Object)("Cannot find the port object for " + this));
        }

        public boolean isAppDataQueryPort() {
            return this.adqAnnotation != null;
        }

        public String toString() {
            return new ToStringBuilder((Object)this, ToStringStyle.SHORT_PREFIX_STYLE).append("operator", (Object)this.operatorMeta).append("portAnnotation", (Object)this.portAnnotation).append("adqAnnotation", (Object)this.adqAnnotation).append("field", (Object)this.fieldName).toString();
        }

        public Attribute.AttributeMap getAttributes() {
            return this.attributes;
        }

        public <T> T getValue(Attribute<T> key) {
            Object attr = this.attributes.get(key);
            if (attr == null) {
                return (T)key.defaultValue;
            }
            return (T)attr;
        }

        public void setCounters(Object counters) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public void sendMetrics(Collection<String> metricNames) {
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }
}

