package org.apache.flink.streaming.api.runners.python.beam.state;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.common.base.Charsets;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.streaming.api.utils.ByteArrayWrapper;

/* loaded from: input_file:org/apache/flink/streaming/api/runners/python/beam/state/BeamStateRequestHandler.class */
public class BeamStateRequestHandler implements StateRequestHandler {
    private final BeamStateStore keyedStateStore;
    private final BeamStateStore operatorStateStore;
    private final BeamStateHandler<ListState<byte[]>> bagStateHandler;
    private final BeamStateHandler<MapState<ByteArrayWrapper, byte[]>> mapStateHandler;
    private final BeamFnApi.ProcessBundleRequest.CacheToken cacheToken = createCacheToken();
    static final /* synthetic */ boolean $assertionsDisabled;

    public BeamStateRequestHandler(BeamStateStore beamStateStore, BeamStateStore beamStateStore2, BeamStateHandler<ListState<byte[]>> beamStateHandler, BeamStateHandler<MapState<ByteArrayWrapper, byte[]>> beamStateHandler2) {
        this.keyedStateStore = beamStateStore;
        this.operatorStateStore = beamStateStore2;
        this.bagStateHandler = beamStateHandler;
        this.mapStateHandler = beamStateHandler2;
    }

    @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandler
    public CompletionStage<BeamFnApi.StateResponse.Builder> handle(BeamFnApi.StateRequest stateRequest) throws Exception {
        BeamFnApi.StateKey.TypeCase typeCase = stateRequest.getStateKey().getTypeCase();
        switch (typeCase) {
            case BAG_USER_STATE:
                return CompletableFuture.completedFuture(this.bagStateHandler.handle(stateRequest, this.keyedStateStore.getListState(stateRequest)));
            case MULTIMAP_SIDE_INPUT:
                return CompletableFuture.completedFuture(this.mapStateHandler.handle(stateRequest, this.keyedStateStore.getMapState(stateRequest)));
            case MULTIMAP_KEYS_SIDE_INPUT:
                return CompletableFuture.completedFuture(this.mapStateHandler.handle(stateRequest, this.operatorStateStore.getMapState(stateRequest)));
            default:
                throw new RuntimeException("Unsupported state type: " + typeCase);
        }
    }

    @Override // org.apache.beam.runners.fnexecution.state.StateRequestHandler
    public Iterable<BeamFnApi.ProcessBundleRequest.CacheToken> getCacheTokens() {
        return Collections.singleton(this.cacheToken);
    }

    private BeamFnApi.ProcessBundleRequest.CacheToken createCacheToken() {
        return BeamFnApi.ProcessBundleRequest.CacheToken.newBuilder().setUserState(BeamFnApi.ProcessBundleRequest.CacheToken.UserState.getDefaultInstance()).setToken(ByteString.copyFrom(UUID.randomUUID().toString().getBytes(Charsets.UTF_8))).build();
    }

    public static BeamStateRequestHandler of(@Nullable KeyedStateBackend<?> keyedStateBackend, @Nullable OperatorStateBackend operatorStateBackend, @Nullable TypeSerializer<?> typeSerializer, @Nullable TypeSerializer<?> typeSerializer2, ReadableConfig readableConfig) {
        BeamStateStore unsupported = BeamStateStore.unsupported();
        if (keyedStateBackend != null) {
            if (!$assertionsDisabled && typeSerializer == null) {
                throw new AssertionError();
            }
            unsupported = new BeamKeyedStateStore(keyedStateBackend, typeSerializer, typeSerializer2);
        }
        BeamStateStore unsupported2 = BeamStateStore.unsupported();
        if (operatorStateBackend != null) {
            unsupported2 = new BeamOperatorStateStore(operatorStateBackend);
        }
        return new BeamStateRequestHandler(unsupported, unsupported2, new BeamBagStateHandler(typeSerializer2), new BeamMapStateHandler(readableConfig));
    }

    static {
        $assertionsDisabled = !BeamStateRequestHandler.class.desiredAssertionStatus();
    }
}
