跳到主要内容

MqttServer

提示

通过自定义脚本,可快速适配业务模型

一、说明

MqttServer支持Tcp/webSocket方式接入,可以定时/变化发布数据,支持Rpc写入

通道只支持 Other

二、插件属性配置项

属性说明备注
IPServerIP,为空时指任意IP
端口Tcp绑定端口1883
详细日志Flase=>日志输出上传数量,True=>日志输出上传内容False
WebSocket端口WebSocket绑定端口8083,固定/mqtt路由
允许连接的ID(前缀)允许连接的ID(前缀)ClientId必须以这个属性开头,为空时可以是任意Id
允许匿名登录允许匿名登录设为Flase时,客户端需要填入网关的用户名和密码
允许Rpc写入是否允许写入变量
Rpc写入主题写入变量的主题如果检测适配固定的topic标识,会按默认规则返回,比如thingsboard平台为v1/gateway/rpc 。默认规则为:固定通配 RpcWrite/+ ,其中RpcWrite为该属性填入内容,+通配符是请求GUID值;返回结果主题会在主题后添加Response , 也就是RpcWrite/+/Response
Rpc请求数据主题该主题接受到任何消息都会发布全部信息到对应的变量/设备/报警主题中
分组上传启用后,无论是定时还是变化模式,始终会上传变量分组属性为key分组的全部变量 。在变化模式时,每次变量变化都会触发一次组上传False
选择全部变量选择全部变量False
设备状态列表上传设备是否列表上传,false时每个设备实体都会单独发布,注意性能需求,默认为true
变量列表上传变量是否列表上传,false时每个变量实体都会单独发布,注意性能需求,默认为true
报警列表上传报警是否列表上传,false时每个报警实体都会单独发布,注意性能需求,默认为true
设备Topic设备实体的发布主题 ,使用${key}作为匹配项,key必须是上传实体中的属性
变量Topic变量实体的发布主题 ,使用${key}作为匹配项,key必须是上传实体中的属性
报警Topic报警实体的发布主题 ,使用${key}作为匹配项,key必须是上传实体中的属性
设备实体脚本脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为DeviceData,查看以下具体属性编辑页面中,可通过检查按钮验证脚本
变量实体脚本脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为VariableBasicData,查看以下具体属性编辑页面中,可通过检查按钮验证脚本
报警实体脚本脚本返回新的实体列表,动态类中需继承IDynamicModel,传入列表为AlarmVariable,查看以下具体属性编辑页面中,可通过检查按钮验证脚本
选择全部变量是否选择全部变量,true时不需要单个变量添加业务属性
上传模式间隔/变化/变化和间隔同时生效
定时上传间隔间隔执行时间
启用缓存是否启用缓存
缓存文件最大长度(mb)缓存文件最大长度
上传每页条数每一次上传的列表最大数量
内存队列最大数量内存队列的最大数量,超出或失败时转入文件缓存,根据数据量设定适当值

三、脚本与实体

查看MqttClient页面脚本接口

四、变量业务属性

属性说明备注
启用RPC单独配置变量是否允许写入true
Enable是否启用true

五、Rpc

1、Rpc脚本

脚本类定义

using MQTTnet;
using MQTTnet.Server;

using Newtonsoft.Json.Linq;

using System.Text;

using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;

namespace ThingsGateway.Plugin.Mqtt;

public abstract class DynamicMqttServerRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttServer">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典),返回rpc结果</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public virtual async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, InterceptingPublishEventArgs args, MqttServerProperty driverPropertys, MQTTnet.Server.MqttServer mqttServer, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, CancellationToken cancellationToken)
{


}
}


RPC脚本demo







using MQTTnet;
using MQTTnet.Server;

using Newtonsoft.Json.Linq;

using System.Text;

using ThingsGateway.Foundation;
using ThingsGateway.NewLife.Extension;
using ThingsGateway.NewLife.Json.Extension;

using ThingsGateway.Plugin.Mqtt;

public class DynamicMqttServerRpc:DynamicMqttServerRpcBase
{
/// <summary>
///触发rpc脚本调用
/// </summary>
/// <param name="logMessage">日志对象</param>
/// <param name="args">InterceptingPublishEventArgs</param>
/// <param name="driverPropertys">插件属性</param>
/// <param name="mqttServer">mqttServer</param>
/// <param name="getRpcResult">传入clientId和rpc数据(设备,变量名称+值字典),返回rpc结果</param>
/// <param name="cancellationToken">cancellationToken</param>
/// <returns></returns>
public override async Task RPCInvokeAsync(TouchSocket.Core.ILog logMessage, InterceptingPublishEventArgs args, MqttServerProperty driverPropertys, MQTTnet.Server.MqttServer mqttServer, Func<string, Dictionary<string, Dictionary<string, JToken>>, ValueTask<Dictionary<string, Dictionary<string, IOperResult>>>> getRpcResult, CancellationToken cancellationToken)
{

if (driverPropertys.RpcWriteTopic.IsNullOrWhiteSpace()) return;

if(args.ApplicationMessage.Topic != driverPropertys.RpcWriteTopic) return;
var rpcDatas = Encoding.UTF8.GetString(args.ApplicationMessage.Payload).FromJsonNetString<Dictionary<string, Dictionary<string, JToken>>>();
if (rpcDatas == null)
return;
var mqttRpcResult = await getRpcResult(args.ClientId, rpcDatas).ConfigureAwait(false);

try
{
var variableMessage = new MqttApplicationMessageBuilder()
.WithTopic($"{args.ApplicationMessage.Topic}/Response")
.WithPayload(mqttRpcResult.ToSystemTextJsonString(driverPropertys.JsonFormattingIndented)).Build();
await mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(variableMessage), cancellationToken).ConfigureAwait(false);
}
catch
{
}
}
}




2、默认格式与MqttClient相同

查看MqttClient页面Rpc