跳到主要内容

TDengineDBProducer

提示

taos数据库插件支持原生连接和websocket连接,官方文档

概述

TDengineDBProducer 是 ThingsGateway 中用于适配 TDengine 时序数据库的插件,可以定时或基于变化存储变量数据。TDengine 是一款专为物联网、工业互联网等场景设计的高性能时序数据库,具有高写入性能、高压缩率和丰富的时间序列分析功能。

核心功能

  • 时序数据存储:将设备变量数据存储为时序数据,适合时间序列分析
  • 超级表与子表:利用 TDengine 的超级表和子表特性,按设备和变量自动分表
  • 灵活的上传模式:支持定时上传、变化上传或两者同时生效
  • 分组上传:支持按变量分组属性进行批量上传
  • 数据缓存:支持内存队列和文件缓存,确保数据可靠性
  • 动态脚本:支持通过脚本自定义数据处理和存储逻辑

应用场景

  • 工业监控系统:存储设备运行状态和参数,用于实时监控和历史趋势分析
  • 能源管理系统:存储能耗数据,用于能耗分析和优化
  • 环境监测系统:存储环境参数数据,用于环境质量评估和趋势分析
  • 物联网平台:存储海量设备数据,用于设备管理和数据分析

插件属性配置

配置界面

详细配置项

配置项说明默认值建议值
链接字符串TDengine 数据库连接字符串-参考下方连接字符串示例
表名称超级表名称,子表名称为 [超级表名称][设备名称][变量名称]-使用有意义的表名,如 iot_data
分组上传启用后,无论是定时还是变化模式,始终会上传变量分组属性为key分组的全部变量False批量数据场景建议启用
是否选择全部变量是否选择全部变量,true时不需要单个变量添加业务属性False变量较多时建议启用
上传模式数据上传模式:间隔/变化/变化和间隔同时生效间隔根据数据采集需求选择
定时上传间隔间隔执行时间(秒)10根据数据更新频率设置
严格入队模式启用后,每次定时上传时,保证一组数据在同一时间点可见-
启用缓存是否启用缓存False网络不稳定或数据量大时建议启用
缓存文件最大长度(mb)缓存文件最大长度100根据磁盘空间和数据量设置
上传每页条数每一次上传的列表最大数量1000根据数据库性能和网络带宽调整
内存队列最大数量内存队列的最大数量,超出或失败时转入文件缓存10000根据系统内存和数据量调整
历史库动态脚本历史库数据处理脚本-参考下方脚本示例

数据库连接字符串

ConnectionStringBuilder 使用 key-value 对方式设置连接参数,key 为参数名,value 为参数值,不同参数之间使用分号 ; 分割。

连接字符串示例

protocol=WebSocket;host=127.0.0.1;port=6041;useSSL=false

支持的参数

参数名描述默认值
hostTDengine TSDB 运行实例的地址-
portTDengine TSDB 运行实例的端口-
username连接的用户名-
password连接的密码-
protocol连接的协议,可选值为 Native 或 WebSocketNative
db连接的数据库-
timezone时区本地时区
connTimeout连接超时时间1 分钟
bearerToken连接 TDengine TSDB 的 token 验证信息-

WebSocket 连接额外支持的参数

参数名描述默认值
readTimeout读取超时时间5 分钟
writeTimeout发送超时时间10 秒
token连接 TDengine TSDB cloud 的 token-
useSSL是否使用 SSL 连接false
enableCompression是否启用 WebSocket 压缩false
autoReconnect是否自动重连false
reconnectRetryCount重连次数3
reconnectIntervalMs重连间隔毫秒时间2000

脚本与实体

脚本接口

TDengineDBProducer 支持通过继承 DynamicSQLBase 类来自定义数据处理和存储逻辑。


using ThingsGateway.SqlOrm;

namespace ThingsGateway.Plugin.DB;

