package org.apache.flink.contrib.streaming.state.benchmark;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder;
import org.apache.flink.contrib.streaming.state.RocksDBResourceContainer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.util.IOUtils;
import org.rocksdb.RocksDBException;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/benchmark/StateBackendBenchmarkUtils.class */
/**
 * Static helpers for creating keyed state backends with {@code Long} keys, obtaining state
 * handles from them, and cleaning them up again.
 *
 * <p>The working directory of the most recently created HEAP or ROCKSDB backend is remembered in
 * a single static field. {@link #cleanUp(KeyedStateBackend)} deletes that directory, whichever
 * backend instance is passed in, so backends should be created and cleaned up one at a time.
 */
public class StateBackendBenchmarkUtils {
    private static final String ROOT_DIR_NAME = "benchmark";
    private static final String RECOVERY_DIR_NAME = "localRecovery";
    private static final String DB_DIR_NAME = "dbPath";

    /** Root working directory of the most recently created HEAP/ROCKSDB backend, if any. */
    private static File rootDir;

    /** The kinds of keyed state backend this utility can create. */
    public enum StateBackendType {
        HEAP,
        ROCKSDB,
        BATCH_EXECUTION
    }

    /**
     * Creates a keyed state backend of the requested type.
     *
     * <p>For {@code HEAP} and {@code ROCKSDB} a fresh root directory is created under {@code file}
     * and remembered for {@link #cleanUp(KeyedStateBackend)}. {@code BATCH_EXECUTION} does not
     * create a directory and leaves the remembered one untouched.
     *
     * @param stateBackendType the kind of backend to create
     * @param file parent directory for the working directory; {@code null} is forwarded to
     *     {@link File#createTempFile(String, String, File)}, which then uses the default
     *     temporary-file directory
     * @return the newly created backend
     * @throws IOException if a working directory cannot be prepared or the backend build fails
     * @throws IllegalArgumentException if the backend type is not handled
     */
    public static KeyedStateBackend<Long> createKeyedStateBackend(StateBackendType stateBackendType, File file) throws IOException {
        switch (stateBackendType) {
            case HEAP:
                rootDir = prepareDirectory(ROOT_DIR_NAME, file);
                return createHeapKeyedStateBackend(rootDir);
            case ROCKSDB:
                rootDir = prepareDirectory(ROOT_DIR_NAME, file);
                return createRocksDBKeyedStateBackend(rootDir);
            case BATCH_EXECUTION:
                return createBatchExecutionStateBackend();
            default:
                throw new IllegalArgumentException("Unknown backend type: " + stateBackendType);
        }
    }

    /**
     * Creates a keyed state backend of the requested type, passing {@code null} as the parent
     * directory.
     *
     * @param stateBackendType the kind of backend to create
     * @return the newly created backend
     * @throws IOException if a working directory cannot be prepared or the backend build fails
     */
    public static KeyedStateBackend<Long> createKeyedStateBackend(StateBackendType stateBackendType) throws IOException {
        return createKeyedStateBackend(stateBackendType, null);
    }

