package org.apache.flink.state.changelog.restore;

import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;

@Internal
/* loaded from: input_file:org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.class */
public class ChangelogBackendRestoreOperation {

    @FunctionalInterface
    /* loaded from: input_file:org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation$BaseBackendBuilder.class */
    public interface BaseBackendBuilder<K> extends FunctionWithException<Collection<KeyedStateHandle>, AbstractKeyedStateBackend<K>, Exception> {
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation$DeltaBackendBuilder.class */
    public interface DeltaBackendBuilder<K> extends BiFunctionWithException<AbstractKeyedStateBackend<K>, Collection<ChangelogStateBackendHandle>, ChangelogKeyedStateBackend<K>, Exception> {
    }

    public static <K, T extends ChangelogStateHandle> ChangelogKeyedStateBackend<K> restore(StateChangelogHandleReader<T> stateChangelogHandleReader, ClassLoader classLoader, Collection<ChangelogStateBackendHandle> collection, BaseBackendBuilder<K> baseBackendBuilder, DeltaBackendBuilder<K> deltaBackendBuilder) throws Exception {
        ChangelogKeyedStateBackend<K> changelogKeyedStateBackend = (ChangelogKeyedStateBackend) deltaBackendBuilder.apply((AbstractKeyedStateBackend) baseBackendBuilder.apply(extractBaseState(collection)), collection);
        for (ChangelogStateBackendHandle changelogStateBackendHandle : collection) {
            if (changelogStateBackendHandle != null) {
                readBackendHandle(changelogKeyedStateBackend, changelogStateBackendHandle, stateChangelogHandleReader, classLoader);
            }
        }
        return changelogKeyedStateBackend;
    }

    private static <T extends ChangelogStateHandle> void readBackendHandle(ChangelogKeyedStateBackend<?> changelogKeyedStateBackend, ChangelogStateBackendHandle changelogStateBackendHandle, StateChangelogHandleReader<T> stateChangelogHandleReader, ClassLoader classLoader) throws Exception {
        Iterator it = changelogStateBackendHandle.getNonMaterializedStateHandles().iterator();
        while (it.hasNext()) {
            CloseableIterator changes = stateChangelogHandleReader.getChanges((ChangelogStateHandle) it.next());
            Throwable th = null;
            while (changes.hasNext()) {
                try {
                    try {
                        ChangelogBackendLogApplier.apply((StateChange) changes.next(), changelogKeyedStateBackend, classLoader);
                    } finally {
                    }
                } catch (Throwable th2) {
                    if (changes != null) {
                        if (th != null) {
                            try {
                                changes.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            changes.close();
                        }
                    }
                    throw th2;
                }
            }
            if (changes != null) {
                if (0 != 0) {
                    try {
                        changes.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    changes.close();
                }
            }
        }
    }

    private static Collection<KeyedStateHandle> extractBaseState(Collection<ChangelogStateBackendHandle> collection) {
        Preconditions.checkNotNull(collection);
        return (Collection) collection.stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).map((v0) -> {
            return v0.getMaterializedStateHandles();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }

    private ChangelogBackendRestoreOperation() {
    }
}
