package org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader;
import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/functions/table/lookup/fullcache/inputformat/InputFormatCacheLoader.class */
public class InputFormatCacheLoader extends CacheLoader {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(InputFormatCacheLoader.class);
    private final InputFormat<RowData, InputSplit> initialInputFormat;
    private final GenericRowDataKeySelector keySelector;
    private final RowDataSerializer cacheEntriesSerializer;
    private volatile transient List<InputSplitCacheLoadTask> cacheLoadTasks;
    private transient Configuration parameters;
    private volatile boolean isStopped;

    public InputFormatCacheLoader(InputFormat<RowData, ?> inputFormat, GenericRowDataKeySelector genericRowDataKeySelector, RowDataSerializer rowDataSerializer) {
        this.initialInputFormat = inputFormat;
        this.keySelector = genericRowDataKeySelector;
        this.cacheEntriesSerializer = rowDataSerializer;
    }

    @Override // org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.parameters = configuration;
        this.initialInputFormat.configure(configuration);
    }

    @Override // org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader
    protected void reloadCache() throws Exception {
        InputSplit[] createInputSplits = createInputSplits();
        int length = createInputSplits.length;
        ConcurrentHashMap<RowData, Collection<RowData>> concurrentHashMap = new ConcurrentHashMap<>(16, 0.75f, getConcurrencyLevel(length));
        this.cacheLoadTasks = (List) Arrays.stream(createInputSplits).map(inputSplit -> {
            return createCacheLoadTask(inputSplit, concurrentHashMap);
        }).collect(Collectors.toList());
        ExecutorService executorService = null;
        ArrayList arrayList = null;
        if (length > 1) {
            arrayList = new ArrayList();
            executorService = Executors.newFixedThreadPool(getConcurrencyLevel(length) - 1);
            for (int i = 1; i < length; i++) {
                arrayList.add(executorService.submit(this.cacheLoadTasks.get(i)));
            }
        }
        this.cacheLoadTasks.get(0).run();
        if (executorService != null) {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((Future) it.next()).get();
            }
            executorService.shutdownNow();
        }
        if (this.isStopped) {
            return;
        }
        this.cache = concurrentHashMap;
    }

    @Override // org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader
    public void close() throws Exception {
        super.close();
        this.isStopped = true;
        if (this.cacheLoadTasks != null) {
            this.cacheLoadTasks.forEach((v0) -> {
                v0.stopRunning();
            });
        }
    }

    private InputSplitCacheLoadTask createCacheLoadTask(InputSplit inputSplit, ConcurrentHashMap<RowData, Collection<RowData>> concurrentHashMap) {
        try {
            InputFormat clone = InstantiationUtil.clone(this.initialInputFormat);
            clone.configure(this.parameters);
            return new InputSplitCacheLoadTask(concurrentHashMap, this.keySelector.copy(), this.cacheEntriesSerializer, clone, inputSplit);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create InputFormatCacheLoadTask", e);
        }
    }

    private InputSplit[] createInputSplits() throws IOException {
        InputSplit[] createInputSplits = this.initialInputFormat.createInputSplits(1);
        if (LOG.isDebugEnabled()) {
            LOG.debug("InputFormat created {} InputSplits: {}", Integer.valueOf(createInputSplits.length), Arrays.deepToString(createInputSplits));
        }
        Preconditions.checkState(createInputSplits.length >= 1, "InputFormat must provide at least one input split to load data into the lookup 'FULL' cache.");
        return createInputSplits;
    }

    private int getConcurrencyLevel(int i) {
        return Math.min(i, Runtime.getRuntime().availableProcessors());
    }
}
