/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.environment;

import com.esotericsoftware.kryo.Serializer;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.environment.StreamPipelineOptions;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.transformations.CacheTransformation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.WrappingRuntimeException;

@Public
public class StreamExecutionEnvironment
implements AutoCloseable {
    // Iterators added through registerCollectIterator(...).
    private final List<CollectResultIterator<?>> collectIterators = new ArrayList();
    // Deprecated public job-name constant.
    @Deprecated
    public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
    // Initial value of the timeCharacteristic field below.
    private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.EventTime;
    // Process-wide factory slot; starts out unset (null).
    private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;
    // Per-thread factory slot.
    private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal();
    // Initialized to the number of processors reported by the JVM at class-load time.
    private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
    // Execution config; returned as-is by getConfig().
    protected final ExecutionConfig config = new ExecutionConfig();
    // Checkpoint config; returned as-is by getCheckpointConfig().
    protected final CheckpointConfig checkpointCfg = new CheckpointConfig();
    protected final List<Transformation<?>> transformations = new ArrayList();
    private final Map<AbstractID, CacheTransformation<?>> cachedTransformations = new HashMap();
    // Starts at the default of ExecutionOptions.BUFFER_TIMEOUT, converted to milliseconds.
    private long bufferTimeout = ((Duration)ExecutionOptions.BUFFER_TIMEOUT.defaultValue()).toMillis();
    // Chaining flag; true until disableOperatorChaining() or configure(...) changes it.
    protected boolean isChainingEnabled = true;
    // Null until setStateBackend(...) is called (directly or via configure(...)).
    private StateBackend defaultStateBackend;
    // UNDEFINED until enableChangelogStateBackend(...) is called.
    private TernaryBoolean changelogStateBackendEnabled = TernaryBoolean.UNDEFINED;
    // Null until one of the setDefaultSavepointDirectory(...) overloads is called.
    private Path defaultSavepointDirectory;
    private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
    // Cached-file entries; replaced wholesale by configure(...) when PipelineOptions.CACHED_FILES is set.
    protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCache.DistributedCacheEntry>>();
    // Assigned once in the four-argument-chain constructor; never null.
    private final PipelineExecutorServiceLoader executorServiceLoader;
    // Private copy of the Configuration handed to the constructor.
    protected final Configuration configuration;
    // Class loader given to the constructor, or this class's own loader when null was passed.
    private final ClassLoader userClassloader;
    // Listeners exposed by getJobListeners() and appended to by registerCustomListeners(...).
    private final List<JobListener> jobListeners = new ArrayList<JobListener>();
    // Resource profiles keyed by slot sharing group name; filled by registerSlotSharingGroup(...).
    private final Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<String, ResourceProfile>();

    /**
     * Remembers a collect-result iterator by appending it to this environment's
     * internal iterator list.
     *
     * @param iterator the iterator to record
     */
    @Internal
    public void registerCollectIterator(CollectResultIterator<?> iterator) {
        collectIterators.add(iterator);
    }

    /**
     * Creates an environment from a fresh {@link Configuration} by delegating to
     * {@link #StreamExecutionEnvironment(Configuration)}.
     */
    public StreamExecutionEnvironment() {
        this(new Configuration());
    }

    /**
     * Creates an environment from the given configuration, passing {@code null} as the
     * user class loader so that the delegate constructor falls back to this class's loader.
     *
     * @param configuration the configuration to start from
     */
    @PublicEvolving
    public StreamExecutionEnvironment(Configuration configuration) {
        this(configuration, null);
    }

    /**
     * Creates an environment that uses a newly created {@link DefaultExecutorServiceLoader}
     * together with the given configuration and class loader.
     *
     * @param configuration the configuration to start from
     * @param userClassloader the user class loader; may be {@code null}
     */
    @PublicEvolving
    public StreamExecutionEnvironment(Configuration configuration, ClassLoader userClassloader) {
        this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
    }

    /**
     * Primary constructor: stores the executor service loader, takes a private copy of the
     * configuration, resolves the user class loader and finally applies the copied
     * configuration through {@link #configure(ReadableConfig, ClassLoader)}.
     *
     * @param executorServiceLoader loader for pipeline executors; must not be {@code null}
     * @param configuration configuration to copy; must not be {@code null}
     * @param userClassloader user class loader; when {@code null}, the class loader of this
     *     object's runtime class is used instead
     */
    @PublicEvolving
    public StreamExecutionEnvironment(PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, ClassLoader userClassloader) {
        this.executorServiceLoader = Preconditions.checkNotNull(executorServiceLoader);
        // Copy so later changes to the caller's Configuration do not leak into this environment.
        this.configuration = new Configuration(Preconditions.checkNotNull(configuration));
        if (userClassloader != null) {
            this.userClassloader = userClassloader;
        } else {
            this.userClassloader = getClass().getClassLoader();
        }
        configure(this.configuration, this.userClassloader);
    }

    /**
     * @return the user class loader resolved at construction time
     */
    protected ClassLoader getUserClassloader() {
        return userClassloader;
    }

    /**
     * @return the {@link ExecutionConfig} held by this environment (the live object, not a copy)
     */
    public ExecutionConfig getConfig() {
        return config;
    }

    /**
     * @return the internal list of cached-file entries; the list itself is returned, not a copy
     */
    public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
        return cacheFile;
    }

    /**
     * @return the internal list of job listeners; the list itself is returned, not a copy
     */
    @PublicEvolving
    public List<JobListener> getJobListeners() {
        return jobListeners;
    }

    /**
     * Forwards the given parallelism to the execution config.
     *
     * @param parallelism the parallelism to set
     * @return this environment, for call chaining
     */
    public StreamExecutionEnvironment setParallelism(int parallelism) {
        config.setParallelism(parallelism);
        return this;
    }

    /**
     * Stores the runtime execution mode in this environment's configuration under
     * {@code ExecutionOptions.RUNTIME_MODE}.
     *
     * @param executionMode the mode to store; must not be {@code null}
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment setRuntimeMode(RuntimeExecutionMode executionMode) {
        Preconditions.checkNotNull(executionMode);
        configuration.set(ExecutionOptions.RUNTIME_MODE, executionMode);
        return this;
    }

    /**
     * Validates the given maximum parallelism and forwards it to the execution config.
     *
     * @param maxParallelism value in the range {@code 0 < maxParallelism <= 32768}
     * @return this environment, for call chaining
     */
    public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
        final boolean withinBounds = maxParallelism > 0 && maxParallelism <= 32768;
        Preconditions.checkArgument(withinBounds, "maxParallelism is out of bounds 0 < maxParallelism <= 32768. Found: " + maxParallelism);
        config.setMaxParallelism(maxParallelism);
        return this;
    }

    /**
     * Registers the resources of a slot sharing group with this environment.
     *
     * <p>The group's {@link ResourceSpec} is extracted once. If it is not
     * {@link ResourceSpec#UNKNOWN}, a {@link ResourceProfile} built from it (with
     * {@link MemorySize#ZERO} as the second argument) is stored under the group's name,
     * replacing any earlier entry for that name. Groups with an unknown spec are ignored.
     *
     * @param slotSharingGroup the group whose resources should be recorded
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment registerSlotSharingGroup(SlotSharingGroup slotSharingGroup) {
        final ResourceSpec resourceSpec = SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
        if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
            // Reuse the spec extracted above instead of extracting it a second time.
            slotSharingGroupResources.put(slotSharingGroup.getName(), ResourceProfile.fromResourceSpec(resourceSpec, MemorySize.ZERO));
        }
        return this;
    }

    /**
     * @return the parallelism currently stored in the execution config
     */
    public int getParallelism() {
        return config.getParallelism();
    }

    /**
     * @return the maximum parallelism currently stored in the execution config
     */
    public int getMaxParallelism() {
        return config.getMaxParallelism();
    }

    /**
     * Sets the buffer timeout of this environment.
     *
     * @param timeoutMillis timeout in milliseconds; any value {@code >= -1} is accepted
     * @return this environment, for call chaining
     * @throws IllegalArgumentException if {@code timeoutMillis} is smaller than -1
     */
    public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
        if (timeoutMillis >= -1L) {
            bufferTimeout = timeoutMillis;
            return this;
        }
        throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
    }

    /**
     * @return the buffer timeout currently stored in this environment
     */
    public long getBufferTimeout() {
        return bufferTimeout;
    }

    /**
     * Switches the operator-chaining flag of this environment off.
     *
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment disableOperatorChaining() {
        isChainingEnabled = false;
        return this;
    }

    /**
     * @return the current value of the operator-chaining flag
     */
    @PublicEvolving
    public boolean isChainingEnabled() {
        return isChainingEnabled;
    }

    /**
     * @return the {@link CheckpointConfig} held by this environment (the live object, not a copy)
     */
    public CheckpointConfig getCheckpointConfig() {
        return checkpointCfg;
    }

    /**
     * Sets the checkpoint interval on the checkpoint config.
     *
     * @param interval value handed to {@code CheckpointConfig#setCheckpointInterval}
     * @return this environment, for call chaining
     */
    public StreamExecutionEnvironment enableCheckpointing(long interval) {
        checkpointCfg.setCheckpointInterval(interval);
        return this;
    }

    /**
     * Sets the checkpointing mode first and the checkpoint interval second on the
     * checkpoint config.
     *
     * @param interval value handed to {@code CheckpointConfig#setCheckpointInterval}
     * @param mode value handed to {@code CheckpointConfig#setCheckpointingMode}
     * @return this environment, for call chaining
     */
    public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
        final CheckpointConfig target = checkpointCfg;
        target.setCheckpointingMode(mode);
        target.setCheckpointInterval(interval);
        return this;
    }

    /**
     * Sets, in this order, the checkpointing mode, the checkpoint interval and the
     * force-checkpointing flag on the checkpoint config.
     *
     * @param interval value handed to {@code CheckpointConfig#setCheckpointInterval}
     * @param mode value handed to {@code CheckpointConfig#setCheckpointingMode}
     * @param force value handed to {@code CheckpointConfig#setForceCheckpointing}
     * @return this environment, for call chaining
     * @deprecated this overload is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
        final CheckpointConfig target = checkpointCfg;
        target.setCheckpointingMode(mode);
        target.setCheckpointInterval(interval);
        target.setForceCheckpointing(force);
        return this;
    }

    /**
     * Sets the checkpoint interval on the checkpoint config to the fixed value {@code 500}.
     *
     * @return this environment, for call chaining
     * @deprecated this overload is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public StreamExecutionEnvironment enableCheckpointing() {
        final long fixedInterval = 500L;
        checkpointCfg.setCheckpointInterval(fixedInterval);
        return this;
    }

    /**
     * @return the checkpoint interval reported by the checkpoint config
     */
    public long getCheckpointInterval() {
        return checkpointCfg.getCheckpointInterval();
    }

    /**
     * @return the force-checkpointing flag reported by the checkpoint config
     * @deprecated this accessor is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public boolean isForceCheckpointing() {
        return checkpointCfg.isForceCheckpointing();
    }

    /**
     * @return the unaligned-checkpoints flag reported by the checkpoint config
     */
    @PublicEvolving
    public boolean isUnalignedCheckpointsEnabled() {
        return checkpointCfg.isUnalignedCheckpointsEnabled();
    }

    /**
     * @return the force-unaligned-checkpoints flag reported by the checkpoint config
     */
    @PublicEvolving
    public boolean isForceUnalignedCheckpoints() {
        return checkpointCfg.isForceUnalignedCheckpoints();
    }

    /**
     * @return the checkpointing mode reported by the checkpoint config
     */
    public CheckpointingMode getCheckpointingMode() {
        return checkpointCfg.getCheckpointingMode();
    }

    /**
     * Stores the given state backend as this environment's default state backend.
     *
     * @param backend the backend to store; must not be {@code null}
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment setStateBackend(StateBackend backend) {
        defaultStateBackend = Preconditions.checkNotNull(backend);
        return this;
    }

    /**
     * @return the stored default state backend, or {@code null} if none has been set
     */
    @PublicEvolving
    public StateBackend getStateBackend() {
        return defaultStateBackend;
    }

    /**
     * Records the changelog-state-backend switch as a {@link TernaryBoolean} derived from
     * the given flag.
     *
     * @param enabled the flag to record
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment enableChangelogStateBackend(boolean enabled) {
        changelogStateBackendEnabled = TernaryBoolean.fromBoolean(enabled);
        return this;
    }

    /**
     * @return the recorded changelog-state-backend switch; {@link TernaryBoolean#UNDEFINED}
     *     until {@link #enableChangelogStateBackend(boolean)} has been called
     */
    @PublicEvolving
    public TernaryBoolean isChangelogStateBackendEnabled() {
        return changelogStateBackendEnabled;
    }

    /**
     * Sets the default savepoint directory from a string by wrapping it in a {@link Path}.
     *
     * @param savepointDirectory the directory as a string; must not be {@code null}
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment setDefaultSavepointDirectory(String savepointDirectory) {
        Preconditions.checkNotNull(savepointDirectory);
        final Path directory = new Path(savepointDirectory);
        return setDefaultSavepointDirectory(directory);
    }

    /**
     * Sets the default savepoint directory from a URI by wrapping it in a {@link Path}.
     *
     * @param savepointDirectory the directory as a URI; must not be {@code null}
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment setDefaultSavepointDirectory(URI savepointDirectory) {
        Preconditions.checkNotNull(savepointDirectory);
        final Path directory = new Path(savepointDirectory);
        return setDefaultSavepointDirectory(directory);
    }

    /**
     * Stores the given path as the default savepoint directory.
     *
     * @param savepointDirectory the directory; must not be {@code null}
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment setDefaultSavepointDirectory(Path savepointDirectory) {
        defaultSavepointDirectory = Preconditions.checkNotNull(savepointDirectory);
        return this;
    }

    /**
     * @return the stored default savepoint directory, or {@code null} if none has been set
     */
    @Nullable
    @PublicEvolving
    public Path getDefaultSavepointDirectory() {
        return defaultSavepointDirectory;
    }

    /**
     * Forwards the restart strategy configuration to the execution config.
     *
     * @param restartStrategyConfiguration the configuration to forward
     */
    @PublicEvolving
    public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
        config.setRestartStrategy(restartStrategyConfiguration);
    }

    /**
     * @return the restart strategy configuration reported by the execution config
     */
    @PublicEvolving
    public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        return config.getRestartStrategy();
    }

    /**
     * Forwards the number of execution retries to the execution config.
     *
     * @param numberOfExecutionRetries the value to forward
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
        config.setNumberOfExecutionRetries(numberOfExecutionRetries);
    }

    /**
     * @return the number of execution retries reported by the execution config
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public int getNumberOfExecutionRetries() {
        return config.getNumberOfExecutionRetries();
    }

    /**
     * Forwards a default Kryo serializer instance for the given type to the execution config.
     *
     * @param type the class the serializer is registered for
     * @param serializer the serializer instance
     */
    public <T extends Serializer<?>> void addDefaultKryoSerializer(Class<?> type, T serializer) {
        config.addDefaultKryoSerializer(type, serializer);
    }

    /**
     * Forwards a default Kryo serializer class for the given type to the execution config.
     *
     * @param type the class the serializer is registered for
     * @param serializerClass the serializer class
     */
    public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
        config.addDefaultKryoSerializer(type, serializerClass);
    }

    /**
     * Forwards a type together with a Kryo serializer instance to the execution config.
     *
     * @param type the class to register
     * @param serializer the serializer instance
     */
    public <T extends Serializer<?>> void registerTypeWithKryoSerializer(Class<?> type, T serializer) {
        config.registerTypeWithKryoSerializer(type, serializer);
    }

    /**
     * Forwards a type together with a Kryo serializer class to the execution config.
     *
     * @param type the class to register
     * @param serializerClass the serializer class
     */
    public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) {
        config.registerTypeWithKryoSerializer(type, serializerClass);
    }

    /**
     * Registers a type with the execution config: as a POJO type when the extracted type
     * information is a {@link PojoTypeInfo}, otherwise as a Kryo type.
     *
     * @param type the class to register
     * @throws NullPointerException if {@code type} is {@code null}
     */
    public void registerType(Class<?> type) {
        if (type == null) {
            throw new NullPointerException("Cannot register null type class.");
        }
        final TypeInformation<?> extracted = TypeExtractor.createTypeInfo(type);
        final boolean isPojo = extracted instanceof PojoTypeInfo;
        if (!isPojo) {
            config.registerKryoType(type);
            return;
        }
        config.registerPojoType(type);
    }

    /**
     * Stores the time characteristic and adjusts the auto-watermark interval on the
     * execution config: {@code 0} for {@link TimeCharacteristic#ProcessingTime},
     * {@code 200} for every other characteristic.
     *
     * @param characteristic the characteristic to store; must not be {@code null}
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        timeCharacteristic = Preconditions.checkNotNull(characteristic);
        final long watermarkInterval = characteristic == TimeCharacteristic.ProcessingTime ? 0L : 200L;
        getConfig().setAutoWatermarkInterval(watermarkInterval);
    }

    /**
     * @return the stored time characteristic
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public TimeCharacteristic getStreamTimeCharacteristic() {
        return timeCharacteristic;
    }

    /**
     * Applies the given configuration using this environment's user class loader.
     *
     * @param configuration the options to apply
     */
    @PublicEvolving
    public void configure(ReadableConfig configuration) {
        configure(configuration, userClassloader);
    }

    /**
     * Applies every option that is present in {@code configuration} to this environment;
     * options that are absent leave the corresponding setting untouched.
     *
     * <p>Some options update dedicated fields (time characteristic, changelog switch, state
     * backend, chaining flag, buffer timeout, job listeners, cached files); others are
     * copied into this environment's own {@link Configuration}. Finally the execution
     * config and the checkpoint config are asked to configure themselves from the same
     * source.
     *
     * @param configuration the options to apply
     * @param classLoader class loader used for loading the state backend and job listeners
     *     and passed on to the execution config
     */
    @PublicEvolving
    public void configure(ReadableConfig configuration, ClassLoader classLoader) {
        // Options backed by dedicated fields or setters.
        configuration.getOptional(StreamPipelineOptions.TIME_CHARACTERISTIC).ifPresent(characteristic -> setStreamTimeCharacteristic(characteristic));
        configuration.getOptional(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG).ifPresent(enabled -> enableChangelogStateBackend(enabled));
        final StateBackend configuredBackend = loadStateBackend(configuration, classLoader);
        if (configuredBackend != null) {
            setStateBackend(configuredBackend);
        }
        configuration.getOptional(PipelineOptions.OPERATOR_CHAINING).ifPresent(chaining -> isChainingEnabled = chaining);
        configuration.getOptional(ExecutionOptions.BUFFER_TIMEOUT).ifPresent(timeout -> setBufferTimeout(timeout.toMillis()));
        configuration.getOptional(DeploymentOptions.JOB_LISTENERS).ifPresent(listenerNames -> registerCustomListeners(classLoader, listenerNames));
        configuration.getOptional(PipelineOptions.CACHED_FILES).ifPresent(files -> {
            // The configured list replaces, rather than extends, the current entries.
            cacheFile.clear();
            cacheFile.addAll(DistributedCache.parseCachedFilesFromString(files));
        });

        // Options mirrored into this environment's own Configuration. The parameter shadows
        // the field, hence the explicit "this.".
        configuration.getOptional(ExecutionOptions.RUNTIME_MODE).ifPresent(mode -> this.configuration.set(ExecutionOptions.RUNTIME_MODE, mode));
        configuration.getOptional(ExecutionOptions.BATCH_SHUFFLE_MODE).ifPresent(mode -> this.configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, mode));
        configuration.getOptional(ExecutionOptions.SORT_INPUTS).ifPresent(sort -> this.configuration.set(ExecutionOptions.SORT_INPUTS, sort));
        configuration.getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND).ifPresent(useBatchBackend -> this.configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, useBatchBackend));
        configuration.getOptional(PipelineOptions.NAME).ifPresent(name -> this.configuration.set(PipelineOptions.NAME, name));
        configuration.getOptional(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH).ifPresent(enabled -> this.configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, enabled));
        configuration.getOptional(PipelineOptions.JARS).ifPresent(jarList -> this.configuration.set(PipelineOptions.JARS, jarList));

        // Nested configs pick up their own options last.
        config.configure(configuration, classLoader);
        checkpointCfg.configure(configuration);
    }

    /**
     * Instantiates each named {@link JobListener} class with the given class loader and
     * appends the instance to this environment's listener list, in iteration order.
     *
     * @param classLoader class loader used for instantiation
     * @param listeners class names of the listeners to create
     * @throws WrappingRuntimeException if a listener cannot be instantiated
     */
    private void registerCustomListeners(ClassLoader classLoader, List<String> listeners) {
        for (final String listenerClassName : listeners) {
            final JobListener instance;
            try {
                instance = InstantiationUtil.instantiate(listenerClassName, JobListener.class, classLoader);
            } catch (FlinkException e) {
                throw new WrappingRuntimeException("Could not load JobListener : " + listenerClassName, e);
            }
            jobListeners.add(instance);
        }
    }

    /**
     * Asks {@link StateBackendLoader} to load a state backend from the given configuration,
     * passing {@code null} as the loader's third argument.
     *
     * @param configuration the configuration to read from
     * @param classLoader the class loader handed to the loader
     * @return whatever the loader returns; {@code configure} treats {@code null} as
     *     "nothing configured"
     * @throws WrappingRuntimeException wrapping any {@link IOException} or
     *     {@link DynamicCodeLoadingException} raised while loading
     */
    private StateBackend loadStateBackend(ReadableConfig configuration, ClassLoader classLoader) {
        final StateBackend loaded;
        try {
            loaded = StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null);
        } catch (IOException | DynamicCodeLoadingException e) {
            throw new WrappingRuntimeException(e);
        }
        return loaded;
    }

    /**
     * Adds a {@link StatefulSequenceSource} covering {@code from} to {@code to} under the
     * name "Sequence Source (Deprecated)".
     *
     * @param from start of the sequence
     * @param to end of the sequence; must not be smaller than {@code from}
     * @return the resulting source stream
     * @throws IllegalArgumentException if {@code from > to}
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    public DataStreamSource<Long> generateSequence(long from, long to) {
        if (to < from) {
            throw new IllegalArgumentException("Start of sequence must not be greater than the end");
        }
        final StatefulSequenceSource sequence = new StatefulSequenceSource(from, to);
        return addSource(sequence, "Sequence Source (Deprecated)");
    }

    /**
     * Creates a stream from a {@link NumberSequenceSource} covering {@code from} to
     * {@code to}, without watermarks, under the name "Sequence Source".
     *
     * @param from start of the sequence
     * @param to end of the sequence; must not be smaller than {@code from}
     * @return the resulting source stream
     * @throws IllegalArgumentException if {@code from > to}
     */
    public DataStreamSource<Long> fromSequence(long from, long to) {
        if (to < from) {
            throw new IllegalArgumentException("Start of sequence must not be greater than the end");
        }
        final NumberSequenceSource sequence = new NumberSequenceSource(from, to);
        return fromSource(sequence, WatermarkStrategy.noWatermarks(), "Sequence Source");
    }

    /**
     * Creates a stream from the given elements, deriving the type information from the
     * first element and delegating to
     * {@link #fromCollection(Collection, TypeInformation)}.
     *
     * @param data the elements; at least one is required and the first must not be
     *     {@code null}
     * @return the resulting source stream
     * @throws IllegalArgumentException if no element is given
     * @throws NullPointerException if the first element is {@code null}
     * @throws RuntimeException if no type information can be derived from the first element
     */
    @SafeVarargs
    public final <OUT> DataStreamSource<OUT> fromElements(OUT ... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("fromElements needs at least one element as argument");
        }
        // Fail fast with a clear message. Without this check a null first element would
        // surface as a message-less NullPointerException raised while building the error
        // text in the catch block below.
        final OUT first = Preconditions.checkNotNull(data[0], "The first element passed to fromElements must not be null because the element type is derived from it");
        final TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getForObject(first);
        }
        catch (Exception e) {
            // Points at fromCollection: there is no fromElements(Collection, TypeInformation).
            throw new RuntimeException("Could not create TypeInformation for type " + first.getClass().getName() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
        }
        return this.fromCollection(Arrays.asList(data), typeInfo);
    }

    /**
     * Creates a stream from the given elements, deriving the type information from the
     * supplied class and delegating to
     * {@link #fromCollection(Collection, TypeInformation)}.
     *
     * @param type the element class used for type extraction; must not be {@code null}
     * @param data the elements; at least one is required
     * @return the resulting source stream
     * @throws IllegalArgumentException if no element is given
     * @throws NullPointerException if {@code type} is {@code null}
     * @throws RuntimeException if no type information can be derived from {@code type}
     */
    @SafeVarargs
    public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT ... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("fromElements needs at least one element as argument");
        }
        // Fail fast with a clear message. Without this check a null class would surface as
        // a message-less NullPointerException raised while building the error text below.
        Preconditions.checkNotNull(type, "The element class passed to fromElements must not be null");
        final TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getForClass(type);
        }
        catch (Exception e) {
            // Points at fromCollection: there is no fromElements(Collection, TypeInformation).
            throw new RuntimeException("Could not create TypeInformation for type " + type.getName() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
        }
        return this.fromCollection(Arrays.asList(data), typeInfo);
    }

    /**
     * Creates a stream from a non-empty collection, deriving the type information from the
     * first element returned by the collection's iterator and delegating to
     * {@link #fromCollection(Collection, TypeInformation)}.
     *
     * @param data the elements; must not be {@code null} or empty, and the first element
     *     must not be {@code null}
     * @return the resulting source stream
     * @throws IllegalArgumentException if the collection is empty or its first element is
     *     {@code null}
     * @throws RuntimeException if no type information can be derived from the first element
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
        Preconditions.checkNotNull(data, "Collection must not be null");
        if (data.isEmpty()) {
            throw new IllegalArgumentException("Collection must not be empty");
        }
        final OUT first = data.iterator().next();
        if (first == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        final TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getForObject(first);
        }
        catch (Exception e) {
            // Points at fromCollection: there is no fromElements(Collection, TypeInformation).
            throw new RuntimeException("Could not create TypeInformation for type " + first.getClass() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
        }
        return this.fromCollection(data, typeInfo);
    }

    /**
     * Creates a bounded stream named "Collection Source" from the given collection, with
     * parallelism 1. The collection is first checked against the type class of
     * {@code typeInfo} via {@code FromElementsFunction.checkCollection}.
     *
     * @param data the elements; must not be {@code null}
     * @param typeInfo type information of the elements
     * @return the resulting source stream
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
        Preconditions.checkNotNull(data, "Collection must not be null");
        final Class<OUT> elementClass = typeInfo.getTypeClass();
        FromElementsFunction.checkCollection(data, elementClass);
        final FromElementsFunction<OUT> elementsFunction = new FromElementsFunction<OUT>(data);
        final DataStreamSource<OUT> source = addSource(elementsFunction, "Collection Source", typeInfo, Boundedness.BOUNDED);
        return source.setParallelism(1);
    }

    /**
     * Creates a stream from an iterator, deriving the type information from the given class.
     *
     * @param data the iterator supplying the elements
     * @param type the element class used for type extraction
     * @return the resulting source stream
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type) {
        final TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(type);
        return fromCollection(data, typeInfo);
    }

    /**
     * Creates a bounded stream named "Collection Source" from the given iterator.
     *
     * @param data the iterator supplying the elements; must not be {@code null}
     * @param typeInfo type information of the elements
     * @return the resulting source stream
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo) {
        Preconditions.checkNotNull(data, "The iterator must not be null");
        final FromIteratorFunction<OUT> iteratorFunction = new FromIteratorFunction<OUT>(data);
        return addSource(iteratorFunction, "Collection Source", typeInfo, Boundedness.BOUNDED);
    }

    /**
     * Creates a stream from a splittable iterator, deriving the type information from the
     * given class.
     *
     * @param iterator the iterator supplying the elements
     * @param type the element class used for type extraction
     * @return the resulting source stream
     */
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type) {
        final TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(type);
        return fromParallelCollection(iterator, typeInfo);
    }

    /**
     * Creates a stream from a splittable iterator under the operator name
     * "Parallel Collection Source".
     *
     * @param iterator the iterator supplying the elements
     * @param typeInfo type information of the elements
     * @return the resulting source stream
     */
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo) {
        final String operatorName = "Parallel Collection Source";
        return fromParallelCollection(iterator, typeInfo, operatorName);
    }

    /**
     * Wraps the splittable iterator in a {@link FromSplittableIteratorFunction} and adds it
     * as a bounded source with the given operator name.
     *
     * @param iterator the iterator supplying the elements
     * @param typeInfo type information of the elements
     * @param operatorName name given to the source
     * @return the resulting source stream
     */
    private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo, String operatorName) {
        final FromSplittableIteratorFunction<OUT> splittableFunction = new FromSplittableIteratorFunction<OUT>(iterator);
        return addSource(splittableFunction, operatorName, typeInfo, Boundedness.BOUNDED);
    }

    /**
     * Reads a text file using the charset name "UTF-8".
     *
     * @param filePath path of the file to read
     * @return the resulting stream of strings
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    public DataStreamSource<String> readTextFile(String filePath) {
        final String defaultCharset = "UTF-8";
        return readTextFile(filePath, defaultCharset);
    }

    /**
     * Reads a text file once with a {@link TextInputFormat} that uses the default file
     * path filter and the given charset name.
     *
     * @param filePath path of the file to read; must not be {@code null} or blank
     * @param charsetName charset name handed to the input format
     * @return the resulting stream of strings
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
        final boolean pathUsable = !StringUtils.isNullOrWhitespaceOnly(filePath);
        Preconditions.checkArgument(pathUsable, "The file path must not be null or blank.");
        final TextInputFormat textFormat = new TextInputFormat(new Path(filePath));
        textFormat.setFilesFilter(FilePathFilter.createDefaultFilter());
        final TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
        textFormat.setCharsetName(charsetName);
        return readFile(textFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1L, stringType);
    }

    /**
     * Reads the given path once ({@link FileProcessingMode#PROCESS_ONCE}, interval
     * {@code -1}) with the given input format.
     *
     * @param inputFormat the format used to read the file
     * @param filePath path of the file to read
     * @return the resulting source stream
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath) {
        final long noInterval = -1L;
        return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, noInterval);
    }

    /**
     * Installs the given file path filter on the input format, derives the produced type
     * from the format and delegates to
     * {@link #readFile(FileInputFormat, String, FileProcessingMode, long, TypeInformation)}.
     *
     * @param inputFormat the format used to read the file; must not be {@code null}
     * @param filePath path of the file to read
     * @param watchType the file processing mode
     * @param interval the monitoring interval handed to the delegate
     * @param filter the file path filter to install on the format
     * @return the resulting source stream
     * @throws NullPointerException if {@code inputFormat} is {@code null}
     * @throws InvalidProgramException if the produced type cannot be determined; the
     *     original failure is attached as the cause
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, FilePathFilter filter) {
        // Same message as the delegate uses; without it the next line fails with a bare NPE.
        Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
        inputFormat.setFilesFilter(filter);
        final TypeInformation<OUT> typeInformation;
        try {
            typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
        }
        catch (Exception e) {
            // Keep the cause so the reason for the failed type extraction is not lost.
            throw new InvalidProgramException("The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
        }
        return this.readFile(inputFormat, filePath, watchType, interval, typeInformation);
    }

    /**
     * Derives the produced type from the input format and delegates to
     * {@link #readFile(FileInputFormat, String, FileProcessingMode, long, TypeInformation)}.
     *
     * @param inputFormat the format used to read the file
     * @param filePath path of the file to read
     * @param watchType the file processing mode
     * @param interval the monitoring interval handed to the delegate
     * @return the resulting source stream
     * @throws InvalidProgramException if the produced type cannot be determined; the
     *     original failure is attached as the cause
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval) {
        final TypeInformation<OUT> typeInformation;
        try {
            typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
        }
        catch (Exception e) {
            // Keep the cause so the reason for the failed type extraction is not lost.
            throw new InvalidProgramException("The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
        }
        return this.readFile(inputFormat, filePath, watchType, interval, typeInformation);
    }

    /**
     * Adds a {@link FileMonitoringFunction} source named "Read File Stream source" and
     * flat-maps its output through a {@link FileReadFunction}.
     *
     * @param filePath path handed to the monitoring function
     * @param intervalMillis interval handed to the monitoring function
     * @param watchType watch type handed to the monitoring function
     * @return the resulting stream of strings
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    public DataStream<String> readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType) {
        final FileMonitoringFunction monitor = new FileMonitoringFunction(filePath, intervalMillis, watchType);
        final DataStreamSource<Tuple3<String, Long, Long>> monitored = addSource(monitor, "Read File Stream source");
        return monitored.flatMap(new FileReadFunction());
    }

    /**
     * Validates the arguments, sets the file path on the input format and creates a file
     * input named "Custom File Source".
     *
     * @param inputFormat the format used to read the file; must not be {@code null}
     * @param filePath path of the file to read; must not be {@code null} or blank
     * @param watchType the file processing mode
     * @param interval the monitoring interval handed to {@code createFileInput}
     * @param typeInformation type information of the produced elements
     * @return the resulting source stream
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval, TypeInformation<OUT> typeInformation) {
        Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
        final boolean pathUsable = !StringUtils.isNullOrWhitespaceOnly(filePath);
        Preconditions.checkArgument(pathUsable, "The file path must not be null or blank.");
        inputFormat.setFilePath(filePath);
        return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, interval);
    }

    /**
     * Character-delimiter variant: converts the delimiter to a string and delegates to
     * {@link #socketTextStream(String, int, String, long)}.
     *
     * @param hostname host to connect to
     * @param port port to connect to
     * @param delimiter delimiter character
     * @param maxRetry value handed to the delegate
     * @return the resulting stream of strings
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry) {
        final String delimiterText = String.valueOf(delimiter);
        return socketTextStream(hostname, port, delimiterText, maxRetry);
    }

    /**
     * Adds a {@link SocketTextStreamFunction} source named "Socket Stream".
     *
     * @param hostname host handed to the socket function
     * @param port port handed to the socket function
     * @param delimiter delimiter handed to the socket function
     * @param maxRetry retry value handed to the socket function
     * @return the resulting stream of strings
     */
    @PublicEvolving
    public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
        final SocketTextStreamFunction socketFunction = new SocketTextStreamFunction(hostname, port, delimiter, maxRetry);
        return addSource(socketFunction, "Socket Stream");
    }

    /**
     * Character-delimiter variant that delegates with a retry value of {@code 0}.
     *
     * @param hostname host to connect to
     * @param port port to connect to
     * @param delimiter delimiter character
     * @return the resulting stream of strings
     * @deprecated this method is marked deprecated in the source
     */
    @Deprecated
    public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
        final long noRetry = 0L;
        return socketTextStream(hostname, port, delimiter, noRetry);
    }

    /**
     * String-delimiter variant that delegates with a retry value of {@code 0}.
     *
     * @param hostname host to connect to
     * @param port port to connect to
     * @param delimiter delimiter string
     * @return the resulting stream of strings
     */
    @PublicEvolving
    public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter) {
        final long noRetry = 0L;
        return socketTextStream(hostname, port, delimiter, noRetry);
    }

    /**
     * Delegates to {@link #socketTextStream(String, int, String)} with a newline delimiter.
     *
     * @param hostname host to connect to
     * @param port port to connect to
     * @return the resulting stream of strings
     */
    @PublicEvolving
    public DataStreamSource<String> socketTextStream(String hostname, int port) {
        final String lineDelimiter = "\n";
        return socketTextStream(hostname, port, lineDelimiter);
    }

    /**
     * Creates a stream from an input format, deriving the produced type from the format.
     *
     * @param inputFormat the input format to read from
     * @return the resulting source stream
     */
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
        final TypeInformation<OUT> producedType = TypeExtractor.getInputFormatTypes(inputFormat);
        return createInput(inputFormat, producedType);
    }

    /**
     * Creates a stream from an input format. A {@link FileInputFormat} is routed through
     * {@code createFileInput} (named "Custom File source", processed once, interval
     * {@code -1}); every other format becomes a plain source named "Custom Source".
     *
     * @param inputFormat the input format to read from
     * @param typeInfo type information of the produced elements
     * @return the resulting source stream
     */
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
        if (!(inputFormat instanceof FileInputFormat)) {
            return createInput(inputFormat, typeInfo, "Custom Source");
        }
        FileInputFormat fileFormat = (FileInputFormat)inputFormat;
        return createFileInput(fileFormat, typeInfo, "Custom File source", FileProcessingMode.PROCESS_ONCE, -1L);
    }

    /**
     * Wraps the input format in an {@link InputFormatSourceFunction} and adds it as a
     * source with the given name.
     *
     * @param inputFormat the input format to read from
     * @param typeInfo type information of the produced elements
     * @param sourceName name given to the source
     * @return the resulting source stream
     */
    private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo, String sourceName) {
        final InputFormatSourceFunction<OUT> formatFunction = new InputFormatSourceFunction<OUT>(inputFormat, typeInfo);
        return addSource(formatFunction, sourceName, typeInfo);
    }

    /**
     * Builds a two-stage file source: a non-parallel {@link ContinuousFileMonitoringFunction}
     * source followed by a "Split Reader" transformation created from a
     * {@link ContinuousFileReaderOperatorFactory}.
     *
     * <p>The source is bounded for {@link FileProcessingMode#PROCESS_ONCE} and continuously
     * unbounded for every other mode.
     *
     * @param inputFormat the file input format; must not be {@code null}
     * @param typeInfo type information of the produced elements; must not be {@code null}
     * @param sourceName name of the monitoring source, also used in the reader's name; must
     *     not be {@code null}
     * @param monitoringMode the file processing mode; must not be {@code null}
     * @param interval monitoring interval; must be at least 1 unless the mode is
     *     {@code PROCESS_ONCE}
     * @return the resulting source stream
     */
    private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat, TypeInformation<OUT> typeInfo, String sourceName, FileProcessingMode monitoringMode, long interval) {
        Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
        Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
        Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
        Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
        final boolean intervalAcceptable = monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) || interval >= 1L;
        Preconditions.checkArgument(intervalAcceptable, "The path monitoring interval cannot be less than 1 ms.");

        ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<OUT>(inputFormat, monitoringMode, getParallelism(), interval);
        ContinuousFileReaderOperatorFactory factory = new ContinuousFileReaderOperatorFactory(inputFormat);

        final Boundedness boundedness;
        if (monitoringMode == FileProcessingMode.PROCESS_ONCE) {
            boundedness = Boundedness.BOUNDED;
        } else {
            boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        }

        // The monitoring source gets no explicit type information (null).
        SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName, null, boundedness).forceNonParallel().transform("Split Reader: " + sourceName, typeInfo, factory);
        return new DataStreamSource<OUT>(source);
    }

    /**
     * Adds a source function under the default name {@code "Custom Source"}.
     *
     * @param function the source function
     * @param <OUT> element type produced by the function
     * @return the source returned by the two-argument overload
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
        final String defaultName = "Custom Source";
        return addSource(function, defaultName);
    }

    /**
     * Adds a named source function without explicit type information ({@code null} is passed
     * on as the type information).
     *
     * @param function the source function
     * @param sourceName name given to the source
     * @param <OUT> element type produced by the function
     * @return the source returned by the three-argument overload
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName) {
        final TypeInformation<OUT> noExplicitType = null;
        return addSource(function, sourceName, noExplicitType);
    }

    /**
     * Adds a source function with explicit type information under the default name
     * {@code "Custom Source"}.
     *
     * @param function the source function
     * @param typeInfo type information of the produced elements
     * @param <OUT> element type produced by the function
     * @return the source returned by the three-argument overload
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> typeInfo) {
        final String defaultName = "Custom Source";
        return addSource(function, defaultName, typeInfo);
    }

    /**
     * Adds a named source function with explicit type information, registering it as
     * {@code Boundedness.CONTINUOUS_UNBOUNDED}.
     *
     * @param function the source function
     * @param sourceName name given to the source
     * @param typeInfo type information of the produced elements
     * @param <OUT> element type produced by the function
     * @return the source returned by the private four-argument overload
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
        final Boundedness unbounded = Boundedness.CONTINUOUS_UNBOUNDED;
        return addSource(function, sourceName, typeInfo, unbounded);
    }

    /**
     * Registers a source function with an explicit boundedness.
     *
     * <p>Type information is resolved through {@code getTypeInfo}; the function is passed
     * through {@code clean} before being wrapped in a {@code StreamSource} operator. The source
     * is marked parallel when the function implements {@code ParallelSourceFunction}.
     *
     * @param function the source function; must not be {@code null}
     * @param sourceName name given to the source; must not be {@code null}
     * @param typeInfo optional explicit type information
     * @param boundedness boundedness of the source; must not be {@code null}
     * @param <OUT> element type produced by the function
     * @return the newly created source
     * @throws NullPointerException if {@code function}, {@code sourceName} or
     *     {@code boundedness} is {@code null}
     */
    private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, @Nullable TypeInformation<OUT> typeInfo, Boundedness boundedness) {
        Preconditions.checkNotNull(function);
        Preconditions.checkNotNull(sourceName);
        Preconditions.checkNotNull(boundedness);

        // Typed local instead of Object so the value is accepted by the DataStreamSource constructor.
        final TypeInformation<OUT> resolvedTypeInfo =
                getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
        final boolean isParallel = function instanceof ParallelSourceFunction;

        clean(function);

        final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
    }

    /**
     * Adds a {@link Source} without explicit type information ({@code null} is passed on as the
     * type information).
     *
     * @param source the source to add
     * @param timestampsAndWatermarks watermark strategy for the source
     * @param sourceName name given to the source
     * @param <OUT> element type produced by the source
     * @return the source returned by the four-argument overload
     */
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName) {
        final TypeInformation<OUT> noExplicitType = null;
        return fromSource(source, timestampsAndWatermarks, sourceName, noExplicitType);
    }

    /**
     * Adds a {@link Source} with a watermark strategy and optional explicit type information.
     *
     * <p>{@code source} and {@code timestampsAndWatermarks} are validated before the type
     * information is resolved, so a {@code null} source is reported with the message
     * {@code "source"} rather than failing inside type extraction.
     *
     * @param source the source to add; must not be {@code null}
     * @param timestampsAndWatermarks watermark strategy; must not be {@code null}
     * @param sourceName name given to the source; must not be {@code null}
     * @param typeInfo explicit type information, or {@code null} to resolve it via
     *     {@code getTypeInfo}
     * @param <OUT> element type produced by the source
     * @return the newly created source
     * @throws NullPointerException if a required argument or the resolved type information is
     *     {@code null}
     */
    @Experimental
    public <OUT> DataStreamSource<OUT> fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, TypeInformation<OUT> typeInfo) {
        Preconditions.checkNotNull(source, "source");
        Preconditions.checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks");

        final TypeInformation<OUT> resolvedTypeInfo =
                getTypeInfo(source, sourceName, Source.class, typeInfo);

        return new DataStreamSource<>(
                this,
                source,
                timestampsAndWatermarks,
                Preconditions.checkNotNull(resolvedTypeInfo),
                Preconditions.checkNotNull(sourceName));
    }

    /**
     * Executes the program without supplying a job name.
     *
     * @return the result of {@link #execute(String)} invoked with a {@code null} name
     * @throws Exception propagated from {@link #execute(String)}
     */
    public JobExecutionResult execute() throws Exception {
        // Typed null selects the String overload rather than execute(StreamGraph).
        final String unnamed = null;
        return execute(unnamed);
    }

    /**
     * Executes the program, optionally under the given job name.
     *
     * <p>The registered transformations are snapshotted first because {@link #getStreamGraph()}
     * clears them. If execution fails and a {@code ClusterDatasetCorruptedException} is found in
     * the failure's cause chain, the cache transformations reachable from the snapshot are
     * invalidated, the graph is regenerated from the snapshot and executed once more. Any other
     * failure is rethrown unchanged.
     *
     * @param jobName name to set on the stream graph; ignored when {@code null}
     * @return the result of the (possibly retried) execution
     * @throws Exception if execution fails
     */
    public JobExecutionResult execute(String jobName) throws Exception {
        final List<Transformation<?>> originalTransformations = new ArrayList<>(this.transformations);
        StreamGraph streamGraph = getStreamGraph();
        if (jobName != null) {
            streamGraph.setJobName(jobName);
        }
        try {
            return execute(streamGraph);
        } catch (Throwable t) {
            final Optional<ClusterDatasetCorruptedException> corruptedDataset =
                    ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
            if (!corruptedDataset.isPresent()) {
                throw t;
            }
            // Retry without the corrupted cache.
            invalidateCacheTransformations(originalTransformations);
            streamGraph = getStreamGraph(originalTransformations);
            // The regenerated graph is a new object, so the requested name must be set again.
            if (jobName != null) {
                streamGraph.setJobName(jobName);
            }
            return execute(streamGraph);
        }
    }

    /**
     * Submits the given stream graph and notifies the registered job listeners of the outcome.
     *
     * <p>When {@code DeploymentOptions.ATTACHED} is set, this waits for the job's execution
     * result; otherwise a {@code DetachedJobExecutionResult} carrying the job ID is returned.
     * On failure the exception is passed through
     * {@code ExceptionUtils.stripExecutionException}, reported to every listener and rethrown.
     *
     * @param streamGraph the graph to execute
     * @return the job's execution result, or a detached result
     * @throws Exception if submission or execution fails
     */
    @Internal
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        final JobClient jobClient = executeAsync(streamGraph);
        try {
            final JobExecutionResult jobExecutionResult;
            if (this.configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                jobExecutionResult = jobClient.getJobExecutionResult().get();
            } else {
                jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }

            this.jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
            return jobExecutionResult;
        } catch (Throwable t) {
            final Throwable strippedException = ExceptionUtils.stripExecutionException(t);
            this.jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(null, strippedException));
            ExceptionUtils.rethrowException(strippedException);

            // Not expected to be reached; present only to satisfy the compiler.
            return null;
        }
    }

    /**
     * Recursively walks the given transformations and their inputs, invalidating the cluster
     * dataset of every {@code CacheTransformation} encountered. {@code null} entries are
     * skipped.
     *
     * @param transformations transformations to inspect
     * @throws Exception propagated from {@link #invalidateClusterDataset(AbstractID)}
     */
    private void invalidateCacheTransformations(List<Transformation<?>> transformations) throws Exception {
        for (Transformation<?> current : transformations) {
            if (current != null) {
                if (current instanceof CacheTransformation) {
                    final CacheTransformation<?> cache = (CacheTransformation<?>) current;
                    invalidateClusterDataset(cache.getDatasetId());
                }
                invalidateCacheTransformations(current.getInputs());
            }
        }
    }

    /**
     * Registers a listener in {@code jobListeners}.
     *
     * @param jobListener the listener to add; must not be {@code null}
     * @throws NullPointerException if {@code jobListener} is {@code null}
     */
    @PublicEvolving
    public void registerJobListener(JobListener jobListener) {
        Preconditions.checkNotNull(jobListener, "JobListener cannot be null");
        jobListeners.add(jobListener);
    }

    /** Removes all registered job listeners. */
    @PublicEvolving
    public void clearJobListeners() {
        jobListeners.clear();
    }

    /**
     * Submits the program built from the currently registered transformations.
     *
     * @return the client returned by {@link #executeAsync(StreamGraph)}
     * @throws Exception propagated from {@link #executeAsync(StreamGraph)}
     */
    @PublicEvolving
    public final JobClient executeAsync() throws Exception {
        final StreamGraph graph = getStreamGraph();
        return executeAsync(graph);
    }

    /**
     * Submits the program under the given job name.
     *
     * @param jobName name to set on the stream graph; must not be {@code null}
     * @return the client returned by {@link #executeAsync(StreamGraph)}
     * @throws NullPointerException if {@code jobName} is {@code null}
     * @throws Exception propagated from {@link #executeAsync(StreamGraph)}
     */
    @PublicEvolving
    public JobClient executeAsync(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
        final StreamGraph namedGraph = getStreamGraph();
        namedGraph.setJobName(jobName);
        return executeAsync(namedGraph);
    }

    /**
     * Hands the stream graph to the pipeline executor and waits for the job client.
     *
     * <p>On success every job listener is notified with the client, the pending collect
     * iterators receive the client and the iterator list is cleared. If the submission future
     * completes exceptionally, listeners are notified with the stripped exception and a
     * {@code FlinkException} wrapping it is thrown.
     *
     * @param streamGraph the graph to submit; must not be {@code null}
     * @return the client of the submitted job
     * @throws NullPointerException if {@code streamGraph} is {@code null}
     * @throws Exception if the executor cannot be obtained or submission fails
     */
    @Internal
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        Preconditions.checkNotNull(streamGraph, "StreamGraph cannot be null.");

        final PipelineExecutor pipelineExecutor = getPipelineExecutor();
        final CompletableFuture<JobClient> submission =
                pipelineExecutor.execute(streamGraph, this.configuration, this.userClassloader);

        try {
            final JobClient submittedClient = submission.get();
            this.jobListeners.forEach(listener -> {
                listener.onJobSubmitted(submittedClient, null);
            });
            this.collectIterators.forEach(pendingIterator -> {
                pendingIterator.setJobClient(submittedClient);
            });
            this.collectIterators.clear();
            return submittedClient;
        } catch (ExecutionException submissionFailure) {
            final Throwable cause = ExceptionUtils.stripExecutionException(submissionFailure);
            this.jobListeners.forEach(listener -> {
                listener.onJobSubmitted(null, cause);
            });
            final String message = String.format("Failed to execute job '%s'.", streamGraph.getJobName());
            throw new FlinkException(message, cause);
        }
    }

    /**
     * Generates the stream graph and clears the registered transformations afterwards.
     *
     * @return the generated stream graph
     */
    @Internal
    public StreamGraph getStreamGraph() {
        final boolean clearTransformations = true;
        return getStreamGraph(clearTransformations);
    }

    /**
     * Generates the stream graph from the registered transformations.
     *
     * @param clearTransformations whether to empty the transformation list once the graph has
     *     been generated
     * @return the generated stream graph
     */
    @Internal
    public StreamGraph getStreamGraph(boolean clearTransformations) {
        final StreamGraph generated = getStreamGraph(this.transformations);
        if (!clearTransformations) {
            return generated;
        }
        this.transformations.clear();
        return generated;
    }

    /**
     * Refreshes the cached flag of the registered cache transformations, then generates a
     * stream graph from the given transformations.
     *
     * @param transformations transformations to build the graph from
     * @return the generated stream graph
     */
    private StreamGraph getStreamGraph(List<Transformation<?>> transformations) {
        synchronizeClusterDatasetStatus();
        final StreamGraphGenerator generator = getStreamGraphGenerator(transformations);
        return generator.generate();
    }

    /**
     * Marks each registered cache transformation as cached or not, depending on whether its ID
     * appears among the completed cluster datasets. Does nothing when no cache transformation
     * is registered.
     */
    private void synchronizeClusterDatasetStatus() {
        if (!this.cachedTransformations.isEmpty()) {
            final Set<AbstractID> completedIds =
                    listCompletedClusterDatasets().stream()
                            .map(AbstractID::new)
                            .collect(Collectors.toSet());
            this.cachedTransformations.forEach((datasetId, cacheTransformation) -> {
                cacheTransformation.setCached(completedIds.contains(datasetId));
            });
        }
    }

    /**
     * Generates a stream graph from the given transformations. Unlike the private
     * {@code getStreamGraph(List)}, this does not refresh the cluster dataset status first.
     *
     * @param transformations transformations to build the graph from
     * @return the generated stream graph
     */
    @Internal
    public StreamGraph generateStreamGraph(List<Transformation<?>> transformations) {
        final StreamGraphGenerator generator = getStreamGraphGenerator(transformations);
        return generator.generate();
    }

    /**
     * Creates a {@code StreamGraphGenerator} over a copy of the given transformations and
     * configures it from this environment's settings.
     *
     * @param transformations transformations to build the graph from; must not be empty
     * @return the configured generator
     * @throws IllegalStateException if {@code transformations} is empty
     */
    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return new StreamGraphGenerator(
                        new ArrayList<>(transformations),
                        this.config,
                        this.checkpointCfg,
                        (ReadableConfig) this.configuration)
                .setStateBackend(this.defaultStateBackend)
                .setChangelogStateBackendEnabled(this.changelogStateBackendEnabled)
                .setSavepointDir(this.defaultSavepointDirectory)
                .setChaining(this.isChainingEnabled)
                .setUserArtifacts(this.cacheFile)
                .setTimeCharacteristic(this.timeCharacteristic)
                .setDefaultBufferTimeout(this.bufferTimeout)
                .setSlotSharingGroupResource(this.slotSharingGroupResources);
    }

    /**
     * Returns the streaming plan of the current program as JSON, leaving the registered
     * transformations in place.
     *
     * @return the value of {@code getStreamingPlanAsJSON()} on the generated stream graph
     */
    public String getExecutionPlan() {
        final StreamGraph graph = getStreamGraph(false);
        return graph.getStreamingPlanAsJSON();
    }

    /**
     * Runs the closure cleaner on the given object when it is enabled in the execution config,
     * then verifies via {@code ClosureCleaner.ensureSerializable} that the object is
     * serializable.
     *
     * @param f the object to clean
     * @param <F> type of the object
     * @return the same object that was passed in
     */
    @Internal
    public <F> F clean(F f) {
        final boolean cleanerEnabled = getConfig().isClosureCleanerEnabled();
        if (cleanerEnabled) {
            ClosureCleaner.clean(f, getConfig().getClosureCleanerLevel(), true);
        }
        ClosureCleaner.ensureSerializable(f);
        return f;
    }

    /**
     * Appends a transformation to the list of registered transformations.
     *
     * @param transformation the transformation to add; must not be {@code null}
     * @throws NullPointerException if {@code transformation} is {@code null}
     */
    @Internal
    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        transformations.add(transformation);
    }

    /**
     * Exposes this environment's configuration through the {@code ReadableConfig} interface.
     *
     * @return the {@code configuration} field
     */
    @Internal
    public ReadableConfig getConfiguration() {
        return configuration;
    }

    /**
     * Creates an execution environment using a fresh, empty {@code Configuration}.
     *
     * @return the environment returned by {@link #getExecutionEnvironment(Configuration)}
     */
    public static StreamExecutionEnvironment getExecutionEnvironment() {
        final Configuration emptyConfiguration = new Configuration();
        return getExecutionEnvironment(emptyConfiguration);
    }

    /**
     * Creates an execution environment for the given configuration.
     *
     * <p>The environment factory is resolved by {@code Utils.resolveFactory} from the
     * thread-local and the static context factory; when none is resolved, a local environment
     * is created instead.
     *
     * @param configuration configuration handed to the factory or the local environment
     * @return the created environment
     */
    public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        // No Object cast on the static factory: both arguments must share the factory type so
        // that the lambda below can call createExecutionEnvironment on the resolved value.
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }

    /**
     * Creates a local environment using the current default local parallelism.
     *
     * @return the environment returned by {@link #createLocalEnvironment(int)}
     */
    public static LocalStreamEnvironment createLocalEnvironment() {
        final int parallelism = defaultLocalParallelism;
        return createLocalEnvironment(parallelism);
    }

    /**
     * Creates a local environment with the given parallelism and a fresh, empty
     * {@code Configuration}.
     *
     * @param parallelism parallelism for the local environment
     * @return the environment returned by {@link #createLocalEnvironment(int, Configuration)}
     */
    public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
        final Configuration emptyConfiguration = new Configuration();
        return createLocalEnvironment(parallelism, emptyConfiguration);
    }

    /**
     * Creates a local environment from a copy of the given configuration in which
     * {@code CoreOptions.DEFAULT_PARALLELISM} is set to {@code parallelism}. The caller's
     * configuration object is not modified by this method.
     *
     * @param parallelism parallelism to store in the copied configuration
     * @param configuration configuration whose entries are copied
     * @return the environment returned by {@link #createLocalEnvironment(Configuration)}
     */
    public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
        final Configuration copyOfConfiguration = new Configuration();
        copyOfConfiguration.addAll(configuration);
        // The int is boxed to the option's value type; an Object cast would not match set(...).
        copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
        return createLocalEnvironment(copyOfConfiguration);
    }

    /**
     * Creates a local environment for the given configuration.
     *
     * <p>If the configuration already defines {@code CoreOptions.DEFAULT_PARALLELISM} it is
     * used as is. Otherwise its entries are copied into a new configuration and the default
     * local parallelism is set on the copy, leaving the caller's object unmodified.
     *
     * @param configuration configuration for the local environment
     * @return the created local environment
     */
    public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
        if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
            return new LocalStreamEnvironment(configuration);
        }
        final Configuration copyOfConfiguration = new Configuration();
        copyOfConfiguration.addAll(configuration);
        // The int is boxed to the option's value type; an Object cast would not match set(...).
        copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
        return new LocalStreamEnvironment(copyOfConfiguration);
    }

    /**
     * Creates a local environment after making sure {@code RestOptions.PORT} is present in the
     * configuration.
     *
     * <p>Note that the passed configuration is modified in place when the port is missing: the
     * option's default value is written into it.
     *
     * @param conf configuration for the environment; must not be {@code null}
     * @return the environment returned by {@link #createLocalEnvironment(Configuration)}
     * @throws NullPointerException if {@code conf} is {@code null}
     */
    @PublicEvolving
    public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
        Preconditions.checkNotNull(conf, "conf");
        final boolean portConfigured = conf.contains(RestOptions.PORT);
        if (!portConfigured) {
            final int defaultPort = RestOptions.PORT.defaultValue();
            conf.setInteger(RestOptions.PORT, defaultPort);
        }
        return createLocalEnvironment(conf);
    }

    /**
     * Creates a {@code RemoteStreamEnvironment} for the given host, port and jar files.
     *
     * @param host forwarded to the {@code RemoteStreamEnvironment} constructor
     * @param port forwarded to the {@code RemoteStreamEnvironment} constructor
     * @param jarFiles forwarded to the {@code RemoteStreamEnvironment} constructor
     * @return the new remote environment
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String ... jarFiles) {
        final RemoteStreamEnvironment remote = new RemoteStreamEnvironment(host, port, jarFiles);
        return remote;
    }

    /**
     * Creates a {@code RemoteStreamEnvironment} and sets its parallelism.
     *
     * @param host forwarded to the {@code RemoteStreamEnvironment} constructor
     * @param port forwarded to the {@code RemoteStreamEnvironment} constructor
     * @param parallelism value passed to {@code setParallelism} on the new environment
     * @param jarFiles forwarded to the {@code RemoteStreamEnvironment} constructor
     * @return the new remote environment
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String ... jarFiles) {
        final RemoteStreamEnvironment remote = new RemoteStreamEnvironment(host, port, jarFiles);
        remote.setParallelism(parallelism);
        return remote;
    }

    /**
     * Creates a {@code RemoteStreamEnvironment} with an explicit client configuration.
     *
     * @param host forwarded to the {@code RemoteStreamEnvironment} constructor
     * @param port forwarded to the {@code RemoteStreamEnvironment} constructor
     * @param clientConfig forwarded to the {@code RemoteStreamEnvironment} constructor
     * @param jarFiles forwarded to the {@code RemoteStreamEnvironment} constructor
     * @return the new remote environment
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, Configuration clientConfig, String ... jarFiles) {
        final RemoteStreamEnvironment remote = new RemoteStreamEnvironment(host, port, clientConfig, jarFiles);
        return remote;
    }

    /**
     * Returns the parallelism currently used by default for local environments.
     *
     * @return the value of the static {@code defaultLocalParallelism} field
     */
    @PublicEvolving
    public static int getDefaultLocalParallelism() {
        return StreamExecutionEnvironment.defaultLocalParallelism;
    }

    /**
     * Sets the parallelism used by default for local environments. The value is stored as is;
     * no validation is performed here.
     *
     * @param parallelism the new default local parallelism
     */
    @PublicEvolving
    public static void setDefaultLocalParallelism(int parallelism) {
        StreamExecutionEnvironment.defaultLocalParallelism = parallelism;
    }

    /**
     * Installs the given factory as the static context environment factory and also stores it
     * in the calling thread's thread-local slot.
     *
     * @param ctx the factory to install
     */
    protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
        StreamExecutionEnvironment.contextEnvironmentFactory = ctx;
        StreamExecutionEnvironment.threadLocalContextEnvironmentFactory.set(
                StreamExecutionEnvironment.contextEnvironmentFactory);
    }

    /**
     * Clears the static context environment factory and removes the calling thread's
     * thread-local factory.
     */
    protected static void resetContextEnvironment() {
        StreamExecutionEnvironment.contextEnvironmentFactory = null;
        StreamExecutionEnvironment.threadLocalContextEnvironmentFactory.remove();
    }

    /**
     * Registers a file under the given name, with the executable flag set to {@code false}.
     *
     * @param filePath path of the file
     * @param name name under which the file is registered
     */
    public void registerCachedFile(String filePath, String name) {
        final boolean executable = false;
        registerCachedFile(filePath, name, executable);
    }

    /**
     * Registers a file under the given name by appending a
     * {@code (name, DistributedCacheEntry)} pair to {@code cacheFile}.
     *
     * @param filePath path of the file
     * @param name name under which the file is registered
     * @param executable flag stored in the cache entry
     */
    public void registerCachedFile(String filePath, String name, boolean executable) {
        final DistributedCache.DistributedCacheEntry entry =
                new DistributedCache.DistributedCacheEntry(filePath, Boolean.valueOf(executable));
        this.cacheFile.add(new Tuple2<>(name, entry));
    }

    /**
     * Reports whether neither the static nor the thread-local context environment factory is
     * set.
     *
     * @return {@code true} only when both factories are {@code null}
     */
    @Internal
    public static boolean areExplicitEnvironmentsAllowed() {
        if (contextEnvironmentFactory != null) {
            return false;
        }
        return threadLocalContextEnvironmentFactory.get() == null;
    }

    /**
     * Resolves the type information for a source, trying in order:
     *
     * <ol>
     *   <li>the explicitly supplied {@code typeInfo};
     *   <li>the produced type reported by the source when it is a {@code ResultTypeQueryable};
     *   <li>extraction via {@code TypeExtractor.createTypeInfo}; if that throws
     *       {@code InvalidTypesException}, a {@code MissingTypeInfo} carrying the source name
     *       and the exception is returned instead.
     * </ol>
     *
     * @param source the source object to inspect
     * @param sourceName name recorded in a {@code MissingTypeInfo}
     * @param baseSourceClass base class handed to the type extractor
     * @param typeInfo explicit type information, may be {@code null}
     * @return the resolved type information
     */
    private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(Object source, String sourceName, Class<?> baseSourceClass, TypeInformation<OUT> typeInfo) {
        if (typeInfo != null) {
            return (T) typeInfo;
        }
        if (source instanceof ResultTypeQueryable) {
            TypeInformation produced = ((ResultTypeQueryable) source).getProducedType();
            if (produced != null) {
                return (T) produced;
            }
        }
        TypeInformation extracted;
        try {
            extracted = TypeExtractor.createTypeInfo(baseSourceClass, source.getClass(), 0, null, null);
        } catch (InvalidTypesException e) {
            extracted = new MissingTypeInfo(sourceName, e);
        }
        return (T) extracted;
    }

    /**
     * Returns the list of registered transformations. The internal list itself is returned,
     * not a copy.
     *
     * @return the {@code transformations} field
     */
    @Internal
    public List<Transformation<?>> getTransformations() {
        return transformations;
    }

    /**
     * Records a cache transformation under the given dataset ID in
     * {@code cachedTransformations}.
     *
     * @param intermediateDataSetID key under which the transformation is stored
     * @param t the cache transformation to store
     * @param <T> element type of the cache transformation
     */
    @Internal
    public <T> void registerCacheTransformation(AbstractID intermediateDataSetID, CacheTransformation<T> t) {
        cachedTransformations.put(intermediateDataSetID, t);
    }

    /**
     * Invalidates the cluster dataset with the given ID and marks its cache transformation as
     * not cached.
     *
     * <p>Nothing is invalidated when the pipeline executor is not a
     * {@code CacheSupportedPipelineExecutor}.
     *
     * @param datasetId ID of a previously registered cache transformation
     * @throws RuntimeException if no cache transformation is registered under {@code datasetId}
     * @throws Exception if the executor cannot be obtained or the invalidation fails
     */
    @Internal
    public void invalidateClusterDataset(AbstractID datasetId) throws Exception {
        final boolean registered = this.cachedTransformations.containsKey(datasetId);
        if (!registered) {
            throw new RuntimeException(String.format("IntermediateDataset %s is not found", datasetId));
        }
        final PipelineExecutor pipelineExecutor = getPipelineExecutor();
        if (pipelineExecutor instanceof CacheSupportedPipelineExecutor) {
            final CacheSupportedPipelineExecutor cacheExecutor = (CacheSupportedPipelineExecutor) pipelineExecutor;
            // Wait for the invalidation to finish before flipping the local flag.
            cacheExecutor.invalidateClusterDataset(datasetId, this.configuration, this.userClassloader).get();
            this.cachedTransformations.get(datasetId).setCached(false);
        }
    }

    /**
     * Lists the IDs of the completed cluster datasets known to the pipeline executor.
     *
     * <p>This is a best-effort lookup: an empty set is returned when the executor is not a
     * {@code CacheSupportedPipelineExecutor} or when anything goes wrong. If the calling
     * thread is interrupted while waiting, its interrupt flag is restored before the empty set
     * is returned so the interruption is not lost.
     *
     * @return the completed dataset IDs, or an empty set
     */
    protected Set<AbstractID> listCompletedClusterDatasets() {
        try {
            PipelineExecutor executor = this.getPipelineExecutor();
            if (!(executor instanceof CacheSupportedPipelineExecutor)) {
                return Collections.emptySet();
            }
            return (Set)((CacheSupportedPipelineExecutor)executor).listCompletedClusterDatasetIds(this.configuration, this.userClassloader).get();
        }
        catch (InterruptedException interrupted) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            return Collections.emptySet();
        }
        catch (Throwable ignored) {
            // Deliberately best-effort: a failed lookup is treated as "nothing is cached".
            return Collections.emptySet();
        }
    }

    /**
     * Invalidates the cluster dataset of every registered cache transformation.
     *
     * @throws Exception propagated from {@link #invalidateClusterDataset(AbstractID)}; the
     *     remaining datasets are not processed once one invalidation fails
     */
    @Override
    public void close() throws Exception {
        for (AbstractID datasetId : cachedTransformations.keySet()) {
            invalidateClusterDataset(datasetId);
        }
    }

    /**
     * Looks up the pipeline executor for the configured {@code DeploymentOptions.TARGET}.
     *
     * @return the executor created by the matching factory
     * @throws NullPointerException if no target is configured or no factory is found for it
     * @throws Exception propagated from the executor service loader
     */
    private PipelineExecutor getPipelineExecutor() throws Exception {
        Preconditions.checkNotNull(
                this.configuration.get(DeploymentOptions.TARGET),
                "No execution.target specified in your configuration file.");

        final PipelineExecutorFactory factory = this.executorServiceLoader.getExecutorFactory(this.configuration);
        Preconditions.checkNotNull(
                factory,
                "Cannot find compatible factory for specified execution.target (=%s)",
                this.configuration.get(DeploymentOptions.TARGET));

        return factory.getExecutor(this.configuration);
    }

    /**
     * Compiler-synthesised lambda body recovered by the decompiler: notifies a single listener
     * that the job was executed, passing the given result and {@code null} as the error.
     */
    private static /* synthetic */ void lambda$execute$11(JobExecutionResult jobExecutionResult, JobListener jobListener) {
        jobListener.onJobExecuted(jobExecutionResult, null);
    }
}

