package org.apache.flink.queryablestate.client.proxy;

import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
import org.apache.flink.queryablestate.exceptions.UnknownKvStateKeyGroupLocationException;
import org.apache.flink.queryablestate.exceptions.UnknownLocationException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.runtime.jobmaster.KvStateLocationOracle;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Request handler of the queryable state client proxy.
 *
 * <p>For every incoming {@link KvStateRequest} the handler looks up the {@link KvStateLocation}
 * of the requested state (through the {@link KvStateLocationOracle} provided by the
 * {@link KvStateClientProxy}), determines the key group of the requested key, and forwards a
 * {@link KvStateInternalRequest} to the server address the location reports for that key group.
 *
 * <p>Location lookups are cached per (job id, state name) pair. The cache is bypassed when a
 * retry is triggered, so that a fresh location is requested from the oracle.
 */
@Internal
@ChannelHandler.Sharable
public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {
    private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class);

    /** Source of the {@link KvStateLocationOracle} for a given job. */
    private final KvStateClientProxy proxy;

    /** Pending or completed location lookups, keyed by (job id, queryable state name). */
    private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache;

    /** Client used to forward the internal request to the server holding the state. */
    private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient;

    /**
     * Creates the handler together with its internal forwarding client.
     *
     * @param proxy the proxy this handler belongs to; must not be {@code null}
     * @param clientThreads value handed to the internal {@link Client} as its thread-count
     *     argument (presumably the number of client event loop threads - confirm against
     *     {@code Client})
     * @param serializer serializer for the external request/response messages
     * @param stats statistics tracker passed on to the super class
     */
    public KvStateClientProxyHandler(
            KvStateClientProxyImpl proxy,
            int clientThreads,
            MessageSerializer<KvStateRequest, KvStateResponse> serializer,
            KvStateRequestStats stats) {
        super(proxy, serializer, stats);
        this.lookupCache = new ConcurrentHashMap<>();
        this.proxy = Preconditions.checkNotNull(proxy);
        this.kvStateClient = createInternalClient(clientThreads);
    }

    /** Builds the client that talks to the state servers; its request statistics are disabled. */
    private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) {
        final MessageSerializer<KvStateInternalRequest, KvStateResponse> internalSerializer =
                new MessageSerializer<>(
                        new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
                        new KvStateResponse.KvStateResponseDeserializer());

        return new Client<>(
                "Queryable State Proxy Client",
                threads,
                internalSerializer,
                new DisabledKvStateRequestStats());
    }

    /**
     * Starts the asynchronous state retrieval for the given request.
     *
     * @param requestId id of the request (not used by this handler)
     * @param request the state request to serve
     * @return future completed with the response, or exceptionally with a non-retriable failure
     */
    @Override
    public CompletableFuture<KvStateResponse> handleRequest(long requestId, KvStateRequest request) {
        final CompletableFuture<KvStateResponse> result = new CompletableFuture<>();
        executeActionAsync(result, request, false);
        return result;
    }

    /**
     * Runs one retrieval attempt and completes {@code result} with its outcome, re-running the
     * attempt with a forced location lookup when the failure is considered retriable.
     *
     * @param result future handed out to the caller of {@link #handleRequest}
     * @param request the state request to serve
     * @param forceUpdate whether the cached location must be bypassed for this attempt
     */
    private void executeActionAsync(
            final CompletableFuture<KvStateResponse> result,
            final KvStateRequest request,
            final boolean forceUpdate) {

        // Stops the retry chain once the caller-visible future has been completed.
        if (result.isDone()) {
            return;
        }

        final CompletableFuture<KvStateResponse> operation = getState(request, forceUpdate);

        operation.whenCompleteAsync(
                (response, failure) -> {
                    if (failure == null) {
                        result.complete(response);
                        return;
                    }

                    final Throwable cause = failure.getCause();
                    final boolean retriable =
                            cause instanceof UnknownKvStateIdException
                                    || cause instanceof UnknownKvStateKeyGroupLocationException
                                    || cause instanceof ConnectException;

                    if (retriable) {
                        // Retry with a forced lookup so the cached location is not reused
                        // (presumably these failures indicate an out-of-date location).
                        LOG.debug("Retrying after failing to retrieve state due to: {}.", cause.getMessage());
                        executeActionAsync(result, request, true);
                    } else {
                        result.completeExceptionally(failure);
                    }
                },
                this.queryExecutor);

        // Once the caller-visible future is done, the in-flight attempt is no longer needed.
        result.whenComplete((ignoredResponse, ignoredFailure) -> operation.cancel(false));
    }

    /**
     * Resolves the state location and forwards the request to the responsible server.
     *
     * @param request the state request to serve
     * @param forceUpdate whether the cached location must be bypassed
     * @return future with the server's response; fails with
     *     {@link UnknownKvStateKeyGroupLocationException} when the location has no server address
     *     for the key group of the requested key
     */
    private CompletableFuture<KvStateResponse> getState(
            final KvStateRequest request, final boolean forceUpdate) {

        return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate)
                .thenComposeAsync(
                        location -> {
                            final int keyGroup =
                                    KeyGroupRangeAssignment.computeKeyGroupForKeyHash(
                                            request.getKeyHashCode(), location.getNumKeyGroups());

                            final InetSocketAddress serverAddress =
                                    location.getKvStateServerAddress(keyGroup);
                            if (serverAddress == null) {
                                return FutureUtils.completedExceptionally(
                                        new UnknownKvStateKeyGroupLocationException(getServerName()));
                            }

                            final KvStateInternalRequest internalRequest =
                                    new KvStateInternalRequest(
                                            location.getKvStateID(keyGroup),
                                            request.getSerializedKeyAndNamespace());
                            return this.kvStateClient.sendRequest(serverAddress, internalRequest);
                        },
                        this.queryExecutor);
    }

    /**
     * Returns the location of the given queryable state, from the cache when permitted.
     *
     * <p>A cached future is reused only if {@code forceUpdate} is {@code false} and the future
     * has not completed exceptionally; otherwise a new lookup is started and cached.
     *
     * @param jobId id of the job the state belongs to
     * @param stateName name under which the state is queryable
     * @param forceUpdate whether the cache must be bypassed
     * @return future with the state location; fails with {@link UnknownLocationException} when
     *     the proxy has no location oracle for the job
     */
    private CompletableFuture<KvStateLocation> getKvStateLookupInfo(
            final JobID jobId, final String stateName, final boolean forceUpdate) {

        final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, stateName);
        final CompletableFuture<KvStateLocation> cached = this.lookupCache.get(cacheKey);

        if (!forceUpdate && cached != null && !cached.isCompletedExceptionally()) {
            LOG.debug("Retrieving location for state={} of job={} from the cache.", stateName, jobId);
            return cached;
        }

        final KvStateLocationOracle oracle = this.proxy.getKvStateLocationOracle(jobId);
        if (oracle == null) {
            return FutureUtils.completedExceptionally(new UnknownLocationException("Could not retrieve location of state=" + stateName + " of job=" + jobId + ". Potential reasons are: i) the state is not ready, or ii) the job does not exist."));
        }

        LOG.debug("Retrieving location for state={} of job={} from the key-value state location oracle.", stateName, jobId);

        final CompletableFuture<KvStateLocation> pending = new CompletableFuture<>();
        this.lookupCache.put(cacheKey, pending);

        oracle.requestKvStateLocation(jobId, stateName)
                .whenComplete(
                        (location, failure) -> {
                            if (failure == null) {
                                pending.complete(location);
                                return;
                            }
                            if (ExceptionUtils.stripCompletionException(failure)
                                    instanceof FlinkJobNotFoundException) {
                                // Evict only the entry created by this lookup: a concurrent
                                // lookup may already have replaced it with a newer future,
                                // which must not be dropped.
                                this.lookupCache.remove(cacheKey, pending);
                            }
                            pending.completeExceptionally(failure);
                        });

        return pending;
    }

    /**
     * Shuts down the internal forwarding client.
     *
     * @return the future returned by the client's own shutdown
     */
    @Override
    public CompletableFuture<Void> shutdown() {
        return this.kvStateClient.shutdown();
    }
}
