package org.apache.hadoop.net;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.channels.Pipe;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/net/TestSocketIOWithTimeout.class */
public class TestSocketIOWithTimeout {
    private MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext();
    static final Logger LOG = LoggerFactory.getLogger(TestSocketIOWithTimeout.class);
    private static int TIMEOUT = 1000;
    private static String TEST_STRING = "1234567890";
    private static final int PAGE_SIZE = (int) NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();

    private void doIO(InputStream inputStream, OutputStream outputStream, int i) throws IOException {
        byte[] bArr = new byte[PAGE_SIZE + 19];
        while (true) {
            long now = Time.now();
            if (inputStream != null) {
                try {
                    inputStream.read(bArr);
                } catch (SocketTimeoutException e) {
                    long now2 = Time.now() - now;
                    LOG.info("Got SocketTimeoutException as expected after " + now2 + " millis : " + e.getMessage());
                    Assert.assertTrue(Math.abs(((long) i) - now2) <= 200);
                    return;
                }
            } else {
                outputStream.write(bArr);
            }
        }
    }

    @Test
    public void testSocketIOWithTimeout() throws Exception {
        Pipe open = Pipe.open();
        Pipe.SourceChannel source = open.source();
        Pipe.SinkChannel sink = open.sink();
        try {
            final SocketInputStream socketInputStream = new SocketInputStream(source, TIMEOUT);
            SocketOutputStream socketOutputStream = new SocketOutputStream(sink, TIMEOUT);
            byte[] bytes = TEST_STRING.getBytes();
            byte[] bArr = new byte[bytes.length];
            socketOutputStream.write(bytes);
            socketOutputStream.write(-128);
            doIO(null, socketOutputStream, TIMEOUT);
            socketInputStream.read(bArr);
            Assert.assertTrue(Arrays.equals(bytes, bArr));
            Assert.assertEquals((-128) & 255, socketInputStream.read());
            doIO(socketInputStream, null, TIMEOUT);
            socketInputStream.setTimeout(TIMEOUT * 2);
            doIO(socketInputStream, null, TIMEOUT * 2);
            socketInputStream.setTimeout(0L);
            MultithreadedTestUtil.TestingThread testingThread = new MultithreadedTestUtil.TestingThread(this.ctx) { // from class: org.apache.hadoop.net.TestSocketIOWithTimeout.1
                @Override // org.apache.hadoop.test.MultithreadedTestUtil.TestingThread
                public void doWork() throws Exception {
                    try {
                        socketInputStream.read();
                        Assert.fail("Did not fail with interrupt");
                    } catch (InterruptedIOException e) {
                        TestSocketIOWithTimeout.LOG.info("Got expection while reading as expected : " + e.getMessage());
                    }
                }
            };
            this.ctx.addThread(testingThread);
            this.ctx.startThreads();
            Thread.sleep(1000L);
            testingThread.interrupt();
            this.ctx.stop();
            Assert.assertTrue(source.isOpen());
            Assert.assertTrue(sink.isOpen());
            if (!Shell.WINDOWS) {
                try {
                    socketOutputStream.write(1);
                    Assert.fail("Did not throw");
                } catch (IOException e) {
                    GenericTestUtils.assertExceptionContains("stream is closed", e);
                }
            }
            socketOutputStream.close();
            Assert.assertFalse(sink.isOpen());
            Assert.assertEquals(-1L, socketInputStream.read());
            socketInputStream.close();
            Assert.assertFalse(source.isOpen());
            if (source != null) {
                source.close();
            }
            if (sink != null) {
                sink.close();
            }
        } catch (Throwable th) {
            if (source != null) {
                source.close();
            }
            if (sink != null) {
                sink.close();
            }
            throw th;
        }
    }

