package org.apache.flink.table.gateway.service.operation;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.result.NotReadyResult;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.SqlCancelException;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/table/gateway/service/operation/OperationManager.class */
public class OperationManager {
    /** Logger used by the manager and by its operations. */
    private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class);
    /** Executor to which the work of every operation is submitted. */
    private final ExecutorService service;
    /** Lock taken by the read/write helpers of this class around the operation map and the running flag. */
    private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
    /** Operations that have been submitted and not yet closed, keyed by their handle. */
    private final Map<OperationHandle, Operation> submittedOperations = new HashMap<>();
    /** Single-permit semaphore acquired when an operation is submitted and released once its task is done. */
    private final Semaphore operationLock = new Semaphore(1);
    /** Set to {@code false} by {@code close()}; the lock helpers reject calls afterwards. */
    private boolean isRunning = true;

    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/table/gateway/service/operation/OperationManager$Operation.class */
    public class Operation implements AutoCloseable {
        /** Clean-up wait budget in milliseconds; matches the 5 second wait performed by waitTaskCleanup. */
        private static final long WAIT_CLEAN_UP_MILLISECONDS = 5000;
        /** Handle identifying this operation; it appears in log and error messages. */
        private final OperationHandle operationHandle;
        /** Current lifecycle status; the reference object is also the monitor awaitTermination waits on. */
        private final AtomicReference<OperationStatus> status = new AtomicReference<>(OperationStatus.INITIALIZED);
        /** Work that produces the result fetcher when the operation is executed. */
        private final Callable<ResultFetcher> resultSupplier;
        /** Task handed to the executor; remains null until run() has submitted it. */
        private volatile FutureTask<?> invocation;
        /** Value returned by resultSupplier; remains null until the work has returned. */
        private volatile ResultFetcher resultFetcher;
        /** Failure recorded by processThrowable before the status is switched to ERROR. */
        private volatile SqlExecutionException operationError;

        /**
         * Creates an operation whose status starts as INITIALIZED.
         *
         * @param operationHandle handle under which this operation is referenced
         * @param callable work that yields the {@link ResultFetcher} when executed
         */
        public Operation(OperationHandle operationHandle, Callable<ResultFetcher> callable) {
            this.resultSupplier = callable;
            this.operationHandle = operationHandle;
        }

        /** Switches the operation to RUNNING; called by the submitted task right before the work starts. */
        void runBefore() {
            final OperationStatus next = OperationStatus.RUNNING;
            this.updateState(next);
        }

        /** Switches the operation to FINISHED; called by the submitted task once the work has returned. */
        void runAfter() {
            final OperationStatus next = OperationStatus.FINISHED;
            this.updateState(next);
        }

        /**
         * Submits the work of this operation to the executor of the enclosing manager.
         *
         * <p>The calling thread acquires the manager-wide operation lock, moves the operation to
         * PENDING and hands a {@link FutureTask} to the executor. The lock is released by the
         * task's {@code done()} hook, or by this method when the task never reached the executor.
         *
         * @throws SqlGatewayException if the operation could not be submitted; the failure is also
         *     recorded on the operation through {@code processThrowable}
         */
        public void run() {
            // Tracks whether this call owns the permit, so a failed acquire never triggers a release.
            boolean permitAcquired = false;
            boolean interrupted = false;
            try {
                OperationManager.this.operationLock.acquire();
                permitAcquired = true;
                OperationManager.LOG.debug(String.format("Operation %s acquires the operation lock.", this.operationHandle));
                updateState(OperationStatus.PENDING);
                // The task is subclassed here because the release has to happen in done(), which
                // also fires when the task is cancelled.
                FutureTask<Void> futureTask = new FutureTask<Void>(() -> {
                    try {
                        runBefore();
                        this.resultFetcher = this.resultSupplier.call();
                        runAfter();
                    } catch (InterruptedException e) {
                        // Interruption is only logged; the status is not changed here.
                        OperationManager.LOG.error(String.format("Operation %s is interrupted.", this.operationHandle), e);
                    } catch (Throwable th) {
                        processThrowable(th);
                    }
                }, null) {
                    @Override
                    protected void done() {
                        OperationManager.LOG.debug(String.format("Release the operation lock: %s when task completes.", Operation.this.operationHandle));
                        OperationManager.this.operationLock.release();
                    }
                };
                OperationManager.this.service.submit(futureTask);
                this.invocation = futureTask;
                // cancel()/close() may have run before the invocation was published; terminate it now.
                OperationStatus operationStatus = this.status.get();
                if (operationStatus == OperationStatus.CLOSED || operationStatus == OperationStatus.CANCELED) {
                    OperationManager.LOG.debug(String.format("The current status is %s after updating the operation %s status to %s. Close the resources.", operationStatus, this.operationHandle, OperationStatus.PENDING));
                    closeResources();
                }
            } catch (Throwable th) {
                interrupted = th instanceof InterruptedException;
                processThrowable(th);
                throw new SqlGatewayException("Failed to submit the operation to the thread pool.", th);
            } finally {
                // A null invocation means the task never reached the executor, so done() will not
                // release the permit. Release it here, but only if it was actually acquired.
                if (permitAcquired && this.invocation == null) {
                    OperationManager.LOG.debug(String.format("Operation %s releases the operation lock when failed to submit the operation to the pool.", this.operationHandle));
                    OperationManager.this.operationLock.release();
                }
                if (interrupted) {
                    // Preserve the interrupt for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Moves the operation to CANCELED and then cancels the running task and closes the result
         * fetcher, if present.
         */
        public void cancel() {
            final OperationStatus next = OperationStatus.CANCELED;
            this.updateState(next);
            this.closeResources();
        }

        /**
         * Moves the operation to CLOSED and then cancels the running task and closes the result
         * fetcher, if present.
         */
        @Override
        public void close() {
            final OperationStatus next = OperationStatus.CLOSED;
            this.updateState(next);
            this.closeResources();
        }

        /**
         * Fetches results through the result fetcher, subject to the status checks of
         * {@code fetchResultsInternal}.
         *
         * @param j first argument forwarded to {@code ResultFetcher.fetchResults} (presumably the
         *     fetch token; confirm against ResultFetcher)
         * @param i second argument forwarded to {@code ResultFetcher.fetchResults} (presumably the
         *     maximum number of rows; confirm against ResultFetcher)
         * @return the fetched results, or the not-ready marker while the operation has not finished
         */
        public ResultSet fetchResults(long j, int i) {
            final Supplier<ResultSet> fetch = () -> this.resultFetcher.fetchResults(j, i);
            return fetchResultsInternal(fetch);
        }

        /**
         * Fetches results by orientation through the result fetcher, subject to the status checks
         * of {@code fetchResultsInternal}.
         *
         * @param fetchOrientation orientation forwarded to {@code ResultFetcher.fetchResults}
         * @param i second argument forwarded to {@code ResultFetcher.fetchResults} (presumably the
         *     maximum number of rows; confirm against ResultFetcher)
         * @return the fetched results, or the not-ready marker while the operation has not finished
         */
        public ResultSet fetchResults(FetchOrientation fetchOrientation, int i) {
            final Supplier<ResultSet> fetch = () -> this.resultFetcher.fetchResults(fetchOrientation, i);
            return fetchResultsInternal(fetch);
        }

        /**
         * Blocks until the operation reaches a terminal status and returns the schema of its result.
         *
         * @return the schema reported by the result fetcher
         * @throws IllegalStateException if the operation terminated in a status other than FINISHED
         * @throws Exception the recorded operation error if the operation ended in ERROR, or an
         *     interruption while waiting
         */
        public ResolvedSchema getResultSchema() throws Exception {
            awaitTermination();
            if (OperationStatus.FINISHED == this.status.get()) {
                return this.resultFetcher.getResultSchema();
            }
            throw new IllegalStateException(String.format("The result schema is available when the Operation is in FINISHED state but the current status is %s.", this.status));
        }

        /**
         * Builds a snapshot of the operation from its current status and recorded error.
         *
         * @return operation info; the error part is null when no failure has been recorded
         */
        public OperationInfo getOperationInfo() {
            final OperationStatus currentStatus = this.status.get();
            final SqlExecutionException recordedError = this.operationError;
            return new OperationInfo(currentStatus, recordedError);
        }

        /**
         * Blocks the caller until the status is terminal, waiting on the status reference that
         * {@code updateState} notifies after every transition.
         *
         * @throws Exception the recorded operation error when the operation ended in ERROR, or an
         *     {@link InterruptedException} if the wait is interrupted
         */
        public void awaitTermination() throws Exception {
            final AtomicReference<OperationStatus> monitor = this.status;
            synchronized (monitor) {
                for (;;) {
                    if (monitor.get().isTerminalStatus()) {
                        break;
                    }
                    monitor.wait();
                }
            }
            if (OperationStatus.ERROR == monitor.get()) {
                throw this.operationError;
            }
        }

        /**
         * Applies the status checks shared by both fetch variants before delegating to the supplier.
         *
         * @param supplier the actual fetch; invoked only when the operation is FINISHED
         * @return the supplier's result when FINISHED, or {@code NotReadyResult.INSTANCE} while the
         *     operation is INITIALIZED, PENDING or RUNNING
         * @throws SqlExecutionException the recorded error when the operation is in ERROR
         * @throws SqlGatewayException for every other status
         */
        private ResultSet fetchResultsInternal(Supplier<ResultSet> supplier) {
            final OperationStatus current = this.status.get();
            if (OperationStatus.FINISHED == current) {
                return supplier.get();
            }
            if (OperationStatus.ERROR == current) {
                throw this.operationError;
            }
            final boolean stillInProgress =
                    current == OperationStatus.INITIALIZED
                            || current == OperationStatus.PENDING
                            || current == OperationStatus.RUNNING;
            if (!stillInProgress) {
                throw new SqlGatewayException(String.format("Can not fetch results from the %s in %s status.", this.operationHandle, current));
            }
            return NotReadyResult.INSTANCE;
        }

        /**
         * Atomically moves the operation to the given status and wakes up threads waiting on the
         * status reference.
         *
         * @param operationStatus the target status
         * @throws SqlGatewayException if the transition from the current status is not valid
         */
        private void updateState(OperationStatus operationStatus) {
            OperationStatus previous;
            // Retry the compare-and-set until no concurrent update interferes.
            while (true) {
                previous = this.status.get();
                final boolean allowed = OperationStatus.isValidStatusTransition(previous, operationStatus);
                if (!allowed) {
                    throw new SqlGatewayException(String.format("Failed to convert the Operation Status from %s to %s for %s.", previous, operationStatus, this.operationHandle));
                }
                if (this.status.compareAndSet(previous, operationStatus)) {
                    break;
                }
            }
            synchronized (this.status) {
                this.status.notifyAll();
            }
            OperationManager.LOG.debug(String.format("Convert operation %s from %s to %s.", this.operationHandle, previous, operationStatus));
        }

        /**
         * Cancels the submitted task if it is still running, waits for it to clean up, and closes
         * the result fetcher if one has been produced.
         */
        private void closeResources() {
            final FutureTask<?> task = this.invocation;
            final boolean stillRunning = task != null && !task.isDone();
            if (stillRunning) {
                task.cancel(true);
                waitTaskCleanup(task);
                OperationManager.LOG.debug(String.format("Cancel the operation %s.", this.operationHandle));
            }
            final ResultFetcher fetcher = this.resultFetcher;
            if (fetcher != null) {
                fetcher.close();
            }
        }

        /**
         * Logs the failure, records it as the operation error and moves the operation to ERROR.
         * The error is stored before the status changes.
         *
         * @param th the failure raised while submitting or executing the operation
         */
        private void processThrowable(Throwable th) {
            final String message = String.format("Failed to execute the operation %s.", this.operationHandle);
            OperationManager.LOG.error(message, th);
            final SqlExecutionException wrapped = new SqlExecutionException(message, th);
            this.operationError = wrapped;
            this.updateState(OperationStatus.ERROR);
        }

        /**
         * Waits, for at most {@code WAIT_CLEAN_UP_MILLISECONDS}, until the given task no longer
         * reports a runner thread, polling once per millisecond.
         *
         * @param futureTask the task that has just been cancelled
         * @throws SqlCancelException if a runner thread is still attached once the deadline passed
         */
        private void waitTaskCleanup(FutureTask<?> futureTask) {
            // Use the named constant so the wait budget is defined in one place.
            Deadline deadline = Deadline.fromNow(Duration.ofMillis(WAIT_CLEAN_UP_MILLISECONDS));
            while (deadline.hasTimeLeft()) {
                if (!getThreadInFuture(futureTask).isPresent()) {
                    // The task has let go of its thread; nothing left to wait for.
                    return;
                }
                Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
            }
            // Deadline exceeded: report where the stuck thread currently is.
            getThreadInFuture(futureTask).ifPresent(this::throwExceptionWithThreadStackTrace);
        }

        /**
         * Reads the private {@code runner} field of the given task via reflection.
         *
         * @param futureTask the task to inspect
         * @return the thread stored in the field, or empty when the field is null or cannot be read
         */
        private Optional<Thread> getThreadInFuture(FutureTask<?> futureTask) {
            try {
                Field runnerField = FutureTask.class.getDeclaredField("runner");
                runnerField.setAccessible(true);
                // ofNullable: a null runner is an expected state, not an error to be caught.
                return Optional.ofNullable((Thread) runnerField.get(futureTask));
            } catch (Throwable ignored) {
                // Best effort: when reflection fails the caller treats the task as having no thread.
                // NOTE(review): on runtimes that deny reflective access to java.util.concurrent
                // this always yields empty, so the clean-up wait returns at once - confirm.
                return Optional.empty();
            }
        }

        /**
         * Raises an error describing a task thread that did not stop after cancellation.
         *
         * @param thread the thread still attached to the cancelled task
         * @throws SqlCancelException always; the message carries the thread name, state and stack
         */
        private void throwExceptionWithThreadStackTrace(Thread thread) {
            StringBuilder stack = new StringBuilder();
            for (StackTraceElement frame : thread.getStackTrace()) {
                stack.append("\tat ").append(frame).append("\n");
            }
            // Derive the reported seconds from the wait constant so message and wait stay in sync.
            long waitedSeconds = WAIT_CLEAN_UP_MILLISECONDS / 1000;
            throw new SqlCancelException(String.format("Operation '%s' did not react to \"Future.cancel(true)\" and is stuck for %s seconds in method.\nThread name: %s, thread state: %s, thread stacktrace:\n%s", this.operationHandle, waitedSeconds, thread.getName(), thread.getState(), stack));
        }
    }

    /**
     * Creates a manager that runs the work of its operations on the given executor.
     *
     * @param executorService executor that receives the task of each submitted operation
     */
    public OperationManager(final ExecutorService executorService) {
        this.service = executorService;
    }

    /**
     * Submits an operation whose work produces a complete {@link ResultSet}.
     *
     * @param callable work to execute; its schema and data are wrapped into a result fetcher
     * @return the newly created handle of the submitted operation
     */
    public OperationHandle submitOperation(Callable<ResultSet> callable) {
        final OperationHandle handle = OperationHandle.create();
        final Callable<ResultFetcher> fetcherFactory = () -> {
            final ResultSet results = callable.call();
            return ResultFetcher.fromResults(handle, results.getResultSchema(), results.getData());
        };
        submitOperationInternal(handle, new Operation(handle, fetcherFactory));
        return handle;
    }

    /**
     * Submits an operation whose work builds the result fetcher from the operation handle.
     *
     * @param function factory applied to the new handle when the operation is executed
     * @return the newly created handle of the submitted operation
     */
    public OperationHandle submitOperation(Function<OperationHandle, ResultFetcher> function) {
        final OperationHandle handle = OperationHandle.create();
        final Callable<ResultFetcher> fetcherFactory = () -> function.apply(handle);
        submitOperationInternal(handle, new Operation(handle, fetcherFactory));
        return handle;
    }

    /**
     * Cancels the operation registered under the given handle.
     *
     * @param operationHandle handle of the operation to cancel
     * @throws SqlGatewayException if no such operation is registered or the manager is closed
     */
    public void cancelOperation(OperationHandle operationHandle) {
        final Operation target = getOperation(operationHandle);
        target.cancel();
    }

    /**
     * Removes the operation registered under the given handle and closes it. The call does
     * nothing when no such operation is registered.
     *
     * @param operationHandle handle of the operation to close
     * @throws SqlGatewayException if the manager is closed
     */
    public void closeOperation(OperationHandle operationHandle) {
        writeLock(() ->
                Optional.ofNullable(this.submittedOperations.remove(operationHandle))
                        .ifPresent(Operation::close));
    }

    /**
     * Blocks until the operation registered under the given handle reaches a terminal status.
     *
     * @param operationHandle handle of the operation to wait for
     * @throws Exception the operation's recorded error, or an interruption while waiting
     */
    public void awaitOperationTermination(OperationHandle operationHandle) throws Exception {
        final Operation target = getOperation(operationHandle);
        target.awaitTermination();
    }

    /**
     * Returns the current status and recorded error of the operation with the given handle.
     *
     * @param operationHandle handle of the operation to inspect
     * @return the operation info
     * @throws SqlGatewayException if no such operation is registered or the manager is closed
     */
    public OperationInfo getOperationInfo(OperationHandle operationHandle) {
        final Operation target = getOperation(operationHandle);
        return target.getOperationInfo();
    }

    /**
     * Waits for the operation with the given handle to terminate and returns its result schema.
     *
     * @param operationHandle handle of the operation to inspect
     * @return the schema of the operation's result
     * @throws Exception if the operation failed, did not finish, or the wait was interrupted
     */
    public ResolvedSchema getOperationResultSchema(OperationHandle operationHandle) throws Exception {
        final Operation target = getOperation(operationHandle);
        return target.getResultSchema();
    }

    /**
     * Fetches results of the operation with the given handle.
     *
     * @param operationHandle handle of the operation to read from
     * @param j forwarded unchanged to {@code Operation.fetchResults(long, int)}
     * @param i forwarded unchanged to {@code Operation.fetchResults(long, int)}
     * @return the result returned by the operation
     */
    public ResultSet fetchResults(OperationHandle operationHandle, long j, int i) {
        final Operation target = getOperation(operationHandle);
        return target.fetchResults(j, i);
    }

    /**
     * Fetches results of the operation with the given handle by orientation.
     *
     * @param operationHandle handle of the operation to read from
     * @param fetchOrientation forwarded unchanged to the operation
     * @param i forwarded unchanged to the operation
     * @return the result returned by the operation
     */
    public ResultSet fetchResults(OperationHandle operationHandle, FetchOrientation fetchOrientation, int i) {
        final Operation target = getOperation(operationHandle);
        return target.fetchResults(fetchOrientation, i);
    }

    /**
     * Closes the manager: marks it as not running, closes and forgets all submitted operations,
     * and then waits until the operation lock is free.
     *
     * @throws SqlExecutionException if closing the submitted operations failed; thrown only after
     *     the wait for the operation lock has completed
     */
    public void close() {
        this.stateLock.writeLock().lock();
        Exception closeException = null;
        try {
            this.isRunning = false;
            IOUtils.closeAll(this.submittedOperations.values(), Throwable.class);
        } catch (Exception e) {
            // Keep the failure and report it after the remaining shutdown steps.
            closeException = e;
        } finally {
            this.submittedOperations.clear();
            this.stateLock.writeLock().unlock();
        }
        // Wait until whichever operation holds the permit has released it. The permit is handed
        // back only when it was really acquired, so the semaphore never exceeds its single permit.
        try {
            this.operationLock.acquire();
            this.operationLock.release();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Failed to wait all operation closed.", e);
        }
        LOG.debug("Closes the Operation Manager.");
        if (closeException != null) {
            throw new SqlExecutionException("Failed to close the OperationManager.", closeException);
        }
    }

    /**
     * Returns how many operations are currently registered. The map is read without taking the
     * state lock.
     *
     * @return the size of the submitted-operations map
     */
    @VisibleForTesting
    public int getOperationCount() {
        final int registered = this.submittedOperations.size();
        return registered;
    }

    /**
     * Looks up the operation registered under the given handle while holding the read lock.
     *
     * @param operationHandle handle of the wanted operation
     * @return the registered operation
     * @throws SqlGatewayException if no such operation is registered or the manager is closed
     */
    @VisibleForTesting
    public Operation getOperation(OperationHandle operationHandle) {
        return readLock(() -> {
            final Operation found = this.submittedOperations.get(operationHandle);
            if (found != null) {
                return found;
            }
            throw new SqlGatewayException(String.format("Can not find the submitted operation in the OperationManager with the %s.", operationHandle));
        });
    }

    /**
     * Registers the operation under its handle and then starts it. The operation is run after the
     * write lock has been released.
     *
     * @param operationHandle handle under which the operation is registered
     * @param operation the operation to register and run
     */
    private void submitOperationInternal(OperationHandle operationHandle, Operation operation) {
        writeLock(() -> this.submittedOperations.put(operationHandle, operation));
        operation.run();
    }

    /**
     * Runs the given action while holding the write lock.
     *
     * @param runnable action to execute under the write lock
     * @throws SqlGatewayException if the manager has already been closed
     */
    private void writeLock(Runnable runnable) {
        this.stateLock.writeLock().lock();
        try {
            if (this.isRunning) {
                runnable.run();
                return;
            }
            throw new SqlGatewayException("The OperationManager is closed.");
        } finally {
            this.stateLock.writeLock().unlock();
        }
    }

    /**
     * Evaluates the given supplier while holding the read lock.
     *
     * @param supplier computation to execute under the read lock
     * @param <T> type of the computed value
     * @return the value produced by the supplier
     * @throws SqlGatewayException if the manager has already been closed
     */
    private <T> T readLock(Supplier<T> supplier) {
        this.stateLock.readLock().lock();
        try {
            if (!this.isRunning) {
                throw new SqlGatewayException("The OperationManager is closed.");
            }
            return supplier.get();
        } finally {
            this.stateLock.readLock().unlock();
        }
    }
}
