package org.jetlinks.supports.server;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.ToDeviceMessageContext;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.core.trace.MonoTracer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Deprecated
/* loaded from: input_file:org/jetlinks/supports/server/DefaultSendToDeviceMessageHandler.class */
public class DefaultSendToDeviceMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultSendToDeviceMessageHandler.class);
    private final String serverId;
    private final DeviceSessionManager sessionManager;
    private final MessageHandler handler;
    private final DeviceRegistry registry;
    private final DecodedClientMessageHandler decodedClientMessageHandler;

    public void startup() {
        this.handler.handleSendToDeviceMessage(this.serverId).subscribe(message -> {
            try {
                if (message instanceof DeviceMessage) {
                    handleDeviceMessage((DeviceMessage) message);
                }
            } catch (Throwable th) {
                log.error("handle send to device message error {}", message, th);
            }
        });
        this.handler.handleGetDeviceState(this.serverId, publisher -> {
            return Flux.from(publisher).map(str -> {
                return new DeviceStateInfo(str, this.sessionManager.sessionIsAlive(str) ? (byte) 1 : (byte) -1);
            });
        });
    }

    protected void handleDeviceMessage(DeviceMessage deviceMessage) {
        String deviceId = deviceMessage.getDeviceId();
        DeviceSession session = this.sessionManager.getSession(deviceId);
        if (session != null) {
            doSend(deviceMessage, session);
        } else {
            ((Mono) this.registry.getDevice(deviceId).flatMap(deviceOperator -> {
                Mono selfConfig = deviceOperator.getSelfConfig(DeviceConfigKey.parentGatewayId);
                DeviceRegistry deviceRegistry = this.registry;
                deviceRegistry.getClass();
                return selfConfig.flatMap(deviceRegistry::getDevice);
            }).flatMap(deviceOperator2 -> {
                ChildDeviceMessage childDeviceMessage = new ChildDeviceMessage();
                childDeviceMessage.setDeviceId(deviceOperator2.getDeviceId());
                childDeviceMessage.setMessageId(deviceMessage.getMessageId());
                childDeviceMessage.setTimestamp(deviceMessage.getTimestamp());
                childDeviceMessage.setChildDeviceId(deviceId);
                childDeviceMessage.setChildDeviceMessage(deviceMessage);
                if (null != deviceMessage.getHeaders()) {
                    ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(deviceMessage.getHeaders());
                    concurrentHashMap.remove("productId");
                    concurrentHashMap.remove("deviceName");
                    childDeviceMessage.setHeaders(concurrentHashMap);
                }
                deviceMessage.addHeader(Headers.dispatchToParent, true);
                ChildrenDeviceSession session2 = this.sessionManager.getSession(deviceOperator2.getDeviceId(), deviceId);
                if (null != session2) {
                    doSend(childDeviceMessage, session2);
                    return Mono.just(true);
                }
                DeviceSession session3 = this.sessionManager.getSession(deviceOperator2.getDeviceId());
                if (null == session3) {
                    return doReply(createReply(deviceId, deviceMessage).error(ErrorCode.CLIENT_OFFLINE));
                }
                doSend(childDeviceMessage, session3);
                return Mono.just(true);
            }).switchIfEmpty(Mono.defer(() -> {
                log.warn("device[{}] not connected,send message fail", deviceMessage.getDeviceId());
                return doReply(createReply(deviceId, deviceMessage).error(ErrorCode.CLIENT_OFFLINE));
            })).as(MonoTracer.createWith(deviceMessage.getHeaders()))).subscribe();
        }
    }

    protected DeviceMessageReply createReply(String str, DeviceMessage deviceMessage) {
        DeviceMessageReply newReply = deviceMessage instanceof RepayableDeviceMessage ? ((RepayableDeviceMessage) deviceMessage).newReply() : new AcknowledgeDeviceMessage();
        newReply.messageId(deviceMessage.getMessageId()).deviceId(str);
        return newReply;
    }

    protected void doSend(DeviceMessage deviceMessage, DeviceSession deviceSession) {
        DeviceSession trace = DeviceSession.trace(deviceSession.unwrap(DeviceSession.class));
        if (trace.getOperator() == null) {
            log.warn("unsupported send message to {}", trace);
            return;
        }
        String deviceId = deviceMessage.getDeviceId();
        DeviceMessageReply createReply = createReply(deviceId, deviceMessage);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        boolean booleanValue = ((Boolean) deviceMessage.getHeader(Headers.sendAndForget).orElse(false)).booleanValue();
        Flux flux = (Flux) trace.getOperator().getProtocol().flatMap(protocolSupport -> {
            return protocolSupport.getMessageCodec(trace.getTransport());
        }).flatMapMany(deviceMessageCodec -> {
            return deviceMessageCodec.encode(new ToDeviceMessageContext() { // from class: org.jetlinks.supports.server.DefaultSendToDeviceMessageHandler.1
                public Mono<Boolean> sendToDevice(@Nonnull EncodedMessage encodedMessage) {
                    return trace.send(encodedMessage);
                }

                public Mono<Void> disconnect() {
                    DeviceSession deviceSession2 = trace;
                    return Mono.fromRunnable(() -> {
                        deviceSession2.close();
                        DefaultSendToDeviceMessageHandler.this.sessionManager.unregister(deviceSession2.getId());
                    });
                }

                @Nonnull
                public DeviceSession getSession() {
                    return trace;
                }

                public Mono<DeviceSession> getSession(String str) {
                    return Mono.justOrEmpty(DefaultSendToDeviceMessageHandler.this.sessionManager.getSession(str)).map(DeviceSession::trace);
                }

                @Nonnull
                public Message getMessage() {
                    return deviceMessage;
                }

                public DeviceOperator getDevice() {
                    return trace.getOperator();
                }

                public Mono<DeviceOperator> getDevice(String str) {
                    return DefaultSendToDeviceMessageHandler.this.registry.getDevice(str);
                }

                @Nonnull
                public Mono<Void> reply(@Nonnull Publisher<? extends DeviceMessage> publisher) {
                    atomicBoolean.set(true);
                    Flux from = Flux.from(publisher);
                    DeviceSession deviceSession2 = trace;
                    return from.flatMap(deviceMessage2 -> {
                        return DefaultSendToDeviceMessageHandler.this.decodedClientMessageHandler.handleMessage(deviceSession2.getOperator(), deviceMessage2);
                    }).then();
                }
            });
        }).as(FluxTracer.create(DeviceTracer.SpanName.encode(deviceId), (reactiveSpan, encodedMessage) -> {
            reactiveSpan.setAttribute(DeviceTracer.SpanKey.message, encodedMessage.toString());
        }));
        trace.getClass();
        Mono onErrorResume = flux.flatMap(trace::send).reduce((bool, bool2) -> {
            return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
        }).flatMap(bool3 -> {
            return (atomicBoolean.get() || booleanValue) ? Mono.empty() : ((Boolean) deviceMessage.getHeader(Headers.async).orElse(false)).booleanValue() ? doReply(createReply.message(ErrorCode.REQUEST_HANDLING.getText()).code(ErrorCode.REQUEST_HANDLING.name()).success()) : Mono.just(true);
        }).switchIfEmpty(Mono.defer(() -> {
            if (!(deviceMessage instanceof DisconnectDeviceMessage)) {
                return (atomicBoolean.get() || booleanValue) ? Mono.empty() : doReply(createReply(deviceId, deviceMessage).error(ErrorCode.UNSUPPORTED_MESSAGE));
            }
            deviceSession.close();
            this.sessionManager.unregister(deviceSession.getId());
            return atomicBoolean.get() ? Mono.empty() : doReply(createReply(deviceId, deviceMessage).success());
        })).onErrorResume(th -> {
            atomicBoolean.set(true);
            if (!(th instanceof DeviceOperationException) || booleanValue) {
                log.error(th.getMessage(), th);
            }
            return booleanValue ? Mono.empty() : doReply(createReply.error(th));
        });
        if ((deviceMessage instanceof ChildDeviceMessage) && (((ChildDeviceMessage) deviceMessage).getChildDeviceMessage() instanceof DisconnectDeviceMessage)) {
            onErrorResume = this.registry.getDevice(((ChildDeviceMessage) deviceMessage).getChildDeviceMessage().getDeviceId()).flatMap(deviceOperator -> {
                Mono selfConfig = deviceOperator.getSelfConfig(DeviceConfigKey.selfManageState);
                Boolean bool4 = Boolean.FALSE;
                bool4.getClass();
                return selfConfig.filter((v1) -> {
                    return r1.equals(v1);
                }).map(bool5 -> {
                    return this.sessionManager.unRegisterChildren(deviceId, deviceOperator.getDeviceId()).then(doReply(createReply.success()));
                });
            }).defaultIfEmpty(onErrorResume).flatMap(Function.identity());
        }
        ((Mono) onErrorResume.as(MonoTracer.createWith(deviceMessage.getHeaders()))).subscribe();
    }

    private Mono<Boolean> doReply(DeviceMessageReply deviceMessageReply) {
        Mono<Boolean> just = Mono.just(true);
        if (deviceMessageReply instanceof ChildDeviceMessageReply) {
            Message childDeviceMessage = ((ChildDeviceMessageReply) deviceMessageReply).getChildDeviceMessage();
            if (childDeviceMessage instanceof DeviceMessageReply) {
                just = doReply((DeviceMessageReply) childDeviceMessage);
            }
        }
        Mono writeToMessage = DeviceTracer.writeToMessage(deviceMessageReply);
        MessageHandler messageHandler = this.handler;
        messageHandler.getClass();
        return ((Mono) writeToMessage.flatMap(messageHandler::reply).as(mono -> {
            return log.isDebugEnabled() ? mono.doFinally(signalType -> {
                log.debug("reply message {} ,[{}]", signalType, deviceMessageReply);
            }) : mono;
        })).doOnError(th -> {
            log.error("reply message error", th);
        }).then(just);
    }

    public DefaultSendToDeviceMessageHandler(String str, DeviceSessionManager deviceSessionManager, MessageHandler messageHandler, DeviceRegistry deviceRegistry, DecodedClientMessageHandler decodedClientMessageHandler) {
        this.serverId = str;
        this.sessionManager = deviceSessionManager;
        this.handler = messageHandler;
        this.registry = deviceRegistry;
        this.decodedClientMessageHandler = decodedClientMessageHandler;
    }
}
