/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extension.async.apikit.internal.protocols.kafka;

import org.mule.extension.async.apikit.internal.AsyncConfig;
import org.mule.extension.async.apikit.internal.exception.AsyncApiModuleException;
import org.mule.extension.async.apikit.internal.protocols.ProtocolHandler;
import org.mule.extension.async.apikit.internal.protocols.kafka.KafkaConfig;
import org.mule.extension.async.apikit.internal.protocols.kafka.KafkaMessageListenerHandler;
import org.mule.extension.async.apikit.internal.protocols.kafka.KafkaPublishHandler;
import org.mule.runtime.api.util.Pair;
import org.mule.runtime.extension.api.client.source.SourceHandler;

/**
 * {@link ProtocolHandler} implementation for the "Apache Kafka" extension.
 *
 * <p>Every handler registered by this class is keyed by the server name paired with the fixed
 * placeholder channel {@code "_"}. As a consequence the listener, source and publish handlers
 * are shared by all channels of a server, and the {@code channelName} argument of the lookup
 * methods is ignored.
 */
public class KafkaProtocolHandler
extends ProtocolHandler<KafkaMessageListenerHandler, KafkaPublishHandler> {
    /** Placeholder used as the channel component of every handler key. */
    private static final String MOCK_CHANNEL_NAME = "_";

    /**
     * Builds the per-server key under which all handlers of this protocol are stored.
     *
     * @param server the server key
     * @return the pair of {@code server} and the placeholder channel name
     */
    private static Pair<String, String> serverKey(String server) {
        return new Pair<>(server, MOCK_CHANNEL_NAME);
    }

    /**
     * @return the name of the extension used when creating sources, {@code "Apache Kafka"}
     */
    @Override
    public String getExtensionName() {
        return "Apache Kafka";
    }

    /**
     * Registers the message listener handler and its source for {@code server}, unless they are
     * already registered.
     *
     * <p>The channel bindings for {@code channelName} are applied on every call. The listener
     * handler and the source are created only on the first call for a given server, so the
     * source listener bindings of that first channel are the ones handed to the handler.
     *
     * @param config            module configuration providing the Kafka configs, API and clients
     * @param server            key of the server to listen on
     * @param channelName       channel whose bindings are applied
     * @param disableValidation forwarded to the listener handler
     * @param defaultEncoding   forwarded to the listener handler
     * @throws AsyncApiModuleException if no Kafka config has a server key equal to {@code server}
     */
    @Override
    public void buildMessageListenerHandlersFor(AsyncConfig config, String server, String channelName, boolean disableValidation, String defaultEncoding) {
        // NOTE(review): buildPublishHandlersFor resolves the same config through getServerConfig;
        // consider unifying once that helper's failure behavior is confirmed to match.
        KafkaConfig kafkaConfig = config.getKafkaConfigs().stream()
                .filter(c -> c.getServerKey().equals(server))
                .findFirst()
                .orElseThrow(() -> new AsyncApiModuleException("No server found with key '{}'", server));
        Pair<String, String> handlerKey = serverKey(server);
        this.applyChannelBindings(config, channelName, ProtocolHandler.Protocol.KAFKA);
        KafkaMessageListenerHandler messageListenerHandler = this.messageListenerHandlers.computeIfAbsent(
                handlerKey,
                ignored -> new KafkaMessageListenerHandler(
                        config.getApi(),
                        config.getSourceCallbackRegistry(),
                        config,
                        config.getAsyncApiAmfConfiguration().elementClient(),
                        disableValidation,
                        server,
                        kafkaConfig.getConsumerConfigRef(),
                        config.getBindingsHandler().getSourceListenerAsyncBindings(channelName),
                        defaultEncoding));
        // Create the source lazily: with putIfAbsent a new source was built (and then discarded)
        // for every additional channel of a server that already had one registered.
        this.sourceHandlers.computeIfAbsent(
                handlerKey,
                ignored -> config.getExtensionsClient().createSource(
                        this.getExtensionName(),
                        messageListenerHandler.getSourceListenerName(),
                        messageListenerHandler::consumeSourceListenerResult,
                        messageListenerHandler::configureSourceListener));
    }

    /**
     * Registers the publish handler for {@code server}, unless one is already registered.
     *
     * @param config      module configuration providing the Kafka configs
     * @param server      key of the server to publish to
     * @param channelName not used by this implementation; handlers are stored per server
     */
    @Override
    public void buildPublishHandlersFor(AsyncConfig config, String server, String channelName) {
        // Resolved eagerly so the server lookup still runs on every call.
        KafkaConfig kafkaConfig = (KafkaConfig)this.getServerConfig(config.getKafkaConfigs(), server);
        // Only instantiate a handler when the server has none yet.
        this.publishHandlers.computeIfAbsent(
                serverKey(server),
                ignored -> new KafkaPublishHandler(kafkaConfig.getProducerConfigRef()));
    }

    /**
     * Looks up the source handler registered for {@code server}.
     *
     * @param server      key of the server
     * @param channelName ignored; the placeholder channel name is used instead
     * @return whatever the superclass lookup returns for the per-server key
     */
    @Override
    public SourceHandler getSourceHandler(String server, String channelName) {
        return super.getSourceHandler(server, MOCK_CHANNEL_NAME);
    }

    /**
     * Looks up the message listener handler registered for {@code server}.
     *
     * @param server      key of the server
     * @param channelName ignored; the placeholder channel name is used instead
     * @return whatever the superclass lookup returns for the per-server key
     */
    @Override
    public KafkaMessageListenerHandler getMessageListenerHandler(String server, String channelName) {
        return (KafkaMessageListenerHandler)super.getMessageListenerHandler(server, MOCK_CHANNEL_NAME);
    }

    /**
     * Looks up the publish handler registered for {@code server}.
     *
     * @param server      key of the server
     * @param channelName ignored; the placeholder channel name is used instead
     * @return whatever the superclass lookup returns for the per-server key
     */
    @Override
    public KafkaPublishHandler getPublishHandler(String server, String channelName) {
        return (KafkaPublishHandler)super.getPublishHandler(server, MOCK_CHANNEL_NAME);
    }
}

