package org.apache.flink.contrib.streaming.state;

import java.io.File;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.IOUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.class */
public class RocksDBStateBackendConfigTest {

    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest$TestOptionsFactory.class */
    public static class TestOptionsFactory implements ConfigurableRocksDBOptionsFactory {
        public static final ConfigOption<Integer> BACKGROUND_JOBS_OPTION = ConfigOptions.key("my.custom.rocksdb.backgroundJobs").intType().defaultValue(2);
        private int backgroundJobs = ((Integer) BACKGROUND_JOBS_OPTION.defaultValue()).intValue();

        public DBOptions createDBOptions(DBOptions dBOptions, Collection<AutoCloseable> collection) {
            return dBOptions.setMaxBackgroundJobs(this.backgroundJobs);
        }

        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions columnFamilyOptions, Collection<AutoCloseable> collection) {
            return columnFamilyOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
        }

        public RocksDBOptionsFactory configure(ReadableConfig readableConfig) {
            this.backgroundJobs = ((Integer) readableConfig.get(BACKGROUND_JOBS_OPTION)).intValue();
            return this;
        }
    }

    @Test
    public void testDefaultsInSync() throws Exception {
        Assert.assertEquals(Boolean.valueOf(((Boolean) CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue()).booleanValue()), Boolean.valueOf(new EmbeddedRocksDBStateBackend().isIncrementalCheckpointsEnabled()));
    }

    @Test
    public void testDefaultDbLogDir() throws Exception {
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        File createTempFile = File.createTempFile(getClass().getSimpleName() + "-", ".log");
        System.setProperty("log.file", createTempFile.getPath());
        try {
            RocksDBResourceContainer createOptionsAndResourceContainer = embeddedRocksDBStateBackend.createOptionsAndResourceContainer((File) null);
            Throwable th = null;
            try {
                try {
                    Assert.assertEquals(RocksDBConfigurableOptions.LOG_LEVEL.defaultValue(), createOptionsAndResourceContainer.getDbOptions().infoLogLevel());
                    Assert.assertEquals(createTempFile.getParent(), createOptionsAndResourceContainer.getDbOptions().dbLogDir());
                    if (createOptionsAndResourceContainer != null) {
                        if (0 != 0) {
                            try {
                                createOptionsAndResourceContainer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createOptionsAndResourceContainer.close();
                        }
                    }
                    StringBuilder sb = new StringBuilder(this.tempFolder.newFolder().getAbsolutePath());
                    while (sb.length() < 255) {
                        sb.append("/append-for-long-path");
                    }
                    try {
                        RocksDBResourceContainer createOptionsAndResourceContainer2 = embeddedRocksDBStateBackend.createOptionsAndResourceContainer(new File(sb.toString()));
                        Throwable th3 = null;
                        try {
                            Assert.assertTrue(createOptionsAndResourceContainer2.getDbOptions().dbLogDir().isEmpty());
                            if (createOptionsAndResourceContainer2 != null) {
                                if (0 != 0) {
                                    try {
                                        createOptionsAndResourceContainer2.close();
                                    } catch (Throwable th4) {
                                        th3.addSuppressed(th4);
                                    }
                                } else {
                                    createOptionsAndResourceContainer2.close();
                                }
                            }
                            createTempFile.delete();
                        } finally {
                        }
                    } finally {
                        createTempFile.delete();
                    }
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testSetDbPath() throws Exception {
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        String absolutePath = this.tempFolder.newFolder().getAbsolutePath();
        String absolutePath2 = this.tempFolder.newFolder().getAbsolutePath();
        Assert.assertNull(embeddedRocksDBStateBackend.getDbStoragePaths());
        embeddedRocksDBStateBackend.setDbStoragePath(absolutePath);
        Assert.assertArrayEquals(new String[]{absolutePath}, embeddedRocksDBStateBackend.getDbStoragePaths());
        embeddedRocksDBStateBackend.setDbStoragePath((String) null);
        Assert.assertNull(embeddedRocksDBStateBackend.getDbStoragePaths());
        embeddedRocksDBStateBackend.setDbStoragePaths(new String[]{absolutePath, absolutePath2});
        Assert.assertArrayEquals(new String[]{absolutePath, absolutePath2}, embeddedRocksDBStateBackend.getDbStoragePaths());
        MockEnvironment mockEnvironment = getMockEnvironment(this.tempFolder.newFolder());
        RocksDBKeyedStateBackend createKeyedStateBackend = RocksDBTestUtils.createKeyedStateBackend(embeddedRocksDBStateBackend, mockEnvironment, IntSerializer.INSTANCE);
        try {
            MatcherAssert.assertThat(createKeyedStateBackend.getInstanceBasePath().getAbsolutePath(), CoreMatchers.anyOf(Matchers.startsWith(absolutePath), Matchers.startsWith(absolutePath2)));
            embeddedRocksDBStateBackend.setDbStoragePaths((String[]) null);
            Assert.assertNull(embeddedRocksDBStateBackend.getDbStoragePaths());
            IOUtils.closeQuietly(createKeyedStateBackend);
            createKeyedStateBackend.dispose();
            mockEnvironment.close();
        } catch (Throwable th) {
            IOUtils.closeQuietly(createKeyedStateBackend);
            createKeyedStateBackend.dispose();
            mockEnvironment.close();
            throw th;
        }
    }

    @Test
    public void testConfigureTimerService() throws Exception {
        MockEnvironment mockEnvironment = getMockEnvironment(this.tempFolder.newFolder());
        Assert.assertEquals("state.backend.rocksdb.timer-service.factory", RocksDBOptions.TIMER_SERVICE_FACTORY.key());
        Assert.assertEquals(2L, EmbeddedRocksDBStateBackend.PriorityQueueStateType.values().length);
        Assert.assertEquals("ROCKSDB", EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
        Assert.assertEquals("HEAP", EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP.toString());
        Assert.assertEquals(EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB, RocksDBOptions.TIMER_SERVICE_FACTORY.defaultValue());
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        RocksDBKeyedStateBackend createKeyedStateBackend = RocksDBTestUtils.createKeyedStateBackend(embeddedRocksDBStateBackend, mockEnvironment, IntSerializer.INSTANCE);
        Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, createKeyedStateBackend.getPriorityQueueFactory().getClass());
        createKeyedStateBackend.dispose();
        Configuration configuration = new Configuration();
        configuration.set(RocksDBOptions.TIMER_SERVICE_FACTORY, EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
        RocksDBKeyedStateBackend createKeyedStateBackend2 = RocksDBTestUtils.createKeyedStateBackend(embeddedRocksDBStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()), mockEnvironment, IntSerializer.INSTANCE);
        Assert.assertEquals(HeapPriorityQueueSetFactory.class, createKeyedStateBackend2.getPriorityQueueFactory().getClass());
        createKeyedStateBackend2.dispose();
        mockEnvironment.close();
    }

    @Test
    public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
        MockEnvironment build = new MockEnvironmentBuilder().build();
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        embeddedRocksDBStateBackend.setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
        Configuration configuration = new Configuration();
        configuration.setString(RocksDBOptions.TIMER_SERVICE_FACTORY.key(), RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
        RocksDBKeyedStateBackend createKeyedStateBackend = RocksDBTestUtils.createKeyedStateBackend(embeddedRocksDBStateBackend.configure(configuration, Thread.currentThread().getContextClassLoader()), build, IntSerializer.INSTANCE);
        MatcherAssert.assertThat(createKeyedStateBackend.getPriorityQueueFactory(), CoreMatchers.instanceOf(HeapPriorityQueueSetFactory.class));
        createKeyedStateBackend.close();
        createKeyedStateBackend.dispose();
        build.close();
    }

    @Test
    public void testStoragePathWithFilePrefix() throws Exception {
        File newFolder = this.tempFolder.newFolder();
        String path = new Path(newFolder.toURI().toString()).toString();
        Assert.assertTrue(path.startsWith("file:"));
        testLocalDbPaths(path, newFolder);
    }

    @Test
    public void testWithDefaultFsSchemeNoStoragePath() throws Exception {
        try {
            Configuration configuration = new Configuration();
            configuration.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
            FileSystem.initialize(configuration);
            testLocalDbPaths(null, this.tempFolder.getRoot());
        } finally {
            FileSystem.initialize(new Configuration());
        }
    }

    @Test
    public void testWithDefaultFsSchemeAbsoluteStoragePath() throws Exception {
        File newFolder = this.tempFolder.newFolder();
        String absolutePath = newFolder.getAbsolutePath();
        try {
            Configuration configuration = new Configuration();
            configuration.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
            FileSystem.initialize(configuration);
            testLocalDbPaths(absolutePath, newFolder);
            FileSystem.initialize(new Configuration());
        } catch (Throwable th) {
            FileSystem.initialize(new Configuration());
            throw th;
        }
    }

    private void testLocalDbPaths(String str, File file) throws Exception {
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        embeddedRocksDBStateBackend.setDbStoragePath(str);
        MockEnvironment mockEnvironment = getMockEnvironment(this.tempFolder.newFolder());
        RocksDBKeyedStateBackend createKeyedStateBackend = RocksDBTestUtils.createKeyedStateBackend(embeddedRocksDBStateBackend, mockEnvironment, IntSerializer.INSTANCE);
        try {
            MatcherAssert.assertThat(createKeyedStateBackend.getInstanceBasePath().getAbsolutePath(), Matchers.startsWith(file.getAbsolutePath()));
            embeddedRocksDBStateBackend.setDbStoragePaths((String[]) null);
            Assert.assertNull(embeddedRocksDBStateBackend.getDbStoragePaths());
            IOUtils.closeQuietly(createKeyedStateBackend);
            createKeyedStateBackend.dispose();
            mockEnvironment.close();
        } catch (Throwable th) {
            IOUtils.closeQuietly(createKeyedStateBackend);
            createKeyedStateBackend.dispose();
            mockEnvironment.close();
            throw th;
        }
    }

    @Test(expected = IllegalArgumentException.class)
    public void testSetEmptyPaths() throws Exception {
        new RocksDBStateBackend(this.tempFolder.newFolder().toURI().toString()).setDbStoragePaths(new String[0]);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testNonFileSchemePath() throws Exception {
        new RocksDBStateBackend(this.tempFolder.newFolder().toURI().toString()).setDbStoragePath("hdfs:///some/path/to/perdition");
    }

    @Test(expected = IllegalArgumentException.class)
    public void testDbPathRelativePaths() throws Exception {
        new RocksDBStateBackend(this.tempFolder.newFolder().toURI().toString()).setDbStoragePath("relative/path");
    }

    @Test
    public void testUseTempDirectories() throws Exception {
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(this.tempFolder.newFolder().toURI().toString());
        File newFolder = this.tempFolder.newFolder();
        Assert.assertNull(rocksDBStateBackend.getDbStoragePaths());
        MockEnvironment mockEnvironment = getMockEnvironment(newFolder);
        RocksDBKeyedStateBackend createKeyedStateBackend = rocksDBStateBackend.createKeyedStateBackend(mockEnvironment, mockEnvironment.getJobID(), "test_op", IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), mockEnvironment.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
        try {
            MatcherAssert.assertThat(createKeyedStateBackend.getInstanceBasePath().getAbsolutePath(), Matchers.startsWith(newFolder.getAbsolutePath()));
            IOUtils.closeQuietly(createKeyedStateBackend);
            createKeyedStateBackend.dispose();
            mockEnvironment.close();
        } catch (Throwable th) {
            IOUtils.closeQuietly(createKeyedStateBackend);
            createKeyedStateBackend.dispose();
            mockEnvironment.close();
            throw th;
        }
    }

    @Test
    public void testFailWhenNoLocalStorageDir() throws Exception {
        File newFolder = this.tempFolder.newFolder();
        Assume.assumeTrue("Cannot mark directory non-writable", newFolder.setWritable(false, false));
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(this.tempFolder.newFolder().toURI().toString());
        try {
            MockEnvironment mockEnvironment = getMockEnvironment(this.tempFolder.newFolder());
            Throwable th = null;
            try {
                try {
                    rocksDBStateBackend.setDbStoragePath(newFolder.getAbsolutePath());
                    boolean z = false;
                    try {
                        rocksDBStateBackend.createKeyedStateBackend(mockEnvironment, mockEnvironment.getJobID(), "foobar", IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(mockEnvironment.getJobID(), new JobVertexID()), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
                    } catch (Exception e) {
                        Assert.assertTrue(e.getMessage().contains("No local storage directories available"));
                        Assert.assertTrue(e.getMessage().contains(newFolder.getAbsolutePath()));
                        z = true;
                    }
                    Assert.assertTrue("We must see a failure because no storaged directory is feasible.", z);
                    if (mockEnvironment != null) {
                        if (0 != 0) {
                            try {
                                mockEnvironment.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            mockEnvironment.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            newFolder.setWritable(true, false);
        }
    }

    @Test
    public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
        File newFolder = this.tempFolder.newFolder();
        File newFolder2 = this.tempFolder.newFolder();
        Assume.assumeTrue("Cannot mark directory non-writable", newFolder.setWritable(false, false));
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(this.tempFolder.newFolder().toURI().toString());
        try {
            MockEnvironment mockEnvironment = getMockEnvironment(this.tempFolder.newFolder());
            Throwable th = null;
            try {
                try {
                    rocksDBStateBackend.setDbStoragePaths(new String[]{newFolder.getAbsolutePath(), newFolder2.getAbsolutePath()});
                    try {
                        AbstractKeyedStateBackend createKeyedStateBackend = rocksDBStateBackend.createKeyedStateBackend(mockEnvironment, mockEnvironment.getJobID(), "foobar", IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(mockEnvironment.getJobID(), new JobVertexID()), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry());
                        IOUtils.closeQuietly(createKeyedStateBackend);
                        createKeyedStateBackend.dispose();
                    } catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail("Backend initialization failed even though some paths were available");
                    }
                    if (mockEnvironment != null) {
                        if (0 != 0) {
                            try {
                                mockEnvironment.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            mockEnvironment.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            newFolder.setWritable(true, false);
        }
    }

    @Test
    public void testPredefinedOptions() throws Exception {
        String uri = this.tempFolder.newFolder().toURI().toString();
        Assert.assertEquals(PredefinedOptions.DEFAULT, new RocksDBStateBackend(uri).getPredefinedOptions());
        Configuration configuration = new Configuration();
        configuration.setString(RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name());
        RocksDBStateBackend configure = new RocksDBStateBackend(uri).configure(configuration, getClass().getClassLoader());
        Assert.assertEquals(PredefinedOptions.FLASH_SSD_OPTIMIZED, configure.getPredefinedOptions());
        configure.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        Assert.assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, configure.getPredefinedOptions());
    }

    @Test
    public void testConfigurableOptionsFromConfig() throws Exception {
        Configuration configuration = new Configuration();
        verifyIllegalArgument(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "-1");
        verifyIllegalArgument(RocksDBConfigurableOptions.LOG_LEVEL, "DEBUG");
        verifyIllegalArgument(RocksDBConfigurableOptions.LOG_DIR, "tmp/rocksdb-logs/");
        verifyIllegalArgument(RocksDBConfigurableOptions.LOG_DIR, "");
        verifyIllegalArgument(RocksDBConfigurableOptions.LOG_FILE_NUM, "0");
        verifyIllegalArgument(RocksDBConfigurableOptions.LOG_FILE_NUM, "-1");
        verifyIllegalArgument(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, "-1KB");
        verifyIllegalArgument(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, "-1");
        verifyIllegalArgument(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE, "-1");
        verifyIllegalArgument(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "0KB");
        verifyIllegalArgument(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "1BB");
        verifyIllegalArgument(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "-1KB");
        verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_SIZE, "0MB");
        verifyIllegalArgument(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE, "0MB");
        verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "0");
        verifyIllegalArgument(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "1");
        verifyIllegalArgument(RocksDBConfigurableOptions.COMPACTION_STYLE, "LEV");
        verifyIllegalArgument(RocksDBConfigurableOptions.USE_BLOOM_FILTER, "NO");
        verifyIllegalArgument(RocksDBConfigurableOptions.BLOOM_FILTER_BLOCK_BASED_MODE, "YES");
        verifyIllegalArgument(RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD, "2");
        configuration.setString(RocksDBConfigurableOptions.LOG_LEVEL.key(), "DEBUG_LEVEL");
        configuration.setString(RocksDBConfigurableOptions.LOG_DIR.key(), "/tmp/rocksdb-logs/");
        configuration.setString(RocksDBConfigurableOptions.LOG_FILE_NUM.key(), "10");
        configuration.setString(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE.key(), "2MB");
        configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE.key(), "level");
        configuration.setString(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE.key(), "TRUE");
        configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE.key(), "8 mb");
        configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE.key(), "128MB");
        configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS.key(), "4");
        configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER.key(), "4");
        configuration.setString(RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(), "2");
        configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE.key(), "64 MB");
        configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE.key(), "4 kb");
        configuration.setString(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE.key(), "8 kb");
        configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 mb");
        configuration.setString(RocksDBConfigurableOptions.USE_BLOOM_FILTER.key(), "TRUE");
        configuration.setString(RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD.key(), "0.5");
        RocksDBResourceContainer rocksDBResourceContainer = new RocksDBResourceContainer(configuration, PredefinedOptions.DEFAULT, (RocksDBOptionsFactory) null, (OpaqueMemoryResource) null, (File) null, false);
        Throwable th = null;
        try {
            try {
                DBOptions dbOptions = rocksDBResourceContainer.getDbOptions();
                Assert.assertEquals(-1L, dbOptions.maxOpenFiles());
                Assert.assertEquals(InfoLogLevel.DEBUG_LEVEL, dbOptions.infoLogLevel());
                Assert.assertEquals("/tmp/rocksdb-logs/", dbOptions.dbLogDir());
                Assert.assertEquals(10L, dbOptions.keepLogFileNum());
                Assert.assertEquals(2097152L, dbOptions.maxLogFileSize());
                ColumnFamilyOptions columnOptions = rocksDBResourceContainer.getColumnOptions();
                Assert.assertEquals(CompactionStyle.LEVEL, columnOptions.compactionStyle());
                Assert.assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
                Assert.assertEquals(8388608L, columnOptions.targetFileSizeBase());
                Assert.assertEquals(134217728L, columnOptions.maxBytesForLevelBase());
                Assert.assertEquals(4L, columnOptions.maxWriteBufferNumber());
                Assert.assertEquals(2L, columnOptions.minWriteBufferNumberToMerge());
                Assert.assertEquals(67108864L, columnOptions.writeBufferSize());
                BlockBasedTableConfig tableFormatConfig = columnOptions.tableFormatConfig();
                Assert.assertEquals(4096L, tableFormatConfig.blockSize());
                Assert.assertEquals(8192L, tableFormatConfig.metadataBlockSize());
                Assert.assertEquals(536870912L, tableFormatConfig.blockCacheSize());
                Assert.assertTrue(tableFormatConfig.filterPolicy() instanceof BloomFilter);
                if (rocksDBResourceContainer != null) {
                    if (0 == 0) {
                        rocksDBResourceContainer.close();
                        return;
                    }
                    try {
                        rocksDBResourceContainer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (rocksDBResourceContainer != null) {
                if (th != null) {
                    try {
                        rocksDBResourceContainer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    rocksDBResourceContainer.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testOptionsFactory() throws Exception {
        Throwable th;
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(this.tempFolder.newFolder().toURI().toString());
        Configuration configuration = new Configuration();
        configuration.setString(RocksDBOptions.OPTIONS_FACTORY.key(), TestOptionsFactory.class.getName());
        configuration.setString(TestOptionsFactory.BACKGROUND_JOBS_OPTION.key(), "4");
        RocksDBStateBackend configure = rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
        Assert.assertTrue(configure.getRocksDBOptions() instanceof TestOptionsFactory);
        RocksDBResourceContainer createOptionsAndResourceContainer = configure.createOptionsAndResourceContainer();
        Throwable th2 = null;
        try {
            try {
                Assert.assertEquals(4L, createOptionsAndResourceContainer.getDbOptions().maxBackgroundJobs());
                if (createOptionsAndResourceContainer != null) {
                    if (0 != 0) {
                        try {
                            createOptionsAndResourceContainer.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createOptionsAndResourceContainer.close();
                    }
                }
                configure.setRocksDBOptions(new RocksDBOptionsFactory() { // from class: org.apache.flink.contrib.streaming.state.RocksDBStateBackendConfigTest.1
                    public DBOptions createDBOptions(DBOptions dBOptions, Collection<AutoCloseable> collection) {
                        return dBOptions;
                    }

                    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions columnFamilyOptions, Collection<AutoCloseable> collection) {
                        return columnFamilyOptions.setCompactionStyle(CompactionStyle.FIFO);
                    }
                });
                createOptionsAndResourceContainer = configure.createOptionsAndResourceContainer();
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    Assert.assertEquals(CompactionStyle.FIFO, createOptionsAndResourceContainer.getColumnOptions().compactionStyle());
                    if (createOptionsAndResourceContainer != null) {
                        if (0 == 0) {
                            createOptionsAndResourceContainer.close();
                            return;
                        }
                        try {
                            createOptionsAndResourceContainer.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testPredefinedAndConfigurableOptions() throws Exception {
        Throwable th;
        Configuration configuration = new Configuration();
        configuration.set(RocksDBConfigurableOptions.COMPACTION_STYLE, CompactionStyle.UNIVERSAL);
        RocksDBResourceContainer rocksDBResourceContainer = new RocksDBResourceContainer(configuration, PredefinedOptions.SPINNING_DISK_OPTIMIZED, (RocksDBOptionsFactory) null, (OpaqueMemoryResource) null, (File) null, false);
        Throwable th2 = null;
        try {
            try {
                ColumnFamilyOptions columnOptions = rocksDBResourceContainer.getColumnOptions();
                Assert.assertNotNull(columnOptions);
                Assert.assertEquals(CompactionStyle.UNIVERSAL, columnOptions.compactionStyle());
                if (rocksDBResourceContainer != null) {
                    if (0 != 0) {
                        try {
                            rocksDBResourceContainer.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        rocksDBResourceContainer.close();
                    }
                }
                rocksDBResourceContainer = new RocksDBResourceContainer(new Configuration(), PredefinedOptions.SPINNING_DISK_OPTIMIZED, (RocksDBOptionsFactory) null, (OpaqueMemoryResource) null, (File) null, false);
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    ColumnFamilyOptions columnOptions2 = rocksDBResourceContainer.getColumnOptions();
                    Assert.assertNotNull(columnOptions2);
                    Assert.assertEquals(CompactionStyle.LEVEL, columnOptions2.compactionStyle());
                    if (rocksDBResourceContainer != null) {
                        if (0 == 0) {
                            rocksDBResourceContainer.close();
                            return;
                        }
                        try {
                            rocksDBResourceContainer.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testPredefinedAndOptionsFactory() throws Exception {
        RocksDBResourceContainer rocksDBResourceContainer = new RocksDBResourceContainer(PredefinedOptions.SPINNING_DISK_OPTIMIZED, new RocksDBOptionsFactory() { // from class: org.apache.flink.contrib.streaming.state.RocksDBStateBackendConfigTest.2
            public DBOptions createDBOptions(DBOptions dBOptions, Collection<AutoCloseable> collection) {
                return dBOptions;
            }

            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions columnFamilyOptions, Collection<AutoCloseable> collection) {
                return columnFamilyOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
            }
        });
        Throwable th = null;
        try {
            try {
                ColumnFamilyOptions columnOptions = rocksDBResourceContainer.getColumnOptions();
                Assert.assertNotNull(columnOptions);
                Assert.assertEquals(CompactionStyle.UNIVERSAL, columnOptions.compactionStyle());
                if (rocksDBResourceContainer != null) {
                    if (0 == 0) {
                        rocksDBResourceContainer.close();
                        return;
                    }
                    try {
                        rocksDBResourceContainer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (rocksDBResourceContainer != null) {
                if (th != null) {
                    try {
                        rocksDBResourceContainer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    rocksDBResourceContainer.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {
        FsStateBackend fsStateBackend = new FsStateBackend(this.tempFolder.newFolder().toURI().toString());
        RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(fsStateBackend, !((Boolean) CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue()).booleanValue());
        PredefinedOptions predefinedOptions = PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM;
        Assert.assertNotEquals(predefinedOptions, rocksDBStateBackend.getPredefinedOptions());
        rocksDBStateBackend.setPredefinedOptions(predefinedOptions);
        rocksDBStateBackend.setRocksDBOptions((RocksDBOptionsFactory) Mockito.mock(RocksDBOptionsFactory.class));
        rocksDBStateBackend.setDbStoragePaths(new String[]{this.tempFolder.newFolder().getAbsolutePath(), this.tempFolder.newFolder().getAbsolutePath()});
        RocksDBStateBackend configure = rocksDBStateBackend.configure(new Configuration(), Thread.currentThread().getContextClassLoader());
        Assert.assertEquals(Boolean.valueOf(rocksDBStateBackend.isIncrementalCheckpointsEnabled()), Boolean.valueOf(configure.isIncrementalCheckpointsEnabled()));
        Assert.assertArrayEquals(rocksDBStateBackend.getDbStoragePaths(), configure.getDbStoragePaths());
        Assert.assertEquals(rocksDBStateBackend.getRocksDBOptions(), configure.getRocksDBOptions());
        Assert.assertEquals(rocksDBStateBackend.getPredefinedOptions(), configure.getPredefinedOptions());
        FsStateBackend checkpointBackend = configure.getCheckpointBackend();
        Assert.assertEquals(fsStateBackend.getCheckpointPath(), checkpointBackend.getCheckpointPath());
        Assert.assertEquals(fsStateBackend.getSavepointPath(), checkpointBackend.getSavepointPath());
    }

    @Test
    public void testDefaultMemoryControlParameters() {
        RocksDBMemoryConfiguration rocksDBMemoryConfiguration = new RocksDBMemoryConfiguration();
        Assert.assertTrue(rocksDBMemoryConfiguration.isUsingManagedMemory());
        Assert.assertFalse(rocksDBMemoryConfiguration.isUsingFixedMemoryPerSlot());
        Assert.assertEquals(((Double) RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue()).doubleValue(), rocksDBMemoryConfiguration.getHighPriorityPoolRatio(), 0.0d);
        Assert.assertEquals(((Double) RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue()).doubleValue(), rocksDBMemoryConfiguration.getWriteBufferRatio(), 0.0d);
        RocksDBMemoryConfiguration fromOtherAndConfiguration = RocksDBMemoryConfiguration.fromOtherAndConfiguration(rocksDBMemoryConfiguration, new Configuration());
        Assert.assertTrue(fromOtherAndConfiguration.isUsingManagedMemory());
        Assert.assertFalse(fromOtherAndConfiguration.isUsingFixedMemoryPerSlot());
        Assert.assertEquals(((Double) RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue()).doubleValue(), fromOtherAndConfiguration.getHighPriorityPoolRatio(), 0.0d);
        Assert.assertEquals(((Double) RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue()).doubleValue(), fromOtherAndConfiguration.getWriteBufferRatio(), 0.0d);
    }

    @Test
    public void testConfigureManagedMemory() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, true);
        Assert.assertTrue(RocksDBMemoryConfiguration.fromOtherAndConfiguration(new RocksDBMemoryConfiguration(), configuration).isUsingManagedMemory());
    }

    @Test
    public void testConfigureIllegalMemoryControlParameters() {
        RocksDBMemoryConfiguration rocksDBMemoryConfiguration = new RocksDBMemoryConfiguration();
        verifySetParameter(() -> {
            rocksDBMemoryConfiguration.setFixedMemoryPerSlot("-1B");
        });
        verifySetParameter(() -> {
            rocksDBMemoryConfiguration.setHighPriorityPoolRatio(-0.1d);
        });
        verifySetParameter(() -> {
            rocksDBMemoryConfiguration.setHighPriorityPoolRatio(1.1d);
        });
        verifySetParameter(() -> {
            rocksDBMemoryConfiguration.setWriteBufferRatio(-0.1d);
        });
        verifySetParameter(() -> {
            rocksDBMemoryConfiguration.setWriteBufferRatio(1.1d);
        });
        rocksDBMemoryConfiguration.setFixedMemoryPerSlot("128MB");
        rocksDBMemoryConfiguration.setWriteBufferRatio(0.6d);
        rocksDBMemoryConfiguration.setHighPriorityPoolRatio(0.6d);
        try {
            rocksDBMemoryConfiguration.validate();
            Assert.fail("Expected an IllegalArgumentException.");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void testDefaultRestoreOverlapThreshold() {
        Assert.assertTrue(((Double) RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue()).doubleValue() == new EmbeddedRocksDBStateBackend(true).getOverlapFractionThreshold());
    }

    @Test
    public void testConfigureRestoreOverlapThreshold() {
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
        Configuration configuration = new Configuration();
        configuration.setDouble(RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD, 0.3d);
        Assert.assertTrue(0.3d == embeddedRocksDBStateBackend.configure(configuration, getClass().getClassLoader()).getOverlapFractionThreshold());
    }

    private void verifySetParameter(Runnable runnable) {
        try {
            runnable.run();
            Assert.fail("No expected IllegalArgumentException.");
        } catch (IllegalArgumentException e) {
        }
    }

    @Test
    public void testCallsForwardedToNonPartitionedBackend() throws Exception {
        MemoryStateBackend memoryStateBackend = new MemoryStateBackend();
        Assert.assertEquals(memoryStateBackend, new RocksDBStateBackend(memoryStateBackend).getCheckpointBackend());
    }

    static MockEnvironment getMockEnvironment(File file) {
        return MockEnvironment.builder().setUserCodeClassLoader(RocksDBStateBackendConfigTest.class.getClassLoader()).setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(new Configuration(), file)).build();
    }

    private void verifyIllegalArgument(ConfigOption<?> configOption, String str) {
        Configuration configuration = new Configuration();
        configuration.setString(configOption.key(), str);
        try {
            new EmbeddedRocksDBStateBackend().configure(configuration, (ClassLoader) null);
            Assert.fail("Not throwing expected IllegalArgumentException.");
        } catch (IllegalArgumentException e) {
        }
    }
}
