/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TaskChangelogRegistry} implementation that keeps, per {@link PhysicalStateHandleID},
 * the set of backend IDs still using the corresponding state handle.
 *
 * <p>Once the last backend reports a handle as not used, the handle is discarded asynchronously
 * on the {@link Executor} passed to the constructor.
 */
@Internal
@ThreadSafe
class TaskChangelogRegistryImpl implements TaskChangelogRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(TaskChangelogRegistryImpl.class);

    /** Backend IDs still using each tracked handle, keyed by the handle's physical ID. */
    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();

    /** Runs the discard actions scheduled by {@link #notUsed}. */
    private final Executor executor;

    /**
     * @param executor executor on which unused state handles are discarded
     */
    public TaskChangelogRegistryImpl(Executor executor) {
        this.executor = executor;
    }

    /**
     * Registers the handle under its {@link PhysicalStateHandleID}, replacing any previous
     * registration for the same ID.
     *
     * @param handle the state handle to track
     * @param backendIDs IDs of the backends using the handle; copied, so later changes to the
     *     passed set are not reflected in the registry
     */
    @Override
    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
        LOG.debug(
                "start tracking state, key: {}, state: {}",
                handle.getStreamStateHandleID(),
                handle);
        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
    }

    /**
     * Removes the registration of the handle without discarding it.
     *
     * @param handle the state handle to stop tracking
     */
    @Override
    public void stopTracking(StreamStateHandle handle) {
        LOG.debug(
                "stop tracking state, key: {}, state: {}",
                handle.getStreamStateHandleID(),
                handle);
        entries.remove(handle.getStreamStateHandleID());
    }

    /**
     * Records that the given backend no longer uses the handle. When no backend is left, the
     * registration is removed and the handle is scheduled for discard. Logs a warning and does
     * nothing else if the handle is not tracked.
     *
     * @param handle the state handle that is no longer used by the backend
     * @param backendId ID of the backend that stopped using the handle
     */
    @Override
    public void notUsed(StreamStateHandle handle, UUID backendId) {
        PhysicalStateHandleID key = handle.getStreamStateHandleID();
        LOG.debug("backend {} not using state, key: {}, state: {}", backendId, key, handle);
        Set<UUID> backends = entries.get(key);
        if (backends == null) {
            LOG.warn("backend {} was not using state, key: {}, state: {}", backendId, key, handle);
            return;
        }
        backends.remove(backendId);
        // Conditional remove: drop the mapping only if it still holds an (equally) empty set.
        // An unconditional remove(key) could evict a fresh, non-empty registration put by a
        // concurrent startTracking() and discard state that is in use again. The boolean
        // result also guarantees that only one of several racing callers schedules the discard.
        if (backends.isEmpty() && entries.remove(key, backends)) {
            LOG.debug("state is not used by any backend, schedule discard: {}/{}", key, handle);
            scheduleDiscard(handle);
        }
    }

    /**
     * Submits a task to the executor that discards the handle; a failure of the discard itself
     * is logged and not propagated.
     */
    private void scheduleDiscard(StreamStateHandle handle) {
        executor.execute(
                () -> {
                    try {
                        LOG.trace("discard uploaded but unused state changes: {}", handle);
                        handle.discardState();
                    } catch (Exception e) {
                        LOG.warn("unable to discard uploaded but unused state changes", e);
                    }
                });
    }
}

