package tachyon.master.lineage;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.thrift.TProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.Constants;
import tachyon.TachyonURI;
import tachyon.client.file.TachyonFile;
import tachyon.conf.TachyonConf;
import tachyon.exception.BlockInfoException;
import tachyon.exception.ExceptionMessage;
import tachyon.exception.FileAlreadyExistsException;
import tachyon.exception.FileDoesNotExistException;
import tachyon.exception.InvalidPathException;
import tachyon.exception.LineageDeletionException;
import tachyon.exception.LineageDoesNotExistException;
import tachyon.heartbeat.HeartbeatThread;
import tachyon.job.Job;
import tachyon.master.MasterBase;
import tachyon.master.MasterContext;
import tachyon.master.file.FileSystemMaster;
import tachyon.master.file.options.CreateOptions;
import tachyon.master.journal.Journal;
import tachyon.master.journal.JournalEntry;
import tachyon.master.journal.JournalOutputStream;
import tachyon.master.lineage.checkpoint.CheckpointPlan;
import tachyon.master.lineage.checkpoint.CheckpointSchedulingExcecutor;
import tachyon.master.lineage.journal.AsyncCompleteFileEntry;
import tachyon.master.lineage.journal.DeleteLineageEntry;
import tachyon.master.lineage.journal.LineageEntry;
import tachyon.master.lineage.journal.LineageIdGeneratorEntry;
import tachyon.master.lineage.journal.PersistFilesEntry;
import tachyon.master.lineage.journal.RequestFilePersistenceEntry;
import tachyon.master.lineage.meta.Lineage;
import tachyon.master.lineage.meta.LineageFile;
import tachyon.master.lineage.meta.LineageFileState;
import tachyon.master.lineage.meta.LineageIdGenerator;
import tachyon.master.lineage.meta.LineageStore;
import tachyon.master.lineage.meta.LineageStoreView;
import tachyon.master.lineage.recompute.RecomputeExecutor;
import tachyon.master.lineage.recompute.RecomputePlanner;
import tachyon.thrift.BlockLocation;
import tachyon.thrift.CheckpointFile;
import tachyon.thrift.CommandType;
import tachyon.thrift.FileBlockInfo;
import tachyon.thrift.LineageCommand;
import tachyon.thrift.LineageInfo;
import tachyon.thrift.LineageMasterService;
import tachyon.util.ThreadFactoryUtils;
import tachyon.util.io.PathUtils;

/* loaded from: input_file:tachyon/master/lineage/LineageMaster.class */
/**
 * Master-side bookkeeping for lineages: creates and deletes lineages, tracks the state of their
 * output files, hands checkpoint work to workers through heartbeats, and records every state
 * change in the journal so that it can be replayed by {@link #processJournalEntry}.
 *
 * <p>All public operations that read or mutate lineage state are {@code synchronized} on this
 * instance.
 */
public final class LineageMaster extends MasterBase {
    private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

    /** Configuration snapshot taken from {@link MasterContext} at construction time. */
    private final TachyonConf mTachyonConf;
    /** In-memory store of all lineages and their files. */
    private final LineageStore mLineageStore;
    /** File system master used to resolve, create, complete and reinitialize files. */
    private final FileSystemMaster mFileSystemMaster;
    /** Id generator shared with {@link #mLineageStore}; its state is journaled. */
    private final LineageIdGenerator mLineageIdGenerator;
    /** Handle of the checkpoint scheduling heartbeat; {@code null} until started. */
    private Future<?> mCheckpointExecutionService;
    /** Handle of the recompute heartbeat; {@code null} until started. */
    private Future<?> mRecomputeExecutionService;
    /** Maps a worker id to the lineage files queued for checkpointing on that worker. */
    private final Map<Long, Set<LineageFile>> mWorkerToCheckpointFile;

    /**
     * Builds the journal directory of the lineage master.
     *
     * @param str the base journal directory
     * @return the base directory with the {@code "LineageMaster"} component appended
     */
    public static String getJournalDirectory(String str) {
        return PathUtils.concatPath(str, "LineageMaster");
    }

