/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram;

import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.RecordField;
import com.datatorrent.common.codec.JsonStreamCodec;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.StatsRecorder;
import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.util.FSPartFileCollection;
import com.datatorrent.stram.webapp.ContainerInfo;
import com.datatorrent.stram.webapp.OperatorInfo;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FSStatsRecorder
implements StatsRecorder {
    public static final String VERSION = "1.0";
    private static final Logger LOG = LoggerFactory.getLogger(FSStatsRecorder.class);
    private String basePath = ".";
    private FSPartFileCollection containersStorage;
    private final Map<String, FSPartFileCollection> logicalOperatorStorageMap = new ConcurrentHashMap<String, FSPartFileCollection>();
    private final Map<String, Integer> knownContainers = new HashMap<String, Integer>();
    private final Set<String> knownOperators = new HashSet<String>();
    private transient StreamCodec<Object> streamCodec;
    private final Map<Class<?>, List<Field>> metaFields = new HashMap();
    private final Map<Class<?>, List<Field>> statsFields = new HashMap();
    private final BlockingQueue<WriteOperation> queue = new LinkedBlockingQueue<WriteOperation>();
    private final StatsRecorderThread statsRecorderThread = new StatsRecorderThread();

    public void setBasePath(String basePath) {
        this.basePath = basePath;
    }

    public void setup() {
        try {
            this.streamCodec = new JsonStreamCodec();
            this.containersStorage = new FSPartFileCollection();
            this.containersStorage.setBasePath(this.basePath + "/containers");
            this.containersStorage.setup();
            this.containersStorage.writeMetaData("1.0\n".getBytes());
            this.statsRecorderThread.start();
        }
        catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    public void teardown() {
        this.statsRecorderThread.interrupt();
        try {
            this.statsRecorderThread.join();
        }
        catch (InterruptedException ex) {
            LOG.warn("Stats recorder thread join interrupted");
        }
        if (this.containersStorage != null) {
            this.containersStorage.teardown();
        }
        for (FSPartFileCollection operatorStorage : this.logicalOperatorStorageMap.values()) {
            operatorStorage.teardown();
        }
    }

    @Override
    public void recordContainers(Map<String, StreamingContainerAgent> containerMap, long timestamp) throws IOException {
        for (Map.Entry<String, StreamingContainerAgent> entry : containerMap.entrySet()) {
            Slice f;
            ByteArrayOutputStream bos;
            Map<String, Object> fieldMap;
            int containerIndex;
            StreamingContainerAgent sca = entry.getValue();
            ContainerInfo containerInfo = sca.getContainerInfo();
            if (!containerInfo.state.equals("ACTIVE")) continue;
            if (!this.knownContainers.containsKey(entry.getKey())) {
                containerIndex = this.knownContainers.size();
                this.knownContainers.put(entry.getKey(), containerIndex);
                fieldMap = this.extractRecordFields(containerInfo, "meta");
                bos = new ByteArrayOutputStream();
                f = this.streamCodec.toByteArray(fieldMap);
                bos.write((String.valueOf(containerIndex) + ":").getBytes());
                bos.write(f.buffer, f.offset, f.length);
                bos.write("\n".getBytes());
                this.queue.add(new WriteOperation(this.containersStorage, bos.toByteArray(), true));
            } else {
                containerIndex = this.knownContainers.get(entry.getKey());
            }
            fieldMap = this.extractRecordFields(containerInfo, "stats");
            bos = new ByteArrayOutputStream();
            f = this.streamCodec.toByteArray(fieldMap);
            bos.write((String.valueOf(containerIndex) + ":").getBytes());
            bos.write((String.valueOf(timestamp) + ":").getBytes());
            bos.write(f.buffer, f.offset, f.length);
            bos.write("\n".getBytes());
            this.queue.add(new WriteOperation(this.containersStorage, bos.toByteArray(), false));
        }
    }

    @Override
    public void recordOperators(List<OperatorInfo> operatorList, long timestamp) throws IOException {
        for (OperatorInfo operatorInfo : operatorList) {
            Slice f;
            ByteArrayOutputStream bos;
            Map<String, Object> fieldMap;
            FSPartFileCollection operatorStorage;
            if (!this.logicalOperatorStorageMap.containsKey(operatorInfo.name)) {
                operatorStorage = new FSPartFileCollection();
                operatorStorage.setBasePath(this.basePath + "/operators/" + operatorInfo.name);
                operatorStorage.setup();
                operatorStorage.writeMetaData("1.0\n".getBytes());
                this.logicalOperatorStorageMap.put(operatorInfo.name, operatorStorage);
            } else {
                operatorStorage = this.logicalOperatorStorageMap.get(operatorInfo.name);
            }
            if (!this.knownOperators.contains(operatorInfo.id)) {
                this.knownOperators.add(operatorInfo.id);
                fieldMap = this.extractRecordFields(operatorInfo, "meta");
                bos = new ByteArrayOutputStream();
                f = this.streamCodec.toByteArray(fieldMap);
                bos.write(f.buffer, f.offset, f.length);
                bos.write("\n".getBytes());
                this.queue.add(new WriteOperation(operatorStorage, bos.toByteArray(), true));
            }
            fieldMap = this.extractRecordFields(operatorInfo, "stats");
            bos = new ByteArrayOutputStream();
            f = this.streamCodec.toByteArray(fieldMap);
            bos.write((operatorInfo.id + ":").getBytes());
            bos.write((String.valueOf(timestamp) + ":").getBytes());
            bos.write(f.buffer, f.offset, f.length);
            bos.write("\n".getBytes());
            this.queue.add(new WriteOperation(operatorStorage, bos.toByteArray(), false));
        }
    }

    public Map<String, Object> extractRecordFields(Object o, String type) {
        HashMap<String, Object> fieldMap = new HashMap<String, Object>();
        try {
            List<Object> fieldList;
            Map<Class<?>, List<Field>> cacheFields = null;
            if (type.equals("meta")) {
                cacheFields = this.metaFields;
            } else if (type.equals("stats")) {
                cacheFields = this.statsFields;
            }
            if (cacheFields == null || !cacheFields.containsKey(o.getClass())) {
                fieldList = new ArrayList();
                for (Class<?> c = o.getClass(); c != Object.class; c = c.getSuperclass()) {
                    Field[] fieldArray;
                    for (Field field : fieldArray = c.getDeclaredFields()) {
                        field.setAccessible(true);
                        RecordField rfa = field.getAnnotation(RecordField.class);
                        if (rfa == null || !rfa.type().equals(type)) continue;
                        fieldList.add(field);
                    }
                }
                if (cacheFields != null) {
                    cacheFields.put(o.getClass(), fieldList);
                }
            } else {
                fieldList = cacheFields.get(o.getClass());
            }
            for (Field field : fieldList) {
                fieldMap.put(field.getName(), field.get(o));
            }
        }
        catch (IllegalAccessException ex) {
            throw new RuntimeException(ex);
        }
        return fieldMap;
    }

    public void requestSync() {
        this.containersStorage.requestSync();
        for (Map.Entry<String, FSPartFileCollection> entry : this.logicalOperatorStorageMap.entrySet()) {
            entry.getValue().requestSync();
        }
    }

    static /* synthetic */ BlockingQueue access$100(FSStatsRecorder x0) {
        return x0.queue;
    }

    static /* synthetic */ FSPartFileCollection access$200(FSStatsRecorder x0) {
        return x0.containersStorage;
    }

    static /* synthetic */ Map access$300(FSStatsRecorder x0) {
        return x0.logicalOperatorStorageMap;
    }

    static /* synthetic */ Logger access$400() {
        return LOG;
    }

    private static class WriteOperation {
        FSPartFileCollection storage;
        byte[] bytes;
        boolean meta;

        WriteOperation(FSPartFileCollection storage, byte[] bytes, boolean meta) {
            this.storage = storage;
            this.bytes = bytes;
            this.meta = meta;
        }
    }

    private class StatsRecorderThread
    extends Thread {
        private StatsRecorderThread() {
        }

        /*
         * Unable to fully structure code
         */
        @Override
        public void run() {
            while (true) {
                try {
                    block4: while (true) {
                        wo = (WriteOperation)FSStatsRecorder.access$100(FSStatsRecorder.this).take();
                        if (wo.meta) {
                            wo.storage.writeMetaData(wo.bytes);
                        } else {
                            wo.storage.writeDataItem(wo.bytes, true);
                        }
                        Thread.yield();
                        if (!FSStatsRecorder.access$100(FSStatsRecorder.this).isEmpty()) continue;
                        FSStatsRecorder.access$200(FSStatsRecorder.this).flushData();
                        i$ = FSStatsRecorder.access$300(FSStatsRecorder.this).values().iterator();
                        while (true) {
                            if (i$.hasNext()) ** break;
                            continue block4;
                            operatorStorage = (FSPartFileCollection)i$.next();
                            operatorStorage.flushData();
                        }
                        break;
                    }
                }
                catch (InterruptedException ex) {
                    return;
                }
                catch (Exception ex) {
                    FSStatsRecorder.access$400().error("Caught Exception", (Throwable)ex);
                    continue;
                }
                break;
            }
        }
    }
}

