package org.apache.flink.python.env;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.python.util.CompressionUtils;
import org.apache.flink.python.util.PythonDependencyUtils;
import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/python/env/AbstractPythonEnvironmentManager.class */
public abstract class AbstractPythonEnvironmentManager implements PythonEnvironmentManager {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractPythonEnvironmentManager.class);
    private static final long CHECK_INTERVAL = 20;
    private static final long CHECK_TIMEOUT = 1000;
    private transient Thread shutdownHook;
    protected transient PythonLeasedResource resource;
    protected final PythonDependencyInfo dependencyInfo;
    private final Map<String, String> systemEnv;
    private final String[] tmpDirectories;
    private final JobID jobID;

    @VisibleForTesting
    public static final String PYTHON_REQUIREMENTS_DIR = "python-requirements";

    @VisibleForTesting
    public static final String PYTHON_REQUIREMENTS_FILE = "_PYTHON_REQUIREMENTS_FILE";

    @VisibleForTesting
    public static final String PYTHON_REQUIREMENTS_CACHE = "_PYTHON_REQUIREMENTS_CACHE";

    @VisibleForTesting
    public static final String PYTHON_REQUIREMENTS_INSTALL_DIR = "_PYTHON_REQUIREMENTS_INSTALL_DIR";

    @VisibleForTesting
    public static final String PYTHON_WORKING_DIR = "_PYTHON_WORKING_DIR";

    @VisibleForTesting
    public static final String PYTHON_FILES_DIR = "python-files";

    @VisibleForTesting
    public static final String PYTHON_ARCHIVES_DIR = "python-archives";

    @VisibleForTesting
    public static final String PYFLINK_GATEWAY_DISABLED = "PYFLINK_GATEWAY_DISABLED";

    /* loaded from: input_file:org/apache/flink/python/env/AbstractPythonEnvironmentManager$PythonEnvResources.class */
    private static final class PythonEnvResources {
        private static final ReentrantLock lock = new ReentrantLock();

        @GuardedBy("lock")
        private static final Map<Object, PythonLeasedResource> reservedResources = new HashMap();

        private PythonEnvResources() {
        }

        static PythonLeasedResource getOrAllocateSharedResource(Object obj, FunctionWithException<Object, Tuple2<String, Map<String, String>>, Exception> functionWithException) throws Exception {
            try {
                lock.lockInterruptibly();
                try {
                    PythonLeasedResource pythonLeasedResource = reservedResources.get(obj);
                    if (pythonLeasedResource == null) {
                        pythonLeasedResource = createResource(functionWithException, obj);
                        reservedResources.put(obj, pythonLeasedResource);
                    }
                    pythonLeasedResource.incRef();
                    PythonLeasedResource pythonLeasedResource2 = pythonLeasedResource;
                    lock.unlock();
                    return pythonLeasedResource2;
                } catch (Throwable th) {
                    lock.unlock();
                    throw th;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted which preparing python environment.");
            }
        }

        public static void release(JobID jobID) throws Exception {
            lock.lock();
            try {
                PythonLeasedResource pythonLeasedResource = reservedResources.get(jobID);
                if (pythonLeasedResource == null) {
                    lock.unlock();
                    return;
                }
                pythonLeasedResource.decRef();
                if (pythonLeasedResource.refCount == 0) {
                    reservedResources.remove(jobID);
                    pythonLeasedResource.close();
                }
                lock.unlock();
            } catch (Throwable th) {
                lock.unlock();
                throw th;
            }
        }

        private static PythonLeasedResource createResource(FunctionWithException<Object, Tuple2<String, Map<String, String>>, Exception> functionWithException, Object obj) throws Exception {
            Tuple2 tuple2 = (Tuple2) functionWithException.apply(obj);
            return new PythonLeasedResource((String) tuple2.f0, (Map) tuple2.f1);
        }
    }

    /* loaded from: input_file:org/apache/flink/python/env/AbstractPythonEnvironmentManager$PythonLeasedResource.class */
    public static final class PythonLeasedResource implements AutoCloseable {
        public final Map<String, String> env;
        public final String baseDirectory;
        private long refCount = 0;

        PythonLeasedResource(String str, Map<String, String> map) {
            this.baseDirectory = str;
            this.env = map;
        }

        void incRef() {
            this.refCount++;
        }

        void decRef() {
            Preconditions.checkState(this.refCount > 0);
            this.refCount--;
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            int i = 0;
            while (true) {
                try {
                    FileUtils.deleteDirectory(new File(this.baseDirectory));
                    return;
                } catch (Throwable th) {
                    i++;
                    if (i > 50) {
                        AbstractPythonEnvironmentManager.LOG.warn(String.format("Failed to delete the working directory %s of the Python UDF worker.", this.baseDirectory), th);
                        return;
                    }
                    AbstractPythonEnvironmentManager.LOG.warn(String.format("Failed to delete the working directory %s of the Python UDF worker. Retrying...", this.baseDirectory), th);
                }
            }
        }
    }

    public AbstractPythonEnvironmentManager(PythonDependencyInfo pythonDependencyInfo, String[] strArr, Map<String, String> map, JobID jobID) {
        this.dependencyInfo = (PythonDependencyInfo) Objects.requireNonNull(pythonDependencyInfo);
        this.tmpDirectories = (String[]) Objects.requireNonNull(strArr);
        this.systemEnv = (Map) Objects.requireNonNull(map);
        this.jobID = (JobID) Objects.requireNonNull(jobID);
    }

    @Override // org.apache.flink.python.env.PythonEnvironmentManager
    public void open() throws Exception {
        this.resource = PythonEnvResources.getOrAllocateSharedResource(this.jobID, obj -> {
            String createBaseDirectory = createBaseDirectory(this.tmpDirectories);
            File file = new File(createBaseDirectory);
            if (!file.exists() && !file.mkdir()) {
                throw new IOException("Could not create the base directory: " + createBaseDirectory);
            }
            Map<String, String> constructEnvironmentVariables = constructEnvironmentVariables(createBaseDirectory);
            installRequirements(createBaseDirectory, constructEnvironmentVariables);
            return Tuple2.of(createBaseDirectory, constructEnvironmentVariables);
        });
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, AbstractPythonEnvironmentManager.class.getSimpleName(), LOG);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            PythonEnvResources.release(this.jobID);
        } finally {
            if (this.shutdownHook != null) {
                ShutdownHookUtil.removeShutdownHook(this.shutdownHook, AbstractPythonEnvironmentManager.class.getSimpleName(), LOG);
                this.shutdownHook = null;
            }
        }
    }

    @VisibleForTesting
    public String getBaseDirectory() {
        return this.resource.baseDirectory;
    }

    @VisibleForTesting
    public Map<String, String> getPythonEnv() {
        return this.resource.env;
    }

    @VisibleForTesting
    public Map<String, String> constructEnvironmentVariables(String str) throws IOException {
        HashMap hashMap = new HashMap(this.systemEnv);
        constructFilesDirectory(hashMap, str);
        if (this.dependencyInfo.getPythonPath().isPresent()) {
            appendToPythonPath(hashMap, Collections.singletonList(this.dependencyInfo.getPythonPath().get()));
        }
        LOG.info("PYTHONPATH of python worker: {}", hashMap.get("PYTHONPATH"));
        constructRequirementsDirectory(hashMap, str);
        constructArchivesDirectory(hashMap, str);
        hashMap.put("BOOT_LOG_DIR", str);
        hashMap.put(PYFLINK_GATEWAY_DISABLED, "true");
        hashMap.put("python", this.dependencyInfo.getPythonExec());
        LOG.info("Python interpreter path: {}", this.dependencyInfo.getPythonExec());
        return hashMap;
    }

    private static String createBaseDirectory(String[] strArr) throws IOException {
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            File file = new File(strArr[random.nextInt(strArr.length)], "python-dist-" + UUID.randomUUID().toString());
            if (file.mkdirs()) {
                return file.getAbsolutePath();
            }
        }
        throw new IOException("Could not find a unique directory name in '" + Arrays.toString(strArr) + "' for storing the generated files of python dependency.");
    }

    private void installRequirements(String str, Map<String, String> map) throws IOException {
        String join = String.join(File.separator, str, PYTHON_REQUIREMENTS_DIR);
        if (this.dependencyInfo.getRequirementsFilePath().isPresent()) {
            LOG.info("Trying to pip install the Python requirements...");
            PythonEnvironmentManagerUtils.pipInstallRequirements(this.dependencyInfo.getRequirementsFilePath().get(), this.dependencyInfo.getRequirementsCacheDir().orElse(null), join, this.dependencyInfo.getPythonExec(), map);
        }
    }

    private void constructFilesDirectory(Map<String, String> map, String str) throws IOException {
        String join;
        ArrayList arrayList = new ArrayList();
        String join2 = String.join(File.separator, str, PYTHON_FILES_DIR);
        for (Map.Entry<String, String> entry : this.dependencyInfo.getPythonFiles().entrySet()) {
            String name = new File(entry.getKey()).getName();
            String value = entry.getValue();
            Path path = FileSystems.getDefault().getPath(join2, name, value);
            if (!path.getParent().toFile().mkdirs()) {
                throw new IOException(String.format("Could not create the directory: %s !", path.getParent().toString()));
            }
            Path path2 = FileSystems.getDefault().getPath(entry.getKey(), new String[0]);
            try {
                Files.createSymbolicLink(path, path2, new FileAttribute[0]);
            } catch (IOException e) {
                LOG.warn(String.format("Could not create the symbolic link of: %s, the link path is %s, fallback to copy.", path2, path), e);
                FileUtils.copy(new org.apache.flink.core.fs.Path(path2.toUri()), new org.apache.flink.core.fs.Path(path.toUri()), false);
            }
            File file = new File(entry.getKey());
            if (file.isFile() && value.endsWith(".py")) {
                join = String.join(File.separator, join2, name);
            } else if (file.isFile() && value.endsWith(".zip")) {
                org.apache.flink.core.fs.Path path3 = new org.apache.flink.core.fs.Path(join2, String.join(File.separator, name, value.substring(0, value.lastIndexOf("."))));
                FileUtils.expandDirectory(new org.apache.flink.core.fs.Path(file.getAbsolutePath()), path3);
                join = path3.toString();
            } else {
                join = String.join(File.separator, join2, name, value);
            }
            arrayList.add(join);
        }
        appendToPythonPath(map, arrayList);
    }

    private void constructRequirementsDirectory(Map<String, String> map, String str) throws IOException {
        String join = String.join(File.separator, str, PYTHON_REQUIREMENTS_DIR);
        if (this.dependencyInfo.getRequirementsFilePath().isPresent()) {
            if (!new File(join).mkdirs()) {
                throw new IOException(String.format("Creating the requirements target directory: %s failed!", join));
            }
            map.put(PYTHON_REQUIREMENTS_FILE, this.dependencyInfo.getRequirementsFilePath().get());
            LOG.info("Requirements.txt of python worker: {}", this.dependencyInfo.getRequirementsFilePath().get());
            if (this.dependencyInfo.getRequirementsCacheDir().isPresent()) {
                map.put(PYTHON_REQUIREMENTS_CACHE, this.dependencyInfo.getRequirementsCacheDir().get());
                LOG.info("Requirements cache dir of python worker: {}", this.dependencyInfo.getRequirementsCacheDir().get());
            }
            map.put(PYTHON_REQUIREMENTS_INSTALL_DIR, join);
            LOG.info("Requirements install directory of python worker: {}", join);
        }
    }

    private void constructArchivesDirectory(Map<String, String> map, String str) throws IOException {
        String value;
        String str2;
        String join = String.join(File.separator, str, PYTHON_ARCHIVES_DIR);
        if (this.dependencyInfo.getArchives().isEmpty()) {
            return;
        }
        map.put(PYTHON_WORKING_DIR, join);
        LOG.info("Python working dir of python worker: {}", join);
        for (Map.Entry<String, String> entry : this.dependencyInfo.getArchives().entrySet()) {
            String key = entry.getKey();
            if (entry.getValue().contains(PythonDependencyUtils.PARAM_DELIMITER)) {
                String[] split = entry.getValue().split(PythonDependencyUtils.PARAM_DELIMITER, 2);
                value = split[0];
                str2 = split[1];
            } else {
                value = entry.getValue();
                str2 = value;
            }
            CompressionUtils.extractFile(key, String.join(File.separator, join, str2), value);
        }
    }

    private static void appendToPythonPath(Map<String, String> map, List<String> list) {
        if (list.isEmpty()) {
            return;
        }
        String join = String.join(File.pathSeparator, list);
        String str = map.get("PYTHONPATH");
        if (Strings.isNullOrEmpty(str)) {
            map.put("PYTHONPATH", join);
        } else {
            map.put("PYTHONPATH", String.join(File.pathSeparator, join, str));
        }
    }
}
