package com.datatorrent.stram.appdata;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.common.metric.AutoMetricBuiltInTransport;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.PubSubWebSocketMetricTransport;
import com.datatorrent.stram.StramAppContext;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.MetricAggregatorMeta;
import com.datatorrent.stram.webapp.LogicalOperatorInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/appdata/AppDataPushAgent.class */
public class AppDataPushAgent extends AbstractService {
    /** Value written under the {@code "type"} key of a metrics-schema message. */
    private static final String METRICS_SCHEMA = "metricsSchema";
    /** Version of the metrics-schema message format; schema messages carry the same value under {@code "version"}. */
    private static final String METRICS_SCHEMA_VERSION = "1.0";
    /** Value written under the {@code "type"} key of a data message. */
    private static final String DATA = "data";
    private static final Logger LOG = LoggerFactory.getLogger(AppDataPushAgent.class);
    /** Source of the logical plan, the logical operator info list and the per-operator window metrics. */
    private final StreamingContainerManager dnmgr;
    /** Supplies the application user and the application stats object included in data messages. */
    private final StramAppContext appContext;
    /** Worker that calls {@link #pushData()} periodically; only started when a transport is configured. */
    private final AppDataPushThread appDataPushThread;
    /** Transport resolved in {@link #init()}; {@code null} means no pushing takes place. */
    private AutoMetric.Transport metricsTransport;
    /** Per-class cache of the fields annotated with {@link AutoMetric}. */
    private final Map<Class<?>, List<Field>> cacheFields;
    /** Per-class cache of bean read methods annotated with {@link AutoMetric}, keyed by property name. */
    private final Map<Class<?>, Map<String, Method>> cacheGetMethods;
    /** Logical operator name mapped to the wall-clock millis at which its schema was last pushed. */
    private final Map<String, Long> operatorsSchemaLastSentTime;
    /** Logical operator name mapped to the schema message built for it on first use. */
    private final Map<String, JSONObject> operatorSchemas;

