package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.state.ManualWindowSpeedITCase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;

@Category({AlsoRunWithLegacyScheduler.class})
/* loaded from: input_file:org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.class */
public class ResumeCheckpointManuallyITCase extends TestLogger {
    private static final int PARALLELISM = 2;
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int SLOTS_PER_TASK_MANAGER = 2;

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase$NotifyingInfiniteTupleSource.class */
    public static class NotifyingInfiniteTupleSource extends ManualWindowSpeedITCase.InfiniteTupleSource implements CheckpointListener {
        private static final long serialVersionUID = 8120981235081181746L;
        private static CountDownLatch countDownLatch;
        private static CountDownLatch checkpointCompletedLatch;

        public NotifyingInfiniteTupleSource(int i) {
            super(i);
        }

        @Override // org.apache.flink.test.state.ManualWindowSpeedITCase.InfiniteTupleSource
        public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
            super.run(sourceContext);
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (checkpointCompletedLatch != null) {
                checkpointCompletedLatch.countDown();
            }
        }
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception {
        File newFolder = temporaryFolder.newFolder();
        testExternalizedCheckpoints(newFolder, null, createRocksDBStateBackend(newFolder, true), false);
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
        File newFolder = temporaryFolder.newFolder();
        testExternalizedCheckpoints(newFolder, null, createRocksDBStateBackend(newFolder, false), false);
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File newFolder = temporaryFolder.newFolder();
        testExternalizedCheckpoints(newFolder, null, createRocksDBStateBackend(newFolder, true), true);
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File newFolder = temporaryFolder.newFolder();
        testExternalizedCheckpoints(newFolder, null, createRocksDBStateBackend(newFolder, false), true);
    }

    @Test
    public void testExternalizedFSCheckpointsStandalone() throws Exception {
        File newFolder = temporaryFolder.newFolder();
        testExternalizedCheckpoints(newFolder, null, createFsStateBackend(newFolder), false);
    }

