package org.apache.flink.changelog.fs;

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/**
 * {@link TaskChangelogRegistry} implementation that keeps, per {@link PhysicalStateHandleID},
 * the set of UUIDs still registered for a state handle and discards the handle once that set
 * becomes empty.
 *
 * <p>The log messages in this class refer to the UUIDs as "backends". Bookkeeping is held in a
 * {@link ConcurrentHashMap} whose values are {@link CopyOnWriteArraySet}s; the actual discard is
 * handed off to the {@link Executor} supplied at construction time.
 */
@Internal
@ThreadSafe
/* loaded from: input_file:org/apache/flink/changelog/fs/TaskChangelogRegistryImpl.class */
public class TaskChangelogRegistryImpl implements TaskChangelogRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(TaskChangelogRegistryImpl.class);

    /** Tracked state handles, keyed by handle ID, mapped to the UUIDs that have not yet released them. */
    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();

    /** Runs the discard of state that is no longer referenced by any UUID. */
    private final Executor executor;

    /**
     * Creates a registry that performs discards on the given executor.
     *
     * @param executor executor on which {@link StreamStateHandle#discardState()} is invoked
     */
    public TaskChangelogRegistryImpl(Executor executor) {
        this.executor = executor;
    }

    /**
     * Begins tracking the given handle. Any existing entry for the same handle ID is replaced.
     *
     * @param streamStateHandle handle to track; its {@code getStreamStateHandleID()} is the key
     * @param set UUIDs (logged as "backends" elsewhere in this class) initially registered for
     *     the handle; the contents are copied, so later changes to {@code set} are not observed
     */
    @Override // org.apache.flink.changelog.fs.TaskChangelogRegistry
    public void startTracking(StreamStateHandle streamStateHandle, Set<UUID> set) {
        LOG.debug("start tracking state, key: {}, state: {}", streamStateHandle.getStreamStateHandleID(), streamStateHandle);
        this.entries.put(streamStateHandle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(set));
    }

    /**
     * Stops tracking the given handle without discarding it.
     *
     * @param streamStateHandle handle whose entry is removed; a missing entry is ignored
     */
    @Override // org.apache.flink.changelog.fs.TaskChangelogRegistry
    public void stopTracking(StreamStateHandle streamStateHandle) {
        LOG.debug("stop tracking state, key: {}, state: {}", streamStateHandle.getStreamStateHandleID(), streamStateHandle);
        this.entries.remove(streamStateHandle.getStreamStateHandleID());
    }

    /**
     * Records that {@code uuid} no longer uses the given handle. When no UUID remains registered
     * for the handle, its entry is removed and a discard is scheduled on the executor.
     *
     * <p>If the handle is not tracked, a warning is logged and nothing else happens.
     *
     * @param streamStateHandle handle being released
     * @param uuid identifier to unregister from the handle
     */
    @Override // org.apache.flink.changelog.fs.TaskChangelogRegistry
    public void notUsed(StreamStateHandle streamStateHandle, UUID uuid) {
        PhysicalStateHandleID key = streamStateHandle.getStreamStateHandleID();
        LOG.debug("backend {} not using state, key: {}, state: {}", uuid, key, streamStateHandle);
        Set<UUID> remaining = this.entries.get(key);
        if (remaining == null) {
            LOG.warn("backend {} was not using state, key: {}, state: {}", uuid, key, streamStateHandle);
            return;
        }
        remaining.remove(uuid);
        // Only the caller whose remove() actually took the entry out of the map schedules the
        // discard, so concurrent callers that both see an empty set do not discard twice.
        if (remaining.isEmpty() && this.entries.remove(key) != null) {
            LOG.debug("state is not used by any backend, schedule discard: {}/{}", key, streamStateHandle);
            scheduleDiscard(streamStateHandle);
        }
    }

    /**
     * Submits a task to the executor that discards the given handle. Exceptions thrown by the
     * discard are logged at WARN level and not propagated.
     *
     * @param streamStateHandle handle to discard
     */
    private void scheduleDiscard(StreamStateHandle streamStateHandle) {
        this.executor.execute(() -> {
            try {
                LOG.trace("discard uploaded but unused state changes: {}", streamStateHandle);
                streamStateHandle.discardState();
            } catch (Exception e) {
                LOG.warn("unable to discard uploaded but unused state changes", e);
            }
        });
    }
}
