package org.apache.flink.contrib.streaming.state.snapshot;

import java.io.File;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateUploader;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.RocksDB;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.class */
public class RocksNativeFullSnapshotStrategy<K> extends RocksDBSnapshotStrategyBase<K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources> {
    private static final String DESCRIPTION = "Asynchronous full RocksDB snapshot";
    private final RocksDBStateUploader stateUploader;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy$RocksDBNativeFullSnapshotOperation.class */
    public final class RocksDBNativeFullSnapshotOperation extends RocksDBSnapshotStrategyBase<K, RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources>.RocksDBSnapshotOperation {
        private RocksDBNativeFullSnapshotOperation(long j, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull SnapshotDirectory snapshotDirectory, @Nonnull List<StateMetaInfoSnapshot> list) {
            super(j, checkpointStreamFactory, snapshotDirectory, list);
        }

        public SnapshotResult<KeyedStateHandle> get(CloseableRegistry closeableRegistry) throws Exception {
            boolean z = false;
            HashMap hashMap = new HashMap();
            try {
                SnapshotResult<StreamStateHandle> materializeMetaData = RocksNativeFullSnapshotStrategy.this.materializeMetaData(closeableRegistry, this.tmpResourcesRegistry, this.stateMetaInfoSnapshots, this.checkpointId, this.checkpointStreamFactory);
                Preconditions.checkNotNull(materializeMetaData, "Metadata was not properly created.");
                Preconditions.checkNotNull(materializeMetaData.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created.");
                uploadSstFiles(hashMap, closeableRegistry, this.tmpResourcesRegistry);
                IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle = new IncrementalRemoteKeyedStateHandle(RocksNativeFullSnapshotStrategy.this.backendUID, RocksNativeFullSnapshotStrategy.this.keyGroupRange, this.checkpointId, Collections.emptyMap(), hashMap, materializeMetaData.getJobManagerOwnedSnapshot(), materializeMetaData.getStateSize() + RocksSnapshotUtil.getUploadedStateSize(hashMap.values()));
                SnapshotResult<KeyedStateHandle> snapshotResult = (SnapshotResult) getLocalSnapshot((StreamStateHandle) materializeMetaData.getTaskLocalSnapshot(), Collections.emptyMap()).map(keyedStateHandle -> {
                    return SnapshotResult.withLocalState(incrementalRemoteKeyedStateHandle, keyedStateHandle);
                }).orElseGet(() -> {
                    return SnapshotResult.of(incrementalRemoteKeyedStateHandle);
                });
                z = true;
                if (1 == 0) {
                    RocksNativeFullSnapshotStrategy.this.cleanupIncompleteSnapshot(this.tmpResourcesRegistry, this.localBackupDirectory);
                }
                return snapshotResult;
            } catch (Throwable th) {
                if (!z) {
                    RocksNativeFullSnapshotStrategy.this.cleanupIncompleteSnapshot(this.tmpResourcesRegistry, this.localBackupDirectory);
                }
                throw th;
            }
        }

        private void uploadSstFiles(@Nonnull Map<StateHandleID, StreamStateHandle> map, @Nonnull CloseableRegistry closeableRegistry, @Nonnull CloseableRegistry closeableRegistry2) throws Exception {
            Preconditions.checkState(this.localBackupDirectory.exists());
            Map<StateHandleID, Path> hashMap = new HashMap<>();
            Path[] listDirectory = this.localBackupDirectory.listDirectory();
            if (listDirectory != null) {
                for (Path path : listDirectory) {
                    hashMap.put(new StateHandleID(path.getFileName().toString()), path);
                }
                map.putAll(RocksNativeFullSnapshotStrategy.this.stateUploader.uploadFilesToCheckpointFs(hashMap, this.checkpointStreamFactory, CheckpointedStateScope.EXCLUSIVE, closeableRegistry, closeableRegistry2));
            }
        }
    }

    public RocksNativeFullSnapshotStrategy(@Nonnull RocksDB rocksDB, @Nonnull ResourceGuard resourceGuard, @Nonnull TypeSerializer<K> typeSerializer, @Nonnull LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> linkedHashMap, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int i, @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull File file, @Nonnull UUID uuid, @Nonnull RocksDBStateUploader rocksDBStateUploader) {
        super(DESCRIPTION, rocksDB, resourceGuard, typeSerializer, linkedHashMap, keyGroupRange, i, localRecoveryConfig, file, uuid);
        this.stateUploader = rocksDBStateUploader;
    }

    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(RocksDBSnapshotStrategyBase.NativeRocksDBSnapshotResources nativeRocksDBSnapshotResources, long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        return nativeRocksDBSnapshotResources.stateMetaInfoSnapshots.isEmpty() ? closeableRegistry -> {
            return SnapshotResult.empty();
        } : new RocksDBNativeFullSnapshotOperation(j, checkpointStreamFactory, nativeRocksDBSnapshotResources.snapshotDirectory, nativeRocksDBSnapshotResources.stateMetaInfoSnapshots);
    }

    public void notifyCheckpointComplete(long j) {
    }

    public void notifyCheckpointAborted(long j) {
    }

    @Override // org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase
    protected RocksDBSnapshotStrategyBase.PreviousSnapshot snapshotMetaData(long j, @Nonnull List<StateMetaInfoSnapshot> list) {
        Iterator<Map.Entry<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo>> it = this.kvStateInformation.entrySet().iterator();
        while (it.hasNext()) {
            list.add(it.next().getValue().metaInfo.snapshot());
        }
        return EMPTY_PREVIOUS_SNAPSHOT;
    }

    @Override // org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase, java.lang.AutoCloseable
    public void close() {
        this.stateUploader.close();
    }
}
