package org.apache.flink.queryablestate.server;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.runtime.query.KvStateEntry;
import org.apache.flink.runtime.query.KvStateInfo;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.LambdaUtil;
import org.apache.flink.util.Preconditions;

@Internal
@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/flink/queryablestate/server/KvStateServerHandler.class */
public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
    private final KvStateRegistry registry;

    public KvStateServerHandler(KvStateServerImpl kvStateServerImpl, KvStateRegistry kvStateRegistry, MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer, KvStateRequestStats kvStateRequestStats) {
        super(kvStateServerImpl, messageSerializer, kvStateRequestStats);
        this.registry = (KvStateRegistry) Preconditions.checkNotNull(kvStateRegistry);
    }

    public CompletableFuture<KvStateResponse> handleRequest(long j, KvStateInternalRequest kvStateInternalRequest) {
        CompletableFuture<KvStateResponse> completableFuture = new CompletableFuture<>();
        try {
            KvStateEntry kvState = this.registry.getKvState(kvStateInternalRequest.getKvStateId());
            if (kvState == null) {
                completableFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), kvStateInternalRequest.getKvStateId()));
            } else {
                byte[] serializedValue = getSerializedValue(kvState, kvStateInternalRequest.getSerializedKeyAndNamespace());
                if (serializedValue != null) {
                    completableFuture.complete(new KvStateResponse(serializedValue));
                } else {
                    completableFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName()));
                }
            }
            return completableFuture;
        } catch (Throwable th) {
            completableFuture.completeExceptionally(new RuntimeException("Error while processing request with ID " + j + ". Caused by: " + ExceptionUtils.stringifyException(th)));
            return completableFuture;
        }
    }

    private static <K, N, V> byte[] getSerializedValue(KvStateEntry<K, N, V> kvStateEntry, byte[] bArr) throws Exception {
        return (byte[]) LambdaUtil.withContextClassLoader(kvStateEntry.getUserClassLoader(), () -> {
            InternalKvState state = kvStateEntry.getState();
            KvStateInfo infoForCurrentThread = kvStateEntry.getInfoForCurrentThread();
            return state.getSerializedValue(bArr, infoForCurrentThread.getKeySerializer(), infoForCurrentThread.getNamespaceSerializer(), infoForCurrentThread.getStateValueSerializer());
        });
    }

    public CompletableFuture<Void> shutdown() {
        return CompletableFuture.completedFuture(null);
    }
}
