/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.Channel;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.PartitionRequestProtocol;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class CancelPartitionRequestTest {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCancelPartitionRequest() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = null;
        try {
            TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
            ResultPartitionManager partitions = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
            ResultPartitionID pid = new ResultPartitionID();
            CountDownLatch sync = new CountDownLatch(1);
            final ResultSubpartitionView view = (ResultSubpartitionView)Mockito.spy((Object)new InfiniteSubpartitionView(outboundBuffers, sync));
            Mockito.when((Object)partitions.createSubpartitionView((ResultPartitionID)Matchers.eq((Object)pid), Matchers.eq((int)0), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class))).thenAnswer((Answer)new Answer<ResultSubpartitionView>(){

                public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
                    BufferAvailabilityListener listener = (BufferAvailabilityListener)invocationOnMock.getArguments()[2];
                    listener.notifyBuffersAvailable(Long.MAX_VALUE);
                    return view;
                }
            });
            PartitionRequestProtocol protocol = new PartitionRequestProtocol((ResultPartitionProvider)partitions, (TaskEventDispatcher)Mockito.mock(TaskEventDispatcher.class));
            serverAndClient = NettyTestUtil.initServerAndClient((NettyProtocol)protocol);
            Channel ch = NettyTestUtil.connect(serverAndClient);
            ch.writeAndFlush((Object)new NettyMessage.PartitionRequest(pid, 0, new InputChannelID())).await();
            if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
                Assert.fail((String)("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + " ms to be notified about cancelled partition."));
            }
            ((ResultSubpartitionView)Mockito.verify((Object)view, (VerificationMode)Mockito.times((int)1))).releaseAllResources();
            ((ResultSubpartitionView)Mockito.verify((Object)view, (VerificationMode)Mockito.times((int)0))).notifySubpartitionConsumed();
        }
        catch (Throwable throwable) {
            NettyTestUtil.shutdown(serverAndClient);
            throw throwable;
        }
        NettyTestUtil.shutdown(serverAndClient);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDuplicateCancel() throws Exception {
        NettyTestUtil.NettyServerAndClient serverAndClient = null;
        try {
            TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
            ResultPartitionManager partitions = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
            ResultPartitionID pid = new ResultPartitionID();
            CountDownLatch sync = new CountDownLatch(1);
            final ResultSubpartitionView view = (ResultSubpartitionView)Mockito.spy((Object)new InfiniteSubpartitionView(outboundBuffers, sync));
            Mockito.when((Object)partitions.createSubpartitionView((ResultPartitionID)Matchers.eq((Object)pid), Matchers.eq((int)0), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class))).thenAnswer((Answer)new Answer<ResultSubpartitionView>(){

                public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws Throwable {
                    BufferAvailabilityListener listener = (BufferAvailabilityListener)invocationOnMock.getArguments()[2];
                    listener.notifyBuffersAvailable(Long.MAX_VALUE);
                    return view;
                }
            });
            PartitionRequestProtocol protocol = new PartitionRequestProtocol((ResultPartitionProvider)partitions, (TaskEventDispatcher)Mockito.mock(TaskEventDispatcher.class));
            serverAndClient = NettyTestUtil.initServerAndClient((NettyProtocol)protocol);
            Channel ch = NettyTestUtil.connect(serverAndClient);
            InputChannelID inputChannelId = new InputChannelID();
            ch.writeAndFlush((Object)new NettyMessage.PartitionRequest(pid, 0, inputChannelId)).await();
            if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), TimeUnit.MILLISECONDS)) {
                Assert.fail((String)("Timed out after waiting for " + TestingUtils.TESTING_DURATION().toMillis() + " ms to be notified about cancelled partition."));
            }
            ch.writeAndFlush((Object)new NettyMessage.CancelPartitionRequest(inputChannelId)).await();
            ch.close();
            NettyTestUtil.awaitClose(ch);
            ((ResultSubpartitionView)Mockito.verify((Object)view, (VerificationMode)Mockito.times((int)1))).releaseAllResources();
            ((ResultSubpartitionView)Mockito.verify((Object)view, (VerificationMode)Mockito.times((int)0))).notifySubpartitionConsumed();
        }
        catch (Throwable throwable) {
            NettyTestUtil.shutdown(serverAndClient);
            throw throwable;
        }
        NettyTestUtil.shutdown(serverAndClient);
    }

    static class InfiniteSubpartitionView
    implements ResultSubpartitionView {
        private final BufferProvider bufferProvider;
        private final CountDownLatch sync;

        public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) {
            this.bufferProvider = (BufferProvider)Preconditions.checkNotNull((Object)bufferProvider);
            this.sync = (CountDownLatch)Preconditions.checkNotNull((Object)sync);
        }

        public Buffer getNextBuffer() throws IOException, InterruptedException {
            return this.bufferProvider.requestBufferBlocking();
        }

        public void notifyBuffersAvailable(long buffers) throws IOException {
        }

        public void releaseAllResources() throws IOException {
            this.sync.countDown();
        }

        public void notifySubpartitionConsumed() throws IOException {
        }

        public boolean isReleased() {
            return false;
        }

        public Throwable getFailureCause() {
            return null;
        }
    }
}

