/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineHealth;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.client.api.impl.FileSystemTimelineWriter;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TimelineWriter} implementation that stores timeline entities as JSON
 * records in files on a Hadoop {@link FileSystem}.
 *
 * <p>Each entity is stored under
 * {@code <root>/entities/<cluster>/<user>/<flowName>/<flowVersion>/<flowRun>/<app>/<entityType>/<entityId>.thist},
 * one JSON record per line. Every file system operation goes through
 * {@link FSAction#runWithRetries()}, whose retry count and sleep interval are
 * read from the configuration in {@link #serviceInit(Configuration)}.
 *
 * <p>Failures for an individual entity do not abort a {@code write} call; they
 * are logged and reported as errors in the returned
 * {@link TimelineWriteResponse}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FileSystemTimelineWriterImpl extends AbstractService implements TimelineWriter {
    /** Configuration key for the root directory the writer stores data under. */
    public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT = "yarn.timeline-service.fs-writer.root-dir";
    /** Configuration key for the number of retries of a failed file system operation. */
    public static final String TIMELINE_FS_WRITER_NUM_RETRIES = "yarn.timeline-service.fs-writer.num-retries";
    /** Default number of retries: a failed operation is not retried. */
    public static final int DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES = 0;
    /** Configuration key for the sleep between retries, in milliseconds. */
    public static final String TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = "yarn.timeline-service.fs-writer.retry-interval-ms";
    /** Default sleep between retries, in milliseconds. */
    public static final long DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = 1000L;
    /** Name of the directory, directly under the root, that holds all entity files. */
    public static final String ENTITIES_DIR = "entities";
    /** File name extension appended to the entity id to form the entity file name. */
    public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";

    /** Directory name appended to {@code hadoop.tmp.dir} when no root directory is configured. */
    private static final String STORAGE_DIR_ROOT = "timeline_service_data";
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemTimelineWriterImpl.class);

    private FileSystem fs;
    private Path rootPath;
    private int fsNumRetries;
    private long fsRetryInterval;
    private Path entitiesPath;
    private Configuration config;

    FileSystemTimelineWriterImpl() {
        super(FileSystemTimelineWriterImpl.class.getName());
    }

    /**
     * Writes every entity in {@code entities} to its own file, using the
     * cluster, user, flow and application identifiers taken from
     * {@code context} to build the directory.
     *
     * @param context   supplies the cluster id, user id, flow name, flow version,
     *                  flow run id and application id
     * @param entities  entities to store
     * @param callerUgi not used by this implementation
     * @return a response that contains one error per entity that could not be written
     * @throws IOException declared by the interface; per-entity write failures are
     *                     reported in the response instead of being thrown
     */
    @Override
    public TimelineWriteResponse write(TimelineCollectorContext context, TimelineEntities entities, UserGroupInformation callerUgi) throws IOException {
        TimelineWriteResponse response = new TimelineWriteResponse();
        String clusterId = context.getClusterId();
        String userId = context.getUserId();
        String flowName = context.getFlowName();
        String flowVersion = context.getFlowVersion();
        long flowRunId = context.getFlowRunId();
        String appId = context.getAppId();
        for (TimelineEntity entity : entities.getEntities()) {
            this.writeInternal(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, response);
        }
        return response;
    }

    /**
     * Storing domains is not implemented by this writer.
     *
     * @return always {@code null}
     */
    @Override
    public TimelineWriteResponse write(TimelineCollectorContext context, TimelineDomain domain) throws IOException {
        return null;
    }

    /**
     * Appends the JSON form of {@code entity} to its entity file, creating the
     * directory and the file first when needed. Any failure is logged and
     * recorded in {@code response}; it is not propagated.
     */
    private synchronized void writeInternal(String clusterId, String userId, String flowName, String flowVersion, long flowRun, String appId, TimelineEntity entity, TimelineWriteResponse response) throws IOException {
        String entityTypePathStr = clusterId + File.separator + userId + File.separator
            + FileSystemTimelineWriterImpl.escape(flowName) + File.separator
            + FileSystemTimelineWriterImpl.escape(flowVersion) + File.separator
            + flowRun + File.separator + appId + File.separator + entity.getType();
        Path entityTypePath = new Path(this.entitiesPath, entityTypePathStr);
        try {
            this.mkdirs(entityTypePath);
            Path filePath = new Path(entityTypePath, entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
            this.createFileWithRetries(filePath);
            byte[] record = (TimelineUtils.dumpTimelineRecordtoJSON(entity) + "\n")
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
            this.writeFileWithRetries(filePath, record);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so callers up the stack can still observe it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while writing timeline entity {} of type {}",
                entity.getId(), entity.getType(), ie);
            response.addError(this.createTimelineWriteError(entity));
        } catch (Exception e) {
            LOG.warn("Failed to write timeline entity {} of type {}",
                entity.getId(), entity.getType(), e);
            response.addError(this.createTimelineWriteError(entity));
        }
    }

    /** Builds a write error that carries the id and type of {@code entity}. */
    private TimelineWriteResponse.TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
        TimelineWriteResponse.TimelineWriteError error = new TimelineWriteResponse.TimelineWriteError();
        error.setEntityId(entity.getId());
        error.setEntityType(entity.getType());
        return error;
    }

    /**
     * Aggregation is not implemented by this writer.
     *
     * @return always {@code null}
     */
    @Override
    public TimelineWriteResponse aggregate(TimelineEntity data, TimelineAggregationTrack track) throws IOException {
        return null;
    }

    /** Returns the configured root directory as a string. */
    @VisibleForTesting
    String getOutputRoot() {
        return this.rootPath.toString();
    }

    /**
     * Reads the root directory and retry settings from {@code conf} and obtains
     * the {@link FileSystem} that owns the root path. When no root directory is
     * configured, {@code <hadoop.tmp.dir>/timeline_service_data} is used.
     */
    @Override
    public void serviceInit(Configuration conf) throws Exception {
        String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
            conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
        this.rootPath = new Path(outputRoot);
        this.entitiesPath = new Path(this.rootPath, ENTITIES_DIR);
        this.fsNumRetries = conf.getInt(TIMELINE_FS_WRITER_NUM_RETRIES, DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES);
        this.fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS, DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS);
        this.config = conf;
        this.fs = this.rootPath.getFileSystem(this.config);
    }

    /** Creates the root directory and the entities directory beneath it. */
    @Override
    public void serviceStart() throws Exception {
        this.mkdirsWithRetries(this.rootPath);
        this.mkdirsWithRetries(this.entitiesPath);
    }

    /** No-op: this writer does not buffer anything between calls. */
    @Override
    public void flush() throws IOException {
    }

    /**
     * Probes the file system with an existence check on the root path.
     *
     * @return {@code CONNECTION_FAILURE} with the exception message when the
     *         probe throws, otherwise {@code RUNNING}; the result of the
     *         existence check itself is not inspected
     */
    @Override
    public TimelineHealth getHealthStatus() {
        try {
            this.fs.exists(this.rootPath);
        } catch (IOException e) {
            return new TimelineHealth(TimelineHealth.TimelineHealthStatus.CONNECTION_FAILURE, e.getMessage());
        }
        return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, "");
    }

    /** Creates each of {@code paths} that does not exist yet. */
    private void mkdirs(Path... paths) throws IOException, InterruptedException {
        for (Path path : paths) {
            if (!this.existsWithRetries(path)) {
                this.mkdirsWithRetries(path);
            }
        }
    }

    /** Creates {@code dirPath} through the retry wrapper. */
    private void mkdirsWithRetries(final Path dirPath) throws IOException, InterruptedException {
        new FSAction<Void>() {
            @Override
            public Void run() throws IOException {
                FileSystemTimelineWriterImpl.this.fs.mkdirs(dirPath);
                return null;
            }
        }.runWithRetries();
    }

    /** Appends {@code data} to {@code outputPath} through the retry wrapper. */
    private void writeFileWithRetries(final Path outputPath, final byte[] data) throws IOException, InterruptedException {
        new FSAction<Void>() {
            @Override
            public Void run() throws IOException {
                FileSystemTimelineWriterImpl.this.writeFile(outputPath, data);
                return null;
            }
        }.runWithRetries();
    }

    /**
     * Creates {@code newFile} through the retry wrapper.
     *
     * @return the result of {@link FileSystem#createNewFile(Path)}
     */
    private boolean createFileWithRetries(final Path newFile) throws IOException, InterruptedException {
        return new FSAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
                return FileSystemTimelineWriterImpl.this.createFile(newFile);
            }
        }.runWithRetries();
    }

    /** Checks through the retry wrapper whether {@code path} exists. */
    private boolean existsWithRetries(final Path path) throws IOException, InterruptedException {
        return new FSAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
                return FileSystemTimelineWriterImpl.this.fs.exists(path);
            }
        }.runWithRetries();
    }

    private boolean createFile(Path newFile) throws IOException {
        return this.fs.createNewFile(newFile);
    }

    /**
     * Appends {@code data} to the file at {@code outputPath} by rewriting it:
     * the current content plus {@code data} is written to a sibling
     * {@code <name>.tmp} file, which then replaces the original.
     *
     * <p>The original file is deleted only after the temporary file has been
     * completely written and closed, so a failure while copying or writing
     * leaves the existing records in place. Both streams are closed on every
     * path.
     *
     * @param outputPath existing file to append to
     * @param data       bytes to append
     * @throws IOException if any file system operation fails or the temporary
     *                     file cannot be renamed over {@code outputPath}; the
     *                     exception is propagated so the retry wrapper can retry
     *                     and the failure is reported for the entity
     */
    protected void writeFile(Path outputPath, byte[] data) throws IOException {
        Path tempPath = new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
        // overwrite=true: a temp file left behind by an earlier failed attempt is replaced.
        try (FSDataOutputStream fsOut = this.fs.create(tempPath, true)) {
            try (FSDataInputStream fsIn = this.fs.open(outputPath)) {
                // close=false: both streams are owned by the try-with-resources blocks.
                IOUtils.copyBytes(fsIn, fsOut, this.config, false);
            }
            fsOut.write(data);
        }
        // The result of delete is not checked on its own: if the old file is still
        // in the way, the rename below is expected to fail and is reported there.
        this.fs.delete(outputPath, false);
        if (!this.fs.rename(tempPath, outputPath)) {
            throw new IOException("Failed to rename " + tempPath + " to " + outputPath);
        }
    }

    /** Replaces the platform file separator character in {@code str} with {@code '_'}. */
    private static String escape(String str) {
        return str.replace(File.separatorChar, '_');
    }

    /**
     * A file system operation that can be executed with retries.
     *
     * @param <T> result type of the operation
     */
    private abstract class FSAction<T> {
        private FSAction() {
        }

        /** Performs the operation once. */
        abstract T run() throws IOException;

        /**
         * Runs the operation, retrying after an {@link IOException} up to
         * {@code fsNumRetries} times and sleeping {@code fsRetryInterval}
         * milliseconds before each retry.
         *
         * @return the result of the first successful attempt
         * @throws IOException          the failure of the last attempt once the
         *                              retries are used up
         * @throws InterruptedException if the sleep between retries is interrupted
         */
        T runWithRetries() throws IOException, InterruptedException {
            int retry = 0;
            while (true) {
                try {
                    return this.run();
                } catch (IOException e) {
                    LOG.info("Exception while executing a FS operation.", e);
                    if (++retry > FileSystemTimelineWriterImpl.this.fsNumRetries) {
                        LOG.info("Maxed out FS retries. Giving up!");
                        throw e;
                    }
                    LOG.info("Will retry operation on FS. Retry no. {} after sleeping for {} ms",
                        retry, FileSystemTimelineWriterImpl.this.fsRetryInterval);
                    Thread.sleep(FileSystemTimelineWriterImpl.this.fsRetryInterval);
                }
            }
        }
    }
}

