package com.datatorrent.netlet;

import com.datatorrent.netlet.NetletThrowable;
import com.datatorrent.netlet.util.Slice;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/netlet/WriteOnlyClient.class */
public class WriteOnlyClient extends AbstractClientListener {
    private static final Logger logger = LoggerFactory.getLogger(WriteOnlyClient.class);
    protected final ByteBuffer writeBuffer;
    protected Queue<Slice> sendQueue;
    protected final SpscArrayQueue<Slice> freeQueue;
    protected final Queue<NetletThrowable> throwables;
    protected boolean isWriteEnabled;
    protected Lock lock;

    /* loaded from: input_file:com/datatorrent/netlet/WriteOnlyClient$ForwardingQueue.class */
    private static class ForwardingQueue extends AbstractQueue<Slice> {
        private final Queue<Slice> queue;

        private ForwardingQueue(Queue<Slice> queue) {
            this.queue = queue;
        }

        @Override // java.util.AbstractCollection, java.util.Collection, java.lang.Iterable
        public Iterator<Slice> iterator() {
            return this.queue.iterator();
        }

        @Override // java.util.AbstractCollection, java.util.Collection
        public int size() {
            return this.queue.size();
        }

        @Override // java.util.Queue
        public boolean offer(Slice slice) {
            return this.queue.offer(slice);
        }

        @Override // java.util.Queue
        public Slice poll() {
            return this.queue.poll();
        }

        @Override // java.util.Queue
        public Slice peek() {
            return this.queue.peek();
        }
    }

    public WriteOnlyClient() {
        this(65536, 1024);
    }

    public WriteOnlyClient(int i, int i2) {
        this(ByteBuffer.allocateDirect(i), i2);
    }

    public WriteOnlyClient(ByteBuffer byteBuffer, int i) {
        this.throwables = new ConcurrentLinkedQueue();
        this.isWriteEnabled = true;
        this.lock = new ReentrantLock();
        this.writeBuffer = byteBuffer;
        this.sendQueue = new SpscArrayQueue(i);
        this.freeQueue = new SpscArrayQueue<>(i);
    }

    @Override // com.datatorrent.netlet.AbstractClientListener, com.datatorrent.netlet.Listener.ClientListener
    public void connected() {
        super.connected();
        shutdownIO(true);
        suspendReadIfResumed();
        try {
            this.lock.lock();
            this.isWriteEnabled = !isWriteSuspended();
        } finally {
            this.lock.unlock();
        }
    }

    @Override // com.datatorrent.netlet.AbstractClientListener, com.datatorrent.netlet.Listener
    public void unregistered(SelectionKey selectionKey) {
        try {
            this.lock.lock();
            if (this.sendQueue.peek() != null || this.isWriteEnabled) {
                this.sendQueue = new ForwardingQueue(this.sendQueue) { // from class: com.datatorrent.netlet.WriteOnlyClient.1
                    @Override // com.datatorrent.netlet.WriteOnlyClient.ForwardingQueue, java.util.Queue
                    public boolean offer(Slice slice) {
                        throw new RuntimeException("Client " + this + " does not accept new data.");
                    }
                };
            } else {
                super.unregistered(selectionKey);
            }
        } finally {
            this.lock.unlock();
        }
    }

    @Override // com.datatorrent.netlet.Listener.ClientListener
    public final void read() throws IOException {
        if (suspendReadIfResumed()) {
            logger.warn("{} OP_READ should be disabled", this);
        } else {
            logger.error("{} read is not expected", this);
        }
    }

    @Override // com.datatorrent.netlet.AbstractClientListener
    public boolean resumeReadIfSuspended() {
        throw new UnsupportedOperationException();
    }

    @Override // com.datatorrent.netlet.Listener.ClientListener
    public void write() throws IOException {
        int remaining = this.writeBuffer.remaining();
        while (true) {
            Slice peek = this.sendQueue.peek();
            if (peek == null) {
                channelWrite();
                return;
            }
            while (remaining < peek.length) {
                if (remaining > 0) {
                    this.writeBuffer.put(peek.buffer, peek.offset, remaining);
                    peek.offset += remaining;
                    peek.length -= remaining;
                }
                if (channelWrite() == 0) {
                    return;
                } else {
                    remaining = this.writeBuffer.remaining();
                }
            }
            this.writeBuffer.put(peek.buffer, peek.offset, peek.length);
            peek.buffer = null;
            remaining -= peek.length;
            this.freeQueue.offer(this.sendQueue.poll());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int channelWrite() throws IOException {
        if (this.writeBuffer.flip().remaining() > 0) {
            int write = ((SocketChannel) this.key.channel()).write(this.writeBuffer);
            this.writeBuffer.compact();
            return write;
        }
        this.writeBuffer.clear();
        try {
            this.lock.lock();
            if (this.sendQueue.peek() == null) {
                suspendWriteIfResumed();
                this.isWriteEnabled = false;
            }
            return 0;
        } finally {
            this.lock.unlock();
        }
    }

    public boolean send(byte[] bArr) {
        return send(bArr, 0, bArr.length);
    }

    public boolean send(byte[] bArr, int i, int i2) {
        Slice slice = (Slice) this.freeQueue.poll();
        if (slice == null) {
            slice = new Slice(bArr, i, i2);
        } else {
            if (slice.buffer != null) {
                throw new RuntimeException("Unexpected slice " + slice.toString());
            }
            slice.buffer = bArr;
            slice.offset = i;
            slice.length = i2;
        }
        return send(slice);
    }

    public boolean send(Slice slice) {
        NetletThrowable poll = this.throwables.poll();
        if (poll != null) {
            NetletThrowable.Util.throwRuntime(poll);
        }
        if (!this.sendQueue.offer(slice)) {
            return false;
        }
        try {
            this.lock.lock();
            if (!this.isWriteEnabled) {
                resumeWriteIfSuspended();
                this.isWriteEnabled = true;
            }
            return true;
        } finally {
            this.lock.unlock();
        }
    }

    @Override // com.datatorrent.netlet.AbstractClientListener, com.datatorrent.netlet.Listener
    public void handleException(Exception exc, EventLoop eventLoop) {
        if (!this.key.isValid()) {
            super.handleException(exc, eventLoop);
            this.throwables.offer(NetletThrowable.Util.rewrap(exc, eventLoop));
            return;
        }
        if (this.key.attachment() != this) {
            logger.error("Ignoring exception received after discarding the connection.", exc);
            return;
        }
        if (!(exc instanceof IOException)) {
            super.handleException(exc, eventLoop);
            this.throwables.offer(NetletThrowable.Util.rewrap(exc, eventLoop));
        } else {
            logger.error("Disconnecting {} because of an exception.", this, exc);
            if (isConnected()) {
                eventLoop.disconnect(this);
            }
        }
    }
}
