package tachyon.worker.block;

import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.Constants;
import tachyon.client.WorkerBlockMasterClient;
import tachyon.client.WorkerFileSystemMasterClient;
import tachyon.conf.TachyonConf;
import tachyon.metrics.MetricsSystem;
import tachyon.security.authentication.AuthenticationUtils;
import tachyon.thrift.NetAddress;
import tachyon.thrift.WorkerService;
import tachyon.util.CommonUtils;
import tachyon.util.LineageUtils;
import tachyon.util.ThreadFactoryUtils;
import tachyon.util.network.NetworkAddressUtils;
import tachyon.web.UIWebServer;
import tachyon.web.WorkerUIWebServer;
import tachyon.worker.DataServer;
import tachyon.worker.WorkerContext;
import tachyon.worker.WorkerIdRegistry;
import tachyon.worker.WorkerSource;
import tachyon.worker.lineage.LineageWorker;

/* loaded from: input_file:tachyon/worker/block/BlockWorker.class */
public final class BlockWorker {
    private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);
    private final BlockMasterSync mBlockMasterSync;
    private final PinListSync mPinListSync;
    private final SessionCleaner mSessionCleanerThread;
    private final BlockServiceHandler mServiceHandler;
    private final BlockDataManager mBlockDataManager;
    private final DataServer mDataServer;
    private final ExecutorService mSyncExecutorService;
    private final NetAddress mWorkerNetAddress;
    private final TServerSocket mThriftServerSocket;
    private final int mPort;
    private final TThreadPoolServer mThriftServer;
    private final UIWebServer mWebServer;
    private MetricsSystem mWorkerMetricsSystem;
    private LineageWorker mLineageWorker;
    private SpaceReserver mSpaceReserver;
    private final TachyonConf mTachyonConf = WorkerContext.getConf();
    private final long mStartTimeMs = System.currentTimeMillis();
    private final WorkerBlockMasterClient mBlockMasterClient = new WorkerBlockMasterClient(NetworkAddressUtils.getConnectAddress(NetworkAddressUtils.ServiceType.MASTER_RPC, this.mTachyonConf), this.mTachyonConf);
    private final WorkerFileSystemMasterClient mFileSystemMasterClient = new WorkerFileSystemMasterClient(NetworkAddressUtils.getConnectAddress(NetworkAddressUtils.ServiceType.MASTER_RPC, this.mTachyonConf), this.mTachyonConf);

    public BlockServiceHandler getWorkerServiceHandler() {
        return this.mServiceHandler;
    }

    public String getRPCBindHost() {
        return NetworkAddressUtils.getThriftSocket(this.mThriftServerSocket).getLocalSocketAddress().toString();
    }

    public int getRPCLocalPort() {
        return this.mPort;
    }

    public String getDataBindHost() {
        return this.mDataServer.getBindHost();
    }

    public int getDataLocalPort() {
        return this.mDataServer.getPort();
    }

    public String getWebBindHost() {
        return this.mWebServer.getBindHost();
    }

    public int getWebLocalPort() {
        return this.mWebServer.getLocalPort();
    }

    public BlockWorker() throws IOException {
        this.mSpaceReserver = null;
        WorkerSource workerSource = new WorkerSource();
        this.mBlockDataManager = new BlockDataManager(workerSource, this.mBlockMasterClient, this.mFileSystemMasterClient, new TieredBlockStore());
        this.mWorkerMetricsSystem = new MetricsSystem("worker", this.mTachyonConf);
        workerSource.registerGauges(this.mBlockDataManager);
        this.mWorkerMetricsSystem.registerSource(workerSource);
        this.mDataServer = DataServer.Factory.createDataServer(NetworkAddressUtils.getBindAddress(NetworkAddressUtils.ServiceType.WORKER_DATA, this.mTachyonConf), this.mBlockDataManager, this.mTachyonConf);
        this.mTachyonConf.set("tachyon.worker.data.port", Integer.toString(this.mDataServer.getPort()));
        this.mServiceHandler = new BlockServiceHandler(this.mBlockDataManager);
        this.mThriftServerSocket = createThriftServerSocket();
        this.mPort = NetworkAddressUtils.getThriftPort(this.mThriftServerSocket);
        this.mTachyonConf.set("tachyon.worker.port", Integer.toString(this.mPort));
        this.mThriftServer = createThriftServer();
        this.mWorkerNetAddress = new NetAddress(NetworkAddressUtils.getConnectHost(NetworkAddressUtils.ServiceType.WORKER_RPC, this.mTachyonConf), this.mPort, this.mDataServer.getPort());
        WorkerIdRegistry.registerWithBlockMaster(this.mBlockMasterClient, this.mWorkerNetAddress);
        this.mWebServer = new WorkerUIWebServer(NetworkAddressUtils.ServiceType.WORKER_WEB, NetworkAddressUtils.getBindAddress(NetworkAddressUtils.ServiceType.WORKER_WEB, this.mTachyonConf), this.mBlockDataManager, NetworkAddressUtils.getConnectAddress(NetworkAddressUtils.ServiceType.WORKER_RPC, this.mTachyonConf), this.mStartTimeMs, this.mTachyonConf);
        this.mSyncExecutorService = Executors.newFixedThreadPool(4, ThreadFactoryUtils.build("worker-heartbeat-%d", true));
        this.mBlockMasterSync = new BlockMasterSync(this.mBlockDataManager, this.mWorkerNetAddress, this.mBlockMasterClient);
        this.mPinListSync = new PinListSync(this.mBlockDataManager, this.mFileSystemMasterClient);
        this.mSessionCleanerThread = new SessionCleaner(this.mBlockDataManager);
        if (this.mTachyonConf.getBoolean("tachyon.worker.tieredstore.reserver.enabled")) {
            this.mSpaceReserver = new SpaceReserver(this.mBlockDataManager);
        }
        LOG.info("Started lineage worker at worker with ID " + WorkerIdRegistry.getWorkerId());
        if (LineageUtils.isLineageEnabled(WorkerContext.getConf())) {
            this.mLineageWorker = new LineageWorker(this.mBlockDataManager);
        }
    }

    public NetAddress getWorkerNetAddress() {
        return this.mWorkerNetAddress;
    }

    public void process() {
        this.mWorkerMetricsSystem.start();
        this.mWebServer.addHandler(this.mWorkerMetricsSystem.getServletHandler());
        this.mSyncExecutorService.submit(this.mBlockMasterSync);
        this.mSyncExecutorService.submit(this.mPinListSync);
        this.mSyncExecutorService.submit(this.mSessionCleanerThread);
        if (LineageUtils.isLineageEnabled(WorkerContext.getConf())) {
            this.mLineageWorker.start();
        }
        if (this.mSpaceReserver != null) {
            this.mSyncExecutorService.submit(this.mSpaceReserver);
        }
        this.mWebServer.startWebServer();
        this.mThriftServer.serve();
    }

    public void stop() throws IOException {
        this.mDataServer.close();
        this.mThriftServer.stop();
        this.mThriftServerSocket.close();
        if (LineageUtils.isLineageEnabled(WorkerContext.getConf())) {
            this.mLineageWorker.stop();
        }
        this.mBlockMasterSync.stop();
        this.mPinListSync.stop();
        this.mSessionCleanerThread.stop();
        this.mBlockMasterClient.close();
        if (this.mSpaceReserver != null) {
            this.mSpaceReserver.stop();
        }
        this.mFileSystemMasterClient.close();
        this.mSyncExecutorService.shutdown();
        this.mWorkerMetricsSystem.stop();
        try {
            this.mWebServer.shutdownWebServer();
        } catch (Exception e) {
            LOG.error("Failed to stop web server", e);
        }
        this.mBlockDataManager.stop();
        while (true) {
            if (this.mDataServer.isClosed() && !this.mThriftServer.isServing()) {
                return;
            }
            this.mDataServer.close();
            this.mThriftServer.stop();
            this.mThriftServerSocket.close();
            CommonUtils.sleepMs(100L);
        }
    }

    private TThreadPoolServer createThriftServer() {
        int i = this.mTachyonConf.getInt("tachyon.worker.block.threads.min");
        int i2 = this.mTachyonConf.getInt("tachyon.worker.block.threads.max");
        WorkerService.Processor processor = new WorkerService.Processor(this.mServiceHandler);
        try {
            return new TThreadPoolServer(new TThreadPoolServer.Args(this.mThriftServerSocket).minWorkerThreads(i).maxWorkerThreads(i2).processor(processor).transportFactory(AuthenticationUtils.getServerTransportFactory(this.mTachyonConf)).protocolFactory(new TBinaryProtocol.Factory(true, true)));
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    private TServerSocket createThriftServerSocket() {
        try {
            return new TServerSocket(NetworkAddressUtils.getBindAddress(NetworkAddressUtils.ServiceType.WORKER_RPC, this.mTachyonConf));
        } catch (TTransportException e) {
            LOG.error(e.getMessage(), e);
            throw Throwables.propagate(e);
        }
    }
}
