/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.composer.flink.translator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@Internal
public class DataSourceTranslator {
    public DataStreamSource<Event> translate(SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig, int sourceParallelism) {
        DataSource dataSource = this.createDataSource(sourceDef, env, pipelineConfig);
        EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
        if (eventSourceProvider instanceof FlinkSourceProvider) {
            FlinkSourceProvider sourceProvider = (FlinkSourceProvider)eventSourceProvider;
            return env.fromSource(sourceProvider.getSource(), WatermarkStrategy.noWatermarks(), sourceDef.getName().orElse(this.generateDefaultSourceName(sourceDef)), (TypeInformation)new EventTypeInfo()).setParallelism(sourceParallelism);
        }
        if (eventSourceProvider instanceof FlinkSourceFunctionProvider) {
            FlinkSourceFunctionProvider sourceFunctionProvider = (FlinkSourceFunctionProvider)eventSourceProvider;
            DataStreamSource stream = env.addSource(sourceFunctionProvider.getSourceFunction(), (TypeInformation)new EventTypeInfo()).setParallelism(sourceParallelism);
            if (sourceDef.getName().isPresent()) {
                stream.name(sourceDef.getName().get());
            }
            return stream;
        }
        throw new IllegalStateException(String.format("Unsupported EventSourceProvider type \"%s\"", eventSourceProvider.getClass().getCanonicalName()));
    }

    private DataSource createDataSource(SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
        DataSourceFactory sourceFactory = FactoryDiscoveryUtils.getFactoryByIdentifier(sourceDef.getType(), DataSourceFactory.class);
        FactoryDiscoveryUtils.getJarPathByIdentifier(sourceFactory).ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
        DataSource dataSource = sourceFactory.createDataSource((Factory.Context)new FactoryHelper.DefaultContext(sourceDef.getConfig(), pipelineConfig, Thread.currentThread().getContextClassLoader()));
        return dataSource;
    }

    private String generateDefaultSourceName(SourceDef sourceDef) {
        return String.format("Flink CDC Event Source: %s", sourceDef.getType());
    }
}

