package org.apache.flink.table.runtime.functions.table;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.runtime.functions.table.lookup.CachingAsyncLookupFunction;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/functions/table/CachingAsyncLookupFunctionTest.class */
/**
 * Tests that a {@link CachingAsyncLookupFunction} wrapping an {@link AsyncLookupFunction} only
 * hits the wrapped function once per key and afterwards holds the looked-up rows in its cache.
 */
class CachingAsyncLookupFunctionTest {
    private static final RowData KEY_1 = GenericRowData.of(1);
    private static final Collection<RowData> VALUE_1 =
            Collections.singletonList(GenericRowData.of(1, "Alice", 18L));
    private static final RowData KEY_2 = GenericRowData.of(2);
    private static final Collection<RowData> VALUE_2 =
            Arrays.asList(GenericRowData.of(2, "Bob", 20L), GenericRowData.of(2, "Charlie", 22L));
    /** A key that is never put into the delegate's data map. */
    private static final RowData NON_EXIST_KEY = GenericRowData.of(3);

    /**
     * Looks up two existing keys and one missing key twice in a row, then verifies that the
     * delegate was invoked exactly three times in total and that the cache holds the expected
     * rows (an empty collection for the missing key).
     */
    @Test
    void testCaching() throws Exception {
        TestingAsyncLookupFunction delegate = new TestingAsyncLookupFunction();
        CachingAsyncLookupFunction function = createCachingFunction(delegate);
        try {
            // First round: nothing has been looked up yet, so each key reaches the delegate.
            lookupAllKeys(function);
            // Second round: the lookup count asserted below must not grow because of it.
            lookupAllKeys(function);

            Assertions.assertThat(delegate.getLookupCount()).hasValue(3);
            Assertions.assertThat(function.getCache().getIfPresent(KEY_1))
                    .containsExactlyInAnyOrderElementsOf(VALUE_1);
            Assertions.assertThat(function.getCache().getIfPresent(KEY_2))
                    .containsExactlyInAnyOrderElementsOf(VALUE_2);
            Assertions.assertThat(function.getCache().getIfPresent(NON_EXIST_KEY)).isEmpty();
        } finally {
            // NOTE(review): relies on CachingAsyncLookupFunction#close() forwarding to the
            // delegate so that the delegate's thread pool is stopped - confirm.
            function.close();
        }
    }

    /** Issues one async lookup per test key and blocks until all three have completed. */
    private static void lookupAllKeys(CachingAsyncLookupFunction function) throws Exception {
        FutureUtils.completeAll(
                        Arrays.asList(
                                function.asyncLookup(KEY_1),
                                function.asyncLookup(KEY_2),
                                function.asyncLookup(NON_EXIST_KEY)))
                .get();
    }

    /**
     * Wraps {@code asyncLookupFunction} in a {@link CachingAsyncLookupFunction} backed by a
     * {@link DefaultLookupCache} with maximum size {@link Long#MAX_VALUE} and opens it with a
     * mock runtime context.
     *
     * @param asyncLookupFunction the function to wrap
     * @return the opened caching function
     * @throws Exception if opening the caching function fails
     */
    private CachingAsyncLookupFunction createCachingFunction(AsyncLookupFunction asyncLookupFunction)
            throws Exception {
        CachingAsyncLookupFunction function =
                new CachingAsyncLookupFunction(
                        DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE).build(),
                        asyncLookupFunction);
        function.open(new FunctionContext(new MockStreamingRuntimeContext(false, 1, 0)));
        return function;
    }

    /**
     * In-memory {@link AsyncLookupFunction} that answers lookups on its own thread pool after a
     * short random delay and counts how many lookups it has served.
     */
    private static final class TestingAsyncLookupFunction extends AsyncLookupFunction {
        private static final long serialVersionUID = 1L;

        private final transient ConcurrentMap<RowData, Collection<RowData>> data =
                new ConcurrentHashMap<>();
        private transient AtomicInteger lookupCount;
        private transient ExecutorService executor;

        /** Populates the data map, resets the lookup counter and starts a 3-thread pool. */
        @Override
        public void open(FunctionContext functionContext) throws Exception {
            data.put(KEY_1, VALUE_1);
            data.put(KEY_2, VALUE_2);
            lookupCount = new AtomicInteger(0);
            executor = Executors.newFixedThreadPool(3);
        }

        /**
         * Asynchronously returns the rows stored for {@code rowData}.
         *
         * @param rowData the lookup key
         * @return a future completing with the stored rows, or with {@code null} when the key
         *     is not present in the data map
         */
        @Override
        public CompletableFuture<Collection<RowData>> asyncLookup(RowData rowData) {
            return CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            // Random 0-9 ms delay so that lookups complete in varying order.
                            Thread.sleep(ThreadLocalRandom.current().nextInt(0, 10));
                        } catch (InterruptedException e) {
                            // Restore the flag so the pool thread still sees the interruption.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("Failed to lookup value", e);
                        }
                        Collection<RowData> rows = data.get(rowData);
                        lookupCount.incrementAndGet();
                        return rows;
                    },
                    executor);
        }

        /** Stops the thread pool created in {@link #open}; safe to call more than once. */
        @Override
        public void close() {
            if (executor != null) {
                executor.shutdownNow();
            }
        }

        /** Returns the counter of lookups served since {@link #open} was called. */
        public AtomicInteger getLookupCount() {
            return lookupCount;
        }
    }
}
