/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.composer.flink.translator;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperatorFactory;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

/**
 * Appends a {@code "SchemaOperator"} transformation, built from a {@link SchemaOperatorFactory},
 * to a {@link DataStream} of {@link Event}s.
 *
 * <p>The translator only holds the configuration handed to its constructor (schema change
 * behavior, operator uid, RPC timeout and timezone) and forwards it to the factory when
 * {@link #translate} is called.
 */
@Internal
public class SchemaOperatorTranslator {
    private final SchemaChangeBehavior schemaChangeBehavior;
    private final String schemaOperatorUid;
    private final Duration rpcTimeOut;
    private final String timezone;

    /**
     * Creates a translator with the given configuration.
     *
     * @param schemaChangeBehavior behavior forwarded to the {@link SchemaOperatorFactory}
     * @param schemaOperatorUid uid assigned to the schema operator added by {@link #translate}
     * @param rpcTimeOut timeout forwarded to the {@link SchemaOperatorFactory}
     * @param timezone timezone string forwarded to the {@link SchemaOperatorFactory}
     */
    public SchemaOperatorTranslator(SchemaChangeBehavior schemaChangeBehavior, String schemaOperatorUid, Duration rpcTimeOut, String timezone) {
        this.schemaChangeBehavior = schemaChangeBehavior;
        this.schemaOperatorUid = schemaOperatorUid;
        this.rpcTimeOut = rpcTimeOut;
        this.timezone = timezone;
    }

    /**
     * Adds the schema operator after {@code input}, using the schema change behavior and
     * timezone this translator was constructed with.
     *
     * @param input upstream event stream
     * @param parallelism parallelism set on the added operator
     * @param metadataApplier applier handed to the {@link SchemaOperatorFactory}
     * @param routes route definitions; each one is converted into a {@link RouteRule}
     * @return the stream produced by the added schema operator
     */
    public DataStream<Event> translate(DataStream<Event> input, int parallelism, MetadataApplier metadataApplier, List<RouteDef> routes) {
        return addSchemaOperator(input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone);
    }

    /**
     * Returns the uid that {@link #translate} assigns to the schema operator.
     *
     * @return the uid passed to the constructor
     */
    public String getSchemaOperatorUid() {
        return schemaOperatorUid;
    }

    /**
     * Converts {@code routes} into {@link RouteRule}s and appends the {@code "SchemaOperator"}
     * transformation to {@code input}, then sets its uid and parallelism.
     */
    private DataStream<Event> addSchemaOperator(DataStream<Event> input, int parallelism, MetadataApplier metadataApplier, List<RouteDef> routes, SchemaChangeBehavior schemaChangeBehavior, String timezone) {
        List<RouteRule> routingRules = new ArrayList<>();
        for (RouteDef route : routes) {
            // A route without a replace symbol is passed on as a null symbol.
            routingRules.add(
                    new RouteRule(
                            route.getSourceTable(),
                            route.getSinkTable(),
                            route.getReplaceSymbol().orElse(null)));
        }

        // Parameterized types instead of raw types, so the result can be returned as
        // DataStream<Event> without an unchecked conversion.
        SingleOutputStreamOperator<Event> stream =
                input.transform(
                        "SchemaOperator",
                        new EventTypeInfo(),
                        new SchemaOperatorFactory(
                                metadataApplier,
                                routingRules,
                                this.rpcTimeOut,
                                schemaChangeBehavior,
                                timezone));
        stream.uid(this.schemaOperatorUid).setParallelism(parallelism);
        return stream;
    }
}

