/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.environment;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.WritableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.util.TernaryBoolean;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests that options supplied through a {@link Configuration} are picked up by a
 * {@link StreamExecutionEnvironment}: state backend / checkpoint / restart-strategy settings,
 * cached files, Kryo serializers, the changelog state backend flag, job listeners,
 * parallelism and the auto-watermark interval.
 */
class StreamExecutionEnvironmentComplexConfigurationTest {

    /**
     * Configures state backend, checkpoint storage and restart strategy on the environment and
     * verifies that the job configuration exposed by the scheduler's execution graph carries the
     * same values.
     *
     * @throws Exception if building the scheduler fails
     */
    @Test
    void testJobConfigFromEnvToExecutionGraph() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
        String path = "file:///valid";
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, path);
        configuration.set(
                RestartStrategyOptions.RESTART_STRATEGY,
                RestartStrategyOptions.RestartStrategyType.EXPONENTIAL_DELAY.getMainValue());
        env.configure(configuration, Thread.currentThread().getContextClassLoader());
        env.fromSequence(0L, 1L).addSink(new DiscardingSink<>());

        // The executor is created only for this test, so it must be shut down here;
        // otherwise its worker thread outlives the test.
        ScheduledExecutorService futureExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            DefaultScheduler scheduler =
                    new DefaultSchedulerBuilder(
                                    env.getStreamGraph().getJobGraph(),
                                    ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                    futureExecutor)
                            .build();
            Configuration jobConfiguration = scheduler.getExecutionGraph().getJobConfiguration();

            Assertions.assertThat(jobConfiguration.get(StateBackendOptions.STATE_BACKEND))
                    .isEqualTo(configuration.get(StateBackendOptions.STATE_BACKEND));
            Assertions.assertThat(jobConfiguration.get(CheckpointingOptions.CHECKPOINT_STORAGE))
                    .isEqualTo(configuration.get(CheckpointingOptions.CHECKPOINT_STORAGE));
            Assertions.assertThat(jobConfiguration.get(RestartStrategyOptions.RESTART_STRATEGY))
                    .isEqualTo(configuration.get(RestartStrategyOptions.RESTART_STRATEGY));
        } finally {
            futureExecutor.shutdownNow();
        }
    }

    /**
     * Verifies that cached files declared via {@code pipeline.cached-files} replace a file that
     * was registered programmatically before {@code configure} was called.
     */
    @Test
    void testLoadingCachedFilesFromConfiguration() {
        StreamExecutionEnvironment envFromConfiguration =
                StreamExecutionEnvironment.getExecutionEnvironment();
        envFromConfiguration.registerCachedFile("/tmp4", "file4", true);

        Configuration configuration = new Configuration();
        configuration.setString(
                "pipeline.cached-files",
                "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2;name:file3,path:'oss://bucket/file1'");
        envFromConfiguration.configure(
                configuration, Thread.currentThread().getContextClassLoader());

        Assertions.assertThat(envFromConfiguration.getCachedFiles())
                .isEqualTo(
                        Arrays.asList(
                                Tuple2.of(
                                        "file1",
                                        new DistributedCache.DistributedCacheEntry("/tmp1", true)),
                                Tuple2.of(
                                        "file2",
                                        new DistributedCache.DistributedCacheEntry("/tmp2", false)),
                                Tuple2.of(
                                        "file3",
                                        new DistributedCache.DistributedCacheEntry(
                                                "oss://bucket/file1", false))));
    }

    /**
     * Verifies that a default Kryo serializer declared via {@code pipeline.serialization-config}
     * is registered in the environment's serializer configuration.
     */
    @Test
    void testLoadingKryoSerializersFromConfiguration() {
        Configuration configuration = new Configuration();
        configuration.setString(
                "pipeline.serialization-config",
                "{org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo: {type: kryo, kryo-type: default, class: org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer}}");
        StreamExecutionEnvironment envFromConfiguration =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        Map<Class<?>, Class<?>> expectedSerializers = new LinkedHashMap<>();
        expectedSerializers.put(CustomPojo.class, CustomPojoSerializer.class);

        Assertions.assertThat(
                        envFromConfiguration
                                .getConfig()
                                .getSerializerConfig()
                                .getDefaultKryoSerializerClasses())
                .isEqualTo(expectedSerializers);
    }

    /**
     * Verifies that the changelog state backend flag starts out undefined and afterwards follows
     * whatever value the most recently applied configuration sets, including when the same
     * configuration is applied twice.
     */
    @Test
    void testOverridingChangelogStateBackendWithFromConfigurationWhenSet() {
        StreamExecutionEnvironment envFromConfiguration =
                StreamExecutionEnvironment.getExecutionEnvironment();
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Assertions.assertThat(envFromConfiguration.isChangelogStateBackendEnabled())
                .isEqualTo(TernaryBoolean.UNDEFINED);

        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
        envFromConfiguration.configure(configuration, classLoader);
        Assertions.assertThat(envFromConfiguration.isChangelogStateBackendEnabled())
                .isEqualTo(TernaryBoolean.TRUE);

        // Applying the identical configuration a second time must leave the flag unchanged.
        envFromConfiguration.configure(configuration, classLoader);
        Assertions.assertThat(envFromConfiguration.isChangelogStateBackendEnabled())
                .isEqualTo(TernaryBoolean.TRUE);

        configuration.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
        envFromConfiguration.configure(configuration, classLoader);
        Assertions.assertThat(envFromConfiguration.isChangelogStateBackendEnabled())
                .isEqualTo(TernaryBoolean.FALSE);
    }

    /**
     * Verifies that applying a configuration without a cached-files entry keeps a file that was
     * registered programmatically.
     */
    @Test
    void testNotOverridingCachedFilesFromConfiguration() {
        StreamExecutionEnvironment envFromConfiguration =
                StreamExecutionEnvironment.getExecutionEnvironment();
        envFromConfiguration.registerCachedFile("/tmp3", "file3", true);

        Configuration configuration = new Configuration();
        envFromConfiguration.configure(
                configuration, Thread.currentThread().getContextClassLoader());

        Assertions.assertThat(envFromConfiguration.getCachedFiles())
                .isEqualTo(
                        Arrays.asList(
                                Tuple2.of(
                                        "file3",
                                        new DistributedCache.DistributedCacheEntry("/tmp3", true))));
    }

    /**
     * Verifies that job listener class names stored under {@link DeploymentOptions#JOB_LISTENERS}
     * are instantiated and registered on the environment in declaration order.
     */
    @Test
    void testLoadingListenersFromConfiguration() {
        StreamExecutionEnvironment envFromConfiguration =
                StreamExecutionEnvironment.getExecutionEnvironment();
        List<Class<? extends JobListener>> listenersClass =
                Arrays.asList(BasicJobSubmittedCounter.class, BasicJobExecutedCounter.class);

        Configuration configuration = new Configuration();
        ConfigUtils.encodeCollectionToConfig(
                configuration, DeploymentOptions.JOB_LISTENERS, listenersClass, Class::getName);
        envFromConfiguration.configure(
                configuration, Thread.currentThread().getContextClassLoader());

        List<JobListener> registered = envFromConfiguration.getJobListeners();
        Assertions.assertThat(registered.size()).isEqualTo(2);
        Assertions.assertThat(registered.get(0)).isInstanceOf(BasicJobSubmittedCounter.class);
        Assertions.assertThat(registered.get(1)).isInstanceOf(BasicJobExecutedCounter.class);
    }

    /**
     * Verifies that an environment obtained with a configuration reflects the configured default
     * parallelism and auto-watermark interval.
     */
    @Test
    void testGettingEnvironmentWithConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 10);
        configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, Duration.ofMillis(100L));

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        Assertions.assertThat(env.getParallelism()).isEqualTo(10);
        Assertions.assertThat(env.getConfig().getAutoWatermarkInterval()).isEqualTo(100L);
    }

    /**
     * Verifies that the parallelism passed explicitly to {@code createLocalEnvironment} wins over
     * the configured default parallelism, while other options from the configuration still apply.
     */
    @Test
    void testLocalEnvironmentExplicitParallelism() {
        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 10);
        configuration.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, Duration.ofMillis(100L));

        LocalStreamEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, configuration);

        Assertions.assertThat(env.getParallelism()).isEqualTo(2);
        Assertions.assertThat(env.getConfig().getAutoWatermarkInterval()).isEqualTo(100L);
    }

    /**
     * No-op Kryo serializer for {@link CustomPojo}; only its class identity is used by the test
     * that loads serializers from configuration.
     */
    public static class CustomPojoSerializer extends Serializer<CustomPojo> {
        @Override
        public void write(Kryo kryo, Output output, CustomPojo object) {
            // Intentionally empty: the serializer is never invoked by these tests.
        }

        @Override
        public CustomPojo read(Kryo kryo, Input input, Class<? extends CustomPojo> type) {
            return null;
        }
    }

    /** Empty marker type used as the serialization target in the Kryo serializer test. */
    public static class CustomPojo {
    }

    /** Job listener for tests that counts {@code onJobExecuted} callbacks. */
    public static class BasicJobExecutedCounter implements JobListener {
        private int count = 0;

        @Override
        public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
            // Submissions are not counted by this listener.
        }

        @Override
        public void onJobExecuted(
                @Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
            ++this.count;
        }
    }

    /** Job listener for tests that counts {@code onJobSubmitted} callbacks. */
    public static class BasicJobSubmittedCounter implements JobListener {
        private int count = 0;

        @Override
        public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
            ++this.count;
        }

        @Override
        public void onJobExecuted(
                @Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
            // Executions are not counted by this listener.
        }
    }
}

