package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.class */
public class SpilledSubpartitionViewTest {
    private static final IOManager IO_MANAGER = new IOManagerAsync();
    private static final TestInfiniteBufferProvider writerBufferPool = new TestInfiniteBufferProvider();

    @AfterClass
    public static void shutdown() {
        IO_MANAGER.shutdown();
    }

    @Test
    public void testWriteConsume() throws Exception {
        BufferFileWriter createWriterAndWriteBuffers = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512);
        createWriterAndWriteBuffers.close();
        TestPooledBufferProvider testPooledBufferProvider = new TestPooledBufferProvider(1);
        TestSubpartitionConsumer testSubpartitionConsumer = new TestSubpartitionConsumer(false, new TestConsumerCallback.RecyclingCallback());
        testSubpartitionConsumer.setSubpartitionView(new SpilledSubpartitionView((ResultSubpartition) Mockito.mock(ResultSubpartition.class), testPooledBufferProvider.getMemorySegmentSize(), createWriterAndWriteBuffers, 513L, testSubpartitionConsumer));
        testSubpartitionConsumer.call();
    }

    @Test
    public void testConsumeWithFewBuffers() throws Exception {
        BufferFileWriter createWriterAndWriteBuffers = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512);
        createWriterAndWriteBuffers.close();
        TestSubpartitionConsumer testSubpartitionConsumer = new TestSubpartitionConsumer(false, new TestConsumerCallback.RecyclingCallback());
        testSubpartitionConsumer.setSubpartitionView(new SpilledSubpartitionView((ResultSubpartition) Mockito.mock(ResultSubpartition.class), TestBufferFactory.BUFFER_SIZE, createWriterAndWriteBuffers, 513L, testSubpartitionConsumer));
        testSubpartitionConsumer.call();
    }

    @Test
    public void testReadMultipleFilesWithSingleBufferPool() throws Exception {
        ExecutorService executorService = null;
        BufferFileWriter[] bufferFileWriterArr = null;
        ResultSubpartitionView[] resultSubpartitionViewArr = null;
        try {
            executorService = Executors.newCachedThreadPool();
            bufferFileWriterArr = new BufferFileWriter[]{createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512), createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512)};
            resultSubpartitionViewArr = new ResultSubpartitionView[bufferFileWriterArr.length];
            TestSubpartitionConsumer[] testSubpartitionConsumerArr = new TestSubpartitionConsumer[bufferFileWriterArr.length];
            TestPooledBufferProvider testPooledBufferProvider = new TestPooledBufferProvider(2);
            ResultSubpartition resultSubpartition = (ResultSubpartition) Mockito.mock(ResultSubpartition.class);
            for (BufferFileWriter bufferFileWriter : bufferFileWriterArr) {
                bufferFileWriter.close();
            }
            for (int i = 0; i < resultSubpartitionViewArr.length; i++) {
                testSubpartitionConsumerArr[i] = new TestSubpartitionConsumer(false, new TestConsumerCallback.RecyclingCallback());
                resultSubpartitionViewArr[i] = new SpilledSubpartitionView(resultSubpartition, testPooledBufferProvider.getMemorySegmentSize(), bufferFileWriterArr[i], 513L, testSubpartitionConsumerArr[i]);
                testSubpartitionConsumerArr[i].setSubpartitionView(resultSubpartitionViewArr[i]);
            }
            ArrayList newArrayList = Lists.newArrayList();
            for (TestSubpartitionConsumer testSubpartitionConsumer : testSubpartitionConsumerArr) {
                newArrayList.add(executorService.submit(testSubpartitionConsumer));
            }
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                try {
                    ((Future) it.next()).get(2L, TimeUnit.MINUTES);
                } catch (TimeoutException e) {
                    throw new TimeoutException("There has been a timeout in the test. This indicates that there is a bug/deadlock in the tested subpartition view.");
                }
            }
            if (bufferFileWriterArr != null) {
                for (BufferFileWriter bufferFileWriter2 : bufferFileWriterArr) {
                    if (bufferFileWriter2 != null) {
                        bufferFileWriter2.deleteChannel();
                    }
                }
            }
            if (resultSubpartitionViewArr != null) {
                for (ResultSubpartitionView resultSubpartitionView : resultSubpartitionViewArr) {
                    if (resultSubpartitionView != null) {
                        resultSubpartitionView.releaseAllResources();
                    }
                }
            }
            if (executorService != null) {
                executorService.shutdown();
            }
        } catch (Throwable th) {
            if (bufferFileWriterArr != null) {
                for (BufferFileWriter bufferFileWriter3 : bufferFileWriterArr) {
                    if (bufferFileWriter3 != null) {
                        bufferFileWriter3.deleteChannel();
                    }
                }
            }
            if (resultSubpartitionViewArr != null) {
                for (ResultSubpartitionView resultSubpartitionView2 : resultSubpartitionViewArr) {
                    if (resultSubpartitionView2 != null) {
                        resultSubpartitionView2.releaseAllResources();
                    }
                }
            }
            if (executorService != null) {
                executorService.shutdown();
            }
            throw th;
        }
    }

    static BufferFileWriter createWriterAndWriteBuffers(IOManager iOManager, BufferProvider bufferProvider, int i) throws IOException {
        BufferFileWriter createBufferFileWriter = iOManager.createBufferFileWriter(iOManager.createChannel());
        for (int i2 = 0; i2 < i; i2++) {
            createBufferFileWriter.writeBlock(bufferProvider.requestBuffer());
        }
        createBufferFileWriter.writeBlock(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
        return createBufferFileWriter;
    }
}
