跳到主要内容

规则引擎

一、规则引擎概述

1.1 什么是规则引擎

ThingsGateway 规则引擎是一个强大的业务逻辑处理模块,允许您通过可视化的方式创建和配置复杂的业务规则,实现设备数据的智能处理和自动化操作。

1.2 核心组件

触发器

触发器用于启动规则的执行,当满足特定条件时,规则会开始运行:

  • 报警状态触发器:当变量的报警状态发生变化时触发
  • 设备状态触发器:当设备的连接状态发生变化时触发
  • 变量状态触发器:当变量的值或状态发生变化时触发
  • 时间间隔触发器:按照设定的时间间隔周期性触发

执行器

执行器用于执行具体的操作,当规则触发后,执行器会按照配置执行相应的动作:

  • RPC写入:向设备或变量写入数据
  • 自定义脚本:执行用户自定义的C#脚本,实现复杂的业务逻辑

其他组件

  • 条件脚本:用于判断是否继续执行规则,返回true时继续执行
  • 数据脚本:用于处理和转换数据,结果会传递给下一个节点

1.3 应用场景

规则引擎可以应用于多种场景:

  • 设备联动:当一个设备状态变化时,自动控制另一个设备
  • 报警处理:当发生报警时,自动执行报警处理流程
  • 数据转换:对采集的数据进行实时转换和处理
  • 定时任务:按照设定的时间间隔执行特定任务
  • 远程控制:通过规则实现设备的远程控制和管理

二、规则设计

2.1 规则结构

开始/结束节点

  • 开始节点:规则只能由开始节点作为起点,每个规则必须有且仅有一个开始节点
  • 结束节点:结束节点不强制要求,规则执行到最后一个节点后会自动结束

2.2 触发器配置

触发器是规则的起点,用于检测特定事件的发生:

2.2.1 报警状态触发器

配置说明

  • 填写需要监控的变量名称
  • 当该变量的报警状态发生变化时(如从正常变为报警,或从报警变为正常),规则会触发并进入下一步

使用场景

  • 当温度超过阈值时,自动开启冷却系统
  • 当设备故障报警时,自动发送通知

2.2.2 设备状态触发器

配置说明

  • 填写需要监控的采集设备名称
  • 当该设备的连接状态发生变化时(如从在线变为离线,或从离线变为在线),规则会触发并进入下一步

使用场景

  • 当设备离线时,自动发送告警通知
  • 当设备恢复在线时,自动执行恢复操作

2.2.3 变量状态触发器

配置说明

  • 填写需要监控的变量名称
  • 当该变量的值或状态发生变化时,规则会触发并进入下一步

使用场景

  • 当液位低于阈值时,自动开启补液泵
  • 当压力超过安全值时,自动触发泄压操作

2.2.4 时间间隔触发器

配置说明

  • 填写间隔时间(ms)
  • 当达到设定的时间间隔时,规则会周期性触发并进入下一步

使用场景

  • 定期采集和上传数据
  • 定期执行设备维护任务
  • 定期检查系统状态

2.3 脚本配置

2.3.1 条件脚本

功能说明

  • 用于判断是否继续执行规则
  • 脚本需要返回bool类型的值
  • 当返回true时,规则会继续执行下一步;当返回false时,规则会停止执行

使用场景

  • 当温度超过阈值且持续时间超过10秒时,才执行下一步操作
  • 当多个条件同时满足时,才执行特定操作

示例

// 检查温度是否超过阈值且持续时间超过10秒
var temperature = (double)raw["Value"];
var duration = (int)raw["Duration"];
return temperature > 80 && duration > 10;

2.3.2 数据脚本

功能说明

  • 用于处理和转换数据
  • 脚本可以返回任意类型的值
  • 结果会传递给下一个节点使用

使用场景

  • 对采集的数据进行计算和转换
  • 提取和处理复杂数据结构中的特定字段
  • 构建自定义的数据格式

示例

// 从传入的数据中提取Value字段
// raw是上一个节点传入值
return (JToken.FromObject(raw))["Value"];

高级示例

// 计算平均值并格式化输出
var values = raw as List<double>;
if (values == null || values.Count == 0)
return 0;

var average = values.Average();
return new {
AverageValue = average,
Timestamp = DateTime.Now,
Status = average > 50 ? "High" : "Normal"
};

2.4 执行器配置

执行器用于执行具体的操作,是规则的最终输出部分:

2.4.1 RPC写入

功能说明

  • 用于向设备或变量写入数据
  • 把上一节点传递的数据写入到指定的变量中

配置说明

  • 填写需要写入的变量名称
  • 确保该变量具有写入权限

