package org.apache.flink.table.runtime.functions.table.lookup;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalCacheMetricGroup;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.LookupFullCache;
import org.apache.flink.util.Preconditions;

/**
 * A {@link LookupFunction} that answers lookups from a {@link LookupCache} and, on a cache miss,
 * falls back to an optional delegate {@link LookupFunction} whose result is then stored in the
 * cache.
 *
 * <p>The delegate may be {@code null}; in that case every lookup must be served by the cache,
 * because a cache miss fails with an {@link IOException}.
 */
@Internal
public class CachingLookupFunction extends LookupFunction {
    private static final long serialVersionUID = 1L;

    /** Name of the metric group under which the cache metrics are registered. */
    public static final String LOOKUP_CACHE_METRIC_GROUP_NAME = "cache";

    /** Value of {@link #latestLoadTime} before the first successful load through the delegate. */
    private static final long UNINITIALIZED = -1L;

    /** Function used to load values on a cache miss; may be absent. */
    @Nullable
    private final LookupFunction delegate;

    /** Cache in use; replaced in {@link #open} by the instance returned from the cache manager. */
    private LookupCache cache;

    /** Identifier under which the cache was registered; {@code null} until {@link #open} runs. */
    private transient String cacheIdentifier;

    private transient CacheMetricGroup cacheMetricGroup;

    /** Counts successful delegate loads; only created when the cache is not a full cache. */
    private transient Counter loadCounter;

    /** Counts failed delegate loads; only created when the cache is not a full cache. */
    private transient Counter numLoadFailuresCounter;

    /** Wall-clock time (epoch millis) of the most recent successful delegate load. */
    private volatile long latestLoadTime = UNINITIALIZED;

    /**
     * Creates a caching lookup function.
     *
     * @param lookupCache the cache to consult before the delegate
     * @param lookupFunction the function used on cache misses, or {@code null} if there is none
     */
    public CachingLookupFunction(LookupCache lookupCache, @Nullable LookupFunction lookupFunction) {
        this.cache = lookupCache;
        this.delegate = lookupFunction;
    }

    /**
     * Registers the cache with the {@link LookupCacheManager}, sets up the cache metrics, opens
     * the cache and finally opens the delegate, if any.
     *
     * @param functionContext context handed to the delegate and used to obtain the metric group
     * @throws Exception if opening the cache or the delegate fails
     */
    public void open(FunctionContext functionContext) throws Exception {
        // From here on, use whichever cache instance the manager returns for this identifier.
        cacheIdentifier = functionIdentifier();
        cache = LookupCacheManager.getInstance().registerCacheIfAbsent(cacheIdentifier, cache);

        cacheMetricGroup =
                new InternalCacheMetricGroup(
                        functionContext.getMetricGroup(), LOOKUP_CACHE_METRIC_GROUP_NAME);
        final boolean fullCache = cache instanceof LookupFullCache;
        if (!fullCache) {
            // Load counters are only registered here for caches that are not full caches.
            loadCounter = new SimpleCounter();
            cacheMetricGroup.loadCounter(loadCounter);
            numLoadFailuresCounter = new SimpleCounter();
            cacheMetricGroup.numLoadFailuresCounter(numLoadFailuresCounter);
        }

        cache.open(cacheMetricGroup);
        if (fullCache) {
            ((LookupFullCache) cache).open(new Configuration());
        }
        if (delegate != null) {
            delegate.open(functionContext);
        }
    }

    /**
     * Looks up the rows for the given key, preferring the cache over the delegate.
     *
     * <p>On a cache miss the delegate result is cached. A {@code null} or empty delegate result is
     * cached and returned as an empty collection, so the first lookup of a key and every later
     * cache hit for it yield the same value.
     *
     * @param rowData the lookup key
     * @return the matching rows; never {@code null} on the cache-miss path
     * @throws IOException if the delegate is missing or fails
     */
    public Collection<RowData> lookup(RowData rowData) throws IOException {
        final Collection<RowData> cached = cache.getIfPresent(rowData);
        if (cached != null) {
            return cached;
        }

        final Collection<RowData> loaded = lookupByDelegate(rowData);
        final Collection<RowData> result;
        if (loaded == null || loaded.isEmpty()) {
            result = Collections.emptyList();
        } else {
            result = loaded;
        }
        cache.put(rowData, result);
        return result;
    }

    /**
     * Closes the delegate, if any, and unregisters the cache from the {@link LookupCacheManager}.
     *
     * <p>The cache is unregistered even when closing the delegate throws, so that a failing
     * delegate cannot leave the registration behind.
     *
     * @throws Exception if closing the delegate or unregistering the cache fails
     */
    public void close() throws Exception {
        try {
            if (delegate != null) {
                delegate.close();
            }
        } finally {
            // cacheIdentifier is only set once open() has registered the cache.
            if (cacheIdentifier != null) {
                LookupCacheManager.getInstance().unregisterCache(cacheIdentifier);
            }
        }
    }

    /** Returns the cache currently in use. */
    @VisibleForTesting
    public LookupCache getCache() {
        return cache;
    }

    /**
     * Performs the lookup through the delegate and maintains the load metrics.
     *
     * @param rowData the lookup key
     * @return whatever the delegate returns, possibly {@code null}
     * @throws IOException if there is no delegate or the delegate lookup fails; the original
     *     exception is preserved as the cause
     */
    private Collection<RowData> lookupByDelegate(RowData rowData) throws IOException {
        try {
            Preconditions.checkState(
                    delegate != null,
                    "User's lookup function can't be null, if there are possible cache misses.");
            final Collection<RowData> loaded = delegate.lookup(rowData);
            // The counters are absent for full caches and before open(); skip them rather than
            // failing an otherwise successful load.
            if (loadCounter != null) {
                loadCounter.inc();
            }
            updateLatestLoadTime();
            return loaded;
        } catch (Exception e) {
            // Guard against a missing counter so that the real failure is what gets reported,
            // not a NullPointerException raised while recording it.
            if (numLoadFailuresCounter != null) {
                numLoadFailuresCounter.inc();
            }
            throw new IOException(String.format("Failed to lookup with key '%s'", rowData), e);
        }
    }

    /**
     * Records the current time as the latest load time, registering the corresponding gauge the
     * first time a load completes.
     */
    private void updateLatestLoadTime() {
        Preconditions.checkNotNull(
                cacheMetricGroup,
                "Could not register metric '%s' as cache metric group is not initialized",
                "latestLoadTime");
        // Register the gauge lazily, on the first load only.
        if (latestLoadTime == UNINITIALIZED) {
            cacheMetricGroup.latestLoadTimeGauge(() -> latestLoadTime);
        }
        latestLoadTime = System.currentTimeMillis();
    }
}
