/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartition;
import org.apache.flink.runtime.io.network.partition.SpillableSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SpilledSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SubpartitionTestBase;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class SpillableSubpartitionTest
extends SubpartitionTestBase {
    private static final ExecutorService executorService = Executors.newCachedThreadPool();
    private static final IOManager ioManager = new IOManagerAsync();

    @AfterClass
    public static void shutdown() {
        executorService.shutdownNow();
        ioManager.shutdown();
    }

    SpillableSubpartition createSubpartition() {
        ResultPartition parent = (ResultPartition)Mockito.mock(ResultPartition.class);
        BufferProvider bufferProvider = (BufferProvider)Mockito.mock(BufferProvider.class);
        Mockito.when((Object)parent.getBufferProvider()).thenReturn((Object)bufferProvider);
        Mockito.when((Object)bufferProvider.getMemorySegmentSize()).thenReturn((Object)32768);
        return new SpillableSubpartition(0, parent, ioManager);
    }

    @Test
    public void testConcurrentFinishAndReleaseMemory() throws Exception {
        final CountDownLatch doneLatch = new CountDownLatch(1);
        final CountDownLatch blockLatch = new CountDownLatch(1);
        AsynchronousBufferFileWriter spillWriter = (AsynchronousBufferFileWriter)Mockito.mock(AsynchronousBufferFileWriter.class);
        ((AsynchronousBufferFileWriter)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                blockLatch.countDown();
                doneLatch.await();
                return null;
            }
        }).when((Object)spillWriter)).close();
        IOManager ioManager = (IOManager)Mockito.mock(IOManager.class);
        Mockito.when((Object)ioManager.createBufferFileWriter((FileIOChannel.ID)Matchers.any(FileIOChannel.ID.class))).thenReturn((Object)spillWriter);
        final SpillableSubpartition partition = new SpillableSubpartition(0, (ResultPartition)Mockito.mock(ResultPartition.class), ioManager);
        Assert.assertEquals((long)0L, (long)partition.releaseMemory());
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Void> blockingFinish = executor.submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                partition.finish();
                return null;
            }
        });
        blockLatch.await();
        partition.releaseMemory();
        doneLatch.countDown();
        blockingFinish.get();
    }

    @Test
    public void testReleasePartitionAndGetNext() throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        partition.finish();
        ResultSubpartitionView readView = (ResultSubpartitionView)Mockito.spy((Object)partition.createReadView(new BufferAvailabilityListener(){

            public void notifyBuffersAvailable(long numBuffers) {
            }
        }));
        ((ResultSubpartitionView)Mockito.doNothing().when((Object)readView)).releaseAllResources();
        partition.release();
        Assert.assertNull((Object)readView.getNextBuffer());
    }

    @Test
    public void testConsumeSpilledPartition() throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment((int)4096), FreeingBufferRecycler.INSTANCE);
        buffer.retain();
        buffer.retain();
        partition.add(buffer);
        partition.add(buffer);
        partition.add(buffer);
        Assert.assertEquals((long)3L, (long)partition.releaseMemory());
        partition.finish();
        BufferAvailabilityListener listener = (BufferAvailabilityListener)Mockito.mock(BufferAvailabilityListener.class);
        SpilledSubpartitionView reader = (SpilledSubpartitionView)partition.createReadView(listener);
        ((BufferAvailabilityListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).notifyBuffersAvailable(Matchers.eq((long)4L));
        Buffer read = reader.getNextBuffer();
        Assert.assertNotNull((Object)read);
        read.recycle();
        read = reader.getNextBuffer();
        Assert.assertNotNull((Object)read);
        read.recycle();
        read = reader.getNextBuffer();
        Assert.assertNotNull((Object)read);
        read.recycle();
        read = reader.getNextBuffer();
        Assert.assertNotNull((Object)read);
        Assert.assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer((Buffer)read, (ClassLoader)ClassLoader.getSystemClassLoader()).getClass());
        read.recycle();
    }

    @Test
    public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
        SpillableSubpartition partition = this.createSubpartition();
        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment((int)4096), FreeingBufferRecycler.INSTANCE);
        buffer.retain();
        buffer.retain();
        partition.add(buffer);
        partition.add(buffer);
        partition.add(buffer);
        partition.finish();
        AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
        SpillableSubpartitionView reader = (SpillableSubpartitionView)partition.createReadView((BufferAvailabilityListener)listener);
        Assert.assertEquals((long)1L, (long)listener.getNumNotifiedBuffers());
        Buffer read = reader.getNextBuffer();
        Assert.assertNotNull((Object)read);
        read.recycle();
        Assert.assertEquals((long)2L, (long)listener.getNumNotifiedBuffers());
        Assert.assertEquals((long)2L, (long)partition.releaseMemory());
        listener.awaitNotifications(4L, 30000L);
        Assert.assertEquals((long)4L, (long)listener.getNumNotifiedBuffers());
        read = reader.getNextBuffer();
        Assert.assertNotNull((Object)read);
        read.recycle();
        read = reader.getNextBuffer();
        Assert.assertNotNull((Object)read);
        read.recycle();
        read = reader.getNextBuffer();
        Assert.assertNotNull((Object)read);
        Assert.assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer((Buffer)read, (ClassLoader)ClassLoader.getSystemClassLoader()).getClass());
        read.recycle();
    }

    private static class AwaitableBufferAvailablityListener
    implements BufferAvailabilityListener {
        private long numNotifiedBuffers;

        private AwaitableBufferAvailablityListener() {
        }

        public void notifyBuffersAvailable(long numBuffers) {
            this.numNotifiedBuffers += numBuffers;
        }

        long getNumNotifiedBuffers() {
            return this.numNotifiedBuffers;
        }

        void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (this.numNotifiedBuffers < awaitedNumNotifiedBuffers && System.currentTimeMillis() < deadline) {
                Thread.sleep(1L);
            }
        }
    }
}

