package org.apache.flink.runtime.blob;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for GET requests issued by a {@link BlobClient} against a {@link BlobServer}.
 *
 * <p>Covers a lookup that fails because the stored file is gone, a transfer whose server-side
 * connections are closed mid-stream, and several concurrent GETs for the same key that must
 * trigger only a single download from the backing {@link BlobStore}.
 */
public class BlobServerGetTest extends TestLogger {

    /** Size of the blob uploaded by the lookup-failure test. */
    private static final int LOOKUP_BLOB_SIZE = 2000000;

    /** Size of the blob uploaded by the streaming-failure test. */
    private static final int STREAMING_BLOB_SIZE = 5000000;

    /** Number of bytes read per chunk before the server connections are closed. */
    private static final int STREAMING_CHUNK_SIZE = 50000;

    /** Number of GET requests issued in parallel by the concurrency test. */
    private static final int CONCURRENT_GET_OPERATIONS = 3;

    private final Random rnd = new Random();

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    /**
     * Uploads a blob, deletes the file the server stored for it and checks that the following
     * GET fails with an {@link IOException}.
     *
     * @throws IOException if server/client setup, the upload or the cleanup fails
     */
    @Test
    public void testGetFailsDuringLookup() throws IOException {
        Configuration configuration = new Configuration();

        // Resources are closed in reverse order: client first, then server.
        try (BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore());
                BlobClient blobClient = new BlobClient(
                        new InetSocketAddress("localhost", blobServer.getPort()), configuration)) {
            byte[] payload = new byte[LOOKUP_BLOB_SIZE];
            this.rnd.nextBytes(payload);

            BlobKey key = blobClient.put(payload);
            Assert.assertNotNull(key);

            // Remove the stored file behind the server's back so that the lookup cannot succeed.
            Assert.assertTrue(blobServer.getStorageLocation(key).delete());

            try {
                blobClient.get(key);
                Assert.fail("This should not succeed.");
            } catch (IOException expected) {
                // expected: the file backing the key no longer exists
            }
        }
    }

    /**
     * Uploads a blob, reads two chunks of it, closes every active server connection and then
     * tries to read the remainder.
     *
     * <p>The final read is allowed to either succeed or fail with an {@link IOException}; the
     * test only requires that nothing else is thrown.
     *
     * @throws IOException if server/client setup, the upload, the initial reads or the cleanup
     *     fails
     */
    @Test
    public void testGetFailsDuringStreaming() throws IOException {
        Configuration configuration = new Configuration();

        try (BlobServer blobServer = new BlobServer(configuration, new VoidBlobStore());
                BlobClient blobClient = new BlobClient(
                        new InetSocketAddress("localhost", blobServer.getPort()), configuration)) {
            byte[] payload = new byte[STREAMING_BLOB_SIZE];
            this.rnd.nextBytes(payload);

            BlobKey key = blobClient.put(payload);
            Assert.assertNotNull(key);

            InputStream remote = blobClient.get(key);

            // The chunk content is irrelevant, so the same buffer is reused for both reads.
            byte[] chunk = new byte[STREAMING_CHUNK_SIZE];
            BlobUtils.readFully(remote, chunk, 0, chunk.length, (String) null);
            BlobUtils.readFully(remote, chunk, 0, chunk.length, (String) null);

            // The element type of the returned collection is not visible from this file,
            // hence the explicit cast.
            for (Object connection : blobServer.getCurrentActiveConnections()) {
                ((BlobServerConnection) connection).close();
            }

            try {
                byte[] remainder = new byte[payload.length - (2 * chunk.length)];
                BlobUtils.readFully(remote, remainder, 0, remainder.length, (String) null);
                // Success is tolerated as well - presumably the remaining bytes may already
                // have been buffered on the receiving side (TODO confirm).
            } catch (IOException expected) {
                // expected: the server side of the transfer has been closed
            }
        }
    }

    /**
     * Issues several GET requests for the same key in parallel against a server whose
     * {@link BlobStore} is mocked, checks that every request returns the stored bytes and
     * verifies that the store was asked for the blob exactly once.
     *
     * @throws IOException if setting up the storage directory or the server fails
     * @throws ExecutionException if one of the asynchronous GET operations fails
     * @throws InterruptedException if waiting for the GET operations is interrupted
     */
    // The future type returned by FlinkCompletableFuture.supplyAsync is not imported in this
    // file, so the collection of pending operations has to stay raw.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Test
    public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.setString("blob.storage.directory", this.temporaryFolder.newFolder().getAbsolutePath());

        BlobStore blobStore = Mockito.mock(BlobStore.class);
        Collection pendingReads = new ArrayList(CONCURRENT_GET_OPERATIONS);

        final byte[] payload = {1, 2, 3, 4, 99, 42};
        // Single shared stream: it is exhausted after the first copy, so a second download
        // from the store would produce an empty file.
        final ByteArrayInputStream storedBlob = new ByteArrayInputStream(payload);
        // The key is the digest of the payload.
        final BlobKey blobKey = new BlobKey(BlobUtils.createMessageDigest().digest(payload));

        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                // Second argument of BlobStore#get is the file the blob is written to.
                File target = (File) invocation.getArguments()[1];
                FileUtils.copyInputStreamToFile(storedBlob, target);
                return null;
            }
        }).when(blobStore).get(Matchers.any(BlobKey.class), Matchers.any(File.class));

        ExecutorService executor = Executors.newFixedThreadPool(CONCURRENT_GET_OPERATIONS);

        // try-with-resources guarantees that the server is closed even when an assertion,
        // the verification or one of the GET operations fails.
        try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
            for (int i = 0; i < CONCURRENT_GET_OPERATIONS; i++) {
                pendingReads.add(FlinkCompletableFuture.supplyAsync(new Callable<InputStream>() {
                    @Override
                    public InputStream call() throws Exception {
                        try (BlobClient client = blobServer.createClient();
                                InputStream remote = client.get(blobKey)) {
                            byte[] received = new byte[payload.length];
                            IOUtils.readFully(remote, received);
                            // Hand back an in-memory copy; the remote stream is closed on return.
                            return new ByteArrayInputStream(received);
                        }
                    }
                }, executor));
            }

            for (Object element : (Collection<?>) FutureUtils.combineAll(pendingReads).get()) {
                try (InputStream result = (InputStream) element) {
                    ByteArrayOutputStream collected = new ByteArrayOutputStream(payload.length);
                    IOUtils.copy(result, collected);
                    collected.close();
                    Assert.assertArrayEquals(payload, collected.toByteArray());
                }
            }

            // All concurrent requests must have been served by one download from the store.
            Mockito.verify(blobStore, Mockito.times(1)).get(Matchers.eq(blobKey), Matchers.any(File.class));
        } finally {
            executor.shutdownNow();
        }
    }
}
