package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import junit.framework.TestCase;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.class */
public class NettyMessageClientDecoderDelegateTest extends TestLogger {
    private static final int BUFFER_SIZE = 1024;
    private static final int NUMBER_OF_BUFFER_RESPONSES = 5;
    private static final NettyBufferPool ALLOCATOR = new NettyBufferPool(1);
    private EmbeddedChannel channel;
    private NetworkBufferPool networkBufferPool;
    private SingleInputGate inputGate;
    private InputChannelID inputChannelId;
    private InputChannelID releasedInputChannelId;

    @Before
    public void setup() throws IOException, InterruptedException {
        CreditBasedPartitionRequestClientHandler creditBasedPartitionRequestClientHandler = new CreditBasedPartitionRequestClientHandler();
        this.networkBufferPool = new NetworkBufferPool(NUMBER_OF_BUFFER_RESPONSES, BUFFER_SIZE);
        this.channel = new EmbeddedChannel(new ChannelHandler[]{new NettyMessageClientDecoderDelegate(creditBasedPartitionRequestClientHandler)});
        this.inputGate = InputChannelTestUtils.createSingleInputGate(1, this.networkBufferPool);
        InputChannel createRemoteInputChannel = InputChannelTestUtils.createRemoteInputChannel(this.inputGate, new TestingPartitionRequestClient(), NUMBER_OF_BUFFER_RESPONSES);
        this.inputGate.setInputChannels(new InputChannel[]{createRemoteInputChannel});
        this.inputGate.setup();
        createRemoteInputChannel.requestSubpartition();
        creditBasedPartitionRequestClientHandler.addInputChannel(createRemoteInputChannel);
        this.inputChannelId = createRemoteInputChannel.getInputChannelId();
        SingleInputGate createSingleInputGate = InputChannelTestUtils.createSingleInputGate(1, this.networkBufferPool);
        RemoteInputChannel buildRemoteChannel = new InputChannelBuilder().buildRemoteChannel(this.inputGate);
        createSingleInputGate.close();
        creditBasedPartitionRequestClientHandler.addInputChannel(buildRemoteChannel);
        this.releasedInputChannelId = buildRemoteChannel.getInputChannelId();
    }

    @After
    public void tearDown() throws IOException {
        if (this.inputGate != null) {
            this.inputGate.close();
        }
        if (this.networkBufferPool != null) {
            this.networkBufferPool.destroyAllBufferPools();
            this.networkBufferPool.destroy();
        }
        if (this.channel != null) {
            this.channel.close();
        }
    }

    @Test
    public void testClientMessageDecode() throws Exception {
        testNettyMessageClientDecoding(false, false, false);
    }

    @Test
    public void testClientMessageDecodeWithEmptyBuffers() throws Exception {
        testNettyMessageClientDecoding(true, false, false);
    }

    @Test
    public void testClientMessageDecodeWithReleasedInputChannel() throws Exception {
        testNettyMessageClientDecoding(false, true, false);
    }

    @Test
    public void testClientMessageDecodeWithRemovedInputChannel() throws Exception {
        testNettyMessageClientDecoding(false, false, true);
    }

    private void testNettyMessageClientDecoding(boolean z, boolean z2, boolean z3) throws Exception {
        ByteBuf[] byteBufArr = null;
        List<NettyMessage> list = null;
        try {
            List<NettyMessage.BufferResponse> createMessageList = createMessageList(z, z2, z3);
            byteBufArr = encodeMessages(createMessageList);
            list = decodeMessages(this.channel, repartitionMessages(byteBufArr));
            verifyDecodedMessages(createMessageList, list);
            releaseBuffers(byteBufArr);
            if (list != null) {
                Iterator<NettyMessage> it = list.iterator();
                while (it.hasNext()) {
                    NettyMessage.BufferResponse bufferResponse = (NettyMessage) it.next();
                    if (bufferResponse instanceof NettyMessage.BufferResponse) {
                        bufferResponse.releaseBuffer();
                    }
                }
            }
        } catch (Throwable th) {
            releaseBuffers(byteBufArr);
            if (list != null) {
                Iterator<NettyMessage> it2 = list.iterator();
                while (it2.hasNext()) {
                    NettyMessage.BufferResponse bufferResponse2 = (NettyMessage) it2.next();
                    if (bufferResponse2 instanceof NettyMessage.BufferResponse) {
                        bufferResponse2.releaseBuffer();
                    }
                }
            }
            throw th;
        }
    }