    /**
     * Creates a lineage master backed by a two-thread executor.
     *
     * @param fileSystemMaster the file system master; must not be {@code null}
     * @param journal the journal handed to {@link MasterBase}
     */
    public LineageMaster(FileSystemMaster fileSystemMaster, Journal journal) {
        super(journal, Executors.newFixedThreadPool(2, ThreadFactoryUtils.build("lineage-master-%d", true)));
        mTachyonConf = MasterContext.getConf();
        mFileSystemMaster = Preconditions.checkNotNull(fileSystemMaster);
        mLineageIdGenerator = new LineageIdGenerator();
        mLineageStore = new LineageStore(mLineageIdGenerator);
        mWorkerToCheckpointFile = Maps.newHashMap();
    }

    /**
     * @return a Thrift processor that serves requests through a {@link LineageMasterServiceHandler}
     *         wrapping this master
     */
    @Override // tachyon.master.Master
    public TProcessor getProcessor() {
        return new LineageMasterService.Processor<LineageMasterServiceHandler>(
                new LineageMasterServiceHandler(this));
    }

    /**
     * @return the service name, {@code "LineageMaster"}
     */
    @Override // tachyon.master.Master
    public String getServiceName() {
        return "LineageMaster";
    }

    /**
     * Applies one journal entry to the in-memory state, dispatching on the entry's concrete type.
     *
     * @param journalEntry the entry to replay
     * @throws IOException if the entry is of a type this master does not handle
     */
    @Override // tachyon.master.Master
    public void processJournalEntry(JournalEntry journalEntry) throws IOException {
        if (journalEntry instanceof LineageEntry) {
            mLineageStore.addLineageFromJournal((LineageEntry) journalEntry);
        } else if (journalEntry instanceof LineageIdGeneratorEntry) {
            mLineageIdGenerator.fromJournalEntry((LineageIdGeneratorEntry) journalEntry);
        } else if (journalEntry instanceof AsyncCompleteFileEntry) {
            asyncCompleteFileFromEntry((AsyncCompleteFileEntry) journalEntry);
        } else if (journalEntry instanceof PersistFilesEntry) {
            persistFilesFromEntry((PersistFilesEntry) journalEntry);
        } else if (journalEntry instanceof RequestFilePersistenceEntry) {
            requestFilePersistenceFromEntry((RequestFilePersistenceEntry) journalEntry);
        } else if (journalEntry instanceof DeleteLineageEntry) {
            deleteLineageFromEntry((DeleteLineageEntry) journalEntry);
        } else {
            throw new IOException(ExceptionMessage.UNEXPECETD_JOURNAL_ENTRY.getMessage(journalEntry));
        }
    }

    /**
     * Starts the master and, when the flag is set, submits the checkpoint scheduling and file
     * recomputation heartbeats to the executor service.
     *
     * @param isLeader flag forwarded to {@link MasterBase#start}; the heartbeats are only started
     *        when it is {@code true} (presumably "this master is the leader" - confirm against
     *        MasterBase)
     * @throws IOException if the base class fails to start
     */
    @Override // tachyon.master.MasterBase, tachyon.master.Master
    public void start(boolean isLeader) throws IOException {
        super.start(isLeader);
        if (!isLeader) {
            return;
        }
        int checkpointIntervalMs =
                mTachyonConf.getInt("tachyon.master.lineage.checkpoint.interval.ms");
        mCheckpointExecutionService = getExecutorService().submit((Runnable) new HeartbeatThread(
                "Master Checkpoint Scheduling", new CheckpointSchedulingExcecutor(this),
                checkpointIntervalMs));

        int recomputeIntervalMs =
                mTachyonConf.getInt("tachyon.master.lineage.recompute.interval.ms");
        RecomputePlanner planner = new RecomputePlanner(mLineageStore, mFileSystemMaster);
        mRecomputeExecutionService = getExecutorService().submit((Runnable) new HeartbeatThread(
                "Master File Recomputation", new RecomputeExecutor(planner, mFileSystemMaster),
                recomputeIntervalMs));
    }

