package org.apache.flink.state.changelog;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestableKeyedStateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.state.changelog.restore.ChangelogRestoreTarget;
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/state/changelog/ChangelogKeyedStateBackend.class */
public class ChangelogKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend<K>, InternalCheckpointListener {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogKeyedStateBackend.class);
    // Fixed options (CHECKPOINT type, default storage location reference). The bytecode listing of
    // initMaterialization() shows them being handed to the wrapped backend's snapshot(...) call.
    private static final CheckpointOptions CHECKPOINT_OPTIONS = new CheckpointOptions(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault());
    // Wrapped backend that most calls of this class are delegated to.
    private final AbstractKeyedStateBackend<K> keyedStateBackend;
    // States created through getOrCreateKeyedState(), keyed by state descriptor name.
    private final Map<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
    // Builds and looks up the changelog state wrappers (used by createInternalState() and create()).
    private final ChangelogStateFactory changelogStateFactory;
    // Used to initialize descriptor serializers that have not been set yet.
    private final ExecutionConfig executionConfig;
    // Handed to TtlStateFactory when a new state is created.
    private final TtlTimeProvider ttlTimeProvider;
    // Writer the state change loggers are created with; also the source of sequence numbers.
    private final StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter;
    // Collects the resources that close() releases.
    private final Closer closer;
    // Stream factory built in the constructor on top of CheckpointStorageWorkerView#createTaskOwnedStateStream().
    private final CheckpointStreamFactory streamFactory;
    // Current snapshot state: produced by completeRestore() in the constructor and replaced by
    // updateChangelogSnapshotState().
    private ChangelogSnapshotState changelogSnapshotState;
    // Checkpoint ID of the most recent snapshot() call; -1 until the first call.
    private long lastCheckpointId;
    // Next materialization ID to hand out; completeRestore() sets it to (highest restored ID + 1).
    private long materializedId;
    // Single-entry cache used by getPartitionedState(): the state returned last ...
    private InternalKvState lastState;
    // ... and the descriptor name it was looked up with.
    private String lastName;
    // Receives every state descriptor seen by getPartitionedState() / getOrCreateKeyedState().
    private final FunctionDelegationHelper functionDelegationHelper;

    // Lower bound of the change range captured by the last snapshot() call.
    @Nullable
    private SequenceNumber lastUploadedFrom;

    // Upper bound of the change range captured by the last snapshot() call; starts at the writer's
    // initial sequence number and is also used by the close hook registered in the constructor.
    @Nullable
    private SequenceNumber lastUploadedTo;
    // Name used in log messages.
    private final String subtaskName;
    // ID given to the most recently created state change logger; starts at -1 and is incremented per state.
    private short lastCreatedStateId;
    // Materialization ID that each not-yet-confirmed checkpoint was taken against.
    private final NavigableMap<Long, Long> materializationIdByCheckpointId;
    // Highest materialization ID already forwarded to the wrapped backend as complete; -1 initially.
    private long lastConfirmedMaterializationId;
    // Notified about checkpoints, subsumption and materialization (see snapshot(),
    // notifyCheckpointSubsumed() and updateChangelogSnapshotState()).
    private final ChangelogTruncateHelper changelogTruncateHelper;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/changelog/ChangelogKeyedStateBackend$ChangelogSnapshotState.class */
    /**
     * Value holder describing what a checkpoint of the enclosing backend consists of: the
     * materialized handles, the restored but not yet materialized changelog handles, the sequence
     * number up to which changes are materialized, and the ID of that materialization.
     *
     * <p>Both lists are exposed as read-only views of the lists passed to the constructor.
     */
    public static class ChangelogSnapshotState {
        private final List<KeyedStateHandle> materializedSnapshot;
        private final SequenceNumber materializedTo;
        private final List<ChangelogStateHandle> restoredNonMaterialized;
        private final long materializationID;

        /**
         * @param list materialized state handles
         * @param list2 restored changelog handles that are not materialized
         * @param sequenceNumber sequence number the materialized part reaches up to
         * @param j ID of the materialization
         */
        public ChangelogSnapshotState(List<KeyedStateHandle> list, List<ChangelogStateHandle> list2, SequenceNumber sequenceNumber, long j) {
            materializationID = j;
            materializedTo = sequenceNumber;
            materializedSnapshot = Collections.unmodifiableList(list);
            restoredNonMaterialized = Collections.unmodifiableList(list2);
        }

        /** @return read-only view of the materialized state handles */
        public List<KeyedStateHandle> getMaterializedSnapshot() {
            return materializedSnapshot;
        }

        /** @return sequence number the materialized part reaches up to */
        public SequenceNumber lastMaterializedTo() {
            return materializedTo;
        }

        /** @return read-only view of the restored, non-materialized changelog handles */
        public List<ChangelogStateHandle> getRestoredNonMaterialized() {
            return restoredNonMaterialized;
        }

        /** @return ID of the materialization this state belongs to */
        public long getMaterializationID() {
            return materializationID;
        }
    }

    /**
     * Convenience constructor that supplies a fresh {@link ChangelogStateFactory} and otherwise
     * forwards every argument unchanged to the eight-argument constructor.
     */
    public ChangelogKeyedStateBackend(
            AbstractKeyedStateBackend<K> abstractKeyedStateBackend,
            String str,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider,
            StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter,
            Collection<ChangelogStateBackendHandle> collection,
            CheckpointStorageWorkerView checkpointStorageWorkerView) {
        this(
                abstractKeyedStateBackend,
                str,
                executionConfig,
                ttlTimeProvider,
                stateChangelogWriter,
                collection,
                checkpointStorageWorkerView,
                new ChangelogStateFactory());
    }

    /**
     * Wires the changelog backend around an existing keyed state backend.
     *
     * @param abstractKeyedStateBackend wrapped backend; registered with the closer
     * @param str subtask name used in log messages
     * @param executionConfig used to initialize descriptor serializers
     * @param ttlTimeProvider passed on when states are created
     * @param stateChangelogWriter changelog writer; truncated and closed when this backend closes
     * @param collection restored handles, folded into the initial snapshot state
     * @param checkpointStorageWorkerView source of task-owned state streams
     * @param changelogStateFactory factory for the changelog state wrappers
     */
    public ChangelogKeyedStateBackend(
            AbstractKeyedStateBackend<K> abstractKeyedStateBackend,
            String str,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider,
            StateChangelogWriter<? extends ChangelogStateHandle> stateChangelogWriter,
            Collection<ChangelogStateBackendHandle> collection,
            final CheckpointStorageWorkerView checkpointStorageWorkerView,
            ChangelogStateFactory changelogStateFactory) {
        // Bookkeeping owned by this instance.
        this.closer = Closer.create();
        this.functionDelegationHelper = new FunctionDelegationHelper();
        this.materializationIdByCheckpointId = new TreeMap<>();
        this.keyValueStatesByName = new HashMap<>();
        this.lastCheckpointId = -1L;
        this.lastConfirmedMaterializationId = -1L;
        this.lastCreatedStateId = (short) -1;
        this.materializedId = 0L;

        // Collaborators supplied by the caller.
        this.keyedStateBackend = abstractKeyedStateBackend;
        this.subtaskName = str;
        this.executionConfig = executionConfig;
        this.ttlTimeProvider = ttlTimeProvider;
        this.changelogStateFactory = changelogStateFactory;
        this.stateChangelogWriter = stateChangelogWriter;

        this.lastUploadedTo = stateChangelogWriter.initialSequenceNumber();
        // The hook reads lastUploadedTo when it runs, not the value it has right now.
        this.closer.register(() -> stateChangelogWriter.truncateAndClose(this.lastUploadedTo));

        // Needs this.stateChangelogWriter to be assigned already; also sets materializedId.
        this.changelogSnapshotState = completeRestore(collection);

        this.streamFactory = new CheckpointStreamFactory() {
            public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope checkpointedStateScope) throws IOException {
                // The requested scope is ignored: streams are always task-owned.
                return checkpointStorageWorkerView.createTaskOwnedStateStream();
            }

            public boolean canFastDuplicate(StreamStateHandle streamStateHandle, CheckpointedStateScope checkpointedStateScope) throws IOException {
                return false;
            }

            public List<StreamStateHandle> duplicate(List<StreamStateHandle> list, CheckpointedStateScope checkpointedStateScope) throws IOException {
                return null;
            }
        };

        this.closer.register(abstractKeyedStateBackend);
        this.changelogTruncateHelper = new ChangelogTruncateHelper(stateChangelogWriter);
    }

    /** @return the key group range reported by the wrapped backend */
    public KeyGroupRange getKeyGroupRange() {
        return keyedStateBackend.getKeyGroupRange();
    }

    /**
     * Closes everything that was registered with the internal closer.
     *
     * @throws IOException if the closer fails
     */
    public void close() throws IOException {
        closer.close();
    }

    /**
     * Forwards the key selection to the wrapped backend.
     *
     * @param k key to make current
     */
    public void setCurrentKey(K k) {
        keyedStateBackend.setCurrentKey(k);
    }

    /** @return the current key of the wrapped backend */
    public K getCurrentKey() {
        final K currentKey = keyedStateBackend.getCurrentKey();
        return currentKey;
    }

    /** @return the key serializer of the wrapped backend */
    public TypeSerializer<K> getKeySerializer() {
        return keyedStateBackend.getKeySerializer();
    }

    /**
     * Delegates to the wrapped backend.
     *
     * @param str state name
     * @param n namespace
     * @return the key stream produced by the wrapped backend
     */
    public <N> Stream<K> getKeys(String str, N n) {
        return keyedStateBackend.getKeys(str, n);
    }

    /**
     * Delegates to the wrapped backend.
     *
     * @param str state name
     * @return the (key, namespace) stream produced by the wrapped backend
     */
    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String str) {
        return keyedStateBackend.getKeysAndNamespaces(str);
    }

    /**
     * Disposes the wrapped backend first, then drops the single-entry state cache, the state map
     * and the state held by the changelog state factory.
     */
    public void dispose() {
        keyedStateBackend.dispose();

        // Forget the cached "last accessed" state.
        lastState = null;
        lastName = null;

        keyValueStatesByName.clear();
        changelogStateFactory.dispose();
    }

    /**
     * Registers the listener with the wrapped backend.
     *
     * @param keySelectionListener listener to add
     */
    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> keySelectionListener) {
        keyedStateBackend.registerKeySelectionListener(keySelectionListener);
    }

    /**
     * Removes the listener from the wrapped backend.
     *
     * @param keySelectionListener listener to remove
     * @return whatever the wrapped backend reports for the removal
     */
    public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> keySelectionListener) {
        final boolean removed = keyedStateBackend.deregisterKeySelectionListener(keySelectionListener);
        return removed;
    }

    /**
     * Lets the wrapped backend iterate the keys, but resolves the state through this class's
     * {@code getPartitionedState} rather than the wrapped backend's own lookup.
     *
     * @param n namespace
     * @param typeSerializer namespace serializer
     * @param stateDescriptor descriptor of the state to visit
     * @param keyedStateFunction function applied per key
     */
    public <N, S extends State, T> void applyToAllKeys(
            N n,
            TypeSerializer<N> typeSerializer,
            StateDescriptor<S, T> stateDescriptor,
            KeyedStateFunction<K, S> keyedStateFunction)
            throws Exception {
        keyedStateBackend.applyToAllKeys(
                n, typeSerializer, stateDescriptor, keyedStateFunction, this::getPartitionedState);
    }

    /**
     * Returns the state for the given descriptor with its namespace set to {@code n}.
     *
     * <p>Lookup order: the single-entry cache ({@code lastName}/{@code lastState}), then the map
     * of already created states, and finally {@link #getOrCreateKeyedState}. Whichever state is
     * returned becomes the new cache entry.
     *
     * <p>The decompiled form of this method had lost its unchecked casts and therefore did not
     * type-check; they are restored here.
     *
     * @param n namespace, must not be null
     * @param typeSerializer namespace serializer, used only if the state has to be created
     * @param stateDescriptor descriptor identifying the state by name
     * @return the state, positioned on namespace {@code n}
     * @throws Exception if creating the state fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <N, S extends State> S getPartitionedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(n, "Namespace");

        // Fast path: same state as the previous call. The descriptor is not re-registered with the
        // function delegation helper on this path, matching the original control flow.
        if (this.lastName != null && this.lastName.equals(stateDescriptor.getName())) {
            this.lastState.setCurrentNamespace(n);
            return (S) this.lastState;
        }

        // State already exists but was not the last one accessed.
        final InternalKvState<K, ?, ?> existing = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (existing != null) {
            this.lastState = existing;
            this.lastState.setCurrentNamespace(n);
            this.lastName = stateDescriptor.getName();
            this.functionDelegationHelper.addOrUpdate(stateDescriptor);
            return (S) existing;
        }

        // First access: create the state and remember it.
        final S created = getOrCreateKeyedState(typeSerializer, stateDescriptor);
        final InternalKvState<K, N, ?> createdKvState = (InternalKvState<K, N, ?>) created;
        this.lastName = stateDescriptor.getName();
        this.lastState = createdKvState;
        createdKvState.setCurrentNamespace(n);
        return created;
    }

    /**
     * Records the change range for checkpoint {@code j}, asks the changelog writer to persist the
     * changes starting at the last materialized sequence number, and wraps the resulting handle
     * into a snapshot result.
     *
     * <p>In this block the arguments {@code j2}, {@code checkpointStreamFactory} and
     * {@code checkpointOptions} are not read.
     *
     * @param j checkpoint ID
     * @param j2 not used here
     * @param checkpointStreamFactory not used here
     * @param checkpointOptions not used here
     * @return future completing with the snapshot result once the writer has persisted the changes
     */
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        lastCheckpointId = j;
        lastUploadedFrom = changelogSnapshotState.lastMaterializedTo();
        lastUploadedTo = stateChangelogWriter.nextSequenceNumber();
        changelogTruncateHelper.checkpoint(j, lastUploadedTo);
        LOG.info("snapshot of {} for checkpoint {}, change range: {}..{}", subtaskName, j, lastUploadedFrom, lastUploadedTo);

        // Pin the snapshot state seen now; the field may be replaced before the future completes.
        final ChangelogSnapshotState stateAtTrigger = changelogSnapshotState;
        materializationIdByCheckpointId.put(j, stateAtTrigger.materializationID);

        return toRunnableFuture(
                stateChangelogWriter
                        .persist(lastUploadedFrom)
                        .thenApply(delta -> buildSnapshotResult(j, delta, stateAtTrigger)));
    }

    /**
     * Combines the restored non-materialized handles, the freshly persisted delta (if it has a
     * positive state size) and the materialized snapshot into one snapshot result.
     *
     * @param checkpointId checkpoint the result belongs to
     * @param delta handle returned by the changelog writer; may be null
     * @param snapshotState snapshot state captured when the checkpoint was triggered
     * @return an empty result if there is neither changelog nor materialized state, otherwise a
     *     result wrapping a {@code ChangelogStateBackendHandleImpl}
     */
    private SnapshotResult<KeyedStateHandle> buildSnapshotResult(long checkpointId, ChangelogStateHandle delta, ChangelogSnapshotState snapshotState) {
        final List<ChangelogStateHandle> nonMaterialized = new ArrayList<>(snapshotState.getRestoredNonMaterialized());
        long persistedSizeOfThisCheckpoint = 0L;

        final boolean hasDelta = delta != null && delta.getStateSize() > 0;
        if (hasDelta) {
            nonMaterialized.add(delta);
            persistedSizeOfThisCheckpoint += delta.getCheckpointedSize();
        }

        if (nonMaterialized.isEmpty() && snapshotState.getMaterializedSnapshot().isEmpty()) {
            return SnapshotResult.empty();
        }
        return SnapshotResult.of(
                new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
                        snapshotState.getMaterializedSnapshot(),
                        nonMaterialized,
                        getKeyGroupRange(),
                        checkpointId,
                        snapshotState.materializationID,
                        persistedSizeOfThisCheckpoint));
    }

    /**
     * Returns the changelog priority queue registered under {@code str}, creating it on first use.
     *
     * <p>Creation allocates the next state ID, builds a change logger bound to the changelog
     * writer, registers that logger with the closer, and wraps the queue created by the wrapped
     * backend.
     *
     * @param str state name
     * @param typeSerializer element serializer
     * @return the existing or newly created queue
     */
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
        ChangelogKeyGroupedPriorityQueue queue = (ChangelogKeyGroupedPriorityQueue) changelogStateFactory.getExistingState(str, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
        if (queue != null) {
            return queue;
        }

        // Arguments are evaluated left to right, so the state ID is bumped last, as before.
        PriorityQueueStateChangeLoggerImpl changeLogger =
                new PriorityQueueStateChangeLoggerImpl(
                        typeSerializer,
                        keyedStateBackend.getKeyContext(),
                        stateChangelogWriter,
                        new RegisteredPriorityQueueStateBackendMetaInfo(str, typeSerializer),
                        ++lastCreatedStateId);
        closer.register(changeLogger);

        queue = changelogStateFactory.create(str, keyedStateBackend.create(str, typeSerializer), changeLogger, typeSerializer);
        return queue;
    }

    /** @return the number of key/value state entries reported by the wrapped backend */
    @VisibleForTesting
    public int numKeyValueStateEntries() {
        final int entries = keyedStateBackend.numKeyValueStateEntries();
        return entries;
    }

    /** @return the answer of the wrapped backend */
    public boolean isSafeToReuseKVState() {
        final boolean safe = keyedStateBackend.isSafeToReuseKVState();
        return safe;
    }

    /**
     * Delegates savepoint creation to the wrapped backend.
     *
     * @return the savepoint resources produced by the wrapped backend
     */
    @Nonnull
    public SavepointResources<K> savepoint() throws Exception {
        return keyedStateBackend.savepoint();
    }

    /**
     * Handles completion of checkpoint {@code j}.
     *
     * <p>The changelog writer is told to confirm the last uploaded range only when {@code j} is
     * the most recently triggered checkpoint. The wrapped backend is notified with the
     * materialization ID recorded for {@code j}, but only if that ID is newer than the last one
     * confirmed. Finally, all entries for checkpoints up to and including {@code j} are dropped.
     *
     * @param j ID of the completed checkpoint
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        if (j == lastCheckpointId) {
            stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo);
        }

        final Long recordedMaterializationId = materializationIdByCheckpointId.remove(j);
        if (recordedMaterializationId != null) {
            final long materializationId = recordedMaterializationId;
            if (materializationId > lastConfirmedMaterializationId) {
                keyedStateBackend.notifyCheckpointComplete(materializationId);
                lastConfirmedMaterializationId = materializationId;
            }
        }

        materializationIdByCheckpointId.headMap(j, true).clear();
    }

    /**
     * Resets the last uploaded change range on the writer, but only when the aborted checkpoint
     * is the most recently triggered one; other checkpoint IDs are ignored.
     *
     * @param j ID of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long j) throws Exception {
        if (j != lastCheckpointId) {
            return;
        }
        stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
    }

    /**
     * Returns the state registered under the descriptor's name, creating it on first use.
     *
     * <p>On creation the descriptor's serializer is initialized if needed, the state is built
     * through {@code TtlStateFactory} and {@code LatencyTrackingStateFactory}, stored in the
     * state map and published as queryable state if enabled. The descriptor is passed to the
     * function delegation helper on every call.
     *
     * <p>The decompiled form returned an {@code InternalKvState<K, ?, ?>} where {@code S} is
     * required, which does not compile; the unchecked cast is restored here.
     *
     * @param typeSerializer namespace serializer, must not be null
     * @param stateDescriptor descriptor of the requested state
     * @return the existing or newly created state
     * @throws Exception if state creation fails
     */
    @SuppressWarnings("unchecked")
    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(typeSerializer, "Namespace serializer");
        Preconditions.checkNotNull(getKeySerializer(), "State key serializer has not been configured in the config. This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            }
            // "this" is passed as the state factory, so createInternalState() of this class is
            // the one available to TtlStateFactory.
            kvState = LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(TtlStateFactory.createStateAndWrapWithTtlIfEnabled(typeSerializer, stateDescriptor, this, this.ttlTimeProvider), stateDescriptor, this.keyedStateBackend.getLatencyTrackingStateConfig());
            this.keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            this.keyedStateBackend.publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        this.functionDelegationHelper.addOrUpdate(stateDescriptor);
        return (S) kvState;
    }

    /* JADX WARN: Incorrect return type in method signature: <N:Ljava/lang/Object;SV:Ljava/lang/Object;SEV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/typeutils/TypeSerializer<TN;>;Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;Lorg/apache/flink/runtime/state/StateSnapshotTransformer$StateSnapshotTransformFactory<TSEV;>;)TIS; */
    /**
     * Creates a state in the wrapped backend and wraps it into its changelog counterpart.
     *
     * <p>Steps, in order: build the meta info for the descriptor, create the state in the
     * wrapped backend, allocate the next state ID, build a change logger around the state's
     * serializers and the changelog writer, register the logger with the closer, and hand
     * everything to the changelog state factory. The wrapped backend itself is passed to the
     * factory as the key context.
     *
     * @param typeSerializer namespace serializer
     * @param stateDescriptor descriptor of the state to create
     * @param stateSnapshotTransformFactory snapshot transformer factory forwarded to the wrapped backend
     * @return the changelog state produced by the factory
     */
    @Nonnull
    public State createInternalState(@Nonnull TypeSerializer typeSerializer, @Nonnull StateDescriptor stateDescriptor, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory stateSnapshotTransformFactory) throws Exception {
        RegisteredKeyValueStateBackendMetaInfo metaInfo =
                new RegisteredKeyValueStateBackendMetaInfo(
                        stateDescriptor.getType(),
                        stateDescriptor.getName(),
                        typeSerializer,
                        stateDescriptor.getSerializer(),
                        stateSnapshotTransformFactory);
        InternalKvState delegateState = keyedStateBackend.createInternalState(typeSerializer, stateDescriptor, stateSnapshotTransformFactory);

        // Arguments are evaluated left to right, so the state ID is bumped last, as before.
        KvStateChangeLoggerImpl changeLogger =
                new KvStateChangeLoggerImpl(
                        delegateState.getKeySerializer(),
                        delegateState.getNamespaceSerializer(),
                        delegateState.getValueSerializer(),
                        keyedStateBackend.getKeyContext(),
                        stateChangelogWriter,
                        metaInfo,
                        stateDescriptor.getTtlConfig(),
                        stateDescriptor.getDefaultValue(),
                        ++lastCreatedStateId);
        closer.register(changeLogger);

        return changelogStateFactory.create(stateDescriptor, delegateState, (KvStateChangeLogger) changeLogger, (InternalKeyContext) keyedStateBackend);
    }

    /**
     * Adds a resource to the internal closer so that it is released by {@code close()}.
     *
     * @param closeable resource to register; declared nullable
     */
    public void registerCloseable(@Nullable Closeable closeable) {
        closer.register(closeable);
    }

    /**
     * Folds the restored handles into the initial snapshot state.
     *
     * <p>Null entries are skipped. The materialized and non-materialized handles of all others
     * are concatenated and the highest materialization ID is tracked (0 if there is none). As a
     * side effect {@code materializedId} is set to that maximum plus one.
     *
     * @param stateHandles restored handles; individual elements may be null
     * @return snapshot state starting at the writer's initial sequence number
     */
    private ChangelogSnapshotState completeRestore(Collection<ChangelogStateBackendHandle> stateHandles) {
        final List<KeyedStateHandle> materialized = new ArrayList<>();
        final List<ChangelogStateHandle> nonMaterialized = new ArrayList<>();
        long maxMaterializationId = 0L;

        for (ChangelogStateBackendHandle handle : stateHandles) {
            if (handle == null) {
                continue;
            }
            materialized.addAll(handle.getMaterializedStateHandles());
            nonMaterialized.addAll(handle.getNonMaterializedStateHandles());
            maxMaterializationId = Math.max(maxMaterializationId, handle.getMaterializationID());
        }

        materializedId = maxMaterializationId + 1;
        return new ChangelogSnapshotState(
                materialized,
                nonMaterialized,
                stateChangelogWriter.initialSequenceNumber(),
                maxMaterializationId);
    }

    /**
     * Starts a materialization if changes were appended since the last one.
     *
     * <p>This body is reconstructed from the bytecode listing the decompiler emitted when it
     * failed on this method (it choked on the {@code dup2_x1} of the {@code long} post-increment);
     * the previous body only threw {@code UnsupportedOperationException}.
     *
     * <p>If the writer's next sequence number is greater than the last materialized one, the
     * current {@code materializedId} is taken as the materialization ID (and the field is
     * post-incremented), the wrapped backend is asked for a snapshot under that ID, and the
     * changelog state factory's writing-meta flags are reset. Otherwise nothing is started.
     *
     * @return the runnable carrying the snapshot future, the materialization ID and the upper
     *     sequence number; or an empty optional when there is nothing to materialize
     * @throws java.lang.Exception if triggering the snapshot of the wrapped backend fails
     */
    public java.util.Optional<org.apache.flink.state.changelog.PeriodicMaterializationManager.MaterializationRunnable> initMaterialization() throws java.lang.Exception {
        final SequenceNumber upTo = this.stateChangelogWriter.nextSequenceNumber();
        final SequenceNumber lastMaterializedTo = this.changelogSnapshotState.lastMaterializedTo();

        LOG.info("Initialize Materialization. Current changelog writers last append to sequence number {}", upTo);

        if (upTo.compareTo(lastMaterializedTo) > 0) {
            LOG.info("Starting materialization from {} : {}", lastMaterializedTo, upTo);

            // Post-increment: this materialization uses the current value, the next one gets +1.
            final long materializationId = this.materializedId++;

            final PeriodicMaterializationManager.MaterializationRunnable materializationRunnable =
                    new PeriodicMaterializationManager.MaterializationRunnable(
                            this.keyedStateBackend.snapshot(
                                    materializationId,
                                    System.currentTimeMillis(),
                                    this.streamFactory,
                                    CHECKPOINT_OPTIONS),
                            materializationId,
                            upTo);

            // Done only after the snapshot has been triggered, as in the bytecode.
            this.changelogStateFactory.resetAllWritingMetaFlags();

            return java.util.Optional.of(materializationRunnable);
        }

        LOG.debug("Skip materialization, last materialized to {} : last log to {}", lastMaterializedTo, upTo);
        return java.util.Optional.empty();
    }

    /**
     * Installs the outcome of a finished materialization as the new snapshot state (with no
     * restored non-materialized handles) and informs the truncate helper.
     *
     * @param snapshotResult result of the materialization snapshot
     * @param j materialization ID
     * @param sequenceNumber sequence number the materialization reaches up to
     */
    public void updateChangelogSnapshotState(SnapshotResult<KeyedStateHandle> snapshotResult, long j, SequenceNumber sequenceNumber) {
        LOG.info("Task {} finishes materialization, updates the snapshotState upTo {} : {}", subtaskName, sequenceNumber, snapshotResult);

        final List<KeyedStateHandle> materialized = getMaterializedResult(snapshotResult);
        changelogSnapshotState = new ChangelogSnapshotState(materialized, Collections.emptyList(), sequenceNumber, j);
        changelogTruncateHelper.materialized(sequenceNumber);
    }

    /**
     * Converts a snapshot result into a list holding its job-manager-owned handle.
     *
     * @param result snapshot result to unpack
     * @return a singleton list with the handle, or an empty list if the result has none
     */
    private List<KeyedStateHandle> getMaterializedResult(@Nonnull SnapshotResult<KeyedStateHandle> result) {
        final KeyedStateHandle handle = result.getJobManagerOwnedSnapshot();
        if (handle != null) {
            return Collections.singletonList(handle);
        }
        return Collections.emptyList();
    }

    /**
     * Forwards the request to the wrapped backend.
     *
     * @param z flag passed through unchanged
     * @return whatever the wrapped backend returns for {@code z}
     */
    public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean z) {
        final KeyedStateBackend<K> delegated = keyedStateBackend.getDelegatedKeyedStateBackend(z);
        return delegated;
    }

    /**
     * Passes the subsumption notice on to the truncate helper.
     *
     * @param j ID of the subsumed checkpoint
     */
    public void notifyCheckpointSubsumed(long j) throws Exception {
        changelogTruncateHelper.checkpointSubsumed(j);
    }

    /**
     * Exposes this backend as a {@link ChangelogRestoreTarget}. A new adapter is created on each
     * call, and every adapter method forwards to the corresponding member of this backend.
     *
     * @return restore target backed by this instance
     */
    public ChangelogRestoreTarget<K> getChangelogRestoreTarget() {
        final ChangelogKeyedStateBackend<K> backend = this;
        return new ChangelogRestoreTarget<K>() {
            @Override
            public KeyGroupRange getKeyGroupRange() {
                return backend.getKeyGroupRange();
            }

            @Override
            public <N, S extends State, V> S createKeyedState(TypeSerializer<N> typeSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {
                return backend.getOrCreateKeyedState(typeSerializer, stateDescriptor);
            }

            @Override
            @Nonnull
            public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> createPqState(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
                return backend.create(str, typeSerializer);
            }

            @Override
            public ChangelogState getExistingState(String str, StateMetaInfoSnapshot.BackendStateType backendStateType) {
                return backend.changelogStateFactory.getExistingState(str, backendStateType);
            }

            @Override
            public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
                return backend;
            }
        };
    }

    /**
     * Adapts a {@link CompletableFuture} to the {@link RunnableFuture} interface.
     *
     * <p>{@code run()} waits for the future via {@code join()}; all other methods are forwarded
     * one-to-one.
     *
     * @param future future to adapt
     * @return view of {@code future} as a {@code RunnableFuture}
     */
    private static <T> RunnableFuture<T> toRunnableFuture(final CompletableFuture<T> future) {
        return new RunnableFuture<T>() {
            @Override
            public void run() {
                // Nothing to compute here; just wait for the underlying future.
                future.join();
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return future.cancel(mayInterruptIfRunning);
            }

            @Override
            public boolean isCancelled() {
                return future.isCancelled();
            }

            @Override
            public boolean isDone() {
                return future.isDone();
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                return future.get();
            }

            @Override
            public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return future.get(timeout, unit);
            }
        };
    }

    /** @return the changelog writer this backend was constructed with */
    @VisibleForTesting
    StateChangelogWriter<? extends ChangelogStateHandle> getChangelogWriter() {
        return stateChangelogWriter;
    }
}
