跳到主要内容

MqttServer

提示

通过自定义脚本,可快速适配业务模型

概述

核心功能

MqttServer 是一个内置的 MQTT 服务器插件,支持 TCP 和 WebSocket 方式接入客户端设备,可以定时或基于数据变化发布数据,支持 RPC 远程控制功能。该插件为物联网设备提供了本地 MQTT 接入能力,适用于边缘计算场景。

主要特性

  • 多协议支持:同时支持 TCP 和 WebSocket 连接方式
  • 安全认证:支持客户端 ID 前缀验证、用户名密码认证和匿名登录
  • 灵活配置:支持详细的服务器参数配置,如端口、日志级别等
  • 数据发布:支持定时发布、变化发布和混合发布三种模式
  • 远程控制:支持通过 MQTT 消息实现 RPC 远程变量写入
  • 脚本扩展:通过自定义脚本适配各种业务模型和设备协议
  • 高性能:基于异步架构,支持高并发连接

技术架构

  • 服务器核心:基于 MQTTnet 库实现的 MQTT 服务器
  • 连接管理:支持 TCP 和 WebSocket 连接管理
  • 认证系统:实现客户端认证和授权
  • 消息路由:处理 MQTT 消息的发布和订阅
  • RPC 处理:实现远程过程调用功能
  • 数据发布:管理设备数据的定时和变化发布

适用场景

  • 边缘计算:在边缘设备上提供本地 MQTT 服务
  • 设备网关:作为设备接入网关,收集和转发设备数据
  • 本地监控:在局域网内实现设备数据的实时监控

插件属性配置项

服务器配置

属性说明默认值建议值
IP服务器绑定 IP 地址,为空时表示绑定所有网络接口生产环境建议绑定具体 IP
端口TCP 连接绑定端口1883标准 MQTT 端口,如需修改请确保防火墙开放
WebSocket 端口WebSocket 连接绑定端口8083固定使用 /mqtt 路由
详细日志日志输出级别:False 仅输出上传数量,True 输出详细内容False调试时设为 True,生产环境设为 False
允许连接的 ID(前缀)客户端 ID 必须以此前缀开头,为空时允许任意客户端 ID生产环境建议设置前缀,提高安全性
允许匿名登录是否允许匿名登录True生产环境建议设为 False,启用认证

安全配置

属性说明默认值建议值
允许匿名登录是否允许客户端匿名连接True生产环境建议设为 False
用户名/密码当禁止匿名登录时,客户端需使用的认证信息-Admin中设置的用户密码都能登录

RPC 配置

属性说明默认值建议值
允许 Rpc 写入是否允许通过 MQTT 写入变量False根据安全需求设置
Rpc 写入主题写入变量的主题前缀RpcWrite使用有意义的主题名称
Rpc 请求数据主题接收到消息后发布全部信息的主题-配置专用的请求主题

发布配置

属性说明默认值建议值
上传模式数据发布模式:间隔/变化/变化和间隔同时生效间隔根据业务需求选择
定时上传间隔定时发布的时间间隔(毫秒)100001000-60000 毫秒
严格入队模式启用后,每次定时上传时,保证一组数据在同一时间点可见-
分组上传是否按变量分组属性上传数据False建议启用,提高效率
选择全部变量是否选择全部变量False批量配置时启用
设备状态列表上传设备是否列表上传True性能允许时建议启用
变量列表上传变量是否列表上传True性能允许时建议启用
报警列表上传报警是否列表上传True性能允许时建议启用

主题配置

属性说明默认值建议值
设备 Topic设备实体的发布主题,支持 ${key} 变量替换-使用层次化结构,如 devices/${Name}/attributes
变量 Topic变量实体的发布主题,支持 ${key} 变量替换-使用层次化结构,如 devices/${DeviceName}/telemetry
报警 Topic报警实体的发布主题,支持 ${key} 变量替换-使用层次化结构,如 devices/${DeviceName}/alarms

脚本配置

属性说明默认值建议值
设备实体脚本设备数据处理脚本-实现 DynamicModelBase 接口
变量实体脚本变量数据处理脚本-实现 DynamicModelBase 接口
报警实体脚本报警数据处理脚本-实现 DynamicModelBase 接口

缓存配置

属性说明默认值建议值
启用缓存是否启用数据缓存False建议启用,确保数据可靠性
缓存文件最大长度(mb)缓存文件的最大大小10050-500 MB,根据存储空间调整
上传每页条数每批上传的最大数量10050-500,根据网络情况调整
内存队列最大数量内存队列的最大容量100001000-50000,根据设备性能调整

脚本与实体

脚本接口

MqttServer 使用与 MqttClient 相同的脚本接口,通过实现 DynamicModelBase 接口来自定义数据处理逻辑。详细格式说明请参考 MqttClient文档

变量业务属性

基本配置

属性说明默认值
启用 RPC单独配置变量是否允许远程写入true
Enable是否启用该变量true

配置建议

  • 启用 RPC:对于需要远程控制的变量,建议启用此选项;对于只读变量,建议禁用以提高安全性
  • Enable:根据业务需求启用或禁用变量,禁用不需要的变量可以提高系统性能

RPC 远程控制

概述

MqttServer 支持通过 MQTT 消息实现 RPC(远程过程调用)功能,允许客户端设备发送控制命令来写入变量值。该功能为物联网设备提供了远程控制能力,适用于需要远程操作的场景。

RPC 脚本

脚本类定义

using MQTTnet;
using MQTTnet.Server;

