package org.apache.flink.streaming.runtime.io;

import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.streaming.runtime.io.BufferSpiller;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/BufferSpillerTest.class */
/**
 * Tests for {@link BufferSpiller}: rolling over without data, spilling and reading back randomly
 * generated buffers and events, and spilling new data while an earlier sequence is being read.
 *
 * <p>Buffer sizes and channel indexes are drawn from a seeded {@link Random}. Re-seeding the same
 * generator before reading replays the identical size/channel series, so the expected buffers do
 * not have to be kept in memory. Events are remembered explicitly in a list.
 */
public class BufferSpillerTest {
    private static final Logger LOG = LoggerFactory.getLogger(BufferSpillerTest.class);

    /** Page size handed to the spiller; also the size of the segment behind each generated buffer. */
    private static final int PAGE_SIZE = 4096;

    /** I/O manager shared by all tests; created before and shut down after the class. */
    private static IOManager IO_MANAGER;

    /** Spiller under test; re-created for every test method. */
    private BufferSpiller spiller;

    /**
     * A rolled-over sequence together with everything needed to validate it while reading:
     * the expected events, the re-seeded generator for buffer sizes/channels, and the counts.
     */
    private static class SequenceToConsume {
        final BufferSpiller.SpilledBufferOrEventSequence sequence;
        final ArrayList<BufferOrEvent> events;
        final Random bufferRnd;
        final int numBuffersAndEvents;
        final int numChannels;

        private SequenceToConsume(
                Random bufferRnd,
                ArrayList<BufferOrEvent> events,
                BufferSpiller.SpilledBufferOrEventSequence sequence,
                int numBuffersAndEvents,
                int numChannels) {
            this.bufferRnd = bufferRnd;
            this.events = events;
            this.sequence = sequence;
            this.numBuffersAndEvents = numBuffersAndEvents;
            this.numChannels = numChannels;
        }
    }

    @BeforeClass
    public static void setupIOManager() {
        IO_MANAGER = new IOManagerAsync();
    }

    @AfterClass
    public static void shutdownIOManager() {
        IO_MANAGER.shutdown();
    }

    /** Creates a fresh spiller; a construction failure fails the test. */
    @Before
    public void createSpiller() {
        try {
            this.spiller = new BufferSpiller(IO_MANAGER, PAGE_SIZE);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Cannot create BufferSpiller: " + e.getMessage());
        }
    }

    /**
     * Closes the spiller, then checks that its current channel is closed, its current spill file
     * is gone, and no files remain in the spilling directories.
     */
    @After
    public void cleanupSpiller() {
        if (this.spiller != null) {
            try {
                this.spiller.close();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail("Cannot properly close the BufferSpiller: " + e.getMessage());
            }
            Assert.assertFalse(this.spiller.getCurrentChannel().isOpen());
            Assert.assertFalse(this.spiller.getCurrentSpillFile().exists());
        }
        checkNoTempFilesRemain();
    }

