RabbitMQProducer
提示通过自定义脚本,可快速适配业务模型
概述
RabbitMQProducer 是 ThingsGateway 中用于适配 RabbitMQ 消息队列的插件,可以定时或基于变化发布设备、变量和报警数据到 RabbitMQ 服务端。RabbitMQ 是一款功能强大的消息队列系统,支持多种消息协议,具有高可靠性、灵活的路由和扩展性,适合各种消息传递场景。
核心功能
- 多实体发布:支持发布设备、变量和报警三种实体类型的数据
- 灵活的上传模式:支持定时上传、变化上传或两者同时生效
- 分组上传:支持按变量分组属性进行批量上传
- 列表上传控制:可控制设备、变量和报警是否以列表形式上传
- 动态Topic配置:支持使用变量属性动态生成Topic名称
- 自定义脚本:支持通过脚本自定义数据处理和转换逻辑
- 数据缓存:支持内存队列和文件缓存,确保数据可靠性
- 交换机支持:支持通过交换机进行消息路由 ,实现更灵活的消息分发
应用场景
- 物联网数据采集:将设备数据实时发布到 RabbitMQ,供下游系统处理
- 实时监控系统:将设备状态和报警信息发布到 RabbitMQ,实现实时监控
- 数据集成:作为数据集成的中间件,将设备数据转发到其他系统
- 任务调度:基于设备数据触发下游系统的任务执行
- 事件驱动架构:构建基于事件的系统架构,实现系统解耦
插件属性配置
配置界面
详细配置项
| 配置项 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| IP | ServerIP,为空时指任意IP | - | 如 "127.0.0.1" |
| 端口 | 连接端口 | 5672 | RabbitMQ 默认端口 5672,SSL 连接使用 5671 |
| 账号 | 认证用户名 | guest | 生产环境建议创建专 用账号 |
| 密码 | 认证密码 | guest | 生产环境建议使用强密码 |
| 详细日志 | False=>日志输出上传数量,True=>日志输出上传内容 | False | 调试时可启用,生产环境建议关闭 |
| 是否声明队列 | 初始化时是否自动创建队列 | True | 建议启用,确保队列存在 |
| 虚拟Host | RabbitMQ 虚拟主机 | / | 可根据业务隔离需求设置不同的虚拟主机 |
| 交换机名称 | 交换机名称 | - | 如 "amq.topic" 或自定义交换机名称 |
| 分组上传 | 启用后,无论是定时还是变化模式,始终会上传变量分组属性为key分组的全部变量 | False | 批量数据场景建议启用 |
| 选择全部变量 | 是否选择全部变量,true时不需要单个变量添加业务属性 | False | 变量较多时建议启用 |
| 设备状态列表上传 | 设备是否列表上传,false时每个设备实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| 变量列表上传 | 变量是否列表上传,false时每个变量实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| 报警列表上传 | 报警是否列表上传,false时每个报警实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| 设备Topic | 设备实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性 | - | 如 "devices.${DeviceName}" |
| 变量Topic | 变量实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性 | - | 如 "variables.${DeviceName}.${Name}" |
| 报警Topic | 报警实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性 | - | 如 "alarms.${DeviceName}" |
| 设备实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为DeviceBasicData | - | 编辑页面中,可通过检查按钮验证脚本 |
| 变量实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为VariableBasicData | - | 编辑页面中,可通过检查按钮验证脚本 |
| 报警实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为AlarmVariable | - | 编辑页面中,可通过检查按钮验证脚本 |
| 上传模式 | 数据上传模式:间隔/变化/变化和间隔同时生效 | 间隔 | 根据数据采集需求选择 |
| 定时上传间隔 | 间隔执行时间(秒) | 10 | 根据数据更新频率设置 |
| 严格入队模式 | 启用后,每次定时上传时,保证一组数据在同一时间点可见 | - | |
| 启用缓存 | 是否启用缓存 | False | 网络不稳定或数据量大时建议启用 |
| 缓存文件最大长度(mb) | 缓存文件最大长度 | 100 | 根据磁盘空间和数据量设置 |
| 上传每页条数 | 每一次上传的列表最大数量 | 1000 | 根据RabbitMQ性能和网络带宽调整 |
| 内存队列最大数量 | 内存队列的最大数量,超出或失败时转入文件缓存 | 10000 | 根据系统内存和数据量调整 |
脚本与实体
使用与 MqttClient 相同的脚本接口,通过实现 DynamicModelBase 接口来自定义数据处理逻辑。详细格式说明请参考 MqttClient文档。
最佳实践
RabbitMQ 配置
-
服务配置
- 生产环境建议使用 RabbitMQ 集群,提高可用性
- 配置适当的内存和磁盘限制,避免资源耗尽
- 启用监控插件,监控 RabbitMQ 的运行状态
-
交换机和队列设计
- 根据业务需求选择合适的交换机类型:
- Direct:精确匹配路由键
- Topic:支持通配符的路由
- Fanout:广播消息到所有绑定的队列
- Headers:基于消息头进行路由
- 为不同类型的消息创建专用队列,便于管理和监控
- 为队列设置合理的 TTL(生存时间)和过期策略
- 根据业务需求选择合适的交换机类型:
-
连接管理
- 使用连接池管理 RabbitMQ 连接,减少连接建立开销
- 配置合理的连接超时和心跳间隔
- 实现重连机制,确保网络中断后能够自动恢复连接
性能优化
-
批量处理
- 启用列表上传,减少网络往返次数
- 调整上传每页条数,平衡内存使用和吞吐量
-
缓存配置
- 启用缓存功能,确保网络不稳定时数据不丢失
- 合理设置内存队列大小和文件缓存大小
-
消息大小
- 控制消息大小,避免发送过大的消息
- 对于大型数据,考虑分块发送或使用引用方式
-
消息压缩
- 对于较大的消息,考虑启用压缩,减少网络传输开销
可靠性保障
-
消息持久化
- 配置队列和消息的持久化,确保 RabbitMQ 重启后消息不丢失
- 设置适当的持久化级别,平衡性能和可靠性
-
确认机制
- 使用发布确认机制,确保消息成功发送到 RabbitMQ
- 实现消息重发机制,处理发送失败的情况
-
错误处理
- 在脚本中实现完善的错误处理逻辑
- 使用 try-catch 捕获异常,确保脚本稳定运行
- 记录详细的错误日志,便于故障排查
-
安全配置
- 创建专用的 RabbitMQ 用户,设置最小权限
- 启用 SSL/TLS 加密,保护消息传输安全
- 定期更新密码和访问控制列表
监控与维护
-
监控指标
- 监控 RabbitMQ 的队列长度、消息速率和消费者数量
- 监控连接数和通道数,避免资源耗尽
- 监控磁盘空间使用情况,避免磁盘满导致服务不可用
-
日志管理
- 配置合理的日志级别,平衡详细程度和性能
- 定期清理日志文件,避免磁盘空间耗尽
- 实现日志聚合,便于集中分析和监控
-
定期维护
- 定期检查和清理未使用的队列和交换机
- 监控并优化 RabbitMQ 的性能参数
- 定期备份 RabbitMQ 的配置和数据
故障排查
常见问题及解决方案
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 消息发布失败 | 1. RabbitMQ 服务不可用 2. 网络连接问题 3. 认证失败 4. 队列不存在 | 1. 检查 RabbitMQ 服务状态 2. 检查网络连接和防火墙设置 3. 验证认证信息是否正确 4. 启用"是否声明队列"选项 |
| 消息发布速度慢 | 1. RabbitMQ 集群负载高 2. 网络带宽不足 3. 批量大小设置不合理 4. 消息持久化开销大 | 1. 监控 RabbitMQ 集群性能 2. 检查网络带宽使用情况 3. 调整上传每页条数和批量大小 4. 评估持久化级别是否合理 |
| 缓存文件过大 | 1. RabbitMQ 服务持续不可用 2. 缓存大小设置不合理 | 1. 解决 RabbitMQ 服务问题 2. 调整缓存文件最大长度 |
| 脚本执行失败 | 1. 脚本语法错误 2. 脚本逻辑错误 3. 数据类型不匹配 | 1. 检查脚本语法 2. 查看日志中的错误信息 3. 确保脚本中对数据类型进行适当处理 |
| 消息路由失败 | 1. 交换机不存在 2. 路由键配置错误 3. 绑定关系不正确 | 1. 确保交换机已创建 2. 检查路由键格式是否正确 3. 验证交换机和队列的绑定关系 |
日志分析
当遇到问题时,建议查看以下日志:
- ThingsGateway 日志:查看与 RabbitMQProducer 相关的日志,了解插件的运行状态和错误信息
- RabbitMQ 日志:查看 RabbitMQ 服务器的日志,了解服务端的状态和错误信息