/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Caches one {@link StateChangelogStorage} per {@link JobID} and closes the cached storages when a
 * job is released or when the manager is shut down.
 *
 * <p>The result of loading a storage is cached as an {@link Optional}: an empty {@code Optional}
 * records that loading was already attempted for the job and produced {@code null}, so the loader
 * is not invoked again for that job.
 *
 * <p>{@link #shutdown()} is registered through {@code ShutdownHookUtil.addShutdownHook} and can
 * therefore run on a thread other than the one calling the public methods. All reads and writes
 * of the internal map and of the closed flag are performed while holding a private lock; storages
 * are closed outside of that lock.
 */
public class TaskExecutorStateChangelogStoragesManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorStateChangelogStoragesManager.class);

    /**
     * Guards {@link #changelogStoragesByJobId} and {@link #closed}. Declared before the shutdown
     * hook so that it is initialized by the time the hook could possibly run.
     */
    private final Object lock = new Object();

    /** Cached load result per job; an empty Optional stands for a load that returned null. */
    private final Map<JobID, Optional<StateChangelogStorage<?>>> changelogStoragesByJobId = new HashMap<>();

    /** Set once by {@link #shutdown()}; afterwards no new storage may be registered. */
    private boolean closed = false;

    /** Hook handle returned on registration, needed to deregister it in {@link #shutdown()}. */
    private final Thread shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, this.getClass().getSimpleName(), LOG);

    /**
     * Returns the state changelog storage for the given job, loading and caching it on first
     * request.
     *
     * @param jobId job the storage belongs to
     * @param configuration configuration handed to {@link StateChangelogStorageLoader#load}
     * @param metricGroup metric group handed to {@link StateChangelogStorageLoader#load}
     * @return the cached or newly loaded storage, or {@code null} if the loader returned
     *     {@code null} for this job (now or on an earlier call)
     * @throws IOException declared for the loader call
     * @throws IllegalStateException if this manager has already been shut down
     */
    @Nullable
    public StateChangelogStorage<?> stateChangelogStorageForJob(@Nonnull JobID jobId, Configuration configuration, TaskManagerJobMetricGroup metricGroup) throws IOException {
        synchronized (this.lock) {
            if (this.closed) {
                throw new IllegalStateException("TaskExecutorStateChangelogStoragesManager is already closed and cannot register a new StateChangelogStorage.");
            }
            Optional<StateChangelogStorage<?>> stateChangelogStorage = this.changelogStoragesByJobId.get(jobId);
            if (stateChangelogStorage == null) {
                // First request for this job: load and remember the outcome, including a null one.
                StateChangelogStorage<?> loaded = StateChangelogStorageLoader.load(configuration, metricGroup);
                stateChangelogStorage = Optional.ofNullable(loaded);
                this.changelogStoragesByJobId.put(jobId, stateChangelogStorage);
                if (loaded != null) {
                    LOG.debug("Registered new state changelog storage for job {} : {}.", jobId, loaded);
                } else {
                    LOG.info("Try to registered new state changelog storage for job {}, but result is null.", jobId);
                }
            } else if (stateChangelogStorage.isPresent()) {
                LOG.debug("Found existing state changelog storage for job {}: {}.", jobId, stateChangelogStorage.get());
            } else {
                LOG.debug("Found a previously loaded NULL state changelog storage for job {}.", jobId);
            }
            return stateChangelogStorage.orElse(null);
        }
    }

    /**
     * Removes the cache entry of the given job and closes its storage, if one was loaded. Does
     * nothing once the manager has been shut down.
     *
     * @param jobId job whose storage should be released
     */
    public void releaseStateChangelogStorageForJob(@Nonnull JobID jobId) {
        LOG.debug("Releasing state changelog storage under job id {}.", jobId);
        Optional<StateChangelogStorage<?>> cleanupChangelogStorage;
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            cleanupChangelogStorage = this.changelogStoragesByJobId.remove(jobId);
        }
        // Close outside the lock so a slow close() does not block other callers.
        if (cleanupChangelogStorage != null) {
            cleanupChangelogStorage.ifPresent(this::doRelease);
        }
    }

    /**
     * Marks the manager as closed, deregisters the shutdown hook and closes every cached storage.
     * Calls after the first one return immediately.
     */
    public void shutdown() {
        Map<JobID, Optional<StateChangelogStorage<?>>> toRelease;
        synchronized (this.lock) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            // Snapshot the entries so they can be closed after the lock has been released.
            toRelease = new HashMap<>(this.changelogStoragesByJobId);
            this.changelogStoragesByJobId.clear();
        }
        ShutdownHookUtil.removeShutdownHook(this.shutdownHook, this.getClass().getSimpleName(), LOG);
        LOG.info("Shutting down TaskExecutorStateChangelogStoragesManager.");
        for (Map.Entry<JobID, Optional<StateChangelogStorage<?>>> entry : toRelease.entrySet()) {
            entry.getValue().ifPresent(this::doRelease);
        }
    }

    /**
     * Closes the given storage. A failure is logged as a warning and not rethrown, so releasing
     * one storage cannot prevent the release of the others.
     *
     * @param storage storage to close; {@code null} is ignored
     */
    private void doRelease(StateChangelogStorage<?> storage) {
        if (storage != null) {
            try {
                storage.close();
            }
            catch (Exception e) {
                LOG.warn("Exception while disposing state changelog storage {}.", storage, e);
            }
        }
    }

    /**
     * Returns the raw cache entry of the given job.
     *
     * @param jobId job to look up
     * @return {@code null} if nothing is cached for the job, an empty {@code Optional} if loading
     *     returned {@code null}, otherwise an {@code Optional} holding the storage
     */
    @Nullable
    @VisibleForTesting
    public Optional<StateChangelogStorage<?>> getChangelogStoragesByJobId(JobID jobId) {
        synchronized (this.lock) {
            return this.changelogStoragesByJobId.get(jobId);
        }
    }
}

