package org.jetlinks.supports.cluster;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.jetlinks.core.cache.Caches;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.ReplyFailureHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/supports/cluster/AbstractDeviceOperationBroker.class */
public abstract class AbstractDeviceOperationBroker implements DeviceOperationBroker, MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(AbstractDeviceOperationBroker.class);
    private final Map<String, Sinks.Many<DeviceMessageReply>> replyProcessor = Caches.newCache();
    private final Map<String, AtomicInteger> fragmentCounter = new ConcurrentHashMap();
    private ReplyFailureHandler replyFailureHandler = (th, deviceMessageReply) -> {
        log.warn("unhandled reply message:{}", deviceMessageReply, th);
    };

    public abstract Flux<DeviceStateInfo> getDeviceState(String str, Collection<String> collection);

    public abstract Disposable handleGetDeviceState(String str, Function<Publisher<String>, Flux<DeviceStateInfo>> function);

    public Flux<DeviceMessageReply> handleReply(String str, String str2, Duration duration) {
        long currentTimeMillis = System.currentTimeMillis();
        String awaitReplyKey = getAwaitReplyKey(str, str2);
        return ((Flux) this.replyProcessor.computeIfAbsent(awaitReplyKey, str3 -> {
            return Sinks.many().multicast().onBackpressureBuffer();
        }).asFlux().as(flux -> {
            return (duration.isZero() || duration.isNegative()) ? flux : flux.timeout(duration, Mono.error(() -> {
                return new DeviceOperationException(ErrorCode.TIME_OUT);
            }));
        })).doFinally(signalType -> {
            log.trace("reply device message {} {} take {}ms", new Object[]{str, str2, Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
            this.replyProcessor.remove(awaitReplyKey);
            this.fragmentCounter.remove(awaitReplyKey);
        });
    }

    public abstract Mono<Integer> send(String str, Publisher<? extends Message> publisher);

    public abstract Mono<Integer> send(Publisher<? extends BroadcastMessage> publisher);

    public abstract Flux<Message> handleSendToDeviceMessage(String str);

    protected abstract Mono<Void> doReply(DeviceMessageReply deviceMessageReply);

    /* JADX INFO: Access modifiers changed from: protected */
    public String getAwaitReplyKey(DeviceMessage deviceMessage) {
        return getAwaitReplyKey(deviceMessage.getDeviceId(), deviceMessage.getMessageId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getAwaitReplyKey(String str, String str2) {
        return str + ":" + str2;
    }

    public Mono<Boolean> reply(DeviceMessageReply deviceMessageReply) {
        if (StringUtils.isEmpty(deviceMessageReply.getMessageId())) {
            log.warn("reply message messageId is empty: {}", deviceMessageReply);
            return Reactors.ALWAYS_FALSE;
        }
        Mono<Boolean> mono = Reactors.ALWAYS_TRUE;
        if (deviceMessageReply instanceof ChildDeviceMessageReply) {
            Message childDeviceMessage = ((ChildDeviceMessageReply) deviceMessageReply).getChildDeviceMessage();
            if (childDeviceMessage instanceof DeviceMessageReply) {
                mono = reply((DeviceMessageReply) childDeviceMessage);
            }
        }
        return Mono.defer(() -> {
            String str = (String) deviceMessageReply.getHeader(Headers.fragmentBodyMessageId).orElse(deviceMessageReply.getMessageId());
            if (!((Boolean) deviceMessageReply.getHeader(Headers.async).orElse(false)).booleanValue() && !this.replyProcessor.containsKey(getAwaitReplyKey(deviceMessageReply.getDeviceId(), str))) {
                return doReply(deviceMessageReply).thenReturn(true);
            }
            handleReply(deviceMessageReply);
            return Reactors.ALWAYS_TRUE;
        }).then(mono);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleReply(DeviceMessageReply deviceMessageReply) {
        try {
            String awaitReplyKey = getAwaitReplyKey(deviceMessageReply);
            String str = (String) deviceMessageReply.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (str == null) {
                Sinks.Many<DeviceMessageReply> many = this.replyProcessor.get(awaitReplyKey);
                if (many != null) {
                    many.emitNext(deviceMessageReply, Reactors.emitFailureHandler());
                    many.emitComplete(Reactors.emitFailureHandler());
                } else {
                    this.replyProcessor.remove(awaitReplyKey);
                }
                return;
            }
            log.trace("handle fragment device[{}] message {}", deviceMessageReply.getDeviceId(), deviceMessageReply);
            String awaitReplyKey2 = getAwaitReplyKey(deviceMessageReply.getDeviceId(), str);
            Sinks.Many<DeviceMessageReply> orDefault = this.replyProcessor.getOrDefault(awaitReplyKey2, this.replyProcessor.get(awaitReplyKey));
            if (orDefault == null || orDefault.currentSubscriberCount() == 0) {
                this.replyProcessor.remove(awaitReplyKey2);
                return;
            }
            int intValue = ((Integer) deviceMessageReply.getHeader(Headers.fragmentNumber).orElse(1)).intValue();
            AtomicInteger computeIfAbsent = this.fragmentCounter.computeIfAbsent(awaitReplyKey2, str2 -> {
                return new AtomicInteger(intValue);
            });
            try {
                orDefault.emitNext(deviceMessageReply, Reactors.emitFailureHandler());
                if (computeIfAbsent.decrementAndGet() <= 0 || ((Boolean) deviceMessageReply.getHeader(Headers.fragmentLast).orElse(false)).booleanValue()) {
                    try {
                        orDefault.tryEmitComplete();
                        this.replyProcessor.remove(awaitReplyKey2);
                        this.fragmentCounter.remove(awaitReplyKey2);
                    } finally {
                    }
                }
            } catch (Throwable th) {
                if (computeIfAbsent.decrementAndGet() <= 0 || ((Boolean) deviceMessageReply.getHeader(Headers.fragmentLast).orElse(false)).booleanValue()) {
                    try {
                        orDefault.tryEmitComplete();
                        this.replyProcessor.remove(awaitReplyKey2);
                        this.fragmentCounter.remove(awaitReplyKey2);
                    } finally {
                    }
                }
                throw th;
            }
        } catch (Throwable th2) {
            this.replyFailureHandler.handle(th2, deviceMessageReply);
        }
    }

    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }
}
