package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedThrowable;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.class */
public class UnalignedCheckpointFailureHandlingITCase {
    private static final int PARALLELISM = 2;

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(PARALLELISM).setNumberSlotsPerTaskManager(1).build());

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase$FailingOnceFsCheckpointOutputStream.class */
    private static class FailingOnceFsCheckpointOutputStream extends FsCheckpointStreamFactory.FsCheckpointStateOutputStream {
        private final AtomicBoolean failOnClose;
        private volatile boolean failedCloseAndGetHandle;

        public FailingOnceFsCheckpointOutputStream(File file, int i, int i2, AtomicBoolean atomicBoolean) throws IOException {
            super(Path.fromLocalFile(file.getAbsoluteFile()), FileSystem.get(file.toURI()), i, i2);
            this.failedCloseAndGetHandle = false;
            this.failOnClose = atomicBoolean;
        }

        public StreamStateHandle closeAndGetHandle() throws IOException {
            if (!this.failOnClose.get()) {
                return super.closeAndGetHandle();
            }
            this.failedCloseAndGetHandle = true;
            throw new TestException("failure from closeAndGetHandle");
        }

        public void close() {
            if (this.failedCloseAndGetHandle && this.failOnClose.compareAndSet(true, false)) {
                ExceptionUtils.rethrow(new TestException("failure from close"));
            } else {
                super.close();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase$TestCheckpointStorage.class */
    private static class TestCheckpointStorage implements CheckpointStorage {
        private final CheckpointStorage delegate;
        private final SharedReference<AtomicBoolean> failOnCloseRef;
        private final SharedReference<TemporaryFolder> tempFolderRef;

        private TestCheckpointStorage(CheckpointStorage checkpointStorage, SharedObjects sharedObjects, TemporaryFolder temporaryFolder) {
            this.delegate = checkpointStorage;
            this.failOnCloseRef = sharedObjects.add(new AtomicBoolean(true));
            this.tempFolderRef = sharedObjects.add(temporaryFolder);
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobID) throws IOException {
            return new TestCheckpointStorageAccess(this.delegate.createCheckpointStorage(jobID), (AtomicBoolean) this.failOnCloseRef.get(), ((TemporaryFolder) this.tempFolderRef.get()).newFolder());
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String str) throws IOException {
            return this.delegate.resolveCheckpoint(str);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase$TestCheckpointStorageAccess.class */
    private static class TestCheckpointStorageAccess implements CheckpointStorageAccess {
        private final CheckpointStorageAccess delegate;
        private final AtomicBoolean failOnClose;
        private final File path;

        public TestCheckpointStorageAccess(CheckpointStorageAccess checkpointStorageAccess, AtomicBoolean atomicBoolean, File file) {
            this.delegate = checkpointStorageAccess;
            this.failOnClose = atomicBoolean;
            this.path = file;
        }

        public CheckpointStreamFactory resolveCheckpointStorageLocation(long j, CheckpointStorageLocationReference checkpointStorageLocationReference) {
            return checkpointedStateScope -> {
                return new FailingOnceFsCheckpointOutputStream(this.path, 100, 0, this.failOnClose);
            };
        }

        public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
            return this.delegate.createTaskOwnedStateStream();
        }

        public boolean supportsHighlyAvailableStorage() {
            return this.delegate.supportsHighlyAvailableStorage();
        }

        public boolean hasDefaultSavepointLocation() {
            return this.delegate.hasDefaultSavepointLocation();
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String str) throws IOException {
            return this.delegate.resolveCheckpoint(str);
        }

        public void initializeBaseLocationsForCheckpoint() throws IOException {
            this.delegate.initializeBaseLocationsForCheckpoint();
        }

        public CheckpointStorageLocation initializeLocationForCheckpoint(long j) throws IOException {
            return this.delegate.initializeLocationForCheckpoint(j);
        }

        public CheckpointStorageLocation initializeLocationForSavepoint(long j, @Nullable String str) throws IOException {
            return this.delegate.initializeLocationForSavepoint(j, str);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase$TestException.class */
    private static class TestException extends IOException {
        public TestException(String str) {
            super(str);
        }
    }

    @Test
    public void testCheckpointSuccessAfterFailure() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        configure(executionEnvironment, new TestCheckpointStorage(new JobManagerCheckpointStorage(), this.sharedObjects, this.temporaryFolder));
        buildGraph(executionEnvironment);
        JobClient executeAsync = executionEnvironment.executeAsync();
        JobID jobID = executeAsync.getJobID();
        MiniCluster miniCluster = this.miniClusterResource.getMiniCluster();
        CommonTestUtils.waitForJobStatus(executeAsync, Collections.singletonList(JobStatus.RUNNING), Deadline.fromNow(Duration.ofSeconds(30L)));
        CommonTestUtils.waitForAllTaskRunning(miniCluster, jobID, false);
        triggerFailingCheckpoint(jobID, TestException.class, miniCluster);
        miniCluster.triggerCheckpoint(jobID).get();
    }

    private void configure(StreamExecutionEnvironment streamExecutionEnvironment, TestCheckpointStorage testCheckpointStorage) {
        streamExecutionEnvironment.enableCheckpointing(Long.MAX_VALUE, CheckpointingMode.EXACTLY_ONCE);
        streamExecutionEnvironment.getCheckpointConfig().setCheckpointStorage(testCheckpointStorage);
        streamExecutionEnvironment.setStateBackend(new MockStateBackend(true));
        streamExecutionEnvironment.getCheckpointConfig().enableUnalignedCheckpoints();
        streamExecutionEnvironment.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ZERO);
        streamExecutionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
        streamExecutionEnvironment.setParallelism(PARALLELISM);
        streamExecutionEnvironment.disableOperatorChaining();
    }

    private void buildGraph(StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.fromSource(new NumberSequenceSource(0L, Long.MAX_VALUE), WatermarkStrategy.noWatermarks(), "num-source").keyBy(l -> {
            return l;
        }).map(l2 -> {
            Thread.sleep(1L);
            return l2;
        }).addSink(new DiscardingSink());
    }

    private void triggerFailingCheckpoint(JobID jobID, Class<TestException> cls, MiniCluster miniCluster) throws InterruptedException, ExecutionException {
        while (true) {
            Optional optional = (Optional) miniCluster.triggerCheckpoint(jobID).thenApply(str -> {
                return Optional.empty();
            }).handle((optional2, th) -> {
                return Optional.ofNullable(th);
            }).get();
            if (!optional.isPresent()) {
                Thread.sleep(50L);
            } else if (isCausedBy((Throwable) optional.get(), cls)) {
                return;
            } else {
                ExceptionUtils.rethrow((Throwable) optional.get());
            }
        }
    }

    private boolean isCausedBy(Throwable th, Class<TestException> cls) {
        return ExceptionUtils.findThrowable(th, SerializedThrowable.class).flatMap(serializedThrowable -> {
            return ExceptionUtils.findThrowable(serializedThrowable.deserializeError(getClass().getClassLoader()), cls);
        }).isPresent();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 458469496:
                if (implMethodName.equals("lambda$buildGraph$e1faee96$1")) {
                    z = false;
                    break;
                }
                break;
            case 1422944076:
                if (implMethodName.equals("lambda$buildGraph$29e4a0b5$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l2 -> {
                        Thread.sleep(1L);
                        return l2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return l;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
