package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.function.BiFunction;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/changelog/fs/StateChangeFsUploader.class */
/**
 * An {@link AbstractStateChangeFsUploader} that opens its output streams directly on a
 * {@link FileSystem}.
 *
 * <p>Every file is created below {@code <path>/<jobID as hex>/dstl}, where {@code path} and
 * {@code jobID} are the values handed to the constructor.
 */
public class StateChangeFsUploader extends AbstractStateChangeFsUploader {
    private static final Logger LOG = LoggerFactory.getLogger(StateChangeFsUploader.class);

    /** Name of the sub-directory, below the per-job directory, that holds the created files. */
    @VisibleForTesting
    public static final String PATH_SUB_DIR = "dstl";

    /** Directory in which {@link #prepareStream()} creates its files. */
    private final Path basePath;

    /** File system used to create the files. */
    private final FileSystem fileSystem;

    /**
     * Creates an uploader whose handle factory builds a {@link FileStateHandle} from the
     * {@link Path} and {@link Long} it is given.
     *
     * @param jobID job whose hex string becomes part of the base path
     * @param path root directory below which the per-job directory is placed
     * @param fileSystem file system used to create the files
     * @param z forwarded unchanged to the superclass (presumably the compression flag; confirm
     *     against {@link AbstractStateChangeFsUploader})
     * @param i forwarded unchanged to the superclass (presumably the buffer size; confirm
     *     against {@link AbstractStateChangeFsUploader})
     * @param changelogStorageMetricGroup forwarded unchanged to the superclass
     * @param taskChangelogRegistry forwarded unchanged to the superclass
     */
    @VisibleForTesting
    public StateChangeFsUploader(JobID jobID, Path path, FileSystem fileSystem, boolean z, int i, ChangelogStorageMetricGroup changelogStorageMetricGroup, TaskChangelogRegistry taskChangelogRegistry) {
        this(jobID, path, fileSystem, z, i, changelogStorageMetricGroup, taskChangelogRegistry, FileStateHandle::new);
    }

    /**
     * Creates an uploader with a caller-supplied handle factory.
     *
     * @param jobID job whose hex string becomes part of the base path
     * @param path root directory below which the per-job directory is placed
     * @param fileSystem file system used to create the files
     * @param z forwarded unchanged to the superclass (presumably the compression flag; confirm
     *     against {@link AbstractStateChangeFsUploader})
     * @param i forwarded unchanged to the superclass (presumably the buffer size; confirm
     *     against {@link AbstractStateChangeFsUploader})
     * @param changelogStorageMetricGroup forwarded unchanged to the superclass
     * @param taskChangelogRegistry forwarded unchanged to the superclass
     * @param biFunction handle factory, forwarded unchanged to the superclass
     */
    public StateChangeFsUploader(JobID jobID, Path path, FileSystem fileSystem, boolean z, int i, ChangelogStorageMetricGroup changelogStorageMetricGroup, TaskChangelogRegistry taskChangelogRegistry, BiFunction<Path, Long, StreamStateHandle> biFunction) {
        super(z, i, changelogStorageMetricGroup, taskChangelogRegistry, biFunction);
        this.basePath = new Path(path, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR));
        this.fileSystem = fileSystem;
    }

    /**
     * Returns the directory in which this uploader creates its files.
     *
     * @return {@code <path>/<jobID as hex>/dstl}
     */
    @VisibleForTesting
    public Path getBasePath() {
        return this.basePath;
    }

    /**
     * Creates a new file below the base path and returns a stream positioned to write to it.
     *
     * <p>The file name comes from {@code generateFileName()}; the file is created with
     * {@link FileSystem.WriteMode#NO_OVERWRITE}, and the stream is wrapped using this uploader's
     * {@code compression} and {@code bufferSize} settings before it is returned.
     *
     * @return the wrapped stream for the newly created file
     * @throws IOException if the file cannot be created or the stream cannot be wrapped
     */
    @Override // org.apache.flink.changelog.fs.AbstractStateChangeFsUploader
    public OutputStreamWithPos prepareStream() throws IOException {
        String fileName = generateFileName();
        LOG.debug("upload tasks to {}", fileName);
        Path filePath = new Path(this.basePath, fileName);
        OutputStreamWithPos stream = new OutputStreamWithPos(this.fileSystem.create(filePath, FileSystem.WriteMode.NO_OVERWRITE), filePath);
        try {
            stream.wrap(this.compression, this.bufferSize);
        } catch (Throwable wrapFailure) {
            // The file is already open at this point and the caller never receives the stream
            // when wrapping fails, so it must be released here to avoid leaking the handle.
            // Throwable is caught only to clean up; the original failure is rethrown unchanged.
            try {
                stream.close();
            } catch (Throwable closeFailure) {
                wrapFailure.addSuppressed(closeFailure);
            }
            throw wrapFailure;
        }
        return stream;
    }

    /** Does nothing: this class performs no work on close and does not close the file system. */
    @Override // java.lang.AutoCloseable
    public void close() {
    }
}
