跳到主要内容

KafkaProducer

提示

通过自定义脚本,可快速适配业务模型

须知

插件使用librdkafka,注意按需安装c库

On Mac OSX, install librdkafka with homebrew:

$ brew install librdkafka

On Debian and Ubuntu, install librdkafka from the Confluent APT repositories, see instructions here and then install librdkafka:

$ apt install librdkafka-dev

On RedHat, CentOS, Fedora, install librdkafka from the Confluent YUM repositories, instructions here and then install librdkafka:

$ yum install librdkafka-devel

For other platforms, follow the source building instructions below.

概述

KafkaProducer 是 ThingsGateway 中用于适配 Kafka 消息队列的插件,可以定时或基于变化发布设备、变量和报警数据到 Kafka 服务端。Kafka 是一款分布式流处理平台,具有高吞吐量、高可靠性和可扩展性,适合处理海量数据流。

核心功能

  • 多实体发布:支持发布设备、变量和报警三种实体类型的数据
  • 灵活的上传模式:支持定时上传、变化上传或两者同时生效
  • 分组上传:支持按变量分组属性进行批量上传
  • 列表上传控制:可控制设备、变量和报警是否以列表形式上传
  • 动态Topic配置:支持使用变量属性动态生成Topic名称
  • 自定义脚本:支持通过脚本自定义数据处理和转换逻辑
  • 数据缓存:支持内存队列和文件缓存,确保数据可靠性

应用场景

  • 物联网数据采集:将设备数据实时发布到 Kafka,供下游系统处理
  • 实时监控系统:将设备状态和报警信息发布到 Kafka,实现实时监控
  • 数据集成:作为数据集成的中间件,将设备数据转发到其他系统
  • 大数据分析:将设备数据发送到 Kafka,供大数据平台进行分析

插件属性配置

配置界面

详细配置项

配置项说明默认值建议值
服务地址Kafka 服务地址127.0.0.1:9092多个地址用逗号分隔,如 "127.0.0.1:9092,127.0.0.1:9093"
发布超时时间发布消息的超时时间(毫秒)5000根据网络状况调整,建议 3000-10000
用户名Kafka 认证用户名-启用认证时设置
密码Kafka 认证密码-启用认证时设置
SecurityProtocol安全协议配置-如 "SASL_SSL"、"PLAINTEXT" 等
SaslMechanismSASL 认证机制-如 "PLAIN"、"SCRAM-SHA-256" 等
分组上传启用后,无论是定时还是变化模式,始终会上传变量分组属性为key分组的全部变量False批量数据场景建议启用
选择全部变量是否选择全部变量,true时不需要单个变量添加业务属性False变量较多时建议启用
设备状态列表上传设备是否列表上传,false时每个设备实体都会单独发布True数据量大时建议启用,减少网络开销
变量列表上传变量是否列表上传,false时每个变量实体都会单独发布True数据量大时建议启用,减少网络开销
报警列表上传报警是否列表上传,false时每个报警实体都会单独发布True数据量大时建议启用,减少网络开销
设备Topic设备实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性-devices/${DeviceName}
变量Topic变量实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性-variables/${DeviceName}/${Name}
报警Topic报警实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性-alarms/${DeviceName}
设备实体脚本脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为DeviceData-编辑页面中,可通过检查按钮验证脚本
变量实体脚本脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为VariableBasicData-编辑页面中,可通过检查按钮验证脚本
报警实体脚本脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为AlarmVariable-编辑页面中,可通过检查按钮验证脚本
上传模式数据上传模式:间隔/变化/变化和间隔同时生效间隔根据数据采集需求选择
定时上传间隔间隔执行时间(秒)10根据数据更新频率设置
严格入队模式启用后,每次定时上传时,保证一组数据在同一时间点可见-
启用缓存是否启用缓存False网络不稳定或数据量大时建议启用
缓存文件最大长度(mb)缓存文件最大长度100根据磁盘空间和数据量设置
上传每页条数每一次上传的列表最大数量1000根据Kafka性能和网络带宽调整
内存队列最大数量内存队列的最大数量,超出或失败时转入文件缓存10000根据系统内存和数据量调整

脚本与实体

使用与 MqttClient 相同的脚本接口,通过实现 DynamicModelBase 接口来自定义数据处理逻辑。详细格式说明请参考 MqttClient文档

最佳实践

Kafka 集群配置

  1. 服务地址配置

    • 配置多个 Kafka broker 地址,提高可靠性
    • 格式:host1:port1,host2:port2,host3:port3
  2. Topic 设计

    • 按实体类型和业务领域划分 Topic
    • 使用动态 Topic 配置,如 devices/${DeviceName}
    • 为不同重要性的数据设置不同的 Topic
  3. 分区策略

    • 根据数据量和并发需求设置适当的分区数
    • 分区数建议为 broker 数量的 2-4 倍

性能优化

  1. 批量处理

    • 启用列表上传,减少网络往返次数
    • 调整上传每页条数,平衡内存使用和吞吐量
  2. 缓存配置

    • 启用缓存功能,确保网络不稳定时数据不丢失
    • 合理设置内存队列大小和文件缓存大小
  3. 数据压缩

    • 启用 Kafka 的数据压缩功能,减少网络传输开销
    • 建议使用 lz4 或 snappy 压缩算法
  4. 连接管理

    • 复用 Kafka 连接,减少连接建立开销
    • 配置合理的连接超时和重试机制

可靠性保障

  1. 数据一致性

    • 配置适当的 acks 参数,确保消息可靠送达
    • 建议生产环境使用 acks=all
  2. 错误处理

    • 在脚本中实现完善的错误处理逻辑
    • 使用 try-catch 捕获异常,确保脚本稳定运行
  3. 监控与告警

    • 监控 Kafka 集群的运行状态
    • 监控消息生产和消费的延迟
    • 设置告警机制,及时发现和解决问题
  4. 安全配置

    • 启用 SASL 认证,保护 Kafka 集群
    • 使用 SSL/TLS 加密,确保数据传输安全

故障排查

常见问题及解决方案

问题可能原因解决方案
消息发布失败1. Kafka 服务不可用
2. 网络连接问题
3. 认证失败
1. 检查 Kafka 服务状态
2. 检查网络连接和防火墙设置
3. 验证认证信息是否正确
消息发布速度慢1. Kafka 集群负载高
2. 网络带宽不足
3. 批量大小设置不合理
1. 监控 Kafka 集群性能
2. 检查网络带宽使用情况
3. 调整上传每页条数和批量大小
缓存文件过大1. Kafka 服务持续不可用
2. 缓存大小设置不合理
1. 解决 Kafka 服务问题
2. 调整缓存文件最大长度
脚本执行失败1. 脚本语法错误
2. 脚本逻辑错误
1. 检查脚本语法
2. 查看日志中的错误信息
Topic 不存在1. Topic 未创建
2. 权限不足
1. 确保 Topic 已创建
2. 验证用户权限

日志分析

当遇到问题时,建议查看 ThingsGateway 的日志文件,特别是与 KafkaProducer 相关的日志。日志中通常会包含详细的错误信息,帮助定位问题。

总结

KafkaProducer 是 ThingsGateway 中功能强大的 Kafka 消息发布插件,通过合理配置和优化,可以为物联网系统提供高性能、可靠的数据发布方案。在实际应用中,应根据具体场景和需求,选择合适的配置参数和优化策略,以达到最佳的性能和可靠性。