规则引擎
一、规则引擎概述
1.1 什么是规则引擎
ThingsGateway 规则引擎是一个强大的业务逻辑处理模块,允许您通过可视化的方式创建和配置复杂的业务规则,实现设备数据的智能处理和自动化操作。
1.2 核心组件
触发器
触发器用于启动规则的执行,当满足特定条件时,规则会开始运行:
- 报警状态触发器:当变量的报警状态发生变化时触发
- 设备状态触发器:当设备的连接状态发生变化时触发
- 变量状态触发器:当变量的值或状态发生变化时触发
- 时间间隔触发器:按照设定的时间间隔周期性触发
执行器
执行器用于执行具体的操作,当规则触发后,执行器会按照配置执行相应的动作:
- RPC写入:向设备或变量写入数据
- 自定义脚本:执行用户自定义的C#脚本,实现复杂的业务逻辑
其他组件
- 条件脚本:用于判断是否继续执行规则,返回true时继续执行
- 数据脚本:用于处理和转换数据,结果会传递给下一个节点
1.3 应用场景
规则引擎可以应用于多种场景:
- 设备联动:当一个设备状态变化时,自动控制另一个设备
- 报警处理:当发生报警时,自动执行报警处理流程
- 数据转换:对采集的 数据进行实时转换和处理
- 定时任务:按照设定的时间间隔执行特定任务
- 远程控制:通过规则实现设备的远程控制和管理
二、规则设计
2.1 规则结构
开始/结束节点
- 开始节点:规则只能由开始节点作为起点,每个规则必须有且仅有一个开始节点
- 结束节点:结束节点不强制要求,规则执行到最后一个节点后会自动结束
2.2 触发器配置
触发器是规则的起点,用于检测特定事件的发生:
2.2.1 报警状态触发器
配置说明:
- 填写需要监控的变量名称
- 当该变量的报警状态发生变化时 (如从正常变为报警,或从报警变为正常),规则会触发并进入下一步
使用场景:
- 当温度超过阈值时,自动开启冷却系统
- 当设备故障报警时,自动发送通知
2.2.2 设备状态触发器
配置说明:
- 填写需要监控的采集设备名称
- 当该设备的连接状态发生变化时(如从在线变为离线,或从离线变为在线),规则会触发并进入下一步
使用场景:
- 当设备离线时,自动发送告警通知
- 当设备恢复在线时,自动执行恢复操作
2.2.3 变量状态触发器
配置说明:
- 填写需要监控的变量名称
- 当该变量的值或状态发生变化时,规则会触发并进入下一步
使用场景:
- 当液位低于阈值时,自动开启补液泵
- 当压力超过安全值时,自动触发泄压操作
2.2.4 时间间隔触发器
配置说明:
- 填写间隔时间(ms)
- 当达到设定的 时间间隔时,规则会周期性触发并进入下一步
使用场景:
- 定期采集和上传数据
- 定期执行设备维护任务
- 定期检查系统状态
2.3 脚本配置
2.3.1 条件脚本
功能说明:
- 用于判断是否继续执行规则
- 脚本需要返回bool类型的值
- 当返回true时,规则会继续执行下一步;当返回false时,规则会停止执行
使用场景:
- 当温度超过阈值且持续时间超过10秒时,才执行下一步操作
- 当多个条件同时满足时,才执行特定操作
示例:
// 检查温度是否超过阈值且持续时间超过10秒
var temperature = (double)raw["Value"];
var duration = (int)raw["Duration"];
return temperature > 80 && duration > 10;
2.3.2 数据脚本
功能说明:
- 用于处理和转换数据
- 脚本可以返回任意类型的值
- 结果会传递给下一个节点使用
使用场景:
- 对采集的数据进行计算和转换
- 提取和处理复杂数据结构中的特定字段
- 构建自定义的数据格式
示例:
// 从传入的数据中提取Value字段
// raw是上一个节点传入值
return (JToken.FromObject(raw))["Value"];
高级示例:
// 计算平均值并格式化输出
var values = raw as List<double>;
if (values == null || values.Count == 0)
return 0;
var average = values.Average();
return new {
AverageValue = average,
Timestamp = DateTime.Now,
Status = average > 50 ? "High" : "Normal"
};
2.4 执行器配置
执行器用于执行具体的操作,是规则的最终输出部分:
2.4.1 RPC写入
功能说明:
- 用于向设备或变量写入数据
- 把上一节点传递的数据写入到指定的变量中
配置说明:
- 填写需要写入的变量名称
- 确保该变量具有写入权限
使用场景:
- 当温度超过阈值时,自动调整设备参数
- 当检测到异常时,自动重置设备状态
2.4.2 自定义执行
功能说明:
- 执行用户自定义的C#脚本
- 可以实现复杂的业务逻辑和外部系统集成
配置说明:
- 自定义脚本类需继承
IExexcuteExpressions接口 - 实现
ExecuteAsync方法,该方法接收输入数据并返回输出结果
接口定义:
namespace ThingsGateway.Gateway.Application;
public interface IExexcuteExpressions
{
public TouchSocket.Core.ILog Logger { get; set; }
Task<NodeOutput> ExecuteAsync(NodeInput input, CancellationToken cancellationToken);
}
注意下面是网关内部定义的输入输出类型,不需要在脚本中重新定义
public class NodeInput
{
private object input;
public JToken JToken
{
get
{
return JToken.FromObject(input); ;
}
}
public object Value
{
get
{
return input;
}
set
{
input = value;
}
}
}
public class NodeOutput
{
private object output;
public JToken JToken
{
get
{
return JToken.FromObject(output); ;
}
}
public object Value
{
get
{
return output;
}
set
{
output = value;
}
}
}
使用场景:
- 发送邮件通知
- 调用Webhook接口
- 上传数据到第三方系统
- 执行复杂的业务逻辑
案例:通过MQTT发送数据
using System.Text;
using ThingsGateway.Gateway.Application;
using ThingsGateway.RulesEngine;
using TouchSocket.Core;
public class TestEx : IExexcuteExpressions
{
public TouchSocket.Core.ILog Logger { get; set; }
public async System.Threading.Tasks.Task<NodeOutput> ExecuteAsync(NodeInput input, System.Threading.CancellationToken cancellationToken)
{
//想上传mqtt,可以自己写mqtt上传代码,或者通过mqtt插件的公开方法上传
//直接获取mqttclient插件类型的第一个设备
var mqttClient = GlobalData.ReadOnlyChannels.FirstOrDefault(a => a.Value.PluginName == "ThingsGateway.Plugin.Mqtt.MqttClient").Value?.ReadDeviceRuntimes?.FirstOrDefault().Value?.Driver as ThingsGateway.Plugin.Mqtt.MqttClient;
if (mqttClient == null)
throw new("mqttClient NOT FOUND");
TopicArray topicArray = new()
{
Topic = "test",
Json = Encoding.UTF8.GetBytes("test")
};
var result = await mqttClient.MqttUpAsync(topicArray, default).ConfigureAwait(false);// 主题 和 负载
if (!result.IsSuccess)
throw new(result.ErrorMessage);
return new NodeOutput() { Value = result };
//通过设备名称找出mqttClient插件
//var mqttClient = GlobalData.ReadOnlyDevices.FirstOrDefault(a => a.Value.Name == "mqttDevice1").Value?.Driver as ThingsGateway.Plugin.Mqtt.MqttClient;
//if (mqttClient == null)
// throw new("mqttClient NOT FOUND");
//TopicArray topicArray = new()
//{
// Topic = "test",
// Json = Encoding.UTF8.GetBytes("test")
//};
//var result = await mqttClient.MqttUpAsync(topicArray, default).ConfigureAwait(false);// 主题 和 负载
//if (!result.IsSuccess)
// throw new(result.ErrorMessage);
//return new NodeOutput() { Value = result };
}
}
案例:发送邮件通知
using ThingsGateway.Gateway.Application;
using ThingsGateway.RulesEngine;
using System.Net.Mail;
using System.Net;
using TouchSocket.Core;
public class EmailNotification : IExexcuteExpressions
{
public TouchSocket.Core.ILog Logger { get; set; }
public async Task<NodeOutput> ExecuteAsync(NodeInput input, CancellationToken cancellationToken)
{
// 从输入中获取报警信息
var alarmInfo = input.Value as dynamic;
var message = $"报警:{alarmInfo.DeviceName} - {alarmInfo.VariableName} - {alarmInfo.Message}";
// 发送邮件
using var smtpClient = new SmtpClient("smtp.example.com")
{
Port = 587,
Credentials = new NetworkCredential("username", "password"),
EnableSsl = true,
};
var mailMessage = new MailMessage
{
From = new MailAddress("from@example.com"),
Subject = "ThingsGateway 报警通知",
Body = message,
};
mailMessage.To.Add("to@example.com");
await smtpClient.SendMailAsync(mailMessage, cancellationToken);
return new NodeOutput { Value = "邮件发送成功" };
}
}
三、日志查看
规则引擎执行过程中会生成详细的日志,您可以通过以下方式查看:
3.1 日志内容
日志中包含以下信息:
- 规则执行的开始和结束时间
- 每个节点的执行状态和结果
- 脚本执行的详细信息
- 错误信息(如果有)
3.2 日志用途
- 调试规则:查看规则执行过程中的详细信息,帮助排查问题
- 监控执行:了解规则的执行频率和状态
- 审计跟踪:记录规则的执行历史,用于审计和分析
四、自定义规则节点
4.1 概述
自定义规则节点允许您通过编写C#代码来扩展规则引擎的功能,实现更复杂和个性化的业务逻辑处理。根据规则引擎的组件类型,自定义节点分为三种类型:自定义执行器、自定义触发器和自定义脚本。每种类型都有其特定的接口和用途。
所有自定义规则节点都必须继承自 BaseNode 类,该类提供了节点的基本功能和属性:
using ThingsGateway.Blazor.Diagrams.Core.Geometry;
using ThingsGateway.Blazor.Diagrams.Core.Models;
using TouchSocket.Core;
namespace ThingsGateway.Gateway.Application;
public abstract class BaseNode : NodeModel, INode
{
public BaseNode(string id, Point? position = null) : base(id, position)
{
}
public string RulesEngineName { get; set; }
public ILog Logger { get; set; }
}
说明:
BaseNode继承自NodeModel,提供了节点的基本几何属性和模型功能RulesEngineName:规则引擎名称Logger:日志记录器,用于记录节点执行过程中的日志信息
4.1.2 节点特性
所有自定义规则节点都必须使用 CategoryNode 特性来指定节点的元数据信息:
public sealed class CategoryNode : Attribute
{
public string WidgetType { get; set; }
public string ImgUrl { get; set; } = "ImgUrl";
public string Desc { get; set; } = "Desc";
public string Category { get; set; } = "Other";
public Type LocalizerType { get; set; }
public IStringLocalizer StringLocalizer;
}
特性属性说明:
WidgetType:节点的类型标识,用于在UI界面中识别节点ImgUrl:节点的图标URLDesc:节点的描述信息Category:节点的分类,默认为"Other"LocalizerType:本地化类型StringLocalizer:字符串本地化器
示例:
[CategoryNode(Category = "Trigger", ImgUrl = $"_content/ThingsGateway.Gateway.Razor.Common/img/ValueChanged.svg", Desc = nameof(AlarmChangedTriggerNode), LocalizerType = typeof(ThingsGateway.Gateway.Application.INode), WidgetType = $"ThingsGateway.Gateway.Razor.VariableWidget,ThingsGateway.Gateway.Razor.Common")]
ImgUrl:Blazor资源路径WidgetType:类型标识,(类型全名称,Dll名称)
4.2 自定义执行器
执行器用于执行具体的操作,是规则的最终输出部分。
4.2.1 核心接口
自定义执行器必须实现 IActuatorNode 接口:
namespace ThingsGateway.Gateway.Application;
public interface IActuatorNode
{
public TouchSocket.Core.ILog Logger { get; set; }
Task<NodeOutput> ExecuteAsync(NodeInput input, CancellationToken cancellationToken);
}
接口说明:
Logger:日志记录器,用于记录节点执行过程中的日志信息ExecuteAsync:异步执行方法,接收输入数据并返回输出结果
#### 4.2.3 实现示例
```csharp
using ThingsGateway.Blazor.Diagrams.Core.Geometry;
using ThingsGateway.Gateway.Application;
using ThingsGateway.RulesEngine;
using TouchSocket.Core;
[CategoryNode(WidgetType = "MyCustomExecutor", Category = "执行器", Desc = "自定义执行器示例")]
public class MyCustomExecutor : BaseNode, IActuatorNode
{
public MyCustomExecutor(string id, Point? position = null) : base(id, position)
{
}
public async Task<NodeOutput> ExecuteAsync(NodeInput input, CancellationToken cancellationToken)
{
// 记录日志
Logger.Info("自定义执 行器开始执行");
try
{
// 获取输入数据
var inputData = input.Value;
// 处理业务逻辑
var result = ProcessData(inputData);
// 返回输出结果
return new NodeOutput { Value = result };
}
catch (Exception ex)
{
Logger.Error($"执行器执行失败: {ex.Message}");
throw;
}
}
private object ProcessData(object data)
{
// 自定义数据处理逻辑
return data;
}
}
4.3 自定义触发器
触发器用于启动规则的执行,当满足特定条件时,规则会开始运行。
4.3.1 核心接口
自定义触发器必须实现 ITriggerExpressions 接口:
namespace ThingsGateway.Gateway.Application;
public interface ITriggerNode : INode
{
public Task StartAsync(Func<NodeOutput, CancellationToken, Task> func, CancellationToken cancellationToken);
}
接口说明:
Logger:日志记录器,用于记录触发器执行过程中的日志信息TriggerAsync:异步触发方法,接收触发输入数据并返回是否触发成功
4.3.2 实现示例
using ThingsGateway.Blazor.Diagrams.Core.Geometry;
using ThingsGateway.Gateway.Application;
using ThingsGateway.RulesEngine;
using TouchSocket.Core;
[CategoryNode(Category = "Trigger", ImgUrl = $"_content/ThingsGateway.Gateway.Razor.Common/img/ValueChanged.svg", Desc = nameof(AlarmChangedTriggerNode), LocalizerType = typeof(ThingsGateway.Gateway.Application.INode), WidgetType = $"ThingsGateway.Gateway.Razor.VariableWidget,ThingsGateway.Gateway.Razor.Common")]
public class MyCustomTrigger : BaseNode, ITriggerExpressions
{
public MyCustomTrigger(string id, Point? position = null) : base(id, position)
{
}
private Func<NodeOutput, CancellationToken, Task> Func { get; set; }
public async Task StartAsync(Func<NodeOutput, CancellationToken, Task> func, CancellationToken cancellationToken)
{
// 记录日志
Func = func;
try
{
//按需执行Func
}
catch (Exception ex)
{
Logger.Warn($"触发器执行失败: {ex.Message}");
}
}
}
4.6 最佳实践
4.6.1 错误处理
- 使用 try-catch 块捕获异常
- 使用 Logger 记录详细的错误信息
- 提供有意义的错误消息
try
{
// 业务逻辑
}
catch (Exception ex)
{
Logger.Error($"执行失败: {ex.Message}, ex");
throw new Exception("自定义节点执行失败", ex);
}
4.7.2 日志记录
- 在关键节点记录日志
- 使用适当的日志级别(Info, Warning, Error)
- 记录输入输出数据以便调试
Logger.Info($"节点开始执行,输入数据: {input.Value}");
Logger.Warning($"数据量超过预期: {dataList.Count}");
Logger.Error($"处理失败: {ex.Message}");
4.7.3 性能优化
- 避免在执行方法中执行耗时操作
- 使用异步方法处理IO操作