using Newtonsoft.Json.Linq;

using System.Text;

using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;

namespace ThingsGateway.Plugin.Mqtt;

public abstract class DynamicMqttServerRpcBase
{
/// <summary>
/// 触发 RPC 脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">消息拦截事件参数</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttServer">MQTT 服务器实例</param>
/// <param name="getRpcResult">获取 RPC 结果的委托</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public virtual async Task RPCInvokeAsync(
TouchSocket.Core.ILog logMessage,
InterceptingPublishEventArgs args,
MqttServerProperty driverPropertys,
MQTTnet.Server.MqttServer mqttServer,
Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult,
CancellationToken cancellationToken
)
{
// 默认实现为空,由子类重写
}
}

RPC 脚本示例

using MQTTnet;
using MQTTnet.Server;

using Newtonsoft.Json.Linq;

using System.Text;

using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;

using ThingsGateway.Plugin.Mqtt;

public class DynamicMqttServerRpc : DynamicMqttServerRpcBase
{
/// <summary>
/// 处理 RPC 调用
/// </summary>
public override async Task RPCInvokeAsync(
TouchSocket.Core.ILog logMessage,
InterceptingPublishEventArgs args,
MqttServerProperty driverPropertys,
MQTTnet.Server.MqttServer mqttServer,
Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult,
CancellationToken cancellationToken
)
{
// 检查 RPC 写入主题配置
if (driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;

// 检查消息主题是否匹配
if (args.ApplicationMessage.Topic != driverPropertys.RpcWriteTopic) return;

try
{
// 解析 RPC 请求数据
var rpcDatas = Encoding.UTF8.GetString(args.ApplicationMessage.Payload)
.FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();

if (rpcDatas == null) return;

// 执行 RPC 操作
var mqttRpcResult = await getRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);

// 构建并发送响应消息
var responseMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(driverPropertys.JsonFormattingIndented))
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.Build();

await mqttServer.InjectApplicationMessage(
new InjectedMqttApplicationMessage(responseMessage),
cancellationToken
).ConfigureAwait(false);
}
catch (Exception ex)
{
// 记录错误日志
logMessage.Error($"RPC 处理错误: {ex.Message}");
}
}
}

RPC 消息格式

请求格式

RPC 请求消息使用 JSON 格式,结构如下:

{
"device1": {
"variable1": "value1",
"variable2": "value2"
},
"device2": {
"variable3": "value3"
}
}
  • 外层键:设备名称
  • 内层键:变量名称
  • 内层值:要写入的变量值

响应格式

RPC 响应消息包含操作结果:

{
"device1": {
"variable1": {
"OperCode": 0,
"IsSuccess": true,
"ErrorMessage": null,
"ErrorType": null
},
"variable2": {
"OperCode": 0,
"IsSuccess": true,
"ErrorMessage": null,
"ErrorType": null
}
},
"device2": {
"variable3": {
"OperCode": 0,
"IsSuccess": true,
"ErrorMessage": null,
"ErrorType": null
}
}
}

RPC 主题配置

请求主题

请求主题在配置属性中设置Rpc写入Topic,配置后实际格式为:

{RpcWriteTopic}/{requestId}

其中:

  • RpcWriteTopic:在插件配置中设置的 RPC 写入主题前缀
  • requestId:请求的唯一标识,通常为 GUID

响应主题

响应主题自动生成,格式为:

{RpcWriteTopic}/{requestId}/Response

RPC 安全配置

配置项说明默认值建议值
允许 Rpc 写入全局启用/禁用 RPC 写入False生产环境根据安全需求设置
Rpc 写入主题RPC 请求主题前缀RpcWrite使用有意义的主题名称
启用 RPC变量级别的 RPC 启用控制true根据变量重要性设置

与 MqttClient 的兼容性

MqttServer 的 RPC 格式与 MqttClient 相同。详细格式说明请参考 MqttClient RPC 文档

常见问题与解决方案

连接问题

问题:客户端无法连接到 MQTT 服务器

可能原因

  • 网络防火墙阻止了 MQTT 端口
  • 服务器 IP 或端口配置错误
  • 客户端 ID 格式不正确
  • 用户名密码认证失败

解决方案

  • 检查防火墙设置,确保 MQTT 端口(默认 1883)和 WebSocket 端口(默认 8083)已开放
  • 验证服务器 IP 和端口配置是否正确
  • 确保客户端 ID 唯一且符合要求
  • 检查用户名密码是否正确配置

问题:连接不稳定,频繁断开

可能原因

  • 网络质量差
  • KeepAlive 时间设置不合理
  • 服务器负载过高
  • 客户端资源限制

解决方案

  • 优化网络环境,确保网络稳定
  • 调整 KeepAlive 时间,建议设置为 30-60 秒
  • 增加服务器资源或优化服务器配置
  • 检查客户端设备资源使用情况

性能问题

问题:服务器响应缓慢

可能原因

  • 并发连接数超过服务器处理能力
  • 内存队列已满
  • 磁盘 I/O 瓶颈
  • 脚本执行效率低

解决方案

  • 增加服务器硬件资源
  • 优化内存队列大小设置
  • 使用高性能存储设备
  • 优化脚本执行逻辑,减少复杂计算

问题:数据传输延迟高

可能原因

  • 网络延迟
  • 批处理设置不合理
  • 服务器负载高

解决方案

  • 优化网络环境
  • 调整上传每页条数,平衡实时性和效率
  • 增加服务器资源或负载均衡