跳到主要内容

Mqtt采集

概述

MqttCollect 驱动程序可用于Mqtt服务器中 获取通过 MQTT 协议发送的数据。变量地址中指定主题,当接收到这些主题的更新时,会对负载进行分析,并更新变量值。

主要特性

  • 协议支持:支持 MQTT v3.1.1 和 MQTT v5.0 协议
  • 通信方式:支持 TCP 和 WebSocket 连接
  • 主题订阅:支持通过变量地址订阅多个主题
  • 负载解析:支持 JSON 格式负载解析
  • 条件过滤:支持基于条件的消息过滤
  • 高性能:基于异步架构,提供高性能的数据访问

适用场景

  • IoT 设备数据采集
  • 传感器数据采集
  • 实时数据监控
  • 消息队列集成
  • 边缘计算数据采集

插件属性配置项

基本配置

属性说明备注
IPServer IP,为空时指任意 IP-
端口连接端口1883
是否 WebSocket 连接是否 WebSocket 连接False
WebSocket UrlWebSocket Urlws://127.0.0.1:8083/mqtt
账号账号-
密码密码-
连接 Id连接 Id-
连接超时时间连接超时时间(毫秒)-

变量地址配置项

基本格式

变量地址格式:

${mqtt_topic};${payload_item};${Condition}
  • mqtt_topic:MQTT 主题
  • payload_item:负载中的字段路径
  • Condition:条件表达式(可选)

主题配置

主题示例:

vendor/device

负载解析

基本负载示例

{
"ModuleUnoccupied": {
"EquipId":"E12",
"CarrierId": "C12",
"SubstrateLocId": "S12",
"LotId": 1,
"DesignId": "D12",
"EventTime": "12322131"
}
}

基本解析示例

vendor/device;ModuleUnoccupied.EquipId

结果:"E12"

条件过滤示例

vendor/device;ModuleUnoccupied.EquipId;((JToken)raw).SelectToken("ModuleUnoccupied.LotId").ToString().ToInt()==1

结果:当 LotId 等于 1 时,值为 "E12"

复杂负载解析

数组负载示例

{
"sensors": [
{"id": 1, "value": 100},
{"id": 2, "value": 200}
]
}

数组解析示例

vendor/device;sensors[0].value

结果:100

嵌套对象示例

{
"device": {
"status": {
"temperature": 25.5,
"humidity": 60
}
}
}

嵌套解析示例

vendor/device;device.status.temperature

结果:25.5

常见问题

Q1: 连接 MQTT 服务器失败怎么办?

A: 检查以下几点:

  1. 确认 MQTT 服务器 IP 和端口配置正确
  2. 检查网络连接是否正常
  3. 确认账号密码配置正确
  4. 检查 MQTT 服务器是否运行
  5. 确认防火墙设置

Q2: 无法解析消息负载怎么办?

A: 检查以下几点:

  1. 确认负载格式为有效的 JSON
  2. 确认变量地址中的字段路径正确
  3. 检查 JSON 字段名称是否正确
  4. 检查数据类型是否匹配

Q3: 条件表达式不生效怎么办?

A: 检查以下几点:

  1. 确认条件表达式语法正确
  2. 确认 JSON 路径正确
  3. 检查数据类型转换是否正确
  4. 测试条件表达式是否返回预期结果

Q4: 支持哪些 MQTT 版本?

A: 支持 MQTT v3.1.1 和 MQTT v5.0 协议。

Q5: 如何提高消息处理性能?

A: 采用以下方法:

  1. 使用 QoS 0 减少网络流量
  2. 合理设置消息批量处理
  3. 优化 JSON 结构,减少嵌套深度
  4. 使用适当的数据类型

Q6: 如何处理断线重连?

A: MqttCollect 会自动处理断线重连,无需额外配置。

Q7: 如何订阅多个主题?

A: 通过创建多个变量,每个变量使用不同的主题:

// 变量 1
vendor/device1;temperature

// 变量 2
vendor/device2;temperature

Q8: 如何处理复杂的 JSON 结构?

A: 使用点号和方括号访问嵌套字段和数组元素:

// 访问嵌套字段
device.status.temperature

// 访问数组元素
sensors[0].value