    /* loaded from: input_file:com/datatorrent/stram/appdata/AppDataPushAgent$AppDataPushThread.class */
    public class AppDataPushThread extends Thread {
        public AppDataPushThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    AppDataPushAgent.this.pushData();
                } catch (IOException e) {
                    AppDataPushAgent.LOG.warn("Error during pushing app data", e);
                }
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e2) {
                    AppDataPushAgent.LOG.warn("Received interrupt, exiting app data push thread!");
                    return;
                }
            }
        }
    }

    /**
     * Creates the agent with empty caches and an unstarted push thread. The service name
     * is this class's fully qualified name.
     *
     * @param streamingContainerManager manager queried for the logical plan and metrics
     * @param stramAppContext application context queried for the user and stats
     */
    public AppDataPushAgent(StreamingContainerManager streamingContainerManager, StramAppContext stramAppContext) {
        super(AppDataPushAgent.class.getName());
        this.dnmgr = streamingContainerManager;
        this.appContext = stramAppContext;
        this.operatorSchemas = new HashMap<String, JSONObject>();
        this.operatorsSchemaLastSentTime = new HashMap<String, Long>();
        this.cacheGetMethods = new HashMap<Class<?>, Map<String, Method>>();
        this.cacheFields = new HashMap<Class<?>, List<Field>>();
        this.appDataPushThread = new AppDataPushThread();
    }

    /**
     * Stops the push thread, if a transport was configured, and then delegates to the
     * superclass. The thread is interrupted and joined before the superclass stop runs.
     *
     * @throws Exception if the superclass stop fails
     */
    protected void serviceStop() throws Exception {
        if (this.metricsTransport != null) {
            this.appDataPushThread.interrupt();
            try {
                this.appDataPushThread.join();
            } catch (InterruptedException e) {
                LOG.error("Error joining with {}", this.appDataPushThread.getName(), e);
                // Do not swallow the interrupt: restore it so the caller can still observe it.
                Thread.currentThread().interrupt();
            }
        }
        super.serviceStop();
    }

    /**
     * Starts the push thread when a metrics transport is configured, then delegates to
     * the superclass.
     *
     * @throws Exception if the superclass start fails
     */
    protected void serviceStart() throws Exception {
        final boolean transportConfigured = this.metricsTransport != null;
        if (transportConfigured) {
            this.appDataPushThread.start();
        }
        super.serviceStart();
    }

    /**
     * Resolves the metrics transport via {@link #init()} and then delegates to the
     * superclass.
     *
     * @param configuration configuration handed on to the superclass
     * @throws Exception if the superclass initialisation fails
     */
    protected void serviceInit(Configuration configuration) throws Exception {
        this.init();
        super.serviceInit(configuration);
    }

    /**
     * Reads the {@code METRICS_TRANSPORT} attribute from the logical plan and stores it as
     * the active transport. A built-in transport is replaced by a
     * {@link PubSubWebSocketMetricTransport} created from the manager's web-socket client
     * and the built-in transport's topic and schema resend interval.
     */
    public void init() {
        this.metricsTransport = (AutoMetric.Transport) this.dnmgr.getLogicalPlan().getValue(Context.DAGContext.METRICS_TRANSPORT);
        if (this.metricsTransport instanceof AutoMetricBuiltInTransport) {
            // The field is typed as the Transport interface, so the narrowing cast is required.
            AutoMetricBuiltInTransport builtInTransport = (AutoMetricBuiltInTransport) this.metricsTransport;
            this.metricsTransport = new PubSubWebSocketMetricTransport(this.dnmgr.getWsClient(), builtInTransport.getTopic(), builtInTransport.getSchemaResendInterval());
        }
        LOG.info("Metrics Transport set up for {}", this.metricsTransport);
    }

    /**
     * Builds the data message: application identity, one entry per logical operator
     * (its extracted metric fields plus the drained window metrics), the current time
     * and the extracted application stats.
     *
     * @return the assembled data message
     * @throws RuntimeException wrapping any {@link JSONException} raised while building
     */
    private JSONObject getPushData() {
        try {
            final JSONObject payload = new JSONObject();
            payload.put("type", DATA);
            payload.put("appId", this.dnmgr.getLogicalPlan().getValue(Context.DAGContext.APPLICATION_ID));
            payload.put("appName", this.dnmgr.getLogicalPlan().getValue(Context.DAGContext.APPLICATION_NAME));
            payload.put("appUser", this.appContext.getUser());

            final JSONObject operatorsJson = new JSONObject();
            for (LogicalOperatorInfo operatorInfo : this.dnmgr.getLogicalOperatorInfoList()) {
                final JSONObject operatorJson = extractFields(operatorInfo);
                operatorJson.put(StreamingContainerManager.APP_META_KEY_METRICS, drainWindowMetrics(operatorInfo.name));
                operatorsJson.put(operatorInfo.name, operatorJson);
            }

            payload.put("time", System.currentTimeMillis());
            payload.put("logicalOperators", operatorsJson);
            payload.put("stats", extractFields(this.appContext.getStats()));
            return payload;
        } catch (JSONException jsonFailure) {
            throw new RuntimeException(jsonFailure);
        }
    }

    /**
     * Removes every queued window-metrics entry of one operator and converts each into a
     * JSON object carrying {@code _windowId}, {@code _time} and the metric values. Before
     * converting an entry, the operator's schema is pushed if it is due.
     */
    private JSONArray drainWindowMetrics(String operatorName) throws JSONException {
        final JSONArray windows = new JSONArray();
        final Queue<Pair<Long, Map<String, Object>>> pending = this.dnmgr.getWindowMetrics(operatorName);
        if (pending == null) {
            return windows;
        }
        while (!pending.isEmpty()) {
            final Pair<Long, Map<String, Object>> window = pending.remove();
            final long windowId = ((Long) window.first).longValue();
            final Map<String, Object> metrics = (Map<String, Object>) window.second;
            final long now = System.currentTimeMillis();
            if (isSchemaPushDue(operatorName, now)) {
                try {
                    pushMetricsSchema(this.dnmgr.getLogicalPlan().m88getOperatorMeta(operatorName), metrics);
                    this.operatorsSchemaLastSentTime.put(operatorName, now);
                } catch (IOException schemaFailure) {
                    // The data for this window is still emitted; the schema is retried later.
                    LOG.error("Cannot push metrics schema", schemaFailure);
                }
            }
            final JSONObject windowJson = new JSONObject();
            windowJson.put("_windowId", windowId);
            final long windowMillis = this.dnmgr.windowIdToMillis(windowId);
            LOG.debug("metric window {} time {}", windowId, windowMillis);
            windowJson.put("_time", windowMillis);
            for (Map.Entry<String, Object> metric : metrics.entrySet()) {
                windowJson.put(metric.getKey(), metric.getValue());
            }
            windows.put(windowJson);
        }
        return windows;
    }

    /**
     * Tells whether the schema of the given operator has never been sent, or was last
     * sent longer ago than a positive resend interval.
     */
    private boolean isSchemaPushDue(String operatorName, long now) {
        final Long lastSent = this.operatorsSchemaLastSentTime.get(operatorName);
        if (lastSent == null) {
            return true;
        }
        final long resendInterval = this.metricsTransport.getSchemaResendInterval();
        return resendInterval > 0 && lastSent.longValue() < now - resendInterval;
    }

    /**
     * Collects the {@link AutoMetric}-annotated values of an object into a JSON object.
     * Annotated fields (declared on the object's class or any superclass below
     * {@code Object}) are stored under their field name; annotated bean read methods are
     * stored under their property name. The discovered fields and methods are cached per
     * class. A value that cannot be read or stored is skipped and logged at debug level.
     *
     * @param obj object to inspect; must not be {@code null}
     * @return JSON object holding every value that could be read
     */
    private JSONObject extractFields(Object obj) {
        final Class<?> type = obj.getClass();
        final JSONObject result = new JSONObject();

        List<Field> metricFields = this.cacheFields.get(type);
        if (metricFields == null) {
            metricFields = findMetricFields(type);
            this.cacheFields.put(type, metricFields);
        }
        for (Field field : metricFields) {
            try {
                result.put(field.getName(), field.get(obj));
            } catch (Exception e) {
                // Best effort: one unreadable metric must not drop the others.
                LOG.debug("Cannot read metric field {}. Ignoring.", field, e);
            }
        }

        Map<String, Method> metricGetters = this.cacheGetMethods.get(type);
        if (metricGetters == null) {
            metricGetters = findMetricGetters(type);
            this.cacheGetMethods.put(type, metricGetters);
        }
        for (Map.Entry<String, Method> getter : metricGetters.entrySet()) {
            try {
                result.put(getter.getKey(), getter.getValue().invoke(obj));
            } catch (Exception e) {
                // Best effort: one failing getter must not drop the others.
                LOG.debug("Cannot read metric property {}. Ignoring.", getter.getKey(), e);
            }
        }
        return result;
    }

    /** Lists the AutoMetric-annotated fields of a class and its superclasses, made accessible. */
    private static List<Field> findMetricFields(Class<?> type) {
        final List<Field> found = new ArrayList<Field>();
        for (Class<?> current = type; current != Object.class; current = current.getSuperclass()) {
            for (Field field : current.getDeclaredFields()) {
                if (field.getAnnotation(AutoMetric.class) != null) {
                    // Only annotated fields are ever read, so only those are opened up.
                    field.setAccessible(true);
                    found.add(field);
                }
            }
        }
        return found;
    }

    /** Maps property name to read method for every AutoMetric-annotated bean getter of a class. */
    private static Map<String, Method> findMetricGetters(Class<?> type) {
        final Map<String, Method> found = new HashMap<String, Method>();
        try {
            for (PropertyDescriptor property : Introspector.getBeanInfo(type).getPropertyDescriptors()) {
                final Method readMethod = property.getReadMethod();
                if (readMethod != null && readMethod.getAnnotation(AutoMetric.class) != null) {
                    found.put(property.getName(), readMethod);
                }
            }
        } catch (IntrospectionException e) {
            // Whatever was collected so far is still cached by the caller.
            LOG.debug("Cannot introspect {} for metric getters. Ignoring.", type.getName(), e);
        }
        return found;
    }

    /**
     * Builds the metrics-schema message for one logical operator. Each metric contributes
     * its name, its type (the primitive type when the value is a primitive wrapper,
     * otherwise the canonical class name) and, when present, its dimension aggregators.
     * Time buckets are added when the aggregator meta supplies them.
     *
     * @param operatorMeta operator whose name and aggregator meta are used
     * @param map sample of metric name to value, used to derive the value types
     * @return the assembled schema message
     * @throws RuntimeException wrapping any {@link JSONException} raised while building
     */
    private JSONObject getMetricsSchemaData(LogicalPlan.OperatorMeta operatorMeta, Map<String, Object> map) {
        try {
            final JSONObject schema = new JSONObject();
            schema.put("type", METRICS_SCHEMA);
            schema.put("version", METRICS_SCHEMA_VERSION);
            schema.put("appUser", this.appContext.getUser());
            schema.put("appName", this.dnmgr.getApplicationAttributes().get(Context.DAGContext.APPLICATION_NAME));
            schema.put("logicalOperatorName", operatorMeta.getName());

            final MetricAggregatorMeta aggregatorMeta = operatorMeta.getMetricAggregatorMeta();
            final JSONArray values = new JSONArray();
            for (Map.Entry<String, Object> metric : map.entrySet()) {
                final String metricName = metric.getKey();
                final Object sample = metric.getValue();
                final JSONObject valueSchema = new JSONObject();
                valueSchema.put("name", metricName);

                final Class<?> primitive = ClassUtils.wrapperToPrimitive(sample.getClass());
                if (primitive == null) {
                    valueSchema.put("type", sample.getClass().getCanonicalName());
                } else {
                    valueSchema.put("type", primitive);
                }

                final String[] aggregators = aggregatorMeta.getDimensionAggregatorsFor(metricName);
                if (aggregators != null) {
                    valueSchema.put("dimensionAggregators", Arrays.asList(aggregators));
                }
                values.put(valueSchema);
            }
            schema.put("values", values);

            final String[] buckets = aggregatorMeta.getTimeBuckets();
            if (buckets != null) {
                schema.put("timeBuckets", Arrays.asList(buckets));
            }
            return schema;
        } catch (JSONException jsonFailure) {
            throw new RuntimeException(jsonFailure);
        }
    }

    /**
     * Pushes the schema message of an operator through the metrics transport. The message
     * is built on the first call for an operator name and reused from the cache afterwards.
     *
     * @param operatorMeta operator whose schema is pushed
     * @param map sample of metric name to value, used only when the schema is first built
     * @throws IOException if the transport fails to push the message
     */
    public void pushMetricsSchema(LogicalPlan.OperatorMeta operatorMeta, Map<String, Object> map) throws IOException {
        final String operatorName = operatorMeta.getName();
        JSONObject schema = this.operatorSchemas.get(operatorName);
        if (schema == null) {
            schema = getMetricsSchemaData(operatorMeta, map);
            this.operatorSchemas.put(operatorName, schema);
        }
        final String message = schema.toString();
        this.metricsTransport.push(message);
    }

    /**
     * Builds the current data message and pushes its string form through the metrics
     * transport.
     *
     * @throws IOException if the transport fails to push the message
     */
    public void pushData() throws IOException {
        final String message = getPushData().toString();
        this.metricsTransport.push(message);
    }
}
