/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class FlinkFutureTest
extends TestLogger {
    private static ExecutorService executor;

    @BeforeClass
    public static void setup() {
        executor = Executors.newSingleThreadExecutor();
    }

    @AfterClass
    public static void teardown() {
        executor.shutdown();
    }

    @Test(timeout=10000L)
    public void testFutureApplyAsync() throws Exception {
        int expectedValue = 42;
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        Future appliedFuture = initialFuture.thenApplyAsync((ApplyFunction)new ApplyFunction<Integer, String>(){

            public String apply(Integer value) {
                return String.valueOf(value);
            }
        }, (Executor)executor);
        initialFuture.complete((Object)expectedValue);
        Assert.assertEquals((Object)String.valueOf(expectedValue), (Object)appliedFuture.get());
    }

    @Test(expected=TimeoutException.class)
    public void testFutureGetTimeout() throws InterruptedException, ExecutionException, TimeoutException {
        FlinkCompletableFuture future = new FlinkCompletableFuture();
        future.get(10L, TimeUnit.MILLISECONDS);
        Assert.fail((String)"Get should have thrown a timeout exception.");
    }

    @Test(expected=TestException.class)
    public void testExceptionalCompletion() throws Throwable {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        initialFuture.completeExceptionally((Throwable)new TestException("Test exception"));
        try {
            initialFuture.get();
            Assert.fail((String)"Get should have thrown an exception.");
        }
        catch (ExecutionException e) {
            throw e.getCause();
        }
    }

    @Test(expected=TestException.class)
    public void testExceptionPropagation() throws Throwable {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        Future mappedFuture = initialFuture.thenApplyAsync((ApplyFunction)new ApplyFunction<Integer, String>(){

            public String apply(Integer value) {
                throw new TestException("Test exception");
            }
        }, (Executor)executor);
        Future mapped2Future = mappedFuture.thenApplyAsync((ApplyFunction)new ApplyFunction<String, String>(){

            public String apply(String value) {
                return "foobar";
            }
        }, (Executor)executor);
        initialFuture.complete((Object)42);
        try {
            mapped2Future.get();
            Assert.fail((String)"Get should have thrown an exception.");
        }
        catch (ExecutionException e) {
            throw e.getCause();
        }
    }

    @Test(timeout=10000L)
    public void testExceptionallyAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        String exceptionMessage = "Foobar";
        Future recovered = initialFuture.exceptionallyAsync((ApplyFunction)new ApplyFunction<Throwable, String>(){

            public String apply(Throwable value) {
                return value.getMessage();
            }
        }, (Executor)executor);
        initialFuture.completeExceptionally((Throwable)new TestException(exceptionMessage));
        String actualMessage = (String)recovered.get();
        Assert.assertEquals((Object)exceptionMessage, (Object)actualMessage);
    }

    @Test(timeout=10000L)
    public void testComposeAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        int expectedValue = 42;
        Future composedFuture = initialFuture.thenComposeAsync((ApplyFunction)new ApplyFunction<Integer, Future<Integer>>(){

            public Future<Integer> apply(Integer value) {
                return FlinkFuture.supplyAsync((Callable)new Callable<Integer>(){

                    @Override
                    public Integer call() throws Exception {
                        return 42;
                    }
                }, (Executor)executor);
            }
        }, (Executor)executor);
        initialFuture.complete((Object)42);
        int actualValue = (Integer)composedFuture.get();
        Assert.assertEquals((long)42L, (long)actualValue);
    }

    @Test(timeout=10000L)
    public void testCombineAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture leftFuture = new FlinkCompletableFuture();
        FlinkCompletableFuture rightFuture = new FlinkCompletableFuture();
        int expectedLeftValue = 42;
        String expectedRightValue = "foobar";
        Future resultFuture = leftFuture.thenCombineAsync((Future)rightFuture, (BiFunction)new BiFunction<Integer, String, String>(){

            public String apply(Integer integer, String s) {
                return s + integer;
            }
        }, (Executor)executor);
        leftFuture.complete((Object)42);
        rightFuture.complete((Object)"foobar");
        String result = (String)resultFuture.get();
        Assert.assertEquals((Object)"foobar42", (Object)result);
    }

    @Test(timeout=10000L)
    public void testCombineAsyncLeftFailure() throws InterruptedException {
        FlinkCompletableFuture leftFuture = new FlinkCompletableFuture();
        FlinkCompletableFuture rightFuture = new FlinkCompletableFuture();
        String expectedRightValue = "foobar";
        TestException testException = new TestException("barfoo");
        Future resultFuture = leftFuture.thenCombineAsync((Future)rightFuture, (BiFunction)new BiFunction<Integer, String, String>(){

            public String apply(Integer integer, String s) {
                return s + integer;
            }
        }, (Executor)executor);
        leftFuture.completeExceptionally((Throwable)testException);
        rightFuture.complete((Object)"foobar");
        try {
            resultFuture.get();
            Assert.fail((String)"We should have caught an ExecutionException.");
        }
        catch (ExecutionException e) {
            Assert.assertEquals((Object)testException, (Object)e.getCause());
        }
    }

    @Test(timeout=10000L)
    public void testCombineAsyncRightFailure() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture leftFuture = new FlinkCompletableFuture();
        FlinkCompletableFuture rightFuture = new FlinkCompletableFuture();
        int expectedLeftValue = 42;
        TestException testException = new TestException("barfoo");
        Future resultFuture = leftFuture.thenCombineAsync((Future)rightFuture, (BiFunction)new BiFunction<Integer, String, String>(){

            public String apply(Integer integer, String s) {
                return s + integer;
            }
        }, (Executor)executor);
        leftFuture.complete((Object)42);
        rightFuture.completeExceptionally((Throwable)testException);
        try {
            resultFuture.get();
            Assert.fail((String)"We should have caught an ExecutionException.");
        }
        catch (ExecutionException e) {
            Assert.assertEquals((Object)testException, (Object)e.getCause());
        }
    }

    @Test
    public void testGetNow() throws ExecutionException {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        int absentValue = 41;
        Assert.assertEquals((Object)new Integer(41), (Object)initialFuture.getNow((Object)41));
    }

    @Test(timeout=10000L)
    public void testAcceptAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        int expectedValue = 42;
        Future result = initialFuture.thenAcceptAsync((AcceptFunction)new AcceptFunction<Integer>(){

            public void accept(Integer value) {
                atomicInteger.set(value);
            }
        }, (Executor)executor);
        initialFuture.complete((Object)expectedValue);
        result.get();
        Assert.assertEquals((long)expectedValue, (long)atomicInteger.get());
    }

    @Test(timeout=10000L)
    public void testHandleAsync() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        int expectedValue = 43;
        Future result = initialFuture.handleAsync((BiFunction)new BiFunction<Integer, Throwable, String>(){

            public String apply(Integer integer, Throwable throwable) {
                if (integer != null) {
                    return String.valueOf(integer);
                }
                return throwable.getMessage();
            }
        }, (Executor)executor);
        initialFuture.complete((Object)expectedValue);
        Assert.assertEquals((Object)String.valueOf(expectedValue), (Object)result.get());
    }

    @Test(timeout=10000L)
    public void testHandleAsyncException() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        String exceptionMessage = "foobar";
        Future result = initialFuture.handleAsync((BiFunction)new BiFunction<Integer, Throwable, String>(){

            public String apply(Integer integer, Throwable throwable) {
                if (integer != null) {
                    return String.valueOf(integer);
                }
                return throwable.getMessage();
            }
        }, (Executor)executor);
        initialFuture.completeExceptionally((Throwable)new TestException(exceptionMessage));
        Assert.assertEquals((Object)exceptionMessage, (Object)result.get());
    }

    @Test(timeout=10000L)
    public void testMultipleCompleteOperations() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        int expectedValue = 42;
        Assert.assertTrue((boolean)initialFuture.complete((Object)expectedValue));
        Assert.assertFalse((boolean)initialFuture.complete((Object)1337));
        Assert.assertFalse((boolean)initialFuture.completeExceptionally((Throwable)new TestException("foobar")));
        Assert.assertEquals((Object)new Integer(expectedValue), (Object)initialFuture.get());
    }

    @Test
    public void testApply() throws ExecutionException, InterruptedException {
        int expectedValue = 42;
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        Future appliedFuture = initialFuture.thenApply((ApplyFunction)new ApplyFunction<Integer, String>(){

            public String apply(Integer value) {
                return String.valueOf(value);
            }
        });
        initialFuture.complete((Object)expectedValue);
        Assert.assertEquals((Object)String.valueOf(expectedValue), (Object)appliedFuture.get());
    }

    @Test
    public void testAccept() throws ExecutionException, InterruptedException {
        int expectedValue = 42;
        FlinkCompletableFuture initialFuture = FlinkCompletableFuture.completed((Object)expectedValue);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        Future result = initialFuture.thenAccept((AcceptFunction)new AcceptFunction<Integer>(){

            public void accept(Integer value) {
                atomicInteger.set(value);
            }
        });
        result.get();
        Assert.assertEquals((long)expectedValue, (long)atomicInteger.get());
    }

    @Test
    public void testExceptionally() throws ExecutionException, InterruptedException {
        String exceptionMessage = "Foobar";
        FlinkCompletableFuture initialFuture = FlinkCompletableFuture.completedExceptionally((Throwable)new TestException(exceptionMessage));
        Future recovered = initialFuture.exceptionally((ApplyFunction)new ApplyFunction<Throwable, String>(){

            public String apply(Throwable value) {
                return value.getMessage();
            }
        });
        String actualMessage = (String)recovered.get();
        Assert.assertEquals((Object)exceptionMessage, (Object)actualMessage);
    }

    @Test
    public void testHandle() throws ExecutionException, InterruptedException {
        int expectedValue = 43;
        FlinkCompletableFuture initialFuture = FlinkCompletableFuture.completed((Object)expectedValue);
        Future result = initialFuture.handle((BiFunction)new BiFunction<Integer, Throwable, String>(){

            public String apply(Integer integer, Throwable throwable) {
                if (integer != null) {
                    return String.valueOf(integer);
                }
                return throwable.getMessage();
            }
        });
        Assert.assertEquals((Object)String.valueOf(expectedValue), (Object)result.get());
    }

    @Test
    public void testCompose() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture initialFuture = new FlinkCompletableFuture();
        int expectedValue = 42;
        Future composedFuture = initialFuture.thenCompose((ApplyFunction)new ApplyFunction<Integer, Future<Integer>>(){

            public Future<Integer> apply(Integer value) {
                return FlinkFuture.supplyAsync((Callable)new Callable<Integer>(){

                    @Override
                    public Integer call() throws Exception {
                        return 42;
                    }
                }, (Executor)executor);
            }
        });
        initialFuture.complete((Object)42);
        int actualValue = (Integer)composedFuture.get();
        Assert.assertEquals((long)42L, (long)actualValue);
    }

    @Test
    public void testCombine() throws ExecutionException, InterruptedException {
        int expectedLeftValue = 1;
        int expectedRightValue = 2;
        FlinkCompletableFuture left = FlinkCompletableFuture.completed((Object)expectedLeftValue);
        FlinkCompletableFuture right = FlinkCompletableFuture.completed((Object)expectedRightValue);
        Future sum = left.thenCombine((Future)right, (BiFunction)new BiFunction<Integer, Integer, Integer>(){

            public Integer apply(Integer left, Integer right) {
                return left + right;
            }
        });
        int result = (Integer)sum.get();
        Assert.assertEquals((long)(expectedLeftValue + expectedRightValue), (long)result);
    }

    @Test(timeout=10000L)
    public void testMultipleFunctionsOnCompleteFuture() throws Exception {
        FlinkCompletableFuture future = FlinkCompletableFuture.completed((Object)"test");
        Future result1 = future.handleAsync((BiFunction)new BiFunction<String, Throwable, String>(){

            public String apply(String s, Throwable throwable) {
                return s != null ? s : throwable.getMessage();
            }
        }, (Executor)executor);
        Future result2 = future.thenAcceptAsync((AcceptFunction)new AcceptFunction<String>(){

            public void accept(String value) {
            }
        }, (Executor)executor);
        Assert.assertEquals((Object)"test", (Object)result1.get());
        Assert.assertNull((Object)result2.get());
    }

    @Test(timeout=10000L)
    public void testMultipleFunctionsOnIncompleteFuture() throws Exception {
        FlinkCompletableFuture future = new FlinkCompletableFuture();
        Future result1 = future.handleAsync((BiFunction)new BiFunction<String, Throwable, String>(){

            public String apply(String s, Throwable throwable) {
                return s != null ? s : throwable.getMessage();
            }
        }, (Executor)executor);
        Future result2 = future.thenAcceptAsync((AcceptFunction)new AcceptFunction<String>(){

            public void accept(String value) {
            }
        }, (Executor)executor);
        future.complete((Object)"value");
        Assert.assertEquals((Object)"value", (Object)result1.get());
        Assert.assertNull((Object)result2.get());
    }

    @Test(timeout=10000L)
    public void testMultipleFunctionsExceptional() throws Exception {
        FlinkCompletableFuture future = new FlinkCompletableFuture();
        Future result1 = future.handleAsync((BiFunction)new BiFunction<String, Throwable, String>(){

            public String apply(String s, Throwable throwable) {
                return s != null ? s : throwable.getMessage();
            }
        }, (Executor)executor);
        Future result2 = future.thenAcceptAsync((AcceptFunction)new AcceptFunction<String>(){

            public void accept(String value) {
            }
        }, (Executor)executor);
        future.completeExceptionally((Throwable)new TestException("test"));
        Assert.assertEquals((Object)"test", (Object)result1.get());
        try {
            result2.get();
            Assert.fail((String)"We should have caught an ExecutionException.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(e.getCause() instanceof TestException));
        }
    }

    @Test(timeout=10000L)
    public void testChainedFutureExceptionalCompletion() throws ExecutionException, InterruptedException {
        FlinkCompletableFuture future = new FlinkCompletableFuture();
        Future apply2 = future.thenApplyAsync((ApplyFunction)new ApplyFunction<String, String>(){

            public String apply(String value) {
                return value;
            }
        }, (Executor)executor);
        Future applyException = apply2.exceptionallyAsync((ApplyFunction)new ApplyFunction<Throwable, Throwable>(){

            public Throwable apply(Throwable value) {
                return value;
            }
        }, (Executor)executor);
        Future accept1 = future.thenAcceptAsync((AcceptFunction)new AcceptFunction<String>(){

            public void accept(String value) {
            }
        }, (Executor)executor);
        Future accept1Exception = accept1.exceptionallyAsync((ApplyFunction)new ApplyFunction<Throwable, Throwable>(){

            public Throwable apply(Throwable value) {
                return value;
            }
        }, (Executor)executor);
        Future accept2 = future.thenAcceptAsync((AcceptFunction)new AcceptFunction<String>(){

            public void accept(String value) {
            }
        }, (Executor)executor);
        Future accept2Exception = accept2.exceptionallyAsync((ApplyFunction)new ApplyFunction<Throwable, Throwable>(){

            public Throwable apply(Throwable value) {
                return value;
            }
        }, (Executor)executor);
        TestException testException = new TestException("test");
        future.completeExceptionally((Throwable)testException);
        Assert.assertEquals((Object)testException, (Object)applyException.get());
        Assert.assertEquals((Object)testException, (Object)accept1Exception.get());
        Assert.assertEquals((Object)testException, (Object)accept2Exception.get());
    }

    private static class TestException
    extends RuntimeException {
        private static final long serialVersionUID = -1274022962838535130L;

        public TestException(String message) {
            super(message);
        }
    }
}