使用场景

  • 当温度超过阈值时,自动调整设备参数
  • 当检测到异常时,自动重置设备状态

2.4.2 自定义执行

功能说明

  • 执行用户自定义的C#脚本
  • 可以实现复杂的业务逻辑和外部系统集成

配置说明

  • 自定义脚本类需继承 IExexcuteExpressions 接口
  • 实现 ExecuteAsync 方法,该方法接收输入数据并返回输出结果

接口定义

namespace ThingsGateway.Gateway.Application;


public interface IExexcuteExpressions
{
public TouchSocket.Core.ILog Logger { get; set; }

Task<NodeOutput> ExecuteAsync(NodeInput input, CancellationToken cancellationToken);
}

注意下面是网关内部定义的输入输出类型,不需要在脚本中重新定义

public class NodeInput
{
private object input;
public JToken JToken
{
get
{
return JToken.FromObject(input); ;
}
}

public object Value
{
get
{
return input;
}
set
{
input = value;
}
}
}

public class NodeOutput
{
private object output;
public JToken JToken
{
get
{
return JToken.FromObject(output); ;
}
}

public object Value
{
get
{
return output;
}
set
{
output = value;
}
}
}

使用场景

  • 发送邮件通知
  • 调用Webhook接口
  • 上传数据到第三方系统
  • 执行复杂的业务逻辑

案例:通过MQTT发送数据

using System.Text;
using ThingsGateway.Gateway.Application;
using ThingsGateway.RulesEngine;
using TouchSocket.Core;

public class TestEx : IExexcuteExpressions
{
public TouchSocket.Core.ILog Logger { get; set; }

public async System.Threading.Tasks.Task<NodeOutput> ExecuteAsync(NodeInput input, System.Threading.CancellationToken cancellationToken)
{
//想上传mqtt,可以自己写mqtt上传代码,或者通过mqtt插件的公开方法上传

//直接获取mqttclient插件类型的第一个设备
var mqttClient = GlobalData.ReadOnlyChannels.FirstOrDefault(a => a.Value.PluginName == "ThingsGateway.Plugin.Mqtt.MqttClient").Value?.ReadDeviceRuntimes?.FirstOrDefault().Value?.Driver as ThingsGateway.Plugin.Mqtt.MqttClient;
if (mqttClient == null)
throw new("mqttClient NOT FOUND");

TopicArray topicArray = new()
{
Topic = "test",
Json = Encoding.UTF8.GetBytes("test")
};
var result = await mqttClient.MqttUpAsync(topicArray, default).ConfigureAwait(false);// 主题 和 负载
if (!result.IsSuccess)
throw new(result.ErrorMessage);
return new NodeOutput() { Value = result };

//通过设备名称找出mqttClient插件
//var mqttClient = GlobalData.ReadOnlyDevices.FirstOrDefault(a => a.Value.Name == "mqttDevice1").Value?.Driver as ThingsGateway.Plugin.Mqtt.MqttClient;

//if (mqttClient == null)
// throw new("mqttClient NOT FOUND");

//TopicArray topicArray = new()
//{
// Topic = "test",
// Json = Encoding.UTF8.GetBytes("test")
//};
//var result = await mqttClient.MqttUpAsync(topicArray, default).ConfigureAwait(false);// 主题 和 负载
//if (!result.IsSuccess)
// throw new(result.ErrorMessage);
//return new NodeOutput() { Value = result };
}
}

案例:发送邮件通知

using ThingsGateway.Gateway.Application;
using ThingsGateway.RulesEngine;
using System.Net.Mail;
using System.Net;
using TouchSocket.Core;

public class EmailNotification : IExexcuteExpressions
{
public TouchSocket.Core.ILog Logger { get; set; }

public async Task<NodeOutput> ExecuteAsync(NodeInput input, CancellationToken cancellationToken)
{
// 从输入中获取报警信息
var alarmInfo = input.Value as dynamic;
var message = $"报警:{alarmInfo.DeviceName} - {alarmInfo.VariableName} - {alarmInfo.Message}";

// 发送邮件
using var smtpClient = new SmtpClient("smtp.example.com")
{
Port = 587,
Credentials = new NetworkCredential("username", "password"),
EnableSsl = true,
};

var mailMessage = new MailMessage
{
From = new MailAddress("from@example.com"),
Subject = "ThingsGateway 报警通知",
Body = message,
};
mailMessage.To.Add("to@example.com");

await smtpClient.SendMailAsync(mailMessage, cancellationToken);

return new NodeOutput { Value = "邮件发送成功" };
}
}

三、日志查看

规则引擎执行过程中会生成详细的日志,您可以通过以下方式查看:

