package com.datatorrent.stram.engine;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.debug.MuxSink;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.math.IntMath;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

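/**
 * Engine-side wrapper around a single {@link Operator} instance and its {@link OperatorContext}.
 *
 * <p>This base class keeps track of the sinks attached to the operator's output ports, services
 * queued operator requests, collects {@link AutoMetric} values, reports per-window statistics and
 * drives checkpointing. It is abstract and declares {@link Runnable} without implementing
 * {@code run()}, so the processing loop is supplied by concrete subclasses.
 *
 * <p>Decompiled source; loaded from: input_file:com/datatorrent/stram/engine/Node.class
 */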
public abstract class Node<OPERATOR extends Operator> implements Component<OperatorContext>, Runnable {
    // String constants "input"/"output"; not referenced inside this class.
    // NOTE(review): presumably used by callers as port-direction labels - confirm.
    public static final String INPUT = "input";
    public static final String OUTPUT = "output";
    // Cached attribute values; all four are (re)read from the context in activate().
    // APPLICATION_WINDOW_COUNT is reduced to gcd(applicationWindowCount, slideByWindowCount)
    // when the slide-by attribute is set.
    protected int APPLICATION_WINDOW_COUNT;
    protected int DAG_CHECKPOINT_WINDOW_COUNT;
    // Forced to 1 by activate() when the processing mode is EXACTLY_ONCE.
    protected int CHECKPOINT_WINDOW_COUNT;
    // True when a configured stats listener (or the operator itself, if it is a StatsListener)
    // carries the StatsListener.DataQueueSize annotation; computed in activate().
    protected boolean DATA_TUPLE_AWARE;
    // Node id; may be assigned exactly once through setId() (0 means "not yet assigned").
    protected int id;
    // Set to true by activate() and to false by shutdown().
    protected boolean alive;
    protected final OPERATOR operator;
    // Window id stamped on emitted END_STREAM/END_WINDOW tuples; not assigned in this class.
    public long currentWindowId;
    // Copied into each output port's endWindowTimestamp by reportStats(); not assigned in this class.
    protected long endWindowEmitTime;
    // Thread CPU time observed at the previous reportStats() call, used to compute the delta.
    protected long lastSampleCpuTime;
    protected ThreadMXBean tmb;
    // Created in the constructor; not read or written elsewhere in this class.
    protected HashMap<SweepableReservoir, Long> endWindowDequeueTimes;
    // Most recent synchronously completed checkpoint; handed over and cleared by reportStats().
    protected Checkpoint checkpoint;
    // Window counters recorded into each Checkpoint; not modified in this class.
    public int applicationWindowCount;
    public int checkpointWindowCount;
    // Recomputed by calculateNextCheckpointWindow().
    public int nextCheckpointWindowCount;
    // Reset to 0 after a synchronous checkpoint; read by calculateNextCheckpointWindow().
    public int dagCheckpointOffsetCount;
    // Number of control tuples emitted since the last reportStats(); subtracted from port counts there.
    protected int controlTupleCount;
    public final OperatorContext context;
    // Non-null responses produced by operator requests executed in handleRequests().
    public final BlockingQueue<StatsListener.OperatorResponse> commandResponse;
    // Operator fields (including inherited ones) annotated with @AutoMetric, made accessible.
    private final List<Field> metricFields;
    // Bean property name -> getter, for getters annotated with @AutoMetric.
    private final Map<String, Method> metricMethods;
    // Timing data for the checkpoint in progress / just completed; cleared once reported.
    protected Stats.CheckpointStats checkpointStats;
    // Passed to WindowGenerator.getAheadWindowId() when emitting END_WINDOW for a DelayOperator.
    public long firstWindowMillis;
    public long windowWidthMillis;
    // Read from the context in activate().
    protected Operator.ProcessingMode PROCESSING_MODE;
    // Set by shutdown(), cleared by setup(); deactivate() skips END_STREAM when it is set.
    protected volatile boolean shutdown;
    private static final Logger logger = LoggerFactory.getLogger(Node.class);
    // Array snapshot of the values of 'outputs', rebuilt by activateSinks().
    protected volatile Sink<Object>[] sinks = Sink.NO_SINKS;
    // Single worker thread that runs CheckpointHandler tasks (asynchronous checkpoint copy).
    private ExecutorService executorService = Executors.newSingleThreadExecutor();
    // Submitted asynchronous checkpoints in submission order; reportStats() drains finished heads.
    private Queue<Pair<FutureTask<Stats.CheckpointStats>, Node<OPERATOR>.CheckpointWindowInfo>> taskQueue = new LinkedList();
    // Output port name -> sink currently attached to that port (a MuxSink when several are attached).
    protected final HashMap<String, Sink<Object>> outputs = new HashMap<>();
    // Port description of the operator, filled in by Operators.describe() in the constructor.
    protected final Operators.PortMappingDescriptor descriptor = new Operators.PortMappingDescriptor();

    /* loaded from: input_file:com/datatorrent/stram/engine/Node$CheckpointHandler.class */
    /**
     * Task that completes an asynchronous checkpoint on the checkpoint executor: it asks the
     * storage agent to copy the saved state to HDFS and then records how long the whole
     * checkpoint took.
     */
    private class CheckpointHandler implements Callable<Stats.CheckpointStats> {
        /** Agent that performs the copy. */
        public AsyncFSStorageAgent agent;
        /** Assigned by the submitter; {@link #call()} itself uses the enclosing node's id. */
        public int operatorId;
        /** Window whose checkpoint is being copied. */
        public long windowId;
        /** Stats object whose {@code checkpointStartTime} was set before the save began. */
        public Stats.CheckpointStats stats;

        private CheckpointHandler() {
        }

        /**
         * Copies the checkpoint and fills in {@code stats.checkpointTime}.
         *
         * @return the same stats instance that was handed to this task
         * @throws Exception whatever the storage agent's copy throws
         */
        @Override
        public Stats.CheckpointStats call() throws Exception {
            agent.copyToHDFS(Node.this.id, windowId);
            final long finishedAt = System.currentTimeMillis();
            stats.checkpointTime = finishedAt - stats.checkpointStartTime;
            return stats;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/Node$CheckpointWindowInfo.class */
    /**
     * Snapshot of the window id and window counters taken when an asynchronous checkpoint is
     * submitted, so that the matching {@link Checkpoint} can be built once the copy has finished.
     */
    private class CheckpointWindowInfo {
        // Value of Node.applicationWindowCount at submission time.
        public int applicationWindowCount;
        // Value of Node.checkpointWindowCount at submission time.
        public int checkpointWindowCount;
        // Window id that was checkpointed.
        public long windowId;

        private CheckpointWindowInfo() {
        }
    }

    public Node(OPERATOR operator, OperatorContext operatorContext) {
        this.operator = operator;
        this.context = operatorContext;
        Operators.describe(operator, this.descriptor);
        this.endWindowDequeueTimes = new HashMap<>();
        this.tmb = ManagementFactory.getThreadMXBean();
        this.commandResponse = new LinkedBlockingQueue();
        this.metricFields = Lists.newArrayList();
        for (Field field : ReflectionUtils.getDeclaredFieldsIncludingInherited(operator.getClass())) {
            if (field.isAnnotationPresent(AutoMetric.class)) {
                this.metricFields.add(field);
                field.setAccessible(true);
            }
        }
        this.metricMethods = Maps.newHashMap();
        try {
            for (PropertyDescriptor propertyDescriptor : Introspector.getBeanInfo(operator.getClass()).getPropertyDescriptors()) {
                Method readMethod = propertyDescriptor.getReadMethod();
                if (readMethod != null && readMethod.getAnnotation(AutoMetric.class) != null) {
                    this.metricMethods.put(propertyDescriptor.getName(), readMethod);
                }
            }
        } catch (IntrospectionException e) {
            throw new RuntimeException("introspecting {}", e);
        }
    }

    /**
     * @return the operator instance wrapped by this node
     */
    public Operator getOperator() {
        return operator;
    }

    /**
     * Clears the shutdown flag and forwards setup to the wrapped operator.
     *
     * @param operatorContext context handed to the operator's own {@code setup}
     */
    @Override
    public void setup(OperatorContext operatorContext) {
        shutdown = false;
        logger.debug("Operator Context = {}", operatorContext);
        operator.setup(operatorContext);
    }

    /**
     * Tears down all input and output ports, stops the checkpoint executor and finally tears
     * down the operator itself.
     *
     * <p>The executor is given 100 ms to finish outstanding work after an orderly shutdown; if it
     * has not terminated by then (or the wait is interrupted) it is shut down forcibly.
     */
    @Override
    public void teardown() {
        for (Operators.PortContextPair<Operator.InputPort<?>> inputPort : descriptor.inputPorts.values()) {
            inputPort.component.teardown();
        }
        for (Operators.PortContextPair<Operator.OutputPort<?>> outputPort : descriptor.outputPorts.values()) {
            outputPort.component.teardown();
        }

        if (executorService != null) {
            executorService.shutdown();
            boolean terminated;
            try {
                terminated = executorService.awaitTermination(100L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here; kept as-is so that
                // operator.teardown() below still runs with a cleared interrupt flag.
                terminated = false;
                logger.debug("Wait for graceful executor service {} shutdown interrupted for node {}", new Object[]{executorService, this, e});
            }
            if (!terminated) {
                logger.warn("Shutting down executor service {} for node {}", executorService, this);
                executorService.shutdownNow();
            }
        }

        operator.teardown();
    }

    /**
     * @return the port description that was populated for the operator at construction time
     */
    public Operators.PortMappingDescriptor getPortMappingDescriptor() {
        return descriptor;
    }

    /**
     * Attaches a sink to, or detaches the sink from, the named output port.
     *
     * <p>Names that do not correspond to an output port of the operator are ignored. Note that
     * this only updates the port and the {@link #outputs} map; the {@link #sinks} array is not
     * rebuilt here.
     *
     * @param str  name of the output port
     * @param sink sink to attach, or {@code null} to disconnect the port
     */
    public void connectOutputPort(String str, Sink<Object> sink) {
        final Operators.PortContextPair<Operator.OutputPort<?>> port = descriptor.outputPorts.get(str);
        if (port == null) {
            return;
        }

        // A null sink clears the port; in both cases the port receives exactly 'sink'.
        port.component.setSink(sink);
        if (sink == null) {
            outputs.remove(str);
        } else {
            outputs.put(str, sink);
        }
    }

    /**
     * Connects a reservoir to the named input port; the behaviour is defined by subclasses.
     *
     * @param str                name of the input port
     * @param sweepableReservoir reservoir to connect to that port
     */
    public abstract void connectInputPort(String str, SweepableReservoir sweepableReservoir);

    /**
     * Attaches additional sinks to output ports, multiplexing when a port already has one.
     *
     * <p>For each entry whose key names an output port: an unconnected port gets the sink
     * directly; a port already fed through a {@link MuxSink} gets the sink added to that mux; a
     * port with a single plain sink has it replaced by a new {@link MuxSink} wrapping both.
     * Entries for unknown port names are skipped. The active sink array is rebuilt whenever at
     * least one entry matched an output port.
     *
     * @param map output port name to the sink that should additionally receive its tuples
     */
    public void addSinks(Map<String, Sink<Object>> map) {
        boolean matchedOutputPort = false;

        for (Map.Entry<String, Sink<Object>> entry : map.entrySet()) {
            final String portName = entry.getKey();
            final Operators.PortContextPair<Operator.OutputPort<?>> port = descriptor.outputPorts.get(portName);
            if (port == null) {
                continue;
            }
            matchedOutputPort = true;

            final Sink<Object> existing = outputs.get(portName);
            if (existing instanceof MuxSink) {
                // The mux stays registered as the port's sink; just extend it.
                ((MuxSink) existing).add(entry.getValue());
                continue;
            }

            final Sink<Object> replacement;
            if (existing == null) {
                replacement = entry.getValue();
            } else {
                replacement = new MuxSink(existing, entry.getValue());
            }
            port.component.setSink(replacement);
            outputs.put(portName, replacement);
        }

        if (matchedOutputPort) {
            activateSinks();
        }
    }

    /**
     * Detaches previously added sinks from output ports.
     *
     * <p>If the port's current sink is the very instance being removed, the port is
     * disconnected. If the port is fed through a {@link MuxSink}, the sink is removed from the
     * mux; a mux left empty disconnects the port, and a mux left with one member is replaced by
     * that member. The active sink array is rebuilt only when the {@link #outputs} map changed.
     *
     * @param map output port name to the sink that should be removed from it
     */
    public void removeSinks(Map<String, Sink<Object>> map) {
        boolean outputsChanged = false;

        for (Map.Entry<String, Sink<Object>> entry : map.entrySet()) {
            final String portName = entry.getKey();
            final Operators.PortContextPair<Operator.OutputPort<?>> port = descriptor.outputPorts.get(portName);
            if (port == null) {
                continue;
            }

            final Sink<Object> current = outputs.get(portName);
            if (current == entry.getValue()) {
                port.component.setSink(null);
                outputs.remove(portName);
                outputsChanged = true;
            } else if (current instanceof MuxSink) {
                final MuxSink mux = (MuxSink) current;
                mux.remove(entry.getValue());
                final Sink<Object>[] remaining = mux.getSinks();
                switch (remaining.length) {
                    case 0:
                        port.component.setSink(null);
                        outputs.remove(portName);
                        outputsChanged = true;
                        break;
                    case 1:
                        // A mux with a single member is pointless; connect the member directly.
                        port.component.setSink(remaining[0]);
                        outputs.put(portName, remaining[0]);
                        outputsChanged = true;
                        break;
                    default:
                        // Mux still has several members and stays in place.
                        break;
                }
            }
        }

        if (outputsChanged) {
            activateSinks();
        }
    }

    /**
     * Requests this node to stop.
     *
     * <p>Sets the shutdown flag, clears {@code alive} while holding this node's monitor, and
     * additionally queues an operator request on the context that clears {@code alive} again when
     * it is executed. If no context is available only a warning is logged for that last step.
     */
    public void shutdown() {
        shutdown = true;
        synchronized (this) {
            alive = false;
        }

        if (context == null) {
            logger.warn("Shutdown requested when context is not available!");
            return;
        }

        context.request(new StatsListener.OperatorRequest() {
            public StatsListener.OperatorResponse execute(Operator operator, int i, long j) throws IOException {
                Node.this.alive = false;
                return null;
            }
        });
    }

    /**
     * @return the node id rendered as a decimal string
     */
    @Override
    public String toString() {
        return Integer.toString(getId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Puts one END_STREAM tuple stamped with the current window id on every connected output
     * sink and counts it as a single control tuple.
     */
    public void emitEndStream() {
        final EndStreamTuple endStream = new EndStreamTuple(currentWindowId);
        for (Sink<Object> output : outputs.values()) {
            output.put(endStream);
        }
        controlTupleCount++;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Puts one END_WINDOW tuple on every active sink (last array slot first) and counts it as a
     * single control tuple.
     *
     * <p>For an {@link Operator.DelayOperator} the tuple carries the window id one window ahead
     * of the current one, as computed by {@code WindowGenerator.getAheadWindowId}; otherwise it
     * carries the current window id.
     */
    public void emitEndWindow() {
        final long windowId;
        if (operator instanceof Operator.DelayOperator) {
            windowId = WindowGenerator.getAheadWindowId(currentWindowId, firstWindowMillis, windowWidthMillis, 1);
        } else {
            windowId = currentWindowId;
        }
        final EndWindowTuple endWindow = new EndWindowTuple(windowId);

        for (int idx = sinks.length - 1; idx >= 0; idx--) {
            sinks[idx].put(endWindow);
        }
        controlTupleCount++;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Executes the operator requests that are queued on the context at the time of the call.
     *
     * <p>The queue size is sampled once, so requests enqueued while this method runs are left for
     * a later call. Each non-null response is appended to {@link #commandResponse}.
     *
     * @param j window id passed to each request's {@code execute}
     * @throws RuntimeException wrapping any {@link IOException} thrown by a request; unchecked
     *                          exceptions and errors propagate unchanged
     */
    public void handleRequests(long j) {
        try {
            final BlockingQueue<StatsListener.OperatorRequest> pending = context.getRequests();
            for (int remaining = pending.size(); remaining > 0; remaining--) {
                final StatsListener.OperatorResponse response = pending.remove().execute(operator, context.getId(), j);
                if (response != null) {
                    commandResponse.add(response);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the current values of the operator's {@link AutoMetric} fields and getters.
     *
     * <p>When the context holds a list of metrics to send, only those names are collected; when
     * it holds none ({@code metricsToSend == null}) every discovered metric is collected. After a
     * successful collection the context's metrics are cleared.
     *
     * @return metric name to value, or {@code null} when the context reports that metrics are
     *         listed but the list to send is null or empty
     * @throws RuntimeException if a metric field or getter cannot be read reflectively
     */
    public Map<String, Object> collectMetrics() {
        if (context.areMetricsListed() && (context.metricsToSend == null || context.metricsToSend.isEmpty())) {
            return null;
        }

        final Map<String, Object> metrics = Maps.newHashMap();
        try {
            for (Field metricField : metricFields) {
                final String name = metricField.getName();
                if (isMetricRequested(name)) {
                    metrics.put(name, metricField.get(operator));
                }
            }
            for (Map.Entry<String, Method> metricGetter : metricMethods.entrySet()) {
                final String name = metricGetter.getKey();
                if (isMetricRequested(name)) {
                    metrics.put(name, metricGetter.getValue().invoke(operator, new Object[0]));
                }
            }
            context.clearMetrics();
            return metrics;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    /** Tells whether the named metric passes the context's (optional) send filter. */
    private boolean isMetricRequested(String metricName) {
        return context.metricsToSend == null || context.metricsToSend.contains(metricName);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Fills in output-port, CPU and checkpoint statistics and reports them to the context.
     *
     * <p>Per output port the tuple count is the sink's count (fetched with reset) minus the
     * control tuples emitted since the previous report. CPU time is the delta of the current
     * thread's CPU time since the previous report.
     *
     * <p>Checkpoint information comes from one of two places: a synchronously completed
     * checkpoint held in {@link #checkpoint}, or otherwise the oldest asynchronous checkpoint
     * task if it has finished. In the asynchronous case the operator's
     * {@link Operator.CheckpointListener#checkpointed} callback is invoked here. At most one
     * asynchronous checkpoint is reported per call.
     *
     * @param operatorStats stats object to populate and report
     * @param j             window id passed through to {@code context.report}
     * @throws RuntimeException propagated from a failed asynchronous checkpoint task
     */
    public void reportStats(Stats.OperatorStats operatorStats, long j) {
        operatorStats.outputPorts = new ArrayList<>();
        for (Map.Entry<String, Sink<Object>> output : outputs.entrySet()) {
            Stats.OperatorStats.PortStats portStats = new Stats.OperatorStats.PortStats(output.getKey());
            portStats.tupleCount = output.getValue().getCount(true) - controlTupleCount;
            portStats.endWindowTimestamp = endWindowEmitTime;
            operatorStats.outputPorts.add(portStats);
        }
        controlTupleCount = 0;

        final long cpuTimeNow = tmb.getCurrentThreadCpuTime();
        operatorStats.cpuTimeUsed = cpuTimeNow - lastSampleCpuTime;
        lastSampleCpuTime = cpuTimeNow;

        if (checkpoint != null) {
            // Synchronous checkpoint: hand it over exactly once.
            operatorStats.checkpoint = checkpoint;
            operatorStats.checkpointStats = checkpointStats;
            checkpointStats = null;
            checkpoint = null;
        } else {
            final Pair<FutureTask<Stats.CheckpointStats>, Node<OPERATOR>.CheckpointWindowInfo> oldest = taskQueue.peek();
            if (oldest != null && oldest.getFirst().isDone()) {
                taskQueue.poll();
                try {
                    final CheckpointWindowInfo windowInfo = oldest.getSecond();
                    // get() cannot block because the task is done; it rethrows a task failure.
                    operatorStats.checkpointStats = oldest.getFirst().get();
                    operatorStats.checkpoint = new Checkpoint(windowInfo.windowId, windowInfo.applicationWindowCount, windowInfo.checkpointWindowCount);
                    if (operator instanceof Operator.CheckpointListener) {
                        // checkpointed() is declared on CheckpointListener, not on Operator,
                        // so the call needs the explicit cast to compile.
                        ((Operator.CheckpointListener) operator).checkpointed(windowInfo.windowId);
                    }
                } catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        }

        context.report(operatorStats, j);
    }

    /**
     * Rebuilds the {@link #sinks} array from the current values of {@link #outputs}.
     *
     * <p>With no outputs the shared {@code Sink.NO_SINKS} array is used. Otherwise a fresh array
     * is filled from its last slot towards the first in the map's iteration order and then
     * published in a single assignment.
     */
    protected void activateSinks() {
        int slot = outputs.size();
        if (slot == 0) {
            sinks = Sink.NO_SINKS;
            return;
        }

        @SuppressWarnings("unchecked")
        final Sink<Object>[] active = (Sink<Object>[]) Array.newInstance(Sink.class, slot);
        for (Sink<Object> output : outputs.values()) {
            active[--slot] = output;
        }
        sinks = active;
    }

    /**
     * Replaces the active sink array with the shared empty {@code Sink.NO_SINKS}; the
     * {@link #outputs} map itself is left untouched.
     */
    protected void deactivateSinks() {
        sinks = Sink.NO_SINKS;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Checkpoints the operator state for the given window.
     *
     * <p>For a stateful operator with a configured storage agent the state is saved through the
     * agent. If the agent is an {@link AsyncFSStorageAgent} that is not in sync mode and the
     * processing mode is not EXACTLY_ONCE, the copy to HDFS is handed to the checkpoint executor
     * and this method returns immediately; the checkpoint is then surfaced later by
     * {@code reportStats}. In every other case the checkpoint is completed synchronously, the
     * next checkpoint window is recalculated, {@link #checkpoint} is set and a
     * {@link Operator.CheckpointListener} operator is notified.
     *
     * <p>NOTE(review): the asynchronous path returns before
     * {@code calculateNextCheckpointWindow()} and the reset of {@code dagCheckpointOffsetCount};
     * this is preserved from the original - confirm that it is intended.
     *
     * @param j id of the window being checkpointed
     * @throws RuntimeException wrapping an {@link IOException} from the storage agent, after an
     *                          attempt to delete the partial checkpoint
     */
    public void checkpoint(long j) {
        if (!context.stateless) {
            if (operator instanceof Operator.CheckpointNotificationListener) {
                // beforeCheckpoint() is not a member of Operator; the cast is required to compile.
                ((Operator.CheckpointNotificationListener) operator).beforeCheckpoint(j);
            }
            // Declared as StorageAgent: the attribute value is only sometimes an
            // AsyncFSStorageAgent, which is checked with instanceof below.
            final StorageAgent storageAgent = (StorageAgent) context.getValue(OperatorContext.STORAGE_AGENT);
            if (storageAgent != null) {
                try {
                    checkpointStats = new Stats.CheckpointStats();
                    checkpointStats.checkpointStartTime = System.currentTimeMillis();
                    storageAgent.save(operator, id, j);
                    if (storageAgent instanceof AsyncFSStorageAgent) {
                        final AsyncFSStorageAgent asyncAgent = (AsyncFSStorageAgent) storageAgent;
                        if (!asyncAgent.isSyncCheckpoint()) {
                            if (PROCESSING_MODE != Operator.ProcessingMode.EXACTLY_ONCE) {
                                // Remember the counters as of now; they may have moved on by the
                                // time the background copy completes.
                                final CheckpointWindowInfo windowInfo = new CheckpointWindowInfo();
                                windowInfo.windowId = j;
                                windowInfo.applicationWindowCount = applicationWindowCount;
                                windowInfo.checkpointWindowCount = checkpointWindowCount;

                                final CheckpointHandler handler = new CheckpointHandler();
                                handler.agent = asyncAgent;
                                handler.operatorId = id;
                                handler.windowId = j;
                                handler.stats = checkpointStats;

                                final FutureTask<Stats.CheckpointStats> copyTask = new FutureTask<>(handler);
                                taskQueue.add(new Pair<>(copyTask, windowInfo));
                                executorService.submit(copyTask);
                                checkpoint = null;
                                checkpointStats = null;
                                return;
                            }
                            // EXACTLY_ONCE: the copy must be finished before continuing.
                            asyncAgent.copyToHDFS(id, j);
                        }
                    }
                    checkpointStats.checkpointTime = System.currentTimeMillis() - checkpointStats.checkpointStartTime;
                } catch (IOException e) {
                    try {
                        logger.warn("Rolling back checkpoint {} for Operator {} due to the exception {}", new Object[]{Codec.getStringWindowId(j), operator, e});
                        storageAgent.delete(id, j);
                    } catch (IOException e2) {
                        logger.warn("Error while rolling back checkpoint", e2);
                    }
                    throw new RuntimeException(e);
                }
            }
        }

        calculateNextCheckpointWindow();
        dagCheckpointOffsetCount = 0;
        checkpoint = new Checkpoint(j, applicationWindowCount, checkpointWindowCount);
        if (operator instanceof Operator.CheckpointListener) {
            // checkpointed() is not a member of Operator; the cast is required to compile.
            ((Operator.CheckpointListener) operator).checkpointed(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Recomputes {@link #nextCheckpointWindowCount}.
     *
     * <p>In EXACTLY_ONCE mode the value is always 1. Otherwise it is the number of windows left
     * until the DAG-level checkpoint ({@code DAG_CHECKPOINT_WINDOW_COUNT - dagCheckpointOffsetCount}),
     * rounded up with integer arithmetic to a multiple of {@code CHECKPOINT_WINDOW_COUNT}.
     */
    public void calculateNextCheckpointWindow() {
        if (PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE) {
            nextCheckpointWindowCount = 1;
            return;
        }

        final int windowsUntilDagCheckpoint = DAG_CHECKPOINT_WINDOW_COUNT - dagCheckpointOffsetCount;
        final int wholeIntervals = (windowsUntilDagCheckpoint + CHECKPOINT_WINDOW_COUNT - 1) / CHECKPOINT_WINDOW_COUNT;
        nextCheckpointWindowCount = wholeIntervals * CHECKPOINT_WINDOW_COUNT;
    }

    /**
     * Creates the node implementation that matches an operator and its deployment type.
     *
     * <p>Selection order: an {@link InputOperator} deployed as INPUT yields an {@code InputNode};
     * an {@link Operator.Unifier} deployed as UNIFIER yields a {@code UnifierNode}; the OIO type
     * yields an {@code OiONode}; anything else yields a {@code GenericNode}.
     *
     * @param obj             the operator instance; must not be {@code null}
     * @param operatorContext context for the new node
     * @param operatorType    how the operator is deployed
     * @return a new node wrapping {@code obj}
     */
    public static Node<?> retrieveNode(Object obj, OperatorContext operatorContext, OperatorDeployInfo.OperatorType operatorType) {
        logger.debug("type={}, operator class={}", operatorType, obj.getClass());

        if (obj instanceof InputOperator && operatorType == OperatorDeployInfo.OperatorType.INPUT) {
            return new InputNode((InputOperator) obj, operatorContext);
        }
        if (obj instanceof Operator.Unifier && operatorType == OperatorDeployInfo.OperatorType.UNIFIER) {
            return new UnifierNode((Operator.Unifier) obj, operatorContext);
        }
        if (operatorType == OperatorDeployInfo.OperatorType.OIO) {
            return new OiONode((Operator) obj, operatorContext);
        }
        return new GenericNode((Operator) obj, operatorContext);
    }

    /**
     * @return this node's id; 0 until {@link #setId(int)} has assigned one
     */
    public int getId() {
        return id;
    }

    /**
     * Assigns this node's id. The id can only be set while it is still 0.
     *
     * @param i the id to assign
     * @throws RuntimeException if a non-zero id has already been assigned
     */
    public void setId(int i) {
        final boolean alreadyAssigned = id != 0;
        if (alreadyAssigned) {
            throw new RuntimeException("Id cannot be changed from " + id + " to " + i);
        }
        id = i;
    }

    /**
     * Marks the node alive, caches windowing/checkpointing attributes from the context,
     * activates the output sinks and notifies the operator.
     *
     * <p>Details visible here:
     * <ul>
     *   <li>{@code APPLICATION_WINDOW_COUNT} becomes the gcd of the application window count and
     *       the slide-by window count when the latter is set.</li>
     *   <li>A warning is logged when the checkpoint window count is not a multiple of the
     *       application window count.</li>
     *   <li>In EXACTLY_ONCE mode the checkpoint window count is forced to 1.</li>
     *   <li>{@code DATA_TUPLE_AWARE} is set when any configured stats listener class - or, failing
     *       that, the operator class if the operator is a {@link StatsListener} - is annotated
     *       with {@link StatsListener.DataQueueSize}.</li>
     *   <li>Requests already queued on the context are executed at the end.</li>
     * </ul>
     */
    public void activate() {
        alive = true;

        APPLICATION_WINDOW_COUNT = ((Integer) context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT)).intValue();
        if (context.getValue(OperatorContext.SLIDE_BY_WINDOW_COUNT) != null) {
            APPLICATION_WINDOW_COUNT = IntMath.gcd(APPLICATION_WINDOW_COUNT, ((Integer) context.getValue(OperatorContext.SLIDE_BY_WINDOW_COUNT)).intValue());
        }
        DAG_CHECKPOINT_WINDOW_COUNT = ((Integer) context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT)).intValue();
        CHECKPOINT_WINDOW_COUNT = ((Integer) context.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT)).intValue();
        final Collection<?> statsListeners = (Collection<?>) context.getValue(OperatorContext.STATS_LISTENERS);

        if (CHECKPOINT_WINDOW_COUNT % APPLICATION_WINDOW_COUNT != 0) {
            logger.warn("{} is not exact multiple of {} for operator {}. This may cause side effects such as processing to begin without beginWindow preceding it in the first window after activation.", new Object[]{OperatorContext.CHECKPOINT_WINDOW_COUNT, OperatorContext.APPLICATION_WINDOW_COUNT, operator});
        }

        PROCESSING_MODE = (Operator.ProcessingMode) context.getValue(OperatorContext.PROCESSING_MODE);
        if (PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE && CHECKPOINT_WINDOW_COUNT != 1) {
            logger.warn("Ignoring {} attribute in favor of {} processing mode", OperatorContext.CHECKPOINT_WINDOW_COUNT.getSimpleName(), Operator.ProcessingMode.EXACTLY_ONCE.name());
            CHECKPOINT_WINDOW_COUNT = 1;
        }

        activateSinks();

        if (operator instanceof Operator.ActivationListener) {
            // activate() is declared on ActivationListener, not on Operator, so the call needs
            // a cast to compile; the type argument cannot be verified at run time.
            @SuppressWarnings("unchecked")
            final Operator.ActivationListener<OperatorContext> listener = (Operator.ActivationListener<OperatorContext>) operator;
            listener.activate(context);
        }

        if (statsListeners != null) {
            for (Object statsListener : statsListeners) {
                DATA_TUPLE_AWARE = ((StatsListener) statsListener).getClass().isAnnotationPresent(StatsListener.DataQueueSize.class);
                if (DATA_TUPLE_AWARE) {
                    break;
                }
            }
        }
        if (!DATA_TUPLE_AWARE && (operator instanceof StatsListener)) {
            DATA_TUPLE_AWARE = operator.getClass().isAnnotationPresent(StatsListener.DataQueueSize.class);
        }

        handleRequests(currentWindowId);
    }

    /**
     * Notifies an {@link Operator.ActivationListener} operator of deactivation, emits END_STREAM
     * when the node is no longer alive but was not shut down via {@link #shutdown()}, and then
     * deactivates the output sinks.
     */
    public void deactivate() {
        if (operator instanceof Operator.ActivationListener) {
            // deactivate() is declared on ActivationListener, not on Operator, so the call
            // needs a cast to compile.
            ((Operator.ActivationListener<?>) operator).deactivate();
        }

        if (!shutdown && !alive) {
            emitEndStream();
        }

        deactivateSinks();
    }
}
