MqttClient
提示通过自定义脚本,可快速适配业务模型,比如各大云平台的Iot物模型
概述
MqttClient 是一个基于 MQTT 协议的客户端插件,通过 TCP 或 WebSocket 方式与 MQTT Broker 建立连接,实现数据的定时发布或变化触发发布。该插件采用异步架构设计,支持多种安全配置和发布模式,适用于各种物联网场景的数据传输需求。
核心功能
- 协议支持:完整支持 MQTT v3.1.1 和 MQTT v5.0 协议规范
- 连接方式:支持标准 TCP 连接和 WebSocket 连接,满足不同网络环境需求
- 安全机制:支持 TLS/SSL 加密传输,支持 CA 证书验证和客户端证书认证
- 发布策略:支持三种发布模式:定时发布、变化触发发布、变化和定时结合发布
- 数据组织:支持按变量分组属性上传数据,提高数据传输效率
- 远程控制:内置 RPC 支持,可通过 MQTT 消息实现远程变量写入和设备控制
- 平台适配:通过自定义脚本支持各种云平台物模型(ThingsBoard、阿里云、华为云等)
- 可靠性:内置断线重连机制,支持消息缓存和离线恢复
- 可扩展性:提供丰富的脚本接口,支持自定义数据处理逻辑
技术架构
- 异步通信:基于 .NET 异步编程模型,提供高性能的数据传输
- 消息队列:内置内存队列和文件缓存,确保数据可靠性
- 模块化设计:插件化架构,易于集成和扩展
- 配置灵活:支持详细的连接参数和发布参数配置
适用场景
- 物联网设备数据上报:传感器数据、设备状态等信息的实时上传
- 工业自动化数据采集:工业设备运行数据、生产指标的采集和传输
- 云平台集成:与 ThingsBoard、阿里云 IoT、华为云 IoT 等平台的无缝集成
- 实时监控系统:需要实时数据更新的监控场景
- 边缘计算:边缘设备数据处理和云端同步
- 传感器网络:分布式传感器网络的数据汇总和传输
- 远程控制:通过 MQTT 实现设备的远程参数配置和控制
插件属性配置项
基本配置
| 属性 | 说明 | 备注 |
|---|---|---|
| IP | Server IP | - |
| 端口 | 连接端口 | 1883 |
| 详细日志 | False=>日志输出上传数量,True=>日志输出上传内容 | False |
| TLS | 启用 SSL/TLS | False |
| CA 文件 | CA File | - |
| 客户端证书 | Client Certificate File | - |
| 客户端 key 文件 | Client Key File | - |
| Websocket | 是否 WebSocket 连接 | False |
| WebSocketUrl | WebSocket Url | ws://127.0.0.1:8083/mqtt |
| 用户名 | 账号 | - |
| 密码 | 密码 | - |
| 连接 Id | 连接 Id | - |
| 连接超时时间 | 连接超时时间(毫秒) | - |
| 允许 Rpc 写入 | 是否允许写入变量 | - |
| Rpc 写入主题 | 写入变量的主题 | 如果检测适配固定的 topic 标识,会按默认规则返回,比如thingsboard平台为v1/gateway/rpc 。默认规则为:固定通配 RpcWrite/+ ,其中 RpcWrite 为该属性填入内容,+ 通配符是请求 GUID 值;返回结果主题会在主题后添加 Response , 也就是RpcWrite/+/Response |
| Rpc 请求数据主题 | 该主题接受到任何消息都会发布全部信息到对应的变量/设备/报警主题中 | - |
| 分组上传 | 启用后,无论是定时还是变化模式,始终会上传变量分组属性为 key 分组的全部变量 。在变化模式时,每次变量变化都会触发一次组上传 | False |
| 选择全部变量 | 选择全部变量 | False |
| 设备状态列表上传 | 设备是否列表上传,false 时每个设备实体都会单独发布,注意性能需求,默认为 true | - |
| 变量列表上传 | 变量是否列表上传,false 时每个变量实体都会单独发布,注意性能需求,默认为 true | - |
| 报警列表上传 | 报警是否列表上传,false 时每个报警实体都会单独发布,注意性能需求,默认为 true | - |
| 设备 Topic | 设备实体的发布主题 ,使用${key}作为匹配项,key 必须是上传实体中的属性 | - |
| 变量 Topic | 变量实体的发布主题 ,使用${key}作为匹配项,key 必须是上传实体中的属性 | - |
| 报警 Topic | 报警实体的发布主题 ,使用${key}作为匹配项,key 必须是上传实体中的属性 | - |
| 设备实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为DeviceData,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
| 变量实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为VariableBasicData,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
| 报警实 体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为AlarmVariable,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
| 选择全部变量 | 是否选择全部变量,true 时不需要单个变量添加业务属性 | - |
| 上传模式 | 间隔/变化/变化和间隔同时生效 | - |
| 定时上传间隔 | 间隔执行时间 | - |
| 严格入队模式 | 启用后,每次定时上传时,保证一组数据在同一时间点可见 | - |
| 启用缓存 | 是否启用缓存 | - |
| 缓存文件最大长度(mb) | 缓存文件最大长度 | - |
| 上传每页条数 | 每一次上传的列表最大数量 | - |
| 内存队列最大数量 | 内存队列的最大数量,超出或失败时转入文件缓存,根据数据量设定适当值 | - |
变量业务属性
基本配置
| 属性 | 说明 | 备注 |
|---|---|---|
| 启用 RPC | 单独配置变量是否允许写入 | true |
| Enable | 是否启用 | true |
RPC 远程控制
概述
MqttClient 支持通过 MQTT 协议实现远程过程调用(RPC),允许从 MQTT Broker 向设备发送控制命令,实现变量写入和设备操作。该功能支持标准 MQTT RPC 模式和各大云平台的 RPC 协议格式。
提示如果检测到适配固定的 topic 标识,会按默认规则返回,比如thingsboard平台的 rpc 主题为v1/gateway/rpc
下面说明为 ThingsGateway 默认规则
1. RPC 请求格式
MqttRpc 的请求内容与 WebApi 一致,请求参数为Dictionary<string,Dictionary<string, string>>,示例:
{
"modbusDevice650922399363167":
{
"modbus41":"1",
"modbus42":"2"
}
}
- 外层键:设备名称
- 内层键:变量名称
- 内层值:要写入的变量值
2. RPC 主题 配置
请求主题
请求主题在配置属性中设置Rpc写入Topic,配置后实际格式为:
{Rpc写入Topic}/{请求ID}
其中:
Rpc写入Topic:在插件配置中设置的基础主题请求ID:自定义 GUID 或雪花 ID,用于标识请求
响应主题
响应主题自动生成,格式为:
{Rpc写入Topic}/{请求ID}/Response
3. RPC 响应格式
响应内容包含操作结果信息:
public int OperCode { get; set; } // 操作代码,0 表示成功
public bool IsSuccess => OperCode == null || OperCode == 0; // 是否成功
public string? ErrorMessage { get; set; } // 错误信息
4. RPC 安全配置
| 参数 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 允许 Rpc 写入 | 全局启用/禁用 RPC 写入 | true | 生产环境建议启用 |
| 启用 RPC | 变量级别的 RPC 启用控制 | true | 根据变量重要性设置 |
| Rpc 写入主题 | RPC 请求主题前缀 | RpcWrite | 使用有意义的主题名称 |
5. Rpc 脚本
脚本类定义
using MQTTnet;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
namespace ThingsGateway.Plugin.Mqtt;
public abstract class DynamicMqttClientRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttClient">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典),返回rpc结果</param>
/// <param name="tryMqttClientAsync">尝试连接</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public virtual async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, MqttApplicationMessageReceivedEventArgs args, MqttClientProperty driverPropertys, MQTTnet.IMqttClient mqttClient, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, Func<CancellationToken, ValueTask<OperResult>> tryMqttClientAsync, CancellationToken cancellationToken)
{
}
}
RPC 脚本 demo
using MQTTnet;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
using ThingsGateway.Plugin.Mqtt;
public class DynamicMqttClientRpc:DynamicMqttClientRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttClient">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典),返回rpc结果</param>
/// <param name="tryMqttClientAsync">尝试连接</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public override async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, MqttApplicationMessageReceivedEventArgs args, MqttClientProperty driverPropertys, MQTTnet.IMqttClient mqttClient, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, Func<CancellationToken, ValueTask<OperResult>> tryMqttClientAsync, CancellationToken cancellationToken)
{
if (driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;
if(args.ApplicationMessage.Topic != driverPropertys.RpcWriteTopic) return;
var rpcDatas = Encoding.UTF8.GetString(args.ApplicationMessage.Payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var mqttRpcResult = await getRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);
try
{
var isConnect = await tryMqttClientAsync(CancellationToken.None).ConfigureAwait(false);
if (isConnect.IsSuccess)
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(driverPropertys.JsonFormattingIndented)).Build();
await mqttClient.PublishAsync(variableMessage, cancellationToken).ConfigureAwait(false);
}
}
catch
{
}
}
}