3.1 日志内容

日志中包含以下信息:

  • 规则执行的开始和结束时间
  • 每个节点的执行状态和结果
  • 脚本执行的详细信息
  • 错误信息(如果有)

3.2 日志用途

  • 调试规则:查看规则执行过程中的详细信息,帮助排查问题
  • 监控执行:了解规则的执行频率和状态
  • 审计跟踪:记录规则的执行历史,用于审计和分析

四、自定义规则节点

4.1 概述

自定义规则节点允许您通过编写C#代码来扩展规则引擎的功能,实现更复杂和个性化的业务逻辑处理。根据规则引擎的组件类型,自定义节点分为三种类型:自定义执行器、自定义触发器和自定义脚本。每种类型都有其特定的接口和用途。

所有自定义规则节点都必须继承自 BaseNode 类,该类提供了节点的基本功能和属性:

using ThingsGateway.Blazor.Diagrams.Core.Geometry;
using ThingsGateway.Blazor.Diagrams.Core.Models;

using TouchSocket.Core;

namespace ThingsGateway.Gateway.Application;

public abstract class BaseNode : NodeModel, INode
{
public BaseNode(string id, Point? position = null) : base(id, position)
{
}

public string RulesEngineName { get; set; }
public ILog Logger { get; set; }
}

说明

  • BaseNode 继承自 NodeModel,提供了节点的基本几何属性和模型功能
  • RulesEngineName:规则引擎名称
  • Logger:日志记录器,用于记录节点执行过程中的日志信息

4.1.2 节点特性

所有自定义规则节点都必须使用 CategoryNode 特性来指定节点的元数据信息:

public sealed class CategoryNode : Attribute 
{
public string WidgetType { get; set; }
public string ImgUrl { get; set; } = "ImgUrl";
public string Desc { get; set; } = "Desc";
public string Category { get; set; } = "Other";
public Type LocalizerType { get; set; }
public IStringLocalizer StringLocalizer;
}

特性属性说明

  • WidgetType:节点的类型标识,用于在UI界面中识别节点
  • ImgUrl:节点的图标URL
  • Desc:节点的描述信息
  • Category:节点的分类,默认为"Other"
  • LocalizerType:本地化类型
  • StringLocalizer:字符串本地化器

示例:

[CategoryNode(Category = "Trigger", ImgUrl = $"_content/ThingsGateway.Gateway.Razor.Common/img/ValueChanged.svg", Desc = nameof(AlarmChangedTriggerNode), LocalizerType = typeof(ThingsGateway.Gateway.Application.INode), WidgetType = $"ThingsGateway.Gateway.Razor.VariableWidget,ThingsGateway.Gateway.Razor.Common")]

  • ImgUrl:Blazor资源路径
  • WidgetType:类型标识,(类型全名称,Dll名称)

4.2 自定义执行器

执行器用于执行具体的操作,是规则的最终输出部分。

4.2.1 核心接口

自定义执行器必须实现 IActuatorNode 接口:

namespace ThingsGateway.Gateway.Application;


public interface IActuatorNode
{
public TouchSocket.Core.ILog Logger { get; set; }

Task<NodeOutput> ExecuteAsync(NodeInput input, CancellationToken cancellationToken);
}

接口说明

  • Logger:日志记录器,用于记录节点执行过程中的日志信息
  • ExecuteAsync:异步执行方法,接收输入数据并返回输出结果

#### 4.2.3 实现示例