    /**
     * Stops the master and cancels (with interruption) any heartbeat that was started.
     *
     * @throws IOException if the base class fails to stop
     */
    @Override // tachyon.master.MasterBase, tachyon.master.Master
    public void stop() throws IOException {
        super.stop();
        if (mCheckpointExecutionService != null) {
            mCheckpointExecutionService.cancel(true);
        }
        if (mRecomputeExecutionService != null) {
            mRecomputeExecutionService.cancel(true);
        }
    }

    /**
     * Writes the lineage store followed by the id generator state to a journal checkpoint.
     *
     * @param journalOutputStream the checkpoint stream to write to
     * @throws IOException if writing fails
     */
    @Override // tachyon.master.journal.JournalCheckpointStreamable
    public synchronized void streamToJournalCheckpoint(JournalOutputStream journalOutputStream) throws IOException {
        mLineageStore.streamToJournalCheckpoint(journalOutputStream);
        journalOutputStream.writeEntry(mLineageIdGenerator.toJournalEntry());
    }

    /**
     * @return a new {@link LineageStoreView} over this master's lineage store
     */
    public LineageStoreView getLineageStoreView() {
        return new LineageStoreView(mLineageStore);
    }

    /**
     * Creates a lineage. Every input path must already resolve to a file id; every output path is
     * created through the file system master (recursively, with a block size of 1024 bytes). The
     * id generator state and the new lineage are then journaled and flushed.
     *
     * @param inputFiles paths of the input files
     * @param outputFiles paths of the output files to create
     * @param job the job associated with the lineage
     * @return the id of the new lineage
     * @throws InvalidPathException if an input path does not resolve to an existing file
     * @throws FileAlreadyExistsException propagated from creating an output file
     * @throws BlockInfoException propagated from creating an output file
     * @throws IOException propagated from creating an output file
     */
    public synchronized long createLineage(List<TachyonURI> inputFiles, List<TachyonURI> outputFiles, Job job) throws InvalidPathException, FileAlreadyExistsException, BlockInfoException, IOException {
        List<TachyonFile> inputs = Lists.newArrayList();
        for (TachyonURI inputPath : inputFiles) {
            long fileId = mFileSystemMaster.getFileId(inputPath);
            // The file system master reports -1 for a path it cannot resolve.
            if (fileId == -1) {
                throw new InvalidPathException(
                        ExceptionMessage.LINEAGE_INPUT_FILE_NOT_EXIST.getMessage(inputPath));
            }
            inputs.add(new TachyonFile(fileId));
        }

        List<LineageFile> outputs = Lists.newArrayList();
        for (TachyonURI outputPath : outputFiles) {
            CreateOptions options = new CreateOptions.Builder(MasterContext.getConf())
                    .setRecursive(true).setBlockSizeBytes(1024L).build();
            outputs.add(new LineageFile(mFileSystemMaster.create(outputPath, options)));
        }

        LOG.info("Create lineage of input:" + inputs + ", output:" + outputs + ", job:" + job);
        long lineageId = mLineageStore.createLineage(inputs, outputs, job);
        writeJournalEntry(mLineageIdGenerator.toJournalEntry());
        writeJournalEntry(mLineageStore.getLineage(lineageId).toJournalEntry());
        flushJournal();
        return lineageId;
    }

    /**
     * Deletes a lineage and journals the deletion.
     *
     * @param lineageId id of the lineage to delete
     * @param cascade when {@code false}, deletion is refused if the lineage has children
     * @return always {@code true}; failures are reported through exceptions
     * @throws LineageDoesNotExistException if no lineage has the given id
     * @throws LineageDeletionException if the lineage has children and {@code cascade} is false
     */
    public synchronized boolean deleteLineage(long lineageId, boolean cascade) throws LineageDoesNotExistException, LineageDeletionException {
        deleteLineageInternal(lineageId, cascade);
        writeJournalEntry(new DeleteLineageEntry(lineageId, cascade));
        flushJournal();
        return true;
    }

