KafkaProducer
提示通过自定义脚本,可快速适配业务模型
须知插件使用librdkafka,注意按需安装c库
On Mac OSX, install librdkafka with homebrew:
$ brew install librdkafka
On Debian and Ubuntu, install librdkafka from the Confluent APT repositories, see instructions here and then install librdkafka:
$ apt install librdkafka-dev
On RedHat, CentOS, Fedora, install librdkafka from the Confluent YUM repositories, instructions here and then install librdkafka:
$ yum install librdkafka-devel
For other platforms, follow the source building instructions below.
概述
KafkaProducer 是 ThingsGateway 中用于适配 Kafka 消息队列的插件,可以定时或基于变化发布设备、变量和报警数据到 Kafka 服务端。Kafka 是一款分布式流处理平台,具有高吞吐量、高可靠性和可扩展性,适合处理海量数据流。
核心功能
- 多实体发布:支持发布设备、变量和报警三种实体类型的数据
- 灵活的上传模式:支持定时上传、变化上传或两者同时生效
- 分组上传:支持按变量分组属性进行批量上传
- 列表上传控制:可控制设备、变量和报警是否以列表形式上传
- 动态Topic配置:支持使用变量属性动态生成Topic名称
- 自定义脚本:支持通过脚本自定义数据处理和转换逻辑
- 数据缓存:支持内存队列和文件缓存,确保数据可靠性
应用场景
- 物联网数据采集:将设备数据实时发布到 Kafka,供下游系统处理
- 实时监控系统:将设备状态和报警信息发布到 Kafka,实现实时监控
- 数据集成:作为数据集成的中间件,将设备数据转发到其他系统
- 大数据分析:将设备数据发送到 Kafka,供大数据平台进行分析
插件属性配置
配置界面
详细配置项
| 配置项 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 服务地址 | Kafka 服务地址 | 127.0.0.1:9092 | 多个地址用逗号分隔,如 "127.0.0.1:9092,127.0.0.1:9093" |
| 发布超时时间 | 发布消息的超时时间(毫秒) | 5000 | 根据网络状况调整,建议 3000-10000 |
| 用户名 | Kafka 认证用户名 | - | 启用认证时设置 |
| 密码 | Kafka 认证密码 | - | 启用认证时设置 |
| SecurityProtocol | 安全协议配置 | - | 如 "SASL_SSL"、"PLAINTEXT" 等 |
| SaslMechanism | SASL 认证机制 | - | 如 "PLAIN"、"SCRAM-SHA-256" 等 |
| 分组上传 | 启用后,无论是定时还是变化模式,始终会上传变量分组属性为key分组的全部变量 | False | 批量数据场景建议启用 |
| 选择全部变量 | 是否选择全部变量,true时不需要单个变量添加业务属性 | False | 变量较多时建议启用 |
| 设备状态列表上传 | 设备是否列表上传,false时每个设备实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| 变量列表上传 | 变量是否列表上传,false时每个变量实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| 报警列表上传 | 报警是否列表上传,false时每个报警实体都会单独发布 | True | 数据量大时建议启用,减少网络开销 |
| 设备Topic | 设备实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性 | - | 如 devices/${DeviceName} |
| 变量Topic | 变量实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性 | - | 如 variables/${DeviceName}/${Name} |
| 报警Topic | 报警实体的发布主题,使用${key}作为匹配项,key必须是上传实体中的属性 | - | 如 alarms/${DeviceName} |
| 设备实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为DeviceData | - | 编辑页面中,可通过检查按钮验证脚本 |
| 变量实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为VariableBasicData | - | 编辑页面中,可通过检查按钮验证脚本 |
| 报警实体脚本 | 脚本返回新的实体列表,动态类中需继承DynamicModelBase,传入列表为AlarmVariable | - | 编辑页面中,可通过检查按钮验证脚本 |
| 上传模式 | 数据上传模式:间隔/变化/变化和间隔同时生效 | 间隔 | 根据数据采集需求选择 |
| 定时上传间隔 | 间隔执行时间(秒) | 10 | 根据数据更新频率设置 |
| 严格入队模式 | 启用后,每次定时上传时,保证一组数据在同一时间点可见 | - | |
| 启用缓存 | 是否启用缓存 | False | 网络不稳定或数据量大时建议启用 |
| 缓存文件最大长度(mb) | 缓存文件最大长度 | 100 | 根据磁盘空间和数据量设置 |
| 上传每页条数 | 每一次上传的列表最大数量 | 1000 | 根据Kafka性能和网络带宽调整 |
| 内存队列最大数量 | 内存队列的最大数量,超出或失败时转入文件缓存 | 10000 | 根据系统内存和数据量调整 |