package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.PendingCheckpointTest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
import org.apache.flink.runtime.io.network.partition.AvailabilityUtil;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.class */
public class RemoteInputChannelTest {
    private static final long CHECKPOINT_ID = 1;
    private static final CheckpointOptions UNALIGNED = CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault());
    private static final CheckpointOptions ALIGNED_WITH_TIMEOUT = CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 10);

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest$TestBufferPool.class */
    private static final class TestBufferPool extends NoOpBufferPool {
        private TestBufferPool() {
        }

        @Override // org.apache.flink.runtime.io.network.buffer.NoOpBufferPool
        public Buffer requestBuffer() {
            return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest$TestPartitionProducerStateProvider.class */
    private static final class TestPartitionProducerStateProvider implements PartitionProducerStateProvider {
        private boolean isInvoked;
        private final ResultPartitionID partitionId;

        TestPartitionProducerStateProvider(ResultPartitionID resultPartitionID) {
            this.partitionId = (ResultPartitionID) Preconditions.checkNotNull(resultPartitionID);
        }

        public void requestPartitionProducerState(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID, Consumer<? super PartitionProducerStateProvider.ResponseHandle> consumer) {
            Assert.assertEquals(this.partitionId, resultPartitionID);
            this.isInvoked = true;
        }

        boolean isInvoked() {
            return this.isInvoked;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest$TestVerifyConnectionManager.class */
    private static final class TestVerifyConnectionManager extends TestingConnectionManager {
        private final PartitionRequestClient client;

        TestVerifyConnectionManager(TestingPartitionRequestClient testingPartitionRequestClient) {
            this.client = (PartitionRequestClient) Preconditions.checkNotNull(testingPartitionRequestClient);
        }

        @Override // org.apache.flink.runtime.io.network.TestingConnectionManager
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionID) {
            return this.client;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest$TestVerifyPartitionRequestClient.class */
    private static final class TestVerifyPartitionRequestClient extends TestingPartitionRequestClient {
        private ResultPartitionID partitionId;
        private int subpartitionIndex;
        private int delayMs;

        private TestVerifyPartitionRequestClient() {
        }

        @Override // org.apache.flink.runtime.io.network.TestingPartitionRequestClient
        public void requestSubpartition(ResultPartitionID resultPartitionID, int i, RemoteInputChannel remoteInputChannel, int i2) {
            this.partitionId = resultPartitionID;
            this.subpartitionIndex = i;
            this.delayMs = i2;
        }

        void verifyResult(ResultPartitionID resultPartitionID, int i, int i2) {
            Assert.assertEquals(resultPartitionID, this.partitionId);
            Assert.assertEquals(i, this.subpartitionIndex);
            Assert.assertEquals(i2, this.delayMs);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest$TestingExceptionConnectionManager.class */
    private static final class TestingExceptionConnectionManager extends TestingConnectionManager {
        private TestingExceptionConnectionManager() {
        }

        @Override // org.apache.flink.runtime.io.network.TestingConnectionManager
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionID) throws IOException {
            throw new IOException("");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest$TriFunction.class */
    public interface TriFunction<T, U, V, R> {
        R apply(T t, U u, V v) throws Exception;
    }

    @Test
    public void testGateNotifiedOnBarrierConversion() throws IOException, InterruptedException {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(1, 4096);
        try {
            SingleInputGate build = new SingleInputGateBuilder().setBufferPoolFactory(networkBufferPool.createBufferPool(1, 1)).build();
            build.setup();
            RemoteInputChannel buildRemoteChannel = InputChannelBuilder.newBuilder().setConnectionManager(new TestVerifyConnectionManager(new TestVerifyPartitionRequestClient())).buildRemoteChannel(build);
            buildRemoteChannel.requestSubpartition(0);
            buildRemoteChannel.onBuffer(EventSerializer.toBuffer(new CheckpointBarrier(CHECKPOINT_ID, 123L, CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 2147483647L)), false), 0, 0);
            build.pollNext();
            buildRemoteChannel.convertToPriorityEvent(0);
            Assert.assertTrue(build.getPriorityEventAvailableFuture().isDone());
            networkBufferPool.destroy();
        } catch (Throwable th) {
            networkBufferPool.destroy();
            throw th;
        }
    }

    @Test
    public void testExceptionOnReordering() throws Exception {
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1));
        Buffer createBuffer = TestBufferFactory.createBuffer(32768);
        createRemoteInputChannel.onBuffer(createBuffer.retainBuffer(), 0, -1);
        createRemoteInputChannel.onBuffer(createBuffer, 29, -1);
        try {
            createRemoteInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected exception after enqueuing an out-of-order buffer.");
        } catch (Exception e) {
            Assert.assertFalse(createBuffer.isRecycled());
            createRemoteInputChannel.releaseAllResources();
            Assert.assertTrue(createBuffer.isRecycled());
        }
    }

    @Test
    public void testExceptionOnPersisting() throws Exception {
        RemoteInputChannel buildRemoteChannel = InputChannelBuilder.newBuilder().setStateWriter(new ChannelStateWriter.NoOpChannelStateWriter() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.1
            public void addInputData(long j, InputChannelInfo inputChannelInfo, int i, CloseableIterator<Buffer> closeableIterator) {
                try {
                    closeableIterator.close();
                    throw new ExpectedTestException();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }).buildRemoteChannel(InputChannelTestUtils.createSingleInputGate(1));
        buildRemoteChannel.checkpointStarted(new CheckpointBarrier(42L, System.currentTimeMillis(), CheckpointOptions.unaligned(CheckpointStorageLocationReference.getDefault())));
        Buffer createBuffer = TestBufferFactory.createBuffer(32768);
        Assert.assertFalse(createBuffer.isRecycled());
        try {
            buildRemoteChannel.onBuffer(createBuffer, 0, -1);
            Assert.fail("This should have failed");
        } catch (ExpectedTestException e) {
        }
        Assert.assertFalse(createBuffer.isRecycled());
        buildRemoteChannel.releaseAllResources();
        Assert.assertTrue(createBuffer.isRecycled());
    }

    @Test
    public void testConcurrentOnBufferAndRelease() throws Exception {
        testConcurrentReleaseAndSomething(8192, (remoteInputChannel, buffer, num) -> {
            remoteInputChannel.onBuffer(buffer, num.intValue(), -1);
            return true;
        });
    }

    @Test
    public void testConcurrentNotifyBufferAvailableAndRelease() throws Exception {
        testConcurrentReleaseAndSomething(1024, (remoteInputChannel, buffer, num) -> {
            return Boolean.valueOf(remoteInputChannel.getBufferManager().notifyBufferAvailable(buffer));
        });
    }

    private void testConcurrentReleaseAndSomething(int i, TriFunction<RemoteInputChannel, Buffer, Integer, Boolean> triFunction) throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        Buffer createBuffer = TestBufferFactory.createBuffer(32768);
        try {
            SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1);
            for (int i2 = 0; i2 < i; i2++) {
                RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate);
                Callable callable = () -> {
                    do {
                        for (int i3 = 0; i3 < 128; i3++) {
                            if (!((Boolean) triFunction.apply(createRemoteInputChannel, createBuffer.retainBuffer(), Integer.valueOf(i3))).booleanValue()) {
                                createBuffer.recycleBuffer();
                            }
                        }
                    } while (!createRemoteInputChannel.isReleased());
                    return null;
                };
                Callable callable2 = () -> {
                    createRemoteInputChannel.releaseAllResources();
                    return null;
                };
                ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(2);
                newArrayListWithCapacity.add(newFixedThreadPool.submit(callable));
                newArrayListWithCapacity.add(newFixedThreadPool.submit(callable2));
                Iterator it = newArrayListWithCapacity.iterator();
                while (it.hasNext()) {
                    ((Future) it.next()).get();
                }
                Assert.assertEquals("Resource leak during concurrent release and notifyBufferAvailable.", 0L, createRemoteInputChannel.getNumberOfQueuedBuffers());
            }
        } finally {
            newFixedThreadPool.shutdown();
            Assert.assertFalse(createBuffer.isRecycled());
            createBuffer.recycleBuffer();
            Assert.assertTrue(createBuffer.isRecycled());
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testRetriggerWithoutPartitionRequest() throws Exception {
        createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1), 500, 3000).retriggerSubpartitionRequest(0);
    }

    @Test
    public void testPartitionRequestExponentialBackoff() throws Exception {
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1);
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        TestVerifyPartitionRequestClient testVerifyPartitionRequestClient = new TestVerifyPartitionRequestClient();
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate, new TestVerifyConnectionManager(testVerifyPartitionRequestClient), resultPartitionID, 500, 3000);
        createRemoteInputChannel.requestSubpartition(0);
        testVerifyPartitionRequestClient.verifyResult(resultPartitionID, 0, 0);
        for (int i : new int[]{500, 1000, 2000, 3000}) {
            createRemoteInputChannel.retriggerSubpartitionRequest(0);
            testVerifyPartitionRequestClient.verifyResult(resultPartitionID, 0, i);
        }
        try {
            createRemoteInputChannel.retriggerSubpartitionRequest(0);
            createRemoteInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception e) {
        }
    }

    @Test
    public void testPartitionRequestSingleBackoff() throws Exception {
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1);
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        TestVerifyPartitionRequestClient testVerifyPartitionRequestClient = new TestVerifyPartitionRequestClient();
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate, new TestVerifyConnectionManager(testVerifyPartitionRequestClient), resultPartitionID, 500, 500);
        createRemoteInputChannel.requestSubpartition(0);
        testVerifyPartitionRequestClient.verifyResult(resultPartitionID, 0, 0);
        createRemoteInputChannel.retriggerSubpartitionRequest(0);
        testVerifyPartitionRequestClient.verifyResult(resultPartitionID, 0, 500);
        try {
            createRemoteInputChannel.retriggerSubpartitionRequest(0);
            createRemoteInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception e) {
        }
    }

    @Test
    public void testPartitionRequestNoBackoff() throws Exception {
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1);
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        TestVerifyPartitionRequestClient testVerifyPartitionRequestClient = new TestVerifyPartitionRequestClient();
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate, new TestVerifyConnectionManager(testVerifyPartitionRequestClient), resultPartitionID, 0, 0);
        createRemoteInputChannel.requestSubpartition(0);
        testVerifyPartitionRequestClient.verifyResult(resultPartitionID, 0, 0);
        try {
            createRemoteInputChannel.retriggerSubpartitionRequest(0);
            createRemoteInputChannel.getNextBuffer();
            Assert.fail("Did not throw expected exception.");
        } catch (Exception e) {
        }
    }

    @Test
    public void testOnFailedPartitionRequest() throws Exception {
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        TestPartitionProducerStateProvider testPartitionProducerStateProvider = new TestPartitionProducerStateProvider(resultPartitionID);
        InputChannelBuilder.newBuilder().setPartitionId(resultPartitionID).buildRemoteChannel(new SingleInputGateBuilder().setPartitionProducerStateProvider(testPartitionProducerStateProvider).build()).onFailedPartitionRequest();
        Assert.assertTrue(testPartitionProducerStateProvider.isInvoked());
    }

    @Test(expected = CancelTaskException.class)
    public void testProducerFailedException() throws Exception {
        ConnectionManager connectionManager = (ConnectionManager) Mockito.mock(ConnectionManager.class);
        Mockito.when(connectionManager.createPartitionRequestClient((ConnectionID) Matchers.any(ConnectionID.class))).thenReturn(Mockito.mock(PartitionRequestClient.class));
        RemoteInputChannel createRemoteInputChannel = InputChannelTestUtils.createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1), 0, connectionManager);
        createRemoteInputChannel.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
        createRemoteInputChannel.requestSubpartition(0);
        createRemoteInputChannel.getNextBuffer();
    }

    @Test(expected = PartitionConnectionException.class)
    public void testPartitionConnectionException() throws IOException {
        TestingExceptionConnectionManager testingExceptionConnectionManager = new TestingExceptionConnectionManager();
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1);
        InputChannel createRemoteInputChannel = InputChannelTestUtils.createRemoteInputChannel(createSingleInputGate, 0, testingExceptionConnectionManager);
        createSingleInputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel});
        createSingleInputGate.requestPartitions();
        createRemoteInputChannel.getNextBuffer();
    }

    @Test
    public void testAvailableBuffersLessThanRequiredBuffers() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        InputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate);
        createSingleInputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel});
        try {
            BufferPool bufferPool = (BufferPool) Mockito.spy(networkBufferPool.createBufferPool(14, 14));
            createSingleInputGate.setBufferPool(bufferPool);
            createSingleInputGate.setupChannels();
            createRemoteInputChannel.requestSubpartition(0);
            Buffer requestBuffer = createRemoteInputChannel.requestBuffer();
            Assert.assertNotNull(requestBuffer);
            ArrayDeque arrayDeque = new ArrayDeque(2);
            for (int i = 0; i < 2; i++) {
                Buffer requestBuffer2 = bufferPool.requestBuffer();
                Assert.assertNotNull(requestBuffer2);
                arrayDeque.add(requestBuffer2);
            }
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(2))).requestBuffer();
            createRemoteInputChannel.onSenderBacklog(14);
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(15))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(1))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 13 buffers available in the channel", 13L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 16 buffers required in the channel", 16L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(createRemoteInputChannel.isWaitingForFloatingBuffers());
            createRemoteInputChannel.onSenderBacklog(16);
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(15))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(1))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 13 buffers available in the channel", 13L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 18 buffers required in the channel", 18L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(createRemoteInputChannel.isWaitingForFloatingBuffers());
            requestBuffer.recycleBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(15))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(1))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 18 buffers required in the channel", 18L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(createRemoteInputChannel.isWaitingForFloatingBuffers());
            ((Buffer) arrayDeque.poll()).recycleBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(16))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(2))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 15 buffers available in the channel", 15L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 18 buffers required in the channel", 18L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(createRemoteInputChannel.isWaitingForFloatingBuffers());
            createRemoteInputChannel.onSenderBacklog(13);
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(16))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(2))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 15 buffers available in the channel", 15L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 15 buffers required in the channel", 15L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(createRemoteInputChannel.isWaitingForFloatingBuffers());
            ((Buffer) arrayDeque.poll()).recycleBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(16))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(2))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 15 buffers available in the channel", 15L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 15 buffers required in the channel", 15L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 1 buffers available in local pool", CHECKPOINT_ID, bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertFalse(createRemoteInputChannel.isWaitingForFloatingBuffers());
            createRemoteInputChannel.onSenderBacklog(15);
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(18))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(3))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 16 buffers available in the channel", 16L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 17 buffers required in the channel", 17L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, bufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertTrue(createRemoteInputChannel.isWaitingForFloatingBuffers());
            cleanup(networkBufferPool, null, null, null, createRemoteInputChannel);
        } catch (Throwable th) {
            cleanup(networkBufferPool, null, null, null, createRemoteInputChannel);
            throw th;
        }
    }

    @Test
    public void testAvailableBuffersEqualToRequiredBuffers() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        InputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate);
        createSingleInputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel});
        try {
            BufferPool bufferPool = (BufferPool) Mockito.spy(networkBufferPool.createBufferPool(14, 14));
            createSingleInputGate.setBufferPool(bufferPool);
            createSingleInputGate.setupChannels();
            createRemoteInputChannel.requestSubpartition(0);
            Buffer requestBuffer = createRemoteInputChannel.requestBuffer();
            Assert.assertNotNull(requestBuffer);
            Buffer requestBuffer2 = bufferPool.requestBuffer();
            Assert.assertNotNull(requestBuffer2);
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(1))).requestBuffer();
            createRemoteInputChannel.onSenderBacklog(12);
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(14))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(0))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 14 buffers required in the channel", 14L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, bufferPool.getNumberOfAvailableMemorySegments());
            requestBuffer2.recycleBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(14))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(0))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 14 buffers required in the channel", 14L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 1 buffer available in local pool", CHECKPOINT_ID, bufferPool.getNumberOfAvailableMemorySegments());
            requestBuffer.recycleBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(14))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(0))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 14 buffers required in the channel", 14L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 2 buffers available in local pool", 2L, bufferPool.getNumberOfAvailableMemorySegments());
            cleanup(networkBufferPool, null, null, null, createRemoteInputChannel);
        } catch (Throwable th) {
            cleanup(networkBufferPool, null, null, null, createRemoteInputChannel);
            throw th;
        }
    }

    @Test
    public void testAvailableBuffersMoreThanRequiredBuffers() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(16, 32);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        InputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate);
        createSingleInputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel});
        try {
            BufferPool bufferPool = (BufferPool) Mockito.spy(networkBufferPool.createBufferPool(14, 14));
            createSingleInputGate.setBufferPool(bufferPool);
            createSingleInputGate.setupChannels();
            createRemoteInputChannel.requestSubpartition(0);
            Buffer requestBuffer = createRemoteInputChannel.requestBuffer();
            Assert.assertNotNull(requestBuffer);
            Buffer requestBuffer2 = bufferPool.requestBuffer();
            Assert.assertNotNull(requestBuffer2);
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(1))).requestBuffer();
            createRemoteInputChannel.onSenderBacklog(12);
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(14))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(0))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 14 buffers required in the channel", 14L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, bufferPool.getNumberOfAvailableMemorySegments());
            createRemoteInputChannel.onSenderBacklog(10);
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(14))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(0))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 12 buffers required in the channel", 12L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 0 buffers available in local pool", 0L, bufferPool.getNumberOfAvailableMemorySegments());
            requestBuffer.recycleBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(14))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(0))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 12 buffers required in the channel", 12L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 1 buffer available in local pool", CHECKPOINT_ID, bufferPool.getNumberOfAvailableMemorySegments());
            requestBuffer2.recycleBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(14))).requestBuffer();
            ((BufferPool) Mockito.verify(bufferPool, Mockito.times(0))).addBufferListener(createRemoteInputChannel.getBufferManager());
            Assert.assertEquals("There should be 14 buffers available in the channel", 14L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 12 buffers required in the channel", 12L, createRemoteInputChannel.getNumberOfRequiredBuffers());
            Assert.assertEquals("There should be 2 buffers available in local pool", 2L, bufferPool.getNumberOfAvailableMemorySegments());
            cleanup(networkBufferPool, null, null, null, createRemoteInputChannel);
        } catch (Throwable th) {
            cleanup(networkBufferPool, null, null, null, createRemoteInputChannel);
            throw th;
        }
    }

    @Test
    public void testFairDistributionFloatingBuffers() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(12, 32);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(3, networkBufferPool);
        RemoteInputChannel[] remoteInputChannelArr = {createRemoteInputChannel(createSingleInputGate), createRemoteInputChannel(createSingleInputGate), createRemoteInputChannel(createSingleInputGate)};
        createSingleInputGate.setInputChannels(remoteInputChannelArr);
        try {
            BufferPool bufferPool = (BufferPool) Mockito.spy(networkBufferPool.createBufferPool(3, 3));
            createSingleInputGate.setBufferPool(bufferPool);
            createSingleInputGate.setupChannels();
            createSingleInputGate.requestPartitions();
            for (RemoteInputChannel remoteInputChannel : remoteInputChannelArr) {
                remoteInputChannel.requestSubpartition(0);
            }
            ArrayList arrayList = new ArrayList(3);
            for (int i = 0; i < 3; i++) {
                Buffer requestBuffer = bufferPool.requestBuffer();
                Assert.assertNotNull(requestBuffer);
                arrayList.add(requestBuffer);
            }
            for (RemoteInputChannel remoteInputChannel2 : remoteInputChannelArr) {
                remoteInputChannel2.onSenderBacklog(8);
                ((BufferPool) Mockito.verify(bufferPool, Mockito.times(1))).addBufferListener(remoteInputChannel2.getBufferManager());
                Assert.assertEquals("There should be 2 buffers available in the channel", 2L, remoteInputChannel2.getNumberOfAvailableBuffers());
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((Buffer) it.next()).recycleBuffer();
            }
            for (RemoteInputChannel remoteInputChannel3 : remoteInputChannelArr) {
                Assert.assertEquals("There should be 3 buffers available in the channel", 3L, remoteInputChannel3.getNumberOfAvailableBuffers());
                Assert.assertEquals("There should be 1 unannounced credits in the channel", CHECKPOINT_ID, remoteInputChannel3.getUnannouncedCredit());
            }
            cleanup(networkBufferPool, null, null, null, remoteInputChannelArr);
        } catch (Throwable th) {
            cleanup(networkBufferPool, null, null, th, remoteInputChannelArr);
        }
    }

    @Test
    public void testFailureInNotifyBufferAvailable() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(2, 32);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1);
        InputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate);
        createRemoteInputChannel.requestSubpartition(0);
        InputChannel createRemoteInputChannel2 = createRemoteInputChannel(createSingleInputGate);
        Buffer buffer = null;
        try {
            BufferPool createBufferPool = networkBufferPool.createBufferPool(1, 1);
            createSingleInputGate.setBufferPool(createBufferPool);
            Buffer buffer2 = (Buffer) Preconditions.checkNotNull(createBufferPool.requestBuffer());
            createRemoteInputChannel2.onSenderBacklog(1);
            createRemoteInputChannel.onSenderBacklog(2);
            buffer2.recycleBuffer();
            buffer = null;
            try {
                createRemoteInputChannel2.checkError();
                Assert.fail("The input channel should have an error based on the failure in RemoteInputChannel#notifyBufferAvailable()");
            } catch (IOException e) {
                MatcherAssert.assertThat(e, org.hamcrest.Matchers.hasProperty("cause", org.hamcrest.Matchers.isA(IllegalStateException.class)));
            }
            Assert.assertEquals(0L, createBufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertNull("buffer should still remain in failingRemoteIC", createRemoteInputChannel.requestBuffer());
            createRemoteInputChannel2.releaseAllResources();
            Assert.assertEquals(0L, createBufferPool.getNumberOfAvailableMemorySegments());
            buffer = createRemoteInputChannel.requestBuffer();
            Assert.assertNotNull("no buffer given to successfulRemoteIC", buffer);
            cleanup(networkBufferPool, null, buffer, null, createRemoteInputChannel2, createRemoteInputChannel);
        } catch (Throwable th) {
            cleanup(networkBufferPool, null, buffer, null, createRemoteInputChannel2, createRemoteInputChannel);
            throw th;
        }
    }

    @Test
    public void testConcurrentOnSenderBacklogAndRelease() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(130, 32);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final InputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate);
        try {
            createSingleInputGate.setBufferPool(networkBufferPool.createBufferPool(PendingCheckpointTest.MAX_PARALLELISM, PendingCheckpointTest.MAX_PARALLELISM));
            createSingleInputGate.setupChannels();
            createRemoteInputChannel.requestSubpartition(0);
            submitTasksAndWaitForResults(newFixedThreadPool, new Callable[]{new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    do {
                        for (int i = 1; i <= 128; i++) {
                            createRemoteInputChannel.onSenderBacklog(i);
                        }
                    } while (!createRemoteInputChannel.isReleased());
                    return null;
                }
            }, new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.3
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    createRemoteInputChannel.releaseAllResources();
                    return null;
                }
            }});
            Assert.assertEquals("There should be no buffers available in the channel.", 0L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 130 buffers available in local pool.", 130L, r0.getNumberOfAvailableMemorySegments() + networkBufferPool.getNumberOfAvailableMemorySegments());
            cleanup(networkBufferPool, newFixedThreadPool, null, null, createRemoteInputChannel);
        } catch (Throwable th) {
            cleanup(networkBufferPool, newFixedThreadPool, null, null, createRemoteInputChannel);
            throw th;
        }
    }

    @Test
    public void testConcurrentOnSenderBacklogAndRecycle() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final InputChannel createRemoteInputChannel = InputChannelTestUtils.createRemoteInputChannel(createSingleInputGate, 120);
        createSingleInputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel});
        try {
            BufferPool createBufferPool = networkBufferPool.createBufferPool(PendingCheckpointTest.MAX_PARALLELISM, PendingCheckpointTest.MAX_PARALLELISM);
            createSingleInputGate.setBufferPool(createBufferPool);
            createSingleInputGate.setupChannels();
            createRemoteInputChannel.requestSubpartition(0);
            submitTasksAndWaitForResults(newFixedThreadPool, new Callable[]{recycleBufferTask(createRemoteInputChannel, createBufferPool, 120, PendingCheckpointTest.MAX_PARALLELISM), new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.4
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    for (int i = 1; i <= 128; i++) {
                        createRemoteInputChannel.onSenderBacklog(i);
                    }
                    return null;
                }
            }});
            Assert.assertEquals("There should be " + createRemoteInputChannel.getNumberOfRequiredBuffers() + " buffers available in channel.", createRemoteInputChannel.getNumberOfRequiredBuffers(), createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be no buffers available in local pool.", 0L, createBufferPool.getNumberOfAvailableMemorySegments());
            cleanup(networkBufferPool, newFixedThreadPool, null, null, createRemoteInputChannel);
        } catch (Throwable th) {
            cleanup(networkBufferPool, newFixedThreadPool, null, th, createRemoteInputChannel);
        }
    }

    @Test
    public void testConcurrentRecycleAndRelease() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(248, 32);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        final InputChannel createRemoteInputChannel = InputChannelTestUtils.createRemoteInputChannel(createSingleInputGate, 120);
        createSingleInputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel});
        try {
            BufferPool createBufferPool = networkBufferPool.createBufferPool(PendingCheckpointTest.MAX_PARALLELISM, PendingCheckpointTest.MAX_PARALLELISM);
            createSingleInputGate.setBufferPool(createBufferPool);
            createSingleInputGate.setupChannels();
            createRemoteInputChannel.requestSubpartition(0);
            submitTasksAndWaitForResults(newFixedThreadPool, new Callable[]{recycleBufferTask(createRemoteInputChannel, createBufferPool, 120, PendingCheckpointTest.MAX_PARALLELISM), new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.5
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    createRemoteInputChannel.releaseAllResources();
                    return null;
                }
            }});
            Assert.assertEquals("There should be no buffers available in the channel.", 0L, createRemoteInputChannel.getNumberOfAvailableBuffers());
            Assert.assertEquals("There should be 128 buffers available in local pool.", 128L, createBufferPool.getNumberOfAvailableMemorySegments());
            Assert.assertEquals("There should be 120 buffers available in global pool.", 120L, networkBufferPool.getNumberOfAvailableMemorySegments());
            cleanup(networkBufferPool, newFixedThreadPool, null, null, createRemoteInputChannel);
        } catch (Throwable th) {
            cleanup(networkBufferPool, newFixedThreadPool, null, null, createRemoteInputChannel);
            throw th;
        }
    }

    @Test
    public void testConcurrentRecycleAndRelease2() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(4, 32);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        InputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate);
        createSingleInputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel});
        try {
            BufferPool createBufferPool = networkBufferPool.createBufferPool(2, 2);
            createSingleInputGate.setBufferPool(createBufferPool);
            createSingleInputGate.setupChannels();
            createRemoteInputChannel.requestSubpartition(0);
            submitTasksAndWaitForResults(newFixedThreadPool, new Callable[]{() -> {
                for (int i = 0; i < 1000; i++) {
                    BufferBuilder requestBufferBuilderBlocking = createBufferPool.requestBufferBuilderBlocking();
                    Throwable th = null;
                    try {
                        try {
                            BufferBuilderTestUtils.buildSingleBuffer(requestBufferBuilderBlocking).recycleBuffer();
                            if (requestBufferBuilderBlocking != null) {
                                if (0 != 0) {
                                    try {
                                        requestBufferBuilderBlocking.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    requestBufferBuilderBlocking.close();
                                }
                            }
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (requestBufferBuilderBlocking != null) {
                            if (th != null) {
                                try {
                                    requestBufferBuilderBlocking.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                requestBufferBuilderBlocking.close();
                            }
                        }
                        throw th3;
                    }
                }
                return null;
            }, () -> {
                ArrayList arrayList = new ArrayList(2);
                ArrayList arrayList2 = new ArrayList(2);
                for (int i = 0; i < 1000; i++) {
                    for (int i2 = 0; i2 < 4; i2++) {
                        try {
                            Buffer requestBuffer = createRemoteInputChannel.requestBuffer();
                            if (requestBuffer == null) {
                                break;
                            }
                            if (requestBuffer.getRecycler() == createRemoteInputChannel.getBufferManager()) {
                                arrayList.add(requestBuffer);
                            } else {
                                arrayList2.add(requestBuffer);
                            }
                        } finally {
                            createRemoteInputChannel.releaseAllResources();
                        }
                    }
                    arrayList2.forEach((v0) -> {
                        v0.recycleBuffer();
                    });
                    arrayList2.clear();
                    Assert.assertEquals(2L, arrayList.size());
                    createRemoteInputChannel.onSenderBacklog(0);
                    arrayList.forEach((v0) -> {
                        v0.recycleBuffer();
                    });
                    arrayList.clear();
                }
                return null;
            }});
            cleanup(networkBufferPool, newFixedThreadPool, null, null, createRemoteInputChannel);
        } catch (Throwable th) {
            cleanup(networkBufferPool, newFixedThreadPool, null, th, createRemoteInputChannel);
        }
    }

    @Test
    public void testConcurrentGetNextBufferAndRelease() throws Exception {
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(1000, 32);
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);
        InputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate);
        createSingleInputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel});
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        try {
            createSingleInputGate.setBufferPool(networkBufferPool.createBufferPool(998, 998));
            createSingleInputGate.setupChannels();
            createRemoteInputChannel.requestSubpartition(0);
            for (int i = 0; i < 1000; i++) {
                createRemoteInputChannel.onBuffer(createRemoteInputChannel.requestBuffer(), i, 0);
            }
            submitTasksAndWaitForResults(newFixedThreadPool, new Callable[]{() -> {
                boolean isReleased;
                AssertionError assertionError;
                for (int i2 = 0; i2 < 1000; i2++) {
                    try {
                        createRemoteInputChannel.getNextBuffer().ifPresent(bufferAndAvailability -> {
                            bufferAndAvailability.buffer().recycleBuffer();
                        });
                    } finally {
                        if (!isReleased) {
                        }
                    }
                }
                return null;
            }, () -> {
                createRemoteInputChannel.releaseAllResources();
                return null;
            }});
            cleanup(networkBufferPool, newFixedThreadPool, null, null, createRemoteInputChannel);
        } catch (Throwable th) {
            cleanup(networkBufferPool, newFixedThreadPool, null, th, createRemoteInputChannel);
        }
    }

    @Test
    public void testPartitionNotFoundExceptionWhileRetriggeringRequest() throws Exception {
        RemoteInputChannel createRemoteInputChannel = InputChannelTestUtils.createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1), 0, new TestingConnectionManager());
        createRemoteInputChannel.requestSubpartition(0);
        createRemoteInputChannel.retriggerSubpartitionRequest(0);
        try {
            createRemoteInputChannel.checkError();
            Assert.fail("Should throw a PartitionNotFoundException.");
        } catch (PartitionNotFoundException e) {
            MatcherAssert.assertThat(createRemoteInputChannel.getPartitionId(), org.hamcrest.Matchers.is(e.getPartitionId()));
        }
    }

    @Test
    public void testPartitionConnectionExceptionWhileRequestingPartition() throws Exception {
        RemoteInputChannel createRemoteInputChannel = InputChannelTestUtils.createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1), 0, new TestingExceptionConnectionManager());
        try {
            createRemoteInputChannel.requestSubpartition(0);
            Assert.fail("Expected PartitionConnectionException.");
        } catch (PartitionConnectionException e) {
            MatcherAssert.assertThat(createRemoteInputChannel.getPartitionId(), org.hamcrest.Matchers.is(e.getPartitionId()));
        }
    }

    @Test(expected = IllegalStateException.class)
    public void testUnblockReleasedChannel() throws Exception {
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(InputChannelTestUtils.createSingleInputGate(1));
        createRemoteInputChannel.releaseAllResources();
        createRemoteInputChannel.resumeConsumption();
    }

    @Test
    public void testOnUpstreamBlockedAndResumed() throws Exception {
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(new TestBufferPool());
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate, 2);
        RemoteInputChannel createRemoteInputChannel2 = createRemoteInputChannel(createSingleInputGate, 0);
        createSingleInputGate.setup();
        createRemoteInputChannel.requestSubpartition(0);
        createRemoteInputChannel2.requestSubpartition(1);
        createRemoteInputChannel.onSenderBacklog(2);
        createRemoteInputChannel2.onSenderBacklog(2);
        Assert.assertEquals(4L, createRemoteInputChannel.getNumberOfAvailableBuffers());
        Assert.assertEquals(2L, createRemoteInputChannel2.getNumberOfAvailableBuffers());
        Buffer buffer = EventSerializer.toBuffer(new CheckpointBarrier(CHECKPOINT_ID, 123L, CheckpointOptions.alignedWithTimeout(CheckpointStorageLocationReference.getDefault(), 2147483647L)), false);
        createRemoteInputChannel.onBuffer(buffer, 0, 0);
        createRemoteInputChannel2.onBuffer(buffer, 0, 0);
        Assert.assertEquals(4L, createRemoteInputChannel.getNumberOfAvailableBuffers());
        Assert.assertEquals(0L, createRemoteInputChannel2.getNumberOfAvailableBuffers());
        createRemoteInputChannel.resumeConsumption();
        createRemoteInputChannel2.resumeConsumption();
        Assert.assertEquals(4L, createRemoteInputChannel.getUnannouncedCredit());
        Assert.assertEquals(0L, createRemoteInputChannel2.getUnannouncedCredit());
        createRemoteInputChannel.onSenderBacklog(4);
        createRemoteInputChannel2.onSenderBacklog(4);
        Assert.assertEquals(6L, createRemoteInputChannel.getNumberOfAvailableBuffers());
        Assert.assertEquals(4L, createRemoteInputChannel2.getNumberOfAvailableBuffers());
        Assert.assertEquals(6L, createRemoteInputChannel.getUnannouncedCredit());
        Assert.assertEquals(4L, createRemoteInputChannel2.getUnannouncedCredit());
    }

    @Test
    public void testRequestBuffer() throws Exception {
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(new TestBufferPool());
        RemoteInputChannel createRemoteInputChannel = createRemoteInputChannel(createSingleInputGate, 2);
        RemoteInputChannel createRemoteInputChannel2 = createRemoteInputChannel(createSingleInputGate, 0);
        createSingleInputGate.setup();
        createRemoteInputChannel.requestSubpartition(0);
        createRemoteInputChannel2.requestSubpartition(1);
        createRemoteInputChannel.onSenderBacklog(2);
        createRemoteInputChannel2.onSenderBacklog(2);
        for (int i = 4; i >= 0; i--) {
            Assert.assertEquals(i, createRemoteInputChannel.getNumberOfRequiredBuffers());
            createRemoteInputChannel.requestBuffer();
        }
        for (int i2 = 2; i2 >= 0; i2--) {
            Assert.assertEquals(i2, createRemoteInputChannel2.getNumberOfRequiredBuffers());
            createRemoteInputChannel2.requestBuffer();
        }
    }

    @Test
    public void testPrioritySequenceNumbers() throws Exception {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel(0);
        int i = 0 + 1;
        int i2 = 1 + 1;
        sendBuffer(buildInputGateAndGetChannel, 0, 1);
        int i3 = i + 1;
        int i4 = i2 + 1;
        sendBuffer(buildInputGateAndGetChannel, i, i2);
        int i5 = i3 + 1;
        sendBarrier(buildInputGateAndGetChannel, i3, UNALIGNED);
        int i6 = i5 + 1;
        int i7 = i4 + 1;
        sendBuffer(buildInputGateAndGetChannel, i5, i4);
        int i8 = i6 + 1;
        int i9 = i7 + 1;
        sendBuffer(buildInputGateAndGetChannel, i6, i7);
        assertGetNextBufferSequenceNumbers(buildInputGateAndGetChannel, 2, 0, 1, 3, 4);
    }

    @Test
    public void testGetInflightBuffers() throws Exception {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel(0);
        int i = 0 + 1;
        int i2 = 1 + 1;
        sendBuffer(buildInputGateAndGetChannel, 0, 1);
        int i3 = i + 1;
        int i4 = i2 + 1;
        sendBuffer(buildInputGateAndGetChannel, i, i2);
        int i5 = i3 + 1;
        sendBarrier(buildInputGateAndGetChannel, i3, UNALIGNED);
        int i6 = i5 + 1;
        int i7 = i4 + 1;
        sendBuffer(buildInputGateAndGetChannel, i5, i4);
        int i8 = i6 + 1;
        int i9 = i7 + 1;
        sendBuffer(buildInputGateAndGetChannel, i6, i7);
        assertInflightBufferSizes(buildInputGateAndGetChannel, 1, 2);
    }

    @Test
    public void testGetAllInflightBuffers() throws Exception {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel(2147483645);
        int i = 2147483645 + 1;
        int i2 = 1 + 1;
        sendBuffer(buildInputGateAndGetChannel, 2147483645, 1);
        int i3 = i + 1;
        int i4 = i2 + 1;
        sendBuffer(buildInputGateAndGetChannel, i, i2);
        int i5 = i3 + 1;
        int i6 = i4 + 1;
        sendBuffer(buildInputGateAndGetChannel, i3, i4);
        int i7 = i5 + 1;
        int i8 = i6 + 1;
        sendBuffer(buildInputGateAndGetChannel, i5, i6);
        assertInflightBufferSizes(buildInputGateAndGetChannel, 1, 2, 3, 4);
    }

    @Test
    public void testGetInflightBuffersOverflow() throws Exception {
        for (int i = 2147483637; i != -2147483646; i++) {
            RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel(i);
            int i2 = i;
            int i3 = i2 + 1;
            int i4 = 1 + 1;
            sendBuffer(buildInputGateAndGetChannel, i2, 1);
            int i5 = i3 + 1;
            int i6 = i4 + 1;
            sendBuffer(buildInputGateAndGetChannel, i3, i4);
            int i7 = i5 + 1;
            sendBarrier(buildInputGateAndGetChannel, i5, UNALIGNED);
            int i8 = i7 + 1;
            int i9 = i6 + 1;
            sendBuffer(buildInputGateAndGetChannel, i7, i6);
            int i10 = i8 + 1;
            int i11 = i9 + 1;
            sendBuffer(buildInputGateAndGetChannel, i8, i9);
            MatcherAssert.assertThat("For starting sequence " + i, toBufferSizes(buildInputGateAndGetChannel.getInflightBuffers(CHECKPOINT_ID)), org.hamcrest.Matchers.contains(new Integer[]{1, 2}));
        }
    }

    @Test
    public void testGetInflightBuffersAfterPollingBuffer() throws Exception {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel(0);
        int i = 0 + 1;
        int i2 = 1 + 1;
        sendBuffer(buildInputGateAndGetChannel, 0, 1);
        int i3 = i + 1;
        int i4 = i2 + 1;
        sendBuffer(buildInputGateAndGetChannel, i, i2);
        int i5 = i3 + 1;
        sendBarrier(buildInputGateAndGetChannel, i3, UNALIGNED);
        int i6 = i5 + 1;
        int i7 = i4 + 1;
        sendBuffer(buildInputGateAndGetChannel, i5, i4);
        int i8 = i6 + 1;
        int i9 = i7 + 1;
        sendBuffer(buildInputGateAndGetChannel, i6, i7);
        assertGetNextBufferSequenceNumbers(buildInputGateAndGetChannel, 2, 0);
        assertInflightBufferSizes(buildInputGateAndGetChannel, 2);
    }

    private static List<Integer> toBufferSizes(List<Buffer> list) {
        return (List) list.stream().map(buffer -> {
            return Integer.valueOf(buffer.getSize());
        }).collect(Collectors.toList());
    }

    @Test
    public void testRequiresAnnouncement() throws Exception {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel(0);
        int i = 0 + 1;
        int i2 = 1 + 1;
        sendBuffer(buildInputGateAndGetChannel, 0, 1);
        int i3 = i + 1;
        int i4 = i2 + 1;
        sendBuffer(buildInputGateAndGetChannel, i, i2);
        int i5 = i3 + 1;
        sendBarrier(buildInputGateAndGetChannel, i3, ALIGNED_WITH_TIMEOUT);
        int i6 = i5 + 1;
        int i7 = i4 + 1;
        sendBuffer(buildInputGateAndGetChannel, i5, i4);
        int i8 = i6 + 1;
        int i9 = i7 + 1;
        sendBuffer(buildInputGateAndGetChannel, i6, i7);
        InputChannel.BufferAndAvailability bufferAndAvailability = (InputChannel.BufferAndAvailability) buildInputGateAndGetChannel.getNextBuffer().get();
        Assert.assertEquals(2L, bufferAndAvailability.getSequenceNumber());
        Assert.assertFalse(bufferAndAvailability.morePriorityEvents());
        Assert.assertTrue(bufferAndAvailability.moreAvailable());
        Assert.assertEquals(Buffer.DataType.PRIORITIZED_EVENT_BUFFER, bufferAndAvailability.buffer().getDataType());
        assertGetNextBufferSequenceNumbers(buildInputGateAndGetChannel, 0, 1);
        InputChannel.BufferAndAvailability bufferAndAvailability2 = (InputChannel.BufferAndAvailability) buildInputGateAndGetChannel.getNextBuffer().get();
        Assert.assertEquals(2L, bufferAndAvailability2.getSequenceNumber());
        Assert.assertEquals(Buffer.DataType.TIMEOUTABLE_ALIGNED_CHECKPOINT_BARRIER, bufferAndAvailability2.buffer().getDataType());
        Assert.assertEquals(3L, ((InputChannel.BufferAndAvailability) buildInputGateAndGetChannel.getNextBuffer().get()).getSequenceNumber());
    }

    @Test
    public void testGetInflightBuffersBeforeProcessingAnnouncement() throws Exception {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel(0);
        int i = 0 + 1;
        int i2 = 1 + 1;
        sendBuffer(buildInputGateAndGetChannel, 0, 1);
        int i3 = i + 1;
        int i4 = i2 + 1;
        sendBuffer(buildInputGateAndGetChannel, i, i2);
        int i5 = i3 + 1;
        sendBarrier(buildInputGateAndGetChannel, i3, ALIGNED_WITH_TIMEOUT);
        int i6 = i5 + 1;
        int i7 = i4 + 1;
        sendBuffer(buildInputGateAndGetChannel, i5, i4);
        int i8 = i6 + 1;
        int i9 = i7 + 1;
        sendBuffer(buildInputGateAndGetChannel, i6, i7);
        assertInflightBufferSizes(buildInputGateAndGetChannel, 1, 2);
    }

    @Test
    public void testGetInflightBuffersAfterProcessingAnnouncement() throws Exception {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel(0);
        int i = 0 + 1;
        int i2 = 1 + 1;
        sendBuffer(buildInputGateAndGetChannel, 0, 1);
        int i3 = i + 1;
        int i4 = i2 + 1;
        sendBuffer(buildInputGateAndGetChannel, i, i2);
        int i5 = i3 + 1;
        sendBarrier(buildInputGateAndGetChannel, i3, ALIGNED_WITH_TIMEOUT);
        int i6 = i5 + 1;
        int i7 = i4 + 1;
        sendBuffer(buildInputGateAndGetChannel, i5, i4);
        int i8 = i6 + 1;
        int i9 = i7 + 1;
        sendBuffer(buildInputGateAndGetChannel, i6, i7);
        assertGetNextBufferSequenceNumbers(buildInputGateAndGetChannel, 2);
        assertInflightBufferSizes(buildInputGateAndGetChannel, 1, 2);
    }

    @Test
    public void testGetInflightBuffersAfterProcessingAnnouncementAndBuffer() throws Exception {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel(0);
        int i = 0 + 1;
        int i2 = 1 + 1;
        sendBuffer(buildInputGateAndGetChannel, 0, 1);
        int i3 = i + 1;
        int i4 = i2 + 1;
        sendBuffer(buildInputGateAndGetChannel, i, i2);
        int i5 = i3 + 1;
        sendBarrier(buildInputGateAndGetChannel, i3, ALIGNED_WITH_TIMEOUT);
        int i6 = i5 + 1;
        int i7 = i4 + 1;
        sendBuffer(buildInputGateAndGetChannel, i5, i4);
        int i8 = i6 + 1;
        int i9 = i7 + 1;
        sendBuffer(buildInputGateAndGetChannel, i6, i7);
        assertGetNextBufferSequenceNumbers(buildInputGateAndGetChannel, 2, 0);
        assertInflightBufferSizes(buildInputGateAndGetChannel, 2);
    }

    private void sendBarrier(RemoteInputChannel remoteInputChannel, int i, CheckpointOptions checkpointOptions) throws IOException {
        send(remoteInputChannel, i, EventSerializer.toBuffer(new CheckpointBarrier(CHECKPOINT_ID, 123L, checkpointOptions), checkpointOptions.isUnalignedCheckpoint()));
    }

    private void sendBuffer(RemoteInputChannel remoteInputChannel, int i, int i2) throws IOException {
        send(remoteInputChannel, i, TestBufferFactory.createBuffer(i2));
    }

    private void send(RemoteInputChannel remoteInputChannel, int i, Buffer buffer) throws IOException {
        remoteInputChannel.onBuffer(buffer, i, 0);
        remoteInputChannel.checkError();
    }

    private void assertInflightBufferSizes(RemoteInputChannel remoteInputChannel, Integer... numArr) throws CheckpointException {
        Assert.assertEquals(Arrays.asList(numArr), toBufferSizes(remoteInputChannel.getInflightBuffers(CHECKPOINT_ID)));
    }

    private void assertGetNextBufferSequenceNumbers(RemoteInputChannel remoteInputChannel, Integer... numArr) throws IOException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < numArr.length; i++) {
            Optional map = remoteInputChannel.getNextBuffer().map((v0) -> {
                return v0.getSequenceNumber();
            });
            arrayList.getClass();
            map.ifPresent((v1) -> {
                r1.add(v1);
            });
        }
        MatcherAssert.assertThat(arrayList, org.hamcrest.Matchers.contains(numArr));
    }

    private RemoteInputChannel createRemoteInputChannel(SingleInputGate singleInputGate) {
        return createRemoteInputChannel(singleInputGate, 0, 0);
    }

    private RemoteInputChannel createRemoteInputChannel(SingleInputGate singleInputGate, int i) {
        return InputChannelBuilder.newBuilder().setNetworkBuffersPerChannel(i).buildRemoteChannel(singleInputGate);
    }

    private RemoteInputChannel createRemoteInputChannel(SingleInputGate singleInputGate, int i, int i2) {
        return InputChannelBuilder.newBuilder().setInitialBackoff(i).setMaxBackoff(i2).buildRemoteChannel(singleInputGate);
    }

    private RemoteInputChannel createRemoteInputChannel(SingleInputGate singleInputGate, ConnectionManager connectionManager, ResultPartitionID resultPartitionID, int i, int i2) {
        return InputChannelBuilder.newBuilder().setInitialBackoff(i).setMaxBackoff(i2).setPartitionId(resultPartitionID).setConnectionManager(connectionManager).buildRemoteChannel(singleInputGate);
    }

    private RemoteInputChannel buildInputGateAndGetChannel(int i) throws IOException {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel();
        buildInputGateAndGetChannel.setExpectedSequenceNumber(i);
        return buildInputGateAndGetChannel;
    }

    private RemoteInputChannel buildInputGateAndGetChannel() throws IOException {
        return buildInputGate().getChannel(0);
    }

    private SingleInputGate buildInputGate() throws IOException {
        MemorySegmentProvider networkBufferPool = new NetworkBufferPool(4, 4096);
        SingleInputGate build = new SingleInputGateBuilder().setChannelFactory((v0, v1) -> {
            return v0.buildRemoteChannel(v1);
        }).setBufferPoolFactory(networkBufferPool.createBufferPool(1, 4)).setSegmentProvider(networkBufferPool).build();
        build.setup();
        build.requestPartitions();
        return build;
    }

    @Test
    public void testOnFailedPartitionRequestDoesNotBlockNetworkThreads() throws Exception {
        PartitionProducerStateProvider build = new TestTaskBuilder(new NettyShuffleEnvironmentBuilder().build()).setPartitionProducerStateChecker((jobID, intermediateDataSetID, resultPartitionID) -> {
            return CompletableFuture.completedFuture(ExecutionState.RUNNING);
        }).build();
        SingleInputGate build2 = new SingleInputGateBuilder().setPartitionProducerStateProvider(build).build();
        TestTaskBuilder.setTaskState(build, ExecutionState.RUNNING);
        final OneShotLatch oneShotLatch = new OneShotLatch();
        final OneShotLatch oneShotLatch2 = new OneShotLatch();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        InputChannel buildRemoteChannel = InputChannelBuilder.newBuilder().setConnectionManager(new TestingConnectionManager() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.6
            @Override // org.apache.flink.runtime.io.network.TestingConnectionManager
            public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionID) {
                oneShotLatch.trigger();
                try {
                    oneShotLatch2.await(30000L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException | TimeoutException e) {
                    atomicBoolean.set(true);
                }
                return new TestingPartitionRequestClient();
            }
        }).buildRemoteChannel(build2);
        build2.setInputChannels(new InputChannel[]{buildRemoteChannel});
        Thread thread = new Thread(() -> {
            try {
                oneShotLatch.await();
                buildRemoteChannel.onFailedPartitionRequest();
                oneShotLatch2.trigger();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread.start();
        build2.requestPartitions();
        thread.join();
        Assert.assertFalse("Test ended by timeout or interruption - this indicates that the network thread was blocked.", atomicBoolean.get());
    }

    @Test
    public void testNotifyOnPriority() throws IOException {
        SingleInputGate build = new SingleInputGateBuilder().build();
        RemoteInputChannel createRemoteInputChannel = InputChannelTestUtils.createRemoteInputChannel(build, 0);
        CheckpointOptions checkpointOptions = new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
        AvailabilityUtil.assertPriorityAvailability(build, false, false, () -> {
            AvailabilityUtil.assertAvailability(build, false, true, () -> {
                createRemoteInputChannel.onBuffer(EventSerializer.toBuffer(new CheckpointBarrier(CHECKPOINT_ID, 123L, checkpointOptions), false), 0, 0);
            });
        });
        AvailabilityUtil.assertPriorityAvailability(build, false, true, () -> {
            AvailabilityUtil.assertAvailability(build, true, true, () -> {
                createRemoteInputChannel.onBuffer(EventSerializer.toBuffer(new CheckpointBarrier(2L, 123L, checkpointOptions), true), 1, 0);
            });
        });
    }

    @Test
    public void testBuffersInUseCount() throws Exception {
        RemoteInputChannel buildInputGateAndGetChannel = buildInputGateAndGetChannel();
        Buffer createBuffer = TestBufferFactory.createBuffer(32768);
        buildInputGateAndGetChannel.onBuffer(createBuffer.retainBuffer(), 0, 1);
        Assert.assertEquals(2L, buildInputGateAndGetChannel.getBuffersInUseCount());
        buildInputGateAndGetChannel.onBuffer(createBuffer.retainBuffer(), 1, 3);
        Assert.assertEquals(5L, buildInputGateAndGetChannel.getBuffersInUseCount());
        buildInputGateAndGetChannel.getNextBuffer();
        Assert.assertEquals(4L, buildInputGateAndGetChannel.getBuffersInUseCount());
        buildInputGateAndGetChannel.getNextBuffer();
        Assert.assertEquals(3L, buildInputGateAndGetChannel.getBuffersInUseCount());
        buildInputGateAndGetChannel.getNextBuffer();
        Assert.assertEquals(3L, buildInputGateAndGetChannel.getBuffersInUseCount());
    }

    private Callable<Void> recycleBufferTask(RemoteInputChannel remoteInputChannel, BufferPool bufferPool, int i, int i2) throws Exception {
        final ArrayDeque arrayDeque = new ArrayDeque(i);
        for (int i3 = 0; i3 < i; i3++) {
            Buffer requestBuffer = remoteInputChannel.requestBuffer();
            Assert.assertNotNull(requestBuffer);
            arrayDeque.add(requestBuffer);
        }
        final ArrayDeque arrayDeque2 = new ArrayDeque(i2);
        for (int i4 = 0; i4 < i2; i4++) {
            Buffer requestBuffer2 = bufferPool.requestBuffer();
            Assert.assertNotNull(requestBuffer2);
            arrayDeque2.add(requestBuffer2);
        }
        return new Callable<Void>() { // from class: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannelTest.7
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                Random random = new Random();
                while (!arrayDeque.isEmpty() && !arrayDeque2.isEmpty()) {
                    if (random.nextBoolean()) {
                        ((Buffer) arrayDeque.poll()).recycleBuffer();
                    } else {
                        ((Buffer) arrayDeque2.poll()).recycleBuffer();
                    }
                }
                while (!arrayDeque.isEmpty()) {
                    ((Buffer) arrayDeque.poll()).recycleBuffer();
                }
                while (!arrayDeque2.isEmpty()) {
                    ((Buffer) arrayDeque2.poll()).recycleBuffer();
                }
                return null;
            }
        };
    }

    static void submitTasksAndWaitForResults(ExecutorService executorService, Callable[] callableArr) throws Exception {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(callableArr.length);
        for (Callable callable : callableArr) {
            newArrayListWithCapacity.add(executorService.submit(callable));
        }
        Iterator it = newArrayListWithCapacity.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).get();
        }
    }

    public static void cleanup(NetworkBufferPool networkBufferPool, @Nullable ExecutorService executorService, @Nullable Buffer buffer, @Nullable Throwable th, InputChannel... inputChannelArr) throws Exception {
        for (InputChannel inputChannel : inputChannelArr) {
            try {
                inputChannel.releaseAllResources();
            } catch (Throwable th2) {
                th = ExceptionUtils.firstOrSuppressed(th2, th);
            }
        }
        if (buffer != null && !buffer.isRecycled()) {
            buffer.recycleBuffer();
        }
        try {
            networkBufferPool.destroyAllBufferPools();
        } catch (Throwable th3) {
            th = ExceptionUtils.firstOrSuppressed(th3, th);
        }
        try {
            networkBufferPool.destroy();
        } catch (Throwable th4) {
            th = ExceptionUtils.firstOrSuppressed(th4, th);
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        if (th != null) {
            ExceptionUtils.rethrowException(th);
        }
    }
}
