目录
1.演示效果
2.源码下载
3.服务端介绍
4.客户端介绍
1.演示效果
2.源码下载
下载地址:https://download.csdn.net/download/rotion135/89385802
3.服务端介绍
服务端用的控制台程序进行设计,实际使用可以套一层Windows服务的皮,进行服务部署。
调试时用控制台显示收发的消息,便于直观查看。
首先安装的MQTTNET 版本是4.3.6.1152 :
自定义了服务的客户端列表数据模型:
/// <summary>
/// Server-side record of one connected MQTT client and the topics it has subscribed to.
/// </summary>
public class MqttClientInfo
{
    /// <summary>
    /// Client identifier.
    /// </summary>
    public string ClientId { get; set; }

    /// <summary>
    /// Client display name.
    /// </summary>
    public string ClientName { get; set; }

    /// <summary>
    /// Subscriptions held by this client; starts out as an empty list.
    /// </summary>
    public List<MqttSubscription> Subscriptions { get; set; } = new List<MqttSubscription>();
}

/// <summary>
/// A single topic subscription together with a back-reference to the owning client.
/// </summary>
public class MqttSubscription
{
    /// <summary>
    /// The client this subscription belongs to.
    /// </summary>
    public MqttClientInfo Parent { get; set; }

    /// <summary>
    /// The subscribed topic.
    /// </summary>
    public string Topic { get; set; }
}
再对服务端的代码进行封装,添加相应的事件处理,并把一些消息显示到控制台
服务端的代码就这么简单
/// <summary>
/// Wrapper around an MQTTnet <c>MqttServer</c>: builds and starts the server, keeps an
/// in-memory list of connected clients with their subscriptions, and reports activity
/// through <c>GlobalEvents.OnMessage</c>. Handler failures are logged via <c>LogOperate.Error</c>.
/// </summary>
public class LSMQTTServer
{
    MqttServer mqttServer;
    List<MqttClientInfo> MqttClients = new List<MqttClientInfo>();

    // Guards MqttClients and every client's Subscriptions list. The handlers below are
    // invoked by the server (presumably from per-connection tasks - TODO confirm), and
    // List<T> is not safe for concurrent mutation.
    private readonly object clientsLock = new object();

    /// <summary>
    /// Builds the MQTT server for the given endpoint, wires up all event handlers and starts it.
    /// </summary>
    /// <param name="ip">IP address to bind to; parsed with <c>IPAddress.Parse</c>, which throws on invalid input.</param>
    /// <param name="port">TCP port of the default endpoint.</param>
    /// <remarks>
    /// The method returns before the server has finished starting. A failed start is
    /// written to the log instead of being silently dropped.
    /// </remarks>
    public virtual void InitMqttServer(string ip, int port)
    {
        var mqttServerOptions = new MqttServerOptionsBuilder()
            .WithDefaultEndpoint()
            .WithDefaultEndpointBoundIPAddress(IPAddress.Parse(ip)) // set the ip of the server
            .WithDefaultEndpointPort(port)                          // set the port of the server
            .Build();

        mqttServer = new MqttFactory().CreateMqttServer(mqttServerOptions); // create MQTT service object
        mqttServer.ValidatingConnectionAsync += MqttServer_ValidatingConnectionAsync;
        mqttServer.ClientConnectedAsync += MqttServer_ClientConnectedAsync;
        mqttServer.ClientDisconnectedAsync += MqttServer_ClientDisconnectedAsync;
        mqttServer.ClientSubscribedTopicAsync += MqttServer_ClientSubscribedTopicAsync;
        mqttServer.ClientUnsubscribedTopicAsync += MqttServer_ClientUnsubscribedTopicAsync;
        mqttServer.InterceptingPublishAsync += MqttServer_InterceptingPublishAsync;
        mqttServer.ClientAcknowledgedPublishPacketAsync += MqttServer_ClientAcknowledgedPublishPacketAsync;
        mqttServer.InterceptingClientEnqueueAsync += MqttServer_InterceptingClientEnqueueAsync;
        mqttServer.ApplicationMessageNotConsumedAsync += MqttServer_ApplicationMessageNotConsumedAsync;

        // The signature stays synchronous for existing callers, so the start task is
        // observed by a helper rather than awaited here.
        _ = StartServerAsync();
    }

    /// <summary>
    /// Awaits the server start so that a failure is logged rather than lost in an unobserved task.
    /// </summary>
    private async Task StartServerAsync()
    {
        try
        {
            await mqttServer.StartAsync();
        }
        catch (Exception ex)
        {
            LogOperate.Error("InitMqttServer", ex);
        }
    }

