package org.apache.flink.runtime.execution.librarycache;

import java.io.File;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.class */
/**
 * Integration test checking that permanent BLOBs uploaded to one {@link BlobServer} can be
 * fetched through a {@link PermanentBlobCache} connected to a different {@link BlobServer} when
 * both servers share the same {@link BlobStoreService}, and that a global cleanup leaves the
 * high-availability storage directory empty.
 */
public class BlobLibraryCacheRecoveryITCase extends TestLogger {

    /** Supplies fresh directories for the HA storage path, the servers and the caches. */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /**
     * Uploads two BLOBs for one job to the first server, reads the first one back through a cache
     * attached to the first server, then reads both through a new cache attached to the second
     * server. Finally removes the job's BLOBs via the second server and asserts that the
     * {@code <HA storage path>/<cluster id>} directory exists and contains no files.
     *
     * @throws Exception if setting up, exercising or tearing down any component fails
     */
    @Test
    public void testRecoveryRegisterAndDownload() throws Exception {
        Random random = new Random();

        // Declared as BlobServer (not PermanentBlobService) because start(), getPort(),
        // putPermanent() and globalCleanupAsync() are called on the elements below.
        BlobServer[] servers = new BlobServer[2];
        InetSocketAddress[] serverAddresses = new InetSocketAddress[2];
        BlobLibraryCacheManager[] libraryCacheManagers = new BlobLibraryCacheManager[2];
        PermanentBlobCache cache = null;
        BlobStoreService blobStoreService = null;

        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        configuration.setString(
                HighAvailabilityOptions.HA_STORAGE_PATH,
                this.temporaryFolder.newFolder().getAbsolutePath());
        configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3600L);

        ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

        try {
            blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

            BlobLibraryCacheManager.ClassLoaderFactory classLoaderFactory =
                    BlobLibraryCacheManager.defaultClassLoaderFactory(
                            FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST,
                            new String[0],
                            (FatalErrorHandler) null,
                            true);

            // Both servers are backed by the same blob store.
            for (int i = 0; i < servers.length; i++) {
                servers[i] =
                        new BlobServer(
                                configuration, this.temporaryFolder.newFolder(), blobStoreService);
                servers[i].start();
                serverAddresses[i] = new InetSocketAddress("localhost", servers[i].getPort());
                libraryCacheManagers[i] =
                        new BlobLibraryCacheManager(servers[i], classLoaderFactory);
            }

            // Random payload for the first BLOB.
            // NOTE(review): the size constant is borrowed from an unrelated test class, which
            // looks like a constant-substitution artifact - consider a local literal; confirm
            // the intended size. The slice below covers the index range [32, 288).
            byte[] firstPayload = new byte[HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE];
            random.nextBytes(firstPayload);
            byte[] secondPayload = Arrays.copyOfRange(firstPayload, 32, 288);

            JobID jobID = new JobID();

            // Upload both BLOBs to the first server only.
            ArrayList<PermanentBlobKey> blobKeys = new ArrayList<>(2);
            blobKeys.add(servers[0].putPermanent(jobID, firstPayload));
            blobKeys.add(servers[0].putPermanent(jobID, secondPayload));

            // Cache attached to the first server.
            cache =
                    new PermanentBlobCache(
                            configuration,
                            this.temporaryFolder.newFolder(),
                            blobStoreService,
                            serverAddresses[0]);

            // Register the uploaded BLOBs as the job's libraries.
            libraryCacheManagers[0]
                    .registerClassLoaderLease(jobID)
                    .getOrResolveClassLoader(blobKeys, Collections.emptyList());

            assertFileContent(firstPayload, cache.getFile(jobID, blobKeys.get(0)));

            // Replace the cache with one attached to the second server, which never received
            // the uploads directly.
            cache.close();
            cache =
                    new PermanentBlobCache(
                            configuration,
                            this.temporaryFolder.newFolder(),
                            blobStoreService,
                            serverAddresses[1]);

            assertFileContent(firstPayload, cache.getFile(jobID, blobKeys.get(0)));
            assertFileContent(secondPayload, cache.getFile(jobID, blobKeys.get(1)));

            // Remove the job's BLOBs again, this time through the second server.
            servers[1].globalCleanupAsync(jobID, cleanupExecutor).join();

            // Nothing may be left below <HA storage path>/<cluster id>.
            File haClusterDirectory =
                    new File(
                            configuration.getString(HighAvailabilityOptions.HA_STORAGE_PATH),
                            configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID));
            File[] remainingFiles = haClusterDirectory.listFiles();
            Assert.assertNotNull("HA storage directory does not exist", remainingFiles);
            Assert.assertEquals(
                    "Unclean state backend: " + Arrays.toString(remainingFiles),
                    0L,
                    remainingFiles.length);
        } finally {
            // Single teardown path, executed exactly once on both success and failure.
            MatcherAssert.assertThat(cleanupExecutor.shutdownNow(), IsEmptyCollection.empty());

            for (BlobLibraryCacheManager libraryCacheManager : libraryCacheManagers) {
                if (libraryCacheManager != null) {
                    libraryCacheManager.shutdown();
                }
            }
            for (BlobServer server : servers) {
                if (server != null) {
                    server.close();
                }
            }
            if (cache != null) {
                cache.close();
            }
            if (blobStoreService != null) {
                blobStoreService.closeAndCleanupAllData();
            }
        }
    }

    /**
     * Asserts that {@code file} has the same length as {@code expected} and that its bytes, read
     * sequentially, equal {@code expected}. The stream is closed whether or not an assertion
     * fails.
     *
     * @param expected the bytes the file is expected to contain
     * @param file the file to read and compare
     * @throws Exception if the file cannot be opened or read
     */
    private static void assertFileContent(byte[] expected, File file) throws Exception {
        Assert.assertEquals(expected.length, file.length());

        try (FileInputStream input = new FileInputStream(file)) {
            for (int i = 0; i < expected.length && input.available() > 0; i++) {
                Assert.assertEquals(expected[i], (byte) input.read());
            }
            // The stream must be fully consumed once the expected bytes have been compared.
            Assert.assertEquals(0L, input.available());
        }
    }
}
