MqttServer
提示通过自定义脚本,可快速适配业务模型
概述
核心功能
MqttServer 是一个内置的 MQTT 服务器插件,支持 TCP 和 WebSocket 方式接入客户端设备,可以定时或基于数据变化发布数据,支持 RPC 远程控制功能。该插件为物联网设备提供了本地 MQTT 接入能力,适用于边缘计算场景。
主要特性
- 多协议支持:同时支持 TCP 和 WebSocket 连接方式
- 安全认证:支持客户端 ID 前缀验证、用户名密码认证和匿名登录
- 灵活配置:支持详细的服务器参数配置,如端口、日志级别等
- 数据发布:支持定时发布、变化发布和混合发布三种模式
- 远程控制:支持通过 MQTT 消息实现 RPC 远程变量写入
- 脚本扩展:通过自定义脚本适配各种业务模型和设备协议
- 高性能:基于异步架构,支持高并发连接
技 术架构
- 服务器核心:基于 MQTTnet 库实现的 MQTT 服务器
- 连接管理:支持 TCP 和 WebSocket 连接管理
- 认证系统:实现客户端认证和授权
- 消息路由:处理 MQTT 消息的发布和订阅
- RPC 处理:实现远程过程调用功能
- 数据发布:管理设备数据的定时和变化发布
适用场景
- 边缘计算:在边缘设备上提供本地 MQTT 服务
- 设备网关:作为设备接入网关,收集和转发设备数据
- 本地监控:在局域网内实现设备数据的实时监控
插件属性配置项
服务器配置
| 属性 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| IP | 服务器绑定 IP 地址,为空时表示绑定所有网络接口 | 空 | 生产环境建议绑定具体 IP |
| 端口 | TCP 连接绑定端口 | 1883 | 标准 MQTT 端口,如需修改请确保防火墙开放 |
| WebSocket 端口 | WebSocket 连接绑定端口 | 8083 | 固定使用 /mqtt 路由 |
| 详细日志 | 日志输出级别:False 仅输出上传数量,True 输出详细内容 | False | 调试时设为 True,生产环境设为 False |
| 允许连接的 ID(前缀) | 客户端 ID 必须以此前缀开头,为空时允许任意客户端 ID | 空 | 生产环境建议设置前缀,提高安全性 |
| 允许匿名登录 | 是否允许匿名登录 | True | 生产环境建议设为 False,启用认证 |
安全配置
| 属性 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 允许匿名登录 | 是否允许客户端匿名连接 | True | 生产环境建议设为 False |
| 用户名/密码 | 当禁止匿名登录时,客户端需使用的认证信息 | - | Admin中设置的用户密码都能登录 |
RPC 配置
| 属性 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 允许 Rpc 写入 | 是否允许通过 MQTT 写入变量 | False | 根据安全需求设置 |
| Rpc 写入主题 | 写入变量的主题前缀 | RpcWrite | 使用有意义的主题名称 |
| Rpc 请求数据主题 | 接收到消息后发布全部信息的主题 | - | 配置专用的请求主题 |
发布配置
| 属性 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 上传模式 | 数据发布模式:间隔/变化/变化和间隔同时生效 | 间隔 | 根据业务需求选择 |
| 定时上传间隔 | 定时发布的时间间隔(毫秒) | 10000 | 1000-60000 毫秒 |
| 严格入队模式 | 启用后,每次定时上传时,保证一组数据在同一时间点可见 | - | |
| 分组上传 | 是否按变量分组属性上传数据 | False | 建议启用,提高效率 |
| 选择全部变量 | 是否选择全部变量 | False | 批量配置时启用 |
| 设备状态列表上传 | 设备是否列表上传 | True | 性能允许时建议启用 |
| 变量列表上传 | 变量是否列表上传 | True | 性能允许时建议启用 |
| 报警列表上传 | 报警是否列表上传 | True | 性能允许时建议启用 |
主题配置
| 属性 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 设备 Topic | 设备实体的发布主题,支持 ${key} 变量替换 | - | 使用层次化结构,如 devices/${Name}/attributes |
| 变量 Topic | 变量实体的发布主题,支持 ${key} 变量替换 | - | 使用层次化结构,如 devices/${DeviceName}/telemetry |
| 报警 Topic | 报警实体的发布主题,支持 ${key} 变量替换 | - | 使用层次化结构,如 devices/${DeviceName}/alarms |
脚本配置
| 属性 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 设备实体脚本 | 设备数据处理脚本 | - | 实现 DynamicModelBase 接口 |
| 变量实体脚本 | 变量数据处理脚本 | - | 实现 DynamicModelBase 接口 |
| 报警实体脚本 | 报警数据处理脚本 | - | 实现 DynamicModelBase 接口 |
缓存配置
| 属性 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 启用缓存 | 是否启用数据缓存 | False | 建议启用,确保数据可靠性 |
| 缓存文件最大长度(mb) | 缓存文件的最大大小 | 100 | 50-500 MB,根据存储空间调整 |
| 上传每页条数 | 每批上传的最大数量 | 100 | 50-500,根据网络情况调整 |
| 内存队列最大数量 | 内存队列的最大容量 | 10000 | 1000-50000,根据设备性能调整 |
脚本与实体
脚本接口
MqttServer 使用与 MqttClient 相同的脚本接口,通过实现 DynamicModelBase 接口来自定义数据处理逻辑。详细格式说明请参考 MqttClient文档。
变量业务属性
基本配置
| 属性 | 说明 | 默认值 |
|---|---|---|
| 启用 RPC | 单独配置变量是否允许远程写入 | true |
| Enable | 是否启用该变量 | true |
配置建议
- 启用 RPC:对于需要远程控制的变量,建议启用此选项;对于只读变量,建议禁用以提高安全性
- Enable:根据业务需求启用或禁用变量,禁用不需要的变量可以提高系统性能
RPC 远程控制
概述
MqttServer 支持通过 MQTT 消息实现 RPC(远程过程调用)功能,允许客户端设备发送控制命令来写入变量值。该功能为物联网设备提供了远程控制能力,适用于需要远程操作的场景。
RPC 脚本
脚本类定义
using MQTTnet;
using MQTTnet.Server;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
namespace ThingsGateway.Plugin.Mqtt;
public abstract class DynamicMqttServerRpcBase
{
/// <summary>
/// 触发 RPC 脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">消息拦截事件参数</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttServer">MQTT 服务器实例</param>
/// <param name="getRpcResult">获取 RPC 结果的委托</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public virtual async Task RPCInvokeAsync(
TouchSocket.Core.ILog logMessage,
InterceptingPublishEventArgs args,
MqttServerProperty driverPropertys,
MQTTnet.Server.MqttServer mqttServer,
Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult,
CancellationToken cancellationToken
)
{
// 默认实现为空,由子类重写
}
}
RPC 脚本示例
using MQTTnet;
using MQTTnet.Server;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
using ThingsGateway.Plugin.Mqtt;
public class DynamicMqttServerRpc : DynamicMqttServerRpcBase
{
/// <summary>
/// 处理 RPC 调用
/// </summary>
public override async Task RPCInvokeAsync(
TouchSocket.Core.ILog logMessage,
InterceptingPublishEventArgs args,
MqttServerProperty driverPropertys,
MQTTnet.Server.MqttServer mqttServer,
Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult,
CancellationToken cancellationToken
)
{
// 检查 RPC 写入主题配置
if (driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;
// 检查消息主题是否匹配
if (args.ApplicationMessage.Topic != driverPropertys.RpcWriteTopic) return;
try
{
// 解析 RPC 请求数据
var rpcDatas = Encoding.UTF8.GetString(args.ApplicationMessage.Payload)
.FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null) return;
// 执行 RPC 操作
var mqttRpcResult = await getRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);
// 构建并发送响应消息
var responseMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(driverPropertys.JsonFormattingIndented))
.WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
await mqttServer.InjectApplicationMessage(
new InjectedMqttApplicationMessage(responseMessage),
cancellationToken
).ConfigureAwait(false);
}
catch (Exception ex)
{
// 记录错误日志
logMessage.Error($"RPC 处理错误: {ex.Message}");
}
}
}
RPC 消息格式
请求格式
RPC 请求消息使用 JSON 格式,结构如下:
{
"device1": {
"variable1": "value1",
"variable2": "value2"
},
"device2": {
"variable3": "value3"
}
}
- 外层键:设备名称
- 内层键:变量名称
- 内层值:要写入的变量值
响应格式
RPC 响应消息包含操作结果:
{
"device1": {
"variable1": {
"OperCode": 0,
"IsSuccess": true,
"ErrorMessage": null,
"ErrorType": null
},
"variable2": {
"OperCode": 0,
"IsSuccess": true,
"ErrorMessage": null,
"ErrorType": null
}
},
"device2": {
"variable3": {
"OperCode": 0,
"IsSuccess": true,
"ErrorMessage": null,
"ErrorType": null
}
}
}
RPC 主题配置
请求主题
请求主题在配置属性中设置Rpc写入Topic,配置后实际格式为:
{RpcWriteTopic}/{requestId}
其中:
RpcWriteTopic:在插件配置中设置的 RPC 写入主题前缀requestId:请求的唯一标识,通常为 GUID
响应主题
响应主题自动生成,格式为:
{RpcWriteTopic}/{requestId}/Response
RPC 安全配置
| 配置项 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 允许 Rpc 写入 | 全局启用/禁用 RPC 写入 | False | 生产环境根据安全需求设置 |
| Rpc 写入主题 | RPC 请求主题前缀 | RpcWrite | 使用有意义的主题名称 |
| 启用 RPC | 变量级别的 RPC 启用控制 | true | 根据变量重要性设置 |
与 MqttClient 的兼容性
MqttServer 的 RPC 格式与 MqttClient 相同。详细格式说明请参考 MqttClient RPC 文档
常见问题与解决方案
连接问题
问题:客户端无法连接到 MQTT 服务器
可能原因:
- 网络防火墙阻止了 MQTT 端口
- 服务器 IP 或端口配置错误
- 客户端 ID 格式不正确
- 用户名密码认证失败
解决方案:
- 检查防火墙设置,确保 MQTT 端口(默认 1883)和 WebSocket 端口(默认 8083)已开放
- 验证服务器 IP 和端口配置是否正确
- 确保客户端 ID 唯一且符合要求
- 检查用户名密码是否正确配置
问题:连接不稳定,频繁断开
可能原因:
- 网络质量差
- KeepAlive 时间设置不合理
- 服务器负载过高
- 客户端资源限制
解决方案:
- 优化网络环境,确保网络稳定
- 调整 KeepAlive 时间,建议设置为 30-60 秒
- 增加服务器资源或优化服务器配置
- 检查客户端设备资源使用情况
性能问题
问题:服务器响应缓慢
可能原因:
- 并发连接数超过服务器处理能力
- 内存队列已满
- 磁盘 I/O 瓶颈
- 脚本执行效率低
解决方案:
- 增加服务器硬件资源
- 优化内存队列大小设置
- 使用高性能存储设备
- 优化脚本执行逻辑,减少复杂计算
问题:数据传输延迟高
可能原因:
- 网络延迟
- 批处理设置不合理
- 服务器负载高
解决方案:
- 优化网络环境
- 调整上传每页条数,平衡实时性和效率
- 增加服务器资源或负载均衡