    /**
     * Removes a lineage from the store without touching the journal.
     *
     * @param lineageId id of the lineage to delete
     * @param cascade when {@code false}, deletion is refused if the lineage has children
     * @return always {@code true}
     * @throws LineageDoesNotExistException if no lineage has the given id
     * @throws LineageDeletionException if the lineage has children and {@code cascade} is false
     */
    private boolean deleteLineageInternal(long lineageId, boolean cascade) throws LineageDoesNotExistException, LineageDeletionException {
        Lineage lineage = mLineageStore.getLineage(lineageId);
        if (lineage == null) {
            throw new LineageDoesNotExistException("the lineage " + lineageId + " to delete does not exist");
        }
        if (!cascade && !mLineageStore.getChildren(lineage).isEmpty()) {
            throw new LineageDeletionException("the lineage " + lineageId + " to delete has children lineages");
        }
        LOG.info("Delete lineage " + lineageId);
        mLineageStore.deleteLineage(lineageId);
        return true;
    }

    /**
     * Replays a journaled deletion. Failures are logged rather than propagated.
     *
     * @param deleteLineageEntry the journal entry to replay
     */
    private void deleteLineageFromEntry(DeleteLineageEntry deleteLineageEntry) {
        try {
            deleteLineageInternal(deleteLineageEntry.getLineageId(), deleteLineageEntry.isCascade());
        } catch (LineageDoesNotExistException e) {
            LOG.error("Failed to delete lineage " + deleteLineageEntry.getLineageId(), e);
        } catch (LineageDeletionException e) {
            LOG.error("Failed to delete lineage " + deleteLineageEntry.getLineageId(), e);
        }
    }

    /**
     * Reinitializes a lineage file, but only if it is in the {@code CREATED} or {@code LOST} state.
     *
     * @param path path of the file
     * @param blockSizeBytes block size of the recreated file, in bytes
     * @param ttl passed through unchanged as the third argument of
     *        {@code FileSystemMaster.reinitializeFile} (presumably a time-to-live - confirm)
     * @return the value returned by the file system master, or {@code -1} if the file is in any
     *         other state
     * @throws InvalidPathException propagated from the file system master
     * @throws LineageDoesNotExistException propagated from the lineage store lookup
     * @throws IOException propagated from the file system master
     */
    public synchronized long reinitializeFile(String path, long blockSizeBytes, long ttl) throws InvalidPathException, LineageDoesNotExistException, IOException {
        TachyonURI uri = new TachyonURI(path);
        LineageFileState state = mLineageStore.getLineageFileState(mFileSystemMaster.getFileId(uri));
        if (state != LineageFileState.CREATED && state != LineageFileState.LOST) {
            return -1L;
        }
        LOG.info("Recreate the file " + path + " with block size of " + blockSizeBytes + " bytes");
        return mFileSystemMaster.reinitializeFile(uri, blockSizeBytes, ttl);
    }

