/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extension.async.apikit.internal.protocols.kafka;

import amf.apicontract.client.platform.AMFElementClient;
import amf.apicontract.client.platform.model.domain.api.AsyncApi;
import com.mulesoft.connectors.kafka.api.KafkaRecordAttributes;
import java.io.InputStream;
import java.util.List;
import org.mule.extension.async.apikit.api.attributes.AsyncMessageAttributes;
import org.mule.extension.async.apikit.internal.bindings.AsyncBinding;
import org.mule.extension.async.apikit.internal.exception.AsyncApiRoutingException;
import org.mule.extension.async.apikit.internal.execution.ChannelBasedRegistry;
import org.mule.extension.async.apikit.internal.execution.SourceCallbackRegistry;
import org.mule.extension.async.apikit.internal.protocols.MessageListenerHandler;
import org.mule.extension.async.apikit.internal.protocols.ProtocolHandler;
import org.mule.extension.async.apikit.internal.protocols.kafka.KafkaMessageAttributesBuilder;
import org.mule.runtime.extension.api.client.params.Parameterizer;
import org.mule.runtime.extension.api.client.source.SourceParameterizer;
import org.mule.runtime.extension.api.runtime.operation.Result;

public class KafkaMessageListenerHandler
extends MessageListenerHandler<InputStream, KafkaRecordAttributes> {
    protected String consumerConfigRef;

    public KafkaMessageListenerHandler(AsyncApi api, SourceCallbackRegistry registry, ChannelBasedRegistry channelBasedRegistry, AMFElementClient amfElementClient, String serverName, String consumerConfigRef, List<AsyncBinding> asyncBindings) {
        super(api, registry, channelBasedRegistry, amfElementClient, serverName, null, asyncBindings);
        this.consumerConfigRef = consumerConfigRef;
    }

    @Override
    public String getSourceListenerName() {
        return "message-listener";
    }

    @Override
    public void configureSourceListener(SourceParameterizer sourceParameterizer) {
        sourceParameterizer.withConfigRef(this.consumerConfigRef);
        for (AsyncBinding asyncBinding : this.asyncBindings) {
            asyncBinding.applyBindings((Parameterizer)sourceParameterizer);
        }
    }

    @Override
    protected AsyncMessageAttributes buildResultAttributes(Result<InputStream, KafkaRecordAttributes> result, String specChannel) {
        KafkaRecordAttributes attributes = (KafkaRecordAttributes)result.getAttributes().orElseThrow(() -> new AsyncApiRoutingException("Unable to get attributes from Kafka message"));
        return ((KafkaMessageAttributesBuilder)((KafkaMessageAttributesBuilder)((KafkaMessageAttributesBuilder)KafkaMessageAttributesBuilder.builder().channelName(attributes.getTopic())).serverName(this.serverName)).protocol(ProtocolHandler.Protocol.KAFKA.name())).partition(attributes.getPartition()).headers(attributes.getHeaders()).key(attributes.getKey()).offset(attributes.getOffset()).creationTimestamp(attributes.getCreationTimestamp()).logAppendTimestamp(attributes.getLogAppendTimestamp()).leaderEpoch(attributes.getLeaderEpoch()).build();
    }

    @Override
    protected InputStream buildResultOutput(Result<InputStream, KafkaRecordAttributes> result) {
        return (InputStream)result.getOutput();
    }

    @Override
    protected String getRuntimeChannelName(Result<InputStream, KafkaRecordAttributes> result) {
        return result.getAttributes().map(KafkaRecordAttributes::getTopic).orElseThrow(() -> new AsyncApiRoutingException("Unable to find topic name from Kafka message"));
    }
}