```csharp
using ThingsGateway.Blazor.Diagrams.Core.Geometry;
using ThingsGateway.Gateway.Application;
using ThingsGateway.RulesEngine;
using TouchSocket.Core;

[CategoryNode(WidgetType = "MyCustomExecutor", Category = "执行器", Desc = "自定义执行器示例")]
public class MyCustomExecutor : BaseNode, IActuatorNode
{
public MyCustomExecutor(string id, Point? position = null) : base(id, position)
{
}

public async Task<NodeOutput> ExecuteAsync(NodeInput input, CancellationToken cancellationToken)
{
// 记录日志
Logger.Info("自定义执行器开始执行");

try
{
// 获取输入数据
var inputData = input.Value;

// 处理业务逻辑
var result = ProcessData(inputData);

// 返回输出结果
return new NodeOutput { Value = result };
}
catch (Exception ex)
{
Logger.Error($"执行器执行失败: {ex.Message}");
throw;
}
}

private object ProcessData(object data)
{
// 自定义数据处理逻辑
return data;
}
}

4.3 自定义触发器

触发器用于启动规则的执行,当满足特定条件时,规则会开始运行。

4.3.1 核心接口

自定义触发器必须实现 ITriggerExpressions 接口:

namespace ThingsGateway.Gateway.Application;


public interface ITriggerNode : INode
{
public Task StartAsync(Func<NodeOutput, CancellationToken, Task> func, CancellationToken cancellationToken);
}

接口说明

  • Logger:日志记录器,用于记录触发器执行过程中的日志信息
  • TriggerAsync:异步触发方法,接收触发输入数据并返回是否触发成功

4.3.2 实现示例

using ThingsGateway.Blazor.Diagrams.Core.Geometry;
using ThingsGateway.Gateway.Application;
using ThingsGateway.RulesEngine;
using TouchSocket.Core;

[CategoryNode(Category = "Trigger", ImgUrl = $"_content/ThingsGateway.Gateway.Razor.Common/img/ValueChanged.svg", Desc = nameof(AlarmChangedTriggerNode), LocalizerType = typeof(ThingsGateway.Gateway.Application.INode), WidgetType = $"ThingsGateway.Gateway.Razor.VariableWidget,ThingsGateway.Gateway.Razor.Common")]
public class MyCustomTrigger : BaseNode, ITriggerExpressions
{
public MyCustomTrigger(string id, Point? position = null) : base(id, position)
{
}
private Func<NodeOutput, CancellationToken, Task> Func { get; set; }

public async Task StartAsync(Func<NodeOutput, CancellationToken, Task> func, CancellationToken cancellationToken)
{
// 记录日志
Func = func;

try
{
//按需执行Func

}
catch (Exception ex)
{
Logger.Warn($"触发器执行失败: {ex.Message}");
}
}


}

4.6 最佳实践

4.6.1 错误处理

  • 使用 try-catch 块捕获异常
  • 使用 Logger 记录详细的错误信息
  • 提供有意义的错误消息
try
{
// 业务逻辑
}
catch (Exception ex)
{
Logger.Error($"执行失败: {ex.Message}, ex");
throw new Exception("自定义节点执行失败", ex);
}

4.7.2 日志记录

  • 在关键节点记录日志
  • 使用适当的日志级别(Info, Warning, Error)
  • 记录输入输出数据以便调试
Logger.Info($"节点开始执行,输入数据: {input.Value}");
Logger.Warning($"数据量超过预期: {dataList.Count}");
Logger.Error($"处理失败: {ex.Message}");

4.7.3 性能优化

  • 避免在执行方法中执行耗时操作
  • 使用异步方法处理IO操作
  • 合理使用 CancellationToken
public async Task<NodeOutput> ExecuteAsync(NodeInput input, CancellationToken cancellationToken)
{
// 检查取消请求
cancellationToken.ThrowIfCancellationRequested();

// 异步处理
var result = await ProcessDataAsync(input.Value, cancellationToken);

return new NodeOutput { Value = result };
}

4.8 使用

4.8.2 在规则中使用

  1. 打开规则设计器
  2. 从节点面板中选择自定义规则节点
  3. 将节点拖拽到规则画布中
  4. 配置节点参数(如果有)
  5. 连接节点到规则流程中

4.9 常见问题

Q: 自定义节点执行失败如何调试?

A: 查看规则引擎日志,检查 Logger 记录的错误信息和堆栈跟踪。确保输入数据格式正确,业务逻辑没有异常。

Q: 如何在自定义节点中访问其他设备的数据?

A: 可以通过 GlobalData 访问设备运行时数据,参考MQTT发送示例中的代码。

Q: 自定义节点可以调用外部API吗?

A: 可以,使用 HttpClient 或其他网络库调用外部API,注意处理网络异常和超时。

Q: 如何确保自定义节点的线程安全?

A: 避免在节点中使用静态可变状态,对于共享资源使用适当的同步机制。

五、总结

ThingsGateway 规则引擎是一个功能强大、灵活多样的业务逻辑处理模块,通过可视化的方式配置规则,实现设备数据的智能处理和自动化操作。

核心优势

  • 可视化配置:通过拖拽方式创建规则,无需编程经验
  • 灵活多样:支持多种触发器和执行器,适应不同场景需求
  • 强大的脚本支持:通过C#脚本实现复杂的业务逻辑
  • 实时监控:详细的执行日志,方便调试和监控

应用前景

规则引擎可以广泛应用于各种工业物联网场景,如:

  • 智能工厂:实现设备的自动控制和故障处理
  • 能源管理:根据能耗数据自动调整设备运行状态
  • 智慧城市:实现城市设施的智能监控和管理
  • 环境监测:根据环境数据自动触发相应的处理措施

通过合理配置和使用规则引擎,可以大大提高系统的自动化水平和运行效率,减少人工干预,实现真正的智能化管理。