/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class SingleInputGateTest {
    @Test(timeout=120000L)
    public void testBasicGetNextLogic() throws Exception {
        SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 2, (TaskActions)Mockito.mock(TaskActions.class), (TaskIOMetricGroup)new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        Assert.assertEquals((Object)ResultPartitionType.PIPELINED, (Object)inputGate.getConsumedPartitionType());
        TestInputChannel[] inputChannels = new TestInputChannel[]{new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1)};
        inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[0].getInputChannel());
        inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[1].getInputChannel());
        inputChannels[0].readBuffer();
        inputChannels[0].readBuffer();
        inputChannels[1].readBuffer();
        inputChannels[1].readEndOfPartitionEvent();
        inputChannels[0].readEndOfPartitionEvent();
        inputGate.notifyChannelNonEmpty(inputChannels[0].getInputChannel());
        inputGate.notifyChannelNonEmpty(inputChannels[1].getInputChannel());
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 1);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, true, 0);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 1);
        SingleInputGateTest.verifyBufferOrEvent((InputGate)inputGate, false, 0);
        Assert.assertTrue((boolean)inputGate.isFinished());
        Assert.assertNull((Object)inputGate.getNextBufferOrEvent());
    }

    @Test
    public void testBackwardsEventWithUninitializedChannel() throws Exception {
        TaskEventDispatcher taskEventDispatcher = (TaskEventDispatcher)Mockito.mock(TaskEventDispatcher.class);
        Mockito.when((Object)taskEventDispatcher.publish((ResultPartitionID)Matchers.any(ResultPartitionID.class), (TaskEvent)Matchers.any(TaskEvent.class))).thenReturn((Object)true);
        ResultSubpartitionView iterator = (ResultSubpartitionView)Mockito.mock(ResultSubpartitionView.class);
        Mockito.when((Object)iterator.getNextBuffer()).thenReturn((Object)new Buffer(MemorySegmentFactory.allocateUnpooledSegment((int)1024), (BufferRecycler)Mockito.mock(BufferRecycler.class)));
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        Mockito.when((Object)partitionManager.createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class))).thenReturn((Object)iterator);
        IntermediateDataSetID resultId = new IntermediateDataSetID();
        SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), resultId, ResultPartitionType.PIPELINED, 0, 2, (TaskActions)Mockito.mock(TaskActions.class), (TaskIOMetricGroup)new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        BufferPool bufferPool = (BufferPool)Mockito.mock(BufferPool.class);
        Mockito.when((Object)bufferPool.getNumberOfRequiredMemorySegments()).thenReturn((Object)2);
        inputGate.setBufferPool(bufferPool);
        ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
        LocalInputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, (TaskIOMetricGroup)new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
        UnknownInputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, (ConnectionManager)Mockito.mock(ConnectionManager.class), 0, 0, (TaskIOMetricGroup)new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        inputGate.setInputChannel(localPartitionId.getPartitionId(), (InputChannel)local);
        inputGate.setInputChannel(unknownPartitionId.getPartitionId(), (InputChannel)unknown);
        inputGate.requestPartitions();
        ((ResultPartitionManager)Mockito.verify((Object)partitionManager, (VerificationMode)Mockito.times((int)1))).createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class));
        TestTaskEvent event = new TestTaskEvent();
        inputGate.sendTaskEvent((TaskEvent)event);
        ((TaskEventDispatcher)Mockito.verify((Object)taskEventDispatcher, (VerificationMode)Mockito.times((int)1))).publish((ResultPartitionID)Matchers.any(ResultPartitionID.class), (TaskEvent)Matchers.any(TaskEvent.class));
        inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(new ResultPartitionID(unknownPartitionId.getPartitionId(), unknownPartitionId.getProducerId()), ResultPartitionLocation.createLocal()));
        ((ResultPartitionManager)Mockito.verify((Object)partitionManager, (VerificationMode)Mockito.times((int)2))).createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class));
        ((TaskEventDispatcher)Mockito.verify((Object)taskEventDispatcher, (VerificationMode)Mockito.times((int)2))).publish((ResultPartitionID)Matchers.any(ResultPartitionID.class), (TaskEvent)Matchers.any(TaskEvent.class));
    }

    @Test
    public void testUpdateChannelBeforeRequest() throws Exception {
        SingleInputGate inputGate = new SingleInputGate("t1", new JobID(), new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 1, (TaskActions)Mockito.mock(TaskActions.class), (TaskIOMetricGroup)new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        UnknownInputChannel unknown = new UnknownInputChannel(inputGate, 0, new ResultPartitionID(), partitionManager, new TaskEventDispatcher(), (ConnectionManager)new LocalConnectionManager(), 0, 0, (TaskIOMetricGroup)new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        inputGate.setInputChannel(unknown.partitionId.getPartitionId(), (InputChannel)unknown);
        inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(unknown.partitionId, ResultPartitionLocation.createLocal()));
        ((ResultPartitionManager)Mockito.verify((Object)partitionManager, (VerificationMode)Mockito.never())).createSubpartitionView((ResultPartitionID)Matchers.any(ResultPartitionID.class), Matchers.anyInt(), (BufferAvailabilityListener)Matchers.any(BufferAvailabilityListener.class));
    }

    @Test
    public void testReleaseWhilePollingChannel() throws Exception {
        final AtomicReference asyncException = new AtomicReference();
        final SingleInputGate inputGate = new SingleInputGate("InputGate", new JobID(), new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, 1, (TaskActions)Mockito.mock(TaskActions.class), (TaskIOMetricGroup)new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        UnknownInputChannel unknown = new UnknownInputChannel(inputGate, 0, new ResultPartitionID(), new ResultPartitionManager(), new TaskEventDispatcher(), (ConnectionManager)new LocalConnectionManager(), 0, 0, (TaskIOMetricGroup)new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        inputGate.setInputChannel(unknown.partitionId.getPartitionId(), (InputChannel)unknown);
        Thread asyncConsumer = new Thread(){

            @Override
            public void run() {
                try {
                    inputGate.getNextBufferOrEvent();
                }
                catch (Exception e) {
                    asyncException.set(e);
                }
            }
        };
        asyncConsumer.start();
        boolean success = false;
        for (int i = 0; i < 50; ++i) {
            if (asyncConsumer.isAlive()) {
                boolean bl = success = asyncConsumer.getState() == Thread.State.WAITING;
            }
            if (success) break;
            Thread.sleep(100L);
        }
        Assert.assertTrue((String)"Did not trigger blocking buffer request.", (boolean)success);
        inputGate.releaseAllResources();
        asyncConsumer.join();
        Assert.assertNotNull(asyncException.get());
        Assert.assertEquals(IllegalStateException.class, ((Exception)asyncException.get()).getClass());
    }

    @Test
    public void testRequestBackoffConfiguration() throws Exception {
        InputChannel[] channels;
        ResultPartitionID[] partitionIds = new ResultPartitionID[]{new ResultPartitionID(), new ResultPartitionID(), new ResultPartitionID()};
        InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{new InputChannelDeploymentDescriptor(partitionIds[0], ResultPartitionLocation.createLocal()), new InputChannelDeploymentDescriptor(partitionIds[1], ResultPartitionLocation.createRemote((ConnectionID)new ConnectionID(new InetSocketAddress("localhost", 5000), 0))), new InputChannelDeploymentDescriptor(partitionIds[2], ResultPartitionLocation.createUnknown())};
        InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, channelDescs);
        int initialBackoff = 137;
        int maxBackoff = 1001;
        NetworkEnvironment netEnv = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)netEnv.getResultPartitionManager()).thenReturn((Object)new ResultPartitionManager());
        Mockito.when((Object)netEnv.getTaskEventDispatcher()).thenReturn((Object)new TaskEventDispatcher());
        Mockito.when((Object)netEnv.getPartitionRequestInitialBackoff()).thenReturn((Object)initialBackoff);
        Mockito.when((Object)netEnv.getPartitionRequestMaxBackoff()).thenReturn((Object)maxBackoff);
        Mockito.when((Object)netEnv.getConnectionManager()).thenReturn((Object)new LocalConnectionManager());
        SingleInputGate gate = SingleInputGate.create((String)"TestTask", (JobID)new JobID(), (ExecutionAttemptID)new ExecutionAttemptID(), (InputGateDeploymentDescriptor)gateDesc, (NetworkEnvironment)netEnv, (TaskActions)((TaskActions)Mockito.mock(TaskActions.class)), (TaskIOMetricGroup)new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
        Assert.assertEquals((Object)gateDesc.getConsumedPartitionType(), (Object)gate.getConsumedPartitionType());
        Map channelMap = gate.getInputChannels();
        Assert.assertEquals((long)3L, (long)channelMap.size());
        InputChannel localChannel = (InputChannel)channelMap.get(partitionIds[0].getPartitionId());
        Assert.assertEquals(LocalInputChannel.class, localChannel.getClass());
        InputChannel remoteChannel = (InputChannel)channelMap.get(partitionIds[1].getPartitionId());
        Assert.assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
        InputChannel unknownChannel = (InputChannel)channelMap.get(partitionIds[2].getPartitionId());
        Assert.assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
        for (InputChannel ch : channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel}) {
            Assert.assertEquals((long)0L, (long)ch.getCurrentBackoff());
            Assert.assertTrue((boolean)ch.increaseBackoff());
            Assert.assertEquals((long)initialBackoff, (long)ch.getCurrentBackoff());
            Assert.assertTrue((boolean)ch.increaseBackoff());
            Assert.assertEquals((long)(initialBackoff * 2), (long)ch.getCurrentBackoff());
            Assert.assertTrue((boolean)ch.increaseBackoff());
            Assert.assertEquals((long)(initialBackoff * 2 * 2), (long)ch.getCurrentBackoff());
            Assert.assertTrue((boolean)ch.increaseBackoff());
            Assert.assertEquals((long)maxBackoff, (long)ch.getCurrentBackoff());
            Assert.assertFalse((boolean)ch.increaseBackoff());
        }
    }

    static void verifyBufferOrEvent(InputGate inputGate, boolean isBuffer, int channelIndex) throws IOException, InterruptedException {
        BufferOrEvent boe = inputGate.getNextBufferOrEvent();
        Assert.assertEquals((Object)isBuffer, (Object)boe.isBuffer());
        Assert.assertEquals((long)channelIndex, (long)boe.getChannelIndex());
    }
}

