package org.jetlinks.supports.server;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.CommonDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.ToDeviceMessageContext;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.LostDeviceSession;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.core.trace.TraceHolder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/server/ClusterSendToDeviceMessageHandler.class */
public class ClusterSendToDeviceMessageHandler {
    /** Logger used for warnings and errors raised while dispatching messages. */
    private static final Logger log = LoggerFactory.getLogger(ClusterSendToDeviceMessageHandler.class);
    /**
     * Header marking a message that has already gone through {@code retryResume} once.
     * Its presence makes a second resume attempt fail with {@code CONNECTION_LOST}, and
     * {@code sendToNoSession} reads it to choose between {@code CONNECTION_LOST} and
     * {@code CLIENT_OFFLINE}.
     * NOTE(review): the second argument of {@code HeaderKey.of} is presumably the key's default value - confirm.
     */
    private static final HeaderKey<Boolean> resumeSession = HeaderKey.of("resume-session", true);
    /** Resolves the session a device message should be written to, and removes sessions on disconnect. */
    private final DeviceSessionManager sessionManager;
    /** Source of the messages addressed to this server node (see {@code init}). */
    private final MessageHandler handler;
    /** Looks up device operators, e.g. the parent gateway of a child device. */
    private final DeviceRegistry registry;
    /** Receives every reply produced by this handler (see the {@code doReply} overloads). */
    private final DecodedClientMessageHandler decodedClientMessageHandler;
    /**
     * Concurrency argument passed to {@code flatMap} in {@code init}; read from the system property
     * {@code jetlinks.device.message.send.concurrency}, falling back to 10240 when it is not set.
     */
    private final int concurrency = Integer.getInteger("jetlinks.device.message.send.concurrency", 10240).intValue();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/supports/server/ClusterSendToDeviceMessageHandler$CodecContext.class */
    /**
     * Context handed to the protocol's message codec when a downstream message is encoded
     * (see {@code sendTo}). It bundles the target device, the message being sent and the
     * session it will be written to, and gives the codec access to the enclosing handler's
     * registry and session manager.
     *
     * <p>{@link #alreadyReply} records whether a reply for {@link #message} has already been
     * produced, so the enclosing handler does not answer the same request twice.
     */
    public class CodecContext implements ToDeviceMessageContext {
        private final DeviceOperator device;
        private final DeviceMessage message;
        private final DeviceSession session;
        // Set by reply(...) and by doReply(CodecContext, ...); read by the enclosing handler.
        private volatile boolean alreadyReply = false;

        CodecContext(DeviceOperator deviceOperator, DeviceMessage deviceMessage, DeviceSession deviceSession) {
            this.device = deviceOperator;
            this.message = deviceMessage;
            this.session = deviceSession;
        }

        /** @return the operator of the device the message is being sent to */
        @Nullable
        public DeviceOperator getDevice() {
            return this.device;
        }

        /** Looks up another device by id through the enclosing handler's registry. */
        public Mono<DeviceOperator> getDevice(String str) {
            return ClusterSendToDeviceMessageHandler.this.registry.getDevice(str);
        }

        /** Delegates to the interface's default implementation. */
        public Map<String, Object> getConfiguration() {
            // An inherited interface default must be invoked through the qualified
            // "Interface.super" form; a bare "super" would refer to java.lang.Object.
            return ToDeviceMessageContext.super.getConfiguration();
        }

        /** Delegates to the interface's default implementation. */
        public Optional<Object> getConfig(String str) {
            return ToDeviceMessageContext.super.getConfig(str);
        }

        /** @return the message currently being encoded */
        @Nonnull
        public Message getMessage() {
            return this.message;
        }

