/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.util;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes a stream of data items into a sequence of "part" files
 * ({@code part0.txt}, {@code part1.txt}, ...) under a base path, together with
 * an index file ({@value #INDEX_FILE}) that records one line per finished part
 * file and a meta file ({@value #META_FILE}) for caller-supplied metadata.
 *
 * <p>Files are written either through a Hadoop {@link FileSystem} or, in local
 * mode, directly through {@link FileOutputStream}. Local mode is enabled
 * explicitly via {@link #setLocalMode(boolean)} or implicitly when the base
 * path starts with {@code file:}.
 *
 * <p>Typical call order: configure via setters, {@link #setup()}, any number of
 * {@link #writeDataItem(byte[], boolean)} / {@link #flushData()} calls, then
 * {@link #teardown()}.
 */
public class FSPartFileCollection {
    /** Scheme prefix that marks a base path as a local-filesystem path. */
    private static final String LOCAL_SCHEME_PREFIX = "file:";

    private transient FileSystem fs;
    /** Stream of the part file currently being written; {@code null} when no part file is open. */
    private transient FSDataOutputStream partOutStr;
    private transient FSDataOutputStream indexOutStr;
    private transient FSDataOutputStream metaOs;
    /** Base directory as a plain local path; only meaningful in local mode. */
    private transient String localBasePath;
    public static final String INDEX_FILE = "index.txt";
    public static final String META_FILE = "meta.txt";
    /** Part-file size threshold in bytes (default 1 MiB) above which a turnover becomes due. */
    protected int bytesPerPartFile = 0x100000;
    /** Part-file age threshold in milliseconds (default one hour) above which a turnover becomes due. */
    protected long millisPerPartFile = 3600000L;
    /** Number of part files opened so far; also the numeric suffix of the next part file. */
    protected int fileParts = 0;
    /** Number of counted items written to the current part file. */
    protected int partFileItemCount = 0;
    /** Number of bytes written to the current part file. */
    protected int partFileBytes = 0;
    /** Wall-clock time (ms) at which the current part file was opened. */
    protected long currentPartFileTimeStamp = 0L;
    protected String basePath = ".";
    /** Name (not full path) of the most recently opened part file. */
    protected String hdfsFile;
    private boolean isLocalMode = false;
    /** Set by {@link #requestSync()}; forces a turnover on the next flush of a non-empty part file. */
    private boolean syncRequested = false;
    private static final Logger logger = LoggerFactory.getLogger(FSPartFileCollection.class);

    /**
     * Sets the part-file size threshold.
     *
     * @param bytes size in bytes above which the current part file is due for turnover
     */
    public void setBytesPerPartFile(int bytes) {
        this.bytesPerPartFile = bytes;
    }

    /**
     * Sets the part-file age threshold.
     *
     * @param millis age in milliseconds above which the current part file is due for turnover
     */
    public void setMillisPerPartFile(long millis) {
        this.millisPerPartFile = millis;
    }

    /**
     * Enables or disables local mode, in which files are written with
     * {@link FileOutputStream} instead of the Hadoop file system.
     *
     * @param isLocalMode {@code true} to write to the local file system
     */
    public void setLocalMode(boolean isLocalMode) {
        this.isLocalMode = isLocalMode;
    }

    /**
     * Sets the directory under which all files are created.
     *
     * @param basePath base directory; a {@code file:} prefix switches to local mode in {@link #setup()}
     */
    public void setBasePath(String basePath) {
        this.basePath = basePath;
    }

    /**
     * @return the configured base directory
     */
    public String getBasePath() {
        return this.basePath;
    }

    /**
     * Opens the file system and creates the meta and index files.
     *
     * @throws IOException if the file system or either file cannot be opened
     */
    public void setup() throws IOException {
        if (this.basePath.startsWith(LOCAL_SCHEME_PREFIX)) {
            this.isLocalMode = true;
            this.localBasePath = this.basePath.substring(LOCAL_SCHEME_PREFIX.length());
        } else if (this.isLocalMode) {
            // Local mode was requested explicitly with a plain path. Without this,
            // localBasePath stays null and files end up under a directory named "null".
            this.localBasePath = this.basePath;
        }
        if (this.isLocalMode) {
            new File(this.localBasePath).mkdirs();
        }
        this.fs = FileSystem.newInstance(new Path(this.basePath).toUri(), new Configuration());
        this.metaOs = this.createStream(META_FILE);
        this.indexOutStr = this.createStream(INDEX_FILE);
    }

    /**
     * Closes every open stream and the file system. If a part file is open it
     * is closed and its index entry written; the index is then terminated with
     * an end marker. Failures are logged, never thrown, and do not prevent the
     * remaining resources from being closed. Calling this more than once, or
     * without a prior {@link #setup()}, is harmless.
     */
    public void teardown() {
        logger.info("Closing hdfs part collection.");
        this.closeStream(this.metaOs, META_FILE);
        this.metaOs = null;

        if (this.partOutStr != null) {
            logger.debug("Closing part file");
            boolean closed = this.closeStream(this.partOutStr, this.hdfsFile);
            this.partOutStr = null;
            // Only index a part file that was closed successfully.
            if (closed && this.indexOutStr != null) {
                this.writeIndex();
            }
        }

        if (this.indexOutStr != null) {
            this.writeIndexEnd();
            this.closeStream(this.indexOutStr, INDEX_FILE);
            this.indexOutStr = null;
        }

        if (this.fs != null) {
            try {
                this.fs.close();
            }
            catch (IOException ex) {
                logger.error("Failed to close file system", ex);
            }
            this.fs = null;
        }
    }

    /**
     * Creates an output stream for a file directly under the base path, using
     * the local file system in local mode and the Hadoop file system otherwise.
     *
     * @param fileName name of the file relative to the base path
     * @return a freshly created output stream
     * @throws IOException if the file cannot be created
     */
    private FSDataOutputStream createStream(String fileName) throws IOException {
        if (this.isLocalMode) {
            OutputStream local = new FileOutputStream(this.localBasePath + "/" + fileName);
            return new FSDataOutputStream(local, null);
        }
        return this.fs.create(new Path(this.basePath, fileName));
    }

    /**
     * Closes a stream, logging rather than propagating any failure.
     *
     * @param stream stream to close; may be {@code null}
     * @param name   file name used in the error message
     * @return {@code true} if the stream was {@code null} or closed without error
     */
    private boolean closeStream(FSDataOutputStream stream, String name) {
        if (stream == null) {
            return true;
        }
        try {
            stream.close();
            return true;
        }
        catch (IOException ex) {
            logger.error("Failed to close " + name, ex);
            return false;
        }
    }

    /**
     * Opens the next numbered part file and resets the per-part counters.
     *
     * @throws IOException if the file cannot be created
     */
    private void openNewPartFile() throws IOException {
        this.hdfsFile = "part" + this.fileParts + ".txt";
        logger.debug("Opening new part file: {}", this.hdfsFile);
        this.partOutStr = this.createStream(this.hdfsFile);
        ++this.fileParts;
        this.currentPartFileTimeStamp = System.currentTimeMillis();
        this.partFileItemCount = 0;
        this.partFileBytes = 0;
    }

    /**
     * Appends bytes to the meta file and flushes them.
     *
     * @param bytes data to append
     * @throws IOException if the write or flush fails
     */
    public void writeMetaData(byte[] bytes) throws IOException {
        this.metaOs.write(bytes);
        this.metaOs.hflush();
    }

    /**
     * Appends bytes to the current part file, opening a new part file first if
     * none is open.
     *
     * @param bytes              data to append
     * @param incrementItemCount whether this write counts as one item in the index entry
     * @throws IOException if opening the part file or writing fails
     */
    public void writeDataItem(byte[] bytes, boolean incrementItemCount) throws IOException {
        if (this.partOutStr == null) {
            this.openNewPartFile();
        }
        this.partOutStr.write(bytes);
        this.partFileBytes += bytes.length;
        if (incrementItemCount) {
            ++this.partFileItemCount;
        }
    }

    /**
     * Requests that the current part file be turned over on the next
     * {@link #flushData()}, provided it is non-empty.
     */
    public void requestSync() {
        this.syncRequested = true;
    }

    /**
     * Tells whether the current part file should be closed and indexed: it must
     * be non-empty and a sync must have been requested, or its size or age must
     * exceed the configured threshold.
     *
     * @return {@code true} if a turnover is due; {@code false} if no part file is open;
     *         {@code true} if the stream position cannot be determined
     */
    public boolean isReadyTurnoverPartFile() {
        if (this.partOutStr == null) {
            // No part file is open (before the first write or right after a turnover).
            return false;
        }
        try {
            long position = this.partOutStr.getPos();
            if (position <= 0L) {
                return false;
            }
            return this.syncRequested
                || position > (long)this.bytesPerPartFile
                || this.currentPartFileTimeStamp + this.millisPerPartFile < System.currentTimeMillis();
        }
        catch (IOException ex) {
            // Position unknown: report a turnover as due, as the original code did.
            return true;
        }
    }

    /**
     * Flushes the current part file and turns it over if a turnover is due.
     *
     * @return {@code true} if the part file was turned over, {@code false} otherwise
     * @throws IOException if flushing or closing the part file fails
     */
    public boolean flushData() throws IOException {
        if (this.partOutStr == null) {
            return false;
        }
        this.partOutStr.hflush();
        if (!this.isReadyTurnoverPartFile()) {
            return false;
        }
        this.turnover();
        return true;
    }

    /**
     * Closes the current part file, writes its index entry, and clears any
     * pending sync request.
     *
     * @throws IOException if closing the part file fails
     */
    private void turnover() throws IOException {
        this.partOutStr.close();
        this.partOutStr = null;
        this.writeIndex();
        this.syncRequested = false;
    }

    /**
     * Appends the index line for the latest part file to the index file and
     * syncs it. Does nothing if no bytes were written to that part file.
     * I/O failures are logged, not thrown.
     */
    private void writeIndex() {
        if (this.partFileBytes <= 0) {
            return;
        }
        try {
            String line = this.getLatestIndexLine();
            this.resetIndexExtraInfo();
            // Fixed charset so the index encoding does not depend on the host platform.
            this.indexOutStr.write(line.getBytes(StandardCharsets.UTF_8));
            this.indexOutStr.hflush();
            this.indexOutStr.hsync();
        }
        catch (IOException ex) {
            logger.error("Failed to write index entry for " + this.hdfsFile, ex);
        }
    }

    /**
     * Builds the index line for the latest part file in the form
     * {@code F:<file>:<openedMillis>-<nowMillis>:<itemCount>[:T:<extraInfo>]}
     * followed by a newline. The {@code :T:} section is present only when
     * {@link #getIndexExtraInfo()} returns non-null.
     *
     * @return the newline-terminated index line
     */
    public String getLatestIndexLine() {
        String extraInfo = this.getIndexExtraInfo();
        StringBuilder line = new StringBuilder();
        line.append("F:").append(this.hdfsFile)
            .append(":").append(this.currentPartFileTimeStamp)
            .append("-").append(System.currentTimeMillis())
            .append(":").append(this.partFileItemCount);
        if (extraInfo != null) {
            line.append(":T:").append(extraInfo);
        }
        line.append("\n");
        return line.toString();
    }

    /**
     * Appends the end-of-index marker line ({@code E}) to the index file and
     * syncs it. I/O failures are logged, not thrown.
     */
    private void writeIndexEnd() {
        try {
            this.indexOutStr.write("E\n".getBytes(StandardCharsets.UTF_8));
            this.indexOutStr.hflush();
            this.indexOutStr.hsync();
        }
        catch (IOException ex) {
            logger.error("Failed to write index end marker", ex);
        }
    }

    /**
     * Hook for subclasses to contribute extra text to each index line.
     *
     * @return extra text appended after {@code :T:}, or {@code null} for none (the default)
     */
    protected String getIndexExtraInfo() {
        return null;
    }

    /**
     * Hook for subclasses to reset any state behind {@link #getIndexExtraInfo()};
     * called after each index line has been built. The default does nothing.
     */
    protected void resetIndexExtraInfo() {
    }
}

