package org.apache.flink.streaming.api.functions.source;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.class */
public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, SessionId> extends MessageAcknowledgingSourceBase<Type, UId> {
    private static final long serialVersionUID = 42;
    private static final Logger LOG = LoggerFactory.getLogger(MultipleIdsMessageAcknowledgingSourceBase.class);
    protected transient Deque<Tuple2<Long, List<SessionId>>> sessionIdsPerSnapshot;
    protected transient List<SessionId> sessionIds;

    protected MultipleIdsMessageAcknowledgingSourceBase(Class<UId> cls) {
        super(cls);
    }

    protected MultipleIdsMessageAcknowledgingSourceBase(TypeInformation<UId> typeInformation) {
        super(typeInformation);
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.sessionIds = new ArrayList(64);
        this.sessionIdsPerSnapshot = new ArrayDeque();
    }

    @Override // org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase
    public void close() throws Exception {
        super.close();
        this.sessionIds.clear();
        this.sessionIdsPerSnapshot.clear();
    }

    @Override // org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase
    protected final void acknowledgeIDs(long j, Set<UId> set) {
        LOG.debug("Acknowledging ids for checkpoint {}", Long.valueOf(j));
        Iterator<Tuple2<Long, List<SessionId>>> it = this.sessionIdsPerSnapshot.iterator();
        while (it.hasNext()) {
            Tuple2<Long, List<SessionId>> next = it.next();
            if (((Long) next.f0).longValue() <= j) {
                acknowledgeSessionIDs((List) next.f1);
                it.remove();
            }
        }
    }

    protected abstract void acknowledgeSessionIDs(List<SessionId> list);

    @Override // org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase, org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.sessionIdsPerSnapshot.add(new Tuple2<>(Long.valueOf(functionSnapshotContext.getCheckpointId()), this.sessionIds));
        this.sessionIds = new ArrayList(64);
        super.snapshotState(functionSnapshotContext);
    }
}