        /**
         * Lets the codec answer the request itself. The context is marked as replied as soon
         * as this method is called, and each published message is forwarded to the decoded
         * message handler once the returned Mono is subscribed.
         */
        @Nonnull
        public Mono<Void> reply(@Nonnull Publisher<? extends DeviceMessage> publisher) {
            this.alreadyReply = true;
            return Flux.from(publisher).flatMap(deviceMessage -> {
                return ClusterSendToDeviceMessageHandler.this.decodedClientMessageHandler.handleMessage(this.device, deviceMessage);
            }).then();
        }

        /** Writes an already encoded message to this context's session. */
        public Mono<Boolean> sendToDevice(@Nonnull EncodedMessage encodedMessage) {
            return this.session.send(encodedMessage);
        }

        /** Removes the session of this context's device from the session manager. */
        public Mono<Void> disconnect() {
            return ClusterSendToDeviceMessageHandler.this.sessionManager.remove(this.device.getDeviceId(), true).then();
        }

        /** @return the session the message will be written to */
        @Nonnull
        public DeviceSession getSession() {
            return this.session;
        }

        /** Looks up the session of another device through the session manager. */
        public Mono<DeviceSession> getSession(String str) {
            return ClusterSendToDeviceMessageHandler.this.sessionManager.getSession(str);
        }

        /** Asks the session manager whether the given device's session is alive. */
        public Mono<Boolean> sessionIsAlive(String str) {
            return ClusterSendToDeviceMessageHandler.this.sessionManager.isAlive(str);
        }
    }

    /**
     * Creates the handler and immediately starts consuming the messages addressed to the
     * current server node.
     *
     * @param deviceSessionManager        resolves and removes device sessions
     * @param messageHandler              source of the messages to send to devices
     * @param deviceRegistry              device operator lookup
     * @param decodedClientMessageHandler sink for the replies produced by this handler
     */
    public ClusterSendToDeviceMessageHandler(DeviceSessionManager deviceSessionManager, MessageHandler messageHandler, DeviceRegistry deviceRegistry, DecodedClientMessageHandler decodedClientMessageHandler) {
        this.decodedClientMessageHandler = decodedClientMessageHandler;
        this.registry = deviceRegistry;
        this.handler = messageHandler;
        this.sessionManager = deviceSessionManager;
        // All collaborators are assigned before the subscription is started.
        init();
    }

    /**
     * Subscribes to the messages addressed to the current server node and dispatches each
     * of them to {@code handleMessage}, with at most {@code concurrency} messages in flight.
     *
     * <p>Messages dropped under backpressure are answered with {@code SYSTEM_BUSY}. A failure
     * while handling one message is logged and swallowed so that this single, long-lived
     * subscription keeps running.
     */
    private void init() {
        this.handler
            .handleSendToDeviceMessage(this.sessionManager.getCurrentServerId())
            .onBackpressureDrop(dropped -> {
                // createReply casts to DeviceMessage; an exception thrown from this callback
                // would terminate the whole stream, so anything else is skipped, which matches
                // handleMessage ignoring non-device messages.
                if (!(dropped instanceof DeviceMessage)) {
                    return;
                }
                doReply((DeviceOperator) null, (DeviceMessage) createReply(dropped).error(ErrorCode.SYSTEM_BUSY)).subscribe();
            })
            .flatMap(incoming -> Mono
                // defer turns an exception thrown synchronously by handleMessage into an error
                // signal that onErrorResume can absorb, instead of cancelling the subscription.
                .defer(() -> handleMessage(incoming))
                .onErrorResume(err -> {
                    log.error("handle send to device message error {}", incoming, err);
                    return Mono.empty();
                }), this.concurrency)
            .subscribe();
    }

