/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploader;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SharedCacheUploadService
extends AbstractService
implements EventHandler<SharedCacheUploadEvent> {
    private static final Log LOG = LogFactory.getLog(SharedCacheUploadService.class);
    private boolean enabled;
    private FileSystem fs;
    private FileSystem localFs;
    private ExecutorService uploaderPool;
    private SCMUploaderProtocol scmClient;

    public SharedCacheUploadService() {
        super(SharedCacheUploadService.class.getName());
    }

    protected void serviceInit(Configuration conf) throws Exception {
        this.enabled = conf.getBoolean("yarn.sharedcache.enabled", false);
        if (this.enabled) {
            int threadCount = conf.getInt("yarn.sharedcache.nm.uploader.thread-count", 20);
            this.uploaderPool = HadoopExecutors.newFixedThreadPool((int)threadCount, (ThreadFactory)new ThreadFactoryBuilder().setNameFormat("Shared cache uploader #%d").build());
            this.scmClient = this.createSCMClient(conf);
            try {
                this.fs = FileSystem.get((Configuration)conf);
                this.localFs = FileSystem.getLocal((Configuration)conf);
            }
            catch (IOException e) {
                LOG.error((Object)"Unexpected exception in getting the filesystem", (Throwable)e);
                throw new RuntimeException(e);
            }
        }
        super.serviceInit(conf);
    }

    private SCMUploaderProtocol createSCMClient(Configuration conf) {
        YarnRPC rpc = YarnRPC.create((Configuration)conf);
        InetSocketAddress scmAddress = conf.getSocketAddr("yarn.sharedcache.uploader.server.address", "0.0.0.0:8046", 8046);
        return (SCMUploaderProtocol)rpc.getProxy(SCMUploaderProtocol.class, scmAddress, conf);
    }

    protected void serviceStop() throws Exception {
        if (this.enabled) {
            this.uploaderPool.shutdown();
            RPC.stopProxy((Object)this.scmClient);
        }
        super.serviceStop();
    }

    public void handle(SharedCacheUploadEvent event) {
        if (this.enabled) {
            Map<LocalResourceRequest, Path> resources = event.getResources();
            for (Map.Entry<LocalResourceRequest, Path> e : resources.entrySet()) {
                SharedCacheUploader uploader = new SharedCacheUploader(e.getKey(), e.getValue(), event.getUser(), this.getConfig(), this.scmClient, this.fs, this.localFs);
                this.uploaderPool.submit(uploader);
            }
        }
    }

    public boolean isEnabled() {
        return this.enabled;
    }
}

