package org.apache.flink.state.changelog;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
import org.apache.flink.state.changelog.restore.StateChangeApplier;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.class */
public class ChangelogKeyGroupedPriorityQueue<T> implements KeyGroupedInternalPriorityQueue<T>, ChangelogState {
    private KeyGroupedInternalPriorityQueue<T> delegatedPriorityQueue;
    private final StateChangeLogger<T, Void> logger;
    private final TypeSerializer<T> serializer;

    public ChangelogKeyGroupedPriorityQueue(KeyGroupedInternalPriorityQueue<T> keyGroupedInternalPriorityQueue, StateChangeLogger<T, Void> stateChangeLogger, TypeSerializer<T> typeSerializer) {
        this.delegatedPriorityQueue = (KeyGroupedInternalPriorityQueue) Preconditions.checkNotNull(keyGroupedInternalPriorityQueue);
        this.logger = (StateChangeLogger) Preconditions.checkNotNull(stateChangeLogger);
        this.serializer = typeSerializer;
    }

    public Set<T> getSubsetForKeyGroup(int i) {
        return this.delegatedPriorityQueue.getSubsetForKeyGroup(i);
    }

    @Nullable
    public T poll() {
        T t = (T) this.delegatedPriorityQueue.poll();
        logRemoval(t);
        return t;
    }

    @Nullable
    public T peek() {
        return (T) this.delegatedPriorityQueue.peek();
    }

    public boolean add(T t) {
        boolean add = this.delegatedPriorityQueue.add(t);
        logAddition(Collections.singletonList(t));
        return add;
    }

    public boolean remove(T t) {
        boolean remove = this.delegatedPriorityQueue.remove(t);
        logRemoval(t);
        return remove;
    }

    public boolean isEmpty() {
        return this.delegatedPriorityQueue.isEmpty();
    }

    public int size() {
        return this.delegatedPriorityQueue.size();
    }

    public void addAll(@Nullable Collection<? extends T> collection) {
        this.delegatedPriorityQueue.addAll(collection);
        logAddition(collection);
    }

    private void logAddition(Collection<? extends T> collection) {
        try {
            this.logger.valueElementAdded(dataOutputViewStreamWrapper -> {
                dataOutputViewStreamWrapper.writeInt(collection.size());
                Iterator it = collection.iterator();
                while (it.hasNext()) {
                    this.serializer.serialize(it.next(), dataOutputViewStreamWrapper);
                }
            }, null);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e);
        }
    }

    @Nonnull
    public CloseableIterator<T> iterator() {
        CloseableIterator it = this.delegatedPriorityQueue.iterator();
        StateChangeLogger<T, Void> stateChangeLogger = this.logger;
        TypeSerializer<T> typeSerializer = this.serializer;
        typeSerializer.getClass();
        return StateChangeLoggingIterator.create(it, stateChangeLogger, (v1, v2) -> {
            r2.serialize(v1, v2);
        }, null);
    }

    @Override // org.apache.flink.state.changelog.ChangelogState
    public StateChangeApplier getChangeApplier(ChangelogApplierFactory changelogApplierFactory) {
        return changelogApplierFactory.forPriorityQueue(this.delegatedPriorityQueue, this.serializer);
    }

    @Override // org.apache.flink.state.changelog.ChangelogState
    public <IS> void setDelegatedState(IS is) {
        this.delegatedPriorityQueue = (KeyGroupedInternalPriorityQueue) Preconditions.checkNotNull(is);
    }

    public StateChangeLogger<T, Void> getStateChangeLogger() {
        return this.logger;
    }

    @Override // org.apache.flink.state.changelog.ChangelogState
    public void resetWritingMetaFlag() {
        this.logger.resetWritingMetaFlag();
    }

    private void logRemoval(T t) {
        try {
            this.logger.valueElementRemoved(dataOutputViewStreamWrapper -> {
                this.serializer.serialize(t, dataOutputViewStreamWrapper);
            }, null);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e);
        }
    }
}
