/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;

/**
 * Tests for {@link FsCheckpointStreamFactory.FsCheckpointStateOutputStream}.
 *
 * <p>The constructor arguments used throughout are, in order: the base path for state files,
 * the file system, the write buffer size, and the size threshold that decides (per the
 * expectations encoded in these tests) whether the resulting handle is a
 * {@link ByteStreamStateHandle} or a {@link FileStateHandle}.
 */
public class FsCheckpointStateOutputStreamTest {
    /** Base directory for state files written through the local file system. */
    private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI());

    /** A buffer size (4000) smaller than the threshold (5000) must be rejected by the constructor. */
    @Test(expected = IllegalArgumentException.class)
    public void testWrongParameters() {
        new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000);
    }

    /** Closing a stream that never received data must yield no handle at all. */
    @Test
    public void testEmptyState() throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);

        StreamStateHandle handle = stream.closeAndGetHandle();
        Assert.assertNull(handle);
    }

    /** 222 bytes with a threshold of 512: a {@link ByteStreamStateHandle} is expected. */
    @Test
    public void testStateBelowMemThreshold() throws Exception {
        runTest(222, 999, 512, false);
    }

    /** 896 bytes fit into one 1024-byte buffer but exceed the threshold of 15: a file is expected. */
    @Test
    public void testStateOneBufferAboveThreshold() throws Exception {
        runTest(896, 1024, 15, true);
    }

    /** 576446 bytes span many 259-byte buffers and exceed the threshold of 17: a file is expected. */
    @Test
    public void testStateAboveMemThreshold() throws Exception {
        runTest(576446, 259, 17, true);
    }

    /** With a threshold of zero, any written state is expected to end up in a file. */
    @Test
    public void testZeroThreshold() throws Exception {
        runTest(16678, 4096, 0, true);
    }

    /**
     * Checks that {@code getPos()} reports the number of bytes written so far, both for
     * single-byte writes and for a mix of single-byte and array writes.
     */
    @Test
    public void testGetPos() throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17);

        for (int i = 0; i < 64; ++i) {
            Assert.assertEquals((long) i, stream.getPos());
            stream.write(0x42);
        }
        // Discard the resulting state so the test does not leave files behind in the temp directory.
        discardIfPresent(stream.closeAndGetHandle());

        stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 31, 17);

        byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
        for (int i = 0; i < 7; ++i) {
            // Each iteration writes one single byte plus the whole array.
            Assert.assertEquals((long) (i * (1 + data.length)), stream.getPos());
            stream.write(0x42);
            stream.write(data);
        }
        discardIfPresent(stream.closeAndGetHandle());
    }

    /**
     * Writes more bytes than the buffer holds (5 bytes, buffer size 4) so that the mocked file
     * system is asked to create a file, then verifies that {@code close()} deletes that same path.
     */
    @Test
    public void testCleanupWhenClosingStream() throws IOException {
        FileSystem fs = Mockito.mock(FileSystem.class);
        FSDataOutputStream outputStream = Mockito.mock(FSDataOutputStream.class);
        ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);

        Mockito.when(fs.create(pathCaptor.capture(), Matchers.anyBoolean())).thenReturn(outputStream);

        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, fs, 4, 0);

        stream.write(new byte[] {1, 2, 3, 4, 5});
        Mockito.verify(fs).create(Matchers.any(Path.class), Matchers.anyBoolean());

        stream.close();
        Mockito.verify(fs).delete(Matchers.eq(pathCaptor.getValue()), Matchers.anyBoolean());
    }

    /**
     * Makes the underlying (mocked) output stream fail on {@code close()} and verifies that
     * {@code closeAndGetHandle()} surfaces an {@link IOException} and deletes the created path.
     */
    @Test
    public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
        FileSystem fs = Mockito.mock(FileSystem.class);
        FSDataOutputStream outputStream = Mockito.mock(FSDataOutputStream.class);
        ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);

        Mockito.when(fs.create(pathCaptor.capture(), Matchers.anyBoolean())).thenReturn(outputStream);
        Mockito.doThrow(new IOException("Test IOException.")).when(outputStream).close();

        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(TEMP_DIR_PATH, fs, 4, 0);

        stream.write(new byte[] {1, 2, 3, 4, 5});
        Mockito.verify(fs).create(Matchers.any(Path.class), Matchers.anyBoolean());

        try {
            stream.closeAndGetHandle();
            Assert.fail("Expected IOException");
        }
        catch (IOException expected) {
            // expected: the failure of the underlying close() must be reported to the caller
        }

        Mockito.verify(fs).delete(Matchers.eq(pathCaptor.getValue()), Matchers.anyBoolean());
    }

    /**
     * Writes {@code numBytes} random bytes through a new stream using a random mix of single-byte
     * and small array writes, then checks the type of the returned handle and reads the state
     * back for comparison.
     *
     * @param numBytes number of random bytes to write
     * @param bufferSize buffer size passed to the stream
     * @param threshold state size threshold passed to the stream
     * @param expectFile {@code true} if a {@link FileStateHandle} is expected, {@code false} if a
     *     {@link ByteStreamStateHandle} is expected
     * @throws Exception if writing, reading back, or discarding the state fails
     */
    private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold);

        // Pick the seed explicitly and report it on failure so that a failing run can be replayed.
        long seed = new Random().nextLong();
        String context = "seed=" + seed;
        Random rnd = new Random(seed);

        byte[] original = new byte[numBytes];
        rnd.nextBytes(original);
        // Keep a pristine copy to detect whether the stream modifies the caller's array.
        byte[] bytes = new byte[original.length];
        System.arraycopy(original, 0, bytes, 0, original.length);

        int pos = 0;
        while (pos < bytes.length) {
            if (rnd.nextBoolean()) {
                stream.write(bytes[pos++]);
            } else {
                // May be zero: zero-length array writes are deliberately part of the mix.
                int num = rnd.nextInt(Math.min(10, bytes.length - pos));
                stream.write(bytes, pos, num);
                pos += num;
            }
        }

        StreamStateHandle handle = stream.closeAndGetHandle();
        Assert.assertNotNull(context, handle);

        // Always discard the state, even when an assertion below fails, so no files are left behind.
        try {
            if (expectFile) {
                Assert.assertTrue(context, handle instanceof FileStateHandle);
            } else {
                Assert.assertTrue(context, handle instanceof ByteStreamStateHandle);
            }

            // The stream must not have altered the array handed to write(...).
            Assert.assertArrayEquals(context, original, bytes);

            try (FSDataInputStream inStream = handle.openInputStream()) {
                byte[] validation = new byte[bytes.length];
                DataInputStream dataInputStream = new DataInputStream(inStream);
                dataInputStream.readFully(validation);
                Assert.assertArrayEquals(context, bytes, validation);
                // The state must end exactly after the written bytes.
                Assert.assertEquals(context, -1L, (long) dataInputStream.read());
            }
        } finally {
            handle.discardState();
        }
    }

    /** Verifies that both write variants throw an {@link IOException} once the stream is closed. */
    @Test
    public void testWriteFailsFastWhenClosed() throws Exception {
        FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
                new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512);

        Assert.assertFalse(stream.isClosed());
        stream.close();
        Assert.assertTrue(stream.isClosed());

        try {
            stream.write(1);
            Assert.fail();
        }
        catch (IOException expected) {
            // expected: single-byte write on a closed stream
        }

        try {
            stream.write(new byte[4], 1, 2);
            Assert.fail();
        }
        catch (IOException expected) {
            // expected: array write on a closed stream
        }
    }

    /**
     * Discards the state behind {@code handle}; does nothing when {@code handle} is {@code null}.
     *
     * @param handle the handle to discard, may be {@code null}
     * @throws Exception if discarding the state fails
     */
    private static void discardIfPresent(StreamStateHandle handle) throws Exception {
        if (handle != null) {
            handle.discardState();
        }
    }
}