    /**
     * Completes a file in both the file system master and the lineage store, then journals it.
     *
     * @param fileId id of the file to complete
     * @throws FileDoesNotExistException propagated from completing the file
     * @throws BlockInfoException propagated from completing the file
     * @throws RuntimeException wrapping an {@link InvalidPathException} raised while completing
     */
    public synchronized void asyncCompleteFile(long fileId) throws FileDoesNotExistException, BlockInfoException {
        LOG.info("Async complete file " + fileId);
        try {
            mFileSystemMaster.completeFile(fileId);
            mLineageStore.completeFile(fileId);
            writeJournalEntry(new AsyncCompleteFileEntry(fileId));
            flushJournal();
        } catch (InvalidPathException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Replays a journaled file completion against the lineage store only.
     *
     * @param asyncCompleteFileEntry the journal entry to replay
     */
    private void asyncCompleteFileFromEntry(AsyncCompleteFileEntry asyncCompleteFileEntry) {
        mLineageStore.completeFile(asyncCompleteFileEntry.getFileId());
    }

    /**
     * Handles a worker heartbeat: records the files the worker reports as persisted, then replies
     * with the files the worker should checkpoint next.
     *
     * @param workerId id of the worker sending the heartbeat
     * @param persistedFiles ids of files the worker has persisted; must not be {@code null}
     * @return a {@code Persist} command carrying the (possibly empty) list of files to checkpoint
     * @throws FileDoesNotExistException propagated from {@link #pollToCheckpoint}
     * @throws InvalidPathException propagated from {@link #pollToCheckpoint}
     */
    public synchronized LineageCommand lineageWorkerHeartbeat(long workerId, List<Long> persistedFiles) throws FileDoesNotExistException, InvalidPathException {
        if (!persistedFiles.isEmpty()) {
            persistFiles(workerId, persistedFiles);
        }
        List<CheckpointFile> filesToCheckpoint = pollToCheckpoint(workerId);
        if (!filesToCheckpoint.isEmpty()) {
            LOG.info("Sent files " + filesToCheckpoint + " to worker " + workerId + " to persist");
        }
        return new LineageCommand(CommandType.Persist, filesToCheckpoint);
    }

    /**
     * @return one {@link LineageInfo} per lineage, in the store's topological order, each with the
     *         ids of its parent and child lineages filled in
     */
    public synchronized List<LineageInfo> getLineageInfoList() {
        List<LineageInfo> lineageInfos = Lists.newArrayList();
        for (Lineage lineage : mLineageStore.getAllInTopologicalOrder()) {
            LineageInfo info = lineage.generateLineageInfo();

            List<Long> parentIds = Lists.newArrayList();
            for (Lineage parent : mLineageStore.getParents(lineage)) {
                parentIds.add(parent.getId());
            }
            info.parents = parentIds;

            List<Long> childIds = Lists.newArrayList();
            for (Lineage child : mLineageStore.getChildren(lineage)) {
                childIds.add(child.getId());
            }
            info.children = childIds;

            lineageInfos.add(info);
        }
        return lineageInfos;
    }

    /**
     * Queues the output files of every lineage in the plan on the worker that stores them. Files
     * that are not on any worker are skipped, as are lineages that no longer exist in the store.
     *
     * @param checkpointPlan the plan naming the lineages to checkpoint
     */
    public synchronized void queueForCheckpoint(CheckpointPlan checkpointPlan) {
        for (long lineageId : checkpointPlan.getLineagesToCheckpoint()) {
            Lineage lineage = mLineageStore.getLineage(lineageId);
            if (lineage == null) {
                // The store returns null for an unknown id; the lineage may have been deleted
                // after the plan was computed, so skip it instead of failing the whole plan.
                LOG.warn("Skip checkpointing lineage " + lineageId + " because it no longer exists");
                continue;
            }
            for (LineageFile file : lineage.getOutputFiles()) {
                long workerId = getWorkerStoringFile(file);
                if (workerId == -1) {
                    continue;
                }
                Set<LineageFile> queued = mWorkerToCheckpointFile.get(workerId);
                if (queued == null) {
                    queued = Sets.newHashSet();
                    mWorkerToCheckpointFile.put(workerId, queued);
                }
                queued.add(file);
            }
        }
    }

    /**
     * Collects the files queued for the given worker that are in the {@code COMPLETED} state,
     * requests their persistence in the lineage store, and returns them with their block ids.
     *
     * @param workerId id of the polling worker
     * @return the files the worker should checkpoint; empty if nothing is queued or ready
     * @throws FileDoesNotExistException propagated from looking up a file's block info
     * @throws InvalidPathException propagated from looking up a file's block info
     */
    public synchronized List<CheckpointFile> pollToCheckpoint(long workerId) throws FileDoesNotExistException, InvalidPathException {
        List<CheckpointFile> filesToCheckpoint = Lists.newArrayList();
        Set<LineageFile> queued = mWorkerToCheckpointFile.get(workerId);
        if (queued == null) {
            return filesToCheckpoint;
        }

        List<Long> fileIdsToPersist = Lists.newArrayList();
        for (LineageFile file : queued) {
            if (file.getState() != LineageFileState.COMPLETED) {
                continue;
            }
            long fileId = file.getFileId();
            fileIdsToPersist.add(fileId);

            List<Long> blockIds = Lists.newArrayList();
            for (FileBlockInfo fileBlockInfo : mFileSystemMaster.getFileBlockInfoList(fileId)) {
                blockIds.add(fileBlockInfo.blockInfo.blockId);
            }
            filesToCheckpoint.add(new CheckpointFile(fileId, blockIds));
        }

        requestFilePersistence(fileIdsToPersist);
        return filesToCheckpoint;
    }

    /**
     * Marks the given files as having persistence requested and journals the request. An empty
     * list is a no-op and writes nothing to the journal.
     *
     * @param fileIds ids of the files whose persistence is requested
     */
    public synchronized void requestFilePersistence(List<Long> fileIds) {
        if (fileIds.isEmpty()) {
            // Nothing changes, so avoid journaling (and flushing) an entry with no effect.
            return;
        }
        LOG.info("Request file persistency: " + fileIds);
        for (long fileId : fileIds) {
            mLineageStore.requestFilePersistence(fileId);
        }
        writeJournalEntry(new RequestFilePersistenceEntry(fileIds));
        flushJournal();
    }

    /**
     * Replays a journaled persistence request against the lineage store.
     *
     * @param requestFilePersistenceEntry the journal entry to replay
     */
    private synchronized void requestFilePersistenceFromEntry(RequestFilePersistenceEntry requestFilePersistenceEntry) {
        for (long fileId : requestFilePersistenceEntry.getFileIds()) {
            mLineageStore.requestFilePersistence(fileId);
        }
    }

    /**
     * Commits the persistence of the given files in the lineage store and journals it. An empty
     * list is a no-op and writes nothing to the journal.
     *
     * @param workerId id of the worker that persisted the files (used for logging only)
     * @param fileIds ids of the persisted files; must not be {@code null}
     */
    private synchronized void persistFiles(long workerId, List<Long> fileIds) {
        Preconditions.checkNotNull(fileIds);
        if (fileIds.isEmpty()) {
            // Nothing changes, so avoid journaling (and flushing) an entry with no effect.
            return;
        }
        LOG.info("Files persisted on worker " + workerId + ":" + fileIds);
        for (Long fileId : fileIds) {
            mLineageStore.commitFilePersistence(fileId);
        }
        writeJournalEntry(new PersistFilesEntry(fileIds));
        flushJournal();
    }

    /**
     * Replays a journaled persistence commit against the lineage store.
     *
     * @param persistFilesEntry the journal entry to replay
     */
    private synchronized void persistFilesFromEntry(PersistFilesEntry persistFilesEntry) {
        for (Long fileId : persistFilesEntry.getFileIds()) {
            mLineageStore.commitFilePersistence(fileId);
        }
    }

    /**
     * Finds the single worker that holds the blocks of a file.
     *
     * @param lineageFile the file to locate
     * @return the id of the worker storing the file, or {@code -1} if no block of the file has a
     *         location
     * @throws IllegalStateException if the file's blocks are located on more than one distinct
     *         worker
     * @throws RuntimeException wrapping a {@link FileDoesNotExistException} or
     *         {@link InvalidPathException} raised while looking up the file's block info
     */
    private long getWorkerStoringFile(LineageFile lineageFile) {
        // A set, not a list: a multi-block file on one worker reports that worker once per block
        // location, and those repeats must not be mistaken for several workers.
        Set<Long> workerIds = Sets.newHashSet();
        try {
            for (FileBlockInfo fileBlockInfo : mFileSystemMaster.getFileBlockInfoList(lineageFile.getFileId())) {
                for (BlockLocation location : fileBlockInfo.blockInfo.locations) {
                    workerIds.add(location.workerId);
                }
            }
        } catch (FileDoesNotExistException e) {
            throw new RuntimeException(e);
        } catch (InvalidPathException e) {
            throw new RuntimeException(e);
        }

        if (workerIds.isEmpty()) {
            LOG.info("the file " + lineageFile + " is not on any worker");
            return -1L;
        }
        Preconditions.checkState(workerIds.size() < 2, "the file is stored at more than one worker: " + workerIds);
        return workerIds.iterator().next();
    }
}
