KafkaProducer
提示通过自定义脚本,可快速适配业务模型
须知插件使用librdkafka,注意按需安装c库
On Mac OSX, install librdkafka with homebrew:
$ brew install librdkafka
On Debian and Ubuntu, install librdkafka from the Confluent APT repositories, see instructions here and then install librdkafka:
$ apt install librdkafka-dev
On RedHat, CentOS, Fedora, install librdkafka from the Confluent YUM repositories, instructions here and then install librdkafka:
$ yum install librdkafka-devel
For other platforms, follow the source building instructions below.
概述
KafkaProducer 是 ThingsGateway 中用于适配 Kafka 消息队列的插件,可以定时或基于变化发布设备、变量和报警数据到 Kafka 服务端。Kafka 是一款分布式流处理平台,具有高吞吐量、高可靠性和可扩展性,适合处理海量数据流。
核心功能
- 多实体发布:支持发布设备、变量和报警三种实体类型的数据
- 灵活的上传模式:支持定时上传、变化上传或两者同时生效
- 分组上传:支持按变量分组属性进行批量上传
- 列表上传控制:可控制设备、变量和报警是否以列表形式上传
- 动态Topic配置:支持使用变量属性动态 生成Topic名称
- 自定义脚本:支持通过脚本自定义数据处理和转换逻辑
- 数据缓存:支持内存队列和文件缓存,确保数据可靠性
应用场景
- 物联网数据采集:将设备数据实时发布到 Kafka,供下游系统处理
- 实时监控系统:将设备状态和报警信息发布到 Kafka,实现实时监控
- 数据集成:作为数据集成的中间件,将设备数据转发到其他系统
- 大数据分析:将设备数据发送到 Kafka,供大数据平台进行分析
插件属性配置
配置界面
详细配置项
| 配置项 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 服务地址 | Kafka 服务地址 | 127.0.0.1:9092 | 多个地址用逗号分隔,如 "127.0.0.1:9092,127.0.0.1:9093" |
| 发布超时时间 | 发布消息的超时时间(毫秒) | 5000 | 根据网络状况调整,建议 3000-10000 |
| 用户名 | Kafka 认证用户名 | - | 启用认证时设置 |
| 密码 | Kafka 认证密码 | - | 启用认证时设置 |
| SecurityProtocol | 安全协议配置 | - | 如 "SASL_SSL"、"PLAINTEXT" 等 |
| SaslMechanism | SASL 认证机制 | - | 如 "PLAIN"、"SCRAM-SHA-256" 等 |
| 分组上传 | 启用后,无论是定时还是变化模式,始终会上传变量分组属性为key分组的全部变量 | False | 批量数据场景建议启用 |
| 选择全部变量 | 是否选择全部变量,true时不需要单个变量添加业务属性 | False | 变量较多时建议启用 |
| 设备状态列表上传 | 设备是否列表上传,false时每个设备实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| 变量列表上传 | 变量是否列表上传,false时每个变量实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| 报警列表上传 | 报警是否列表上传,false时每个报警实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| 设备Topic | 设备实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性 | - | 如 devices/${DeviceName} |
| 变量Topic | 变量实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性 | - | 如 variables/${DeviceName}/${Name} |
| 报警Topic | 报警实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性 | - | 如 alarms/${DeviceName} |
| 设备实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为DeviceData | - | 编辑页面中,可通过检查按钮验证脚本 |
| 变量实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为VariableBasicData | - | 编辑页面中,可通过检查按钮验证脚本 |
| 报警实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为AlarmVariable | - | 编辑页面中,可通过检查按钮验证脚本 |
| 上传模式 | 数据上传模式:间隔/变化/变化和间隔同时生效 | 间隔 | 根据数据采集需求选择 |
| 定时上传间隔 | 间隔执行时间(秒) | 10 | 根据数据更新频率设置 |
| 严格入队模式 | 启用后,每次定时上传时,保证一组数据在同一时间点可见 | - | |
| 启用缓存 | 是否启用缓存 | False | 网络不稳定或数据量大时建议启用 |
| 缓存文件最大长度(mb) | 缓存文件最大长度 | 100 | 根据磁盘空间和数据量设置 |
| 上传每页条数 | 每一次上传的列表最大数量 | 1000 | 根据Kafka性能和网络带宽调整 |
| 内存队列最大数量 | 内存队列的 最大数量,超出或失败时转入文件缓存 | 10000 | 根据系统内存和数据量调整 |
脚本与实体
使用与 MqttClient 相同的脚本接口,通过实现 DynamicModelBase 接口来自定义数据处理逻辑。详细格式说明请参考 MqttClient文档。
最佳实践
Kafka 集群配置
-
服务地址配置
- 配置多个 Kafka broker 地址,提高可靠性
- 格式:
host1:port1,host2:port2,host3:port3
-
Topic 设计
- 按实体类型和业务领域划分 Topic
- 使用动态 Topic 配置,如
devices/${DeviceName} - 为不同重要性的数据设置不同的 Topic
-
分区策略
- 根据数据量和并发需求设置适当的分区数
- 分区数建议为 broker 数量的 2-4 倍
性能优化
-
批量 处理
- 启用列表上传,减少网络往返次数
- 调整上传每页条数,平衡内存使用和吞吐量
-
缓存配置
- 启用缓存功能,确保网络不稳定时数据不丢失
- 合理设置内存队列大小和文件缓存大小
-
数据压缩
- 启用 Kafka 的数据压缩功能,减少网络传输开销
- 建议使用 lz4 或 snappy 压缩算法
-
连接管理
- 复用 Kafka 连接,减少连接建立开销
- 配置合理的连接超时和重试机制
可靠性保障
-
数据一致性
- 配置适当的 acks 参数,确保消息可靠送达
- 建议生产环境使用
acks=all
-
错误处理
- 在脚本中实现完善的错误处理逻辑
- 使用 try-catch 捕获异常,确保脚本稳定运行
-
监控与告警
- 监控 Kafka 集群的运行状态
- 监控消息生产和消费的延迟
- 设置告警机制,及时发现和解决问题
-
安全配置
- 启用 SASL 认证,保护 Kafka 集群
- 使用 SSL/TLS 加密,确保数据传输安全
故障排查
常见问题及解决方案
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 消息发布失败 | 1. Kafka 服务不可用 2. 网络连接问题 3. 认证失败 | 1. 检查 Kafka 服务状态 2. 检查网络连接和防火墙设置 3. 验证认证信息是否正确 |
| 消息发布速度慢 | 1. Kafka 集群负载高 2. 网络带宽不足 3. 批量大小设置不合理 | 1. 监控 Kafka 集群性能 2. 检查网络带宽使用情况 3. 调整上传每页条数和批量大小 |
| 缓存文件过大 | 1. Kafka 服务持续不可用 2. 缓存大小设置不合理 | 1. 解决 Kafka 服务问题 2. 调整缓存文件最大长度 |
| 脚本执行失败 | 1. 脚本语法错误 2. 脚本逻辑错误 | 1. 检查脚本语法 2. 查看日志中的错误信息 |
| Topic 不存在 | 1. Topic 未创建 2. 权限不足 | 1. 确保 Topic 已创建 2. 验证用户权限 |
日志分析
当遇到问题时,建议查看 ThingsGateway 的日志文件,特别是与 KafkaProducer 相关的日志。日志中通常会包含详细的错误信息,帮助定位问题。
总结
KafkaProducer 是 ThingsGateway 中功能强大的 Kafka 消息发布插件,通过合理配置和优化,可以为物联网系统提供高性能、可靠的数据发布方案。在实际应用中,应根据具体场景和需求,选择合适的配置参数和优化策略,以达到最佳的性能和可靠性。