    @Test
    public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File newFolder = temporaryFolder.newFolder();
        testExternalizedCheckpoints(newFolder, null, createFsStateBackend(newFolder), true);
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        try {
            File newFolder = temporaryFolder.newFolder();
            testExternalizedCheckpoints(newFolder, testingServer.getConnectString(), createRocksDBStateBackend(newFolder, true), false);
        } finally {
            testingServer.stop();
        }
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        try {
            File newFolder = temporaryFolder.newFolder();
            testExternalizedCheckpoints(newFolder, testingServer.getConnectString(), createRocksDBStateBackend(newFolder, false), false);
        } finally {
            testingServer.stop();
        }
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        try {
            File newFolder = temporaryFolder.newFolder();
            testExternalizedCheckpoints(newFolder, testingServer.getConnectString(), createRocksDBStateBackend(newFolder, true), true);
        } finally {
            testingServer.stop();
        }
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        try {
            File newFolder = temporaryFolder.newFolder();
            testExternalizedCheckpoints(newFolder, testingServer.getConnectString(), createRocksDBStateBackend(newFolder, false), true);
        } finally {
            testingServer.stop();
        }
    }

    @Test
    public void testExternalizedFSCheckpointsZookeeper() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        try {
            File newFolder = temporaryFolder.newFolder();
            testExternalizedCheckpoints(newFolder, testingServer.getConnectString(), createFsStateBackend(newFolder), false);
        } finally {
            testingServer.stop();
        }
    }

    @Test
    public void testExternalizedFSCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        TestingServer testingServer = new TestingServer();
        testingServer.start();
        try {
            File newFolder = temporaryFolder.newFolder();
            testExternalizedCheckpoints(newFolder, testingServer.getConnectString(), createFsStateBackend(newFolder), true);
        } finally {
            testingServer.stop();
        }
    }

    private FsStateBackend createFsStateBackend(File file) throws IOException {
        return new FsStateBackend(file.toURI().toString(), true);
    }

    private RocksDBStateBackend createRocksDBStateBackend(File file, boolean z) throws IOException {
        return new RocksDBStateBackend(file.toURI().toString(), z);
    }

    private void testExternalizedCheckpoints(File file, String str, StateBackend stateBackend, boolean z) throws Exception {
        Configuration configuration = new Configuration();
        File newFolder = temporaryFolder.newFolder();
        configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, file.toURI().toString());
        configuration.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, newFolder.toURI().toString());
        configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, z);
        if (str != null) {
            File newFolder2 = temporaryFolder.newFolder();
            configuration.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
            configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, str);
            configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, newFolder2.toURI().toString());
        }
        MiniClusterWithClientResource miniClusterWithClientResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
        miniClusterWithClientResource.before();
        ClusterClient clusterClient = miniClusterWithClientResource.getClusterClient();
        try {
            String runJobAndGetExternalizedCheckpoint = runJobAndGetExternalizedCheckpoint(stateBackend, file, null, clusterClient);
            Assert.assertNotNull(runJobAndGetExternalizedCheckpoint);
            String runJobAndGetExternalizedCheckpoint2 = runJobAndGetExternalizedCheckpoint(stateBackend, file, runJobAndGetExternalizedCheckpoint, clusterClient);
            Assert.assertNotNull(runJobAndGetExternalizedCheckpoint2);
            Assert.assertNotNull(runJobAndGetExternalizedCheckpoint(stateBackend, file, runJobAndGetExternalizedCheckpoint2, clusterClient));
            miniClusterWithClientResource.after();
        } catch (Throwable th) {
            miniClusterWithClientResource.after();
            throw th;
        }
    }

    private static String runJobAndGetExternalizedCheckpoint(StateBackend stateBackend, File file, @Nullable String str, ClusterClient<?> clusterClient) throws Exception {
        JobGraph jobGraph = getJobGraph(stateBackend, str);
        CountDownLatch unused = NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(2);
        CountDownLatch unused2 = NotifyingInfiniteTupleSource.checkpointCompletedLatch = new CountDownLatch(2);
        ClientUtils.submitJob(clusterClient, jobGraph);
        NotifyingInfiniteTupleSource.countDownLatch.await();
        NotifyingInfiniteTupleSource.checkpointCompletedLatch.await();
        clusterClient.cancel(jobGraph.getJobID()).get();
        waitUntilCanceled(jobGraph.getJobID(), clusterClient);
        return getExternalizedCheckpointCheckpointPath(file, jobGraph.getJobID());
    }

    private static String getExternalizedCheckpointCheckpointPath(File file, JobID jobID) throws IOException {
        Optional<Path> findExternalizedCheckpoint = findExternalizedCheckpoint(file, jobID);
        if (findExternalizedCheckpoint.isPresent()) {
            return findExternalizedCheckpoint.get().toString();
        }
        throw new AssertionError("No complete checkpoint could be found.");
    }

    private static Optional<Path> findExternalizedCheckpoint(File file, JobID jobID) throws IOException {
        Stream<Path> list = Files.list(file.toPath().resolve(jobID.toString()));
        Throwable th = null;
        try {
            try {
                Optional<Path> findAny = list.filter(path -> {
                    return path.getFileName().toString().startsWith("chk-");
                }).filter(path2 -> {
                    try {
                        Stream<Path> list2 = Files.list(path2);
                        Throwable th2 = null;
                        try {
                            boolean anyMatch = list2.anyMatch(path2 -> {
                                return path2.getFileName().toString().contains("meta");
                            });
                            if (list2 != null) {
                                if (0 != 0) {
                                    try {
                                        list2.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    list2.close();
                                }
                            }
                            return anyMatch;
                        } finally {
                        }
                    } catch (IOException e) {
                        return false;
                    }
                }).findAny();
                if (list != null) {
                    if (0 != 0) {
                        try {
                            list.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        list.close();
                    }
                }
                return findAny;
            } finally {
            }
        } catch (Throwable th3) {
            if (list != null) {
                if (th != null) {
                    try {
                        list.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    list.close();
                }
            }
            throw th3;
        }
    }

    private static void waitUntilCanceled(JobID jobID, ClusterClient<?> clusterClient) throws ExecutionException, InterruptedException {
        while (clusterClient.getJobStatus(jobID).get() != JobStatus.CANCELED) {
            Thread.sleep(50L);
        }
    }

    private static JobGraph getJobGraph(StateBackend stateBackend, @Nullable String str) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(500L);
        executionEnvironment.setStateBackend(stateBackend);
        executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        executionEnvironment.setParallelism(2);
        executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        executionEnvironment.addSource(new NotifyingInfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds(3L)).reduce((tuple2, tuple22) -> {
            return Tuple2.of(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
        }).filter(tuple23 -> {
            return ((String) tuple23.f0).startsWith("Tuple 0");
        });
        JobGraph jobGraph = executionEnvironment.getStreamGraph("Test").getJobGraph();
        if (str != null) {
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str));
        }
        return jobGraph;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1127816848:
                if (implMethodName.equals("lambda$getJobGraph$704c4ec3$1")) {
                    z = false;
                    break;
                }
                break;
            case 1449882950:
                if (implMethodName.equals("lambda$getJobGraph$bf652cf1$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;Lorg/apache/flink/api/java/tuple/Tuple2;)Lorg/apache/flink/api/java/tuple/Tuple2;")) {
                    return (tuple2, tuple22) -> {
                        return Tuple2.of(tuple2.f0, Integer.valueOf(((Integer) tuple2.f1).intValue() + ((Integer) tuple22.f1).intValue()));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Z")) {
                    return tuple23 -> {
                        return ((String) tuple23.f0).startsWith("Tuple 0");
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
