package org.apache.flink.queryablestate.server;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.AbstractServerBase;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/queryablestate/server/KvStateServerImpl.class */
public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
    private final KvStateRegistry kvStateRegistry;
    private final KvStateRequestStats stats;
    private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer;

    public KvStateServerImpl(String str, Iterator<Integer> it, Integer num, Integer num2, KvStateRegistry kvStateRegistry, KvStateRequestStats kvStateRequestStats) {
        super("Queryable State Server", str, it, num, num2);
        this.stats = (KvStateRequestStats) Preconditions.checkNotNull(kvStateRequestStats);
        this.kvStateRegistry = (KvStateRegistry) Preconditions.checkNotNull(kvStateRegistry);
    }

    public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() {
        this.serializer = new MessageSerializer<>(new KvStateInternalRequest.KvStateInternalRequestDeserializer(), new KvStateResponse.KvStateResponseDeserializer());
        return new KvStateServerHandler(this, this.kvStateRegistry, this.serializer, this.stats);
    }

    public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() {
        Preconditions.checkState(this.serializer != null, "Server " + getServerName() + " has not been started.");
        return this.serializer;
    }

    public void start() throws Throwable {
        super.start();
    }

    public InetSocketAddress getServerAddress() {
        return super.getServerAddress();
    }

    public void shutdown() {
        try {
            shutdownServer().get(10L, TimeUnit.SECONDS);
            this.log.info("{} was shutdown successfully.", getServerName());
        } catch (Exception e) {
            this.log.warn("{} shutdown failed: {}", getServerName(), e);
        }
    }
}
