跳到主要内容

MqttClient

提示

通过自定义脚本,可快速适配业务模型,比如各大云平台的Iot物模型

一、说明

MqttClient通过Tcp/WebSocket的方式,发布内容到Broker(Server),可以定时/变化发布数据

通道只支持 Other

二、插件属性配置项

属性说明备注
IPServerIP
端口连接端口1883
详细日志False=>日志输出上传数量,True=>日志输出上传内容False
TLS启用SSL/TLSFalse
CA文件CA File
客户端证书Client Certificate File
客户端key文件Client Key File
Websocket是否WebSocket连接False
WebSocketUrlWebSocketUrlws://127.0.0.1:8083/mqtt
用户名账号
密码密码
连接Id连接Id
连接超时时间连接超时时间
允许Rpc写入是否允许写入变量
Rpc写入主题写入变量的主题如果检测适配固定的topic标识,会按默认规则返回,比如thingsboard平台为v1/gateway/rpc 。默认规则为:固定通配 RpcWrite/+ ,其中RpcWrite为该属性填入内容,+通配符是请求GUID值;返回结果主题会在主题后添加Response , 也就是RpcWrite/+/Response
Rpc请求数据主题该主题接收到任何消息都会发布全部信息到对应的变量/设备/报警主题中
选择全部变量选择全部变量False
设备状态列表上传设备是否列表上传,false时每个设备实体都会单独发布,注意性能需求,默认为true
变量列表上传变量是否列表上传,false时每个变量实体都会单独发布,注意性能需求,默认为true
报警列表上传报警是否列表上传,false时每个报警实体都会单独发布,注意性能需求,默认为true
设备Topic设备实体的发布主题 ,使用${key}作为匹配项,key必须是上传实体中的属性
变量Topic变量实体的发布主题 ,使用${key}作为匹配项,key必须是上传实体中的属性
报警Topic报警实体的发布主题 ,使用${key}作为匹配项,key必须是上传实体中的属性
设备实体脚本脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为DeviceBasicData,查看以下具体属性编辑页面中,可通过检查按钮验证脚本
变量实体脚本脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为VariableBasicData,查看以下具体属性编辑页面中,可通过检查按钮验证脚本
报警实体脚本脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为AlarmVariable,查看以下具体属性编辑页面中,可通过检查按钮验证脚本
选择全部变量是否选择全部变量,true时不需要单个变量添加业务属性
上传模式间隔/变化/变化和间隔同时生效
定时上传间隔间隔执行时间
启用缓存是否启用缓存
缓存文件最大长度(mb)缓存文件最大长度
上传每页条数每一次上传的列表最大数量
内存队列最大数量内存队列的最大数量,超出或失败时转入文件缓存,根据数据量设定适当值

三、脚本说明

3.1、脚本接口


/// <summary>
/// Contract implemented by the dynamic script classes configured in the plugin
/// (device / variable / alarm entity scripts).
/// </summary>
public interface IDynamicModel
{
/// <summary>
/// Converts the incoming entity list into the new entity list returned by the script.
/// </summary>
/// <param name="datas">
/// The incoming entities, typed as <see cref="object"/>; the sample scripts in this
/// document cast each element to DeviceBasicData, VariableBasicData or AlarmVariable
/// depending on which script slot is used.
/// </param>
/// <returns>The new entity list produced by the script.</returns>
IEnumerable<dynamic> GetList(IEnumerable<object> datas);
}


3.2、设备脚本传入内容


/// <summary>
/// Device business change data: the device entity passed to the device entity script.
/// </summary>
/// <remarks>
/// Properties annotated with NullValueHandling.Ignore / JsonIgnoreCondition.WhenWritingNull
/// are left out of the serialized JSON when their value is null.
/// </remarks>
public class DeviceBasicData : IPrimaryIdEntity
{
/// <inheritdoc cref="PrimaryIdEntity.Id"/>
public long Id { get; set; }

/// <inheritdoc cref="Device.Name"/>
public string Name { get; set; }

/// <inheritdoc cref="DeviceRuntime.ActiveTime"/>
public DateTime ActiveTime { get; set; }

/// <inheritdoc cref="DeviceRuntime.DeviceStatus"/>
public DeviceStatusEnum DeviceStatus { get; set; }

/// <inheritdoc cref="DeviceRuntime.LastErrorMessage"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string LastErrorMessage { get; set; }

/// <inheritdoc cref="DeviceRuntime.PluginName"/>
public string PluginName { get; set; }

/// <inheritdoc cref="Device.Description"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? Description { get; set; }

// Remark1..Remark5 are free-form remark fields inherited from the device definition;
// each is omitted from JSON when null.

/// <inheritdoc cref="Device.Remark1"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark1 { get; set; }

/// <inheritdoc cref="Device.Remark2"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark2 { get; set; }

/// <inheritdoc cref="Device.Remark3"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark3 { get; set; }

/// <inheritdoc cref="Device.Remark4"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark4 { get; set; }

/// <inheritdoc cref="Device.Remark5"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark5 { get; set; }


}



