/*
 * Decompiled with CFR 0.152.
 */
package org.frankframework.management.gateway;

import com.hazelcast.cluster.Member;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.topic.ITopic;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;
import org.frankframework.management.bus.BusException;
import org.frankframework.management.bus.OutboundGateway;
import org.frankframework.management.gateway.HazelcastConfig;
import org.frankframework.management.gateway.events.ClusterMemberEvent;
import org.frankframework.management.security.JwtKeyGenerator;
import org.frankframework.util.SpringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

public class HazelcastOutboundGateway
implements InitializingBean,
ApplicationContextAware,
OutboundGateway {
    @Generated
    private static final Logger log = LogManager.getLogger(HazelcastOutboundGateway.class);
    private HazelcastInstance hzInstance;
    private ApplicationContext applicationContext;
    private static final RandomStringUtils NUMBER_GENERATOR = RandomStringUtils.insecure();
    private final String requestTopicName = "frank_integration_request_topic";
    private ITopic<Message<?>> requestTopic;
    @Autowired
    private JwtKeyGenerator jwtGenerator;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void afterPropertiesSet() throws Exception {
        this.hzInstance = HazelcastConfig.newHazelcastInstance(HazelcastConfig.InstanceType.CONTROLLER, Collections.emptyMap());
        SpringUtils.registerSingleton((ApplicationContext)this.applicationContext, (String)"hazelcastOutboundInstance", (Object)this.hzInstance);
        IMap config = this.hzInstance.getMap("frank-configuration");
        config.set((Object)"jwks", (Object)this.jwtGenerator.getPublicJwkSet());
        this.requestTopic = this.hzInstance.getTopic("frank_integration_request_topic");
        this.hzInstance.getCluster().addMembershipListener(new MembershipListener(){

            public void memberAdded(MembershipEvent e) {
                HazelcastOutboundGateway.this.applicationContext.publishEvent((ApplicationEvent)new ClusterMemberEvent(HazelcastOutboundGateway.this.applicationContext, ClusterMemberEvent.EventType.ADD_MEMBER, HazelcastOutboundGateway.this.mapMember(e.getMember())));
            }

            public void memberRemoved(MembershipEvent e) {
                HazelcastOutboundGateway.this.applicationContext.publishEvent((ApplicationEvent)new ClusterMemberEvent(HazelcastOutboundGateway.this.applicationContext, ClusterMemberEvent.EventType.REMOVE_MEMBER, HazelcastOutboundGateway.this.mapMember(e.getMember())));
            }
        });
    }

    @Override
    @Nonnull
    public <I, O> Message<O> sendSyncMessage(Message<I> in) {
        String tempReplyChannelName = "__tmp." + NUMBER_GENERATOR.nextAlphanumeric(32);
        long receiveTimeout = this.receiveTimeout(in);
        log.debug("sending synchronous request to topic [{}] message [{}] reply-queue [{}] receiveTimeout [{}]", (Object)"frank_integration_request_topic", in, (Object)tempReplyChannelName, (Object)receiveTimeout);
        IQueue responseQueue = this.hzInstance.getQueue(tempReplyChannelName);
        Message requestMessage = ((MessageBuilder)((MessageBuilder)MessageBuilder.fromMessage(in).setReplyChannelName(tempReplyChannelName)).setHeader("meta-user", (Object)this.getAuthentication())).build();
        this.requestTopic.publish((Object)requestMessage);
        Message<O> replyMessage = this.doReceive(responseQueue, receiveTimeout);
        this.silentlyRemoveQueue(responseQueue);
        if (replyMessage != null) {
            return replyMessage;
        }
        throw new BusException("no reponse found on temporary reply-queue [" + tempReplyChannelName + "] within receiveTimeout [" + receiveTimeout + "]");
    }

    private void silentlyRemoveQueue(IQueue<?> responseQueue) {
        try {
            responseQueue.destroy();
        }
        catch (Exception e) {
            log.info("error closing response queue", (Throwable)e);
        }
    }

    @Nullable
    private <O> Message<O> doReceive(IQueue<Message<O>> responseQueue, long receiveTimeout) {
        try {
            Message response = (Message)responseQueue.poll(receiveTimeout, TimeUnit.MILLISECONDS);
            if (response != null) {
                log.trace("received message with id [{}]", new Supplier[]{() -> response.getHeaders().getId()});
                return response;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.trace("did not receive response within timeout of [{}] ms", (Object)receiveTimeout);
        return null;
    }

    @Override
    public List<OutboundGateway.ClusterMember> getMembers() {
        Set members = this.hzInstance.getCluster().getMembers();
        return members.stream().map(this::mapMember).toList();
    }

    private OutboundGateway.ClusterMember mapMember(Member member) {
        OutboundGateway.ClusterMember cm = new OutboundGateway.ClusterMember();
        cm.setAddress(member.getSocketAddress().getHostName() + ":" + member.getSocketAddress().getPort());
        cm.setId(member.getUuid());
        HashMap<String, String> attrs = new HashMap<String, String>(member.getAttributes());
        String type = (String)attrs.remove("type");
        if (StringUtils.isNotBlank((CharSequence)type)) {
            if (HazelcastConfig.InstanceType.WORKER.name().equals(type)) {
                cm.setType("worker");
            } else {
                cm.setType("console");
            }
        }
        cm.setAttributes(attrs);
        cm.setLocalMember(member.localMember());
        return cm;
    }

    @Nonnull
    private String getAuthentication() {
        return this.jwtGenerator.create();
    }

    private long receiveTimeout(Message<?> requestMessage) {
        Long receiveTimeout = this.headerToLong(requestMessage.getHeaders().get((Object)"receiveTimeout"));
        return receiveTimeout != null ? receiveTimeout : 5000L;
    }

    @Nullable
    private Long headerToLong(@Nullable Object headerValue) {
        if (headerValue instanceof Number) {
            Number number = (Number)headerValue;
            return number.longValue();
        }
        if (headerValue instanceof String) {
            String text = (String)headerValue;
            return Long.parseLong(text);
        }
        return null;
    }

    @Override
    public <I> void sendAsyncMessage(Message<I> in) {
        log.debug("sending asynchronous request to topic [{}] message [{}]", (Object)"frank_integration_request_topic", in);
        Message requestMessage = ((MessageBuilder)((MessageBuilder)MessageBuilder.fromMessage(in).setReplyChannelName(null)).setHeader("meta-user", (Object)this.getAuthentication())).build();
        this.requestTopic.publishAsync((Object)requestMessage);
    }
}

