package org.apache.flink.test.checkpointing;

import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationSwitchStateBackendITCase.class */
/**
 * Integration test that runs a job in an environment with the changelog state backend enabled
 * and then switches to an environment where it is disabled, optionally changing the parallelism
 * between the two environments.
 *
 * <p>The actual switch logic lives in {@code testSwitchEnv} of the base class; this class only
 * builds the "before" and "after" environments.
 */
public class ChangelogPeriodicMaterializationSwitchStateBackendITCase extends ChangelogPeriodicMaterializationSwitchEnvTestBase {
    /** Parallelism used when a test does not request a specific one. */
    private static final int DEFAULT_TEST_PARALLELISM = 4;

    /**
     * Creates the test case for the given state backend.
     *
     * @param abstractStateBackend state backend forwarded unchanged to the base class
     */
    public ChangelogPeriodicMaterializationSwitchStateBackendITCase(AbstractStateBackend abstractStateBackend) {
        super(abstractStateBackend);
    }

    /** Switches from changelog enabled to disabled with the same (default) parallelism. */
    @Test
    public void testSwitchFromEnablingToDisabling() throws Exception {
        StreamExecutionEnvironment changelogOn = getEnv(true);
        StreamExecutionEnvironment changelogOff = getEnv(false);
        testSwitchEnv(changelogOn, changelogOff);
    }

    /** Switches from changelog enabled to disabled while raising parallelism from 2 to 4. */
    @Test
    public void testSwitchFromEnablingToDisablingWithRescalingOut() throws Exception {
        StreamExecutionEnvironment changelogOn = getEnv(true, 2);
        StreamExecutionEnvironment changelogOff = getEnv(false, 4);
        testSwitchEnv(changelogOn, changelogOff);
    }

    /** Switches from changelog enabled to disabled while lowering parallelism from 4 to 2. */
    @Test
    public void testSwitchFromEnablingToDisablingWithRescalingIn() throws Exception {
        StreamExecutionEnvironment changelogOn = getEnv(true, 4);
        StreamExecutionEnvironment changelogOff = getEnv(false, 2);
        testSwitchEnv(changelogOn, changelogOff);
    }

    /**
     * Builds an environment with the default test parallelism.
     *
     * @param changelogEnabled value passed to {@code enableChangelogStateBackend}
     * @return the configured environment
     */
    private StreamExecutionEnvironment getEnv(boolean changelogEnabled) {
        return getEnv(changelogEnabled, DEFAULT_TEST_PARALLELISM);
    }

    /**
     * Builds an environment on top of the delegated state backend, then applies the changelog
     * switch, the parallelism, and retention of externalized checkpoints on cancellation.
     *
     * @param changelogEnabled value passed to {@code enableChangelogStateBackend}
     * @param parallelism parallelism set on the returned environment
     * @return the configured environment
     */
    private StreamExecutionEnvironment getEnv(boolean changelogEnabled, int parallelism) {
        // NOTE(review): the numeric arguments are interpreted by the inherited getEnv overload,
        // which is not visible here - confirm their meaning against the base class.
        StreamExecutionEnvironment configured = getEnv(this.delegatedStateBackend, 100L, 0, 500L, 0);
        configured.enableChangelogStateBackend(changelogEnabled);
        configured.setParallelism(parallelism);

        CheckpointConfig checkpointConfig = configured.getCheckpointConfig();
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return configured;
    }
}
