package com.netflix.curator.framework.recipes.queue;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.api.ACLBackgroundPathAndBytesable;
import com.netflix.curator.framework.api.BackgroundCallback;
import com.netflix.curator.framework.api.BackgroundPathable;
import com.netflix.curator.framework.api.CuratorEvent;
import com.netflix.curator.framework.api.CuratorEventType;
import com.netflix.curator.framework.api.CuratorListener;
import com.netflix.curator.framework.api.PathAndBytesable;
import com.netflix.curator.framework.api.WatchPathable;
import com.netflix.curator.framework.listen.ListenerContainer;
import com.netflix.curator.utils.ZKPaths;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/curator/framework/recipes/queue/DistributedQueue.class */
public class DistributedQueue<T> implements QueueBase<T> {
    private final CuratorFramework client;
    private final QueueSerializer<T> serializer;
    private final String queuePath;
    private final Executor executor;
    private final ExecutorService service;
    private final QueueConsumer<T> consumer;
    private final int minItemsBeforeRefresh;
    private final boolean refreshOnWatch;
    private final boolean isProducerOnly;
    private final String lockPath;
    private static final String QUEUE_ITEM_NAME = "queue-";
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final AtomicBoolean refreshOnWatchSignaled = new AtomicBoolean(false);
    private final AtomicReference<ErrorMode> errorMode = new AtomicReference<>(ErrorMode.REQUEUE);
    private final ListenerContainer<QueuePutListener<T>> putListenerContainer = new ListenerContainer<>();
    private final AtomicInteger lastChildCount = new AtomicInteger(0);
    private final AtomicInteger putCount = new AtomicInteger(0);
    private final CuratorListener listener = new CuratorListener() { // from class: com.netflix.curator.framework.recipes.queue.DistributedQueue.1
        public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            if (curatorEvent.getType() == CuratorEventType.WATCHED && curatorEvent.getWatchedEvent().getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                DistributedQueue.this.internalNotify();
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/curator/framework/recipes/queue/DistributedQueue$ProcessType.class */
    public enum ProcessType {
        NORMAL,
        REMOVE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/curator/framework/recipes/queue/DistributedQueue$State.class */
    public enum State {
        LATENT,
        STARTED,
        STOPPED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DistributedQueue(CuratorFramework curatorFramework, QueueConsumer<T> queueConsumer, QueueSerializer<T> queueSerializer, String str, ThreadFactory threadFactory, Executor executor, int i, boolean z, String str2) {
        Preconditions.checkNotNull(curatorFramework, "client cannot be null");
        Preconditions.checkNotNull(queueSerializer, "serializer cannot be null");
        Preconditions.checkNotNull(str, "queuePath cannot be null");
        Preconditions.checkNotNull(threadFactory, "threadFactory cannot be null");
        Preconditions.checkNotNull(executor, "executor cannot be null");
        this.isProducerOnly = queueConsumer == null;
        this.lockPath = str2;
        this.consumer = queueConsumer;
        this.minItemsBeforeRefresh = i;
        this.refreshOnWatch = z;
        this.client = curatorFramework;
        this.serializer = queueSerializer;
        this.queuePath = str;
        this.executor = executor;
        this.service = Executors.newSingleThreadExecutor(threadFactory);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.netflix.curator.framework.recipes.queue.QueueBase
    public void start() throws Exception {
        if (!this.state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException();
        }
        try {
            this.client.create().creatingParentsIfNeeded().forPath(this.queuePath);
        } catch (KeeperException.NodeExistsException e) {
        }
        if (this.lockPath != null) {
            try {
                this.client.create().creatingParentsIfNeeded().forPath(this.lockPath);
            } catch (KeeperException.NodeExistsException e2) {
            }
        }
        this.client.getCuratorListenable().addListener(this.listener, this.executor);
        if (this.isProducerOnly) {
            return;
        }
        this.service.submit(new Callable<Object>() { // from class: com.netflix.curator.framework.recipes.queue.DistributedQueue.2
            @Override // java.util.concurrent.Callable
            public Object call() {
                DistributedQueue.this.runLoop();
                return null;
            }
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (!this.state.compareAndSet(State.STARTED, State.STOPPED)) {
            throw new IllegalStateException();
        }
        this.putListenerContainer.clear();
        this.client.getCuratorListenable().removeListener(this.listener);
        this.service.shutdownNow();
    }

    @Override // com.netflix.curator.framework.recipes.queue.QueueBase
    public ListenerContainer<QueuePutListener<T>> getPutListenerContainer() {
        return this.putListenerContainer;
    }

    @Override // com.netflix.curator.framework.recipes.queue.QueueBase
    public void setErrorMode(ErrorMode errorMode) {
        Preconditions.checkNotNull(this.lockPath, "lockPath cannot be null");
        this.errorMode.set(errorMode);
    }

    @Override // com.netflix.curator.framework.recipes.queue.QueueBase
    public boolean flushPuts(long j, TimeUnit timeUnit) throws InterruptedException {
        long convert = TimeUnit.MILLISECONDS.convert(j, timeUnit);
        synchronized (this.putCount) {
            while (this.putCount.get() > 0) {
                if (convert <= 0) {
                    return false;
                }
                long currentTimeMillis = System.currentTimeMillis();
                this.putCount.wait(convert);
                convert -= System.currentTimeMillis() - currentTimeMillis;
            }
            return true;
        }
    }

    public void put(T t) throws Exception {
        checkState();
        internalPut(t, null, makeItemPath());
    }

    public void putMulti(MultiItem<T> multiItem) throws Exception {
        checkState();
        internalPut(null, multiItem, makeItemPath());
    }

    @Override // com.netflix.curator.framework.recipes.queue.QueueBase
    public int getLastMessageCount() {
        return this.lastChildCount.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void internalPut(final T t, final MultiItem<T> multiItem, String str) throws Exception {
        if (t != null) {
            final AtomicReference atomicReference = new AtomicReference(t);
            multiItem = new MultiItem<T>() { // from class: com.netflix.curator.framework.recipes.queue.DistributedQueue.3
                @Override // com.netflix.curator.framework.recipes.queue.MultiItem
                public T nextItem() throws Exception {
                    return (T) atomicReference.getAndSet(null);
                }
            };
        }
        this.putCount.incrementAndGet();
        internalCreateNode(str, ItemSerializer.serialize(multiItem, this.serializer), new BackgroundCallback() { // from class: com.netflix.curator.framework.recipes.queue.DistributedQueue.4
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                if (curatorEvent.getType() == CuratorEventType.CREATE) {
                    synchronized (DistributedQueue.this.putCount) {
                        DistributedQueue.this.putCount.decrementAndGet();
                        DistributedQueue.this.putCount.notifyAll();
                    }
                }
                DistributedQueue.this.putListenerContainer.forEach(new Function<QueuePutListener<T>, Void>() { // from class: com.netflix.curator.framework.recipes.queue.DistributedQueue.4.1
                    /* JADX WARN: Multi-variable type inference failed */
                    public Void apply(QueuePutListener<T> queuePutListener) {
                        if (t != null) {
                            queuePutListener.putCompleted(t);
                            return null;
                        }
                        queuePutListener.putMultiCompleted(multiItem);
                        return null;
                    }
                });
            }
        });
    }

    @VisibleForTesting
    void internalCreateNode(String str, byte[] bArr, BackgroundCallback backgroundCallback) throws Exception {
        ((PathAndBytesable) ((ACLBackgroundPathAndBytesable) this.client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)).inBackground(backgroundCallback)).forPath(str, bArr);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void checkState() throws Exception {
        if (this.state.get() != State.STARTED) {
            throw new IllegalStateException();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String makeItemPath() {
        return ZKPaths.makePath(this.queuePath, QUEUE_ITEM_NAME);
    }

    protected void sortChildren(List<String> list) {
        Collections.sort(list);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<String> getChildren() throws Exception {
        return (List) ((BackgroundPathable) this.client.getChildren().watched()).forPath(this.queuePath);
    }

    protected long getDelay(String str) {
        return 0L;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean tryRemove(String str) throws Exception {
        return this.lockPath != null ? processWithLockSafety(str, ProcessType.REMOVE) : processNormally(str, ProcessType.REMOVE);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void internalNotify() {
        if (this.refreshOnWatch) {
            this.refreshOnWatchSignaled.set(true);
        }
        notifyAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void runLoop() {
        List<String> children;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                synchronized (this) {
                    while (true) {
                        children = getChildren();
                        this.lastChildCount.set(children.size());
                        sortChildren(children);
                        long delay = children.size() > 0 ? getDelay(children.get(0)) : 0L;
                        if (delay <= 0) {
                            if (children.size() != 0) {
                                break;
                            } else {
                                wait();
                            }
                        } else {
                            wait(delay);
                        }
                    }
                    this.refreshOnWatchSignaled.set(false);
                }
                if (children.size() > 0) {
                    processChildren(children);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e2) {
                this.log.error("Exception caught in background handler", e2);
                return;
            }
        }
    }

    private void processChildren(List<String> list) throws Exception {
        boolean z = this.lockPath != null;
        int i = this.minItemsBeforeRefresh;
        for (String str : list) {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            if (str.startsWith(QUEUE_ITEM_NAME)) {
                int i2 = i;
                i--;
                if (i2 <= 0 && this.refreshOnWatchSignaled.compareAndSet(true, false)) {
                    return;
                }
                if (getDelay(str) <= 0) {
                    if (z) {
                        processWithLockSafety(str, ProcessType.NORMAL);
                    } else {
                        processNormally(str, ProcessType.NORMAL);
                    }
                }
            } else {
                this.log.warn("Foreign node in queue path: " + str);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private boolean processMessageBytes(String str, byte[] bArr) throws Exception {
        try {
            MultiItem deserialize = ItemSerializer.deserialize(bArr, this.serializer);
            boolean z = true;
            while (true) {
                Object nextItem = deserialize.nextItem();
                if (nextItem == null) {
                    break;
                }
                try {
                    this.consumer.consumeMessage(nextItem);
                } catch (Exception e) {
                    this.log.error("Exception processing queue item: " + str, e);
                    if (this.errorMode.get() == ErrorMode.REQUEUE) {
                        z = false;
                        break;
                    }
                }
            }
            return z;
        } catch (Exception e2) {
            this.log.error("Corrupted queue item: " + str, e2);
            return false;
        }
    }

    private boolean processNormally(String str, ProcessType processType) throws Exception {
        try {
            String makePath = ZKPaths.makePath(this.queuePath, str);
            Stat stat = new Stat();
            byte[] bArr = null;
            if (processType == ProcessType.NORMAL) {
                bArr = (byte[]) ((WatchPathable) this.client.getData().storingStatIn(stat)).forPath(makePath);
            }
            ((BackgroundPathable) this.client.delete().withVersion(stat.getVersion())).forPath(makePath);
            if (processType != ProcessType.NORMAL) {
                return true;
            }
            processMessageBytes(str, bArr);
            return true;
        } catch (KeeperException.NoNodeException | KeeperException.NodeExistsException | KeeperException.BadVersionException e) {
            return false;
        }
    }

    private boolean processWithLockSafety(String str, ProcessType processType) throws Exception {
        String makePath = ZKPaths.makePath(this.lockPath, str);
        boolean z = false;
        try {
            ((ACLBackgroundPathAndBytesable) this.client.create().withMode(CreateMode.EPHEMERAL)).forPath(makePath);
            z = true;
            String makePath2 = ZKPaths.makePath(this.queuePath, str);
            boolean z2 = false;
            if (processType == ProcessType.NORMAL) {
                z2 = processMessageBytes(str, (byte[]) this.client.getData().forPath(makePath2));
            } else if (processType == ProcessType.REMOVE) {
                z2 = true;
            }
            if (z2) {
                this.client.delete().forPath(makePath2);
            }
            if (1 != 0) {
                this.client.delete().guaranteed().forPath(makePath);
            }
            return true;
        } catch (KeeperException.BadVersionException e) {
            if (!z) {
                return false;
            }
            this.client.delete().guaranteed().forPath(makePath);
            return false;
        } catch (KeeperException.NoNodeException e2) {
            if (!z) {
                return false;
            }
            this.client.delete().guaranteed().forPath(makePath);
            return false;
        } catch (KeeperException.NodeExistsException e3) {
            if (!z) {
                return false;
            }
            this.client.delete().guaranteed().forPath(makePath);
            return false;
        } catch (Throwable th) {
            if (z) {
                this.client.delete().guaranteed().forPath(makePath);
            }
            throw th;
        }
    }
}