3.3、变量脚本传入内容


/// <summary>
/// Variable business change data: the variable entity passed to the variable entity script.
/// </summary>
/// <remarks>
/// Properties annotated with NullValueHandling.Ignore / JsonIgnoreCondition.WhenWritingNull
/// are left out of the serialized JSON when their value is null.
/// </remarks>
public class VariableBasicData : IPrimaryIdEntity
{
/// <inheritdoc cref="PrimaryIdEntity.Id"/>
public long Id { get; set; }

/// <inheritdoc cref="Variable.Name"/>
public string Name { get; set; }

/// <inheritdoc cref="VariableRuntime.DeviceName"/>
public string DeviceName { get; set; }

/// <inheritdoc cref="VariableRuntime.Value"/>
public object Value { get; set; }
/// <inheritdoc cref="VariableRuntime.RawValue"/>
public object RawValue { get; set; }
/// <inheritdoc cref="VariableRuntime.LastSetValue"/>
public object LastSetValue { get; set; }

/// <inheritdoc cref="VariableRuntime.ChangeTime"/>
public DateTime ChangeTime { get; set; }

/// <inheritdoc cref="VariableRuntime.CollectTime"/>
public DateTime CollectTime { get; set; }

/// <inheritdoc cref="VariableRuntime.IsOnline"/>
public bool IsOnline { get; set; }

// The owning device is excluded from both System.Text.Json and Newtonsoft.Json output.
// AutoGenerateColumn(Ignore = true) presumably also hides it from generated UI columns — confirm.
/// <inheritdoc cref="VariableRuntime.DeviceRuntime"/>
[System.Text.Json.Serialization.JsonIgnore]
[Newtonsoft.Json.JsonIgnore]
[AutoGenerateColumn(Ignore = true)]
public DeviceBasicData DeviceRuntime { get; set; }

/// <inheritdoc cref="VariableRuntime.LastErrorMessage"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? LastErrorMessage { get; set; }
/// <inheritdoc cref="Variable.RegisterAddress"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? RegisterAddress { get; set; }

/// <inheritdoc cref="Variable.Unit"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? Unit { get; set; }

/// <inheritdoc cref="Variable.Description"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? Description { get; set; }

/// <inheritdoc cref="Variable.ProtectType"/>
public ProtectTypeEnum ProtectType { get; set; }

/// <inheritdoc cref="Variable.DataType"/>
public DataTypeEnum DataType { get; set; }

// Remark1..Remark5 are free-form remark fields; the Aliyun sample script in this
// document uses Remark1/Remark2/Remark3 as device id / product key / device name.

/// <inheritdoc cref="Device.Remark1"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark1 { get; set; }

/// <inheritdoc cref="Device.Remark2"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark2 { get; set; }

/// <inheritdoc cref="Device.Remark3"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark3 { get; set; }

/// <inheritdoc cref="Device.Remark4"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark4 { get; set; }

/// <inheritdoc cref="Device.Remark5"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark5 { get; set; }
}


3.4、报警脚本传入内容


