package com.datatorrent.contrib.r;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.contrib.hbase.HBaseScanOperator;
import com.datatorrent.lib.script.ScriptOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import javax.validation.constraints.NotNull;
import org.rosuda.REngine.REXPDouble;
import org.rosuda.REngine.REXPInteger;
import org.rosuda.REngine.REXPLogical;
import org.rosuda.REngine.REXPString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/r/RScript.class */
public class RScript extends ScriptOperator {
    private static final long serialVersionUID = 201401161205L;

    @NotNull
    private Map<String, REXP_TYPE> argTypeMap;

    @NotNull
    private String functionName;

    @NotNull
    protected String scriptFilePath;
    REngineConnectable connectable;
    private static Logger log = LoggerFactory.getLogger(RScript.class);
    private String returnVariable = "retVal";
    public final transient DefaultOutputPort<Integer> intOutput = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<Double> doubleOutput = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<String> strOutput = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<Boolean> boolOutput = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<Integer[]> intArrayOutput = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<Double[]> doubleArrayOutput = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<String[]> strArrayOutput = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<Boolean[]> boolArrayOutput = new DefaultOutputPort<>();

    /* loaded from: input_file:com/datatorrent/contrib/r/RScript$REXP_TYPE.class */
    public enum REXP_TYPE {
        REXP_INT(1),
        REXP_DOUBLE(2),
        REXP_STR(3),
        REXP_BOOL(6),
        REXP_ARRAY_INT(32),
        REXP_ARRAY_DOUBLE(33),
        REXP_ARRAY_STR(34),
        REXP_ARRAY_BOOL(36);

        private int value;

        REXP_TYPE(int i) {
            this.value = i;
        }
    }

    public RScript() {
    }

    public RScript(String str, String str2, String str3) {
        setScriptFilePath(str);
        super.setScript(readFileAsString());
        setFunctionName(str2);
        setReturnVariable(str3);
    }

    public Map<String, REXP_TYPE> getArgTypeMap() {
        return this.argTypeMap;
    }

    public void setArgTypeMap(Map<String, REXP_TYPE> map) {
        this.argTypeMap = map;
    }

    public Map<String, Object> getBindings() {
        return null;
    }

    public String getReturnVariable() {
        return this.returnVariable;
    }

    public void setReturnVariable(String str) {
        this.returnVariable = str;
    }

    public String getFunctionName() {
        return this.functionName;
    }

    public void setFunctionName(String str) {
        this.functionName = str;
    }

    public String getScriptFilePath() {
        return this.scriptFilePath;
    }

    public void setScriptFilePath(String str) {
        this.scriptFilePath = str;
    }

    public void process(Map<String, Object> map) {
        processTuple(map);
    }

    public void setup(Context.OperatorContext operatorContext) {
        super.setup(operatorContext);
        try {
            this.connectable = new REngineConnectable();
            this.connectable.connect();
            this.connectable.getRengine().parseAndEval(((ScriptOperator) this).script);
        } catch (Exception e) {
            log.error("Exception: ", e);
            DTThrowable.rethrow(e);
        }
    }

    public void teardown() {
        try {
            this.connectable.disconnect();
        } catch (IOException e) {
            log.error("Exception: ", e);
            DTThrowable.rethrow(e);
        }
    }

    private String readFileAsString() {
        StringBuffer stringBuffer = new StringBuffer(HBaseScanOperator.DEF_QUEUE_SIZE);
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(getClass().getClassLoader().getResourceAsStream(this.scriptFilePath)));
            char[] cArr = new char[1024];
            while (true) {
                int read = bufferedReader.read(cArr);
                if (read == -1) {
                    break;
                }
                stringBuffer.append(String.valueOf(cArr, 0, read));
            }
            bufferedReader.close();
        } catch (IOException e) {
            log.error("Exception: ", e);
            DTThrowable.rethrow(e);
        }
        return stringBuffer.toString();
    }

    public void processTuple(Map<String, Object> map) {
        try {
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                switch (this.argTypeMap.get(key)) {
                    case REXP_INT:
                        this.connectable.getRengine().assign(key, new REXPInteger(new int[]{((Integer) value).intValue()}));
                        break;
                    case REXP_DOUBLE:
                        this.connectable.getRengine().assign(key, new REXPDouble(new double[]{((Double) value).doubleValue()}));
                        break;
                    case REXP_STR:
                        this.connectable.getRengine().assign(key, new REXPString(new String[]{(String) value}));
                        break;
                    case REXP_BOOL:
                        this.connectable.getRengine().assign(key, new REXPLogical(new Boolean[]{(Boolean) value}[0].booleanValue()));
                        break;
                    case REXP_ARRAY_INT:
                        this.connectable.getRengine().assign(key, new REXPInteger((int[]) value));
                        break;
                    case REXP_ARRAY_DOUBLE:
                        this.connectable.getRengine().assign(key, new REXPDouble((double[]) value));
                        break;
                    case REXP_ARRAY_STR:
                        this.connectable.getRengine().assign(key, new REXPString((String[]) value));
                        break;
                    case REXP_ARRAY_BOOL:
                        this.connectable.getRengine().assign(key, new REXPLogical((boolean[]) value));
                        break;
                    default:
                        throw new IllegalArgumentException("Unsupported data type ... ");
                }
            }
            this.connectable.getRengine().parseAndEval(getReturnVariable() + "<-" + getFunctionName() + "()");
            REXPLogical parseAndEval = this.connectable.getRengine().parseAndEval(getReturnVariable());
            this.connectable.getRengine().parseAndEval("rm(list = setdiff(ls(), lsf.str()))");
            if (parseAndEval.isInteger()) {
                int length = parseAndEval.length();
                if (length > 1) {
                    Integer[] numArr = new Integer[length];
                    for (int i = 0; i < length; i++) {
                        numArr[i] = Integer.valueOf(parseAndEval.asIntegers()[i]);
                    }
                    this.intArrayOutput.emit(numArr);
                } else {
                    this.intOutput.emit(Integer.valueOf(parseAndEval.asInteger()));
                }
            } else if (parseAndEval.isNumeric()) {
                int length2 = parseAndEval.length();
                if (length2 > 1) {
                    Double[] dArr = new Double[length2];
                    for (int i2 = 0; i2 < length2; i2++) {
                        dArr[i2] = Double.valueOf(parseAndEval.asDoubles()[i2]);
                    }
                    this.doubleArrayOutput.emit(dArr);
                } else {
                    this.doubleOutput.emit(Double.valueOf(parseAndEval.asDouble()));
                }
            } else if (!parseAndEval.isString()) {
                if (!parseAndEval.isLogical()) {
                    throw new IllegalArgumentException("Unsupported data type returned ... ");
                }
                int length3 = parseAndEval.length();
                boolean[] zArr = new boolean[length3];
                if (length3 > 1) {
                    Boolean[] boolArr = new Boolean[length3];
                    for (int i3 = 0; i3 < length3; i3++) {
                        boolArr[i3] = Boolean.valueOf(parseAndEval.isTRUE()[i3]);
                    }
                    this.boolArrayOutput.emit(boolArr);
                } else {
                    this.boolOutput.emit(Boolean.valueOf(parseAndEval.isTRUE()[0]));
                }
            } else if (parseAndEval.length() > 1) {
                this.strArrayOutput.emit(parseAndEval.asStrings());
            } else {
                this.strOutput.emit(parseAndEval.asString());
            }
        } catch (Exception e) {
            log.error("Exception: ", e);
            DTThrowable.rethrow(e);
        }
    }
}
