package org.apache.flink.table.gateway.service;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.FunctionIdentifier;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.operation.OperationManager;
import org.apache.flink.table.gateway.service.session.Session;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlCancelException;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.batch.sql.TestModule;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.TableFunc0;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.function.RunnableWithException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.Condition;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

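/**
 * Integration tests for {@link SqlGatewayServiceImpl}, driven through the service and session
 * manager exposed by {@link SqlGatewayServiceExtension}.
 *
 * <p>Decompiled from: org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.class
 */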
public class SqlGatewayServiceITCase extends AbstractTestBase {

    @RegisterExtension
    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = new SqlGatewayServiceExtension();
    private static SessionManager sessionManager;
    private static SqlGatewayServiceImpl service;
    private final SessionEnvironment defaultSessionEnvironment = SessionEnvironment.newBuilder().setSessionEndpointVersion(MockedEndpointVersion.V1).build();
    private final ThreadFactory threadFactory = new ExecutorThreadFactory("SqlGatewayService Test Pool", IgnoreExceptionHandler.INSTANCE);

    /**
     * Caches the service and the session manager provided by {@link
     * #SQL_GATEWAY_SERVICE_EXTENSION} in static fields before any test runs.
     */
    @BeforeAll
    public static void setUp() {
        service = SQL_GATEWAY_SERVICE_EXTENSION.getService();
        sessionManager = SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager();
    }

    /**
     * Verifies that config entries supplied when opening a session are all present in the config
     * reported for that session.
     */
    @Test
    public void testOpenSessionWithConfig() {
        // Typed map (instead of a raw HashMap) so the builder call and the assertion are
        // checked by the compiler.
        HashMap<String, String> options = new HashMap<>();
        options.put("key1", "val1");
        options.put("key2", "val2");

        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .addSessionConfig(options)
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);

        Assertions.assertThat(service.getSessionConfig(sessionHandle)).containsAllEntriesOf(options);
    }

    /**
     * Verifies that the catalog, default catalog and module registered in the session environment
     * are visible in the table environment created for that session.
     */
    @Test
    public void testOpenSessionWithEnvironment() throws Exception {
        String catalogName = "default";
        String databaseName = "testDb";
        String moduleName = "testModule";

        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog(
                                catalogName, new GenericInMemoryCatalog(catalogName, databaseName))
                        .registerModuleAtHead(moduleName, new TestModule())
                        .setDefaultCatalog(catalogName)
                        .build();

        Session session = service.getSession(service.openSession(environment));
        TableEnvironmentInternal tableEnv =
                session.createExecutor(new Configuration()).getTableEnvironment();

        Assertions.assertThat(tableEnv.getCurrentCatalog()).isEqualTo(catalogName);
        Assertions.assertThat(tableEnv.getCurrentDatabase()).isEqualTo(databaseName);
        Assertions.assertThat(tableEnv.listModules()).contains(moduleName);
    }

    /**
     * Verifies that fetching results while the operation is still executing yields {@link
     * ResultSet#NOT_READY_RESULTS}.
     */
    @Test
    public void testFetchResultsInRunning() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch startRunning = new CountDownLatch(1);
        CountDownLatch endRunning = new CountDownLatch(1);

        OperationHandle operationHandle =
                submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunning.countDown();
                            endRunning.await();
                        });

        try {
            // Only fetch once the operation body has actually started executing.
            startRunning.await();
            Assertions.assertThat(
                            service.fetchResults(sessionHandle, operationHandle, 0L, Integer.MAX_VALUE))
                    .isEqualTo(ResultSet.NOT_READY_RESULTS);
        } finally {
            // Release the operation even when the assertion fails; otherwise it would stay
            // blocked on the latch forever.
            endRunning.countDown();
        }
    }

    /**
     * Verifies the RUNNING to FINISHED status transition, then fetches the results one row per
     * request until no next token is returned and compares them with the default result set.
     * Finally closes the operation and checks that the session holds no operations.
     */
    @Test
    public void testGetOperationFinishedAndFetchResults() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch startRunning = new CountDownLatch(1);
        CountDownLatch endRunning = new CountDownLatch(1);

        OperationHandle operationHandle =
                submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunning.countDown();
                            endRunning.await();
                        });

        startRunning.await();
        Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle))
                .isEqualTo(new OperationInfo(OperationStatus.RUNNING));

        endRunning.countDown();
        OperationInfo expectedInfo = new OperationInfo(OperationStatus.FINISHED);
        CommonTestUtils.waitUtil(
                () -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo),
                Duration.ofSeconds(10L),
                "Failed to wait operation finish.");

        // Typed lists replace the raw List/ArrayList and the unchecked (Collection) cast.
        List<RowData> expectedData = getDefaultResultSet().getData();
        List<RowData> actualData = new ArrayList<>();
        Long token = 0L;
        // A null next token marks the end of the result stream.
        while (token != null) {
            ResultSet page = service.fetchResults(sessionHandle, operationHandle, token, 1);
            actualData.addAll(Preconditions.checkNotNull(page.getData()));
            token = page.getNextToken();
        }
        Assertions.assertThat(actualData).isEqualTo(expectedData);

        service.closeOperation(sessionHandle, operationHandle);
        Assertions.assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
    }

    /**
     * Verifies that a running operation reports CANCELED after being cancelled and that closing
     * it afterwards leaves the session without operations.
     */
    @Test
    public void testCancelOperation() throws Exception {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        CountDownLatch startRunning = new CountDownLatch(1);
        // Never counted down by this test; the operation is expected to leave the await through
        // the cancellation below.
        CountDownLatch endRunning = new CountDownLatch(1);

        OperationHandle operationHandle =
                submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            startRunning.countDown();
                            endRunning.await();
                        });
        startRunning.await();

        OperationInfo running = new OperationInfo(OperationStatus.RUNNING);
        Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle))
                .isEqualTo(running);

        service.cancelOperation(sessionHandle, operationHandle);

        OperationInfo canceled = new OperationInfo(OperationStatus.CANCELED);
        Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle))
                .isEqualTo(canceled);

        service.closeOperation(sessionHandle, operationHandle);
        Assertions.assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
    }

    /**
     * Submits an operation whose body never returns, cancels it right away and checks that its
     * status is CANCELED.
     */
    @Test
    public void testCancelOperationByForce() {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        // NOTE(review): this is the same kind of endless spin that
        // testCancelUninterruptedOperation expects cancelOperation to throw for; here the cancel
        // is issued without waiting for the body to start - confirm the intended timing.
        OperationHandle operationHandle =
                submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            for (; ; ) {}
                        });

        service.cancelOperation(sessionHandle, operationHandle);

        OperationStatus status = service.getOperationInfo(sessionHandle, operationHandle).getStatus();
        Assertions.assertThat(status).isEqualTo(OperationStatus.CANCELED);
    }

    /**
     * Verifies that cancelling an operation whose body spins without ever returning fails with a
     * {@link SqlCancelException} somewhere in the cause chain, and that the operation is still
     * reported as CANCELED afterwards.
     */
    @Test
    void testCancelUninterruptedOperation() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        // Typed reference: with a raw AtomicReference, `isRunning::get` would supply Object and
        // not match the Boolean supplier expected by waitUtil.
        AtomicReference<Boolean> isRunning = new AtomicReference<>(false);

        OperationHandle operationHandle =
                submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            // Busy loop that flips the flag once and then never returns.
                            while (true) {
                                isRunning.compareAndSet(false, true);
                            }
                        });

        // Make sure the body is really executing before trying to cancel it.
        CommonTestUtils.waitUtil(
                isRunning::get, Duration.ofSeconds(10L), "Failed to start up the task.");

        Assertions.assertThatThrownBy(() -> service.cancelOperation(sessionHandle, operationHandle))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                SqlCancelException.class,
                                String.format(
                                        "Operation '%s' did not react to \"Future.cancel(true)\" and is stuck for %s seconds in method.\n",
                                        operationHandle, 5)));

        Assertions.assertThat(service.getOperationInfo(sessionHandle, operationHandle).getStatus())
                .isEqualTo(OperationStatus.CANCELED);
    }

    /**
     * Submits ten never-returning operations from separate threads, then verifies that closing
     * the session fails with a {@link SqlCancelException} in the cause chain while the session's
     * operation manager ends up holding no operations.
     */
    @Test
    void testCloseUninterruptedOperation() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        // Typed reference: with a raw AtomicReference, `isRunning::get` would supply Object and
        // not match the Boolean supplier expected by waitUtil.
        AtomicReference<Boolean> isRunning = new AtomicReference<>(false);

        for (int i = 0; i < 10; i++) {
            this.threadFactory
                    .newThread(
                            () ->
                                    submitDefaultOperation(
                                            sessionHandle,
                                            () -> {
                                                // Busy loop that never returns.
                                                while (true) {
                                                    isRunning.compareAndSet(false, true);
                                                }
                                            }))
                    .start();
        }

        // Wait until at least one of the submitted bodies is executing.
        CommonTestUtils.waitUtil(
                isRunning::get, Duration.ofSeconds(10L), "Failed to start up the task.");

        // Look the session up before closing it so its operation manager can be inspected later.
        Session session = service.getSession(sessionHandle);
        AssertionsForClassTypes.assertThatThrownBy(() -> service.closeSession(sessionHandle))
                .satisfies(FlinkAssertions.anyCauseMatches(SqlCancelException.class));

        Assertions.assertThat(session.getOperationManager().getOperationCount()).isEqualTo(0);
    }

    /**
     * Verifies that an operation whose body throws ends in ERROR status, that fetching its
     * results surfaces the original exception in the cause chain, and that it can still be closed.
     */
    @Test
    public void testOperationGetErrorAndFetchError() throws Exception {
        final String errorMessage = "Artificial Exception.";
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        CountDownLatch started = new CountDownLatch(1);

        OperationHandle operationHandle =
                submitDefaultOperation(
                        sessionHandle,
                        () -> {
                            started.countDown();
                            throw new SqlExecutionException(errorMessage);
                        });
        started.await();

        CommonTestUtils.waitUtil(
                () ->
                        service.getOperationInfo(sessionHandle, operationHandle)
                                .getStatus()
                                .equals(OperationStatus.ERROR),
                Duration.ofSeconds(10L),
                "Failed to get expected operation status.");

        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                service.fetchResults(
                                        sessionHandle, operationHandle, 0L, Integer.MAX_VALUE))
                .satisfies(FlinkAssertions.anyCauseMatches(SqlExecutionException.class, errorMessage));

        service.closeOperation(sessionHandle, operationHandle);
        Assertions.assertThat(sessionManager.getOperationCount(sessionHandle)).isEqualTo(0);
    }

    /**
     * Executes {@code SET} with an extra per-statement config entry and verifies that the entry
     * shows up as a row in the statement's results.
     */
    @Test
    public void testExecuteSqlWithConfig() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        OperationHandle operationHandle =
                service.executeStatement(
                        sessionHandle,
                        "SET",
                        -1L,
                        Configuration.fromMap(Collections.singletonMap("username", "Flink")));

        // Typed list instead of a raw ArrayList so the element type is checked.
        List<RowData> settings = new ArrayList<>();
        Long token = 0L;
        // A null next token marks the end of the result stream.
        while (token != null) {
            ResultSet page =
                    service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
            settings.addAll(page.getData());
            token = page.getNextToken();
        }

        Assertions.assertThat(settings)
                .contains(
                        GenericRowData.of(
                                StringData.fromString("username"), StringData.fromString("Flink")));
    }

    /**
     * Verifies that the schema requested while the operation is still pending equals the schema
     * of the default result set.
     */
    @Test
    public void testGetOperationSchemaUntilOperationIsReady() throws Exception {
        runGetOperationSchemaUntilOperationIsReadyOrError(
                this::getDefaultResultSet,
                schemaTask ->
                        Assertions.assertThat(schemaTask.get())
                                .isEqualTo(getDefaultResultSet().getResultSchema()));
    }

    /** Verifies that the default catalog set on the session environment is the current catalog. */
    @Test
    public void testGetCurrentCatalog() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .setDefaultCatalog("cat2")
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);

        Assertions.assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("cat2");
    }

    /** Verifies that both catalogs registered on the session environment are listed. */
    @Test
    public void testListCatalogs() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);

        Assertions.assertThat(service.listCatalogs(sessionHandle)).contains("cat1", "cat2");
    }

    /**
     * Creates two databases through {@code executeStatement} and verifies that both are listed
     * for the catalog.
     */
    @Test
    public void testListDatabases() throws Exception {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat", new GenericInMemoryCatalog("cat"))
                        .setDefaultCatalog("cat")
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);
        Configuration sessionConfig = Configuration.fromMap(service.getSessionConfig(sessionHandle));

        service.executeStatement(sessionHandle, "CREATE DATABASE db1", -1L, sessionConfig);
        OperationHandle lastOperation =
                service.executeStatement(sessionHandle, "CREATE DATABASE db2", -1L, sessionConfig);

        // Only the second statement is awaited.
        // NOTE(review): presumably the first one is guaranteed to have finished by then - confirm.
        CommonTestUtils.waitUtil(
                () ->
                        service.getOperationInfo(sessionHandle, lastOperation)
                                .getStatus()
                                .isTerminalStatus(),
                Duration.ofSeconds(100L),
                "Failed to wait operation finish.");

        Assertions.assertThat(service.listDatabases(sessionHandle, "cat")).contains("db1", "db2");
    }

    /**
     * Verifies {@code listTables} against the objects created by {@link
     * #createInitializedSession()}: tables and views of {@code cat1.db1}, tables only of {@code
     * cat1.db2}, and views only of {@code cat2.db0} (expected to be empty).
     */
    @Test
    public void testListTables() {
        SessionHandle sessionHandle = createInitializedSession();

        // Typed sets replace the raw HashSet instances of the decompiled code.
        HashSet<CatalogBaseTable.TableKind> tablesAndViews =
                new HashSet<>(
                        Arrays.asList(
                                CatalogBaseTable.TableKind.TABLE, CatalogBaseTable.TableKind.VIEW));
        HashSet<TableInfo> expectedInDb1 =
                new HashSet<>(
                        Arrays.asList(
                                new TableInfo(
                                        ObjectIdentifier.of("cat1", "db1", "tbl1"),
                                        CatalogBaseTable.TableKind.TABLE),
                                new TableInfo(
                                        ObjectIdentifier.of("cat1", "db1", "tbl2"),
                                        CatalogBaseTable.TableKind.TABLE),
                                new TableInfo(
                                        ObjectIdentifier.of("cat1", "db1", "tbl3"),
                                        CatalogBaseTable.TableKind.VIEW),
                                new TableInfo(
                                        ObjectIdentifier.of("cat1", "db1", "tbl4"),
                                        CatalogBaseTable.TableKind.VIEW)));
        Assertions.assertThat(service.listTables(sessionHandle, "cat1", "db1", tablesAndViews))
                .isEqualTo(expectedInDb1);

        Assertions.assertThat(
                        service.listTables(
                                sessionHandle,
                                "cat1",
                                "db2",
                                Collections.singleton(CatalogBaseTable.TableKind.TABLE)))
                .isEqualTo(
                        Collections.singleton(
                                new TableInfo(
                                        ObjectIdentifier.of("cat1", "db2", "tbl1"),
                                        CatalogBaseTable.TableKind.TABLE)));

        Assertions.assertThat(
                        service.listTables(
                                sessionHandle,
                                "cat2",
                                "db0",
                                Collections.singleton(CatalogBaseTable.TableKind.VIEW)))
                .isEqualTo(Collections.emptySet());
    }

    /**
     * Verifies that the listed system functions contain {@code sin} (scalar), {@code sum}
     * (aggregate) and {@code as} (other).
     */
    @Test
    public void testListSystemFunctions() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);

        FunctionInfo sin = new FunctionInfo(FunctionIdentifier.of("sin"), FunctionKind.SCALAR);
        FunctionInfo sum = new FunctionInfo(FunctionIdentifier.of("sum"), FunctionKind.AGGREGATE);
        FunctionInfo as = new FunctionInfo(FunctionIdentifier.of("as"), FunctionKind.OTHER);

        Assertions.assertThat(service.listSystemFunctions(sessionHandle)).contains(sin, sum, as);
    }

    /**
     * Registers a temporary system function, a catalog function and a temporary catalog function
     * in the default catalog/database (plus one function under {@code cat1}) and verifies that
     * the three default-catalog functions are listed for {@code
     * default_catalog.default_database}.
     */
    @Test
    public void testListUserDefinedFunctions() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);

        TableEnvironmentInternal tableEnv =
                service.getSession(sessionHandle).createExecutor().getTableEnvironment();
        tableEnv.createTemporarySystemFunction(
                "count_distinct", JavaUserDefinedAggFunctions.CountDistinct.class);
        tableEnv.createFunction("java1", JavaUserDefinedScalarFunctions.JavaFunc1.class);
        tableEnv.createTemporaryFunction("table_func0", TableFunc0.class);
        // Registered under another catalog; it is not part of the asserted functions below.
        tableEnv.createFunction(
                "cat1.default.filter_out_function", JavaUserDefinedScalarFunctions.JavaFunc1.class);

        FunctionInfo countDistinct = new FunctionInfo(FunctionIdentifier.of("count_distinct"));
        FunctionInfo java1 =
                new FunctionInfo(
                        FunctionIdentifier.of(
                                ObjectIdentifier.of(
                                        "default_catalog", "default_database", "java1")));
        FunctionInfo tableFunc0 =
                new FunctionInfo(
                        FunctionIdentifier.of(
                                ObjectIdentifier.of(
                                        "default_catalog", "default_database", "table_func0")));

        Assertions.assertThat(
                        service.listUserDefinedFunctions(
                                sessionHandle, "default_catalog", "default_database"))
                .contains(countDistinct, java1, tableFunc0);
    }

    /**
     * Verifies the schema and options returned for {@code cat1.db1.tbl1} and the original query
     * returned for {@code cat1.db1.tbl3}, both created by {@link #createInitializedSession()}.
     */
    @Test
    public void testGetTable() {
        SessionHandle sessionHandle = createInitializedSession();
        ObjectIdentifier tableId = ObjectIdentifier.of("cat1", "db1", "tbl1");
        ObjectIdentifier viewId = ObjectIdentifier.of("cat1", "db1", "tbl3");

        ResolvedCatalogTable actualTable = service.getTable(sessionHandle, tableId);
        // tbl1 is created without columns, so an empty schema is expected.
        Assertions.assertThat(actualTable.getResolvedSchema())
                .isEqualTo(ResolvedSchema.of(new Column[0]));
        Assertions.assertThat(actualTable.getOptions())
                .isEqualTo(Collections.singletonMap("connector", "values"));

        Assertions.assertThat(service.getTable(sessionHandle, viewId).getOriginalQuery())
                .isEqualTo("SELECT 1");
    }

    /**
     * Cancels a blocked operation on one thread while fetching its results on the calling thread,
     * using a condition that looks for the "CANCELED status" fetch error message.
     */
    @Test
    public void testCancelOperationAndFetchResultInParallel() {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        CountDownLatch blocker = new CountDownLatch(1);
        // The operation body blocks until the latch is released at the end of the test.
        OperationHandle operationHandle =
                submitDefaultOperation(sessionHandle, () -> blocker.await());

        Condition<String> canceledMessage =
                new Condition<>(
                        message ->
                                message.contains(
                                        String.format(
                                                "Can not fetch results from the %s in %s status.",
                                                operationHandle, OperationStatus.CANCELED)),
                        "Fetch results with expected error message.");

        runCancelOrCloseOperationWhenFetchResults(
                sessionHandle,
                operationHandle,
                () -> service.cancelOperation(sessionHandle, operationHandle),
                canceledMessage);

        blocker.countDown();
    }

    /**
     * Closes a short-lived operation on one thread while fetching its results on the calling
     * thread. The condition accepts either the "operation not found" message or the "CLOSED
     * status" fetch error message.
     */
    @Test
    public void testCloseOperationAndFetchResultInParallel() {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        OperationHandle operationHandle =
                submitDefaultOperation(sessionHandle, () -> Thread.sleep(1L));

        Condition<String> closedMessage =
                new Condition<>(
                        message -> {
                            if (message.contains(
                                    String.format(
                                            "Can not find the submitted operation in the OperationManager with the %s.",
                                            operationHandle))) {
                                return true;
                            }
                            return message.contains(
                                    String.format(
                                            "Can not fetch results from the %s in %s status.",
                                            operationHandle, OperationStatus.CLOSED));
                        },
                        "Fetch results with expected error message.");

        runCancelOrCloseOperationWhenFetchResults(
                sessionHandle,
                operationHandle,
                () -> service.closeOperation(sessionHandle, operationHandle),
                closedMessage);
    }

    /**
     * Submits 200 operations (every other one failing), cancels and closes each of them from two
     * separate threads, and verifies that the operation manager ends up empty and every operation
     * reports CLOSED.
     */
    @Test
    public void testCancelAndCloseOperationInParallel() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        int operationNum = 200;
        // Typed list replaces the raw ArrayList/Iterator pair and the cast on each element.
        List<OperationManager.Operation> operations = new ArrayList<>(operationNum);

        for (int i = 0; i < operationNum; i++) {
            boolean throwError = i % 2 == 0;
            OperationHandle operationHandle =
                    submitDefaultOperation(
                            sessionHandle,
                            () -> {
                                Thread.sleep(100L);
                                if (throwError) {
                                    throw new SqlGatewayException("Artificial Exception.");
                                }
                            });

            // Keep a reference to the operation itself: after it is closed it can no longer be
            // looked up through its handle.
            operations.add(
                    service.getSession(sessionHandle)
                            .getOperationManager()
                            .getOperation(operationHandle));

            this.threadFactory
                    .newThread(() -> service.cancelOperation(sessionHandle, operationHandle))
                    .start();
            this.threadFactory
                    .newThread(() -> service.closeOperation(sessionHandle, operationHandle))
                    .start();
        }

        CommonTestUtils.waitUtil(
                () ->
                        service.getSession(sessionHandle).getOperationManager().getOperationCount()
                                == 0,
                Duration.ofSeconds(10L),
                "All operations should be closed.");

        for (OperationManager.Operation operation : operations) {
            Assertions.assertThat(operation.getOperationInfo().getStatus())
                    .isEqualTo(OperationStatus.CLOSED);
        }
    }

    /**
     * Starts 100 threads that each submit a no-op operation, closes the operation manager while
     * they run, and verifies that no operation remains registered once all submitters are done.
     */
    @Test
    public void testSubmitOperationAndCloseOperationManagerInParallel1() throws Exception {
        final int submitterCount = 100;
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        OperationManager manager = service.getSession(sessionHandle).getOperationManager();
        CountDownLatch submittersDone = new CountDownLatch(submitterCount);

        for (int n = 0; n < submitterCount; n++) {
            Thread submitter =
                    threadFactory.newThread(
                            () -> {
                                try {
                                    submitDefaultOperation(sessionHandle, () -> {});
                                } finally {
                                    // Count down whether or not the submission succeeded.
                                    submittersDone.countDown();
                                }
                            });
            submitter.start();
        }

        manager.close();
        submittersDone.await();

        Assertions.assertThat(manager.getOperationCount()).isEqualTo(0);
    }

    /**
     * Submits three blocking operations from separate threads, waits until at least one of them
     * is executing, closes the operation manager and then releases the blocked bodies. The test
     * has no explicit assertion; it passes when none of these steps throws on the test thread.
     */
    @Test
    public void testSubmitOperationAndCloseOperationManagerInParallel2() throws Exception {
        final int submitterCount = 3;
        CountDownLatch startRunning = new CountDownLatch(1);
        CountDownLatch terminateRunning = new CountDownLatch(1);
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);

        for (int n = 0; n < submitterCount; n++) {
            Thread submitter =
                    threadFactory.newThread(
                            () ->
                                    service.submitOperation(
                                            sessionHandle,
                                            () -> {
                                                startRunning.countDown();
                                                terminateRunning.await();
                                                return getDefaultResultSet();
                                            }));
            submitter.start();
        }

        startRunning.await();
        service.getSession(sessionHandle).getOperationManager().close();
        terminateRunning.countDown();
    }

    /**
     * Submits 100 operations that each increment a shared counter with a plain read-then-write,
     * waits for all of them to reach a terminal status and verifies that the counter equals 100.
     */
    @Test
    public void testExecuteOperationInSequence() throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        int operationNum = 100;
        // Typed containers replace the raw AtomicReference and the raw ArrayList instantiation.
        AtomicReference<Integer> counter = new AtomicReference<>(0);
        List<OperationHandle> handles = new ArrayList<>();

        for (int i = 0; i < operationNum; i++) {
            handles.add(
                    service.submitOperation(
                            sessionHandle,
                            () -> {
                                // Deliberately a non-atomic get + set: the final count is only
                                // correct if the increments do not interleave.
                                // NOTE(review): presumably this is how the test detects
                                // operations of one session running concurrently - confirm.
                                int origin = counter.get();
                                counter.set(origin + 1);
                                return getDefaultResultSet();
                            }));
        }

        for (OperationHandle handle : handles) {
            CommonTestUtils.waitUtil(
                    () ->
                            service.getOperationInfo(sessionHandle, handle)
                                    .getStatus()
                                    .isTerminalStatus(),
                    Duration.ofSeconds(10L),
                    "Failed to wait operation terminate");
        }

        Assertions.assertThat(counter.get()).isEqualTo(operationNum);
    }

    /**
     * Opens 500 sessions that each submit a blocking operation, verifies that one more submission
     * from a fresh session is rejected with a {@link RejectedExecutionException}, and then checks
     * that the same session can successfully run an operation once the blocked ones are released.
     */
    @Test
    public void testReleaseLockWhenFailedToSubmitOperation() throws Exception {
        int blockedOperations = 500;
        CountDownLatch release = new CountDownLatch(1);
        // Typed lists replace the raw ArrayLists and the casts on element access.
        List<SessionHandle> sessions = new ArrayList<>();
        List<OperationHandle> operations = new ArrayList<>();

        for (int i = 0; i < blockedOperations; i++) {
            SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
            sessions.add(sessionHandle);
            operations.add(
                    service.submitOperation(
                            sessionHandle,
                            () -> {
                                release.await();
                                return getDefaultResultSet();
                            }));
        }

        // One more submission is expected to be rejected while the others are still blocked.
        SessionHandle rejectedSession = service.openSession(this.defaultSessionEnvironment);
        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                service.submitOperation(
                                        rejectedSession,
                                        () -> {
                                            release.await();
                                            return getDefaultResultSet();
                                        }))
                .satisfies(FlinkAssertions.anyCauseMatches(RejectedExecutionException.class));

        // Unblock the pending operations and wait for the first one to terminate.
        release.countDown();
        CommonTestUtils.waitUtil(
                () ->
                        service.getOperationInfo(sessions.get(0), operations.get(0))
                                .getStatus()
                                .isTerminalStatus(),
                Duration.ofSeconds(10L),
                "Should come to end soon.");

        // The session whose submission was rejected must still be able to run an operation.
        CountDownLatch executed = new CountDownLatch(1);
        service.submitOperation(
                rejectedSession,
                () -> {
                    executed.countDown();
                    return getDefaultResultSet();
                });
        CommonTestUtils.waitUtil(
                () -> executed.getCount() == 0, Duration.ofSeconds(10L), "Should come to end.");
    }

    /**
     * Verifies that fetching results from an operation that has been cancelled fails with the
     * "CANCELED status" error message somewhere in the cause chain.
     */
    @Test
    public void testFetchResultsFromCanceledOperation() {
        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
        CountDownLatch blocker = new CountDownLatch(1);
        // The operation body blocks until the latch is released at the end of the test.
        OperationHandle operationHandle =
                submitDefaultOperation(sessionHandle, () -> blocker.await());

        service.cancelOperation(sessionHandle, operationHandle);

        String expectedMessage =
                String.format(
                        "Can not fetch results from the %s in %s status.",
                        operationHandle, OperationStatus.CANCELED);
        AssertionsForClassTypes.assertThatThrownBy(
                        () ->
                                service.fetchResults(
                                        sessionHandle, operationHandle, 0L, Integer.MAX_VALUE))
                .satisfies(FlinkAssertions.anyCauseMatches(expectedMessage));

        blocker.countDown();
    }

    /**
     * Verifies that cancelling, querying and fetching results for a handle that was never
     * submitted each fail with the "operation not found" message in the cause chain.
     */
    @Test
    public void testRequestNonExistOperation() {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        OperationHandle operationHandle = OperationHandle.create();

        // The list needs an explicit element type: lambdas passed straight to Arrays.asList in
        // the for-each header have no functional-interface target type.
        List<RunnableWithException> requests =
                Arrays.asList(
                        () -> service.cancelOperation(sessionHandle, operationHandle),
                        () -> service.getOperationInfo(sessionHandle, operationHandle),
                        () ->
                                service.fetchResults(
                                        sessionHandle, operationHandle, 0L, Integer.MAX_VALUE));

        String expectedMessage =
                String.format(
                        "Can not find the submitted operation in the OperationManager with the %s.",
                        operationHandle);
        for (RunnableWithException request : requests) {
            AssertionsForClassTypes.assertThatThrownBy(request::run)
                    .satisfies(FlinkAssertions.anyCauseMatches(expectedMessage));
        }
    }

    /**
     * Verifies that requesting the result schema of an operation whose body throws surfaces that
     * exception in the cause chain of the schema request.
     */
    @Test
    public void testGetOperationSchemaWhenOperationGetError() throws Exception {
        final String errorMessage = "Artificial Exception.";
        runGetOperationSchemaUntilOperationIsReadyOrError(
                () -> {
                    throw new SqlGatewayException(errorMessage);
                },
                schemaTask ->
                        AssertionsForClassTypes.assertThatThrownBy(schemaTask::get)
                                .satisfies(
                                        FlinkAssertions.anyCauseMatches(
                                                SqlGatewayException.class, errorMessage)));
    }

    /**
     * Submits an operation that first runs the given action and then returns the default result
     * set.
     *
     * @param sessionHandle session to submit the operation to
     * @param runnableWithException action executed at the start of the operation body
     * @return handle of the submitted operation
     */
    private OperationHandle submitDefaultOperation(SessionHandle sessionHandle, RunnableWithException runnableWithException) {
        Callable<ResultSet> executor =
                () -> {
                    runnableWithException.run();
                    return getDefaultResultSet();
                };
        return service.submitOperation(sessionHandle, executor);
    }

    /**
     * Builds the fixed PAYLOAD result set used by the tests: a three-column schema ({@code id
     * BIGINT}, {@code name STRING}, {@code age INT}), four rows and no next token.
     *
     * @return a new result set instance on every call
     */
    private ResultSet getDefaultResultSet() {
        ResolvedSchema schema =
                ResolvedSchema.of(
                        Column.physical("id", DataTypes.BIGINT()),
                        Column.physical("name", DataTypes.STRING()),
                        Column.physical("age", DataTypes.INT()));

        // NOTE(review): the last two rows carry Integer ids although the column is declared
        // BIGINT; kept as is - confirm whether Long values were intended.
        List<RowData> rows =
                Arrays.asList(
                        GenericRowData.ofKind(
                                RowKind.INSERT, 1L, StringData.fromString("Flink CDC"), 3),
                        GenericRowData.ofKind(
                                RowKind.INSERT, 2L, StringData.fromString("MySql"), null),
                        GenericRowData.ofKind(RowKind.DELETE, 1, null, null),
                        GenericRowData.ofKind(RowKind.UPDATE_AFTER, 2, null, 101));

        Long noNextToken = null;
        return new ResultSet(ResultSet.ResultType.PAYLOAD, noNextToken, schema, rows);
    }

    /**
     * Submits an operation that stays blocked until a schema request for it has been started on
     * another thread, then lets the operation run and hands the schema-request task to the given
     * validator.
     *
     * @param callable body of the operation, executed once the operation is unblocked
     * @param throwingConsumer validator receiving the task that requests the result schema
     * @throws Exception if waiting is interrupted or the validator fails
     */
    private void runGetOperationSchemaUntilOperationIsReadyOrError(Callable<ResultSet> callable, org.apache.flink.util.function.ThrowingConsumer<FutureTask<ResolvedSchema>, Exception> throwingConsumer) throws Exception {
        SessionHandle sessionHandle = service.openSession(this.defaultSessionEnvironment);
        CountDownLatch operationIsRunning = new CountDownLatch(1);
        CountDownLatch schemaFetcherIsRunning = new CountDownLatch(1);

        OperationHandle operationHandle =
                service.submitOperation(
                        sessionHandle,
                        () -> {
                            operationIsRunning.await();
                            return callable.call();
                        });

        // Typed task instead of a raw FutureTask so it matches the validator's parameter type.
        FutureTask<ResolvedSchema> schemaFetcher =
                new FutureTask<>(
                        () -> {
                            schemaFetcherIsRunning.countDown();
                            return service.getOperationResultSchema(sessionHandle, operationHandle);
                        });
        this.threadFactory.newThread(schemaFetcher).start();

        // Release the operation only after the schema request has started.
        schemaFetcherIsRunning.await();
        operationIsRunning.countDown();

        throwingConsumer.accept(schemaFetcher);
    }

    /**
     * Runs the given cancel/close action on a separate thread while the calling thread keeps
     * fetching results until a fetch throws. Verifies that some exception in the cause chain has
     * a message satisfying {@code condition}, and that every row fetched before the failure
     * belongs to the default result set.
     *
     * @param sessionHandle session owning the operation
     * @param operationHandle operation to fetch from
     * @param runnableWithException cancel or close action executed concurrently
     * @param condition expectation on the message of one exception in the cause chain
     */
    private void runCancelOrCloseOperationWhenFetchResults(SessionHandle sessionHandle, OperationHandle operationHandle, RunnableWithException runnableWithException, Condition<String> condition) {
        List<RowData> fetched = new ArrayList<>();

        this.threadFactory
                .newThread(
                        () -> {
                            try {
                                runnableWithException.run();
                            } catch (Exception ignored) {
                                // Best effort: failures of the concurrent cancel/close are not
                                // what this helper checks.
                            }
                        })
                .start();

        AssertionsForClassTypes.assertThatThrownBy(
                        () -> {
                            Long token = 0L;
                            // The token is never set to null, so the loop only ends when
                            // fetchResults throws.
                            while (token != null) {
                                ResultSet resultSet =
                                        service.fetchResults(
                                                sessionHandle,
                                                operationHandle,
                                                token,
                                                Integer.MAX_VALUE);
                                if (resultSet.getNextToken() != null) {
                                    token = resultSet.getNextToken();
                                }
                                if (resultSet.getResultType() == ResultSet.ResultType.PAYLOAD) {
                                    fetched.addAll(resultSet.getData());
                                }
                            }
                        })
                .satisfies(
                        thrown ->
                                FlinkAssertions.assertThatChainOfCauses(thrown)
                                        .anySatisfy(
                                                // Assert on the message; merely calling
                                                // condition.matches(...) would discard the
                                                // result and never fail.
                                                cause ->
                                                        Assertions.assertThat(cause.getMessage())
                                                                .isNotNull()
                                                                .is(condition)));

        Assertions.assertThat(getDefaultResultSet().getData()).containsAll(fetched);
    }

    /**
     * Opens a session with catalogs {@code cat1} and {@code cat2} and populates them with the
     * databases, tables and views that the listing and lookup tests expect.
     *
     * @return handle of the prepared session
     */
    private SessionHandle createInitializedSession() {
        SessionEnvironment environment =
                SessionEnvironment.newBuilder()
                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
                        .registerCatalog("cat1", new GenericInMemoryCatalog("cat1"))
                        .registerCatalog("cat2", new GenericInMemoryCatalog("cat2"))
                        .build();
        SessionHandle sessionHandle = service.openSession(environment);
        TableEnvironmentInternal tableEnv =
                service.getSession(sessionHandle).createExecutor().getTableEnvironment();

        // Executed in order: each database is created before the objects inside it.
        String[] statements = {
            "CREATE DATABASE cat1.db1",
            "CREATE TEMPORARY TABLE cat1.db1.tbl1 WITH ('connector' = 'values')",
            "CREATE TABLE cat1.db1.tbl2 WITH('connector' = 'values')",
            "CREATE TEMPORARY VIEW cat1.db1.tbl3 AS SELECT 1",
            "CREATE VIEW cat1.db1.tbl4 AS SELECT 1",
            "CREATE DATABASE cat1.db2",
            "CREATE TABLE cat1.db2.tbl1 WITH ('connector' = 'values')",
            "CREATE VIEW cat1.db2.tbl2 AS SELECT 1",
            "CREATE DATABASE cat2.db0",
            "CREATE TABLE cat2.db0.tbl0 WITH('connector' = 'values')"
        };
        for (String statement : statements) {
            tableEnv.executeSql(statement);
        }
        return sessionHandle;
    }
}