    /**
     * Builds the reply for a request: the message's own reply when it is a
     * {@code RepayableDeviceMessage}, otherwise a {@code CommonDeviceMessageReply} carrying
     * the request's device id and message id. The request's trace context is then copied
     * onto the reply via {@code TraceHolder.copyContext}, adding only headers that are absent.
     *
     * @param message the request; must be a {@code DeviceMessage} when it is not repayable
     * @return the reply with the trace context applied
     */
    private DeviceMessageReply createReply(Message message) {
        DeviceMessageReply reply;
        if (message instanceof RepayableDeviceMessage) {
            reply = (DeviceMessageReply) ((RepayableDeviceMessage) message).newReply();
        } else {
            String deviceId = ((DeviceMessage) message).getDeviceId();
            reply = (DeviceMessageReply) new CommonDeviceMessageReply().deviceId(deviceId).messageId(message.getMessageId());
        }
        return (DeviceMessageReply) TraceHolder.copyContext(
            message.getHeaders(),
            reply,
            (target, key, value) -> target.addHeaderIfAbsent(key, value));
    }

    /**
     * Routes one incoming message: to the device's session when one is known to the session
     * manager, otherwise through {@code sendToUnknownSession}. Messages that are not
     * {@code DeviceMessage}s, or that carry no device id, are ignored.
     *
     * @param message the message taken from the handler's stream
     * @return a Mono completing when the message has been dispatched
     */
    private Mono<Void> handleMessage(Message message) {
        if (!(message instanceof DeviceMessage)) {
            return Mono.empty();
        }
        DeviceMessage target = (DeviceMessage) message;
        String deviceId = target.getDeviceId();
        if (deviceId == null) {
            log.warn("deviceId is null :{}", target);
            return Mono.empty();
        }
        // The session is mapped to a nested Mono rather than flat-mapped directly so that
        // "no session found" can be told apart from "send completed without a value".
        return this.sessionManager
            .getSession(deviceId)
            .map(session -> sendTo(session, target))
            .defaultIfEmpty(Mono.defer(() -> sendToUnknownSession(target)))
            .flatMap(Function.identity())
            .contextWrite(ctx -> TraceHolder.readToContext(ctx, target.getHeaders()).put(DeviceMessage.class, target));
    }

    /**
     * Sends a message through an existing session.
     *
     * <ul>
     *   <li>A children-device session re-routes the message to the parent device.</li>
     *   <li>A lost session is removed on a disconnect request (answered with success);
     *       any other message goes through {@code retryResume}.</li>
     *   <li>Otherwise the message is encoded with the device protocol's codec for the
     *       session transport, and every encoded message is written to the session.</li>
     * </ul>
     *
     * Errors are answered with an error reply unless a reply was already produced; errors
     * other than {@code DeviceOperationException} are also logged.
     *
     * @param deviceSession the session registered for the target device
     * @param deviceMessage the message to send
     * @return a Mono completing once the message has been sent and, if needed, replied to
     */
    private Mono<Void> sendTo(DeviceSession deviceSession, DeviceMessage deviceMessage) {
        if (deviceSession.isWrapFrom(ChildrenDeviceSession.class)) {
            return sendToParentSession(deviceSession.getOperator(), deviceSession.unwrap(ChildrenDeviceSession.class).getParentDevice(), deviceMessage);
        }
        DeviceOperator operator = deviceSession.getOperator();
        if (deviceSession.isWrapFrom(LostDeviceSession.class)) {
            if (deviceMessage instanceof DisconnectDeviceMessage) {
                return this.sessionManager
                    .remove(deviceSession.getDeviceId(), false)
                    .then(doReply(operator, (DeviceMessage) ((DisconnectDeviceMessage) deviceMessage).newReply().success()));
            }
            return retryResume(operator, deviceMessage);
        }
        CodecContext codecContext = new CodecContext(operator, deviceMessage, DeviceSession.trace(deviceSession));
        // The chain is kept fully generic (no raw Flux) so the error lambda below receives a
        // Throwable and error(...) type-checks.
        return operator
            .getProtocol()
            .flatMap(protocolSupport -> protocolSupport.getMessageCodec(codecContext.session.getTransport()))
            .flatMapMany(deviceMessageCodec -> deviceMessageCodec.encode(codecContext))
            .as(FluxTracer.create(
                DeviceTracer.SpanName.encode(operator.getDeviceId()),
                (reactiveSpan, encodedMessage) -> {
                    reactiveSpan.setAttribute(DeviceTracer.SpanKey.message, encodedMessage.toString());
                }))
            // Each send is wrapped in a Mono instead of flat-mapped directly: then() is always
            // empty, so defaultIfEmpty could not otherwise detect a codec that produced nothing.
            .map(encodedMessage -> codecContext.session.send(encodedMessage).then())
            .defaultIfEmpty(Mono.defer(() -> handleUnsupportedMessage(codecContext)))
            .flatMap(Function.identity())
            .onErrorResume(err -> {
                if (!(err instanceof DeviceOperationException)) {
                    log.error("handle send to device message error {}", codecContext.message, err);
                }
                if (codecContext.alreadyReply) {
                    return Mono.empty();
                }
                return doReply(codecContext, (DeviceMessage) createReply(codecContext.message).error(err));
            })
            .then(Mono.defer(() -> handleMessageSent(codecContext)));
    }

