Mqtt采集
概述
MqttCollect 驱动程序可用于Mqtt服务器中 获取通过 MQTT 协议发送的数据。变量地址中指定主题,当接收到这些主题的更新时,会对负载进行分析,并更新变量值。
主要特性
- 协议支持:支持 MQTT v3.1.1 和 MQTT v5.0 协议
- 通信方式:支持 TCP 和 WebSocket 连接
- 主题订阅:支持通过变量地址订阅多个主题
- 负载解析:支持 JSON 格式负载解析
- 条件过滤:支持基于条件的消息过滤
- 高性能:基于异步架构,提供高性能的数据访问
适用场景
- IoT 设备数据采集
- 传感器数据采集
- 实时数据监控
- 消息队列集成
- 边缘计算数据采集
插件属性配置项
基本配置
| 属性 | 说明 | 备注 |
|---|---|---|
| IP | Server IP,为空时指任意 IP | - |
| 端口 | 连接端口 | 1883 |
| 是否 WebSocket 连接 | 是否 WebSocket 连接 | False |
| WebSocket Url | WebSocket Url | ws://127.0.0.1:8083/mqtt |
| 账号 | 账号 | - |
| 密码 | 密码 | - |
| 连接 Id | 连接 Id | - |
| 连接超时时间 | 连接超时时间(毫秒) | - |
变量地址配置项
基本格式
变量地址格式:
${mqtt_topic};${payload_item};${Condition}
- mqtt_topic:MQTT 主题
- payload_item:负载中的字段路径
- Condition:条件表达式(可选)
主题配置
主题示例:
vendor/device
负载解析
基本负载示例
{
"ModuleUnoccupied": {
"EquipId":"E12",
"CarrierId": "C12",
"SubstrateLocId": "S12",
"LotId": 1,
"DesignId": "D12",
"EventTime": "12322131"
}
}
基本解析示例
vendor/device;ModuleUnoccupied.EquipId
结果:"E12"
条件过滤示例
vendor/device;ModuleUnoccupied.EquipId;((JToken)raw).SelectToken("ModuleUnoccupied.LotId").ToString().ToInt()==1
结果:当 LotId 等于 1 时,值为 "E12"
复杂负载解析
数组负载示例
{
"sensors": [
{"id": 1, "value": 100},
{"id": 2, "value": 200}
]
}
数组 解析示例
vendor/device;sensors[0].value
结果:100
嵌套对象示例
{
"device": {
"status": {
"temperature": 25.5,
"humidity": 60
}
}
}
嵌套解析示例
vendor/device;device.status.temperature
结果:25.5
常见问题
Q1: 连接 MQTT 服务器失败怎么办?
A: 检查以下几点:
- 确认 MQTT 服务器 IP 和端口配置正确
- 检查网络连接是否正常
- 确认账号密码配置正确
- 检查 MQTT 服务器是否运行
- 确认防火墙设置
Q2: 无法解析消息负载怎么办?
A: 检查以下几点:
- 确认负载格式为有效的 JSON
- 确认变量地址中的字段路径正确
- 检查 JSON 字段名称是否正确
- 检查数据类型是否匹配
Q3: 条件表达 式不生效怎么办?
A: 检查以下几点:
- 确认条件表达式语法正确
- 确认 JSON 路径正确
- 检查数据类型转换是否正确
- 测试条件表达式是否返回预期结果
Q4: 支持哪些 MQTT 版本?
A: 支持 MQTT v3.1.1 和 MQTT v5.0 协议。
Q5: 如何提高消息处理性能?
A: 采用以下方法:
- 使用 QoS 0 减少网络流量
- 合理设置消息批量处理
- 优化 JSON 结构,减少嵌套深度
- 使用适当的数据类型
Q6: 如何处理断线重连?
A: MqttCollect 会自动处理断线重连,无需额外配置。