package org.apache.atlas.notification.spool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.atlas.AtlasException;
import org.apache.atlas.hook.FailedMessagesLogger;
import org.apache.atlas.model.notification.MessageSource;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/notification/spool/AtlasFileSpool.class */
/**
 * {@link NotificationInterface} that fronts an {@link AbstractNotification} handler with a spool.
 *
 * <p>Messages are serialized once and then either handed directly to the wrapped handler or,
 * when the spool was initialized successfully and either the index reports pending work or the
 * publisher has marked the destination as down, handed to the {@link Spooler}. A {@link Publisher}
 * runs on a daemon thread started by {@link #init(String, Object)}.
 */
public class AtlasFileSpool implements NotificationInterface {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasFileSpool.class);

    /** Handler that receives messages when the spool is bypassed. */
    private final AbstractNotification notificationHandler;
    private final SpoolConfiguration config;
    private final IndexManagement indexManagement;
    private final Spooler spooler;
    private final Publisher publisher;

    /** Thread running {@link #publisher}; stays {@code null} until {@link #startPublisher()} runs. */
    private Thread publisherThread;

    /**
     * Tri-state initialization flag: {@code null} = no completed attempt recorded,
     * {@code TRUE} = init succeeded, {@code FALSE} = init failed with an {@link AtlasException}.
     */
    private Boolean initDone = null;
    private String currentUser;

    /**
     * Builds the spool components around the given handler.
     *
     * @param configuration        configuration passed to {@link SpoolConfiguration}
     * @param abstractNotification handler used for direct delivery and by the publisher; its simple
     *                             class name is also passed to {@link SpoolConfiguration}
     */
    public AtlasFileSpool(Configuration configuration, AbstractNotification abstractNotification) {
        this.notificationHandler = abstractNotification;
        this.config = new SpoolConfiguration(configuration, abstractNotification.getClass().getSimpleName());
        this.indexManagement = new IndexManagement(this.config);
        this.spooler = new Spooler(this.config, this.indexManagement);
        this.publisher = new Publisher(this.config, this.indexManagement, abstractNotification);
    }

    /**
     * Initializes the index, wires the optional failed-messages logger and starts the publisher.
     *
     * <p>Does nothing (beyond logging) once an attempt has been recorded in {@link #initDone}.
     * An {@link AtlasException} records a failed attempt; any other {@link Throwable} is logged
     * without recording an attempt, so a later call runs the initialization again.
     *
     * @param str source name forwarded to {@link SpoolConfiguration#setSource}
     * @param obj if it is a {@link FailedMessagesLogger} it is handed to the spooler; otherwise ignored
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public void init(String str, Object obj) {
        LOG.debug("==> AtlasFileSpool.init(source={})", str);
        if (isInitDone()) {
            LOG.debug("AtlasFileSpool.init(): initialization already done. initDone={}", this.initDone);
        } else {
            try {
                this.config.setSource(str, this.currentUser);
                LOG.info("{}: Initialization: Starting...", this.config.getSourceName());
                this.indexManagement.init();
                if (obj instanceof FailedMessagesLogger) {
                    this.spooler.setFailedMessagesLogger((FailedMessagesLogger) obj);
                }
                startPublisher();
                this.initDone = true;
            } catch (AtlasException e) {
                LOG.error("AtlasFileSpool(source={}): initialization failed", this.config.getSourceName(), e);
                this.initDone = false;
            } catch (Throwable th) {
                // initDone is intentionally left untouched here, so isInitDone() stays false.
                LOG.error("AtlasFileSpool(source={}): initialization failed, unknown error", this.config.getSourceName(), th);
            }
        }
        LOG.debug("<== AtlasFileSpool.init(source={})", str);
    }

    /**
     * Forwards the user to the wrapped handler and remembers it for a later {@link #init} call.
     *
     * @param str user name
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public void setCurrentUser(String str) {
        this.notificationHandler.setCurrentUser(str);
        this.currentUser = str;
    }

    /**
     * Not implemented for the spool.
     *
     * @return always {@code null}; a warning is logged on every call
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public <T> List<NotificationConsumer<T>> createConsumers(NotificationInterface.NotificationType notificationType, int i) {
        LOG.warn("AtlasFileSpool.createConsumers(): not implemented");
        return null;
    }

    /**
     * Varargs convenience overload; delegates to {@link #send(NotificationInterface.NotificationType, List)}.
     *
     * @throws NotificationException propagated from the list-based overload
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public <T> void send(NotificationInterface.NotificationType notificationType, T... tArr) throws NotificationException {
        send(notificationType, Arrays.asList(tArr));
    }

    /**
     * Sends the messages tagged with a {@code "file_spool"} {@link MessageSource}.
     *
     * @throws NotificationException propagated from the three-argument overload
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public <T> void send(NotificationInterface.NotificationType notificationType, List<T> list) throws NotificationException {
        send(notificationType, list, new MessageSource("file_spool"));
    }

    /**
     * @return always {@code true}
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public boolean isReady(NotificationInterface.NotificationType notificationType) {
        return true;
    }

    /**
     * Serializes the messages and routes them to the spooler or to the wrapped handler.
     *
     * <p>The spooler is used up front when init succeeded and either the index has pending work or
     * the publisher reports the destination as down. Otherwise the handler is tried first; if it
     * throws and no init attempt has been recorded the exception is rethrown, else the destination
     * is marked down and the messages go to the spooler.
     *
     * @param notificationType type forwarded to the chosen sink
     * @param list             messages to serialize and send
     * @param messageSource    source attached to the serialized messages
     * @throws NotificationException when the handler fails and the spool has no recorded init attempt
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public <T> void send(NotificationInterface.NotificationType notificationType, List<T> list, MessageSource messageSource) throws NotificationException {
        List<String> serializedMessages = getSerializedMessages(list, messageSource);

        boolean spoolFirst = hasInitSucceeded()
                && (this.indexManagement.isPending() || this.publisher.isDestinationDown());
        if (spoolFirst) {
            LOG.debug("AtlasFileSpool.send(): sending to spooler");
            this.spooler.sendInternal(notificationType, serializedMessages);
            return;
        }

        LOG.debug("AtlasFileSpool.send(): sending to notificationHandler");
        try {
            this.notificationHandler.sendInternal(notificationType, serializedMessages);
        } catch (Exception e) {
            // NOTE(review): isInitDone() is also true when init failed with AtlasException
            // (initDone == FALSE), so the spooler is used in that case as well — confirm intended.
            if (!isInitDone()) {
                LOG.warn("AtlasFileSpool.send(): failed in sending to notificationHandler. Not sending to spool, as it is not initialized.", e);
                throw e;
            }
            LOG.info("AtlasFileSpool.send(): failed in sending to notificationHandler. Sending to spool", e);
            this.publisher.setDestinationDown();
            this.spooler.sendInternal(notificationType, serializedMessages);
        }
    }

    /**
     * Converts each message into its serialized form(s) via
     * {@link AbstractNotification#createNotificationMessages}.
     *
     * @param list          messages to serialize
     * @param messageSource source passed through to the serializer
     * @return list populated by the serializer, in input order
     */
    private <T> List<String> getSerializedMessages(List<T> list, MessageSource messageSource) {
        List<String> serialized = new ArrayList<>(list.size());
        for (T message : list) {
            // Static helper; it appends to 'serialized' rather than returning a value.
            AbstractNotification.createNotificationMessages(message, serialized, messageSource);
        }
        return serialized;
    }

    /**
     * Signals the spooler and publisher to drain, stops index management and waits for the
     * publisher thread (if one was started) to finish.
     *
     * <p>If the wait is interrupted, the error is logged and the calling thread's interrupt
     * status is restored.
     */
    @Override // org.apache.atlas.notification.NotificationInterface
    public void close() {
        try {
            this.spooler.setDrain();
            this.publisher.setDrain();
            this.indexManagement.stop();
            // The thread only exists after startPublisher(); close() may be called without it.
            if (this.publisherThread != null) {
                this.publisherThread.join();
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted! source={}", this.config.getSourceName(), e);
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /** Starts {@link #publisher} on a new daemon thread using this class's class loader. */
    private void startPublisher() {
        this.publisherThread = new Thread(this.publisher);
        this.publisherThread.setDaemon(true);
        this.publisherThread.setContextClassLoader(getClass().getClassLoader());
        this.publisherThread.start();
        LOG.info("{}: publisher started!", this.config.getSourceName());
    }

    /** @return {@code true} once an init attempt (successful or failed) has been recorded */
    private boolean isInitDone() {
        return this.initDone != null;
    }

    /** @return {@code true} only when an init attempt was recorded as successful */
    private boolean hasInitSucceeded() {
        return this.initDone != null && this.initDone.booleanValue();
    }
}
