MqttServer
提示
通过自定义脚本,可快速适配业务模型
一、说明
MqttServer支持Tcp/webSocket方式接入,可以定时/变化发布数据,支持Rpc写入
通道只支持 Other
二、插件属性配置项

属性 | 说明 | 备注 |
---|---|---|
IP | ServerIP,为空时指任意IP | |
端口 | Tcp绑定端口 | 1883 |
详细日志 | Flase=>日志输出上传数量,True=>日志输出上传内容 | False |
WebSocket端口 | WebSocket绑定端口 | 8083,固定/mqtt路由 |
允许连接的ID(前缀) | 允许连接的ID(前缀) | ClientId必须以这个属性开头,为空时可以是任意Id |
允许匿名登录 | 允许匿名登录 | 设为Flase时,客户端需要填入网关的用户名和密码 |
允许Rpc写入 | 是否允许写入变量 | |
Rpc写入主题 | 写入变量的主 题 | 如果检测适配固定的topic标识,会按默认规则返回,比如thingsboard 平台为v1/gateway/rpc 。默认规则为:固定通配 RpcWrite/+ ,其中RpcWrite为该属性填入内容,+通配符是请求GUID值;返回结果主题会在主题后添加Response , 也就是RpcWrite/+/Response |
Rpc请求数据主题 | 该主题接受到任何消息都会发布全部信息到对应的变量/设备/报警主题中 | |
分组上传 | 启用后,无论是定时还是变化模式,始终会上传变量分组属性为key分组的全部变量 。在变化模式时,每次变量变化都会触发一次组上传 | False |
选择全部变量 | 选择全部变量 | False |
设备状态列表上传 | 设备是否列表上传,false时每个设备实体都会单独发布,注意性能需求,默认为true | |
变量列表上传 | 变量是否列表上传,false时每个变量实体都会单独发布,注意性能需求,默认为true | |
报警列表上传 | 报警是否列表上传,false时每个报警实体都会单独发布,注意性能需求,默认为true | |
设备Topic | 设备实体的发布主题 ,使用${key} 作为匹配项,key必须是上传实体中的属性 | |
变量Topic | 变量实体的发布主题 ,使用${key} 作为匹配项,key必须是上传实体中的属性 | |
报警Topic | 报警实体的发布主题 ,使用${key} 作为匹配项,key必须是上传实体中的属性 | |
设备实体脚本 | 脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为DeviceData,查看以下具体属性 | 编辑页面中,可通过检 查按钮验证脚本 |
变量实体脚本 | 脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为VariableBasicData,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
报警实体脚本 | 脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为AlarmVariable,查看以下具体属性 | 编辑页面中,可通过检查按钮验证脚本 |
选择全部变量 | 是否选择全部变量,true时不需要单个变量添加业务属性 | |
上传模式 | 间隔/变化/变化和间隔同时生效 | |
定时上传间隔 | 间隔执行时间 | |
启用缓存 | 是否启用缓存 | |
缓存文件最大长度(mb) | 缓存文件最大长度 | |
上传每页条数 | 每一次上传的列表最大数量 | |
内存队列最大数量 | 内存队列的最大数量,超出或失败时转入文件缓存,根据数据量设定适当值 |
三、脚本与实体
查看MqttClient页面脚本接口
四、变量业务属性

属性 | 说明 | 备注 |
---|---|---|
启用RPC | 单独配置变量是否允许写入 | true |
Enable | 是否启用 | true |
五、Rpc
1、Rpc脚本
脚本类定义
using MQTTnet;
using MQTTnet.Server;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
namespace ThingsGateway.Plugin.Mqtt;
public abstract class DynamicMqttServerRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttServer">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典),返回rpc结果</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public virtual async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, InterceptingPublishEventArgs args, MqttServerProperty driverPropertys, MQTTnet.Server.MqttServer mqttServer, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, CancellationToken cancellationToken)
{
}
}
RPC脚本demo
using MQTTnet;
using MQTTnet.Server;
using Newtonsoft.Json.Linq;
using System.Text;
using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;
using ThingsGateway.Plugin.Mqtt;
public class DynamicMqttServerRpc:DynamicMqttServerRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttServer">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典),返回rpc结果</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public override async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, InterceptingPublishEventArgs args, MqttServerProperty driverPropertys, MQTTnet.Server.MqttServer mqttServer, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, CancellationToken cancellationToken)
{
if (driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;
if(args.ApplicationMessage.Topic != driverPropertys.RpcWriteTopic) return;
var rpcDatas = Encoding.UTF8.GetString(args.ApplicationMessage.Payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var mqttRpcResult = await getRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);
try
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(driverPropertys.JsonFormattingIndented)).Build();
await mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(variableMessage), cancellationToken).ConfigureAwait(false);
}
catch
{
}
}
}
2、默认格式与MqttClient相同
查看MqttClient页面Rpc