/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.changelog.fs.ChangelogStorageMetricGroup;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorageForRecovery;
import org.apache.flink.changelog.fs.FsStateChangelogWriter;
import org.apache.flink.changelog.fs.StateChangeFsUploader;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StateChangelogStorage} implementation that creates {@link FsStateChangelogWriter}s
 * sharing a single {@link StateChangeUploadScheduler} and a single {@link TaskChangelogRegistry}.
 *
 * <p>Every writer created by one instance receives its own log id, taken from an internal
 * counter that starts at zero.
 *
 * <p>NOTE(review): {@code createWriter}, {@code close} and {@code getAvailabilityProvider} look
 * like implementations of supertype members, but the supertypes are not visible from this file;
 * confirm and consider annotating them with {@code @Override}.
 */
@Experimental
@ThreadSafe
public class FsStateChangelogStorage
        extends FsStateChangelogStorageForRecovery
        implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {

    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogStorage.class);

    /** Scheduler handed to every writer; also the only resource released by {@link #close()}. */
    private final StateChangeUploadScheduler uploader;

    /** Threshold, in bytes, passed unchanged to every created writer. */
    private final long preEmptivePersistThresholdInBytes;

    /** Supplies the least significant 64 bits of each writer's log id; starts at zero. */
    private final AtomicInteger logIdGenerator = new AtomicInteger(0);

    /** Registry handed to every writer (and to the uploader by the convenience constructors). */
    private final TaskChangelogRegistry changelogRegistry;

    /**
     * Creates a storage with a default changelog registry sized by
     * {@link FsStateChangelogOptions#NUM_DISCARD_THREADS}.
     *
     * @param config configuration the registry, scheduler and persist threshold are read from
     * @param metricGroup metric group wrapped into a {@link ChangelogStorageMetricGroup}
     * @throws IOException propagated from the delegated constructor
     */
    public FsStateChangelogStorage(Configuration config, TaskManagerJobMetricGroup metricGroup)
            throws IOException {
        this(config, metricGroup, defaultRegistry(config));
    }

    /**
     * Creates a storage whose scheduler and persist threshold are derived from {@code config}.
     *
     * @param config configuration the scheduler and persist threshold are read from
     * @param metricGroup metric group wrapped into a {@link ChangelogStorageMetricGroup}
     * @param changelogRegistry registry shared by the scheduler and all writers
     * @throws IOException declared for the scheduler creation
     */
    public FsStateChangelogStorage(
            Configuration config,
            TaskManagerJobMetricGroup metricGroup,
            TaskChangelogRegistry changelogRegistry)
            throws IOException {
        // The scheduler is built before the threshold is read, as argument order dictates.
        this(
                schedulerFromConfig(config, metricGroup, changelogRegistry),
                configuredPersistThreshold(config),
                changelogRegistry);
    }

    /**
     * Creates a storage that uploads through a direct scheduler wrapping a
     * {@link StateChangeFsUploader} rooted at {@code basePath}; the persist threshold is the
     * default value of {@link FsStateChangelogOptions#PREEMPTIVE_PERSIST_THRESHOLD}.
     *
     * @param basePath base path given to the uploader; its file system is used as well
     * @param compression compression flag forwarded to the uploader
     * @param bufferSize buffer size forwarded to the uploader
     * @param metricGroup metric group forwarded to the uploader
     * @param changelogRegistry registry shared by the uploader and all writers
     * @throws IOException if the file system of {@code basePath} cannot be obtained
     */
    @VisibleForTesting
    public FsStateChangelogStorage(
            Path basePath,
            boolean compression,
            int bufferSize,
            ChangelogStorageMetricGroup metricGroup,
            TaskChangelogRegistry changelogRegistry)
            throws IOException {
        this(
                directScheduler(basePath, compression, bufferSize, metricGroup, changelogRegistry),
                defaultPersistThreshold(),
                changelogRegistry);
    }

    /**
     * Creates a storage from fully prepared collaborators.
     *
     * @param uploader scheduler handed to every writer and closed by {@link #close()}
     * @param preEmptivePersistThresholdInBytes threshold, in bytes, handed to every writer
     * @param changelogRegistry registry handed to every writer
     */
    @VisibleForTesting
    public FsStateChangelogStorage(
            StateChangeUploadScheduler uploader,
            long preEmptivePersistThresholdInBytes,
            TaskChangelogRegistry changelogRegistry) {
        this.uploader = uploader;
        this.changelogRegistry = changelogRegistry;
        this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
    }

    /**
     * Creates a writer with a fresh log id and logs the assignment at INFO level.
     *
     * @param operatorID operator identifier; used only in the log message here
     * @param keyGroupRange key group range forwarded to the writer
     * @param mailboxExecutor mailbox executor forwarded to the writer
     * @return a new writer bound to this storage's scheduler, threshold and registry
     */
    public FsStateChangelogWriter createWriter(
            String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
        // High bits are always zero; the low bits are the widened counter value.
        final long sequence = logIdGenerator.getAndIncrement();
        final UUID logId = new UUID(0L, sequence);
        LOG.info("createWriter for operator {}/{}: {}", operatorID, keyGroupRange, logId);
        return new FsStateChangelogWriter(
                logId,
                keyGroupRange,
                uploader,
                preEmptivePersistThresholdInBytes,
                mailboxExecutor,
                changelogRegistry);
    }

    /**
     * Closes the upload scheduler.
     *
     * @throws Exception whatever the scheduler's {@code close()} throws
     */
    public void close() throws Exception {
        uploader.close();
    }

    /**
     * Returns the availability provider exposed by the upload scheduler.
     *
     * @return the scheduler's availability provider
     */
    public AvailabilityProvider getAvailabilityProvider() {
        return uploader.getAvailabilityProvider();
    }

    /** Builds the default registry using the discard-thread count read from {@code config}. */
    private static TaskChangelogRegistry defaultRegistry(Configuration config) {
        final Integer discardThreads =
                (Integer) config.get(FsStateChangelogOptions.NUM_DISCARD_THREADS);
        return TaskChangelogRegistry.defaultChangelogRegistry(discardThreads);
    }

    /** Builds the upload scheduler from {@code config}, wrapping the metric group first. */
    private static StateChangeUploadScheduler schedulerFromConfig(
            Configuration config,
            TaskManagerJobMetricGroup metricGroup,
            TaskChangelogRegistry changelogRegistry)
            throws IOException {
        final ChangelogStorageMetricGroup storageMetrics =
                new ChangelogStorageMetricGroup((MetricGroup) metricGroup);
        return StateChangeUploadScheduler.fromConfig(
                (ReadableConfig) config, storageMetrics, changelogRegistry);
    }

    /** Builds a direct scheduler around a file-system uploader rooted at {@code basePath}. */
    private static StateChangeUploadScheduler directScheduler(
            Path basePath,
            boolean compression,
            int bufferSize,
            ChangelogStorageMetricGroup metricGroup,
            TaskChangelogRegistry changelogRegistry)
            throws IOException {
        final StateChangeFsUploader fsUploader =
                new StateChangeFsUploader(
                        basePath,
                        basePath.getFileSystem(),
                        compression,
                        bufferSize,
                        metricGroup,
                        changelogRegistry);
        return StateChangeUploadScheduler.directScheduler(fsUploader);
    }

    /** Reads the pre-emptive persist threshold from {@code config} and converts it to bytes. */
    private static long configuredPersistThreshold(Configuration config) {
        final MemorySize threshold =
                (MemorySize) config.get(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD);
        return threshold.getBytes();
    }

    /** Returns the default value of the pre-emptive persist threshold option, in bytes. */
    private static long defaultPersistThreshold() {
        final MemorySize threshold =
                (MemorySize) FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD.defaultValue();
        return threshold.getBytes();
    }
}