    private List<NettyMessage.BufferResponse> createMessageList(boolean z, boolean z2, boolean z3) {
        int i = 1;
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < 4; i2++) {
            int i3 = i;
            i++;
            addBufferResponse(arrayList, this.inputChannelId, Buffer.DataType.DATA_BUFFER, BUFFER_SIZE, i3);
        }
        if (z) {
            int i4 = i;
            i++;
            addBufferResponse(arrayList, this.inputChannelId, Buffer.DataType.DATA_BUFFER, 0, i4);
        }
        if (z3) {
            int i5 = i;
            i++;
            addBufferResponse(arrayList, this.releasedInputChannelId, Buffer.DataType.DATA_BUFFER, BUFFER_SIZE, i5);
        }
        if (z2) {
            int i6 = i;
            i++;
            addBufferResponse(arrayList, new InputChannelID(), Buffer.DataType.DATA_BUFFER, BUFFER_SIZE, i6);
        }
        addBufferResponse(arrayList, this.inputChannelId, Buffer.DataType.EVENT_BUFFER, 32, i);
        addBufferResponse(arrayList, this.inputChannelId, Buffer.DataType.DATA_BUFFER, BUFFER_SIZE, i + 1);
        return arrayList;
    }

    private void addBufferResponse(List<NettyMessage.BufferResponse> list, InputChannelID inputChannelID, Buffer.DataType dataType, int i, int i2) {
        list.add(new NettyMessage.BufferResponse(createDataBuffer(i, dataType), i2, inputChannelID, 1));
    }

    private Buffer createDataBuffer(int i, Buffer.DataType dataType) {
        NetworkBuffer networkBuffer = new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(i), FreeingBufferRecycler.INSTANCE, dataType);
        for (int i2 = 0; i2 < i / 4; i2++) {
            networkBuffer.writeInt(i2);
        }
        return networkBuffer;
    }

    private ByteBuf[] encodeMessages(List<NettyMessage.BufferResponse> list) throws Exception {
        ByteBuf[] byteBufArr = new ByteBuf[list.size()];
        for (int i = 0; i < list.size(); i++) {
            byteBufArr[i] = list.get(i).write(ALLOCATOR);
        }
        return byteBufArr;
    }

    private List<ByteBuf> repartitionMessages(ByteBuf[] byteBufArr) {
        ArrayList arrayList = new ArrayList();
        ByteBuf byteBuf = null;
        ByteBuf byteBuf2 = null;
        try {
            try {
                byteBuf = mergeBuffers(byteBufArr, 0, byteBufArr.length / 2);
                byteBuf2 = mergeBuffers(byteBufArr, byteBufArr.length / 2, byteBufArr.length);
                arrayList.addAll(partitionBuffer(byteBuf, 2048));
                arrayList.addAll(partitionBuffer(byteBuf2, 256));
                releaseBuffers(byteBuf, byteBuf2);
            } catch (Throwable th) {
                releaseBuffers((ByteBuf[]) arrayList.toArray(new ByteBuf[0]));
                ExceptionUtils.rethrow(th);
                releaseBuffers(byteBuf, byteBuf2);
            }
            return arrayList;
        } catch (Throwable th2) {
            releaseBuffers(byteBuf, byteBuf2);
            throw th2;
        }
    }

    private ByteBuf mergeBuffers(ByteBuf[] byteBufArr, int i, int i2) {
        ByteBuf buffer = ALLOCATOR.buffer();
        for (int i3 = i; i3 < i2; i3++) {
            buffer.writeBytes(byteBufArr[i3]);
        }
        return buffer;
    }

    private List<ByteBuf> partitionBuffer(ByteBuf byteBuf, int i) {
        ArrayList arrayList = new ArrayList();
        try {
            int readableBytes = byteBuf.readableBytes();
            int i2 = 0;
            while (i2 < readableBytes) {
                int min = Math.min(i2 + i, readableBytes);
                ByteBuf buffer = ALLOCATOR.buffer(min - i2);
                buffer.writeBytes(byteBuf, i2, min - i2);
                arrayList.add(buffer);
                i2 += i;
            }
        } catch (Throwable th) {
            releaseBuffers((ByteBuf[]) arrayList.toArray(new ByteBuf[0]));
            ExceptionUtils.rethrow(th);
        }
        return arrayList;
    }

    private List<NettyMessage> decodeMessages(EmbeddedChannel embeddedChannel, List<ByteBuf> list) {
        Iterator<ByteBuf> it = list.iterator();
        while (it.hasNext()) {
            embeddedChannel.writeInbound(new Object[]{it.next()});
        }
        embeddedChannel.runPendingTasks();
        ArrayList arrayList = new ArrayList();
        while (true) {
            Object readInbound = embeddedChannel.readInbound();
            if (readInbound == null) {
                return arrayList;
            }
            TestCase.assertTrue(readInbound instanceof NettyMessage);
            arrayList.add((NettyMessage) readInbound);
        }
    }

    private void verifyDecodedMessages(List<NettyMessage.BufferResponse> list, List<NettyMessage> list2) {
        TestCase.assertEquals(list.size(), list2.size());
        for (int i = 0; i < list.size(); i++) {
            TestCase.assertEquals(list.get(i).getClass(), list2.get(i).getClass());
            NettyMessage.BufferResponse bufferResponse = list.get(i);
            NettyMessage.BufferResponse bufferResponse2 = list2.get(i);
            NettyTestUtil.verifyBufferResponseHeader(bufferResponse, bufferResponse2);
            if (bufferResponse.bufferSize == 0 || !bufferResponse.receiverId.equals(this.inputChannelId)) {
                Assert.assertNull(bufferResponse2.getBuffer());
            } else {
                TestCase.assertEquals(bufferResponse.getBuffer(), bufferResponse2.getBuffer());
            }
        }
    }

    private void releaseBuffers(@Nullable ByteBuf... byteBufArr) {
        if (byteBufArr != null) {
            for (ByteBuf byteBuf : byteBufArr) {
                if (byteBuf != null) {
                    byteBuf.release();
                }
            }
        }
    }
}
