/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.transforms.tracing;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.transforms.SmtManager;
import io.debezium.transforms.tracing.DebeziumTextMap;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Map;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigDef;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActivateTracingSpan<R extends ConnectRecord<R>>
implements Transformation<R> {
    private static final String DB_FIELDS_PREFIX = "db.";
    private static final Logger LOGGER = LoggerFactory.getLogger(ActivateTracingSpan.class);
    private static final String DEFAULT_TRACING_SPAN_CONTEXT_FIELD = "tracingspancontext";
    private static final String DEFAULT_TRACING_OPERATION_NAME = "debezium-read";
    private static final String TRACING_COMPONENT = "debezium";
    private static final String TX_LOG_WRITE_OPERATION_NAME = "db-log-write";
    private static final boolean OPEN_TRACING_AVAILABLE = ActivateTracingSpan.resolveOpenTracingApiAvailable();
    public static final io.debezium.config.Field TRACING_SPAN_CONTEXT_FIELD = io.debezium.config.Field.create("tracing.span.context.field").withDisplayName("Serialized tracing span context field").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.HIGH).withDefault("tracingspancontext").withDescription("The name of the field containing java.util.Properties representation of serialized span context. Defaults to 'tracingspancontext'");
    public static final io.debezium.config.Field TRACING_OPERATION_NAME = io.debezium.config.Field.create("tracing.operation.name").withDisplayName("Tracing operation name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.HIGH).withDefault("debezium-read").withDescription("The operation name representing Debezium processing span. Default is 'debezium-read'");
    public static final io.debezium.config.Field TRACING_CONTEXT_FIELD_REQUIRED = io.debezium.config.Field.create("tracing.with.context.field.only").withDisplayName("Trace only events with context field present").withType(ConfigDef.Type.BOOLEAN).withWidth(ConfigDef.Width.MEDIUM).withImportance(ConfigDef.Importance.HIGH).withDefault(false).withDescription("Set to `true` when only events that have serialized context field should be traced.");
    private String spanContextField;
    private String operationName;
    private boolean requireContextField;
    private SmtManager<R> smtManager;

    @Override
    public void configure(Map<String, ?> props) {
        Configuration config = Configuration.from(props);
        Field.Set configFields = io.debezium.config.Field.setOf(TRACING_SPAN_CONTEXT_FIELD, TRACING_OPERATION_NAME);
        if (!config.validateAndRecord(configFields, LOGGER::error)) {
            throw new ConnectException("Unable to validate config.");
        }
        this.spanContextField = config.getString(TRACING_SPAN_CONTEXT_FIELD);
        this.operationName = config.getString(TRACING_OPERATION_NAME);
        this.requireContextField = config.getBoolean(TRACING_CONTEXT_FIELD_REQUIRED);
        this.smtManager = new SmtManager(config);
    }

    public void setRequireContextField(boolean requireContextField) {
        this.requireContextField = requireContextField;
    }

    @Override
    public R apply(R record) {
        if (((ConnectRecord)record).value() == null || !this.smtManager.isValidEnvelope(record)) {
            return record;
        }
        Struct envelope = (Struct)((ConnectRecord)record).value();
        Struct after = envelope.schema().field("after") != null ? envelope.getStruct("after") : null;
        Struct source = envelope.schema().field("source") != null ? envelope.getStruct("source") : null;
        String propagatedSpanContext = null;
        if (after != null && after.schema().field(this.spanContextField) != null) {
            propagatedSpanContext = after.getString(this.spanContextField);
        }
        if (propagatedSpanContext == null && this.requireContextField) {
            return record;
        }
        try {
            return this.traceRecord(record, envelope, source, after, propagatedSpanContext);
        }
        catch (NoClassDefFoundError e) {
            throw new DebeziumException("Failed to record tracing information, tracing libraries not available", e);
        }
    }

    private R traceRecord(R record, Struct envelope, Struct source, Struct after, String propagatedSpanContext) {
        Tracer tracer = GlobalTracer.get();
        if (tracer == null) {
            return record;
        }
        Tracer.SpanBuilder txLogSpanBuilder = tracer.buildSpan(TX_LOG_WRITE_OPERATION_NAME);
        Tracer.SpanBuilder debeziumSpanBuilder = tracer.buildSpan(this.operationName);
        this.addFieldToSpan(debeziumSpanBuilder, envelope, "op", "");
        this.addFieldToSpan(debeziumSpanBuilder, envelope, "ts_ms", "");
        Long processingTimestamp = envelope.getInt64("ts_ms");
        if (processingTimestamp != null) {
            debeziumSpanBuilder.withStartTimestamp(processingTimestamp * 1000L);
        }
        Long eventTimestamp = null;
        if (source != null) {
            for (Field field : source.schema().fields()) {
                this.addFieldToSpan(txLogSpanBuilder, source, field.name(), DB_FIELDS_PREFIX);
            }
            eventTimestamp = source.getInt64("ts_ms");
            if (eventTimestamp != null) {
                txLogSpanBuilder.withStartTimestamp(eventTimestamp * 1000L);
            }
        }
        if (propagatedSpanContext != null) {
            DebeziumTextMap parentSpanContextMap = new DebeziumTextMap(propagatedSpanContext);
            SpanContext parentSpanContext = tracer.extract(Format.Builtin.TEXT_MAP, (Object)parentSpanContextMap);
            txLogSpanBuilder.asChildOf(parentSpanContext);
        }
        Span txLogSpan = txLogSpanBuilder.start();
        debeziumSpanBuilder.asChildOf(txLogSpan);
        Span debeziumSpan = debeziumSpanBuilder.start();
        try (Scope debeziumScope = tracer.scopeManager().activate(debeziumSpan);){
            Tags.COMPONENT.set(txLogSpan, TRACING_COMPONENT);
            Tags.COMPONENT.set(debeziumSpan, TRACING_COMPONENT);
            if (eventTimestamp != null) {
                txLogSpan.finish(eventTimestamp * 1000L);
            } else {
                txLogSpan.finish();
            }
            debeziumSpan.finish();
            DebeziumTextMap activeTextMap = new DebeziumTextMap();
            tracer.inject(debeziumSpan.context(), Format.Builtin.TEXT_MAP, (Object)activeTextMap);
            activeTextMap.forEach(e -> record.headers().add((String)e.getKey(), e.getValue(), Schema.STRING_SCHEMA));
        }
        return record;
    }

    @Override
    public void close() {
    }

    @Override
    public ConfigDef config() {
        ConfigDef config = new ConfigDef();
        io.debezium.config.Field.group(config, null, TRACING_SPAN_CONTEXT_FIELD, TRACING_OPERATION_NAME, TRACING_CONTEXT_FIELD_REQUIRED);
        return config;
    }

    private void addFieldToSpan(Tracer.SpanBuilder span, Struct struct, String field, String prefix) {
        Object fieldValue = struct.get(field);
        if (fieldValue != null) {
            String targetFieldName = prefix + field;
            if (DB_FIELDS_PREFIX.equals(prefix)) {
                if ("db".equals(field)) {
                    targetFieldName = prefix + "instance";
                } else if ("connector".equals(field)) {
                    targetFieldName = prefix + "type";
                } else if ("name".equals(field)) {
                    targetFieldName = prefix + "cdc-name";
                }
            }
            span.withTag(targetFieldName, fieldValue.toString());
        }
    }

    public static boolean isOpenTracingAvailable() {
        return OPEN_TRACING_AVAILABLE;
    }

    private static boolean resolveOpenTracingApiAvailable() {
        try {
            GlobalTracer.get();
            return true;
        }
        catch (NoClassDefFoundError noClassDefFoundError) {
            return false;
        }
    }
}

