package org.apache.beam.runners.fnexecution.control;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.ProvisionApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.AutoValue_DefaultJobBundleFactory_ServerInfo;
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.NoopLock;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.UnmodifiableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.class */
public class DefaultJobBundleFactory implements JobBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJobBundleFactory.class);
    private static final IdGenerator factoryIdGenerator = IdGenerators.incrementingLongs();
    private final String factoryId;
    private final ImmutableList<EnvironmentCacheAndLock> environmentCaches;
    private final AtomicInteger stageBundleFactoryCount;
    private final Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap;
    private final ExecutorService executor;
    private final MapControlClientPool clientPool;
    private final IdGenerator stageIdGenerator;
    private final int environmentExpirationMillis;
    private final Semaphore availableCachesSemaphore;
    private final LinkedBlockingDeque<EnvironmentCacheAndLock> availableCaches;
    private final boolean loadBalanceBundles;
    private final Set<WrappedSdkHarnessClient> evictedActiveClients;
    private boolean closed;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory$EnvironmentCacheAndLock.class */
    public static class EnvironmentCacheAndLock {
        final Lock lock;
        final LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> cache;

        EnvironmentCacheAndLock(LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> loadingCache, Lock lock) {
            this.lock = lock;
            this.cache = loadingCache;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory$PreparedClient.class */
    public static class PreparedClient {
        private SdkHarnessClient.BundleProcessor processor;
        private ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        private WrappedSdkHarnessClient wrappedClient;

        private PreparedClient() {
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory$ServerInfo.class */
    public static abstract class ServerInfo {

        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory$ServerInfo$Builder.class */
        public static abstract class Builder {
            abstract Builder setControlServer(GrpcFnServer<FnApiControlClientPoolService> grpcFnServer);

            abstract Builder setLoggingServer(GrpcFnServer<GrpcLoggingService> grpcFnServer);

            abstract Builder setRetrievalServer(GrpcFnServer<ArtifactRetrievalService> grpcFnServer);

            abstract Builder setProvisioningServer(GrpcFnServer<StaticGrpcProvisionService> grpcFnServer);

            abstract Builder setDataServer(GrpcFnServer<GrpcDataService> grpcFnServer);

            abstract Builder setStateServer(GrpcFnServer<GrpcStateService> grpcFnServer);

            abstract ServerInfo build();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract GrpcFnServer<FnApiControlClientPoolService> getControlServer();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract GrpcFnServer<GrpcLoggingService> getLoggingServer();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract GrpcFnServer<ArtifactRetrievalService> getRetrievalServer();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract GrpcFnServer<StaticGrpcProvisionService> getProvisioningServer();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract GrpcFnServer<GrpcDataService> getDataServer();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract GrpcFnServer<GrpcStateService> getStateServer();

        abstract Builder toBuilder();
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory$SimpleStageBundleFactory.class */
    private class SimpleStageBundleFactory implements StageBundleFactory {
        private final ExecutableStage executableStage;
        private final int environmentIndex;
        private final Map<WrappedSdkHarnessClient, PreparedClient> preparedClients;
        private volatile PreparedClient currentClient;

        /* JADX WARN: Multi-variable type inference failed */
        private SimpleStageBundleFactory(ExecutableStage executableStage) {
            this.preparedClients = new IdentityHashMap();
            this.executableStage = executableStage;
            this.environmentIndex = DefaultJobBundleFactory.this.stageBundleFactoryCount.getAndIncrement() % DefaultJobBundleFactory.this.environmentCaches.size();
            WrappedSdkHarnessClient unchecked = ((EnvironmentCacheAndLock) DefaultJobBundleFactory.this.environmentCaches.get(this.environmentIndex)).cache.getUnchecked(executableStage.getEnvironment());
            this.currentClient = DefaultJobBundleFactory.this.prepare(unchecked, executableStage);
            this.preparedClients.put(unchecked, this.currentClient);
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.beam.runners.fnexecution.control.StageBundleFactory
        public RemoteBundle getBundle(OutputReceiverFactory outputReceiverFactory, TimerReceiverFactory timerReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler bundleProgressHandler, BundleFinalizationHandler bundleFinalizationHandler, BundleCheckpointHandler bundleCheckpointHandler) throws Exception {
            EnvironmentCacheAndLock environmentCacheAndLock;
            WrappedSdkHarnessClient unchecked;
            if (DefaultJobBundleFactory.this.loadBalanceBundles) {
                DefaultJobBundleFactory.this.availableCachesSemaphore.acquire();
                environmentCacheAndLock = (EnvironmentCacheAndLock) DefaultJobBundleFactory.this.availableCaches.take();
                try {
                    environmentCacheAndLock.lock.lock();
                    unchecked = environmentCacheAndLock.cache.getUnchecked(this.executableStage.getEnvironment());
                    unchecked.ref();
                    environmentCacheAndLock.lock.unlock();
                    this.currentClient = this.preparedClients.get(unchecked);
                    if (this.currentClient == null) {
                        Map<WrappedSdkHarnessClient, PreparedClient> map = this.preparedClients;
                        PreparedClient prepare = DefaultJobBundleFactory.this.prepare(unchecked, this.executableStage);
                        this.currentClient = prepare;
                        map.put(unchecked, prepare);
                        this.preparedClients.keySet().removeIf(wrappedSdkHarnessClient -> {
                            return wrappedSdkHarnessClient.bundleRefCount.get() == 0;
                        });
                    }
                } finally {
                }
            } else {
                environmentCacheAndLock = (EnvironmentCacheAndLock) DefaultJobBundleFactory.this.environmentCaches.get(this.environmentIndex);
                try {
                    environmentCacheAndLock.lock.lock();
                    unchecked = environmentCacheAndLock.cache.getUnchecked(this.executableStage.getEnvironment());
                    unchecked.ref();
                    environmentCacheAndLock.lock.unlock();
                    if (this.currentClient.wrappedClient != unchecked) {
                        this.preparedClients.clear();
                        this.currentClient = DefaultJobBundleFactory.this.prepare(unchecked, this.executableStage);
                        this.preparedClients.put(unchecked, this.currentClient);
                    }
                } finally {
                }
            }
            if (DefaultJobBundleFactory.this.environmentExpirationMillis > 0) {
                DefaultJobBundleFactory.this.evictedActiveClients.removeIf(wrappedSdkHarnessClient2 -> {
                    return wrappedSdkHarnessClient2.bundleRefCount.get() == 0;
                });
            }
            final SdkHarnessClient.BundleProcessor.ActiveBundle newBundle = this.currentClient.processor.newBundle(DefaultJobBundleFactory.getOutputReceivers(this.currentClient.processBundleDescriptor, outputReceiverFactory), DefaultJobBundleFactory.getTimerReceivers(this.currentClient.processBundleDescriptor, timerReceiverFactory), stateRequestHandler, bundleProgressHandler, bundleFinalizationHandler, bundleCheckpointHandler);
            final WrappedSdkHarnessClient wrappedSdkHarnessClient3 = unchecked;
            final EnvironmentCacheAndLock environmentCacheAndLock2 = environmentCacheAndLock;
            return new RemoteBundle() { // from class: org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.SimpleStageBundleFactory.1
                @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
                public String getId() {
                    return newBundle.getId();
                }

                @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
                public Map<String, FnDataReceiver> getInputReceivers() {
                    return newBundle.getInputReceivers();
                }

                @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
                public Map<KV<String, String>, FnDataReceiver<Timer>> getTimerReceivers() {
                    return newBundle.getTimerReceivers();
                }

                @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
                public void requestProgress() {
                    throw new UnsupportedOperationException();
                }

                @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
                public void split(double d) {
                    newBundle.split(d);
                }

                @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle, java.lang.AutoCloseable
                public void close() throws Exception {
                    try {
                        newBundle.close();
                    } finally {
                        wrappedSdkHarnessClient3.unref();
                        if (DefaultJobBundleFactory.this.loadBalanceBundles) {
                            DefaultJobBundleFactory.this.availableCaches.offer(environmentCacheAndLock2);
                            DefaultJobBundleFactory.this.availableCachesSemaphore.release();
                        }
                    }
                }
            };
        }

        @Override // org.apache.beam.runners.fnexecution.control.StageBundleFactory
        public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
            return this.currentClient.processBundleDescriptor;
        }

        @Override // org.apache.beam.runners.fnexecution.control.StageBundleFactory
        public InstructionRequestHandler getInstructionRequestHandler() {
            return this.currentClient.wrappedClient.getClient().getInstructionRequestHandler();
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            this.preparedClients.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory$WrappedSdkHarnessClient.class */
    public static class WrappedSdkHarnessClient {
        private final RemoteEnvironment environment;
        private final SdkHarnessClient client;
        private final ServerInfo serverInfo;
        private final AtomicInteger bundleRefCount = new AtomicInteger();
        private boolean closed;

        static WrappedSdkHarnessClient wrapping(RemoteEnvironment remoteEnvironment, ServerInfo serverInfo) {
            return new WrappedSdkHarnessClient(remoteEnvironment, SdkHarnessClient.usingFnApiClient(remoteEnvironment.getInstructionRequestHandler(), serverInfo.getDataServer().getService()), serverInfo);
        }

        private WrappedSdkHarnessClient(RemoteEnvironment remoteEnvironment, SdkHarnessClient sdkHarnessClient, ServerInfo serverInfo) {
            this.environment = remoteEnvironment;
            this.client = sdkHarnessClient;
            this.serverInfo = serverInfo;
            ref();
        }

        SdkHarnessClient getClient() {
            return this.client;
        }

        ServerInfo getServerInfo() {
            return this.serverInfo;
        }

        /* JADX WARN: Failed to calculate best type for var: r10v1 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r11v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r12v1 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r13v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r14v1 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r15v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r16v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r17v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r6v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r7v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r8v1 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r9v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
        	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
        	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
        	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
        	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
         */
        /* JADX WARN: Not initialized variable reg: 10, insn: 0x0119: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r10 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:85:0x0119 */
        /* JADX WARN: Not initialized variable reg: 11, insn: 0x011e: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r11 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:87:0x011e */
        /* JADX WARN: Not initialized variable reg: 12, insn: 0x00f0: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r12 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:76:0x00f0 */
        /* JADX WARN: Not initialized variable reg: 13, insn: 0x00f5: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r13 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:78:0x00f5 */
        /* JADX WARN: Not initialized variable reg: 14, insn: 0x00c7: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r14 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:67:0x00c7 */
        /* JADX WARN: Not initialized variable reg: 15, insn: 0x00cc: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r15 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:69:0x00cc */
        /* JADX WARN: Not initialized variable reg: 16, insn: 0x009e: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r16 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:61:0x009e */
        /* JADX WARN: Not initialized variable reg: 17, insn: 0x00a3: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r17 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:63:0x00a3 */
        /* JADX WARN: Not initialized variable reg: 6, insn: 0x0160: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r6 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:106:0x0160 */
        /* JADX WARN: Not initialized variable reg: 7, insn: 0x0164: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r7 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:108:0x0164 */
        /* JADX WARN: Not initialized variable reg: 8, insn: 0x0140: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r8 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:94:0x0140 */
        /* JADX WARN: Not initialized variable reg: 9, insn: 0x0144: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r9 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:96:0x0144 */
        /* JADX WARN: Type inference failed for: r10v1, types: [java.lang.AutoCloseable] */
        /* JADX WARN: Type inference failed for: r11v0, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r12v1, types: [java.lang.AutoCloseable] */
        /* JADX WARN: Type inference failed for: r13v0, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r14v1, types: [java.lang.AutoCloseable] */
        /* JADX WARN: Type inference failed for: r15v0, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r16v0, types: [java.lang.AutoCloseable] */
        /* JADX WARN: Type inference failed for: r17v0, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r6v0, types: [java.lang.AutoCloseable] */
        /* JADX WARN: Type inference failed for: r7v0, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r8v1, types: [java.lang.AutoCloseable] */
        /* JADX WARN: Type inference failed for: r9v0, types: [java.lang.Throwable] */
        public synchronized void close() {
            ?? r6;
            ?? r7;
            ?? r8;
            ?? r9;
            ?? r10;
            ?? r11;
            ?? r12;
            ?? r13;
            if (this.closed) {
                return;
            }
            try {
                try {
                    RemoteEnvironment remoteEnvironment = this.environment;
                    try {
                        GrpcFnServer<StaticGrpcProvisionService> provisioningServer = this.serverInfo.getProvisioningServer();
                        try {
                            GrpcFnServer<ArtifactRetrievalService> retrievalServer = this.serverInfo.getRetrievalServer();
                            try {
                                GrpcFnServer<GrpcStateService> stateServer = this.serverInfo.getStateServer();
                                try {
                                    GrpcFnServer<GrpcDataService> dataServer = this.serverInfo.getDataServer();
                                    try {
                                        GrpcFnServer<FnApiControlClientPoolService> controlServer = this.serverInfo.getControlServer();
                                        GrpcFnServer<GrpcLoggingService> loggingServer = this.serverInfo.getLoggingServer();
                                        Throwable th = null;
                                        try {
                                            try {
                                                this.closed = true;
                                                if (loggingServer != null) {
                                                    $closeResource(null, loggingServer);
                                                }
                                                if (controlServer != null) {
                                                    $closeResource(null, controlServer);
                                                }
                                                if (dataServer != null) {
                                                    $closeResource(null, dataServer);
                                                }
                                                if (stateServer != null) {
                                                    $closeResource(null, stateServer);
                                                }
                                                if (retrievalServer != null) {
                                                    $closeResource(null, retrievalServer);
                                                }
                                                if (provisioningServer != null) {
                                                    $closeResource(null, provisioningServer);
                                                }
                                                if (remoteEnvironment != null) {
                                                    $closeResource(null, remoteEnvironment);
                                                }
                                            } catch (Throwable th2) {
                                                th = th2;
                                                throw th2;
                                            }
                                        } catch (Throwable th3) {
                                            if (loggingServer != null) {
                                                $closeResource(th, loggingServer);
                                            }
                                            throw th3;
                                        }
                                    } finally {
                                    }
                                } finally {
                                }
                            } finally {
                                if (r12 != 0) {
                                    $closeResource(r13, r12);
                                }
                            }
                        } finally {
                            if (r10 != 0) {
                                $closeResource(r11, r10);
                            }
                        }
                    } finally {
                        if (r8 != 0) {
                            $closeResource(r9, r8);
                        }
                    }
                } catch (Exception e) {
                    DefaultJobBundleFactory.LOG.warn("Error cleaning up servers {}", this.environment.getEnvironment(), e);
                }
            } finally {
                if (r6 != 0) {
                    $closeResource(r7, r6);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int ref() {
            return this.bundleRefCount.incrementAndGet();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int unref() {
            int decrementAndGet = this.bundleRefCount.decrementAndGet();
            Preconditions.checkState(decrementAndGet >= 0, "Reference count must not be negative.");
            if (decrementAndGet == 0) {
                DefaultJobBundleFactory.LOG.info("Closing environment {}", this.environment.getEnvironment());
                close();
            }
            return decrementAndGet;
        }

        private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
            if (th == null) {
                autoCloseable.close();
                return;
            }
            try {
                autoCloseable.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
        }
    }

    public static DefaultJobBundleFactory create(JobInfo jobInfo) {
        PipelineOptions fromProto = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
        return new DefaultJobBundleFactory(jobInfo, ImmutableMap.of(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER), (EmbeddedEnvironmentFactory.Provider) new DockerEnvironmentFactory.Provider(fromProto), BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.PROCESS), (EmbeddedEnvironmentFactory.Provider) new ProcessEnvironmentFactory.Provider(fromProto), BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.EXTERNAL), (EmbeddedEnvironmentFactory.Provider) new ExternalEnvironmentFactory.Provider(), Environments.ENVIRONMENT_EMBEDDED, new EmbeddedEnvironmentFactory.Provider(fromProto)));
    }

    public static DefaultJobBundleFactory create(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> map) {
        return new DefaultJobBundleFactory(jobInfo, map);
    }

    DefaultJobBundleFactory(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> map) {
        this.factoryId = factoryIdGenerator.getId();
        this.stageBundleFactoryCount = new AtomicInteger();
        IdGenerator incrementingLongs = IdGenerators.incrementingLongs();
        this.environmentFactoryProviderMap = map;
        this.executor = Executors.newCachedThreadPool();
        this.clientPool = MapControlClientPool.create();
        this.stageIdGenerator = () -> {
            return this.factoryId + "-" + incrementingLongs.getId();
        };
        this.environmentExpirationMillis = getEnvironmentExpirationMillis(jobInfo);
        this.loadBalanceBundles = shouldLoadBalanceBundles(jobInfo);
        this.environmentCaches = createEnvironmentCaches(serverFactory -> {
            return createServerInfo(jobInfo, serverFactory);
        }, getMaxEnvironmentClients(jobInfo));
        this.availableCachesSemaphore = new Semaphore(this.environmentCaches.size(), true);
        this.availableCaches = new LinkedBlockingDeque<>(this.environmentCaches);
        this.evictedActiveClients = Sets.newConcurrentHashSet();
    }

    @VisibleForTesting
    DefaultJobBundleFactory(JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> map, IdGenerator idGenerator, ServerInfo serverInfo) {
        this.factoryId = factoryIdGenerator.getId();
        this.stageBundleFactoryCount = new AtomicInteger();
        this.environmentFactoryProviderMap = map;
        this.executor = Executors.newCachedThreadPool();
        this.clientPool = MapControlClientPool.create();
        this.stageIdGenerator = idGenerator;
        this.environmentExpirationMillis = getEnvironmentExpirationMillis(jobInfo);
        this.loadBalanceBundles = shouldLoadBalanceBundles(jobInfo);
        this.environmentCaches = createEnvironmentCaches(serverFactory -> {
            return serverInfo;
        }, getMaxEnvironmentClients(jobInfo));
        this.availableCachesSemaphore = new Semaphore(this.environmentCaches.size(), true);
        this.availableCaches = new LinkedBlockingDeque<>(this.environmentCaches);
        this.evictedActiveClients = Sets.newConcurrentHashSet();
    }

    private ImmutableList<EnvironmentCacheAndLock> createEnvironmentCaches(final ThrowingFunction<ServerFactory, ServerInfo> throwingFunction, int i) {
        ImmutableList.Builder builder = ImmutableList.builder();
        for (int i2 = 0; i2 < i; i2++) {
            Lock reentrantLock = this.environmentExpirationMillis > 0 ? new ReentrantLock(true) : NoopLock.get();
            Lock lock = reentrantLock;
            CacheBuilder<K1, V1> removalListener = CacheBuilder.newBuilder().removalListener(removalNotification -> {
                WrappedSdkHarnessClient wrappedSdkHarnessClient = (WrappedSdkHarnessClient) removalNotification.getValue();
                try {
                    lock.lock();
                    int unref = wrappedSdkHarnessClient.unref();
                    lock.unlock();
                    if (unref > 0) {
                        LOG.warn("Expiring environment {} with {} remaining bundle references. Taking note to clean it up during shutdown if the references are not removed by then.", removalNotification.getKey(), Integer.valueOf(unref));
                        this.evictedActiveClients.add(wrappedSdkHarnessClient);
                    }
                } catch (Throwable th) {
                    lock.unlock();
                    throw th;
                }
            });
            if (this.environmentExpirationMillis > 0) {
                removalListener.expireAfterWrite(this.environmentExpirationMillis, TimeUnit.MILLISECONDS);
            }
            builder.add((ImmutableList.Builder) new EnvironmentCacheAndLock(removalListener.build(new CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient>() { // from class: org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.1
                @Override // org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader
                public WrappedSdkHarnessClient load(RunnerApi.Environment environment) throws Exception {
                    EnvironmentFactory.Provider provider = (EnvironmentFactory.Provider) DefaultJobBundleFactory.this.environmentFactoryProviderMap.get(environment.getUrn());
                    ServerInfo serverInfo = (ServerInfo) throwingFunction.apply(provider.getServerFactory());
                    String id = DefaultJobBundleFactory.this.stageIdGenerator.getId();
                    serverInfo.getProvisioningServer().getService().registerEnvironment(id, environment);
                    return WrappedSdkHarnessClient.wrapping(provider.createEnvironmentFactory(serverInfo.getControlServer(), serverInfo.getLoggingServer(), serverInfo.getRetrievalServer(), serverInfo.getProvisioningServer(), DefaultJobBundleFactory.this.clientPool, DefaultJobBundleFactory.this.stageIdGenerator).createEnvironment(environment, id), serverInfo);
                }
            }), reentrantLock));
        }
        return builder.build();
    }

    private static int getEnvironmentExpirationMillis(JobInfo jobInfo) {
        return ((PortablePipelineOptions) PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()).as(PortablePipelineOptions.class)).getEnvironmentExpirationMillis();
    }

    private static int getMaxEnvironmentClients(JobInfo jobInfo) {
        int intValue = ((Integer) MoreObjects.firstNonNull(Integer.valueOf(((PortablePipelineOptions) PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()).as(PortablePipelineOptions.class)).getSdkWorkerParallelism()), 1)).intValue();
        Preconditions.checkArgument(intValue >= 0, "sdk_worker_parallelism must be >= 0");
        if (intValue == 0) {
            intValue = Math.max(Runtime.getRuntime().availableProcessors() - 1, 1);
        }
        return intValue;
    }

    private static boolean shouldLoadBalanceBundles(JobInfo jobInfo) {
        PipelineOptions fromProto = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
        boolean loadBalanceBundles = ((PortablePipelineOptions) fromProto.as(PortablePipelineOptions.class)).getLoadBalanceBundles();
        if (loadBalanceBundles) {
            Preconditions.checkArgument(Integer.parseInt((String) MoreObjects.firstNonNull(ExperimentalOptions.getExperimentValue(fromProto, ExperimentalOptions.STATE_CACHE_SIZE), "0")) == 0, "%s must be 0 when using bundle load balancing", ExperimentalOptions.STATE_CACHE_SIZE);
        }
        return loadBalanceBundles;
    }

    @Override // org.apache.beam.runners.fnexecution.control.JobBundleFactory
    public StageBundleFactory forStage(ExecutableStage executableStage) {
        return new SimpleStageBundleFactory(executableStage);
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() throws Exception {
        if (this.closed) {
            return;
        }
        Exception exc = null;
        UnmodifiableIterator<EnvironmentCacheAndLock> it = this.environmentCaches.iterator();
        while (it.hasNext()) {
            EnvironmentCacheAndLock next = it.next();
            try {
                next.cache.invalidateAll();
                next.cache.cleanUp();
            } catch (Exception e) {
                if (exc != null) {
                    exc.addSuppressed(e);
                } else {
                    exc = e;
                }
            }
        }
        Iterator<WrappedSdkHarnessClient> it2 = this.evictedActiveClients.iterator();
        while (it2.hasNext()) {
            do {
                try {
                } catch (Exception e2) {
                    if (exc != null) {
                        exc.addSuppressed(e2);
                    } else {
                        exc = e2;
                    }
                }
            } while (it2.next().unref() > 0);
        }
        try {
            this.executor.shutdown();
        } catch (Exception e3) {
            if (exc != null) {
                exc.addSuppressed(e3);
            } else {
                exc = e3;
            }
        }
        this.closed = true;
        if (exc != null) {
            throw exc;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<String, RemoteOutputReceiver<?>> getOutputReceivers(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor, OutputReceiverFactory outputReceiverFactory) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (Map.Entry<String, Coder> entry : executableProcessBundleDescriptor.getRemoteOutputCoders().entrySet()) {
            String key = entry.getKey();
            builder.put(key, RemoteOutputReceiver.of(entry.getValue(), outputReceiverFactory.create((String) Iterables.getOnlyElement(executableProcessBundleDescriptor.getProcessBundleDescriptor().getTransformsOrThrow(key).getInputsMap().values()))));
        }
        return builder.build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<KV<String, String>, RemoteOutputReceiver<Timer<?>>> getTimerReceivers(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor, TimerReceiverFactory timerReceiverFactory) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Iterator<Map.Entry<String, Map<String, ProcessBundleDescriptors.TimerSpec>>> it = executableProcessBundleDescriptor.getTimerSpecs().entrySet().iterator();
        while (it.hasNext()) {
            for (ProcessBundleDescriptors.TimerSpec timerSpec : it.next().getValue().values()) {
                builder.put(KV.of(timerSpec.transformId(), timerSpec.timerId()), RemoteOutputReceiver.of(timerSpec.coder(), timerReceiverFactory.create(timerSpec.transformId(), timerSpec.timerId())));
            }
        }
        return builder.build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PreparedClient prepare(WrappedSdkHarnessClient wrappedSdkHarnessClient, ExecutableStage executableStage) {
        PreparedClient preparedClient = new PreparedClient();
        try {
            preparedClient.wrappedClient = wrappedSdkHarnessClient;
            preparedClient.processBundleDescriptor = ProcessBundleDescriptors.fromExecutableStage(this.stageIdGenerator.getId(), executableStage, wrappedSdkHarnessClient.getServerInfo().getDataServer().getApiServiceDescriptor(), wrappedSdkHarnessClient.getServerInfo().getStateServer().getApiServiceDescriptor());
            preparedClient.processor = wrappedSdkHarnessClient.getClient().getProcessor(preparedClient.processBundleDescriptor.getProcessBundleDescriptor(), preparedClient.processBundleDescriptor.getRemoteInputDestinations(), wrappedSdkHarnessClient.getServerInfo().getStateServer().getService(), preparedClient.processBundleDescriptor.getTimerSpecs());
            return preparedClient;
        } catch (IOException e) {
            throw new RuntimeException("Failed to create ProcessBundleDescriptor.", e);
        }
    }

    private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory) throws IOException {
        Preconditions.checkNotNull(serverFactory, "serverFactory can not be null");
        PortablePipelineOptions portablePipelineOptions = (PortablePipelineOptions) PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()).as(PortablePipelineOptions.class);
        GrpcFnServer<FnApiControlClientPoolService> allocatePortAndCreateFor = GrpcFnServer.allocatePortAndCreateFor(FnApiControlClientPoolService.offeringClientsToPool(this.clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), serverFactory);
        GrpcFnServer<GrpcLoggingService> allocatePortAndCreateFor2 = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
        GrpcFnServer<ArtifactRetrievalService> allocatePortAndCreateFor3 = GrpcFnServer.allocatePortAndCreateFor(new ArtifactRetrievalService(), serverFactory);
        ProvisionApi.ProvisionInfo.Builder builder = jobInfo.toProvisionInfo().toBuilder();
        builder.setLoggingEndpoint(allocatePortAndCreateFor2.getApiServiceDescriptor());
        builder.setArtifactEndpoint(allocatePortAndCreateFor3.getApiServiceDescriptor());
        builder.setControlEndpoint(allocatePortAndCreateFor.getApiServiceDescriptor());
        GrpcFnServer<StaticGrpcProvisionService> allocatePortAndCreateFor4 = GrpcFnServer.allocatePortAndCreateFor(StaticGrpcProvisionService.create(builder.build(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), serverFactory);
        GrpcFnServer<GrpcDataService> allocatePortAndCreateFor5 = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(portablePipelineOptions, this.executor, OutboundObserverFactory.serverDirect()), serverFactory);
        return new AutoValue_DefaultJobBundleFactory_ServerInfo.Builder().setControlServer(allocatePortAndCreateFor).setLoggingServer(allocatePortAndCreateFor2).setRetrievalServer(allocatePortAndCreateFor3).setProvisioningServer(allocatePortAndCreateFor4).setDataServer(allocatePortAndCreateFor5).setStateServer(GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory)).build();
    }
}
