/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.composer.flink.translator;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.composer.definition.RouteDef;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperatorFactory;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;

@Internal
public class SchemaOperatorTranslator {
    private final SchemaChangeBehavior schemaChangeBehavior;
    private final String schemaOperatorUid;
    private final Duration rpcTimeOut;

    public SchemaOperatorTranslator(SchemaChangeBehavior schemaChangeBehavior, String schemaOperatorUid, Duration rpcTimeOut) {
        this.schemaChangeBehavior = schemaChangeBehavior;
        this.schemaOperatorUid = schemaOperatorUid;
        this.rpcTimeOut = rpcTimeOut;
    }

    public DataStream<Event> translate(DataStream<Event> input, int parallelism, MetadataApplier metadataApplier, List<RouteDef> routes) {
        switch (this.schemaChangeBehavior) {
            case EVOLVE: {
                return this.addSchemaOperator(input, parallelism, metadataApplier, routes);
            }
            case IGNORE: {
                return this.dropSchemaChangeEvent(input, parallelism);
            }
            case EXCEPTION: {
                return this.exceptionOnSchemaChange(input, parallelism);
            }
        }
        throw new IllegalArgumentException(String.format("Unrecognized schema change behavior: %s", this.schemaChangeBehavior));
    }

    public String getSchemaOperatorUid() {
        return this.schemaOperatorUid;
    }

    private DataStream<Event> addSchemaOperator(DataStream<Event> input, int parallelism, MetadataApplier metadataApplier, List<RouteDef> routes) {
        ArrayList<Tuple2> routingRules = new ArrayList<Tuple2>();
        for (RouteDef route : routes) {
            routingRules.add(Tuple2.of((Object)route.getSourceTable(), (Object)TableId.parse((String)route.getSinkTable())));
        }
        SingleOutputStreamOperator stream = input.transform("SchemaOperator", (TypeInformation)new EventTypeInfo(), (OneInputStreamOperatorFactory)new SchemaOperatorFactory(metadataApplier, routingRules, this.rpcTimeOut));
        stream.uid(this.schemaOperatorUid).setParallelism(parallelism);
        return stream;
    }

    private DataStream<Event> dropSchemaChangeEvent(DataStream<Event> input, int parallelism) {
        return input.filter((FilterFunction & Serializable)event -> !(event instanceof SchemaChangeEvent)).setParallelism(parallelism);
    }

    private DataStream<Event> exceptionOnSchemaChange(DataStream<Event> input, int parallelism) {
        return input.map((MapFunction & Serializable)event -> {
            if (event instanceof SchemaChangeEvent) {
                throw new RuntimeException(String.format("Aborting execution as the pipeline encountered a schema change event: %s", event));
            }
            return event;
        }).setParallelism(parallelism);
    }
}