    /** Rolling over a spiller that received no data must return {@code null} every time. */
    @Test
    public void testRollOverEmptySequences() {
        try {
            Assert.assertNull(this.spiller.rollOver());
            Assert.assertNull(this.spiller.rollOver());
            Assert.assertNull(this.spiller.rollOver());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Runs five rounds of: spill a random mix of buffers and events, roll over, then read the
     * sequence back and validate every element.
     */
    @Test
    public void testSpillAndRollOverSimple() {
        try {
            final Random rnd = new Random();
            final Random bufferRnd = new Random();

            for (int round = 0; round < 5; round++) {
                final long bufferSeed = rnd.nextLong();
                bufferRnd.setSeed(bufferSeed);

                final int numEventsAndBuffers = rnd.nextInt(3000) + 1;
                final int numChannels = rnd.nextInt(1656) + 1;
                final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);

                // generate the sequence; roughly 5% of the elements are events
                for (int i = 0; i < numEventsAndBuffers; i++) {
                    if (rnd.nextDouble() < 0.05d) {
                        BufferOrEvent event = generateRandomEvent(rnd, numChannels);
                        events.add(event);
                        this.spiller.add(event);
                    } else {
                        // size is drawn before channel; validation below replays the same order
                        int size = bufferRnd.nextInt(PAGE_SIZE) + 1;
                        int channel = bufferRnd.nextInt(numChannels);
                        this.spiller.add(generateRandomBuffer(size, channel));
                    }
                }

                // reset the generator so the reads replay the same sizes and channels
                bufferRnd.setSeed(bufferSeed);

                BufferSpiller.SpilledBufferOrEventSequence sequence = this.spiller.rollOver();
                sequence.open();

                int numEventsSeen = 0;
                for (int i = 0; i < numEventsAndBuffers; i++) {
                    BufferOrEvent next = sequence.getNext();
                    Assert.assertNotNull(next);
                    if (next.isEvent()) {
                        BufferOrEvent expected = events.get(numEventsSeen++);
                        Assert.assertEquals(expected.getEvent(), next.getEvent());
                        Assert.assertEquals(expected.getChannelIndex(), next.getChannelIndex());
                    } else {
                        int size = bufferRnd.nextInt(PAGE_SIZE) + 1;
                        int channel = bufferRnd.nextInt(numChannels);
                        validateBuffer(next, size, channel);
                    }
                }

                // the sequence must be exhausted and every remembered event must have been seen
                Assert.assertNull(sequence.getNext());
                Assert.assertEquals(events.size(), numEventsSeen);
                sequence.cleanup();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Interleaves spilling of new sequences with reading of previously rolled-over ones.
     *
     * <p>Of 20 rounds, every odd round only asserts that an immediate second roll-over returns
     * {@code null}; every even round spills a new sequence while randomly consuming elements of
     * the sequence currently being read. Sequences that cannot be read yet wait in a queue.
     * At the end all remaining sequences are drained and exactly 10 must have been consumed.
     */
    @Test
    public void testSpillWhileReading() {
        LOG.info("Starting SpillWhileReading test");
        try {
            final Random rnd = new Random();
            final ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>();

            int sequencesConsumed = 0;
            SequenceToConsume currentSequence = null;
            int currentNumEvents = 0;
            int currentNumRecordsAndEvents = 0;

            for (int round = 0; round < 20; round++) {
                if (round % 2 == 1) {
                    // nothing was added since the previous roll-over
                    Assert.assertNull(this.spiller.rollOver());
                    continue;
                }

                final long bufferSeed = rnd.nextLong();
                final Random bufferRnd = new Random(bufferSeed);
                final int numEventsAndBuffers = rnd.nextInt(30000) + 1;
                final int numChannels = rnd.nextInt(1656) + 1;
                final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);

                int generated = 0;
                while (generated < numEventsAndBuffers) {
                    // without a sequence to read we always write; otherwise it is a coin flip
                    if (currentSequence == null || rnd.nextDouble() < 0.5d) {
                        if (rnd.nextDouble() < 0.05d) {
                            BufferOrEvent event = generateRandomEvent(rnd, numChannels);
                            events.add(event);
                            this.spiller.add(event);
                        } else {
                            int size = bufferRnd.nextInt(PAGE_SIZE) + 1;
                            int channel = bufferRnd.nextInt(numChannels);
                            this.spiller.add(generateRandomBuffer(size, channel));
                        }
                        generated++;
                    } else {
                        currentNumEvents = validateNext(currentSequence, currentNumEvents);
                        currentNumRecordsAndEvents++;

                        if (currentNumRecordsAndEvents == currentSequence.numBuffersAndEvents) {
                            // current sequence fully read: clean up and move on to the next one
                            currentSequence.sequence.cleanup();
                            sequencesConsumed++;
                            Assert.assertEquals(currentSequence.events.size(), currentNumEvents);

                            currentSequence = pendingSequences.pollFirst();
                            if (currentSequence != null) {
                                currentSequence.sequence.open();
                            }
                            currentNumRecordsAndEvents = 0;
                            currentNumEvents = 0;
                        }
                    }
                }

                // reset the generator so that reading replays the sizes and channels written above
                bufferRnd.setSeed(bufferSeed);
                SequenceToConsume spilled = new SequenceToConsume(
                        bufferRnd, events, this.spiller.rollOver(), numEventsAndBuffers, numChannels);

                if (currentSequence == null) {
                    currentSequence = spilled;
                    spilled.sequence.open();
                } else {
                    pendingSequences.addLast(spilled);
                }
            }

            // drain whatever has not been read yet
            while (currentSequence != null) {
                currentNumEvents = validateNext(currentSequence, currentNumEvents);
                currentNumRecordsAndEvents++;

                if (currentNumRecordsAndEvents == currentSequence.numBuffersAndEvents) {
                    currentSequence.sequence.cleanup();
                    sequencesConsumed++;
                    Assert.assertEquals(currentSequence.events.size(), currentNumEvents);

                    currentSequence = pendingSequences.pollFirst();
                    if (currentSequence != null) {
                        currentSequence.sequence.open();
                    }
                    currentNumRecordsAndEvents = 0;
                    currentNumEvents = 0;
                }
            }

            // one sequence per even round
            Assert.assertEquals(10L, sequencesConsumed);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Adds a single 13-byte buffer and checks the number of bytes the spiller reports as written.
     */
    @Test
    public void testHeaderSizeStaticField() throws Exception {
        final int payloadSize = 13;
        this.spiller.add(generateRandomBuffer(payloadSize, 0));
        // NOTE(review): the literal 9 presumably mirrors a HEADER_SIZE constant of BufferSpiller
        // (see the assertion message). That field is not visible from this file - confirm, and
        // reference the constant directly if it is accessible.
        Assert.assertEquals(
                "Changed the header format, but did not adjust the HEADER_SIZE field",
                9 + payloadSize,
                this.spiller.getBytesWritten());
    }

    /**
     * Reads the next element of the given sequence and validates it against the expectation.
     *
     * @param toConsume the opened sequence plus its expected events and buffer generator
     * @param numEventsSeen number of events of this sequence validated so far
     * @return the updated number of validated events (incremented iff the element was an event)
     * @throws Exception if reading from the sequence fails
     */
    private static int validateNext(SequenceToConsume toConsume, int numEventsSeen) throws Exception {
        BufferOrEvent next = toConsume.sequence.getNext();
        Assert.assertNotNull(next);

        if (next.isEvent()) {
            BufferOrEvent expected = toConsume.events.get(numEventsSeen);
            Assert.assertEquals(expected.getEvent(), next.getEvent());
            Assert.assertEquals(expected.getChannelIndex(), next.getChannelIndex());
            return numEventsSeen + 1;
        }

        // size is drawn before channel, matching the order used when the buffer was generated
        Random validationRnd = toConsume.bufferRnd;
        int size = validationRnd.nextInt(PAGE_SIZE) + 1;
        int channel = validationRnd.nextInt(toConsume.numChannels);
        validateBuffer(next, size, channel);
        return numEventsSeen;
    }

    /**
     * Creates an event with a random magic number and up to 999 random payload bytes.
     *
     * @param rnd source of randomness
     * @param numChannels exclusive upper bound for the randomly chosen channel index
     */
    private static BufferOrEvent generateRandomEvent(Random rnd, int numChannels) {
        long magicNumber = rnd.nextLong();
        byte[] data = new byte[rnd.nextInt(1000)];
        rnd.nextBytes(data);
        TestEvent event = new TestEvent(magicNumber, data);
        int channelIndex = rnd.nextInt(numChannels);
        return new BufferOrEvent(event, channelIndex);
    }

    /**
     * Creates a buffer of the given size whose byte at position {@code p} is {@code (byte) p}.
     *
     * @param size number of valid bytes in the buffer
     * @param channelIndex channel index attached to the buffer
     */
    private static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
        for (int pos = 0; pos < size; pos++) {
            segment.put(pos, (byte) pos);
        }
        Buffer buffer = new Buffer(segment, FreeingBufferRecycler.INSTANCE);
        buffer.setSize(size);
        return new BufferOrEvent(buffer, channelIndex);
    }

    /**
     * Asserts that the element is a buffer with the expected channel index, size, and the byte
     * pattern written by {@link #generateRandomBuffer(int, int)}.
     */
    private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
        Assert.assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
        Assert.assertTrue("is not buffer", boe.isBuffer());

        Buffer buffer = boe.getBuffer();
        Assert.assertEquals("wrong buffer size", expectedSize, buffer.getSize());

        MemorySegment segment = buffer.getMemorySegment();
        for (int pos = 0; pos < expectedSize; pos++) {
            byte expected = (byte) pos;
            byte found = segment.get(pos);
            if (expected != found) {
                Assert.fail(String.format(
                        "wrong buffer contents at position %s : expected=%d , found=%d",
                        pos, expected, found));
            }
        }
    }

    /** Fails if any entry is still present in one of the I/O manager's spilling directories. */
    private static void checkNoTempFilesRemain() {
        for (File dir : IO_MANAGER.getSpillingDirectories()) {
            // File.list() returns null when the path is not a listable directory; in that case
            // there is nothing left to report, so skip instead of failing with an NPE
            String[] names = dir.list();
            if (names == null) {
                continue;
            }
            for (String name : names) {
                if (name != null && !name.equals(".") && !name.equals("..")) {
                    Assert.fail("barrier buffer did not clean up temp files. remaining file: " + name);
                }
            }
        }
    }
}
