package org.dcache.resilience.util;

import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.Message;
import diskCacheV111.vehicles.PnfsAddCacheLocationMessage;
import diskCacheV111.vehicles.PnfsClearCacheLocationMessage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import org.dcache.resilience.handlers.ResilienceMessageHandler;
import org.dcache.vehicles.CorruptFileMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/dcache/resilience/util/SimplePersistentBacklogHandler.class */
public final class SimplePersistentBacklogHandler implements BackloggedMessageHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimplePersistentBacklogHandler.class);
    ResilienceMessageHandler messageHandler;
    private final File backlogStore;
    private final Consumer consumer = new Consumer();
    private final Reloader reloader = new Reloader();
    BlockingQueue<Serializable> queue = new LinkedBlockingDeque(1000000);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/dcache/resilience/util/SimplePersistentBacklogHandler$BacklogThread.class */
    public abstract class BacklogThread implements Runnable {
        private Thread thread;

        BacklogThread() {
        }

        protected void start() {
            if (this.thread == null || !this.thread.isAlive()) {
                this.thread = new Thread(this, getClass().getSimpleName());
                this.thread.start();
            }
        }

        protected void stop() {
            if (this.thread != null && this.thread.isAlive()) {
                this.thread.interrupt();
                try {
                    this.thread.join();
                } catch (InterruptedException e) {
                }
            }
            this.thread = null;
        }
    }

    /* loaded from: input_file:org/dcache/resilience/util/SimplePersistentBacklogHandler$Consumer.class */
    class Consumer extends BacklogThread {
        private PrintWriter pw;

        Consumer() {
            super();
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    ArrayList arrayList = new ArrayList();
                    arrayList.add(SimplePersistentBacklogHandler.this.queue.take());
                    SimplePersistentBacklogHandler.this.queue.drainTo(arrayList, 1000);
                    arrayList.stream().forEach(this::saveToBacklog);
                } catch (InterruptedException e) {
                }
            }
            SimplePersistentBacklogHandler.LOGGER.info("Backlogged messages consumer exiting.");
        }

        @Override // org.dcache.resilience.util.SimplePersistentBacklogHandler.BacklogThread
        protected void start() {
            try {
                this.pw = new PrintWriter(new FileWriter(SimplePersistentBacklogHandler.this.backlogStore, true));
                super.start();
            } catch (IOException e) {
                SimplePersistentBacklogHandler.LOGGER.info("Failed to open writer to save backlogged messages: {}.", new ExceptionMessage(e));
            }
        }

        @Override // org.dcache.resilience.util.SimplePersistentBacklogHandler.BacklogThread
        protected void stop() {
            super.stop();
            this.pw.close();
        }

        private void saveToBacklog(Serializable serializable) {
            String pnfsId;
            String poolName;
            String simpleName;
            if (serializable instanceof CorruptFileMessage) {
                CorruptFileMessage corruptFileMessage = (CorruptFileMessage) serializable;
                pnfsId = corruptFileMessage.getPnfsId().toString();
                poolName = corruptFileMessage.getPool();
                simpleName = CorruptFileMessage.class.getSimpleName();
            } else if (serializable instanceof PnfsClearCacheLocationMessage) {
                PnfsClearCacheLocationMessage pnfsClearCacheLocationMessage = (PnfsClearCacheLocationMessage) serializable;
                pnfsId = pnfsClearCacheLocationMessage.getPnfsId().toString();
                poolName = pnfsClearCacheLocationMessage.getPoolName();
                simpleName = PnfsClearCacheLocationMessage.class.getSimpleName();
            } else {
                if (!(serializable instanceof PnfsAddCacheLocationMessage)) {
                    return;
                }
                PnfsAddCacheLocationMessage pnfsAddCacheLocationMessage = (PnfsAddCacheLocationMessage) serializable;
                pnfsId = pnfsAddCacheLocationMessage.getPnfsId().toString();
                poolName = pnfsAddCacheLocationMessage.getPoolName();
                simpleName = PnfsAddCacheLocationMessage.class.getSimpleName();
            }
            this.pw.println(simpleName + "," + pnfsId + "," + poolName);
            if (this.pw.checkError()) {
                SimplePersistentBacklogHandler.LOGGER.error("Problem saving ({} {} {}) to file; skipped.", new Object[]{simpleName, pnfsId, poolName});
            }
        }
    }

    /* loaded from: input_file:org/dcache/resilience/util/SimplePersistentBacklogHandler$Reloader.class */
    class Reloader extends BacklogThread {
        Reloader() {
            super();
        }

        @Override // java.lang.Runnable
        public void run() {
            String readLine;
            if (SimplePersistentBacklogHandler.this.backlogStore.exists()) {
                try {
                    BufferedReader bufferedReader = new BufferedReader(new FileReader(SimplePersistentBacklogHandler.this.backlogStore));
                    Throwable th = null;
                    while (!Thread.interrupted() && (readLine = bufferedReader.readLine()) != null) {
                        try {
                            try {
                                SimplePersistentBacklogHandler.this.messageHandler.processBackloggedMessage(SimplePersistentBacklogHandler.fromString(readLine));
                            } catch (IllegalStateException e) {
                                SimplePersistentBacklogHandler.LOGGER.debug("Unable to reload message; {}", e.getMessage());
                            }
                        } finally {
                        }
                    }
                    SimplePersistentBacklogHandler.this.backlogStore.delete();
                    if (bufferedReader != null) {
                        if (0 != 0) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            bufferedReader.close();
                        }
                    }
                } catch (FileNotFoundException e2) {
                    SimplePersistentBacklogHandler.LOGGER.error("Unable to reload checkpoint file: {}", e2.getMessage());
                } catch (IOException e3) {
                    SimplePersistentBacklogHandler.LOGGER.error("Unrecoverable error during reload checkpoint file: {}", e3.getMessage());
                }
                SimplePersistentBacklogHandler.LOGGER.info("Done reloading backlogged messages.");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Message fromString(String str) {
        String[] split = str.split("[,]");
        if (split.length != 3) {
            throw new IllegalStateException(str + " is malformed.");
        }
        if (split[0].equals(CorruptFileMessage.class.getSimpleName())) {
            return new CorruptFileMessage(split[2], new PnfsId(split[1]));
        }
        if (split[0].equals(PnfsClearCacheLocationMessage.class.getSimpleName())) {
            return new PnfsClearCacheLocationMessage(new PnfsId(split[1]), split[2]);
        }
        if (split[0].equals(PnfsAddCacheLocationMessage.class.getSimpleName())) {
            return new PnfsAddCacheLocationMessage(new PnfsId(split[1]), split[2]);
        }
        throw new IllegalStateException(str + ": invalid message type.");
    }

    public SimplePersistentBacklogHandler(String str) {
        this.backlogStore = new File(str);
    }

    @Override // org.dcache.resilience.util.BackloggedMessageHandler
    public void handleBacklog() {
        this.consumer.stop();
        this.reloader.start();
    }

    @Override // org.dcache.resilience.util.BackloggedMessageHandler
    public void initialize() {
        this.consumer.start();
    }

    @Override // org.dcache.resilience.util.BackloggedMessageHandler
    public void saveToBacklog(Serializable serializable) {
        try {
            this.queue.put(serializable);
        } catch (InterruptedException e) {
            LOGGER.info("Queue limit reached, dropping {}.", serializable);
        }
    }

    public void setMessageHandler(ResilienceMessageHandler resilienceMessageHandler) {
        this.messageHandler = resilienceMessageHandler;
    }

    @Override // org.dcache.resilience.util.BackloggedMessageHandler
    public void shutdown() {
        this.queue.clear();
        this.consumer.stop();
        this.reloader.stop();
    }
}