    /**
     * Runs after the send pipeline has finished. When nothing has been replied yet and the
     * request carries the {@code Headers.async} header set to true, a successful reply with
     * the {@code REQUEST_HANDLING} code and text is sent; otherwise nothing happens.
     *
     * @param codecContext the context of the message that was just sent
     * @return a Mono completing after the optional reply
     */
    private Mono<Void> handleMessageSent(CodecContext codecContext) {
        if (codecContext.alreadyReply) {
            return Mono.empty();
        }
        boolean asyncRequested = ((Boolean) codecContext.message.getHeader(Headers.async).orElse(false)).booleanValue();
        if (!asyncRequested) {
            return Mono.empty();
        }
        DeviceMessage handlingReply = (DeviceMessage) createReply(codecContext.message)
            .message(ErrorCode.REQUEST_HANDLING.getText())
            .code(ErrorCode.REQUEST_HANDLING.name())
            .success();
        return doReply(codecContext, handlingReply).then();
    }

    /**
     * Fallback used when the codec produced no encoded message for the request.
     *
     * <ul>
     *   <li>Nothing is done if a reply has already been produced.</li>
     *   <li>A disconnect request removes the device's session and is answered with success.</li>
     *   <li>A child-device message wrapping a disconnect request removes the child's session
     *       and is answered with success; one wrapping a state check is answered with
     *       success directly.</li>
     *   <li>Everything else is answered with {@code UNSUPPORTED_MESSAGE}.</li>
     * </ul>
     *
     * @param codecContext the context of the message that could not be encoded
     * @return a Mono completing once the fallback has been applied
     */
    private Mono<Void> handleUnsupportedMessage(CodecContext codecContext) {
        if (codecContext.alreadyReply) {
            return Mono.empty();
        }
        DeviceMessage request = codecContext.message;
        if (request instanceof DisconnectDeviceMessage) {
            // The reply is deferred: doReply marks the context as replied when it is called,
            // so building it before the removal ran would suppress the error reply that the
            // caller sends when the removal fails.
            return this.sessionManager
                .remove(codecContext.device.getDeviceId(), false)
                .then(Mono.defer(() -> doReply(codecContext, (DeviceMessage) createReply(request).success())));
        }
        if (request instanceof ChildDeviceMessage) {
            // The wrapped message may be of any kind, so it is held as a plain Message and
            // narrowed only after the instanceof checks.
            Message child = ((ChildDeviceMessage) request).getChildDeviceMessage();
            if (child instanceof DisconnectDeviceMessage) {
                String childDeviceId = ((DisconnectDeviceMessage) child).getDeviceId();
                return this.sessionManager
                    .remove(childDeviceId, false)
                    .then(Mono.defer(() -> doReply(codecContext, (DeviceMessage) createReply(request).success())));
            }
            if (child instanceof DeviceStateCheckMessage) {
                return doReply(codecContext, (DeviceMessage) createReply(request).success());
            }
        }
        return doReply(codecContext, (DeviceMessage) createReply(request).error(ErrorCode.UNSUPPORTED_MESSAGE));
    }