public abstract class DynamicSQLBase
{
public TouchSocket.Core.ILog Logger { get; set; }

/// <summary>
/// 建库建表
/// </summary>
/// <returns></returns>
public virtual Task DBInit(ISqlOrmClient db, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

/// <summary>
/// 完全自定义上传
/// </summary>
/// <returns></returns>
public virtual Task DBInsertable(ISqlOrmClient db, IEnumerable<object> datas, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

/// <summary>
/// 删除n天前数据
/// </summary>
/// <returns></returns>
public virtual Task<int> DBDeleteable(ISqlOrmClient db, int days, CancellationToken cancellationToken)
{
return Task.FromResult(0);
}
}

实体类

详细格式说明请参考 VariableBasicData文档

脚本示例

基本存储脚本

using ThingsGateway.Plugin.DB;
using ThingsGateway.SqlSugar;
using System.Text;
using ThingsGateway.Gateway.Application;
using TouchSocket.Core;

public class TaosBasic : DynamicSQLBase
{
public override async Task DBInsertable(ISqlSugarClient db, IEnumerable<object> datas, CancellationToken cancellationToken)
{
StringBuilder stringBuilder = new();
stringBuilder.Append("INSERT INTO");
var dbInserts = datas.Cast<VariableBasicData>();

foreach (var deviceGroup in dbInserts.GroupBy(a => a.DeviceName))
{
foreach (var variableGroup in deviceGroup.GroupBy(a => a.Name))
{
stringBuilder.Append($"""

`iot_data_{deviceGroup.Key}_{variableGroup.Key}`
USING `iot_data` TAGS ("{deviceGroup.Key}", "{variableGroup.Key}", "{variableGroup.First().Unit ?? ""}", "{variableGroup.First().Description ?? ""}")
VALUES

""");

foreach (var item in variableGroup)
{
stringBuilder.Append($"""({item.ChangeTime.ToString("yyyy-MM-dd HH:mm:ss.fff")},"{item.CollectTime.ToString("yyyy-MM-dd HH:mm:ss.fff")}",{item.Id},{item.IsOnline},"{item.Value}"),""");
}
stringBuilder.Remove(stringBuilder.Length - 1, 1);
}
}
stringBuilder.Append(';');
stringBuilder.AppendLine();

await db.Ado.ExecuteCommandAsync(stringBuilder.ToString(), default, cancellationToken: cancellationToken).ConfigureAwait(false);
Logger?.Trace($"Inserted {datas.Count()} records");
}

public override async Task DBInit(ISqlSugarClient db, CancellationToken cancellationToken)
{
// 创建数据库
await db.Ado.ExecuteCommandAsync("CREATE DATABASE IF NOT EXISTS iot_db", default, cancellationToken: cancellationToken).ConfigureAwait(false);
await db.Ado.ExecuteCommandAsync("USE iot_db", default, cancellationToken: cancellationToken).ConfigureAwait(false);

// 创建超级表
var sql = $"""
CREATE STABLE IF NOT EXISTS `iot_data`(
`createtime` TIMESTAMP,
`collecttime` TIMESTAMP,
`id` BIGINT,
`isonline` BOOL,
`value` VARCHAR(255)
) TAGS(
`devicename` VARCHAR(100),
`variablename` VARCHAR(100),
`unit` VARCHAR(50),
`description` VARCHAR(255)
)
""";
await db.Ado.ExecuteCommandAsync(sql, default, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}

故障排查

常见问题及解决方案

问题可能原因解决方案
数据未写入数据库1. 数据库连接失败
2. SQL语句错误
3. 权限不足
1. 检查连接字符串和网络连接
2. 查看日志中的SQL错误信息
3. 确认数据库用户权限
数据写入速度慢1. 数据库性能不足
2. 上传频率过高
3. 数据量过大
1. 优化数据库性能
2. 调整上传间隔
3. 增加数据库资源或使用集群
缓存文件过大1. 数据库连接持续失败
2. 缓存大小设置不合理
1. 解决数据库连接问题
2. 调整缓存文件最大长度
脚本执行失败1. 脚本语法错误
2. 脚本逻辑错误
1. 检查脚本语法
2. 查看日志中的错误信息
子表创建失败1. 表名过长
2. 字符集问题
1. 缩短设备名和变量名

日志分析

当遇到问题时,建议查看 ThingsGateway 的日志文件,特别是与 TDengineDBProducer 相关的日志。日志中通常会包含详细的错误信息,帮助定位问题。

总结

TDengineDBProducer 是 ThingsGateway 中功能强大的 TDengine 时序数据库存储插件,通过合理配置和优化,可以为物联网系统提供高性能、可靠的数据存储方案。在实际应用中,应根据具体场景和需求,选择合适的配置参数和优化策略,以达到最佳的性能和可靠性。