/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.execution.librarycache;

import java.io.File;
import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Random;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BlobLibraryCacheRecoveryITCase
extends TestLogger {
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRecoveryRegisterAndDownload() throws Exception {
        Random rand = new Random();
        BlobServer[] server = new BlobServer[2];
        InetSocketAddress[] serverAddress = new InetSocketAddress[2];
        BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2];
        BlobCache cache = null;
        BlobLibraryCacheManager libCache = null;
        BlobStoreService blobStoreService = null;
        Configuration config = new Configuration();
        config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, this.temporaryFolder.getRoot().getAbsolutePath());
        try {
            int i;
            blobStoreService = BlobUtils.createBlobStoreFromConfig((Configuration)config);
            for (int i2 = 0; i2 < server.length; ++i2) {
                server[i2] = new BlobServer(config, (BlobStore)blobStoreService);
                serverAddress[i2] = new InetSocketAddress("localhost", server[i2].getPort());
                libServer[i2] = new BlobLibraryCacheManager((BlobService)server[i2], 3600000L);
            }
            byte[] expected = new byte[1024];
            rand.nextBytes(expected);
            ArrayList<BlobKey> keys = new ArrayList<BlobKey>(2);
            try (BlobClient client = new BlobClient(serverAddress[0], config);){
                keys.add(client.put(expected));
                keys.add(client.put(expected, 32, 256));
            }
            cache = new BlobCache(serverAddress[0], config, (BlobView)blobStoreService);
            libCache = new BlobLibraryCacheManager((BlobService)cache, 3600000L);
            JobID jobId = new JobID();
            ExecutionAttemptID executionId = new ExecutionAttemptID();
            libServer[0].registerTask(jobId, executionId, keys, Collections.emptyList());
            File f = libCache.getFile((BlobKey)keys.get(0));
            Assert.assertEquals((long)expected.length, (long)f.length());
            try (FileInputStream fis = new FileInputStream(f);){
                for (i = 0; i < expected.length && fis.available() > 0; ++i) {
                    Assert.assertEquals((long)expected[i], (long)((byte)fis.read()));
                }
                Assert.assertEquals((long)0L, (long)fis.available());
            }
            cache.close();
            libCache.shutdown();
            cache = new BlobCache(serverAddress[1], config, (BlobView)blobStoreService);
            libCache = new BlobLibraryCacheManager((BlobService)cache, 3600000L);
            f = libCache.getFile((BlobKey)keys.get(0));
            Assert.assertEquals((long)expected.length, (long)f.length());
            fis = new FileInputStream(f);
            x2 = null;
            try {
                for (i = 0; i < expected.length && fis.available() > 0; ++i) {
                    Assert.assertEquals((long)expected[i], (long)((byte)fis.read()));
                }
                Assert.assertEquals((long)0L, (long)fis.available());
            }
            catch (Throwable x2) {
                x2 = x2;
                throw x2;
            }
            finally {
                if (fis != null) {
                    if (x2 != null) {
                        try {
                            fis.close();
                        }
                        catch (Throwable x2) {
                            x2.addSuppressed(x2);
                        }
                    } else {
                        fis.close();
                    }
                }
            }
            f = libCache.getFile((BlobKey)keys.get(1));
            Assert.assertEquals((long)256L, (long)f.length());
            fis = new FileInputStream(f);
            x2 = null;
            try {
                for (int i3 = 0; i3 < 256 && fis.available() > 0; ++i3) {
                    Assert.assertEquals((long)expected[32 + i3], (long)((byte)fis.read()));
                }
                Assert.assertEquals((long)0L, (long)fis.available());
            }
            catch (Throwable x2) {
                x2 = x2;
                throw x2;
            }
            finally {
                if (fis != null) {
                    if (x2 != null) {
                        try {
                            fis.close();
                        }
                        catch (Throwable x2) {
                            x2.addSuppressed(x2);
                        }
                    } else {
                        fis.close();
                    }
                }
            }
            x2 = null;
            try (BlobClient client = new BlobClient(serverAddress[1], config);){
                client.delete((BlobKey)keys.get(0));
                client.delete((BlobKey)keys.get(1));
            }
            catch (Throwable x2) {
                x2 = x2;
                throw x2;
            }
            String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
            File haBlobStoreDir = new File(this.temporaryFolder.getRoot(), clusterId);
            Object[] recoveryFiles = haBlobStoreDir.listFiles();
            Assert.assertNotNull((String)"HA storage directory does not exist", (Object)recoveryFiles);
            Assert.assertEquals((String)("Unclean state backend: " + Arrays.toString(recoveryFiles)), (long)0L, (long)recoveryFiles.length);
        }
        finally {
            for (BlobServer s : server) {
                if (s == null) continue;
                s.close();
            }
            if (cache != null) {
                cache.close();
            }
            if (libCache != null) {
                libCache.shutdown();
            }
            if (blobStoreService != null) {
                blobStoreService.closeAndCleanupAllData();
            }
        }
    }
}

