/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.client;

import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.client.FSPartFileAgent;
import com.datatorrent.stram.client.StramAgent;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.map.ser.std.ToStringSerializer;
import org.codehaus.jettison.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class EventsAgent
extends FSPartFileAgent {
    private static final Logger LOG = LoggerFactory.getLogger(EventsAgent.class);

    /**
     * Creates an events agent, handing the given agent to the {@link FSPartFileAgent}
     * superclass constructor.
     *
     * @param stramAgent agent later used by this class to resolve application paths and to
     *                   obtain the file system the event files are read from
     */
    public EventsAgent(StramAgent stramAgent) {
        super(stramAgent);
    }

    /**
     * Resolves the directory that holds the recorded events of an application.
     *
     * @param appId identifier of the application
     * @return the application path followed by {@code "/events"}, or {@code null} when the
     *         agent reports no path for the application
     */
    private String getEventsDirectory(String appId) {
        final String applicationRoot = this.stramAgent.getAppPath(appId);
        return applicationRoot == null ? null : applicationRoot + "/events";
    }

    /**
     * Parses one line of the events index file.
     *
     * <p>A line starting with {@code "E"} is reported as an end marker with no other fields
     * set. Any other line is trimmed, its first two characters are skipped, and the remainder
     * is read as {@code partFile:startTime-endTime:numEvents}.
     *
     * @param line raw index line
     * @return the parsed index entry
     * @throws JSONException declared by the overridden method; not thrown by this parser itself
     */
    @Override
    protected EventsIndexLine parseIndexLine(String line) throws JSONException {
        final EventsIndexLine parsed = new EventsIndexLine();
        if (line.startsWith("E")) {
            parsed.isEndLine = true;
            return parsed;
        }

        final String record = line.trim();

        // Part file name: from offset 2 up to the next ':'.
        final int nameStart = 2;
        final int nameEnd = record.indexOf(':', nameStart);
        parsed.partFile = record.substring(nameStart, nameEnd);

        // Time range "start-end": between the following pair of ':' separators.
        final int rangeStart = nameEnd + 1;
        final int rangeEnd = record.indexOf(':', rangeStart);
        final String[] bounds = record.substring(rangeStart, rangeEnd).split("-");
        parsed.startTime = Long.parseLong(bounds[0]);
        parsed.endTime = Long.parseLong(bounds[1]);

        // Everything after the last separator is the event count.
        parsed.numEvents = Long.parseLong(record.substring(rangeEnd + 1));
        return parsed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the most recent events recorded for an application.
     *
     * <p>The index file {@code index.txt} is read to learn which part files exist and how many
     * events each holds. Leading part files that are not needed to satisfy {@code limit} are
     * dropped, the remaining ones are read in order, and finally the part file following the
     * last one read (which may not be listed in the index yet) is read on a best-effort basis.
     * The result is then trimmed from the front so that at most {@code limit} events remain.
     *
     * @param appId identifier of the application
     * @param limit maximum number of events to return; zero or a negative value yields an
     *              empty list
     * @return the latest events, oldest first; an empty list when the index cannot be read;
     *         {@code null} when no events directory can be resolved for {@code appId}
     */
    public List<EventInfo> getLatestEvents(String appId, int limit) {
        LinkedList<EventInfo> result = new LinkedList<EventInfo>();
        String dir = this.getEventsDirectory(appId);
        if (dir == null) {
            return null;
        }

        // Collect (part file name, event count) for every indexed part file.
        long totalNumEvents = 0L;
        LinkedList<Pair<String, Long>> partFiles = new LinkedList<Pair<String, Long>>();
        FSPartFileAgent.IndexFileBufferedReader ifbr = null;
        try {
            ifbr = new FSPartFileAgent.IndexFileBufferedReader(this, new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, "index.txt"))), dir);
            EventsIndexLine indexLine;
            while ((indexLine = (EventsIndexLine)ifbr.readIndexLine()) != null) {
                if (indexLine.isEndLine) {
                    continue;
                }
                partFiles.add(new Pair<String, Long>(indexLine.partFile, indexLine.numEvents));
                totalNumEvents += indexLine.numEvents;
            }
        }
        catch (Exception ex) {
            LOG.warn("Got exception when reading events", ex);
            return result;
        }
        finally {
            IOUtils.closeQuietly(ifbr);
        }

        // Drop whole leading part files while the rest still covers the limit; when the head
        // file is only partially needed, remember how many of its events to skip.
        long offset = 0L;
        while (totalNumEvents > (long)limit && !partFiles.isEmpty()) {
            long headCount = partFiles.getFirst().second;
            if (totalNumEvents - headCount < (long)limit) {
                offset = Math.max(0L, totalNumEvents - (long)limit);
                break;
            }
            totalNumEvents -= headCount;
            partFiles.removeFirst();
        }

        String lastProcessPartFile = null;
        for (Pair<String, Long> partFile : partFiles) {
            BufferedReader partBr = null;
            try {
                partBr = new BufferedReader(new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, partFile.first))));
                this.processPartFile(partBr, null, null, offset, limit, result);
                // The skip count applies to the first part file that was read successfully.
                offset = 0L;
                lastProcessPartFile = partFile.first;
            }
            catch (Exception ex) {
                LOG.warn("Got exception when reading events", ex);
            }
            finally {
                IOUtils.closeQuietly(partBr);
            }
        }

        // Best effort: also read the part file that follows the last one processed.
        BufferedReader extraBr = null;
        try {
            String extraPartFile = EventsAgent.getNextPartFile(lastProcessPartFile);
            if (extraPartFile != null && limit > 0) {
                extraBr = new BufferedReader(new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, extraPartFile))));
                this.processPartFile(extraBr, null, null, 0L, Integer.MAX_VALUE, result);
            }
        }
        catch (Exception ex) {
            // Failure here is tolerated, but leave a trace instead of swallowing it silently.
            LOG.debug("Ignoring failure while reading the extra events part file", ex);
        }
        finally {
            IOUtils.closeQuietly(extraBr);
        }

        // Keep only the newest entries. The emptiness guard matters for a negative limit:
        // removeFirst() throws NoSuchElementException on an empty list.
        while (!result.isEmpty() && result.size() > limit) {
            result.removeFirst();
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns events of an application that fall into a time window.
     *
     * <p>Part files are visited in the order they appear in {@code index.txt}. A part file is
     * skipped when its indexed end time lies before {@code fromTime}; the scan stops at the
     * first part file whose indexed start time lies after {@code toTime}. After the indexed
     * files, the part file following the last indexed one is read on a best-effort basis.
     *
     * @param appId    identifier of the application
     * @param fromTime lower timestamp bound (inclusive), or {@code null} for no lower bound
     * @param toTime   upper timestamp bound (inclusive), or {@code null} for no upper bound
     * @param offset   number of matching events to skip before collecting
     * @param limit    maximum number of events to return
     * @return the matching events in file order; whatever was collected so far when reading
     *         fails; {@code null} when no events directory can be resolved for {@code appId}
     */
    public List<EventInfo> getEvents(String appId, Long fromTime, Long toTime, long offset, int limit) {
        ArrayList<EventInfo> result = new ArrayList<EventInfo>();
        String dir = this.getEventsDirectory(appId);
        if (dir == null) {
            return null;
        }
        FSPartFileAgent.IndexFileBufferedReader ifbr = null;
        try {
            ifbr = new FSPartFileAgent.IndexFileBufferedReader(this, new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, "index.txt"))), dir);
            String lastProcessPartFile = null;
            EventsIndexLine indexLine;
            // Once the limit is used up nothing more can be added, so stop scanning the index.
            while (limit > 0 && (indexLine = (EventsIndexLine)ifbr.readIndexLine()) != null) {
                if (indexLine.isEndLine) {
                    continue;
                }
                lastProcessPartFile = indexLine.partFile;
                if (fromTime != null && fromTime > indexLine.endTime) {
                    continue;
                }
                if (toTime != null && toTime < indexLine.startTime) {
                    // The index reader is closed by the finally block below.
                    return result;
                }
                BufferedReader partBr = null;
                try {
                    partBr = new BufferedReader(new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, indexLine.partFile))));
                    int sizeBefore = result.size();
                    offset = this.processPartFile(partBr, fromTime, toTime, offset, limit, result);
                    // Charge only the events added by this part file; result.size() is
                    // cumulative and would count earlier events again.
                    limit -= result.size() - sizeBefore;
                }
                finally {
                    IOUtils.closeQuietly(partBr);
                }
            }

            // Best effort: also read the part file that follows the last indexed one.
            BufferedReader extraBr = null;
            try {
                String extraPartFile = EventsAgent.getNextPartFile(lastProcessPartFile);
                if (extraPartFile != null && limit > 0) {
                    extraBr = new BufferedReader(new InputStreamReader((InputStream)this.stramAgent.getFileSystem().open(new Path(dir, extraPartFile))));
                    this.processPartFile(extraBr, fromTime, toTime, offset, limit, result);
                }
            }
            catch (Exception ex) {
                // Failure here is tolerated, but leave a trace instead of swallowing it silently.
                LOG.debug("Ignoring failure while reading the extra events part file", ex);
            }
            finally {
                IOUtils.closeQuietly(extraBr);
            }
        }
        catch (Exception ex) {
            LOG.warn("Got exception when reading events", ex);
        }
        finally {
            IOUtils.closeQuietly(ifbr);
        }
        return result;
    }

    /** Shared JSON parser for event payloads; reused instead of building one per event line. */
    private static final ObjectMapper EVENT_DATA_MAPPER = new ObjectMapper();

    /**
     * Reads events from one part file and appends the accepted ones to {@code result}.
     *
     * <p>Each line has the form {@code timestamp:type:json}. A line is accepted when its
     * timestamp lies within the optional bounds, after {@code offset} matching lines have been
     * skipped and while fewer than {@code limit} events have been added by this call. The
     * JSON payload becomes the event data; its {@code "id"} entry is moved into the event's
     * {@code id} field. Reading stops as soon as the limit is exhausted, so lines beyond that
     * point are neither read nor parsed.
     *
     * @param partBr   reader positioned at the start of the part file; not closed here
     * @param fromTime lower timestamp bound (inclusive), or {@code null} for no lower bound
     * @param toTime   upper timestamp bound (inclusive), or {@code null} for no upper bound
     * @param offset   number of matching events to skip before collecting
     * @param limit    maximum number of events to add
     * @param result   list the accepted events are appended to
     * @return the part of {@code offset} that was not consumed by this file
     * @throws IOException if a line cannot be read or its JSON payload cannot be parsed
     */
    private long processPartFile(BufferedReader partBr, Long fromTime, Long toTime, long offset, int limit, List<EventInfo> result) throws IOException {
        String partLine;
        while ((partLine = partBr.readLine()) != null) {
            int firstColon = partLine.indexOf(':');
            long timestamp = Long.parseLong(partLine.substring(0, firstColon));
            int secondColon = partLine.indexOf(':', firstColon + 1);
            String type = partLine.substring(firstColon + 1, secondColon);

            boolean beforeWindow = fromTime != null && timestamp < fromTime;
            boolean afterWindow = toTime != null && timestamp > toTime;
            if (beforeWindow || afterWindow) {
                continue;
            }
            if (offset > 0L) {
                --offset;
                continue;
            }
            if (limit <= 0) {
                // offset is already 0 here and nothing more can be added, so the remainder
                // of the file cannot change the outcome.
                break;
            }
            --limit;

            // readValue returns a raw HashMap; the declared element types are taken on trust.
            @SuppressWarnings("unchecked")
            Map<String, String> payload = EVENT_DATA_MAPPER.readValue(partLine.substring(secondColon + 1), HashMap.class);

            EventInfo ev = new EventInfo();
            ev.timestamp = timestamp;
            ev.type = type;
            ev.data = payload;
            ev.id = Long.parseLong(payload.get("id"));
            payload.remove("id");
            result.add(ev);
        }
        return offset;
    }

    /**
     * One event read from an events part file, as filled in by {@code processPartFile}.
     */
    public static class EventInfo {
        // Taken from the "id" entry of the event's JSON payload; serialized via ToStringSerializer.
        @JsonSerialize(using=ToStringSerializer.class)
        public long id;
        // First colon-separated field of the part-file line; serialized via ToStringSerializer.
        @JsonSerialize(using=ToStringSerializer.class)
        public long timestamp;
        // Second colon-separated field of the part-file line.
        public String type;
        // JSON payload of the part-file line with the "id" entry removed.
        public Map<String, String> data;
    }

    /**
     * Parsed form of one line of the events index file, as produced by {@code parseIndexLine}.
     * The {@code partFile} and {@code isEndLine} fields set there are not declared in this
     * class; presumably they come from {@link FSPartFileAgent.IndexLine}.
     */
    private static class EventsIndexLine
    extends FSPartFileAgent.IndexLine {
        // Part before the '-' in the index line's time range.
        @JsonSerialize(using=ToStringSerializer.class)
        public long startTime;
        // Part after the '-' in the index line's time range.
        @JsonSerialize(using=ToStringSerializer.class)
        public long endTime;
        // Trailing field of the index line: number of events in the part file.
        @JsonSerialize(using=ToStringSerializer.class)
        public long numEvents;

        // Instances are created only by the enclosing class.
        private EventsIndexLine() {
        }
    }
}

