/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.plan.physical;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.stram.Journal;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.OperatorStatus;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class PTOperator
implements Serializable {
    private static final long serialVersionUID = 201312112033L;
    // Instance built with the no-arg SetOperatorState constructor (operator id -1, state INACTIVE).
    // NOTE(review): presumably registered with the journal as the reader for state-change entries - confirm against Journal.
    public static final Journal.Recoverable SET_OPERATOR_STATE = new SetOperatorState();
    // Current state; starts INACTIVE. Assigned by setState() and by SetOperatorState.read().
    private volatile State state = State.INACTIVE;
    // Physical plan passed to the constructor; its context receives the journal entries written by setState().
    private final PhysicalPlan plan;
    // Container of this operator; not assigned anywhere in this class. Compared by identity in PTOutput.isDownStreamInline().
    PTContainer container;
    // Logical operator metadata this physical operator was created with.
    final LogicalPlan.OperatorMeta operatorMeta;
    // Operator id; also written to the journal by setState().
    final int id;
    private final String name;
    // Partition keys per input port meta; populated by setPartitionKeys(), null until then.
    Map<LogicalPlan.InputPortMeta, Partitioner.PartitionKeys> partitionKeys;
    // Non-null marks this operator as a unifier (see isUnifier()).
    LogicalPlan.OperatorMeta unifiedOperatorMeta;
    // Not initialized in this class; getBufferServerMemory() and getThreadLocalOperators() dereference
    // outputs without a null check, so it must be assigned before those are called.
    List<PTInput> inputs;
    List<PTOutput> outputs;
    // Checkpoints in insertion order; addCheckpoint() appends, getRecentCheckpoint() reads the last element.
    public final LinkedList<Checkpoint> checkpoints;
    Checkpoint recoveryCheckpoint;
    // Public mutable counters; neither is modified inside this class.
    public int failureCount = 0;
    public int loadIndicator = 0;
    public List<? extends StatsListener> statsListeners;
    // Created in the constructor from this operator's id and logical metadata.
    public final OperatorStatus stats;
    // Lazily populated by getGrouping(): one HostOperatorSet per locality.
    final Map<DAG.Locality, HostOperatorSet> groupings = Maps.newHashMapWithExpectedSize((int)3);
    // Defaults to the immutable Collections.emptyList(); assign a new list rather than adding to the default.
    public List<StreamingContainerUmbilicalProtocol.StramToNodeRequest> deployRequests = Collections.emptyList();
    // NOTE(review): presumably the upstream merge (unifier) operator per input port - not populated in this class; confirm with PhysicalPlan.
    public final HashMap<LogicalPlan.InputPortMeta, PTOperator> upstreamMerge = new HashMap();

    /**
     * Creates a physical operator with an empty checkpoint list and a fresh
     * {@link OperatorStatus} built from the given id and logical metadata.
     *
     * @param plan the physical plan returned by {@link #getPlan()}
     * @param id   the operator id returned by {@link #getId()}
     * @param name the name returned by {@link #getName()}
     * @param om   the logical operator metadata returned by {@link #getOperatorMeta()}
     */
    PTOperator(PhysicalPlan plan, int id, String name, LogicalPlan.OperatorMeta om) {
        // Identity first.
        this.plan = plan;
        this.id = id;
        this.name = name;
        this.operatorMeta = om;
        // Then the per-operator mutable state.
        this.checkpoints = new LinkedList<Checkpoint>();
        this.stats = new OperatorStatus(id, om);
    }

    /**
     * @return the logical operator metadata this operator was constructed with
     */
    public LogicalPlan.OperatorMeta getOperatorMeta() {
        return operatorMeta;
    }

    /**
     * @return the current state of this operator (read from the volatile {@code state} field)
     */
    public State getState() {
        return state;
    }

    /**
     * Records the state change in the plan context's journal and then applies it.
     * The journal entry is written before the field is updated; if the write throws,
     * the in-memory state is left unchanged.
     *
     * @param state the new state
     */
    public void setState(State state) {
        getPlan().getContext().writeJournal(new SetOperatorState(getId(), state));
        this.state = state;
    }

    /**
     * @return the last element of {@code checkpoints}, or
     *         {@code Checkpoint.INITIAL_CHECKPOINT} when the list is empty
     */
    public Checkpoint getRecentCheckpoint() {
        return checkpoints.isEmpty() ? Checkpoint.INITIAL_CHECKPOINT : checkpoints.getLast();
    }

    /**
     * @return the recovery checkpoint last assigned via
     *         {@link #setRecoveryCheckpoint(Checkpoint)}; {@code null} if none was assigned
     */
    public Checkpoint getRecoveryCheckpoint() {
        return recoveryCheckpoint;
    }

    /**
     * Replaces the recovery checkpoint. No validation is performed and the
     * {@code checkpoints} list is not touched.
     *
     * @param recoveryCheckpoint the checkpoint to store
     */
    public void setRecoveryCheckpoint(Checkpoint recoveryCheckpoint) {
        this.recoveryCheckpoint = recoveryCheckpoint;
    }

    /**
     * @return the name reported by the logical operator metadata
     */
    public String getLogicalId() {
        return operatorMeta.getName();
    }

    /**
     * @return the operator id supplied at construction
     */
    public int getId() {
        return id;
    }

    /**
     * @return the name supplied at construction
     */
    public String getName() {
        return name;
    }

    /**
     * @return the physical plan supplied at construction
     */
    public PhysicalPlan getPlan() {
        return plan;
    }

    /**
     * @return the live {@code inputs} list (not a copy); {@code null} until it has been assigned
     */
    public List<PTInput> getInputs() {
        return inputs;
    }

    /**
     * @return the live {@code outputs} list (not a copy); {@code null} until it has been assigned
     */
    public List<PTOutput> getOutputs() {
        return outputs;
    }

    /**
     * @return the container currently referenced by this operator; {@code null} if none was assigned
     */
    public PTContainer getContainer() {
        return container;
    }

    /**
     * Builds a new map of partition keys indexed by the port object of each
     * input port meta held in {@code partitionKeys}.
     *
     * <p>The returned map is a fresh copy on every call; modifying it does not
     * affect this operator.
     *
     * @return the partition keys per input port, or {@code null} when no keys have
     *         been assigned yet (the existing contract, kept for backward compatibility)
     */
    public Map<Operator.InputPort<?>, Partitioner.PartitionKeys> getPartitionKeys() {
        if (this.partitionKeys == null) {
            return null;
        }
        // Fully parameterized instead of a raw HashMap, so the compiler checks the
        // key and value types of every put and of the returned map.
        Map<Operator.InputPort<?>, Partitioner.PartitionKeys> pkeys = Maps.newHashMapWithExpectedSize(this.partitionKeys.size());
        for (Map.Entry<LogicalPlan.InputPortMeta, Partitioner.PartitionKeys> e : this.partitionKeys.entrySet()) {
            pkeys.put(e.getKey().getPortObject(), e.getValue());
        }
        return pkeys;
    }

    /**
     * Sums the {@code BUFFER_MEMORY_MB} port attribute of the stream source for
     * every output that is not fully inline (see {@link PTOutput#isDownStreamInline()}).
     *
     * @return the accumulated attribute values; 0 when every output is inline or
     *         there are no outputs
     */
    public int getBufferServerMemory() {
        int totalMb = 0;
        for (PTOutput out : outputs) {
            if (!out.isDownStreamInline()) {
                // Only outputs with at least one sink in a different container contribute.
                totalMb += ((Integer)out.logicalStream.getSource().getValue(Context.PortContext.BUFFER_MEMORY_MB)).intValue();
            }
        }
        return totalMb;
    }

    /**
     * Collects the sink operators of every output whose logical stream has
     * {@code THREAD_LOCAL} locality. Outputs without a logical stream are skipped.
     *
     * @return the collected operators, or {@code null} (not an empty set) when no
     *         output has a thread-local stream
     */
    public Set<PTOperator> getThreadLocalOperators() {
        HashSet<PTOperator> collected = null;
        for (PTOutput out : outputs) {
            LogicalPlan.StreamMeta stream = out.logicalStream;
            boolean threadLocal = stream != null && stream.getLocality() == DAG.Locality.THREAD_LOCAL;
            if (threadLocal) {
                // Allocate lazily so that "nothing found" stays distinguishable as null.
                if (collected == null) {
                    collected = new HashSet<PTOperator>();
                }
                collected.addAll(out.threadLocalSinks());
            }
        }
        return collected;
    }

    /**
     * Stores the given partition keys, re-keyed by the port meta that
     * {@code operatorMeta.getMeta(port)} returns for each port.
     *
     * @param portKeys keys per input port; {@code null} stores an empty map
     * @throws AssertionError if {@code getMeta} returns {@code null} for one of the
     *         ports; the previously stored keys are left untouched in that case
     */
    public void setPartitionKeys(Map<Operator.InputPort<?>, Partitioner.PartitionKeys> portKeys) {
        if (portKeys != null) {
            // NOTE(review): raw map and DAG.InputPortMeta local kept as found; the exact
            // return type of getMeta is not visible from this file.
            HashMap resolved = Maps.newHashMapWithExpectedSize((int)portKeys.size());
            for (Map.Entry<Operator.InputPort<?>, Partitioner.PartitionKeys> portEntry : portKeys.entrySet()) {
                DAG.InputPortMeta portMeta = this.operatorMeta.getMeta((Operator.InputPort)portEntry.getKey());
                if (portMeta == null) {
                    throw new AssertionError((Object)("Invalid port reference " + portEntry));
                }
                resolved.put(portMeta, portEntry.getValue());
            }
            // Assigned only after every port resolved successfully.
            this.partitionKeys = resolved;
        } else {
            this.partitionKeys = Collections.emptyMap();
        }
    }

    /**
     * @return {@code true} when {@code unifiedOperatorMeta} is set
     */
    public boolean isUnifier() {
        return unifiedOperatorMeta != null;
    }

    /**
     * @return the value of {@code unifiedOperatorMeta}; {@code null} when this
     *         operator is not a unifier (see {@link #isUnifier()})
     */
    public LogicalPlan.OperatorMeta getUnifiedOperatorMeta() {
        return unifiedOperatorMeta;
    }

    /**
     * Returns the runtime class of this operator's own operator instance
     * (taken from {@code operatorMeta}, not from {@code unifiedOperatorMeta}).
     *
     * @return the class of {@code operatorMeta.getOperator()}
     * @throws IllegalStateException if this operator is not a unifier
     */
    public Class<?> getUnifierClass() {
        if (unifiedOperatorMeta != null) {
            return operatorMeta.getOperator().getClass();
        }
        throw new IllegalStateException("OperatorMeta does not support this method, please call isUnifier before making this call");
    }

    /**
     * Determines whether the operator is stateless. The checks run in this order
     * and stop at the first match:
     * <ol>
     *   <li>the {@code STATELESS} attribute read from the DAG,</li>
     *   <li>the {@code STATELESS} attribute read from the operator metadata,</li>
     *   <li>the presence of the {@link Stateless} annotation on the operator class.</li>
     * </ol>
     *
     * @return {@code true} if any of the three checks succeeds
     */
    public boolean isOperatorStateLess() {
        Boolean dagStateless = (Boolean)operatorMeta.getDAG().getValue(Context.OperatorContext.STATELESS);
        if (dagStateless.booleanValue()) {
            return true;
        }
        Boolean operatorStateless = (Boolean)operatorMeta.getValue(Context.OperatorContext.STATELESS);
        if (operatorStateless.booleanValue()) {
            return true;
        }
        return operatorMeta.getOperator().getClass().isAnnotationPresent(Stateless.class);
    }

    /**
     * Creates a checkpoint for the given window and appends it to {@code checkpoints}.
     *
     * <p>The window count obtained from {@link WindowGenerator} is reduced modulo the
     * operator's {@code APPLICATION_WINDOW_COUNT} and {@code CHECKPOINT_WINDOW_COUNT}
     * attributes; the two remainders become the second and third constructor
     * arguments of the new {@link Checkpoint}.
     *
     * @param windowId  window id the checkpoint is created for
     * @param startTime start time handed to the {@link WindowGenerator} helpers
     * @return the checkpoint that was appended
     */
    public Checkpoint addCheckpoint(long windowId, long startTime) {
        int widthMillis = (Integer)operatorMeta.getDAG().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
        long nextWindowMillis = WindowGenerator.getNextWindowMillis(windowId, startTime, widthMillis);
        long windowCount = WindowGenerator.getWindowCount(nextWindowMillis, startTime, widthMillis);

        // Each attribute is read and applied in turn, keeping the original evaluation order.
        int applicationWindowCount = ((Integer)operatorMeta.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)).intValue();
        int applicationRemainder = (int)(windowCount % (long)applicationWindowCount);
        int checkpointWindowCount = ((Integer)operatorMeta.getValue(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT)).intValue();
        int checkpointRemainder = (int)(windowCount % (long)checkpointWindowCount);

        Checkpoint checkpoint = new Checkpoint(windowId, applicationRemainder, checkpointRemainder);
        checkpoints.add(checkpoint);
        return checkpoint;
    }

    /**
     * Returns the operator grouping for the given locality, creating and
     * registering one with an empty operator set on first access.
     *
     * @param type the locality to look up
     * @return the existing or newly created grouping; never {@code null}
     */
    HostOperatorSet getGrouping(DAG.Locality type) {
        HostOperatorSet existing = groupings.get(type);
        if (existing != null) {
            return existing;
        }
        HostOperatorSet created = new HostOperatorSet();
        created.operatorSet = Sets.newHashSet();
        groupings.put(type, created);
        return created;
    }

    /**
     * @return the grouping for {@code NODE_LOCAL} locality, created on first access
     *         by {@link #getGrouping(DAG.Locality)}
     */
    public HostOperatorSet getNodeLocalOperators() {
        return getGrouping(DAG.Locality.NODE_LOCAL);
    }

    /**
     * @return a short-prefix style description containing the {@code id} and {@code name} fields
     */
    @Override
    public String toString() {
        ToStringBuilder description = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
        description.append("id", id);
        description.append("name", name);
        return description.toString();
    }

    /**
     * Mutable pairing of a host name with a set of operators.
     *
     * <p>This is a non-static inner class, so every instance holds an implicit
     * reference to the {@link PTOperator} that created it.
     */
    public class HostOperatorSet implements Serializable {
        private static final long serialVersionUID = 201312112033L;

        private String host;
        private Set<PTOperator> operatorSet;

        /**
         * @return the host name; {@code null} until {@link #setHost(String)} is called
         */
        public String getHost() {
            return host;
        }

        /**
         * @param host the host name to store
         */
        public void setHost(String host) {
            this.host = host;
        }

        /**
         * @return the live operator set (not a copy)
         */
        public Set<PTOperator> getOperatorSet() {
            return operatorSet;
        }

        /**
         * @param operatorSet the set to store; kept by reference
         */
        public void setOperatorSet(Set<PTOperator> operatorSet) {
            this.operatorSet = operatorSet;
        }
    }

    /**
     * Journal entry describing an operator state change. It is written as two
     * ints: the operator id followed by the ordinal of the {@link State} constant.
     */
    private static class SetOperatorState implements Journal.Recoverable {
        private final int operatorId;
        private final State state;

        /**
         * Creates an entry with operator id -1 and state {@code INACTIVE}; used for
         * the shared {@code SET_OPERATOR_STATE} instance.
         */
        private SetOperatorState() {
            this(-1, State.INACTIVE);
        }

        private SetOperatorState(int operatorId, State state) {
            this.operatorId = operatorId;
            this.state = state;
        }

        /**
         * Reads one entry and assigns the decoded state directly to the matching
         * operator's {@code state} field, bypassing {@link PTOperator#setState(State)}
         * so that no new journal entry is written.
         *
         * @param object the {@link PhysicalPlan} whose operator is updated
         * @param in     input positioned at an entry produced by {@link #write(Output)}
         */
        @Override
        public void read(Object object, Input in) throws KryoException {
            PhysicalPlan plan = (PhysicalPlan)object;
            // Field order must mirror write(): id first, then state ordinal.
            int recordedId = in.readInt();
            int recordedOrdinal = in.readInt();
            PTOperator target = plan.getAllOperators().get(recordedId);
            target.state = State.values()[recordedOrdinal];
        }

        /**
         * Writes the operator id followed by the state's ordinal.
         *
         * @param out the output to write to
         */
        @Override
        public void write(Output out) throws KryoException {
            out.writeInt(operatorId);
            out.writeInt(state.ordinal());
        }
    }

    /**
     * Output port of a physical operator together with the inputs ({@code sinks})
     * attached to it. {@code logicalStream} may be {@code null}; the methods of
     * this class tolerate that.
     */
    public static class PTOutput
    implements Serializable {
        private static final long serialVersionUID = 201312112033L;
        public final LogicalPlan.StreamMeta logicalStream;
        public final PTOperator source;
        public final String portName;
        // Populated by the PTInput constructor, which registers each new input here.
        public final List<PTInput> sinks;

        /**
         * @param portName      name of the output port
         * @param logicalStream logical stream of this output; may be {@code null}
         * @param source        operator that owns this output
         */
        protected PTOutput(String portName, LogicalPlan.StreamMeta logicalStream, PTOperator source) {
            this.logicalStream = logicalStream;
            this.source = source;
            this.portName = portName;
            this.sinks = new ArrayList<PTInput>();
        }

        /**
         * @return {@code true} when every sink's target operator references the same
         *         container object as the source operator; also {@code true} when
         *         there are no sinks
         */
        public boolean isDownStreamInline() {
            for (PTInput sink : this.sinks) {
                if (this.source.container == sink.target.container) continue;
                return false;
            }
            return true;
        }

        /**
         * @return the target operators of all sinks when the logical stream has
         *         {@code THREAD_LOCAL} locality; {@code null} when there is no logical
         *         stream or it has a different locality
         */
        public Set<PTOperator> threadLocalSinks() {
            HashSet<PTOperator> threadLocalOperators = null;
            if (this.logicalStream != null && this.logicalStream.getLocality() == DAG.Locality.THREAD_LOCAL) {
                threadLocalOperators = new HashSet<PTOperator>();
                for (PTInput sink : this.sinks) {
                    threadLocalOperators.add(sink.target);
                }
            }
            return threadLocalOperators;
        }

        /**
         * @return a short-prefix style description with the source operator, port
         *         name and stream name; the stream name is rendered as a null value
         *         when this output has no logical stream
         */
        @Override
        public String toString() {
            // threadLocalSinks() treats a null logicalStream as valid, so the name
            // lookup must not dereference it blindly (it used to throw NPE here).
            Object streamName = this.logicalStream == null ? null : this.logicalStream.getName();
            return new ToStringBuilder((Object)this, ToStringStyle.SHORT_PREFIX_STYLE).append("o", (Object)this.source).append("port", (Object)this.portName).append("stream", streamName).toString();
        }
    }

    /**
     * Input port of a physical operator, linked to the {@link PTOutput} it reads
     * from. Constructing an input also adds it to the sink list of that output.
     */
    public static class PTInput implements Serializable {
        private static final long serialVersionUID = 201312112033L;

        public final LogicalPlan.StreamMeta logicalStream;
        public final PTOperator target;
        public final Partitioner.PartitionKeys partitions;
        public final PTOutput source;
        public final String portName;
        public final boolean delay;

        /**
         * @param portName      name of the input port
         * @param logicalStream logical stream of this input
         * @param target        operator that owns this input
         * @param partitions    partition keys stored in {@code partitions}
         * @param source        upstream output; must not be {@code null}, because this
         *                      input is added to its {@code sinks}
         * @param delay         value stored in the {@code delay} field
         */
        protected PTInput(String portName, LogicalPlan.StreamMeta logicalStream, PTOperator target, Partitioner.PartitionKeys partitions, PTOutput source, boolean delay) {
            this.portName = portName;
            this.logicalStream = logicalStream;
            this.target = target;
            this.partitions = partitions;
            this.delay = delay;
            this.source = source;
            // Register with the upstream output once all fields are assigned.
            this.source.sinks.add(this);
        }

        /**
         * @return a short-prefix style description with the target operator, port
         *         name and source output
         */
        @Override
        public String toString() {
            ToStringBuilder description = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
            description.append("o", (Object)this.target);
            description.append("port", (Object)this.portName);
            description.append("source", (Object)this.source);
            return description.toString();
        }
    }

    /**
     * States of a physical operator.
     *
     * <p>The declaration order is part of the journal format: SetOperatorState
     * writes {@code ordinal()} and reads it back through {@code State.values()[ordinal]}.
     * Reordering or inserting constants anywhere but at the end changes the meaning
     * of previously written journal entries.
     */
    public static enum State {
        PENDING_DEPLOY,
        ACTIVE,
        PENDING_UNDEPLOY,
        // Initial state of a newly constructed PTOperator.
        INACTIVE;

    }
}

