MqttClient
提示
通过自定义脚本,可快速适配业务模型,比如各大云平台的Iot物模型
一、说明
MqttClient通过Tcp/WebSocket的方式,发布内容到Broker(Server),可以定时/变化发布数据
通道只支持 Other
二、插件属性配置项

属性 | 说明 | 备注 |
---|---|---|
IP | ServerIP | |
端口 | 连接端口 | 1883 |
详细日志 | False=>日志输出上传数量,True=>日志输出上传内容 | False |
TLS | 启用SSL/TLS | False |
CA文件 | CA File | |
客户端证书 | Client Certificate File | |
客户端key文件 | Client Key File | |
Websocket | 是否WebSocket连接 | False |
WebSocketUrl | WebSocketUrl | ws://127.0.0.1:8083/mqtt |
用户名 | 账号 | |
密码 | 密码 | |
连接Id | 连接Id | |
连接超时时间 | 连接超时时间 | |
允许Rpc写入 | 是否允许写入变量 | |
Rpc写入主题 | 写入变量的主题 | 如果检测到适配的固定topic标识,会按默认规则返回,比如thingsboard平台为v1/gateway/rpc。默认规则为:固定通配 RpcWrite/+ ,其中RpcWrite为该属性填入内容,+通配符是请求GUID值;返回结果主题会在主题后添加Response,也就是RpcWrite/+/Response |
Rpc请求数据主题 | 该主题接受到任何消息都会发布全部信息到对应的变量/设备/报警主题中 | |
选择全部变量 | 选择全部变量 | False |
设备状态列表上传 | 设备是否列表上传,false时每个设备实体都会单独发布,注意性能需求,默认为true | |
变量列表上传 | 变量是否列表上传,false时每个变量实体都会单独发布,注意性能需求,默认为true | |
报警列表上传 | 报警是否列表上传,false时每个报警实体都会单独发布,注意性能需求,默认为true | |
设备Topic | 设备实体的发布主题 ,使用${key} 作为匹配项,key必须是上传实体中的属性 | |
变量Topic | 变量实体的发布主题 ,使用${key} 作为匹配项,key必须是上传实体中的属性 | |
报警Topic | 报警实体的发布主题 ,使用${key} 作为匹配项,key必须是上传实体中的属性 | |
设备实体脚本 | 脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为DeviceBasicData,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
变量实体脚本 | 脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为VariableBasicData,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
报警实体脚本 | 脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为AlarmVariable,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
选择全部变量 | 是否选择全部变量,true时不需要单个变量添加业务属性 | |
上传模式 | 间隔/变化/变化和间隔同时生效 | |
定时上传间隔 | 间隔执行时间 | |
启用缓存 | 是否启用缓存 | |
缓存文件最大长度(mb) | 缓存文件最大长度 | |
上传每页条数 | 每一次上传的列表最大数量 | |
内存队列最大数量 | 内存队列的最大数量,超出或失败时转入文件缓存,根据数据量设定适当值 | |
三、脚本说明
3.1、脚本接口
/// <summary>
/// Contract that every user-supplied upload script class implements.
/// </summary>
public interface IDynamicModel
{
/// <summary>
/// Receives the entities that are about to be uploaded and returns the new
/// entity list that is published instead.
/// </summary>
/// <param name="datas">
/// The incoming entities, typed as <see cref="object"/>; the concrete element
/// type depends on which script slot (device, variable or alarm) the class is
/// used in, so implementations cast each element themselves.
/// </param>
/// <returns>The list of entities to publish.</returns>
IEnumerable<dynamic> GetList(IEnumerable<object> datas);
}
3.2、设备脚本传入内容
/// <summary>
/// Device business change data: the entity type handed to the device script.
/// </summary>
/// <remarks>
/// Members decorated with the paired null-handling attributes are left out of the
/// serialized output when their value is null; one attribute of each pair targets
/// Newtonsoft.Json, the other System.Text.Json.
/// </remarks>
public class DeviceBasicData : IPrimaryIdEntity
{
/// <inheritdoc cref="PrimaryIdEntity.Id"/>
public long Id { get; set; }
/// <inheritdoc cref="Device.Name"/>
public string Name { get; set; }
/// <inheritdoc cref="DeviceRuntime.ActiveTime"/>
public DateTime ActiveTime { get; set; }
/// <inheritdoc cref="DeviceRuntime.DeviceStatus"/>
public DeviceStatusEnum DeviceStatus { get; set; }
/// <inheritdoc cref="DeviceRuntime.LastErrorMessage"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string LastErrorMessage { get; set; }
/// <inheritdoc cref="DeviceRuntime.PluginName"/>
public string PluginName { get; set; }
/// <inheritdoc cref="Device.Description"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? Description { get; set; }
// Remark1..Remark5 are free-form remark fields; the sample scripts on this page
// use the Remark fields of variable entities to carry platform-specific identifiers.
/// <inheritdoc cref="Device.Remark1"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark1 { get; set; }
/// <inheritdoc cref="Device.Remark2"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark2 { get; set; }
/// <inheritdoc cref="Device.Remark3"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark3 { get; set; }
/// <inheritdoc cref="Device.Remark4"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark4 { get; set; }
/// <inheritdoc cref="Device.Remark5"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark5 { get; set; }
}
3.3、变量脚本传入内容
/// <summary>
/// Variable business change data: the entity type handed to the variable script.
/// </summary>
/// <remarks>
/// Members decorated with the paired null-handling attributes are left out of the
/// serialized output when their value is null; one attribute of each pair targets
/// Newtonsoft.Json, the other System.Text.Json.
/// </remarks>
public class VariableBasicData : IPrimaryIdEntity
{
/// <inheritdoc cref="PrimaryIdEntity.Id"/>
public long Id { get; set; }
/// <inheritdoc cref="Variable.Name"/>
public string Name { get; set; }
/// <inheritdoc cref="VariableRuntime.DeviceName"/>
public string DeviceName { get; set; }
/// <inheritdoc cref="VariableRuntime.Value"/>
public object Value { get; set; }
/// <inheritdoc cref="VariableRuntime.RawValue"/>
public object RawValue { get; set; }
/// <inheritdoc cref="VariableRuntime.LastSetValue"/>
public object LastSetValue { get; set; }
/// <inheritdoc cref="VariableRuntime.ChangeTime"/>
public DateTime ChangeTime { get; set; }
/// <inheritdoc cref="VariableRuntime.CollectTime"/>
public DateTime CollectTime { get; set; }
/// <inheritdoc cref="VariableRuntime.IsOnline"/>
public bool IsOnline { get; set; }
// The owning device is never written to JSON: it is unconditionally ignored by
// both System.Text.Json and Newtonsoft.Json.
// NOTE(review): AutoGenerateColumn(Ignore = true) presumably also hides it from
// generated UI table columns — confirm against the UI framework in use.
/// <inheritdoc cref="VariableRuntime.DeviceRuntime"/>
[System.Text.Json.Serialization.JsonIgnore]
[Newtonsoft.Json.JsonIgnore]
[AutoGenerateColumn(Ignore = true)]
public DeviceBasicData DeviceRuntime { get; set; }
/// <inheritdoc cref="VariableRuntime.LastErrorMessage"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? LastErrorMessage { get; set; }
/// <inheritdoc cref="Variable.RegisterAddress"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? RegisterAddress { get; set; }
/// <inheritdoc cref="Variable.Unit"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? Unit { get; set; }
/// <inheritdoc cref="Variable.Description"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? Description { get; set; }
/// <inheritdoc cref="Variable.ProtectType"/>
public ProtectTypeEnum ProtectType { get; set; }
/// <inheritdoc cref="Variable.DataType"/>
public DataTypeEnum DataType { get; set; }
// Remark1..Remark5 are free-form remark fields; the Aliyun sample script on this
// page groups variables by Remark1 and reads Remark2/Remark3 as product key and
// device name.
/// <inheritdoc cref="Device.Remark1"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark1 { get; set; }
/// <inheritdoc cref="Device.Remark2"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark2 { get; set; }
/// <inheritdoc cref="Device.Remark3"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark3 { get; set; }
/// <inheritdoc cref="Device.Remark4"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark4 { get; set; }
/// <inheritdoc cref="Device.Remark5"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark5 { get; set; }
}
3.4、报警脚本传入内容
/// <summary>
/// Alarm variable: the entity type handed to the alarm script.
/// </summary>
/// <remarks>
/// The type doubles as a persistence entity (it derives from PrimaryIdEntity and
/// carries column attributes), so most members have both storage/column metadata
/// and, where present, JSON null-handling attributes.
/// </remarks>
public class AlarmVariable : PrimaryIdEntity, IDBHistoryAlarm
{
/// <inheritdoc cref="Variable.Name"/>
[SugarColumn(ColumnDescription = "变量名称", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string Name { get; set; }
/// <inheritdoc cref="Variable.Description"/>
[SugarColumn(ColumnDescription = "描述", IsNullable = true)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? Description { get; set; }
/// <inheritdoc cref="IBaseDataEntity.CreateOrgId"/>
[SugarColumn(ColumnDescription = "组织Id", IsNullable = true)]
[AutoGenerateColumn(Visible = false, Filterable = true, Sortable = true)]
public long CreateOrgId { get; set; }
/// <inheritdoc cref="IBaseEntity.CreateUserId"/>
[SugarColumn(ColumnDescription = "创建用户Id", IsNullable = true)]
[AutoGenerateColumn(Visible = false, Filterable = true, Sortable = true)]
public long CreateUserId { get; set; }
/// <inheritdoc cref="Variable.DeviceId"/>
[SugarColumn(ColumnDescription = "设备Id", IsNullable = true)]
[AutoGenerateColumn(Visible = false, Filterable = true, Sortable = true)]
public long DeviceId { get; set; }
/// <inheritdoc cref="VariableRuntime.DeviceName"/>
[SugarColumn(ColumnDescription = "设备名称", IsNullable = true)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string DeviceName { get; set; }
/// <inheritdoc cref="Variable.RegisterAddress"/>
[SugarColumn(ColumnDescription = "变量地址")]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string RegisterAddress { get; set; }
/// <inheritdoc cref="Variable.DataType"/>
[SugarColumn(ColumnDescription = "数据类型")]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public DataTypeEnum DataType { get; set; }
/// <inheritdoc cref="VariableRuntime.AlarmCode"/>
[SugarColumn(ColumnDescription = "报警值", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string AlarmCode { get; set; }
/// <inheritdoc cref="VariableRuntime.AlarmLimit"/>
[SugarColumn(ColumnDescription = "报警限值", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string AlarmLimit { get; set; }
/// <inheritdoc cref="VariableRuntime.AlarmText"/>
[SugarColumn(ColumnDescription = "报警文本", IsNullable = true)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string? AlarmText { get; set; }
/// <inheritdoc cref="VariableRuntime.AlarmTime"/>
[SugarColumn(ColumnDescription = "报警时间", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public DateTime AlarmTime { get; set; }
// NOTE(review): TimeDbSplitField(DateType.Month) presumably marks this column as
// the monthly split/partition key for time-based table splitting — confirm
// against the ORM documentation.
/// <inheritdoc cref="VariableRuntime.EventTime"/>
[SugarColumn(ColumnDescription = "事件时间", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
[TimeDbSplitField(DateType.Month)]
public DateTime EventTime { get; set; }
/// <summary>
/// Alarm type.
/// </summary>
// NOTE(review): the property type is nullable (AlarmTypeEnum?) while the column
// is declared IsNullable = false — verify which of the two is intended.
[SugarColumn(ColumnDescription = "报警类型", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public AlarmTypeEnum? AlarmType { get; set; }
/// <summary>
/// Event type.
/// </summary>
[SugarColumn(ColumnDescription = "事件类型", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public EventTypeEnum EventType { get; set; }
// Remark1..Remark5 carry no column attributes here; they are omitted from JSON
// output when null.
/// <inheritdoc cref="Device.Remark1"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark1 { get; set; }
/// <inheritdoc cref="Device.Remark2"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark2 { get; set; }
/// <inheritdoc cref="Device.Remark3"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark3 { get; set; }
/// <inheritdoc cref="Device.Remark4"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark4 { get; set; }
/// <inheritdoc cref="Device.Remark5"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark5 { get; set; }
}
三、变量业务属性

属性 | 说明 | 备注 |
---|---|---|
启用RPC | 单独配置变量是否允许写入 | true |
Enable | 是否启用 | true |
四、Rpc
提示
如果检测适配固定的topic标识,会按默认规则返回,比如thingsboard平台的rpc主题为v1/gateway/rpc
下面说明为ThingsGateway默认规则

1、请求内容
MqttRpc的请求内容与WebApi一致,请求参数为Dictionary<string,Dictionary<string, string>>,比如
{
"modbusDevice650922399363167":
{
"modbus41":"1",
"modbus42":"2"
}
}
键为设备名称,值为变量字典(包含变量名称和写入值)
2、请求主题
请求主题在配置属性中设置,查看Rpc写入主题
注意,实际的请求主题为 {[Rpc写入Topic]/[自定义GUID或者雪花ID]}
3、Rpc返回主题
{[Rpc写入Topic]/[自定义GUID或者雪花ID]/Response}
4、Rpc返回内容
public int? OperCode { get; set; }
public bool IsSuccess => OperCode == null || OperCode == 0;
public string? ErrorMessage { get; set; }
public ErrorTypeEnum? ErrorType { get; set; }
五、脚本收集
5.1、适配 ThingsBoard 脚本
对应文档:ThingsBoard
设备属性脚本
using TouchSocket.Core;
/// <summary>
/// Device script for ThingsBoard: folds all incoming device entities into one
/// dictionary keyed by device name, each value being that device's attribute map.
/// </summary>
public class ThingsBoardDeviceScript : IDynamicModel
{
    /// <summary>
    /// Builds the device-name → attributes dictionary from the incoming entities.
    /// </summary>
    /// <param name="datas">Incoming entities; every element is cast to <c>DeviceBasicData</c>.</param>
    /// <returns>
    /// A list holding exactly one element: the dictionary of attribute maps keyed
    /// by <c>DeviceBasicData.Name</c>.
    /// </returns>
    public IEnumerable<dynamic> GetList(IEnumerable<object> datas)
    {
        var attributesByDevice = new Dictionary<string, Dictionary<string, object>>();

        foreach (var entry in datas)
        {
            var device = (DeviceBasicData)entry;

            // Every key is taken from DeviceBasicData so the attribute names stay
            // tied to the type that is actually being read.
            var attributes = new Dictionary<string, object>
            {
                { nameof(DeviceBasicData.Description), device.Description },
                { nameof(DeviceBasicData.ActiveTime), device.ActiveTime },
                // The status enum is published by name rather than by numeric value.
                { nameof(DeviceBasicData.DeviceStatus), device.DeviceStatus.ToString() },
                { nameof(DeviceBasicData.LastErrorMessage), device.LastErrorMessage },
                { nameof(DeviceBasicData.PluginName), device.PluginName },
                { nameof(DeviceBasicData.Remark1), device.Remark1 },
                { nameof(DeviceBasicData.Remark2), device.Remark2 },
                { nameof(DeviceBasicData.Remark3), device.Remark3 },
                { nameof(DeviceBasicData.Remark4), device.Remark4 },
                { nameof(DeviceBasicData.Remark5), device.Remark5 }
            };

            attributesByDevice.AddOrUpdate(device.Name, attributes);
        }

        // The whole dictionary is returned as a single entity.
        return new List<dynamic>() { attributesByDevice };
    }
}
变量脚本
using ThingsGateway.Foundation;
using TouchSocket.Core;
/// <summary>
/// Variable script for ThingsBoard: groups incoming variables by device name and,
/// within each device, by collect time, producing one timestamped value set per
/// distinct collect time.
/// </summary>
public class ThingsBoardVariableScript : IDynamicModel
{
    /// <summary>
    /// Builds the device-name → telemetry samples dictionary from the incoming entities.
    /// </summary>
    /// <param name="datas">Incoming entities; every element is cast to <c>VariableBasicData</c>.</param>
    /// <returns>A list holding exactly one element: the dictionary keyed by device name.</returns>
    public IEnumerable<dynamic> GetList(IEnumerable<object> datas)
    {
        var telemetryByDevice = new Dictionary<string, List<ThingsBoardValue>>();

        // Variables without a device name are dropped; the rest are grouped per device.
        var deviceGroups = datas
            .Cast<VariableBasicData>()
            .Where(variable => !string.IsNullOrEmpty(variable.DeviceName))
            .GroupBy(variable => variable.DeviceName);

        foreach (var deviceGroup in deviceGroups)
        {
            var samples = new List<ThingsBoardValue>();

            // One sample per distinct collect time within the device.
            foreach (var sameInstant in deviceGroup.GroupBy(variable => variable.CollectTime))
            {
                var sample = new ThingsBoardValue
                {
                    ts = sameInstant.Key.DateTimeToUnixTimestamp()
                };

                foreach (var variable in sameInstant)
                {
                    sample.values.AddOrUpdate(variable.Name, variable.Value);
                }

                samples.Add(sample);
            }

            telemetryByDevice.AddOrUpdate(deviceGroup.Key, samples);
        }

        return new List<dynamic>() { telemetryByDevice };
    }
}
/// <summary>
/// One telemetry sample produced by the ThingsBoard variable script: a timestamp
/// plus the variable values that share it.
/// </summary>
public class ThingsBoardValue
{
    /// <summary>Timestamp of the sample.</summary>
    public long ts { get; set; }

    /// <summary>Variable name → value pairs; starts out as an empty dictionary.</summary>
    public Dictionary<string, object> values { get; set; } = new Dictionary<string, object>();
}
传输主题
变量Topic 填入 v1/gateway/telemetry
设备Topic 填入 v1/gateway/attributes
Rpc写入主题 填入 v1/gateway/rpc
变量列表上传 填入 false
设备状态列表上传 填入 false
5.2、适配阿里云物模型脚本
对应文档:阿里云设备属性上报
变量脚本
// Demo variable script that adapts uploads to Alibaba Cloud (Aliyun) IoT.
/// <summary>
/// Groups incoming variables by <c>Remark1</c> (used here as the device id) and
/// emits one <c>AliYunIot</c> entity per group.
/// </summary>
public class AliYunIotScript : IDynamicModel
{
    /// <summary>
    /// Builds one <c>AliYunIot</c> per distinct non-empty <c>Remark1</c>.
    /// </summary>
    /// <param name="datas">Incoming entities; every element is cast to <c>VariableBasicData</c>.</param>
    /// <returns>The list of per-device entities, one for each group.</returns>
    public IEnumerable<dynamic> GetList(IEnumerable<object> datas)
    {
        List<AliYunIot> aliYunIots = new();

        // Group the input by Remark1; variables with an empty Remark1 are skipped.
        var groups = datas
            .Where(a => !string.IsNullOrEmpty(((VariableBasicData)a).Remark1))
            .GroupBy(a => ((VariableBasicData)a).Remark1, a => ((VariableBasicData)a));

        foreach (var item in groups)
        {
            var requestId = Yitter.IdGenerator.YitIdHelper.NextId();
            var iotId = item.Key; // Remark1 is used as the device id
            // Remark2 is used as the product id; the first non-empty value in the group wins.
            var productKey = item.FirstOrDefault(a => !string.IsNullOrEmpty(a.Remark2))?.Remark2;
            // Remark3 is used as the device name; the first non-empty value in the group wins.
            var deviceName = item.FirstOrDefault(a => !string.IsNullOrEmpty(a.Remark3))?.Remark3;

            AliYunIot aliYunIot = new();
            aliYunIot.iotId = iotId;
            aliYunIot.requestId = requestId.ToString();
            aliYunIot.deviceName = deviceName;
            aliYunIot.productKey = productKey;

            foreach (var varItem in item)
            {
                // Collect time is converted to a Unix timestamp in seconds.
                // NOTE(review): confirm whether the target platform expects seconds or milliseconds here.
                var data = new Property() { value = varItem.Value, time = new DateTimeOffset(varItem.CollectTime).ToUnixTimeSeconds() };

                // Keyed by variable name. Assign through the indexer rather than Add:
                // a batch can hold several samples of the same variable, and Add would
                // throw ArgumentException on the second one. The later sample wins.
                aliYunIot.items[varItem.Name] = data;
            }

            aliYunIots.Add(aliYunIot);
        }

        return aliYunIots;
    }
}
/// <summary>
/// Entity emitted by the Aliyun demo script: identifiers of one device plus its
/// reported properties.
/// </summary>
public class AliYunIot
{
    /// <summary>Device id.</summary>
    public string iotId { get; set; }

    /// <summary>Id of this report.</summary>
    public string requestId { get; set; }

    /// <summary>Product key.</summary>
    public string productKey { get; set; }

    /// <summary>Device name.</summary>
    public string deviceName { get; set; }

    /// <summary>Property name → reported value; starts out as an empty dictionary.</summary>
    public Dictionary<string, Property> items { get; set; } = new Dictionary<string, Property>();
}
/// <summary>
/// A single reported property value together with the time it refers to.
/// </summary>
public class Property
{
/// <summary>The reported value.</summary>
public object value { get; set; }
/// <summary>Timestamp associated with the value, as an integer.</summary>
public long time { get; set; }
}
传输主题
变量Topic 填入 /${productKey}/${deviceName}/thing/event/property/post