    /// <summary>
    /// Placeholder for messages that no subscriber consumed; currently does nothing.
    /// </summary>
    private Task MqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Placeholder for intercepting messages being enqueued for a client; currently does nothing.
    /// </summary>
    private Task MqttServer_InterceptingClientEnqueueAsync(InterceptingClientApplicationMessageEnqueueEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Placeholder for publish acknowledgements from clients; currently does nothing.
    /// </summary>
    private Task MqttServer_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs arg)
    {
        return Task.CompletedTask;
    }

    /// <summary>
    /// Message received: reports the publishing client, the topic and the UTF-8 decoded payload.
    /// </summary>
    /// <param name="arg">Event data of the intercepted publish.</param>
    /// <returns>A completed task.</returns>
    private Task MqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
    {
        try
        {
            var client = arg.ClientId;
            var topic = arg.ApplicationMessage.Topic;
            var content = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
            GlobalEvents.OnMessage($"接收到消息:Client[{client}] Topic[{topic}] Message[{content}]");
        }
        catch (Exception ex)
        {
            LogOperate.Error(nameof(MqttServer_InterceptingPublishAsync), ex);
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops and disposes the MQTT server if it has been created and is running.
    /// </summary>
    public async virtual Task StopMqttServer()
    {
        if (mqttServer != null)
        {
            if (mqttServer.IsStarted)
            {
                await mqttServer.StopAsync();
                mqttServer.Dispose();
            }
        }
    }

    /// <summary>
    /// Validates an incoming client connection.
    /// </summary>
    /// <param name="arg">Connection request; its <c>ReasonCode</c> is set to the outcome.</param>
    /// <returns>A completed task.</returns>
    /// <remarks>
    /// NOTE(review): this only checks that ClientId, user name and password are non-blank;
    /// it does not verify credentials against any store.
    /// </remarks>
    public virtual Task MqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
    {
        try
        {
            // A missing client id is an identifier problem, not a credential problem.
            if (string.IsNullOrWhiteSpace(arg.ClientId))
            {
                arg.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid;
                return Task.CompletedTask;
            }

            // Both user name and password must be supplied.
            if (string.IsNullOrWhiteSpace(arg.UserName) || string.IsNullOrWhiteSpace(arg.Password))
            {
                arg.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                return Task.CompletedTask;
            }

            arg.ReasonCode = MqttConnectReasonCode.Success;
        }
        catch (Exception ex)
        {
            LogOperate.Error(nameof(MqttServer_ValidatingConnectionAsync), ex);

            // Fail closed: never let a validation error leave the request accepted.
            if (arg != null)
            {
                arg.ReasonCode = MqttConnectReasonCode.UnspecifiedError;
            }
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Client connected: records the client and reports it.
    /// </summary>
    /// <param name="arg">Event data of the connected client.</param>
    /// <returns>A completed task.</returns>
    public virtual Task MqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
    {
        try
        {
            var info = new MqttClientInfo() { ClientId = arg.ClientId, ClientName = arg.UserName };
            lock (clientsLock)
            {
                MqttClients.Add(info);
            }
            GlobalEvents.OnMessage($"客户端上线- ID:【{arg.ClientId}】 Name:【{arg.UserName}】");
        }
        catch (Exception ex)
        {
            LogOperate.Error(nameof(MqttServer_ClientConnectedAsync), ex);
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Client disconnected: removes the client from the list (if known) and reports it.
    /// </summary>
    /// <param name="arg">Event data of the disconnected client.</param>
    /// <returns>A completed task.</returns>
    public virtual Task MqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
    {
        try
        {
            MqttClientInfo? mqttUser;
            lock (clientsLock)
            {
                mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    MqttClients.Remove(mqttUser);
                }
            }

            // Report outside the lock so message subscribers cannot block other handlers.
            if (mqttUser != null)
            {
                GlobalEvents.OnMessage($"客户端离线- ID:【{mqttUser.ClientId}】");
            }
        }
        catch (Exception ex)
        {
            LogOperate.Error(nameof(MqttServer_ClientDisconnectedAsync), ex);
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Client subscribed to a topic: records the subscription on the known client and reports it.
    /// </summary>
    /// <param name="arg">Event data of the subscription; a null value is ignored.</param>
    /// <returns>A completed task.</returns>
    public virtual Task MqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
    {
        try
        {
            if (arg == null)
            {
                return Task.CompletedTask;
            }

            string topic = arg.TopicFilter.Topic;
            bool knownClient = false;
            lock (clientsLock)
            {
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    knownClient = true;

                    // Re-subscribing to the same topic must not create a second entry,
                    // otherwise one unsubscribe would leave a stale subscription behind.
                    if (!mqttUser.Subscriptions.Any(s => s.Topic == topic))
                    {
                        mqttUser.Subscriptions.Add(new MqttSubscription() { Parent = mqttUser, Topic = topic });
                    }
                }
            }

            if (knownClient)
            {
                GlobalEvents.OnMessage($"客户端发布订阅- Topic:【{topic}】");
            }
        }
        catch (Exception ex)
        {
            LogOperate.Error(nameof(MqttServer_ClientSubscribedTopicAsync), ex);
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Client unsubscribed from a topic: removes the matching subscription and reports it.
    /// </summary>
    /// <param name="arg">Event data of the unsubscription; a null value is ignored.</param>
    /// <returns>A completed task.</returns>
    public virtual Task MqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
    {
        try
        {
            if (arg == null)
            {
                return Task.CompletedTask;
            }

            MqttSubscription? mqttSubedTopic = null;
            lock (clientsLock)
            {
                MqttClientInfo? mqttUser = MqttClients.FirstOrDefault(t => t.ClientId == arg.ClientId);
                if (mqttUser != null)
                {
                    mqttSubedTopic = mqttUser.Subscriptions.FirstOrDefault(t => t.Topic == arg.TopicFilter);
                    if (mqttSubedTopic != null)
                    {
                        mqttUser.Subscriptions.Remove(mqttSubedTopic);
                    }
                }
            }

            if (mqttSubedTopic != null)
            {
                GlobalEvents.OnMessage($"客户端取消订阅- Topic:【{mqttSubedTopic.Topic}】");
            }
        }
        catch (Exception ex)
        {
            LogOperate.Error(nameof(MqttServer_ClientUnsubscribedTopicAsync), ex);
        }
        return Task.CompletedTask;
    }
}
然后在控制台运行的时候,对服务进行实例化
// Endpoint for the server: fixed port, address supplied by IPHelper
// (presumably this machine's local IP - confirm against IPHelper).
int port = 3303;
var ip = IPHelper.GetLocalIP();

// Create the server wrapper, start it on that endpoint, then announce it on the console.
var server = new LSMQTTServer();
server.InitMqttServer(ip, port);
GlobalEvents.OnMessage($"MQTT服务启动,IP:{ip},Port{port}");
4.客户端介绍
客户端设计,用的WPF,将每个客户端连接,用自定义控件进行封装,界面添加多个控件即表示多个客户端,添加订阅后,即可以显示收到的消息,便于多个客户端之间的消息调试
首先对连接的客户端也进行了类封装,包括连接,订阅,取消订阅和消息接收等
/// <summary>
/// Wrapper around an MQTTnet client: connect, disconnect, subscribe, unsubscribe, publish,
/// and forward status and received messages through <see cref="OnOutMessage"/>.
/// </summary>
public class LSMQTTClient
{
    MqttClient mqttClient;

    /// <summary>
    /// Signature of the handler that receives human-readable status and message text.
    /// </summary>
    public delegate void DelegateOutMessage(string message);

    /// <summary>
    /// Raised with status text (connected, disconnected, connect failure) and received messages.
    /// </summary>
    public event DelegateOutMessage OnOutMessage;

    /// <summary>
    /// Builds the connection options (clean session, credentials, client id, TCP endpoint)
    /// and starts connecting to the server.
    /// </summary>
    /// <param name="serverIp">Server host or IP address.</param>
    /// <param name="serverPort">Server TCP port.</param>
    /// <param name="clientId">Client identifier sent to the server.</param>
    /// <param name="userName">User name credential.</param>
    /// <param name="password">Password credential.</param>
    public void InitMqttClient(string serverIp, int serverPort, string clientId, string userName, string password)
    {
        try
        {
            var options = new MqttClientOptionsBuilder()
                .WithCleanSession(true)
                .WithCredentials(userName, password)
                .WithClientId(clientId)
                .WithTcpServer(serverIp, serverPort)
                .Build();
            ConnectMQTTServer(options);
        }
        catch (Exception ex)
        {
            // Only covers failures while building the options; connect failures are
            // handled inside ConnectMQTTServer because it completes asynchronously.
            LogOperate.Error("InitMqttClient 发生异常", ex);
        }
    }

    /// <summary>
    /// Reports whether a client exists and is currently connected.
    /// </summary>
    /// <returns><c>true</c> when connected; otherwise <c>false</c>.</returns>
    public bool IsConnect()
    {
        return mqttClient != null && mqttClient.IsConnected;
    }

    /// <summary>
    /// Creates the underlying client on first use, wires up its events and connects with the given options.
    /// </summary>
    /// <param name="options">Connection options to use.</param>
    /// <remarks>
    /// This is <c>async void</c>, so callers cannot observe its outcome. Any failure is
    /// therefore caught here, logged and reported through <see cref="OnOutMessage"/>;
    /// otherwise the exception would be raised unhandled.
    /// </remarks>
    public async void ConnectMQTTServer(MqttClientOptions options)
    {
        try
        {
            if (mqttClient == null)
            {
                mqttClient = (MqttClient)new MqttFactory().CreateMqttClient();
                mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
                mqttClient.ConnectedAsync += MqttClient_ConnectedAsync;
                mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;
            }
            await mqttClient.ConnectAsync(options);
        }
        catch (Exception ex)
        {
            LogOperate.Error("ConnectMQTTServer 发生异常", ex);
            OnOutMessage?.Invoke($"连接服务失败【{ex.Message}】");
        }
    }

    /// <summary>
    /// Disconnects from the server when currently connected; failures are logged.
    /// </summary>
    public async void DisConnectMQTTServer()
    {
        if (mqttClient == null || !mqttClient.IsConnected)
        {
            return;
        }

        try
        {
            await mqttClient.DisconnectAsync();
        }
        catch (Exception ex)
        {
            LogOperate.Error("DisConnectMQTTServer 发生异常", ex);
        }
    }

    /// <summary>
    /// Subscribes to a topic.
    /// </summary>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <returns>A failure result when not connected; otherwise <c>BaseResult.Successed</c>.</returns>
    public async Task<BaseResult> AddSubscription(string topic)
    {
        if (!IsConnect())
        {
            return new BaseResult(false, "请先连接服务");
        }
        await mqttClient.SubscribeAsync(topic);
        return BaseResult.Successed;
    }

    /// <summary>
    /// Unsubscribes from a topic.
    /// </summary>
    /// <param name="topic">Topic to unsubscribe from.</param>
    /// <returns>A failure result when not connected; otherwise <c>BaseResult.Successed</c>.</returns>
    public async Task<BaseResult> UnSubscription(string topic)
    {
        if (!IsConnect())
        {
            return new BaseResult(false, "请先连接服务");
        }
        await mqttClient.UnsubscribeAsync(topic);
        return BaseResult.Successed;
    }

    /// <summary>
    /// Publishes a text message to a topic.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="content">Message text.</param>
    /// <returns>A failure result when not connected; otherwise <c>BaseResult.Successed</c>.</returns>
    public async Task<BaseResult> SendMessage(string topic, string content)
    {
        if (!IsConnect())
        {
            return new BaseResult(false, "请先连接服务");
        }
        await mqttClient.PublishStringAsync(topic, content);
        return BaseResult.Successed;
    }

    /// <summary>
    /// Reports that the connection to the server was closed.
    /// </summary>
    private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
    {
        OnOutMessage?.Invoke("已断开服务连接");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Reports the connect result: success, or the reason string supplied with the result.
    /// </summary>
    private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
    {
        if (arg.ConnectResult.ResultCode == MqttClientConnectResultCode.Success)
        {
            OnOutMessage?.Invoke("已连接到服务");
        }
        else
        {
            OnOutMessage?.Invoke($"连接服务失败【{arg.ConnectResult.ReasonString}】");
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Decodes a received payload as UTF-8 and forwards it with a timestamp.
    /// </summary>
    private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        var payload = arg.ApplicationMessage.PayloadSegment;
        if (payload.Array != null)
        {
            // Decode only the segment's window: the backing array may be larger than the
            // payload, so decoding the whole array could append unrelated bytes.
            var content = Encoding.UTF8.GetString(payload.Array, payload.Offset, payload.Count);
            OnOutMessage?.Invoke($"[{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}]-收到消息>>{content}");
        }
        return Task.CompletedTask;
    }
}
再设计自定义控件,将连接属性单独实例化
自定义控件包括XAML和ViewModel的设计,详细的可以下载源码进行查看,此处不展示太多了,代码量也确实有一些些,无非就是 连接的各个参数,如IP、端口,客户端ID,用户名、密码等等
然后在主界面设计两个按钮,分别用于添加自定义控件和清理自定义控件
界面设计的东西不介绍太多了,因为客户端可以有很多种设计的方式,但通讯那一块就已经在上边展示的代码里边了;
到此服务端+客户端就已经实现了,是不是没有想象中那么复杂。