/// <summary>
/// Alarm variable: the alarm entity passed to the alarm entity script.
/// </summary>
/// <remarks>
/// The SugarColumn / AutoGenerateColumn attributes carry column metadata
/// (description, nullability, visibility, filter/sort flags). Properties annotated with
/// NullValueHandling.Ignore / JsonIgnoreCondition.WhenWritingNull are left out of the
/// serialized JSON when their value is null.
/// </remarks>
public class AlarmVariable : PrimaryIdEntity, IDBHistoryAlarm
{
/// <inheritdoc cref="Variable.Name"/>
[SugarColumn(ColumnDescription = "变量名称", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string Name { get; set; }

/// <inheritdoc cref="Variable.Description"/>
[SugarColumn(ColumnDescription = "描述", IsNullable = true)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string? Description { get; set; }

/// <inheritdoc cref="IBaseDataEntity.CreateOrgId"/>
[SugarColumn(ColumnDescription = "组织Id", IsNullable = true)]
[AutoGenerateColumn(Visible = false, Filterable = true, Sortable = true)]
public long CreateOrgId { get; set; }

/// <inheritdoc cref="IBaseEntity.CreateUserId"/>
[SugarColumn(ColumnDescription = "创建用户Id", IsNullable = true)]
[AutoGenerateColumn(Visible = false, Filterable = true, Sortable = true)]
public long CreateUserId { get; set; }

/// <inheritdoc cref="Variable.DeviceId"/>
[SugarColumn(ColumnDescription = "设备Id", IsNullable = true)]
[AutoGenerateColumn(Visible = false, Filterable = true, Sortable = true)]
public long DeviceId { get; set; }

/// <inheritdoc cref="VariableRuntime.DeviceName"/>
[SugarColumn(ColumnDescription = "设备名称", IsNullable = true)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string DeviceName { get; set; }

/// <inheritdoc cref="Variable.RegisterAddress"/>
[SugarColumn(ColumnDescription = "变量地址")]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string RegisterAddress { get; set; }

/// <inheritdoc cref="Variable.DataType"/>
[SugarColumn(ColumnDescription = "数据类型")]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public DataTypeEnum DataType { get; set; }

/// <inheritdoc cref="VariableRuntime.AlarmCode"/>
[SugarColumn(ColumnDescription = "报警值", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string AlarmCode { get; set; }

/// <inheritdoc cref="VariableRuntime.AlarmLimit"/>
[SugarColumn(ColumnDescription = "报警限值", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string AlarmLimit { get; set; }

/// <inheritdoc cref="VariableRuntime.AlarmText"/>
[SugarColumn(ColumnDescription = "报警文本", IsNullable = true)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public string? AlarmText { get; set; }

/// <inheritdoc cref="VariableRuntime.AlarmTime"/>
[SugarColumn(ColumnDescription = "报警时间", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public DateTime AlarmTime { get; set; }

// NOTE(review): TimeDbSplitField(DateType.Month) presumably marks this as the
// field used for monthly time-based table splitting — confirm against the ORM docs.
/// <inheritdoc cref="VariableRuntime.EventTime"/>
[SugarColumn(ColumnDescription = "事件时间", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
[TimeDbSplitField(DateType.Month)]
public DateTime EventTime { get; set; }

/// <summary>
/// Alarm type.
/// </summary>
[SugarColumn(ColumnDescription = "报警类型", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public AlarmTypeEnum? AlarmType { get; set; }

/// <summary>
/// Event type.
/// </summary>
[SugarColumn(ColumnDescription = "事件类型", IsNullable = false)]
[AutoGenerateColumn(Visible = true, Filterable = true, Sortable = true)]
public EventTypeEnum EventType { get; set; }

/// <inheritdoc cref="Device.Remark1"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark1 { get; set; }

/// <inheritdoc cref="Device.Remark2"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark2 { get; set; }

/// <inheritdoc cref="Device.Remark3"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark3 { get; set; }

/// <inheritdoc cref="Device.Remark4"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark4 { get; set; }

/// <inheritdoc cref="Device.Remark5"/>
[JsonProperty(NullValueHandling = NullValueHandling.Ignore)]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingNull)]
public string Remark5 { get; set; }
}


三、变量业务属性

属性说明备注
启用RPC单独配置变量是否允许写入true
Enable是否启用true

四、Rpc

提示

如果检测适配固定的topic标识,会按默认规则返回,比如thingsboard平台的rpc主题为v1/gateway/rpc

下面说明为ThingsGateway默认规则

1、请求内容

MqttRpc的请求内容与WebApi一致,请求参数为Dictionary<string,Dictionary<string, string>>,比如

{
"modbusDevice650922399363167":
{
"modbus41":"1",
"modbus42":"2"

}

}

键为设备名称,值为变量字典(包含变量名称和写入值)

2、请求主题

请求主题在配置属性中设置,查看Rpc写入主题

注意,实际的请求主题为 {[Rpc写入Topic]/[自定义GUID或者雪花ID]}

3、Rpc返回主题

{[Rpc写入Topic]/[自定义GUID或者雪花ID]/Response}

4、Rpc返回内容


/// <summary>
/// Operation code of the Rpc result. A value of null or 0 is treated as success
/// (see <see cref="IsSuccess"/>).
/// </summary>
public int? OperCode { get; set; }

/// <summary>
/// True when <see cref="OperCode"/> is null or 0; false for any other code.
/// </summary>
public bool IsSuccess => OperCode == null || OperCode == 0;

/// <summary>
/// Error message text; may be null. Presumably populated only on failure — confirm.
/// </summary>
public string? ErrorMessage { get; set; }

/// <summary>
/// Error category; may be null. The possible values are defined by ErrorTypeEnum,
/// which is not shown in this document.
/// </summary>
public ErrorTypeEnum? ErrorType { get; set; }

五、脚本收集

5.1、适配 ThingsBoard 脚本

对应文档:ThingsBoard

设备属性脚本

using TouchSocket.Core;
/// <summary>
/// Device entity script for ThingsBoard: reshapes the incoming device entities into a
/// single object keyed by device name, where each value is that device's attribute
/// dictionary. Intended for the device topic v1/gateway/attributes.
/// </summary>
public class ThingsBoardDeviceScript : IDynamicModel
{
    /// <summary>
    /// Builds one dictionary of device name to attribute dictionary.
    /// </summary>
    /// <param name="datas">Incoming entities; every element must be a DeviceBasicData.</param>
    /// <returns>A list containing exactly one element: the device-name keyed dictionary.</returns>
    /// <exception cref="InvalidCastException">An element is not a DeviceBasicData.</exception>
    public IEnumerable<dynamic> GetList(IEnumerable<object> datas)
    {
        var attributesByDevice = new Dictionary<string, Dictionary<string, object>>();
        foreach (var v in datas)
        {
            var data = (DeviceBasicData)v;
            var attributes = new Dictionary<string, object>
            {
                // All keys are taken from DeviceBasicData (the type actually passed in) so the
                // script does not depend on the separate Device type being in scope.
                { nameof(DeviceBasicData.Description), data.Description },
                { nameof(DeviceBasicData.ActiveTime), data.ActiveTime },
                // The status enum is published by name rather than by numeric value.
                { nameof(DeviceBasicData.DeviceStatus), data.DeviceStatus.ToString() },
                { nameof(DeviceBasicData.LastErrorMessage), data.LastErrorMessage },
                { nameof(DeviceBasicData.PluginName), data.PluginName },
                { nameof(DeviceBasicData.Remark1), data.Remark1 },
                { nameof(DeviceBasicData.Remark2), data.Remark2 },
                { nameof(DeviceBasicData.Remark3), data.Remark3 },
                { nameof(DeviceBasicData.Remark4), data.Remark4 },
                { nameof(DeviceBasicData.Remark5), data.Remark5 }
            };

            // AddOrUpdate is an extension from TouchSocket.Core; presumably it inserts the key
            // or overwrites an existing one, so a repeated device name keeps the last entry.
            attributesByDevice.AddOrUpdate(data.Name, attributes);
        }
        return new List<dynamic>() { attributesByDevice };
    }
}


变量脚本


using ThingsGateway.Foundation;

using TouchSocket.Core;

/// <summary>
/// Variable entity script for ThingsBoard: groups the incoming variables by device name and
/// then by collect time, producing one timestamped value set per distinct collect time.
/// Intended for the variable topic v1/gateway/telemetry.
/// </summary>
public class ThingsBoardVariableScript : IDynamicModel
{
    /// <summary>
    /// Builds one dictionary of device name to list of timestamped value sets.
    /// </summary>
    /// <param name="datas">Incoming entities; every element must be a VariableBasicData.</param>
    /// <returns>A list containing exactly one element: the device-name keyed dictionary.</returns>
    public IEnumerable<dynamic> GetList(IEnumerable<object> datas)
    {
        var telemetryByDevice = new Dictionary<string, List<ThingsBoardValue>>();

        // Variables without a device name are skipped; the rest are grouped per device.
        var variablesByDevice = datas
            .Cast<VariableBasicData>()
            .Where(variable => !string.IsNullOrEmpty(variable.DeviceName))
            .GroupBy(variable => variable.DeviceName);

        foreach (var deviceGroup in variablesByDevice)
        {
            var samples = new List<ThingsBoardValue>();

            // Variables collected at the same instant share one ts/values entry.
            foreach (var sameInstant in deviceGroup.GroupBy(variable => variable.CollectTime))
            {
                var sample = new ThingsBoardValue
                {
                    ts = sameInstant.Key.DateTimeToUnixTimestamp()
                };
                foreach (var variable in sameInstant)
                {
                    sample.values.AddOrUpdate(variable.Name, variable.Value);
                }
                samples.Add(sample);
            }

            telemetryByDevice.AddOrUpdate(deviceGroup.Key, samples);
        }

        return new List<dynamic>() { telemetryByDevice };
    }
}

/// <summary>
/// One timestamped set of variable values built by ThingsBoardVariableScript.
/// The lower-case member names are the names that end up in the published payload.
/// </summary>
public class ThingsBoardValue
{
/// <summary>Timestamp derived from the variables' collect time via DateTimeToUnixTimestamp.</summary>
public long ts { get; set; }
/// <summary>Variable name to value map; starts out empty.</summary>
public Dictionary<string, object> values { get; set; } = new();
}


传输主题

变量Topic 填入 v1/gateway/telemetry

设备Topic 填入 v1/gateway/attributes

Rpc写入主题 填入 v1/gateway/rpc

变量列表上传 填入 false

设备状态列表上传 填入 false

5.2、适配阿里云物模型脚本

对应文档:阿里云设备属性上报

变量脚本


//一个适配阿里云IOT的变量脚本demo


/// <summary>
/// Demo variable entity script for the Alibaba Cloud IoT thing model: groups the incoming
/// variables by Remark1 (used as the device id) and emits one AliYunIot message per group.
/// </summary>
public class AliYunIotScript : IDynamicModel
{
    /// <summary>
    /// Builds one AliYunIot message per distinct non-empty Remark1 value.
    /// </summary>
    /// <param name="datas">Incoming entities; every element must be a VariableBasicData.</param>
    /// <returns>The list of AliYunIot messages, one per device id.</returns>
    public IEnumerable<dynamic> GetList(IEnumerable<object> datas)
    {
        List<AliYunIot> aliYunIots = new();

        // Group the input by Remark1; variables with an empty Remark1 are skipped.
        var groups = datas
            .Where(a => !string.IsNullOrEmpty(((VariableBasicData)a).Remark1))
            .GroupBy(a => ((VariableBasicData)a).Remark1, a => (VariableBasicData)a);

        foreach (var item in groups)
        {
            var requestId = Yitter.IdGenerator.YitIdHelper.NextId();
            var iotId = item.Key; // Remark1 is used as the device id
            // Remark2 is used as the product key, Remark3 as the device name; the first
            // non-empty value found in the group wins.
            var productKey = item.FirstOrDefault(a => !string.IsNullOrEmpty(a.Remark2))?.Remark2;
            var deviceName = item.FirstOrDefault(a => !string.IsNullOrEmpty(a.Remark3))?.Remark3;

            AliYunIot aliYunIot = new();
            aliYunIot.iotId = iotId;
            aliYunIot.requestId = requestId.ToString();
            aliYunIot.deviceName = deviceName;
            aliYunIot.productKey = productKey;

            foreach (var varItem in item)
            {
                // NOTE(review): ToUnixTimeSeconds yields seconds, while Alibaba Cloud's property
                // message examples show millisecond timestamps — confirm the expected unit.
                var data = new Property()
                {
                    value = varItem.Value,
                    time = new DateTimeOffset(varItem.CollectTime).ToUnixTimeSeconds()
                };

                // Keyed by variable name. The indexer is used instead of Add so that a repeated
                // variable name within one group overwrites the earlier entry instead of
                // throwing ArgumentException and aborting the whole upload.
                aliYunIot.items[varItem.Name] = data;
            }

            aliYunIots.Add(aliYunIot);
        }

        return aliYunIots;
    }
}
/// <summary>
/// One message produced by AliYunIotScript for a single device id.
/// The lower-case member names are the names that end up in the published payload.
/// </summary>
public class AliYunIot
{
/// <summary>Device id; AliYunIotScript fills it from Remark1.</summary>
public string iotId { get; set; }
/// <summary>Request id; AliYunIotScript fills it with a generated id converted to string.</summary>
public string requestId { get; set; }
/// <summary>Product key; AliYunIotScript fills it from Remark2.</summary>
public string productKey { get; set; }
/// <summary>Device name; AliYunIotScript fills it from Remark3.</summary>
public string deviceName { get; set; }
/// <summary>Variable name to property value map; starts out empty.</summary>
public Dictionary<string, Property> items { get; set; } = new();
}

/// <summary>
/// A single property entry inside AliYunIot.items.
/// </summary>
public class Property
{
/// <summary>The variable's value.</summary>
public object value { get; set; }
/// <summary>Unix timestamp derived from the variable's collect time (AliYunIotScript uses seconds).</summary>
public long time { get; set; }
}



传输主题

变量Topic 填入 /${productKey}/${deviceName}/thing/event/property/post