跳到主要内容

RabbitMQProducer

提示

通过自定义脚本,可快速适配业务模型

概述

RabbitMQProducer 是 ThingsGateway 中用于适配 RabbitMQ 消息队列的插件,可以定时或基于变化发布设备、变量和报警数据到 RabbitMQ 服务端。RabbitMQ 是一款功能强大的消息队列系统,支持多种消息协议,具有高可靠性、灵活的路由和扩展性,适合各种消息传递场景。

核心功能

  • 多实体发布:支持发布设备、变量和报警三种实体类型的数据
  • 灵活的上传模式:支持定时上传、变化上传或两者同时生效
  • 分组上传:支持按变量分组属性进行批量上传
  • 列表上传控制:可控制设备、变量和报警是否以列表形式上传
  • 动态Topic配置:支持使用变量属性动态生成Topic名称
  • 自定义脚本:支持通过脚本自定义数据处理和转换逻辑
  • 数据缓存:支持内存队列和文件缓存,确保数据可靠性
  • 交换机支持:支持通过交换机进行消息路由,实现更灵活的消息分发

应用场景

  • 物联网数据采集:将设备数据实时发布到 RabbitMQ,供下游系统处理
  • 实时监控系统:将设备状态和报警信息发布到 RabbitMQ,实现实时监控
  • 数据集成:作为数据集成的中间件,将设备数据转发到其他系统
  • 任务调度:基于设备数据触发下游系统的任务执行
  • 事件驱动架构:构建基于事件的系统架构,实现系统解耦

插件属性配置

配置界面

详细配置项

配置项说明默认值建议值
IPServerIP,为空时指任意IP-如 "127.0.0.1"
端口连接端口5672RabbitMQ 默认端口 5672,SSL 连接使用 5671
账号认证用户名guest生产环境建议创建专用账号
密码认证密码guest生产环境建议使用强密码
详细日志False=>日志输出上传数量,True=>日志输出上传内容False调试时可启用,生产环境建议关闭
是否声明队列初始化时是否自动创建队列True建议启用,确保队列存在
虚拟HostRabbitMQ 虚拟主机/可根据业务隔离需求设置不同的虚拟主机
交换机名称交换机名称-如 "amq.topic" 或自定义交换机名称
分组上传启用后,无论是定时还是变化模式,始终会上传变量分组属性为key分组的全部变量False批量数据场景建议启用
选择全部变量是否选择全部变量,true时不需要单个变量添加业务属性False变量较多时建议启用
设备状态列表上传设备是否列表上传,false时每个设备实体都会单独发布True数据量大时建议启用,减少网络开销
变量列表上传变量是否列表上传,false时每个变量实体都会单独发布True数据量大时建议启用,减少网络开销
报警列表上传报警是否列表上传,false时每个报警实体都会单独发布True数据量大时建议启用,减少网络开销
设备Topic设备实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性-"devices.${DeviceName}"
变量Topic变量实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性-"variables.${DeviceName}.${Name}"
报警Topic报警实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性-"alarms.${DeviceName}"
设备实体脚本脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为DeviceBasicData-编辑页面中,可通过检查按钮验证脚本
变量实体脚本脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为VariableBasicData-编辑页面中,可通过检查按钮验证脚本
报警实体脚本脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为AlarmVariable-编辑页面中,可通过检查按钮验证脚本
上传模式数据上传模式:间隔/变化/变化和间隔同时生效间隔根据数据采集需求选择
定时上传间隔间隔执行时间(秒)10根据数据更新频率设置
严格入队模式启用后,每次定时上传时,保证一组数据在同一时间点可见-
启用缓存是否启用缓存False网络不稳定或数据量大时建议启用
缓存文件最大长度(mb)缓存文件最大长度100根据磁盘空间和数据量设置
上传每页条数每一次上传的列表最大数量1000根据RabbitMQ性能和网络带宽调整
内存队列最大数量内存队列的最大数量,超出或失败时转入文件缓存10000根据系统内存和数据量调整

脚本与实体

使用与 MqttClient 相同的脚本接口,通过实现 DynamicModelBase 接口来自定义数据处理逻辑。详细格式说明请参考 MqttClient文档

最佳实践

