package org.apache.flink.streaming.api.runners.python.beam.state;

import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/api/runners/python/beam/state/BeamBagStateHandler.class */
public class BeamBagStateHandler extends AbstractBeamStateHandler<ListState<byte[]>> {
    private static final String MERGE_NAMESPACES_MARK = "merge_namespaces";

    @Nullable
    private final TypeSerializer<?> namespaceSerializer;
    private final ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos();
    private final DataInputViewStreamWrapper baisWrapper = new DataInputViewStreamWrapper(this.bais);

    public BeamBagStateHandler(@Nullable TypeSerializer<?> typeSerializer) {
        this.namespaceSerializer = typeSerializer;
    }

    @Override // org.apache.flink.streaming.api.runners.python.beam.state.BeamStateHandler
    public BeamFnApi.StateResponse.Builder handleGet(BeamFnApi.StateRequest stateRequest, ListState<byte[]> listState) throws Exception {
        return BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setGet(BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(convertToByteString(listState))));
    }

    @Override // org.apache.flink.streaming.api.runners.python.beam.state.BeamStateHandler
    public BeamFnApi.StateResponse.Builder handleAppend(BeamFnApi.StateRequest stateRequest, ListState<byte[]> listState) throws Exception {
        if (stateRequest.getStateKey().getBagUserState().getTransformId().equals(MERGE_NAMESPACES_MARK)) {
            Preconditions.checkNotNull(this.namespaceSerializer);
            byte[] byteArray = stateRequest.getAppend().getData().toByteArray();
            this.bais.setBuffer(byteArray, 0, byteArray.length);
            int readInt = this.baisWrapper.readInt();
            HashSet hashSet = new HashSet();
            for (int i = 0; i < readInt; i++) {
                hashSet.add(this.namespaceSerializer.deserialize(this.baisWrapper));
            }
            byte[] byteArray2 = stateRequest.getStateKey().getBagUserState().getWindow().toByteArray();
            this.bais.setBuffer(byteArray2, 0, byteArray2.length);
            ((InternalListState) listState).mergeNamespaces(this.namespaceSerializer.deserialize(this.baisWrapper), hashSet);
        } else {
            listState.add(stateRequest.getAppend().getData().toByteArray());
        }
        return BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setAppend(BeamFnApi.StateAppendResponse.getDefaultInstance());
    }

    @Override // org.apache.flink.streaming.api.runners.python.beam.state.BeamStateHandler
    public BeamFnApi.StateResponse.Builder handleClear(BeamFnApi.StateRequest stateRequest, ListState<byte[]> listState) throws Exception {
        listState.clear();
        return BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setClear(BeamFnApi.StateClearResponse.getDefaultInstance());
    }

    private static List<ByteString> convertToByteString(ListState<byte[]> listState) throws Exception {
        LinkedList linkedList = new LinkedList();
        if (listState.get() == null) {
            return linkedList;
        }
        Iterator it = ((Iterable) listState.get()).iterator();
        while (it.hasNext()) {
            linkedList.add(ByteString.copyFrom((byte[]) it.next()));
        }
        return linkedList;
    }
}