    @Test
    public void testSocketIOWithTimeoutByMultiThread() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Runnable runnable = () -> {
            ?? r11;
            ?? r12;
            ?? r13;
            ?? r14;
            try {
                try {
                    Pipe open = Pipe.open();
                    Pipe.SourceChannel source = open.source();
                    Throwable th = null;
                    try {
                        SocketInputStream socketInputStream = new SocketInputStream(source, TIMEOUT);
                        Throwable th2 = null;
                        try {
                            Pipe.SinkChannel sink = open.sink();
                            Throwable th3 = null;
                            SocketOutputStream socketOutputStream = new SocketOutputStream(sink, TIMEOUT);
                            Throwable th4 = null;
                            try {
                                try {
                                    byte[] bytes = TEST_STRING.getBytes();
                                    byte[] bArr = new byte[bytes.length];
                                    countDownLatch.await();
                                    socketOutputStream.write(bytes);
                                    doIO(null, socketOutputStream, TIMEOUT);
                                    socketInputStream.read(bArr);
                                    Assert.assertArrayEquals(bytes, bArr);
                                    doIO(socketInputStream, null, TIMEOUT);
                                    if (socketOutputStream != null) {
                                        if (0 != 0) {
                                            try {
                                                socketOutputStream.close();
                                            } catch (Throwable th5) {
                                                th4.addSuppressed(th5);
                                            }
                                        } else {
                                            socketOutputStream.close();
                                        }
                                    }
                                    if (sink != null) {
                                        if (0 != 0) {
                                            try {
                                                sink.close();
                                            } catch (Throwable th6) {
                                                th3.addSuppressed(th6);
                                            }
                                        } else {
                                            sink.close();
                                        }
                                    }
                                    if (socketInputStream != null) {
                                        if (0 != 0) {
                                            try {
                                                socketInputStream.close();
                                            } catch (Throwable th7) {
                                                th2.addSuppressed(th7);
                                            }
                                        } else {
                                            socketInputStream.close();
                                        }
                                    }
                                    if (source != null) {
                                        if (0 != 0) {
                                            try {
                                                source.close();
                                            } catch (Throwable th8) {
                                                th.addSuppressed(th8);
                                            }
                                        } else {
                                            source.close();
                                        }
                                    }
                                } catch (Throwable th9) {
                                    th4 = th9;
                                    throw th9;
                                }
                            } catch (Throwable th10) {
                                if (socketOutputStream != null) {
                                    if (th4 != null) {
                                        try {
                                            socketOutputStream.close();
                                        } catch (Throwable th11) {
                                            th4.addSuppressed(th11);
                                        }
                                    } else {
                                        socketOutputStream.close();
                                    }
                                }
                                throw th10;
                            }
                        } catch (Throwable th12) {
                            if (r13 != 0) {
                                if (r14 != 0) {
                                    try {
                                        r13.close();
                                    } catch (Throwable th13) {
                                        r14.addSuppressed(th13);
                                    }
                                } else {
                                    r13.close();
                                }
                            }
                            throw th12;
                        }
                    } catch (Throwable th14) {
                        if (r11 != 0) {
                            if (r12 != 0) {
                                try {
                                    r11.close();
                                } catch (Throwable th15) {
                                    r12.addSuppressed(th15);
                                }
                            } else {
                                r11.close();
                            }
                        }
                        throw th14;
                    }
                } catch (Exception e) {
                    Assert.fail(e.getMessage());
                }
            } finally {
            }
        };
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(64);
        for (int i = 0; i < 64; i++) {
            newFixedThreadPool.submit(runnable);
        }
        Thread.sleep(1000L);
        countDownLatch.countDown();
        newFixedThreadPool.shutdown();
        Assert.assertTrue(newFixedThreadPool.awaitTermination(3L, TimeUnit.SECONDS));
    }

    @Test
    public void testSocketIOWithTimeoutInterrupted() throws Exception {
        Pipe open = Pipe.open();
        final int i = TIMEOUT * 10;
        Pipe.SourceChannel source = open.source();
        Throwable th = null;
        try {
            final SocketInputStream socketInputStream = new SocketInputStream(source, i);
            Throwable th2 = null;
            try {
                try {
                    MultithreadedTestUtil.TestingThread testingThread = new MultithreadedTestUtil.TestingThread(this.ctx) { // from class: org.apache.hadoop.net.TestSocketIOWithTimeout.2
                        @Override // org.apache.hadoop.test.MultithreadedTestUtil.TestingThread
                        public void doWork() throws Exception {
                            try {
                                socketInputStream.read();
                                Assert.fail("Did not fail with interrupt");
                            } catch (InterruptedIOException e) {
                                String message = e.getMessage();
                                Assert.assertTrue(message.contains("Total timeout mills is " + i));
                                Assert.assertTrue(message.contains("millis timeout left"));
                            }
                        }
                    };
                    this.ctx.addThread(testingThread);
                    this.ctx.startThreads();
                    Thread.sleep(1000L);
                    testingThread.interrupt();
                    this.ctx.stop();
                    if (socketInputStream != null) {
                        if (0 != 0) {
                            try {
                                socketInputStream.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            socketInputStream.close();
                        }
                    }
                    if (source != null) {
                        if (0 == 0) {
                            source.close();
                            return;
                        }
                        try {
                            source.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (socketInputStream != null) {
                    if (th2 != null) {
                        try {
                            socketInputStream.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        socketInputStream.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (source != null) {
                if (0 != 0) {
                    try {
                        source.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    source.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void testSocketIOWithTimeoutInterruptedByMultiThread() throws Exception {
        int i = TIMEOUT * 10;
        AtomicLong atomicLong = new AtomicLong();
        AtomicLong atomicLong2 = new AtomicLong();
        Runnable runnable = () -> {
            ?? r10;
            ?? r11;
            try {
                try {
                    try {
                        Pipe.SourceChannel source = Pipe.open().source();
                        Throwable th = null;
                        SocketInputStream socketInputStream = new SocketInputStream(source, i);
                        Throwable th2 = null;
                        try {
                            try {
                                socketInputStream.read();
                                atomicLong.incrementAndGet();
                                if (socketInputStream != null) {
                                    if (0 != 0) {
                                        try {
                                            socketInputStream.close();
                                        } catch (Throwable th3) {
                                            th2.addSuppressed(th3);
                                        }
                                    } else {
                                        socketInputStream.close();
                                    }
                                }
                                if (source != null) {
                                    if (0 != 0) {
                                        try {
                                            source.close();
                                        } catch (Throwable th4) {
                                            th.addSuppressed(th4);
                                        }
                                    } else {
                                        source.close();
                                    }
                                }
                            } catch (Throwable th5) {
                                th2 = th5;
                                throw th5;
                            }
                        } catch (Throwable th6) {
                            if (socketInputStream != null) {
                                if (th2 != null) {
                                    try {
                                        socketInputStream.close();
                                    } catch (Throwable th7) {
                                        th2.addSuppressed(th7);
                                    }
                                } else {
                                    socketInputStream.close();
                                }
                            }
                            throw th6;
                        }
                    } catch (InterruptedIOException e) {
                        atomicLong2.incrementAndGet();
                    }
                } catch (Throwable th8) {
                    if (r10 != 0) {
                        if (r11 != 0) {
                            try {
                                r10.close();
                            } catch (Throwable th9) {
                                r11.addSuppressed(th9);
                            }
                        } else {
                            r10.close();
                        }
                    }
                    throw th8;
                }
            } catch (Exception e2) {
                Assert.fail(e2.getMessage());
            }
        };
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(64);
        for (int i2 = 0; i2 < 64; i2++) {
            newFixedThreadPool.submit(runnable);
        }
        Thread.sleep(1000L);
        newFixedThreadPool.shutdownNow();
        newFixedThreadPool.awaitTermination(1L, TimeUnit.SECONDS);
        Assert.assertEquals(0L, atomicLong.get());
        Assert.assertEquals(64, atomicLong2.get());
    }
}