RabbitMQ 配置

  1. 服务配置

    • 生产环境建议使用 RabbitMQ 集群,提高可用性
    • 配置适当的内存和磁盘限制,避免资源耗尽
    • 启用监控插件,监控 RabbitMQ 的运行状态
  2. 交换机和队列设计

    • 根据业务需求选择合适的交换机类型:
      • Direct:精确匹配路由键
      • Topic:支持通配符的路由
      • Fanout:广播消息到所有绑定的队列
      • Headers:基于消息头进行路由
    • 为不同类型的消息创建专用队列,便于管理和监控
    • 为队列设置合理的 TTL(生存时间)和过期策略
  3. 连接管理

    • 使用连接池管理 RabbitMQ 连接,减少连接建立开销
    • 配置合理的连接超时和心跳间隔
    • 实现重连机制,确保网络中断后能够自动恢复连接

性能优化

  1. 批量处理

    • 启用列表上传,减少网络往返次数
    • 调整上传每页条数,平衡内存使用和吞吐量
  2. 缓存配置

    • 启用缓存功能,确保网络不稳定时数据不丢失
    • 合理设置内存队列大小和文件缓存大小
  3. 消息大小

    • 控制消息大小,避免发送过大的消息
    • 对于大型数据,考虑分块发送或使用引用方式
  4. 消息压缩

    • 对于较大的消息,考虑启用压缩,减少网络传输开销

可靠性保障

  1. 消息持久化

    • 配置队列和消息的持久化,确保 RabbitMQ 重启后消息不丢失
    • 设置适当的持久化级别,平衡性能和可靠性
  2. 确认机制

    • 使用发布确认机制,确保消息成功发送到 RabbitMQ
    • 实现消息重发机制,处理发送失败的情况
  3. 错误处理

    • 在脚本中实现完善的错误处理逻辑
    • 使用 try-catch 捕获异常,确保脚本稳定运行
    • 记录详细的错误日志,便于故障排查
  4. 安全配置

    • 创建专用的 RabbitMQ 用户,设置最小权限
    • 启用 SSL/TLS 加密,保护消息传输安全
    • 定期更新密码和访问控制列表

监控与维护

  1. 监控指标

    • 监控 RabbitMQ 的队列长度、消息速率和消费者数量
    • 监控连接数和通道数,避免资源耗尽
    • 监控磁盘空间使用情况,避免磁盘满导致服务不可用
  2. 日志管理

    • 配置合理的日志级别,平衡详细程度和性能
    • 定期清理日志文件,避免磁盘空间耗尽
    • 实现日志聚合,便于集中分析和监控
  3. 定期维护

    • 定期检查和清理未使用的队列和交换机
    • 监控并优化 RabbitMQ 的性能参数
    • 定期备份 RabbitMQ 的配置和数据

故障排查

常见问题及解决方案

问题可能原因解决方案
消息发布失败1. RabbitMQ 服务不可用
2. 网络连接问题
3. 认证失败
4. 队列不存在
1. 检查 RabbitMQ 服务状态
2. 检查网络连接和防火墙设置
3. 验证认证信息是否正确
4. 启用"是否声明队列"选项
消息发布速度慢1. RabbitMQ 集群负载高
2. 网络带宽不足
3. 批量大小设置不合理
4. 消息持久化开销大
1. 监控 RabbitMQ 集群性能
2. 检查网络带宽使用情况
3. 调整上传每页条数和批量大小
4. 评估持久化级别是否合理
缓存文件过大1. RabbitMQ 服务持续不可用
2. 缓存大小设置不合理
1. 解决 RabbitMQ 服务问题
2. 调整缓存文件最大长度
脚本执行失败1. 脚本语法错误
2. 脚本逻辑错误
3. 数据类型不匹配
1. 检查脚本语法
2. 查看日志中的错误信息
3. 确保脚本中对数据类型进行适当处理
消息路由失败1. 交换机不存在
2. 路由键配置错误
3. 绑定关系不正确
1. 确保交换机已创建
2. 检查路由键格式是否正确
3. 验证交换机和队列的绑定关系

日志分析

当遇到问题时,建议查看以下日志:

  1. ThingsGateway 日志:查看与 RabbitMQProducer 相关的日志,了解插件的运行状态和错误信息
  2. RabbitMQ 日志:查看 RabbitMQ 服务器的日志,了解服务端的状态和错误信息

总结

RabbitMQProducer 是 ThingsGateway 中功能强大的 RabbitMQ 消息发布插件,通过合理配置和优化,可以为物联网系统提供高性能、可靠的消息发布方案。在实际应用中,应根据具体场景和需求,选择合适的配置参数、交换机类型和队列策略,以达到最佳的性能和可靠性。同时,结合自定义脚本,可以实现灵活的数据处理和转换,满足各种业务需求。