package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.persistence.ResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtils.class */
/**
 * Static helpers for reading the retained-checkpoint limit from a {@link Configuration} and for
 * loading {@link CompletedCheckpoint}s out of a {@link StateHandleStore}.
 */
public class DefaultCompletedCheckpointStoreUtils {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultCompletedCheckpointStoreUtils.class);

    /** Not instantiable: this class only hosts static helpers. */
    private DefaultCompletedCheckpointStoreUtils() {
    }

    /**
     * Reads {@link CheckpointingOptions#MAX_RETAINED_CHECKPOINTS} from the given configuration.
     *
     * <p>A value that is not strictly positive is treated as invalid: a warning is written to the
     * supplied logger and the option's declared default value is returned instead.
     *
     * @param configuration configuration to read the option from
     * @param logger logger that receives the warning when the configured value is invalid
     * @return the configured value if it is greater than zero, otherwise the option's default
     */
    public static int getMaximumNumberOfRetainedCheckpoints(Configuration configuration, Logger logger) {
        final int configured = configuration.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
        if (configured > 0) {
            return configured;
        }
        logger.warn(
                "The setting for '{} : {}' is invalid. Using default value of {}",
                CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
                configured,
                CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
        return CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
    }

    /**
     * Fetches every entry from the state handle store via {@code getAllAndLock()}, orders the
     * entries by their name (natural {@link String} ordering) and retrieves the checkpoint behind
     * each handle.
     *
     * @param stateHandleStore store to read the checkpoint state handles from
     * @param checkpointStoreUtil helper used to translate an entry name into a checkpoint id
     * @param <R> resource version type of the state handle store
     * @return an unmodifiable list of the retrieved checkpoints, in entry-name order
     * @throws Exception if listing the store fails or any single checkpoint cannot be retrieved
     */
    public static <R extends ResourceVersion<R>> Collection<CompletedCheckpoint> retrieveCompletedCheckpoints(StateHandleStore<CompletedCheckpoint, R> stateHandleStore, CheckpointStoreUtil checkpointStoreUtil) throws Exception {
        LOG.info("Recovering checkpoints from {}.", stateHandleStore);

        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> handles = stateHandleStore.getAllAndLock();
        // Order by the name each handle is stored under (second tuple field).
        handles.sort(Comparator.comparing(handle -> handle.f1));

        final int handleCount = handles.size();
        LOG.info("Found {} checkpoints in {}.", handleCount, stateHandleStore);

        final List<CompletedCheckpoint> checkpoints = new ArrayList<>(handleCount);
        LOG.info("Trying to fetch {} checkpoints from storage.", handleCount);
        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> handle : handles) {
            // Any retrieval failure propagates; a null result is rejected by checkNotNull.
            checkpoints.add(Preconditions.checkNotNull(retrieveCompletedCheckpoint(checkpointStoreUtil, handle)));
        }
        return Collections.unmodifiableList(checkpoints);
    }

    /**
     * Retrieves the checkpoint referenced by a single (state handle, name) entry.
     *
     * @param checkpointStoreUtil helper used to translate the entry name into a checkpoint id
     * @param tuple2 entry whose {@code f0} is the state handle and whose {@code f1} is its name
     * @return the checkpoint returned by the handle's {@code retrieveState()}
     * @throws FlinkException wrapping the {@link IOException} or {@link ClassNotFoundException}
     *     raised while retrieving the state
     */
    private static CompletedCheckpoint retrieveCompletedCheckpoint(CheckpointStoreUtil checkpointStoreUtil, Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> tuple2) throws FlinkException {
        final String name = tuple2.f1;
        final long checkpointId = checkpointStoreUtil.nameToCheckpointID(name);
        LOG.info("Trying to retrieve checkpoint {}.", checkpointId);
        try {
            return tuple2.f0.retrieveState();
        } catch (IOException e) {
            throw new FlinkException(String.format("Could not retrieve checkpoint %d from state handle under %s. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.", checkpointId, name), e);
        } catch (ClassNotFoundException e) {
            throw new FlinkException(String.format("Could not retrieve checkpoint %d from state handle under %s. This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", checkpointId, name), e);
        }
    }
}
