MQTT(Message Queuing Telemetry Transport)
即时通讯协议,
开发商 IBM
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
新建Winform应用程序,将默认的Form1重命名为FormMqttServer
管理Nuget包,输入关键字Mqtt
点击安装
安装完成后示例如图
我们使用MQTTnet.dll,程序集 MQTTnet, Version=4.3.6.1152。应用程序.net framework版本需要4.6.1或者以上
窗体 FormMqttServer 设计如下:
窗体 FormMqttServer设计器代码如下:
文件 FormMqttServer.Designer.cs
namespace CultureDemo
{
partial class FormMqttServer
{
/// <summary>
/// Required designer variable.
/// </summary>
private System.ComponentModel.IContainer components = null;
/// <summary>
/// Clean up any resources being used.
/// </summary>
/// <param name="disposing">true if managed resources should be disposed; otherwise, false.</param>
protected override void Dispose(bool disposing)
{
if (disposing && (components != null))
{
components.Dispose();
}
base.Dispose(disposing);
}
#region Windows Form Designer generated code
/// <summary>
/// Required method for Designer support - do not modify
/// the contents of this method with the code editor.
/// </summary>
private void InitializeComponent()
{
this.lvTopic = new System.Windows.Forms.ListBox();
this.lbClients = new System.Windows.Forms.ListBox();
this.rtxtMessage = new System.Windows.Forms.RichTextBox();
this.label5 = new System.Windows.Forms.Label();
this.label4 = new System.Windows.Forms.Label();
this.btnStop = new System.Windows.Forms.Button();
this.btnStart = new System.Windows.Forms.Button();
this.txtPassword = new System.Windows.Forms.TextBox();
this.label3 = new System.Windows.Forms.Label();
this.txtUsername = new System.Windows.Forms.TextBox();
this.label2 = new System.Windows.Forms.Label();
this.txtPort = new System.Windows.Forms.TextBox();
this.label1 = new System.Windows.Forms.Label();
this.txtIP = new System.Windows.Forms.TextBox();
this.lb = new System.Windows.Forms.Label();
this.dgvTopic = new System.Windows.Forms.DataGridView();
this.Column1 = new System.Windows.Forms.DataGridViewTextBoxColumn();
this.Column2 = new System.Windows.Forms.DataGridViewTextBoxColumn();
this.Column3 = new System.Windows.Forms.DataGridViewTextBoxColumn();
this.Column4 = new System.Windows.Forms.DataGridViewTextBoxColumn();
((System.ComponentModel.ISupportInitialize)(this.dgvTopic)).BeginInit();
this.SuspendLayout();
//
// lvTopic
//
this.lvTopic.FormattingEnabled = true;
this.lvTopic.ItemHeight = 12;
this.lvTopic.Location = new System.Drawing.Point(686, 35);
this.lvTopic.Name = "lvTopic";
this.lvTopic.Size = new System.Drawing.Size(300, 136);
this.lvTopic.TabIndex = 31;
//
// lbClients
//
this.lbClients.FormattingEnabled = true;
this.lbClients.ItemHeight = 12;
this.lbClients.Location = new System.Drawing.Point(343, 35);
this.lbClients.Name = "lbClients";
this.lbClients.Size = new System.Drawing.Size(300, 136);
this.lbClients.TabIndex = 30;
//
// rtxtMessage
//
this.rtxtMessage.Location = new System.Drawing.Point(24, 188);
this.rtxtMessage.Name = "rtxtMessage";
this.rtxtMessage.Size = new System.Drawing.Size(467, 338);
this.rtxtMessage.TabIndex = 29;
this.rtxtMessage.Text = "";
//
// label5
//
this.label5.AutoSize = true;
this.label5.Location = new System.Drawing.Point(684, 17);
this.label5.Name = "label5";
this.label5.Size = new System.Drawing.Size(53, 12);
this.label5.TabIndex = 28;
this.label5.Text = "主题列表";
//
// label4
//
this.label4.AutoSize = true;
this.label4.Location = new System.Drawing.Point(341, 17);
this.label4.Name = "label4";
this.label4.Size = new System.Drawing.Size(65, 12);
this.label4.TabIndex = 27;
this.label4.Text = "连接客户端";
//
// btnStop
//
this.btnStop.Location = new System.Drawing.Point(129, 148);
this.btnStop.Name = "btnStop";
this.btnStop.Size = new System.Drawing.Size(75, 23);
this.btnStop.TabIndex = 26;
this.btnStop.Text = "停止";
this.btnStop.UseVisualStyleBackColor = true;
this.btnStop.Click += new System.EventHandler(this.btnStop_Click);
//
// btnStart
//
this.btnStart.Location = new System.Drawing.Point(32, 148);
this.btnStart.Name = "btnStart";
this.btnStart.Size = new System.Drawing.Size(75, 23);
this.btnStart.TabIndex = 25;
this.btnStart.Text = "启动";
this.btnStart.UseVisualStyleBackColor = true;
this.btnStart.Click += new System.EventHandler(this.btnStart_Click);
//
// txtPassword
//
this.txtPassword.Location = new System.Drawing.Point(91, 121);
this.txtPassword.Name = "txtPassword";
this.txtPassword.Size = new System.Drawing.Size(211, 21);
this.txtPassword.TabIndex = 24;
this.txtPassword.Text = "1";
//
// label3
//
this.label3.AutoSize = true;
this.label3.Location = new System.Drawing.Point(44, 124);
this.label3.Name = "label3";
this.label3.Size = new System.Drawing.Size(29, 12);
this.label3.TabIndex = 23;
this.label3.Text = "密码";
//
// txtUsername
//
this.txtUsername.Location = new System.Drawing.Point(91, 94);
this.txtUsername.Name = "txtUsername";
this.txtUsername.Size = new System.Drawing.Size(211, 21);
this.txtUsername.TabIndex = 22;
this.txtUsername.Text = "a";
//
// label2
//
this.label2.AutoSize = true;
this.label2.Location = new System.Drawing.Point(44, 97);
this.label2.Name = "label2";
this.label2.Size = new System.Drawing.Size(29, 12);
this.label2.TabIndex = 21;
this.label2.Text = "账号";
//
// txtPort
//
this.txtPort.Location = new System.Drawing.Point(91, 58);
this.txtPort.Name = "txtPort";
this.txtPort.Size = new System.Drawing.Size(211, 21);
this.txtPort.TabIndex = 20;
this.txtPort.Text = "12345";
//
// label1
//
this.label1.AutoSize = true;
this.label1.Location = new System.Drawing.Point(44, 61);
this.label1.Name = "label1";
this.label1.Size = new System.Drawing.Size(29, 12);
this.label1.TabIndex = 19;
this.label1.Text = "端口";
//
// txtIP
//
this.txtIP.Location = new System.Drawing.Point(91, 16);
this.txtIP.Name = "txtIP";
this.txtIP.Size = new System.Drawing.Size(211, 21);
this.txtIP.TabIndex = 18;
this.txtIP.Text = "127.0.0.1";
//
// lb
//
this.lb.AutoSize = true;
this.lb.Location = new System.Drawing.Point(32, 19);
this.lb.Name = "lb";
this.lb.Size = new System.Drawing.Size(41, 12);
this.lb.TabIndex = 17;
this.lb.Text = "IP地址";
//
// dgvTopic
//
this.dgvTopic.AllowUserToAddRows = false;
this.dgvTopic.AllowUserToDeleteRows = false;
this.dgvTopic.ColumnHeadersHeightSizeMode = System.Windows.Forms.DataGridViewColumnHeadersHeightSizeMode.AutoSize;
this.dgvTopic.Columns.AddRange(new System.Windows.Forms.DataGridViewColumn[] {
this.Column1,
this.Column2,
this.Column3,
this.Column4});
this.dgvTopic.Location = new System.Drawing.Point(522, 188);
this.dgvTopic.Name = "dgvTopic";
this.dgvTopic.ReadOnly = true;
this.dgvTopic.RowTemplate.Height = 23;
this.dgvTopic.Size = new System.Drawing.Size(464, 338);
this.dgvTopic.TabIndex = 32;
//
// Column1
//
this.Column1.HeaderText = "Topic";
this.Column1.Name = "Column1";
this.Column1.ReadOnly = true;
//
// Column2
//
this.Column2.HeaderText = "ClientId";
this.Column2.Name = "Column2";
this.Column2.ReadOnly = true;
//
// Column3
//
this.Column3.HeaderText = "Qos";
this.Column3.Name = "Column3";
this.Column3.ReadOnly = true;
//
// Column4
//
this.Column4.HeaderText = "SessionItems";
this.Column4.Name = "Column4";
this.Column4.ReadOnly = true;
//
// FormMqttServer
//
this.AutoScaleDimensions = new System.Drawing.SizeF(6F, 12F);
this.AutoScaleMode = System.Windows.Forms.AutoScaleMode.Font;
this.ClientSize = new System.Drawing.Size(1013, 538);
this.Controls.Add(this.dgvTopic);
this.Controls.Add(this.lvTopic);
this.Controls.Add(this.lbClients);
this.Controls.Add(this.rtxtMessage);
this.Controls.Add(this.label5);
this.Controls.Add(this.label4);
this.Controls.Add(this.btnStop);
this.Controls.Add(this.btnStart);
this.Controls.Add(this.txtPassword);
this.Controls.Add(this.label3);
this.Controls.Add(this.txtUsername);
this.Controls.Add(this.label2);
this.Controls.Add(this.txtPort);
this.Controls.Add(this.label1);
this.Controls.Add(this.txtIP);
this.Controls.Add(this.lb);
this.Name = "FormMqttServer";
this.Text = "FormMqttServer";
this.Load += new System.EventHandler(this.FormMqttServer_Load);
((System.ComponentModel.ISupportInitialize)(this.dgvTopic)).EndInit();
this.ResumeLayout(false);
this.PerformLayout();
}
#endregion
private System.Windows.Forms.ListBox lvTopic;
private System.Windows.Forms.ListBox lbClients;
private System.Windows.Forms.RichTextBox rtxtMessage;
private System.Windows.Forms.Label label5;
private System.Windows.Forms.Label label4;
private System.Windows.Forms.Button btnStop;
private System.Windows.Forms.Button btnStart;
private System.Windows.Forms.TextBox txtPassword;
private System.Windows.Forms.Label label3;
private System.Windows.Forms.TextBox txtUsername;
private System.Windows.Forms.Label label2;
private System.Windows.Forms.TextBox txtPort;
private System.Windows.Forms.Label label1;
private System.Windows.Forms.TextBox txtIP;
private System.Windows.Forms.Label lb;
private System.Windows.Forms.DataGridView dgvTopic;
private System.Windows.Forms.DataGridViewTextBoxColumn Column1;
private System.Windows.Forms.DataGridViewTextBoxColumn Column2;
private System.Windows.Forms.DataGridViewTextBoxColumn Column3;
private System.Windows.Forms.DataGridViewTextBoxColumn Column4;
}
}
窗体FormMqttServer代码如下【MQTT服务端相关代码】
文件FormMqttServer.cs
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using MQTTnet.Server;
using MQTTnet;
using MQTTnet.Protocol;
using System.Collections;
namespace CultureDemo
{
public partial class FormMqttServer : Form
{
private MqttServer server;//mqtt服务器对象
public FormMqttServer()
{
InitializeComponent();
rtxtMessage.ReadOnly = true;
/*
* 在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。
固定头(Fixed header):存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
可变头(Variable header):存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
消息体(Payload):存在于部分MQTT数据包中,表示客户端收到的具体内容。
*/
}
/// <summary>
/// 打印相关消息
/// </summary>
/// <param name="contents"></param>
private void DisplayMessage(string contents)
{
if (!this.IsHandleCreated)
{
return;
}
this.BeginInvoke(new Action(() =>
{
if (rtxtMessage.TextLength >= 40960)
{
rtxtMessage.Clear();
}
rtxtMessage.AppendText($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}->{contents}\n");
rtxtMessage.ScrollToCaret();
}));
}
/// <summary>
/// 初始化Mqtt服务并启动服务
/// </summary>
/// <param name="ip">IPV4地址</param>
/// <param name="port">端口:0~65535之间</param>
private Task StartMqttServer(string ip, int port)
{
MqttServerOptions mqttServerOptions =
new MqttServerOptionsBuilder()
.WithDefaultEndpoint()
.WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse(ip))
.WithDefaultEndpointPort(port)
.WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(5000))
.Build();
server = new MqttFactory().CreateMqttServer(mqttServerOptions); // create MQTT service object
server.ValidatingConnectionAsync += Server_ValidatingConnectionAsync;//验证用户名和密码
server.ClientConnectedAsync += Server_ClientConnectedAsync;//绑定客户端连接事件
server.ClientDisconnectedAsync += Server_ClientDisconnectedAsync;//绑定客户端断开事件
server.ClientSubscribedTopicAsync += Server_ClientSubscribedTopicAsync;//绑定客户端订阅主题事件
server.ClientUnsubscribedTopicAsync += Server_ClientUnsubscribedTopicAsync;//绑定客户端退订主题事件
server.InterceptingPublishAsync += Server_InterceptingPublishAsync;//消息接收事件
server.ClientAcknowledgedPublishPacketAsync += Server_ClientAcknowledgedPublishPacketAsync;
server.InterceptingClientEnqueueAsync += Server_InterceptingClientEnqueueAsync;
server.ApplicationMessageNotConsumedAsync += Server_ApplicationMessageNotConsumedAsync;
server.StartedAsync += Server_StartedAsync;//绑定服务端启动事件
server.StoppedAsync += Server_StoppedAsync;//绑定服务端停止事件
return server.StartAsync();
}
private Task Server_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs e)
{
try
{
DisplayMessage($"【MessageNotConsumed】-SenderId:{e.SenderId}-Message:{e.ApplicationMessage.ConvertPayloadToString()}");
}
catch (Exception ex)
{
DisplayMessage($"Server_ApplicationMessageNotConsumedAsync出现异常:{ex.Message}");
}
return Task.CompletedTask;
}
private Task Server_InterceptingClientEnqueueAsync(InterceptingClientApplicationMessageEnqueueEventArgs e)
{
try
{
DisplayMessage($"【InterceptingClientEnqueue】-SenderId:{e.SenderClientId}-Message:{e.ApplicationMessage.ConvertPayloadToString()}");
}
catch (Exception ex)
{
DisplayMessage($"Server_InterceptingClientEnqueueAsync出现异常:{ex.Message}");
}
return Task.CompletedTask;
}
private Task Server_ClientAcknowledgedPublishPacketAsync(ClientAcknowledgedPublishPacketEventArgs e)
{
try
{
DisplayMessage($"【ClientAcknowledgedPublishPacket】-SenderId:{e.ClientId}-Message:{Encoding.UTF8.GetString(e.PublishPacket.PayloadSegment.ToArray())}");
}
catch (Exception ex)
{
DisplayMessage($"Server_ClientAcknowledgedPublishPacketAsync出现异常:{ex.Message}");
}
return Task.CompletedTask;
}
/// <summary>
/// 消息接收
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs e)
{
try
{
string client = e.ClientId;
string topic = e.ApplicationMessage.Topic;
string contents = e.ApplicationMessage.ConvertPayloadToString();//Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.ToArray());
DisplayMessage($"接收到消息:Client:【{client}】 Topic:【{topic}】 Message:【{contents}】");
}
catch (Exception ex)
{
DisplayMessage($"Server_InterceptingPublishAsync出现异常:{ex.Message}");
}
return Task.CompletedTask;
}
private void FormMqttServer_Load(object sender, EventArgs e)
{
rtxtMessage.Text = @"MQTT服务端关键事件说明:
ClientConnectedAsync-->绑定客户端连接事件
ValidatingConnectionAsync-->验证用户名和密码
ClientSubscribedTopicAsync-->绑定客户端订阅主题事件
InterceptingPublishAsync-->消息接收事件
";
}
private Task Server_StoppedAsync(EventArgs arg)
{
return Task.Run(new Action(() =>
{
DisplayMessage($"服务端【{txtIP.Text}:{txtPort.Text}】已停止MQTT");
}));
}
private Task Server_StartedAsync(EventArgs e)
{
return Task.Run(new Action(() =>
{
DisplayMessage($"服务端【{txtIP.Text}:{txtPort.Text}】已启用MQTT");
}));
}
private Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs e)
{
return Task.Run(new Action(() =>
{
lvTopic.Invoke(new Action(() =>
{
string itemContents = $"【{e.ClientId}】订阅主题【{e.TopicFilter}】";
if (lvTopic.Items.Contains(itemContents))
{
//如果存在该项,就移除
lvTopic.Items.Remove(itemContents);
}
IDictionary collection = e.SessionItems;
string[] sessionItemContents = new string[collection.Count];
int idx = 0;
foreach (object key in collection.Keys)
{
sessionItemContents[idx++] = $"[{key}:{collection[key]}]";
}
}));
DisplayMessage($"客户端【{e.ClientId}】退订主题【{e.TopicFilter}】");
}));
}
private Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs e)
{
return Task.Run(new Action(() =>
{
lvTopic.Invoke(new Action(() =>
{
string itemContents = $"【{e.ClientId}】订阅主题【{e.TopicFilter.Topic}】";
if (!lvTopic.Items.Contains(itemContents))
{
//如果不存在该项,就添加
lvTopic.Items.Add(itemContents);
}
IDictionary collection = e.SessionItems;
string[] sessionItemContents = new string[collection.Count];
int idx = 0;
foreach (object key in collection.Keys)
{
sessionItemContents[idx++] = $"[{key}:{collection[key]}]";
}
dgvTopic.Rows.Add(e.TopicFilter.Topic, e.ClientId, e.TopicFilter.QualityOfServiceLevel, string.Join(",", sessionItemContents));
}));
DisplayMessage($"客户端【{e.ClientId}】订阅主题【{e.TopicFilter.Topic}】");
}));
}
private Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs e)
{
return Task.Run(new Action(() =>
{
lbClients.BeginInvoke(new Action(() =>
{
lbClients.Items.Remove(e.ClientId);
}));
DisplayMessage($"客户端已断开.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】.ReasonCode:【{e.ReasonCode}】,DisconnectType:【{e.DisconnectType}】");
}));
}
/// <summary>
/// 绑定客户端连接事件
/// </summary>
/// <param name="arg"></param>
/// <returns></returns>
private Task Server_ClientConnectedAsync(ClientConnectedEventArgs e)
{
return Task.Run(new Action(() =>
{
lbClients.BeginInvoke(new Action(() =>
{
lbClients.Items.Add(e.ClientId);
}));
DisplayMessage($"客户端已连接.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】");
}));
}
private Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs e)
{
return Task.Run(new Action(() =>
{
string UserName = txtUsername.Text;
string Password = txtPassword.Text;
if (e.UserName == UserName && e.Password == Password)
{
e.ReasonCode = MqttConnectReasonCode.Success;
DisplayMessage($"客户端已验证成功.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】");
}
else
{
e.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
DisplayMessage($"客户端验证失败.ClientId:【{e.ClientId}】,Endpoint:【{e.Endpoint}】");
}
}));
}
private async void btnStart_Click(object sender, EventArgs e)
{
await StartMqttServer(txtIP.Text, int.Parse(txtPort.Text));
btnStart.Enabled = false;
}
private async void btnStop_Click(object sender, EventArgs e)
{
await server.StopAsync();
btnStart.Enabled = true;
}
}
}