一 前言
MQTT的相关理论内容这里不做过多介绍,请看下面两篇文章:
Introduction · MQTT协议中文版
MQTT协议-CSDN博客
这篇文章只做代码实现,文章中使用MQTTnet作为MQTT开发的组件。
MQTT分为服务端和客户端,一个服务端对应多个客户端。其中服务端相当于是一台服务器,它是MQTT消息传输的枢纽,负责将MQTT客户端发送来的消息传递给另一个客户端;MQTT服务端还负责管理客户端,确保客户端之间的通讯顺畅,保证MQTT消息得以正确接收和准确投递。
MQTT客户端可以向服务端发布信息,也可以从服务端接受信息,我们把客户端向服务端发送消息的行为称为“发布”消息,客户端也可以“订阅”消息。
二 服务端
服务端可以不用自己开发,有几个常用的第三方服务端,比如EMQ,EMQ怎么使用的,可以查看官网:物联网实时消息引擎 | EMQ
这里不介绍第三方服务,这里具体介绍如何自己动手开发服务端。
1、添加MQTTnet引用
新建一个控制台应用程序,打开NuGet程序包,添加MQTTnet,版本选择3.0.13,选择版本这里要注意一下,不同的版本实现方式不同,下面的实现代码中,如果选择高版本,可能会有异常。
2、代码实现
不啰嗦,直接上代码
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System.Text;
namespace ConsoleApp2
{
internal class Program
{
static void Main(string[] args)
{
MqttServerClass serverClass = new MqttServerClass();
serverClass.StartMqttServer().Wait();
Console.ReadLine();
}
}
public static class Config
{
public static int Port { get; set; } = 1883;
public static string UserName { get; set; } = "Username";
public static string Password { get; set; } = "Password";
}
public class UserInstance
{
public string ClientId { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
}
public class MqttServerClass
{
private IMqttServer mqttServer;
private List<MqttApplicationMessage> messages = new List<MqttApplicationMessage>();
public async Task StartMqttServer()
{
try
{
if (mqttServer == null)
{
var optionsBuilder = new MqttServerOptionsBuilder()
.WithDefaultEndpoint()
.WithDefaultEndpointPort(Config.Port)
//连接拦截器
.WithConnectionValidator(
c =>
{
//var flag = c.Username == Config.UserName && c.Password == Config.Password;
//if (!flag)
//{
// c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
// return;
//}
//设置代码为 Success
c.ReasonCode = MqttConnectReasonCode.Success;
//instances.Add(new UserInstance() //缓存到内存的List集合当中
//{
// ClientId = c.ClientId,
// UserName = c.Username,
// Password = c.Password
//});
})
//订阅拦截器
.WithSubscriptionInterceptor(
c =>
{
if (c == null) return;
c.AcceptSubscription = true;
})
//应用程序消息拦截器
.WithApplicationMessageInterceptor(
c =>
{
if (c == null) return;
c.AcceptPublish = true;
})
//clean session是否生效
.WithPersistentSessions();
mqttServer = new MqttFactory().CreateMqttServer();
//客户端断开连接拦截器
//mqttServer.UseClientDisconnectedHandler(c =>
//{
// //var user = instances.FirstOrDefault(t => t.ClientId == c.ClientId);
// //if (user != null)
// //{
// // instances.Remove(user);
// //}
//});
//服务开始
mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(OnMqttServerStarted);
//服务停止
mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(OnMqttServerStopped);
//客户端连接
mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(OnMqttServerClientConnected);
//客户端断开连接(此事件会覆盖拦截器)
mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(OnMqttServerClientDisconnected);
//客户端订阅
mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(OnMqttServerClientSubscribedTopic);
//客户端取消订阅
mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(OnMqttServerClientUnsubscribedTopic);
//服务端收到消息
mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnMqttServerApplicationMessageReceived);
await mqttServer.StartAsync(optionsBuilder.Build());
//主动发送消息到客户端
//await mqttServer.PublishAsync(new
// MqttApplicationMessage
//{
// Topic = "testtopic",
// Payload = Encoding.UTF8.GetBytes("dsdsd")
//});
//mqttServer.GetClientStatusAsync();
//mqttServer.GetRetainedApplicationMessagesAsync();
//mqttServer.GetSessionStatusAsync();
}
}
catch (Exception ex)
{
Console.WriteLine($"MQTT Server start fail.>{ex.Message}");
}
}
private void OnMqttServerStarted(EventArgs e)
{
if (mqttServer.IsStarted)
{
Console.WriteLine("MQTT服务启动完成!");
}
}
private void OnMqttServerStopped(EventArgs e)
{
if (!mqttServer.IsStarted)
{
Console.WriteLine("MQTT服务停止完成!");
}
}
private void OnMqttServerClientConnected(MqttServerClientConnectedEventArgs e)
{
Console.WriteLine($"客户端[{e.ClientId}]已连接");
}
private void OnMqttServerClientDisconnected(MqttServerClientDisconnectedEventArgs e)
{
Console.WriteLine($"客户端[{e.ClientId}]已断开连接!");
}
private void OnMqttServerClientSubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
{
Console.WriteLine($"客户端[{e.ClientId}]已成功订阅主题[{e.TopicFilter}]!");
}
private void OnMqttServerClientUnsubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
{
Console.WriteLine($"客户端[{e.ClientId}]已成功取消订阅主题[{e.TopicFilter}]!");
}
private void OnMqttServerApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
messages.Add(e.ApplicationMessage);
Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
Console.WriteLine($"客户端[{e.ClientId}]>> Topic[{e.ApplicationMessage.Topic}] Payload[{Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? new byte[] { })}] Qos[{e.ApplicationMessage.QualityOfServiceLevel}] Retain[{e.ApplicationMessage.Retain}]");
}
}
}
三 客户端
客户端可以做成一个独立的项目,如果有什么地方需要调用比如发送消息的方法,可以直接引用MQTT服务直接进行调用,当然这是其中一个思路,我目前把MQTT服务放在了Application项目中,具体的实现思路就不细聊了,看官方文档吧,我这里直接上代码,主打一个拿来就用。
注意:
1、先启动mqtt服务端,在启动客户端,同时客户端配置文件appsetting.json中MqttHost配置要加上,节点MqttHost中无值,MQTT客户端不启用,目前主题是固定的testTopic
2、项目即可发送消息,又可接收消息
先看项目结构
1、appsettings.json增加配置节点
HttpApi.Host项目中的appsettings.json增加MQTT相关配置节点
"MqttSettingsProvider": {
"BrokerHostSettings": {
"MqttHost": "", //localhost //服务端ip
"MqttPort": 1883 //服务端端口
},
"ClientSettings": {
"ClientId": "5eb020f043ba8930506acbdd2",
"UserName": "",
"Password": ""
},
"TopicName": "testTopic"
}
2、添加MQTTnet引用
在Application项目中通过NuGet包添加MQTTnet引用,版本与服务端保持一致3.0.13
3、代码示例
下面的代码示例不是按代码书写顺序来的,为了方便写文档,直接按文件顺序粘贴代码
Application→MqttServer→AspCoreMqttClientOptionBuilder.cs
using MQTTnet.Client.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.MqttServer
{
public class AspCoreMqttClientOptionBuilder : MqttClientOptionsBuilder
{
public IServiceProvider ServiceProvider { get; }
public AspCoreMqttClientOptionBuilder(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
}
}
}
Application→MqttServer→BrokerHostSettings.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.MqttServer
{
public class BrokerHostSettings
{
public string MqttHost { get; set; }
public int MqttPort { get; set; }
}
}
Application→MqttServer→ClientSettings.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.MqttServer
{
public class ClientSettings
{
public string ClientId { get; set; }
public string UserName { get; set; }
public string Password { get; set; }
}
}
Application→MqttServer→IMessageSendService.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.MqttServer
{
public interface IMessageSendService
{
int Order { get; }
Task SendMessage(MessageContext context);
}
}
Application→MqttServer→IMqttClientService.cs
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Receiving;
namespace WMSInterface.MqttServer
{
public interface IMqttClientService : IHostedService, IMqttClientConnectedHandler, IMqttClientDisconnectedHandler, IMqttApplicationMessageReceivedHandler
{
Task Publish(string topicName, string message);
}
}
Application→MqttServer→MessageContext.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.MqttServer
{
public class MessageContext
{
public string TopicName { get; set; }
public string Title { get; set; }
public string Content { get; set; }
//public IList<UserModel> UserList { get; set; } = new List<UserModel>();
public string[] Users { get; set; }
public string ObjId { get; set; }
}
}
Application→MqttServer→MqttClientService.cs
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Microsoft.Extensions.Options;
using MQTTnet.Server;
using System.Threading;
using Microsoft.Extensions.Hosting;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Receiving;
using Microsoft.Extensions.DependencyInjection;
using MQTTnet.Client.Options;
using WMSInterface.Server;
namespace WMSInterface.MqttServer
{
public class MqttClientService : IMqttClientService, IHostedService, IMqttClientConnectedHandler, IMqttClientDisconnectedHandler, IMqttApplicationMessageReceivedHandler
{
private IMqttClient mqttClient;
private IMqttClientOptions options;
private readonly IServiceProvider _serviceProvider;
public MqttClientService(IMqttClientOptions options, IServiceProvider serviceProvider)
{
this.options = options;
_serviceProvider = serviceProvider;
mqttClient = new MqttFactory().CreateMqttClient();
ConfigureMqttClient();
}
private void ConfigureMqttClient()
{
mqttClient.ConnectedHandler = this;
mqttClient.DisconnectedHandler = this;
mqttClient.ApplicationMessageReceivedHandler = this;
}
public async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
{
IEnumerable<IMqttMessageHandler> handlers = _serviceProvider.GetServices<IMqttMessageHandler>();
foreach (IMqttMessageHandler handler in handlers)
{
await handler.HandleMessage(e.ApplicationMessage.Topic, e.ApplicationMessage.Payload);
}
}
public async Task Publish(string topicName, string message)
{
string topic = topicName.Trim();
string msg = message.Trim();
if (string.IsNullOrEmpty(topic))
{
Console.Write("主题不能为空!");
}
else if (!mqttClient.IsConnected)
{
Console.Write("MQTT客户端尚未连接!");
}
else
{
await MqttClientExtensions.PublishAsync(applicationMessage: new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(msg)
.WithAtMostOnceQoS()
.WithRetainFlag(value: false)
.Build(), client: mqttClient);
}
}
/// <summary>
/// 订阅连接成功事件
/// </summary>
/// <param name="eventArgs"></param>
/// <returns></returns>
public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
{
await mqttClient.SubscribeAsync("testTopic");
//...可订阅多个主题
}
/// <summary>
/// 订阅断开连接事件
/// </summary>
/// <param name="eventArgs"></param>
/// <returns></returns>
public async Task HandleDisconnectedAsync(MqttClientDisconnectedEventArgs eventArgs)
{
await mqttClient.UnsubscribeAsync("testTopic");
//尝试重新连接
//await mqttClient.ConnectAsync(options);
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await mqttClient.ConnectAsync(options);
if (!mqttClient.IsConnected)
{
await mqttClient.ReconnectAsync();
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
MqttClientDisconnectOptions disconnectOption = new MqttClientDisconnectOptions
{
ReasonCode = MqttClientDisconnectReason.NormalDisconnection,
ReasonString = "NormalDiconnection"
};
await mqttClient.DisconnectAsync(disconnectOption, cancellationToken);
}
await mqttClient.DisconnectAsync();
}
}
}
Application→MqttServer→MqttClientServiceProvider.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.MqttServer
{
public class MqttClientServiceProvider
{
public readonly IMqttClientService MqttClientService;
public MqttClientServiceProvider(IMqttClientService mqttClientService)
{
MqttClientService = mqttClientService;
}
}
}
Application→MqttServer→MqttMessageService.cs
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.MqttServer
{
public class MqttMessageService : IMessageSendService
{
private string _topicName = string.Empty;
private readonly IMqttClientService _mqttClientService;
private readonly ILogger<MqttMessageService> _logger;
private readonly IConfiguration _configuration;
public int Order => 0;
public MqttMessageService(MqttClientServiceProvider mqttClientServiceProvider,
ILogger<MqttMessageService> logger,
IConfiguration configuration)
{
_mqttClientService = mqttClientServiceProvider.MqttClientService;
_logger = logger;
_configuration = configuration;
_topicName = configuration["MqttSettingsProvider:TopicName"];
}
public Task SendMessage(MessageContext context)
{
try
{
if (!string.IsNullOrEmpty(context.Content))
{
var content = new
{
content = context.Content,
//users = context.UserList.Select((UserModel m) => m.Id.ToString()).ToList()
};
_mqttClientService.Publish(_topicName, JsonConvert.SerializeObject(content));
}
}
catch (Exception e)
{
_logger.LogError(e, "MQTT发送消息错误。");
}
return Task.CompletedTask;
}
}
}
Application→MqttServer→MqttServerModule.cs
注意:这里跟配置文件appsettings.json中的节点MqttHost有关联,MqttHost为空,不启动MQTT客户端服务,MqttHost不为空,会客户端会连接服务端。
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.Modularity;
namespace WMSInterface.MqttServer
{
public class MqttServerModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
IConfiguration configuration = context.Services.GetConfiguration();
MqttSettingsProvider mqttSettingsProvider = configuration.GetSection("MqttSettingsProvider").Get<MqttSettingsProvider>();
if (!string.IsNullOrEmpty(mqttSettingsProvider.BrokerHostSettings?.MqttHost))
{
context.Services.AddMqttClientHostedService(mqttSettingsProvider);
context.Services.TryAddEnumerable(ServiceDescriptor.Transient<IMessageSendService, MqttMessageService>());
}
}
}
}
Application→MqttServer→MqttServiceCollectionExtension.cs
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.MqttServer
{
public static class MqttServiceCollectionExtension
{
public static IServiceCollection AddMqttClientHostedService(this IServiceCollection services, MqttSettingsProvider mqttSettingsProvider)
{
AddMqttClientServiceWithConfig(services, delegate (AspCoreMqttClientOptionBuilder aspOptionBuilder)
{
IConfiguration configuration = services.GetConfiguration();
aspOptionBuilder
.WithCredentials(mqttSettingsProvider.ClientSettings.UserName, mqttSettingsProvider.ClientSettings.Password)
.WithClientId(mqttSettingsProvider.ClientSettings.ClientId)
.WithTcpServer(mqttSettingsProvider.BrokerHostSettings.MqttHost);
});
return services;
}
private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action<AspCoreMqttClientOptionBuilder> configure)
{
services.AddSingleton(delegate (IServiceProvider serviceProvider)
{
AspCoreMqttClientOptionBuilder aspCoreMqttClientOptionBuilder = new AspCoreMqttClientOptionBuilder(serviceProvider);
configure(aspCoreMqttClientOptionBuilder);
return aspCoreMqttClientOptionBuilder.Build();
});
services.AddSingleton<MqttClientService>();
services.AddSingleton((Func<IServiceProvider, IHostedService>)((IServiceProvider serviceProvider) => serviceProvider.GetService<MqttClientService>()));
services.AddSingleton(delegate (IServiceProvider serviceProvider)
{
MqttClientService service = serviceProvider.GetService<MqttClientService>();
return new MqttClientServiceProvider(service);
});
return services;
}
}
}
Application→MqttServer→MqttSettingsProvider.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.MqttServer
{
public class MqttSettingsProvider
{
public BrokerHostSettings BrokerHostSettings { get; set; }
public ClientSettings ClientSettings { get; set; }
}
}
Application→Server→IMqttMessageHandler.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace WMSInterface.Server
{
public interface IMqttMessageHandler
{
Task HandleMessage(string topic, byte[] data);
}
}
Application→Server→MqttMessageHandler.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
namespace WMSInterface.Server
{
public class MqttMessageHandler: IMqttMessageHandler,ITransientDependency
{
public async Task HandleMessage(string topic, byte[] data)
{
if (!string.IsNullOrEmpty(topic) || data != null)
{
Console.WriteLine($"接收到的主题:{topic},消息:{Encoding.UTF8.GetString(data)}");
}
}
}
}
xx.HttpApi.Host→xxHttpApiHostModule.cs
typeof(MqttServerModule)
MQTT客户端发送消息代码示例
xx.HttpApi→Controllers→MqttController.cs
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Volo.Abp.AspNetCore.Mvc;
using WMSInterface.MqttServer;
namespace WMSInterface.Controllers
{
/// <summary>
/// mqtt
/// </summary>
[ApiController]
[Route("api/[controller]/[action]")]
public class MqttController : AbpController
{
private readonly IConfiguration _configuration;
private readonly IMessageSendService _messageSendService;
public MqttController(IConfiguration configuration,
IMessageSendService messageSendService)
{
_configuration = configuration;
_messageSendService = messageSendService;
}
#region 发送消息
/// <summary>
/// 发送消息
/// </summary>
/// <param name="body"></param>
/// <returns></returns>
[HttpPost]
public async Task<IActionResult> SendAsync(MessageContext body)
{
await _messageSendService.SendMessage(body);
return Ok("ok");
}
#endregion
}
}
四 MQTT收发消息测试
下载MQTTX工具进行MQTT消息的测试,使用方法就不具体介绍了,基本上是拿来就用
MQTTX:全功能 MQTT 客户端工具
发送消息入口
接收消息示例
五 结尾
本文章不做MQTT科普,默认具有一定的MQTT认知,主要目的是让大家可以直接在abp框架中快速集成MQTT