TDengineDBProducer
提示taos数据库插件支持原生连接和websocket连接,官方文档
概述
TDengineDBProducer 是 ThingsGateway 中用于适配 TDengine 时序数据库的插件,可以定时或基于变化存储变量数据。TDengine 是一款专为物联网、工业互联网等场景设计的高性能时序数据库,具有高写入性能、高压缩率和丰富的时间序列分析功能。
核心功能
- 时序数据存储:将设备变量数据存储为时序数据,适合时间序列分析
- 超级表与子表:利用 TDengine 的超级表和子表特性,按设备和变量自动分表
- 灵活的上传模式:支持定时上传、变化上传或两者同时生效
- 分组上传:支持按变量分组属性进行批量上传
- 数据缓存:支持内存队列和文件缓存,确保数据可靠性
- 动态脚本:支持通过脚本自定义数据处理和存储逻辑
应用场景
- 工业监控系统:存储设备运行状态和参数,用于实时监控和历史 趋势分析
- 能源管理系统:存储能耗数据,用于能耗分析和优化
- 环境监测系统:存储环境参数数据,用于环境质量评估和趋势分析
- 物联网平台:存储海量设备数据,用于设备管理和数据分析
插件属性配置
配置界面
详细配置项
| 配置项 | 说明 | 默认值 | 建议值 |
|---|---|---|---|
| 链接字符串 | TDengine 数据库连接字符串 | - | 参考下方连接字符串示例 |
| 表名称 | 超级表名称,子表名称为 [超级表名称][设备名称][变量名称] | - | 使用有意义的表名,如 iot_data |
| 分组上传 | 启用后,无论是定时还是变化模式,始终会上传变量分组属性为key分组的全部变量 | False | 批量数据场景建议启用 |
| 是否选择全部变量 | 是否选择全部变量,true时不需要单个变量添加业务属性 | False | 变量较多时建议启用 |
| 上传模式 | 数据上传模式:间隔/变化/变化和间隔同时生效 | 间隔 | 根据数据采集需求选择 |
| 定时上传间隔 | 间隔执行时间(秒) | 10 | 根据数据更新频率设置 |
| 严格入队模式 | 启用后,每次定时上传时,保证一组数据在同一时间点可见 | - | |
| 启用缓存 | 是否启用缓存 | False | 网络不稳定或数据量大时建议启用 |
| 缓存文件最大长度(mb) | 缓存文件最大长度 | 100 | 根据磁盘空间和数据量设置 |
| 上传每页条数 | 每一次上传的列表最大数量 | 1000 | 根据数据库性能和网络带宽调整 |
| 内存队列最大数量 | 内存队列的最大数量,超出或失败时转入文件缓存 | 10000 | 根据系统内存和数据量调整 |
| 历史库动态脚本 | 历史库数据处理脚本 | - | 参考下方脚本示例 |
数据库连接字符串
ConnectionStringBuilder 使用 key-value 对方式设置连接参数,key 为参数名,value 为参数值,不同参数之间使用分号 ; 分割。
连接字符串示例
protocol=WebSocket;host=127.0.0.1;port=6041;useSSL=false
支持的参数
| 参数名 | 描述 | 默认值 |
|---|---|---|
| host | TDengine TSDB 运行实例的地址 | - |
| port | TDengine TSDB 运行实例的端口 | - |
| username | 连接的用户名 | - |
| password | 连接的密码 | - |
| protocol | 连接的协议,可选值为 Native 或 WebSocket | Native |
| db | 连接的数据库 | - |
| timezone | 时区 | 本地时区 |
| connTimeout | 连接超时时间 | 1 分钟 |
| bearerToken | 连接 TDengine TSDB 的 token 验证信息 | - |
WebSocket 连接额外支持的参数
| 参数名 | 描述 | 默认值 |
|---|---|---|
| readTimeout | 读取超时时间 | 5 分钟 |
| writeTimeout | 发送超时时间 | 10 秒 |
| token | 连接 TDengine TSDB cloud 的 token | - |
| useSSL | 是否使用 SSL 连接 | false |
| enableCompression | 是否启用 WebSocket 压缩 | false |
| autoReconnect | 是否自动重连 | false |
| reconnectRetryCount | 重连次数 | 3 |
| reconnectIntervalMs | 重连间隔毫秒时间 | 2000 |
脚本与实体
脚本接口
TDengineDBProducer 支持通过继承 DynamicSQLBase 类来自定义数据处理和存储逻辑。
using ThingsGateway.SqlOrm;
namespace ThingsGateway.Plugin.DB;
public abstract class DynamicSQLBase
{
public TouchSocket.Core.ILog Logger { get; set; }
/// <summary>
/// 建库建表
/// </summary>
/// <returns></returns>
public virtual Task DBInit(ISqlOrmClient db, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
/// <summary>
/// 完全自定义上传
/// </summary>
/// <returns></returns>
public virtual Task DBInsertable(ISqlOrmClient db, IEnumerable<object> datas, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
/// <summary>
/// 删除n天前数据
/// </summary>
/// <returns></returns>
public virtual Task<int> DBDeleteable(ISqlOrmClient db, int days, CancellationToken cancellationToken)
{
return Task.FromResult(0);
}
}
实体类
详细格式说明请参考 VariableBasicData文档。
脚本示例
基本存储脚本
using ThingsGateway.Plugin.DB;
using ThingsGateway.SqlSugar;
using System.Text;
using ThingsGateway.Gateway.Application;
using TouchSocket.Core;
public class TaosBasic : DynamicSQLBase
{
public override async Task DBInsertable(ISqlSugarClient db, IEnumerable<object> datas, CancellationToken cancellationToken)
{
StringBuilder stringBuilder = new();
stringBuilder.Append("INSERT INTO");
var dbInserts = datas.Cast<VariableBasicData>();
foreach (var deviceGroup in dbInserts.GroupBy(a => a.DeviceName))
{
foreach (var variableGroup in deviceGroup.GroupBy(a => a.Name))
{
stringBuilder.Append($"""
`iot_data_{deviceGroup.Key}_{variableGroup.Key}`
USING `iot_data` TAGS ("{deviceGroup.Key}", "{variableGroup.Key}", "{variableGroup.First().Unit ?? ""}", "{variableGroup.First().Description ?? ""}")
VALUES
""");
foreach (var item in variableGroup)
{
stringBuilder.Append($"""({item.ChangeTime.ToString("yyyy-MM-dd HH:mm:ss.fff")},"{item.CollectTime.ToString("yyyy-MM-dd HH:mm:ss.fff")}",{item.Id},{item.IsOnline},"{item.Value}"),""");
}
stringBuilder.Remove(stringBuilder.Length - 1, 1);
}
}
stringBuilder.Append(';');
stringBuilder.AppendLine();
await db.Ado.ExecuteCommandAsync(stringBuilder.ToString(), default, cancellationToken: cancellationToken).ConfigureAwait(false);
Logger?.Trace($"Inserted {datas.Count()} records");
}
public override async Task DBInit(ISqlSugarClient db, CancellationToken cancellationToken)
{
// 创建数据库
await db.Ado.ExecuteCommandAsync("CREATE DATABASE IF NOT EXISTS iot_db", default, cancellationToken: cancellationToken).ConfigureAwait(false);
await db.Ado.ExecuteCommandAsync("USE iot_db", default, cancellationToken: cancellationToken).ConfigureAwait(false);
// 创建超级表
var sql = $"""
CREATE STABLE IF NOT EXISTS `iot_data`(
`createtime` TIMESTAMP,
`collecttime` TIMESTAMP,
`id` BIGINT,
`isonline` BOOL,
`value` VARCHAR(255)
) TAGS(
`devicename` VARCHAR(100),
`variablename` VARCHAR(100),
`unit` VARCHAR(50),
`description` VARCHAR(255)
)
""";
await db.Ado.ExecuteCommandAsync(sql, default, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
故障排查
常见问题及解决方案
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| 数据未写入数据库 | 1. 数据库连接失败 2. SQL语句错误 3. 权限不足 | 1. 检查连接字符串和网络连接 2. 查看日志中的SQL错误信息 3. 确认数据库用户权限 |
| 数据写入速度慢 | 1. 数据库性能不足 2. 上传频率过高 3. 数据量过大 | 1. 优化数据库性能 2. 调整上传间隔 3. 增加数据库资源或使用集群 |
| 缓存文件过大 | 1. 数据库连接持续失败 2. 缓存大小设置不合理 | 1. 解决数据库连接问题 2. 调整缓存文件最大长度 |
| 脚本执行失败 | 1. 脚本语法错误 2. 脚本逻辑错误 | 1. 检查脚本语法 2. 查看日志中的错误信息 |
| 子表创建失败 | 1. 表名过长 2. 字符集问题 | 1. 缩短设备名和变量名 |