/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.appdata;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.common.metric.AutoMetricBuiltInTransport;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.PubSubWebSocketMetricTransport;
import com.datatorrent.stram.StramAppContext;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.MetricAggregatorMeta;
import com.datatorrent.stram.webapp.LogicalOperatorInfo;
import com.google.common.collect.Maps;
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AppDataPushAgent
extends AbstractService {
    private static final String METRICS_SCHEMA = "metricsSchema";
    private static final String METRICS_SCHEMA_VERSION = "1.0";
    private static final String DATA = "data";
    private static final Logger LOG = LoggerFactory.getLogger(AppDataPushAgent.class);
    private static final String APP_DATA_PUSH_TRANSPORT_BUILTIN_VALUE = "builtin";
    private final StreamingContainerManager dnmgr;
    private final StramAppContext appContext;
    private final AppDataPushThread appDataPushThread = new AppDataPushThread();
    private AutoMetric.Transport metricsTransport;
    private final Map<Class<?>, List<Field>> cacheFields = new HashMap();
    private final Map<Class<?>, Map<String, Method>> cacheGetMethods = new HashMap();
    private final Map<String, Long> operatorsSchemaLastSentTime = Maps.newHashMap();
    private final Map<String, JSONObject> operatorSchemas = Maps.newHashMap();

    public AppDataPushAgent(StreamingContainerManager dnmgr, StramAppContext appContext) {
        super(AppDataPushAgent.class.getName());
        this.dnmgr = dnmgr;
        this.appContext = appContext;
    }

    protected void serviceStop() throws Exception {
        if (this.metricsTransport != null) {
            this.appDataPushThread.interrupt();
            try {
                this.appDataPushThread.join();
            }
            catch (InterruptedException ex) {
                LOG.error("Error joining with {}", (Object)this.appDataPushThread.getName(), (Object)ex);
            }
        }
        super.serviceStop();
    }

    protected void serviceStart() throws Exception {
        if (this.metricsTransport != null) {
            this.appDataPushThread.start();
        }
        super.serviceStart();
    }

    protected void serviceInit(Configuration conf) throws Exception {
        this.init();
        super.serviceInit(conf);
    }

    public void init() {
        this.metricsTransport = (AutoMetric.Transport)this.dnmgr.getLogicalPlan().getValue(Context.DAGContext.METRICS_TRANSPORT);
        if (this.metricsTransport instanceof AutoMetricBuiltInTransport) {
            AutoMetricBuiltInTransport transport = (AutoMetricBuiltInTransport)this.metricsTransport;
            this.metricsTransport = new PubSubWebSocketMetricTransport(this.dnmgr.getWsClient(), transport.getTopic(), transport.getSchemaResendInterval());
        }
        LOG.info("Metrics Transport set up for {}", (Object)this.metricsTransport);
    }

    private JSONObject getPushData() {
        JSONObject json = new JSONObject();
        try {
            json.put("type", (Object)DATA);
            json.put("appId", this.dnmgr.getLogicalPlan().getValue(Context.DAGContext.APPLICATION_ID));
            json.put("appName", this.dnmgr.getLogicalPlan().getValue(Context.DAGContext.APPLICATION_NAME));
            json.put("appUser", (Object)this.appContext.getUser());
            List<LogicalOperatorInfo> logicalOperatorInfoList = this.dnmgr.getLogicalOperatorInfoList();
            JSONObject logicalOperators = new JSONObject();
            for (LogicalOperatorInfo logicalOperator : logicalOperatorInfoList) {
                JSONObject logicalOperatorJson = this.extractFields(logicalOperator);
                JSONArray metricsList = new JSONArray();
                Queue<Pair<Long, Map<String, Object>>> windowMetrics = this.dnmgr.getWindowMetrics(logicalOperator.name);
                if (windowMetrics != null) {
                    while (!windowMetrics.isEmpty()) {
                        Pair<Long, Map<String, Object>> metrics = windowMetrics.remove();
                        long windowId = (Long)metrics.first;
                        Map aggregates = (Map)metrics.second;
                        long now = System.currentTimeMillis();
                        if (!this.operatorsSchemaLastSentTime.containsKey(logicalOperator.name) || this.metricsTransport.getSchemaResendInterval() > 0L && this.operatorsSchemaLastSentTime.get(logicalOperator.name) < now - this.metricsTransport.getSchemaResendInterval()) {
                            try {
                                this.pushMetricsSchema(this.dnmgr.getLogicalPlan().getOperatorMeta(logicalOperator.name), aggregates);
                                this.operatorsSchemaLastSentTime.put(logicalOperator.name, now);
                            }
                            catch (IOException ex) {
                                LOG.error("Cannot push metrics schema", (Throwable)ex);
                            }
                        }
                        JSONObject metricsItem = new JSONObject();
                        metricsItem.put("_windowId", windowId);
                        long windowToMillis = this.dnmgr.windowIdToMillis(windowId);
                        LOG.debug("metric window {} time {}", (Object)windowId, (Object)windowToMillis);
                        metricsItem.put("_time", windowToMillis);
                        for (Map.Entry entry : aggregates.entrySet()) {
                            String metricName = (String)entry.getKey();
                            Object aggregateValue = entry.getValue();
                            metricsItem.put(metricName, aggregateValue);
                        }
                        metricsList.put((Object)metricsItem);
                    }
                }
                logicalOperatorJson.put("metrics", (Object)metricsList);
                logicalOperators.put(logicalOperator.name, (Object)logicalOperatorJson);
            }
            json.put("time", System.currentTimeMillis());
            json.put("logicalOperators", (Object)logicalOperators);
            json.put("stats", (Object)this.extractFields(this.appContext.getStats()));
        }
        catch (JSONException ex) {
            throw new RuntimeException(ex);
        }
        return json;
    }

    private JSONObject extractFields(Object o) {
        Map<Object, Object> methods;
        List<Object> fields;
        if (this.cacheFields.containsKey(o.getClass())) {
            fields = this.cacheFields.get(o.getClass());
        } else {
            fields = new ArrayList();
            for (Class<?> c = o.getClass(); c != Object.class; c = c.getSuperclass()) {
                Field[] declaredFields;
                Field[] fieldArray = declaredFields = c.getDeclaredFields();
                int len$ = fieldArray.length;
                for (int i$ = 0; i$ < len$; ++i$) {
                    Field field = fieldArray[i$];
                    field.setAccessible(true);
                    AutoMetric rfa = field.getAnnotation(AutoMetric.class);
                    if (rfa == null) continue;
                    field.setAccessible(true);
                    try {
                        fields.add(field);
                        continue;
                    }
                    catch (Exception ex) {
                        LOG.debug("Error extracting fields for app data: {}. Ignoring.", (Object)ex.getMessage());
                    }
                }
            }
            this.cacheFields.put(o.getClass(), fields);
        }
        JSONObject result = new JSONObject();
        for (Field field : fields) {
            try {
                result.put(field.getName(), field.get(o));
            }
            catch (Exception len$) {}
        }
        if (this.cacheGetMethods.containsKey(o.getClass())) {
            methods = this.cacheGetMethods.get(o.getClass());
        } else {
            methods = new HashMap();
            try {
                BeanInfo info = Introspector.getBeanInfo(o.getClass());
                for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                    AutoMetric rfa;
                    Method method = pd.getReadMethod();
                    if (pd.getReadMethod() == null || (rfa = method.getAnnotation(AutoMetric.class)) == null) continue;
                    methods.put(pd.getName(), method);
                }
            }
            catch (IntrospectionException info) {
                // empty catch block
            }
            this.cacheGetMethods.put(o.getClass(), methods);
        }
        for (Map.Entry entry : methods.entrySet()) {
            try {
                result.put((String)entry.getKey(), ((Method)entry.getValue()).invoke(o, new Object[0]));
            }
            catch (Exception exception) {}
        }
        return result;
    }

    private JSONObject getMetricsSchemaData(LogicalPlan.OperatorMeta operatorMeta, Map<String, Object> aggregates) {
        JSONObject result = new JSONObject();
        try {
            result.put("type", (Object)METRICS_SCHEMA);
            result.put("version", (Object)METRICS_SCHEMA_VERSION);
            result.put("appUser", (Object)this.appContext.getUser());
            result.put("appName", this.dnmgr.getApplicationAttributes().get(Context.DAGContext.APPLICATION_NAME));
            result.put("logicalOperatorName", (Object)operatorMeta.getName());
            MetricAggregatorMeta metricAggregatorMeta = operatorMeta.getMetricAggregatorMeta();
            JSONArray valueSchemas = new JSONArray();
            for (Map.Entry<String, Object> entry : aggregates.entrySet()) {
                String metricName = entry.getKey();
                Object metricValue = entry.getValue();
                JSONObject valueSchema = new JSONObject();
                valueSchema.put("name", (Object)metricName);
                Class type = ClassUtils.wrapperToPrimitive(metricValue.getClass());
                valueSchema.put("type", type == null ? metricValue.getClass().getCanonicalName() : type);
                String[] dimensionAggregators = metricAggregatorMeta.getDimensionAggregatorsFor(metricName);
                if (dimensionAggregators != null) {
                    valueSchema.put("dimensionAggregators", Arrays.asList(dimensionAggregators));
                }
                valueSchemas.put((Object)valueSchema);
            }
            result.put("values", (Object)valueSchemas);
            String[] timeBuckets = metricAggregatorMeta.getTimeBuckets();
            if (timeBuckets != null) {
                result.put("timeBuckets", Arrays.asList(timeBuckets));
            }
        }
        catch (JSONException ex) {
            throw new RuntimeException(ex);
        }
        return result;
    }

    public void pushMetricsSchema(LogicalPlan.OperatorMeta operatorMeta, Map<String, Object> aggregates) throws IOException {
        JSONObject schema = this.operatorSchemas.get(operatorMeta.getName());
        if (schema == null) {
            schema = this.getMetricsSchemaData(operatorMeta, aggregates);
            this.operatorSchemas.put(operatorMeta.getName(), schema);
        }
        this.metricsTransport.push(schema.toString());
    }

    public void pushData() throws IOException {
        this.metricsTransport.push(this.getPushData().toString());
    }

    public class AppDataPushThread
    extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    AppDataPushAgent.this.pushData();
                }
                catch (IOException ex) {
                    LOG.warn("Error during pushing app data", (Throwable)ex);
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ex) {
                    LOG.warn("Received interrupt, exiting app data push thread!");
                    return;
                }
            }
        }
    }
}

