package org.jetlinks.supports.cluster;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import io.netty.util.ReferenceCountUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.jetlinks.core.codec.Codec;
import org.jetlinks.core.codec.Codecs;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.supports.cluster.redis.DeviceCheckRequest;
import org.jetlinks.supports.cluster.redis.DeviceCheckResponse;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Deprecated
/* loaded from: input_file:org/jetlinks/supports/cluster/EventBusDeviceOperationBroker.class */
public class EventBusDeviceOperationBroker extends AbstractDeviceOperationBroker implements Disposable {
    private static final Logger log = LoggerFactory.getLogger(EventBusDeviceOperationBroker.class);
    private static final Codec<Message> messageCodec = Codecs.lookup(Message.class);
    private final String serverId;
    private final EventBus eventBus;
    private Function<Publisher<String>, Flux<DeviceStateInfo>> localStateChecker;
    private final Disposable.Composite disposable = Disposables.composite();
    private final Map<String, Sinks.One<DeviceCheckResponse>> checkRequests = new ConcurrentHashMap();
    private final Map<String, RepayableDeviceMessage<?>> awaits = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(5)).removalListener(removalNotification -> {
        if (removalNotification.getCause() == RemovalCause.EXPIRED) {
            try {
                log.debug("discard await reply message[{}] message,{}", removalNotification.getKey(), removalNotification.getValue());
            } catch (Throwable th) {
            }
        }
    }).build().asMap();

    public EventBusDeviceOperationBroker(String str, EventBus eventBus) {
        this.serverId = str;
        this.eventBus = eventBus;
    }

    public void dispose() {
        this.disposable.dispose();
    }

    public boolean isDisposed() {
        return this.disposable.isDisposed();
    }

    private void doSubscribeReply() {
        Subscription of = Subscription.of("device-message-broker", new String[]{"/_sys/msg-broker-reply/" + this.serverId}, new Subscription.Feature[]{Subscription.Feature.broker});
        Disposable.Composite composite = this.disposable;
        Flux subscribe = this.eventBus.subscribe(of, messageCodec);
        Class<DeviceMessageReply> cls = DeviceMessageReply.class;
        DeviceMessageReply.class.getClass();
        composite.add(subscribe.filter((v1) -> {
            return r2.isInstance(v1);
        }).cast(DeviceMessageReply.class).subscribe(this::handleReply));
    }

    public void start() {
        doSubscribeReply();
        this.disposable.add(this.eventBus.subscribe(Subscription.of("device-state-checker", new String[]{"/_sys/device-state-check-res/" + this.serverId}, new Subscription.Feature[]{Subscription.Feature.broker}), DeviceCheckResponse.class).subscribe(deviceCheckResponse -> {
            Optional.ofNullable(this.checkRequests.remove(deviceCheckResponse.getRequestId())).ifPresent(one -> {
                one.tryEmitValue(deviceCheckResponse);
            });
        }));
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Flux<DeviceStateInfo> getDeviceState(String str, Collection<String> collection) {
        return Flux.defer(() -> {
            if (this.serverId.equals(str) && this.localStateChecker != null) {
                return this.localStateChecker.apply(Flux.fromIterable(collection));
            }
            long currentTimeMillis = System.currentTimeMillis();
            String uuid = UUID.randomUUID().toString();
            DeviceCheckRequest deviceCheckRequest = new DeviceCheckRequest(this.serverId, uuid, new ArrayList(collection));
            Sinks.One<DeviceCheckResponse> one = Sinks.one();
            this.checkRequests.put(uuid, one);
            return this.eventBus.publish("/_sys/device-state-check/".concat(str), deviceCheckRequest).flatMapMany(l -> {
                if (l.longValue() != 0) {
                    return one.asMono().flatMapIterable((v0) -> {
                        return v0.getStateInfoList();
                    });
                }
                log.warn("JetLinks server [{}] not found", str);
                return Mono.empty();
            }).timeout(Duration.ofSeconds(5L), Flux.empty()).doFinally(signalType -> {
                log.trace("check device state complete take {}ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                this.checkRequests.remove(uuid);
            });
        });
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Disposable handleGetDeviceState(String str, Function<Publisher<String>, Flux<DeviceStateInfo>> function) {
        Subscription of = Subscription.of("device-state-checker", new String[]{"/_sys/device-state-check/" + str}, new Subscription.Feature[]{Subscription.Feature.broker});
        this.localStateChecker = function;
        return this.eventBus.subscribe(of, DeviceCheckRequest.class).subscribe(deviceCheckRequest -> {
            ((Flux) function.apply(Flux.fromIterable(deviceCheckRequest.getDeviceId()))).collectList().map(list -> {
                return new DeviceCheckResponse(list, deviceCheckRequest.getRequestId());
            }).flatMap(deviceCheckResponse -> {
                return this.eventBus.publish("/_sys/device-state-check-res/" + deviceCheckRequest.getFrom(), deviceCheckResponse);
            }).subscribe();
        });
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    protected Mono<Void> doReply(DeviceMessageReply deviceMessageReply) {
        String str = (String) Optional.ofNullable(this.awaits.remove(getAwaitReplyKey(deviceMessageReply))).flatMap(repayableDeviceMessage -> {
            return repayableDeviceMessage.getHeader(Headers.sendFrom);
        }).orElse("*");
        return this.eventBus.publish("/_sys/msg-broker-reply/" + str, messageCodec, deviceMessageReply).doOnNext(l -> {
            if (l.longValue() > 0 || "*".equals(str)) {
                return;
            }
            log.warn("no handler [{}] for reply message : {}", str, deviceMessageReply);
        }).then();
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Mono<Integer> send(String str, Publisher<? extends Message> publisher) {
        return this.eventBus.publish("/_sys/msg-broker/" + str, messageCodec, Flux.from(publisher).doOnNext(message -> {
            message.addHeader(Headers.sendFrom, this.serverId);
        })).map((v0) -> {
            return v0.intValue();
        });
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> publisher) {
        return this.eventBus.publish("/_sys/msg-broker-broadcast", messageCodec, publisher).map((v0) -> {
            return v0.intValue();
        });
    }

    @Override // org.jetlinks.supports.cluster.AbstractDeviceOperationBroker
    public Flux<Message> handleSendToDeviceMessage(String str) {
        return this.eventBus.subscribe(Subscription.of("device-message-broker", new String[]{"/_sys/msg-broker/" + str}, new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker})).map(topicPayload -> {
            try {
                return (Message) TraceHolder.copyContext(topicPayload.getHeaders(), (Message) topicPayload.decode(messageCodec, false), (v0, v1, v2) -> {
                    v0.addHeader(v1, v2);
                });
            } finally {
                ReferenceCountUtil.safeRelease(topicPayload);
            }
        }).doOnNext(message -> {
            if (!(message instanceof RepayableDeviceMessage) || ((Boolean) message.getHeader(Headers.sendAndForget).orElse(false)).booleanValue() || ((Boolean) message.getHeader(Headers.sendFrom).map(str2 -> {
                return Boolean.valueOf(str2.equals(str));
            }).orElse(false)).booleanValue()) {
                return;
            }
            this.awaits.put(getAwaitReplyKey((RepayableDeviceMessage) message), (RepayableDeviceMessage) message);
        });
    }
}