    /**
     * Handles a message whose device has no session in the session manager. If the device
     * has a parent gateway configured and that gateway is registered, the message is wrapped
     * and routed to the parent; otherwise it goes through {@code sendToNoSession}.
     *
     * <p>NOTE(review): when the registry does not know the device at all, the returned Mono
     * completes without sending any reply - confirm this is intended.
     *
     * @param deviceMessage the message to route
     * @return a Mono completing when routing has finished
     */
    private Mono<Void> sendToUnknownSession(DeviceMessage deviceMessage) {
        return this.registry
            .getDevice(deviceMessage.getDeviceId())
            .flatMap(device -> device
                .getSelfConfig(DeviceConfigKey.parentGatewayId)
                .flatMap(this.registry::getDevice)
                // First argument is the child device, second its parent gateway.
                .map(parent -> sendToParentSession(device, parent, deviceMessage))
                .defaultIfEmpty(Mono.defer(() -> sendToNoSession(device, deviceMessage))))
            .flatMap(Function.identity());
    }

    /**
     * Handles a device that has neither a session nor a reachable parent gateway. The
     * session manager is asked whether the device is alive: if so the message goes through
     * {@code retryResume}; if not, an error reply with header {@code reason=session_not_exists}
     * is sent, using {@code CONNECTION_LOST} when the resume header is set to true on the
     * request and {@code CLIENT_OFFLINE} otherwise.
     *
     * @param deviceOperator operator of the target device
     * @param deviceMessage  the message that could not be delivered
     * @return a Mono completing after the retry or the error reply
     */
    private Mono<Void> sendToNoSession(DeviceOperator deviceOperator, DeviceMessage deviceMessage) {
        log.warn("device session state failed,try resume. {}", deviceMessage);
        return this.sessionManager
            .checkAlive(deviceMessage.getDeviceId(), false)
            .flatMap(alive -> {
                if (!alive.booleanValue()) {
                    boolean resumed = ((Boolean) deviceMessage.getHeader(resumeSession).orElse(false)).booleanValue();
                    ErrorCode failure = resumed ? ErrorCode.CONNECTION_LOST : ErrorCode.CLIENT_OFFLINE;
                    return doReply(deviceOperator, (DeviceMessage) createReply(deviceMessage).addHeader("reason", "session_not_exists").error(failure));
                }
                return retryResume(deviceOperator, deviceMessage);
            });
    }

    /**
     * Tries once to forward a message to the server node recorded as the device's
     * connection server.
     *
     * <p>A message already carrying the resume header is answered with
     * {@code CONNECTION_LOST} right away, so a forwarded message cannot be forwarded again.
     * Otherwise the header is added and, when the handler is a {@code DeviceOperationBroker},
     * the message is sent to the configured connection server; a non-positive send result
     * is answered with {@code CONNECTION_LOST}. Handlers that are not brokers always answer
     * {@code CONNECTION_LOST}.
     *
     * <p>NOTE(review): if the device has no {@code connectionServerId} configured, the
     * returned Mono completes without any reply - confirm this is intended.
     *
     * @param deviceOperator operator of the target device
     * @param deviceMessage  the message to forward; the resume header is added to it
     * @return a Mono completing after the forward or the error reply
     */
    private Mono<Void> retryResume(DeviceOperator deviceOperator, DeviceMessage deviceMessage) {
        if (deviceMessage.getHeader(resumeSession).isPresent()) {
            return doReply(deviceOperator, (DeviceMessage) createReply(deviceMessage).error(ErrorCode.CONNECTION_LOST));
        }
        deviceMessage.addHeader(resumeSession, true);
        if (!(this.handler instanceof DeviceOperationBroker)) {
            return doReply(deviceOperator, (DeviceMessage) createReply(deviceMessage).error(ErrorCode.CONNECTION_LOST));
        }
        // The field is typed MessageHandler, so it is narrowed to the broker type that the
        // instanceof check above established before send(...) is called.
        DeviceOperationBroker broker = (DeviceOperationBroker) this.handler;
        return deviceOperator
            .getSelfConfig(DeviceConfigKey.connectionServerId)
            .flatMap(serverId -> broker.send(serverId, Mono.just(deviceMessage)))
            .flatMap(delivered -> {
                if (delivered.intValue() > 0) {
                    return Mono.empty();
                }
                return doReply(deviceOperator, (DeviceMessage) createReply(deviceMessage).error(ErrorCode.CONNECTION_LOST));
            });
    }

