/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog.restore;

import java.util.Collection;
import java.util.HashMap;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
import org.apache.flink.state.changelog.restore.ChangelogBackendLogApplier;
import org.apache.flink.state.changelog.restore.StateID;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;

/**
 * Static helper that restores a {@link ChangelogKeyedStateBackend} from a collection of
 * {@link ChangelogStateBackendHandle}s.
 *
 * <p>The restore happens in three steps: the materialized handles of all non-null state handles
 * are collected and handed to the base backend builder, the resulting base backend is wrapped by
 * the changelog backend builder, and finally every change read from the non-materialized handles
 * is applied to the changelog backend.
 */
@Internal
public class ChangelogBackendRestoreOperation {

    /**
     * Builds the base and changelog backends and applies all non-materialized changes on top.
     *
     * @param changelogHandleReader reader used to obtain the changes of each changelog handle
     * @param classLoader class loader passed to {@link ChangelogBackendLogApplier} for every change
     * @param stateHandles handles to restore from; must not be {@code null}, but may contain
     *     {@code null} elements, which are skipped
     * @param baseBackendBuilder creates the base backend from the materialized state handles
     * @param changelogBackendBuilder creates the changelog backend from the base backend and the
     *     (unfiltered) state handles
     * @param <K> key type of the backends
     * @param <T> changelog handle type understood by {@code changelogHandleReader}
     * @return the changelog backend with all changes applied
     * @throws Exception if a builder fails, or if reading, applying or closing a change iterator
     *     fails
     */
    public static <K, T extends ChangelogStateHandle> ChangelogKeyedStateBackend<K> restore(StateChangelogHandleReader<T> changelogHandleReader, ClassLoader classLoader, Collection<ChangelogStateBackendHandle> stateHandles, BaseBackendBuilder<K> baseBackendBuilder, DeltaBackendBuilder<K> changelogBackendBuilder) throws Exception {
        Collection<KeyedStateHandle> baseState = extractBaseState(stateHandles);
        AbstractKeyedStateBackend<K> baseBackend = baseBackendBuilder.apply(baseState);
        ChangelogKeyedStateBackend<K> changelogBackend = changelogBackendBuilder.apply(baseBackend, stateHandles);
        for (ChangelogStateBackendHandle handle : stateHandles) {
            // Null elements are tolerated and skipped, consistent with extractBaseState.
            if (handle != null) {
                readBackendHandle(changelogBackend, handle, changelogHandleReader, classLoader);
            }
        }
        return changelogBackend;
    }

    /**
     * Reads every change of every non-materialized handle of {@code backendHandle} and applies it
     * to {@code backend}.
     *
     * <p>Each change iterator is closed via try-with-resources. Any failure while reading or
     * applying a change propagates to the caller; a failure while closing during such propagation
     * is attached as a suppressed exception.
     *
     * @throws Exception if reading, applying or closing fails
     */
    private static <T extends ChangelogStateHandle> void readBackendHandle(ChangelogKeyedStateBackend<?> backend, ChangelogStateBackendHandle backendHandle, StateChangelogHandleReader<T> changelogHandleReader, ClassLoader classLoader) throws Exception {
        // One map is shared across all changelog handles of this backend handle and is passed to
        // the applier together with every change.
        HashMap<Short, StateID> stateIds = new HashMap<>();
        for (ChangelogStateHandle changelogHandle : backendHandle.getNonMaterializedStateHandles()) {
            // The backend handle only exposes the handle base type; the caller is responsible for
            // pairing it with a reader of the matching handle type.
            @SuppressWarnings("unchecked")
            T typedHandle = (T) changelogHandle;
            try (CloseableIterator<StateChange> changes = changelogHandleReader.getChanges(typedHandle)) {
                while (changes.hasNext()) {
                    ChangelogBackendLogApplier.apply(changes.next(), backend, classLoader, stateIds);
                }
            }
        }
    }

    /**
     * Flattens the materialized state handles of all non-null elements of {@code stateHandles}
     * into a single collection.
     *
     * @throws NullPointerException if {@code stateHandles} itself is {@code null}
     */
    private static Collection<KeyedStateHandle> extractBaseState(Collection<ChangelogStateBackendHandle> stateHandles) {
        Preconditions.checkNotNull(stateHandles);
        return stateHandles.stream()
                .filter(Objects::nonNull)
                .map(ChangelogStateBackendHandle::getMaterializedStateHandles)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    /** Not instantiable: this class only exposes static methods. */
    private ChangelogBackendRestoreOperation() {
    }

    /**
     * Builder for the changelog backend: receives the base backend and the state handles being
     * restored and returns the {@link ChangelogKeyedStateBackend}.
     */
    @FunctionalInterface
    public interface DeltaBackendBuilder<K>
            extends BiFunctionWithException<AbstractKeyedStateBackend<K>, Collection<ChangelogStateBackendHandle>, ChangelogKeyedStateBackend<K>, Exception> {
    }

    /**
     * Builder for the base backend: receives the materialized state handles and returns the
     * {@link AbstractKeyedStateBackend} restored from them.
     */
    @FunctionalInterface
    public interface BaseBackendBuilder<K>
            extends FunctionWithException<Collection<KeyedStateHandle>, AbstractKeyedStateBackend<K>, Exception> {
    }
}

