/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job.savepoints;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointRequestBody;
import org.apache.flink.runtime.rest.messages.job.savepoints.stop.StopWithSavepointTriggerHeaders;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;

public class SavepointHandlers {
    @Nullable
    private final String defaultSavepointDir;

    public SavepointHandlers(@Nullable String defaultSavepointDir) {
        this.defaultSavepointDir = defaultSavepointDir;
    }

    private static RestHandlerException createInternalServerError(Throwable throwable, AsynchronousJobOperationKey key, String errorMessageInfix) {
        return new RestHandlerException(String.format("Internal server error while %s savepoint operation with triggerId=%s for job %s.", new Object[]{errorMessageInfix, key.getTriggerId(), key.getJobId()}), HttpResponseStatus.INTERNAL_SERVER_ERROR, throwable);
    }

    public static class SavepointStatusHandler
    extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, AsynchronousOperationResult<SavepointInfo>, SavepointStatusMessageParameters> {
        public SavepointStatusHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders) {
            super(leaderRetriever, timeout, responseHeaders, SavepointStatusHeaders.getInstance());
        }

        @Override
        public CompletableFuture<AsynchronousOperationResult<SavepointInfo>> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            AsynchronousJobOperationKey key = this.getOperationKey(request);
            return gateway.getTriggeredSavepointStatus(key).handle((operationResult, throwable) -> {
                if (throwable == null) {
                    switch (operationResult.getStatus()) {
                        case SUCCESS: {
                            return AsynchronousOperationResult.completed(this.operationResultResponse((String)operationResult.getResult()));
                        }
                        case FAILURE: {
                            return AsynchronousOperationResult.completed(this.exceptionalOperationResultResponse(operationResult.getThrowable()));
                        }
                        case IN_PROGRESS: {
                            return AsynchronousOperationResult.inProgress();
                        }
                    }
                    throw new IllegalStateException("No handler for operation status " + (Object)((Object)operationResult.getStatus()) + ", encountered for key " + key);
                }
                throw new CompletionException((Throwable)((Object)SavepointStatusHandler.maybeCreateNotFoundError(throwable, key).orElseGet(() -> SavepointHandlers.createInternalServerError(throwable, key, "retrieving status of"))));
            });
        }

        private static Optional<RestHandlerException> maybeCreateNotFoundError(Throwable throwable, AsynchronousJobOperationKey key) {
            if (ExceptionUtils.findThrowable((Throwable)throwable, UnknownOperationKeyException.class).isPresent()) {
                return Optional.of(new RestHandlerException(String.format("There is no savepoint operation with triggerId=%s for job %s.", new Object[]{key.getTriggerId(), key.getJobId()}), HttpResponseStatus.NOT_FOUND));
            }
            return Optional.empty();
        }

        protected AsynchronousJobOperationKey getOperationKey(HandlerRequest<EmptyRequestBody> request) {
            TriggerId triggerId = (TriggerId)((Object)request.getPathParameter(TriggerIdPathParameter.class));
            JobID jobId = (JobID)request.getPathParameter(JobIDPathParameter.class);
            return AsynchronousJobOperationKey.of(triggerId, jobId);
        }

        protected SavepointInfo exceptionalOperationResultResponse(Throwable throwable) {
            return new SavepointInfo(null, new SerializedThrowable(throwable));
        }

        protected SavepointInfo operationResultResponse(String operationResult) {
            return new SavepointInfo(operationResult, null);
        }
    }

    public class SavepointTriggerHandler
    extends SavepointHandlerBase<SavepointTriggerRequestBody> {
        public SavepointTriggerHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders) {
            super(leaderRetriever, timeout, responseHeaders, SavepointTriggerHeaders.getInstance());
        }

        @Override
        protected Optional<TriggerId> extractTriggerId(SavepointTriggerRequestBody request) {
            return request.getTriggerId();
        }

        @Override
        protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<SavepointTriggerRequestBody> request, AsynchronousJobOperationKey operationKey, RestfulGateway gateway) throws RestHandlerException {
            Optional<String> requestedTargetDirectory = request.getRequestBody().getTargetDirectory();
            if (!requestedTargetDirectory.isPresent() && SavepointHandlers.this.defaultSavepointDir == null) {
                throw new RestHandlerException(String.format("Config key [%s] is not set. Property [%s] must be provided.", CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "target-directory"), HttpResponseStatus.BAD_REQUEST);
            }
            TriggerSavepointMode savepointMode = request.getRequestBody().isCancelJob() ? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT : TriggerSavepointMode.SAVEPOINT;
            String targetDirectory = requestedTargetDirectory.orElse(SavepointHandlers.this.defaultSavepointDir);
            return gateway.triggerSavepoint(operationKey, targetDirectory, savepointMode, RpcUtils.INF_TIMEOUT);
        }
    }

    public class StopWithSavepointHandler
    extends SavepointHandlerBase<StopWithSavepointRequestBody> {
        public StopWithSavepointHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders) {
            super(leaderRetriever, timeout, responseHeaders, StopWithSavepointTriggerHeaders.getInstance());
        }

        @Override
        protected Optional<TriggerId> extractTriggerId(StopWithSavepointRequestBody request) {
            return request.getTriggerId();
        }

        @Override
        protected CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<StopWithSavepointRequestBody> request, AsynchronousJobOperationKey operationKey, RestfulGateway gateway) throws RestHandlerException {
            Optional<String> requestedTargetDirectory = request.getRequestBody().getTargetDirectory();
            if (!requestedTargetDirectory.isPresent() && SavepointHandlers.this.defaultSavepointDir == null) {
                throw new RestHandlerException(String.format("Config key [%s] is not set. Property [%s] must be provided.", CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "targetDirectory"), HttpResponseStatus.BAD_REQUEST);
            }
            TriggerSavepointMode savepointMode = request.getRequestBody().shouldDrain() ? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT;
            String targetDirectory = requestedTargetDirectory.orElse(SavepointHandlers.this.defaultSavepointDir);
            return gateway.stopWithSavepoint(operationKey, targetDirectory, savepointMode, RpcUtils.INF_TIMEOUT);
        }
    }

    private static abstract class SavepointHandlerBase<B extends RequestBody>
    extends AbstractRestHandler<RestfulGateway, B, TriggerResponse, SavepointTriggerMessageParameters> {
        SavepointHandlerBase(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<B, TriggerResponse, SavepointTriggerMessageParameters> messageHeaders) {
            super(leaderRetriever, timeout, responseHeaders, messageHeaders);
        }

        protected AsynchronousJobOperationKey createOperationKey(HandlerRequest<B> request) {
            JobID jobId = (JobID)request.getPathParameter(JobIDPathParameter.class);
            return AsynchronousJobOperationKey.of(this.extractTriggerId(request.getRequestBody()).orElseGet(TriggerId::new), jobId);
        }

        protected abstract Optional<TriggerId> extractTriggerId(B var1);

        @Override
        public CompletableFuture<TriggerResponse> handleRequest(@Nonnull HandlerRequest<B> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            AsynchronousJobOperationKey operationKey = this.createOperationKey(request);
            return this.triggerOperation(request, operationKey, gateway).handle((acknowledge, throwable) -> {
                if (throwable == null) {
                    return new TriggerResponse(operationKey.getTriggerId());
                }
                throw new CompletionException((Throwable)((Object)SavepointHandlers.createInternalServerError(throwable, operationKey, "triggering")));
            });
        }

        protected abstract CompletableFuture<Acknowledge> triggerOperation(HandlerRequest<B> var1, AsynchronousJobOperationKey var2, RestfulGateway var3) throws RestHandlerException;
    }
}