    /**
     * Wraps a message for a child device into a {@code ChildDeviceMessage} addressed to its
     * parent and dispatches the wrapper through {@code handleMessage}.
     *
     * @param deviceOperator  operator of the child device (becomes the child device id)
     * @param deviceOperator2 operator of the parent device (becomes the wrapper's device id)
     * @param deviceMessage   the original message; its message id and functional headers
     *                        are copied onto the wrapper
     * @return a Mono completing when the wrapper has been dispatched
     */
    private Mono<Void> sendToParentSession(DeviceOperator deviceOperator, DeviceOperator deviceOperator2, DeviceMessage deviceMessage) {
        ChildDeviceMessage wrapper = new ChildDeviceMessage();
        // Payload first, then addressing.
        wrapper.setChildDeviceMessage(deviceMessage);
        wrapper.setMessageId(deviceMessage.getMessageId());
        wrapper.setChildDeviceId(deviceOperator.getDeviceId());
        wrapper.setDeviceId(deviceOperator2.getDeviceId());
        Headers.copyFunctionalHeader(deviceMessage, wrapper);
        return handleMessage(wrapper);
    }

    /**
     * Hands a reply to the decoded client message handler and discards its result.
     *
     * @param deviceOperator operator of the device the reply belongs to; callers may pass null
     * @param deviceMessage  the reply to deliver
     * @return a Mono completing when the handler has processed the reply
     */
    Mono<Void> doReply(DeviceOperator deviceOperator, DeviceMessage deviceMessage) {
        Mono<?> handled = this.decodedClientMessageHandler.handleMessage(deviceOperator, deviceMessage);
        return handled.then();
    }

    /**
     * Sends a reply on behalf of a codec context, at most once per context.
     *
     * <p>Without a context the reply is sent with no device operator, using the reply's own
     * headers as trace context. With a context, nothing is sent when the request has the
     * {@code Headers.sendAndForget} header set to true or a reply was already produced;
     * otherwise the context is marked as replied and the reply is sent using the request's
     * headers as trace context.
     *
     * @param codecContext  the context of the request, or null
     * @param deviceMessage the reply to deliver
     * @return a Mono completing when the reply has been handled, or empty when suppressed
     */
    Mono<Void> doReply(CodecContext codecContext, DeviceMessage deviceMessage) {
        if (codecContext != null) {
            boolean fireAndForget = ((Boolean) codecContext.message.getHeader(Headers.sendAndForget).orElse(false)).booleanValue();
            if (fireAndForget) {
                return Mono.empty();
            }
            if (codecContext.alreadyReply) {
                return Mono.empty();
            }
            codecContext.alreadyReply = true;
            return doReply(codecContext.device, deviceMessage)
                .contextWrite(ctx -> TraceHolder.readToContext(ctx, codecContext.message.getHeaders()));
        }
        return doReply((DeviceOperator) null, deviceMessage)
            .contextWrite(ctx -> TraceHolder.readToContext(ctx, deviceMessage.getHeaders()));
    }
}