    /** Creates a batch-execution keyed backend with 2 key groups covering the range [0, 1]. */
    private static CheckpointableKeyedStateBackend<Long> createBatchExecutionStateBackend() {
        return new BatchExecutionStateBackend().createKeyedStateBackend(MockEnvironment.builder().build(), new JobID(), "Test", new LongSerializer(), 2, new KeyGroupRange(0, 1), (TaskKvStateRegistry) null, TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), (CloseableRegistry) null);
    }

    /**
     * Creates a RocksDB keyed backend whose database directory lives under {@code baseDir}.
     *
     * @param baseDir directory under which the recovery and database directories are created
     * @return the built backend
     * @throws IOException if a directory cannot be prepared or the build fails
     */
    private static RocksDBKeyedStateBackend<Long> createRocksDBKeyedStateBackend(File baseDir) throws IOException {
        prepareDirectory(RECOVERY_DIR_NAME, baseDir);
        File dbDir = prepareDirectory(DB_DIR_NAME, baseDir);
        ExecutionConfig executionConfig = new ExecutionConfig();
        RocksDBResourceContainer resourceContainer = new RocksDBResourceContainer();
        try {
            RocksDBKeyedStateBackendBuilder<Long> builder = new RocksDBKeyedStateBackendBuilder<>("Test", Thread.currentThread().getContextClassLoader(), dbDir, resourceContainer, stateName -> resourceContainer.getColumnOptions(), (TaskKvStateRegistry) null, LongSerializer.INSTANCE, 2, new KeyGroupRange(0, 1), executionConfig, new LocalRecoveryConfig((LocalRecoveryDirectoryProvider) null), EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB, TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(), new UnregisteredMetricsGroup(), Collections.emptyList(), AbstractStateBackend.getCompressionDecorator(executionConfig), new CloseableRegistry());
            return builder.build();
        } catch (Exception e) {
            // No backend was returned to take over the container, so close it here before rethrowing.
            IOUtils.closeQuietly(resourceContainer);
            throw e;
        }
    }

    /**
     * Creates a heap keyed backend over the key-group range [0, 1].
     *
     * @param baseDir directory under which the recovery directory is created
     * @return the built backend
     * @throws IOException if the directory cannot be prepared or the build fails
     */
    private static HeapKeyedStateBackend<Long> createHeapKeyedStateBackend(File baseDir) throws IOException {
        prepareDirectory(RECOVERY_DIR_NAME, baseDir);
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups();
        ExecutionConfig executionConfig = new ExecutionConfig();
        HeapKeyedStateBackendBuilder<Long> builder = new HeapKeyedStateBackendBuilder<>((TaskKvStateRegistry) null, new LongSerializer(), Thread.currentThread().getContextClassLoader(), numberOfKeyGroups, keyGroupRange, executionConfig, TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(), Collections.emptyList(), AbstractStateBackend.getCompressionDecorator(executionConfig), new LocalRecoveryConfig((LocalRecoveryDirectoryProvider) null), new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128), false, new CloseableRegistry());
        return builder.build();
    }

    /**
     * Creates a new, uniquely named directory.
     *
     * <p>A temporary file is created first to reserve a unique name; it is then deleted and a
     * directory is created at the same path.
     *
     * @param prefix name prefix handed to {@link File#createTempFile(String, String, File)}
     * @param parent directory in which to create the new directory, or {@code null}
     * @return the created directory
     * @throws IOException if the placeholder file cannot be removed or the directory cannot be
     *     created
     */
    private static File prepareDirectory(String prefix, File parent) throws IOException {
        File target = File.createTempFile(prefix, "", parent);
        if (target.exists() && !target.delete()) {
            throw new IOException("Target dir {" + target.getAbsolutePath() + "} exists but failed to clean it up");
        }
        if (!target.mkdirs()) {
            throw new IOException("Failed to create target directory: " + target.getAbsolutePath());
        }
        return target;
    }

    /**
     * Looks up a value state in the {@link VoidNamespace}.
     *
     * <p>Note that the single type parameter {@code T} is used both as the backend's key type and
     * as the descriptor's value type.
     *
     * @param keyedStateBackend backend to query
     * @param valueStateDescriptor descriptor of the state to obtain
     * @return the partitioned value state
     * @throws Exception if the backend fails to provide the state
     */
    public static <T> ValueState<T> getValueState(KeyedStateBackend<T> keyedStateBackend, ValueStateDescriptor<T> valueStateDescriptor) throws Exception {
        return keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
    }

    /**
     * Looks up a list state in the {@link VoidNamespace}.
     *
     * <p>Note that the single type parameter {@code T} is used both as the backend's key type and
     * as the descriptor's element type.
     *
     * @param keyedStateBackend backend to query
     * @param listStateDescriptor descriptor of the state to obtain
     * @return the partitioned list state
     * @throws Exception if the backend fails to provide the state
     */
    public static <T> ListState<T> getListState(KeyedStateBackend<T> keyedStateBackend, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        return keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
    }

    /**
     * Looks up a map state in the {@link VoidNamespace}.
     *
     * <p>Note that {@code K} is used both as the backend's key type and as the map's key type.
     *
     * @param keyedStateBackend backend to query
     * @param mapStateDescriptor descriptor of the state to obtain
     * @return the partitioned map state
     * @throws Exception if the backend fails to provide the state
     */
    public static <K, V> MapState<K, V> getMapState(KeyedStateBackend<K> keyedStateBackend, MapStateDescriptor<K, V> mapStateDescriptor) throws Exception {
        return keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mapStateDescriptor);
    }

    /**
     * Forwards to {@code KeyedStateBackend.applyToAllKeys} using the {@link VoidNamespace}.
     *
     * @param keyedStateBackend backend whose keys are visited
     * @param stateDescriptor descriptor of the state passed to the function
     * @param keyedStateFunction function handed to the backend
     * @throws Exception if the backend or the function fails
     */
    public static <K, S extends State, T> void applyToAllKeys(KeyedStateBackend<K> keyedStateBackend, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> keyedStateFunction) throws Exception {
        keyedStateBackend.applyToAllKeys(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor, keyedStateFunction);
    }

    /**
     * Forwards to {@code RocksDBKeyedStateBackend.compactState} for the given descriptor.
     *
     * @param rocksDBKeyedStateBackend backend holding the state
     * @param stateDescriptor descriptor of the state to compact
     * @throws RocksDBException if the backend reports a RocksDB failure
     */
    public static <K, S extends State, T> void compactState(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend, StateDescriptor<S, T> stateDescriptor) throws RocksDBException {
        rocksDBKeyedStateBackend.compactState(stateDescriptor);
    }

    /**
     * Disposes the backend and recursively deletes the remembered root working directory.
     *
     * <p>The directory is deleted even when {@code dispose()} throws, so a failing disposal does
     * not leave the temporary files behind. After a successful delete the remembered directory is
     * cleared, so later calls do not try to delete the same path again.
     *
     * @param keyedStateBackend backend to dispose
     * @throws IOException if deleting the working directory fails
     */
    public static void cleanUp(KeyedStateBackend<?> keyedStateBackend) throws IOException {
        try {
            keyedStateBackend.dispose();
        } finally {
            File dir = rootDir;
            if (dir != null) {
                Path fromLocalFile = Path.fromLocalFile(dir);
                fromLocalFile.getFileSystem().delete(fromLocalFile, true);
                // Only forget the directory once the delete call returned without throwing.
                rootDir = null;
            }
        }
    }
}
