/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.r;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.r.REngineConnectable;
import com.datatorrent.netlet.util.DTThrowable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.rosuda.REngine.REngineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OperatorAnnotation(partitionable=false)
public class RStandardDeviation
extends BaseOperator {
    private List<Number> values = new ArrayList<Number>();
    REngineConnectable connectable;
    private static Logger log = LoggerFactory.getLogger(RStandardDeviation.class);
    public final transient DefaultInputPort<Number> data = new DefaultInputPort<Number>(){

        public void process(Number tuple) {
            RStandardDeviation.this.values.add(tuple.doubleValue());
        }
    };
    @OutputPortFieldAnnotation(optional=true)
    public final transient DefaultOutputPort<Number> variance = new DefaultOutputPort();
    public final transient DefaultOutputPort<Number> standardDeviation = new DefaultOutputPort();

    public void setup(Context.OperatorContext context) {
        super.setup(context);
        try {
            this.connectable = new REngineConnectable();
            this.connectable.connect();
        }
        catch (IOException ioe) {
            log.error("Exception: ", (Throwable)ioe);
            DTThrowable.rethrow((Exception)ioe);
        }
    }

    public void teardown() {
        try {
            this.connectable.disconnect();
        }
        catch (IOException ioe) {
            log.error("Exception: ", (Throwable)ioe);
            DTThrowable.rethrow((Exception)ioe);
        }
    }

    public void endWindow() {
        if (this.values.size() == 0) {
            return;
        }
        double[] vector = new double[this.values.size()];
        for (int i = 0; i < this.values.size(); ++i) {
            vector[i] = this.values.get(i).doubleValue();
        }
        try {
            this.connectable.getRengine().assign("values", vector);
        }
        catch (REngineException e) {
            log.error("Exception: ", (Throwable)e);
            DTThrowable.rethrow((Exception)((Object)e));
        }
        double rStandardDeviation = 0.0;
        double rVariance = 0.0;
        try {
            rStandardDeviation = this.connectable.getRengine().parseAndEval("sd(values)").asDouble();
            rVariance = this.connectable.getRengine().parseAndEval("var(values)").asDouble();
            this.connectable.getRengine().parseAndEval("rm(list = setdiff(ls(), lsf.str()))");
        }
        catch (Exception e) {
            log.error("Exception: ", (Throwable)e);
            DTThrowable.rethrow((Exception)e);
        }
        this.variance.emit((Object)rVariance);
        this.standardDeviation.emit((Object)rStandardDeviation);
        this.values.clear();
    }
}

