package com.datatorrent.stram.engine;

import com.datatorrent.api.Sink;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.netlet.util.UnsafeBlockingQueue;
import com.datatorrent.stram.tuple.Tuple;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/AbstractReservoir.class */
public abstract class AbstractReservoir implements SweepableReservoir, BlockingQueue<Object> {
    private static final Logger logger = LoggerFactory.getLogger(AbstractReservoir.class);
    static final String reservoirClassNameProperty = "com.datatorrent.stram.engine.Reservoir";
    private static final int SPSC_ARRAY_BLOCKING_QUEUE_CAPACITY_THRESHOLD = 65536;
    private Sink<Object> sink;
    private String id;
    protected int count;

    /* loaded from: input_file:com/datatorrent/stram/engine/AbstractReservoir$ArrayBlockingQueueReservoir.class */
    private static class ArrayBlockingQueueReservoir extends AbstractReservoir {
        private final ArrayBlockingQueue<Object> queue;

        private ArrayBlockingQueueReservoir(String str, int i) {
            super(str);
            this.queue = new ArrayBlockingQueue<>(i);
        }

        @Override // com.datatorrent.stram.engine.SweepableReservoir
        public Tuple sweep() {
            ArrayBlockingQueue<Object> arrayBlockingQueue = this.queue;
            Sink<Object> sink = getSink();
            while (true) {
                Object peek = arrayBlockingQueue.peek();
                if (peek == null) {
                    return null;
                }
                if (peek instanceof Tuple) {
                    return (Tuple) peek;
                }
                this.count++;
                sink.put(arrayBlockingQueue.poll());
            }
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Queue, java.util.Collection
        public boolean add(Object obj) {
            return this.queue.add(obj);
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Queue
        public boolean offer(Object obj) {
            return this.queue.offer(obj);
        }

        @Override // java.util.concurrent.BlockingQueue
        public void put(Object obj) throws InterruptedException {
            this.queue.put(obj);
        }

        @Override // java.util.concurrent.BlockingQueue
        public boolean offer(Object obj, long j, TimeUnit timeUnit) throws InterruptedException {
            return this.queue.offer(obj, j, timeUnit);
        }

        @Override // java.util.Queue
        public Object poll() {
            return this.queue.poll();
        }

        @Override // java.util.concurrent.BlockingQueue
        public Object take() throws InterruptedException {
            return this.queue.take();
        }

        @Override // java.util.concurrent.BlockingQueue
        public Object poll(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.queue.poll(j, timeUnit);
        }

        @Override // java.util.Queue
        public Object peek() {
            return this.queue.peek();
        }

        @Override // java.util.Collection
        public int size() {
            return this.queue.size();
        }

        @Override // com.datatorrent.stram.engine.Reservoir
        public int size(boolean z) {
            return this.queue.size();
        }

        @Override // com.datatorrent.stram.engine.AbstractReservoir
        public int capacity() {
            throw new UnsupportedOperationException();
        }

        @Override // java.util.concurrent.BlockingQueue
        public int remainingCapacity() {
            return this.queue.remainingCapacity();
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Collection
        public boolean remove(Object obj) {
            return this.queue.remove(obj);
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Collection
        public boolean contains(Object obj) {
            return this.queue.contains(obj);
        }

        @Override // java.util.Collection
        public Object[] toArray() {
            return this.queue.toArray();
        }

        @Override // java.util.Collection
        public <T> T[] toArray(T[] tArr) {
            return (T[]) this.queue.toArray(tArr);
        }

        @Override // com.datatorrent.stram.engine.AbstractReservoir
        public String toString() {
            return this.queue.toString();
        }

        @Override // java.util.Collection
        public void clear() {
            this.queue.clear();
        }

        @Override // java.util.concurrent.BlockingQueue
        public int drainTo(Collection<? super Object> collection) {
            return this.queue.drainTo(collection);
        }

        @Override // java.util.concurrent.BlockingQueue
        public int drainTo(Collection<? super Object> collection, int i) {
            return this.queue.drainTo(collection, i);
        }

        @Override // java.util.Collection, java.lang.Iterable
        public Iterator<Object> iterator() {
            return this.queue.iterator();
        }

        @Override // com.datatorrent.stram.engine.Reservoir
        public Object remove() {
            return this.queue.remove();
        }

        @Override // java.util.Queue
        public Object element() {
            return this.queue.element();
        }

        @Override // java.util.Collection
        public boolean addAll(Collection<?> collection) {
            return this.queue.addAll(collection);
        }

        @Override // com.datatorrent.stram.engine.Reservoir
        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        @Override // java.util.Collection
        public boolean containsAll(Collection<?> collection) {
            return this.queue.containsAll(collection);
        }

        @Override // java.util.Collection
        public boolean removeAll(Collection<?> collection) {
            return this.queue.removeAll(collection);
        }

        @Override // java.util.Collection
        public boolean retainAll(Collection<?> collection) {
            return this.queue.retainAll(collection);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/AbstractReservoir$CircularBufferReservoir.class */
    private static class CircularBufferReservoir extends AbstractReservoir implements UnsafeBlockingQueue<Object> {
        private final CircularBuffer<Object> circularBuffer;

        private CircularBufferReservoir(String str, int i) {
            super(str);
            this.circularBuffer = new CircularBuffer<>(i);
        }

        @Override // com.datatorrent.stram.engine.SweepableReservoir
        public Tuple sweep() {
            CircularBuffer<Object> circularBuffer = this.circularBuffer;
            Sink<Object> sink = getSink();
            int size = circularBuffer.size();
            for (int i = 0; i < size; i++) {
                if (circularBuffer.peekUnsafe() instanceof Tuple) {
                    this.count += i;
                    return (Tuple) peekUnsafe();
                }
                sink.put(pollUnsafe());
            }
            this.count += size;
            return null;
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Queue, java.util.Collection
        public boolean add(Object obj) {
            return this.circularBuffer.add(obj);
        }

        @Override // com.datatorrent.stram.engine.Reservoir
        public Object remove() {
            return this.circularBuffer.remove();
        }

        @Override // java.util.Queue
        public Object peek() {
            return this.circularBuffer.peek();
        }

        @Override // com.datatorrent.stram.engine.Reservoir
        public int size(boolean z) {
            int size = this.circularBuffer.size();
            if (z) {
                Iterator frozenIterator = this.circularBuffer.getFrozenIterator();
                while (frozenIterator.hasNext()) {
                    if (frozenIterator.next() instanceof Tuple) {
                        size--;
                    }
                }
            }
            return size;
        }

        @Override // com.datatorrent.stram.engine.AbstractReservoir
        public int capacity() {
            return this.circularBuffer.capacity();
        }

        @Override // java.util.concurrent.BlockingQueue
        public int drainTo(Collection<? super Object> collection) {
            return this.circularBuffer.drainTo(collection);
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Queue
        public boolean offer(Object obj) {
            return this.circularBuffer.offer(obj);
        }

        @Override // java.util.concurrent.BlockingQueue
        public void put(Object obj) throws InterruptedException {
            this.circularBuffer.put(obj);
        }

        @Override // java.util.concurrent.BlockingQueue
        public boolean offer(Object obj, long j, TimeUnit timeUnit) throws InterruptedException {
            return this.circularBuffer.offer(obj, j, timeUnit);
        }

        @Override // java.util.concurrent.BlockingQueue
        public Object take() throws InterruptedException {
            return this.circularBuffer.take();
        }

        @Override // java.util.concurrent.BlockingQueue
        public Object poll(long j, TimeUnit timeUnit) throws InterruptedException {
            return this.circularBuffer.poll(j, timeUnit);
        }

        @Override // java.util.concurrent.BlockingQueue
        public int remainingCapacity() {
            return this.circularBuffer.remainingCapacity();
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Collection
        public boolean remove(Object obj) {
            return this.circularBuffer.remove(obj);
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Collection
        public boolean contains(Object obj) {
            return this.circularBuffer.contains(obj);
        }

        @Override // java.util.concurrent.BlockingQueue
        public int drainTo(Collection<? super Object> collection, int i) {
            return this.circularBuffer.drainTo(collection, i);
        }

        @Override // java.util.Queue
        public Object poll() {
            return this.circularBuffer.poll();
        }

        public Object pollUnsafe() {
            return this.circularBuffer.pollUnsafe();
        }

        @Override // java.util.Queue
        public Object element() {
            return this.circularBuffer.element();
        }

        @Override // com.datatorrent.stram.engine.Reservoir
        public boolean isEmpty() {
            return this.circularBuffer.isEmpty();
        }

        public Iterator<Object> getFrozenIterator() {
            return this.circularBuffer.getFrozenIterator();
        }

        public Iterable<Object> getFrozenIterable() {
            return this.circularBuffer.getFrozenIterable();
        }

        @Override // java.util.Collection, java.lang.Iterable
        public Iterator<Object> iterator() {
            return this.circularBuffer.iterator();
        }

        @Override // java.util.Collection
        public Object[] toArray() {
            return this.circularBuffer.toArray();
        }

        @Override // java.util.Collection
        public <T> T[] toArray(T[] tArr) {
            return (T[]) this.circularBuffer.toArray(tArr);
        }

        @Override // java.util.Collection
        public boolean containsAll(Collection<?> collection) {
            return this.circularBuffer.containsAll(collection);
        }

        @Override // java.util.Collection
        public boolean addAll(Collection<?> collection) {
            return this.circularBuffer.addAll(collection);
        }

        @Override // java.util.Collection
        public boolean removeAll(Collection<?> collection) {
            return this.circularBuffer.removeAll(collection);
        }

        @Override // java.util.Collection
        public boolean retainAll(Collection<?> collection) {
            return this.circularBuffer.retainAll(collection);
        }

        @Override // java.util.Collection
        public int size() {
            return this.circularBuffer.size();
        }

        @Override // java.util.Collection
        public void clear() {
            this.circularBuffer.clear();
        }

        public Object peekUnsafe() {
            return this.circularBuffer.peekUnsafe();
        }

        public CircularBuffer<Object> getWhitehole(String str) {
            return this.circularBuffer.getWhitehole(str);
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/AbstractReservoir$SpscArrayBlockingQueueReservoir.class */
    private static class SpscArrayBlockingQueueReservoir extends SpscArrayQueueReservoir {
        private final ReentrantLock lock;
        private final Condition notFull;

        private SpscArrayBlockingQueueReservoir(String str, int i) {
            super(str, i);
            this.lock = new ReentrantLock();
            this.notFull = this.lock.newCondition();
        }

        @Override // com.datatorrent.stram.engine.AbstractReservoir.SpscArrayQueueReservoir, com.datatorrent.stram.engine.SweepableReservoir
        public Tuple sweep() {
            ReentrantLock reentrantLock = this.lock;
            SpscArrayQueue<Object> queue = getQueue();
            Sink<Object> sink = getSink();
            reentrantLock.lock();
            do {
                try {
                    Object peek = queue.peek();
                    if (peek == null) {
                        reentrantLock.unlock();
                        return null;
                    }
                    if (peek instanceof Tuple) {
                        Tuple tuple = (Tuple) peek;
                        reentrantLock.unlock();
                        return tuple;
                    }
                    this.count++;
                    sink.put(queue.poll());
                    this.notFull.signal();
                } finally {
                    reentrantLock.unlock();
                }
            } while (!reentrantLock.hasQueuedThreads());
            return null;
        }

        @Override // com.datatorrent.stram.engine.AbstractReservoir.SpscArrayQueueReservoir, java.util.concurrent.BlockingQueue
        public void put(Object obj) throws InterruptedException {
            SpscArrayQueue<Object> queue = getQueue();
            if (queue.offer(obj)) {
                return;
            }
            ReentrantLock reentrantLock = this.lock;
            reentrantLock.lockInterruptibly();
            while (!queue.offer(obj)) {
                try {
                    this.notFull.await();
                } finally {
                    reentrantLock.unlock();
                }
            }
        }

        @Override // com.datatorrent.stram.engine.AbstractReservoir.SpscArrayQueueReservoir, com.datatorrent.stram.engine.Reservoir
        public Object remove() {
            SpscArrayQueue<Object> queue = getQueue();
            ReentrantLock reentrantLock = this.lock;
            reentrantLock.lock();
            try {
                Object remove = queue.remove();
                if (remove != null) {
                    this.notFull.signal();
                }
                return remove;
            } finally {
                reentrantLock.unlock();
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/AbstractReservoir$SpscArrayQueueReservoir.class */
    private static class SpscArrayQueueReservoir extends AbstractReservoir {
        private final int maxSpinMillis = 10;
        private final SpscArrayQueue<Object> queue;

        private SpscArrayQueueReservoir(String str, int i) {
            super(str);
            this.maxSpinMillis = 10;
            this.queue = new SpscArrayQueue<>(i);
        }

        @Override // com.datatorrent.stram.engine.SweepableReservoir
        public Tuple sweep() {
            SpscArrayQueue<Object> spscArrayQueue = this.queue;
            Sink<Object> sink = getSink();
            while (true) {
                Object peek = spscArrayQueue.peek();
                if (peek == null) {
                    return null;
                }
                if (peek instanceof Tuple) {
                    return (Tuple) peek;
                }
                this.count++;
                sink.put(spscArrayQueue.poll());
            }
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Queue, java.util.Collection
        public boolean add(Object obj) {
            return this.queue.add(obj);
        }

        @Override // com.datatorrent.stram.engine.Reservoir
        public Object remove() {
            return this.queue.remove();
        }

        @Override // java.util.Queue
        public Object peek() {
            return this.queue.peek();
        }

        @Override // com.datatorrent.stram.engine.Reservoir
        public int size(boolean z) {
            return this.queue.size();
        }

        @Override // com.datatorrent.stram.engine.AbstractReservoir
        public int capacity() {
            return this.queue.capacity();
        }

        @Override // java.util.concurrent.BlockingQueue
        public int drainTo(final Collection<? super Object> collection) {
            return this.queue.drain(new MessagePassingQueue.Consumer<Object>() { // from class: com.datatorrent.stram.engine.AbstractReservoir.SpscArrayQueueReservoir.1
                public void accept(Object obj) {
                    collection.add(obj);
                }
            });
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Queue
        public boolean offer(Object obj) {
            return this.queue.offer(obj);
        }

        public void put(Object obj) throws InterruptedException {
            long j = 0;
            SpscArrayQueue<Object> spscArrayQueue = this.queue;
            while (!spscArrayQueue.offer(obj)) {
                Thread.sleep(j);
                j = Math.min(10L, j + 1);
            }
        }

        @Override // java.util.concurrent.BlockingQueue
        public boolean offer(Object obj, long j, TimeUnit timeUnit) throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        @Override // java.util.concurrent.BlockingQueue
        public Object take() throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        @Override // java.util.concurrent.BlockingQueue
        public Object poll(long j, TimeUnit timeUnit) throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        @Override // java.util.concurrent.BlockingQueue
        public int remainingCapacity() {
            SpscArrayQueue<Object> spscArrayQueue = this.queue;
            return spscArrayQueue.capacity() - spscArrayQueue.size();
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Collection
        public boolean remove(Object obj) {
            return this.queue.remove(obj);
        }

        @Override // java.util.concurrent.BlockingQueue, java.util.Collection
        public boolean contains(Object obj) {
            return this.queue.contains(obj);
        }

        @Override // java.util.concurrent.BlockingQueue
        public int drainTo(final Collection<? super Object> collection, int i) {
            return this.queue.drain(new MessagePassingQueue.Consumer<Object>() { // from class: com.datatorrent.stram.engine.AbstractReservoir.SpscArrayQueueReservoir.2
                public void accept(Object obj) {
                    collection.add(obj);
                }
            }, i);
        }

        @Override // java.util.Queue
        public Object poll() {
            return this.queue.poll();
        }

        @Override // java.util.Queue
        public Object element() {
            return this.queue.element();
        }

        @Override // com.datatorrent.stram.engine.Reservoir
        public boolean isEmpty() {
            return this.queue.peek() == null;
        }

        @Override // java.util.Collection, java.lang.Iterable
        public Iterator<Object> iterator() {
            return this.queue.iterator();
        }

        @Override // java.util.Collection
        public Object[] toArray() {
            return this.queue.toArray();
        }

        @Override // java.util.Collection
        public <T> T[] toArray(T[] tArr) {
            return (T[]) this.queue.toArray(tArr);
        }

        @Override // java.util.Collection
        public boolean containsAll(Collection<?> collection) {
            return this.queue.containsAll(collection);
        }

        @Override // java.util.Collection
        public boolean addAll(Collection<?> collection) {
            return this.queue.addAll(collection);
        }

        @Override // java.util.Collection
        public boolean removeAll(Collection<?> collection) {
            return this.queue.removeAll(collection);
        }

        @Override // java.util.Collection
        public boolean retainAll(Collection<?> collection) {
            return this.queue.retainAll(collection);
        }

        @Override // java.util.Collection
        public int size() {
            return this.queue.size();
        }

        @Override // java.util.Collection
        public void clear() {
            this.queue.clear();
        }

        protected SpscArrayQueue<Object> getQueue() {
            return this.queue;
        }
    }

    public static AbstractReservoir newReservoir(String str, int i) {
        String property = System.getProperty(reservoirClassNameProperty);
        if (property == null) {
            return i >= SPSC_ARRAY_BLOCKING_QUEUE_CAPACITY_THRESHOLD ? new SpscArrayQueueReservoir(str, i) : new SpscArrayBlockingQueueReservoir(str, i);
        }
        if (property.equals(SpscArrayQueueReservoir.class.getName())) {
            return new SpscArrayQueueReservoir(str, i);
        }
        if (property.equals(SpscArrayBlockingQueueReservoir.class.getName())) {
            return new SpscArrayBlockingQueueReservoir(str, i);
        }
        if (property.equals(CircularBufferReservoir.class.getName())) {
            return new CircularBufferReservoir(str, i);
        }
        if (property.equals(ArrayBlockingQueueReservoir.class.getName())) {
            return new ArrayBlockingQueueReservoir(str, i);
        }
        try {
            return (AbstractReservoir) Class.forName(property).getConstructor(String.class, Integer.TYPE).newInstance(str, Integer.valueOf(i));
        } catch (ReflectiveOperationException e) {
            logger.debug("Fail to construct reservoir {}", property, e);
            throw new RuntimeException("Fail to construct reservoir " + property, e);
        }
    }

    protected AbstractReservoir(String str) {
        this.id = str;
    }

    @Override // com.datatorrent.stram.engine.SweepableReservoir
    public Sink<Object> setSink(Sink<Object> sink) {
        try {
            Sink<Object> sink2 = this.sink;
            this.sink = sink;
            return sink2;
        } catch (Throwable th) {
            this.sink = sink;
            throw th;
        }
    }

    @Override // com.datatorrent.stram.engine.SweepableReservoir
    public int getCount(boolean z) {
        try {
            int i = this.count;
            if (z) {
                this.count = 0;
            }
            return i;
        } catch (Throwable th) {
            if (z) {
                this.count = 0;
            }
            throw th;
        }
    }

    public abstract int capacity();

    public String getId() {
        return this.id;
    }

    public void setId(String str) {
        this.id = str;
    }

    protected Sink<Object> getSink() {
        return this.sink;
    }

    public String toString() {
        return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{sink=" + this.sink + ", id=" + this.id + ", count=" + this.count + '}';
    }
}
