/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Random;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class BlobRecoveryITCase
extends TestLogger {
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBlobServerRecovery() throws Exception {
        Configuration config = new Configuration();
        config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
        config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, this.temporaryFolder.getRoot().getPath());
        BlobStoreService blobStoreService = null;
        try {
            blobStoreService = BlobUtils.createBlobStoreFromConfig((Configuration)config);
            BlobRecoveryITCase.testBlobServerRecovery(config, (BlobStore)blobStoreService);
        }
        finally {
            if (blobStoreService != null) {
                blobStoreService.closeAndCleanupAllData();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void testBlobServerRecovery(Configuration config, BlobStore blobStore) throws IOException {
        String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
        String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
        Random rand = new Random();
        BlobServer[] server = new BlobServer[2];
        InetSocketAddress[] serverAddress = new InetSocketAddress[2];
        BlobClient client = null;
        try {
            int j;
            int i;
            byte[] actual;
            for (int i2 = 0; i2 < server.length; ++i2) {
                server[i2] = new BlobServer(config, blobStore);
                serverAddress[i2] = new InetSocketAddress("localhost", server[i2].getPort());
            }
            client = new BlobClient(serverAddress[0], config);
            byte[] expected = new byte[1024];
            rand.nextBytes(expected);
            BlobKey[] keys = new BlobKey[]{client.put(expected), client.put(expected, 32, 256)};
            JobID[] jobId = new JobID[]{new JobID(), new JobID()};
            String[] testKey = new String[]{"test-key-1", "test-key-2"};
            client.put(jobId[0], testKey[0], expected);
            client.put(jobId[1], testKey[1], expected, 32, 256);
            Path blobServerPath = new Path(storagePath, "blob");
            FileSystem fs = blobServerPath.getFileSystem();
            Assert.assertTrue((String)("Unknown storage dir: " + blobServerPath), (boolean)fs.exists(blobServerPath));
            client.close();
            client = new BlobClient(serverAddress[1], config);
            try (InputStream is = client.get(keys[0]);){
                actual = new byte[expected.length];
                BlobUtils.readFully((InputStream)is, (byte[])actual, (int)0, (int)expected.length, null);
                for (i = 0; i < expected.length; ++i) {
                    Assert.assertEquals((long)expected[i], (long)actual[i]);
                }
            }
            is = client.get(keys[1]);
            var15_18 = null;
            try {
                actual = new byte[256];
                BlobUtils.readFully((InputStream)is, (byte[])actual, (int)0, (int)256, null);
                i = 32;
                j = 0;
                while (i < 256) {
                    Assert.assertEquals((long)expected[i], (long)actual[j]);
                    ++i;
                    ++j;
                }
            }
            catch (Throwable x2) {
                var15_18 = x2;
                throw x2;
            }
            finally {
                if (is != null) {
                    if (var15_18 != null) {
                        try {
                            is.close();
                        }
                        catch (Throwable x2) {
                            var15_18.addSuppressed(x2);
                        }
                    } else {
                        is.close();
                    }
                }
            }
            is = client.get(jobId[0], testKey[0]);
            var15_18 = null;
            try {
                actual = new byte[expected.length];
                BlobUtils.readFully((InputStream)is, (byte[])actual, (int)0, (int)expected.length, null);
                for (i = 0; i < expected.length; ++i) {
                    Assert.assertEquals((long)expected[i], (long)actual[i]);
                }
            }
            catch (Throwable x2) {
                var15_18 = x2;
                throw x2;
            }
            finally {
                if (is != null) {
                    if (var15_18 != null) {
                        try {
                            is.close();
                        }
                        catch (Throwable x2) {
                            var15_18.addSuppressed(x2);
                        }
                    } else {
                        is.close();
                    }
                }
            }
            is = client.get(jobId[1], testKey[1]);
            var15_18 = null;
            try {
                actual = new byte[256];
                BlobUtils.readFully((InputStream)is, (byte[])actual, (int)0, (int)256, null);
                i = 32;
                j = 0;
                while (i < 256) {
                    Assert.assertEquals((long)expected[i], (long)actual[j]);
                    ++i;
                    ++j;
                }
            }
            catch (Throwable x2) {
                var15_18 = x2;
                throw x2;
            }
            finally {
                if (is != null) {
                    if (var15_18 != null) {
                        try {
                            is.close();
                        }
                        catch (Throwable x2) {
                            var15_18.addSuppressed(x2);
                        }
                    } else {
                        is.close();
                    }
                }
            }
            client.delete(keys[0]);
            client.delete(keys[1]);
            client.delete(jobId[0], testKey[0]);
            client.delete(jobId[1], testKey[1]);
            Assert.assertTrue((String)"HA storage directory does not exist", (boolean)fs.exists(new Path(storagePath)));
            if (fs.exists(blobServerPath)) {
                FileStatus[] recoveryFiles = fs.listStatus(blobServerPath);
                ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length);
                for (FileStatus file : recoveryFiles) {
                    filenames.add(file.toString());
                }
                Assert.fail((String)("Unclean state backend: " + filenames));
            }
        }
        finally {
            for (BlobServer s : server) {
                if (s == null) continue;
                s.close();
            }
            if (client != null) {
                client.close();
            }
        }
    }
}

