package org.apache.flink.streaming.api.environment;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.TernaryBoolean;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest.class */
public class StreamExecutionEnvironmentComplexConfigurationTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest$BasicJobExecutedCounter.class */
    public static class BasicJobExecutedCounter implements JobListener {
        private int count = 0;

        public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th) {
            this.count++;
        }

        public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest$BasicJobSubmittedCounter.class */
    public static class BasicJobSubmittedCounter implements JobListener {
        private int count = 0;

        public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable th) {
            this.count++;
        }

        public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable th) {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo.class */
    public static class CustomPojo {
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer.class */
    public static class CustomPojoSerializer extends Serializer<CustomPojo> {
        public void write(Kryo kryo, Output output, CustomPojo customPojo) {
        }

        public CustomPojo read(Kryo kryo, Input input, Class<CustomPojo> cls) {
            return null;
        }

        /* renamed from: read, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ Object m10read(Kryo kryo, Input input, Class cls) {
            return read(kryo, input, (Class<CustomPojo>) cls);
        }
    }

    @Test
    public void testLoadingStateBackendFromConfiguration() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.setString("state.backend", "jobmanager");
        executionEnvironment.configure(configuration, Thread.currentThread().getContextClassLoader());
        Assert.assertThat(executionEnvironment.getStateBackend(), CoreMatchers.instanceOf(MemoryStateBackend.class));
    }

    @Test
    public void testLoadingCachedFilesFromConfiguration() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.registerCachedFile("/tmp4", "file4", true);
        Configuration configuration = new Configuration();
        configuration.setString("pipeline.cached-files", "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2;name:file3,path:'oss://bucket/file1'");
        executionEnvironment.configure(configuration, Thread.currentThread().getContextClassLoader());
        Assert.assertThat(executionEnvironment.getCachedFiles(), CoreMatchers.equalTo(Arrays.asList(Tuple2.of("file1", new DistributedCache.DistributedCacheEntry("/tmp1", true)), Tuple2.of("file2", new DistributedCache.DistributedCacheEntry("/tmp2", false)), Tuple2.of("file3", new DistributedCache.DistributedCacheEntry("oss://bucket/file1", false)))));
    }

    @Test
    public void testLoadingKryoSerializersFromConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString("pipeline.default-kryo-serializers", "class:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo',serializer:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer'");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(CustomPojo.class, CustomPojoSerializer.class);
        Assert.assertThat(executionEnvironment.getConfig().getDefaultKryoSerializerClasses(), CoreMatchers.equalTo(linkedHashMap));
    }

    @Test
    public void testNotOverridingStateBackendWithDefaultsFromConfiguration() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setStateBackend(new MemoryStateBackend());
        executionEnvironment.configure(new Configuration(), Thread.currentThread().getContextClassLoader());
        Assert.assertThat(executionEnvironment.getStateBackend(), CoreMatchers.instanceOf(MemoryStateBackend.class));
    }

    @Test
    public void testOverridingChangelogStateBackendWithFromConfigurationWhenSet() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        Assert.assertEquals(TernaryBoolean.UNDEFINED, executionEnvironment.isChangelogStateBackendEnabled());
        Configuration configuration = new Configuration();
        configuration.setBoolean(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
        executionEnvironment.configure(configuration, Thread.currentThread().getContextClassLoader());
        Assert.assertEquals(TernaryBoolean.TRUE, executionEnvironment.isChangelogStateBackendEnabled());
        executionEnvironment.configure(configuration, Thread.currentThread().getContextClassLoader());
        Assert.assertEquals(TernaryBoolean.TRUE, executionEnvironment.isChangelogStateBackendEnabled());
        configuration.setBoolean(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
        executionEnvironment.configure(configuration, Thread.currentThread().getContextClassLoader());
        Assert.assertEquals(TernaryBoolean.FALSE, executionEnvironment.isChangelogStateBackendEnabled());
    }

    @Test
    public void testNotOverridingCachedFilesFromConfiguration() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.registerCachedFile("/tmp3", "file3", true);
        executionEnvironment.configure(new Configuration(), Thread.currentThread().getContextClassLoader());
        Assert.assertThat(executionEnvironment.getCachedFiles(), CoreMatchers.equalTo(Arrays.asList(Tuple2.of("file3", new DistributedCache.DistributedCacheEntry("/tmp3", true)))));
    }

    @Test
    public void testLoadingListenersFromConfiguration() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        List asList = Arrays.asList(BasicJobSubmittedCounter.class, BasicJobExecutedCounter.class);
        Configuration configuration = new Configuration();
        ConfigUtils.encodeCollectionToConfig(configuration, DeploymentOptions.JOB_LISTENERS, asList, (v0) -> {
            return v0.getName();
        });
        executionEnvironment.configure(configuration, Thread.currentThread().getContextClassLoader());
        Assert.assertEquals(executionEnvironment.getJobListeners().size(), 2L);
        Assert.assertThat(executionEnvironment.getJobListeners().get(0), CoreMatchers.instanceOf(BasicJobSubmittedCounter.class));
        Assert.assertThat(executionEnvironment.getJobListeners().get(1), CoreMatchers.instanceOf(BasicJobExecutedCounter.class));
    }

    @Test
    public void testGettingEnvironmentWithConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString("state.backend", "jobmanager");
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 10);
        configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, Duration.ofMillis(100L));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        Assert.assertThat(Integer.valueOf(executionEnvironment.getParallelism()), CoreMatchers.equalTo(10));
        Assert.assertThat(Long.valueOf(executionEnvironment.getConfig().getAutoWatermarkInterval()), CoreMatchers.equalTo(100L));
        Assert.assertThat(executionEnvironment.getStateBackend(), CoreMatchers.instanceOf(MemoryStateBackend.class));
    }

    @Test
    public void testLocalEnvironmentExplicitParallelism() {
        Configuration configuration = new Configuration();
        configuration.setString("state.backend", "jobmanager");
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 10);
        configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, Duration.ofMillis(100L));
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(2, configuration);
        Assert.assertThat(Integer.valueOf(createLocalEnvironment.getParallelism()), CoreMatchers.equalTo(2));
        Assert.assertThat(Long.valueOf(createLocalEnvironment.getConfig().getAutoWatermarkInterval()), CoreMatchers.equalTo(100L));
        Assert.assertThat(createLocalEnvironment.getStateBackend(), CoreMatchers.instanceOf(MemoryStateBackend.class));
    }
}
