1,MQTT介绍:
MQTT详解以及实际操作_mqtt使用-CSDN博客
2,MQTT应用:
C#MQTT编程06--MQTT服务器和客户端(winform版)_c# mqtt服务器-CSDN博客
3,MQTT实例:
效果
代码:
服务端
/// <summary>
/// Main window of the MQTT broker demo. Starts and stops an MQTTnet server,
/// logs server events to a list view, and tracks connected clients together
/// with the topics each of them has subscribed to.
/// </summary>
public partial class Form1 : Form
{
    Timer timer = new Timer();
    List<ClientItem> clients = new List<ClientItem>();

    // Guards `clients` and every ClientItem.Topics list. Server callbacks mutate
    // them while the UI thread enumerates them. Never call Invoke while holding
    // this lock: the UI thread also takes it, so that would deadlock.
    readonly object clientsLock = new object();

    IMqttServer mqttServer;

    /// <summary>
    /// Builds the form, starts the status-bar clock and fills in default settings.
    /// </summary>
    public Form1()
    {
        InitializeComponent();
        timer.Interval = 500;
        timer.Tick += Timer_Tick;
        timer.Start();
        InitSetting();
    }

    /// <summary>Refreshes the date/time label in the status bar.</summary>
    private void Timer_Tick(object sender, EventArgs e)
    {
        toolStripStatusDateTime.Text = DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss");
    }

    /// <summary>
    /// Populates the address list and the default port and credentials.
    /// </summary>
    void InitSetting()
    {
        // Every address of this machine except IPv6 link-local ones
        // (other IPv6 addresses are kept).
        string[] ips = Dns.GetHostAddresses(Environment.MachineName)
            .Where(item => !item.IsIPv6LinkLocal)
            .Select(item => item.ToString())
            .ToArray();
        comboIp_List.DataSource = ips;
        txtPort.Text = "8000";
        txtUserName.Text = "sa";
        txtPwd.Text = "1";
    }

    /// <summary>
    /// Toggles the server: starts it when the button reads "开始", stops it otherwise.
    /// The operation is awaited instead of blocked on, so server callbacks that
    /// marshal to the UI thread cannot deadlock against this handler.
    /// </summary>
    private async void btnStart_Click(object sender, EventArgs e)
    {
        // Prevent a second click from racing an in-flight start/stop.
        btnStart.Enabled = false;
        try
        {
            if (btnStart.Text.Equals("开始"))
            {
                await StartServerAsync();
            }
            else
            {
                await StopServerAsync();
            }
        }
        catch (Exception ex)
        {
            // async void: an escaping exception would take the process down.
            AddLog(1, $"操作失败:{ex.Message}");
        }
        finally
        {
            btnStart.Enabled = true;
        }
    }

    /// <summary>
    /// Validates the endpoint settings, creates the server, wires its handlers and
    /// waits for it to start before switching the UI to the running state.
    /// </summary>
    private async Task StartServerAsync()
    {
        IPAddress address;
        if (!IPAddress.TryParse(comboIp_List.Text.Trim(), out address))
        {
            MessageBox.Show("IP地址无效!");
            return;
        }
        int port;
        if (!int.TryParse(txtPort.Text.Trim(), out port) || port < 1 || port > 65535)
        {
            MessageBox.Show("端口无效!");
            return;
        }

        IMqttServerOptions options = new MqttServerOptionsBuilder()
            .WithConnectionBacklog(10)
            .WithDefaultEndpointBoundIPAddress(address)
            .WithDefaultEndpointPort(port)
            .WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(5000))
            .WithConnectionValidator(context =>
            {
                if (!ck_UserPwd.Checked)
                {
                    context.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.Success;
                    return;
                }
                // Static string.Equals tolerates a client that sent no user name
                // or password (null) instead of throwing.
                bool userOk = string.Equals(context.Username, txtUserName.Text, StringComparison.Ordinal);
                bool pwdOk = string.Equals(context.Password, txtPwd.Text, StringComparison.Ordinal);
                if (!userOk || !pwdOk)
                {
                    context.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
                }
            }).Build();

        IMqttServer server = new MqttFactory().CreateMqttServer();
        server.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(ApplicationMessageReceived);
        // Server started
        server.StartedHandler = new MqttServerStartedHandlerDelegate(MqttServer_StartedAsync);
        // Server stopped
        server.StoppedHandler = new MqttServerStoppedHandlerDelegate(MqttServer_StoppedAsync);
        // Client connected
        server.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(MqttServer_ClientConnectedAsync);
        // Client disconnected
        server.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(MqttServer_ClientDisconnectedAsync);
        // Client subscribed to a topic
        server.ClientSubscribedTopicHandler = new MqttServerClientSubscribedTopicHandlerDelegate(MqttServer_ClientSubscribedTopicAsync);
        // Client unsubscribed from a topic
        server.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttServer_ClientUnsubscribedTopicAsync);

        mqttServer = server;
        try
        {
            await server.StartAsync(options);
        }
        catch (Exception ex)
        {
            // e.g. the port is already in use: release the half-built server.
            mqttServer = null;
            server.Dispose();
            AddLog(1, $"服务启动失败:{ex.Message}");
            return;
        }

        if (server.IsStarted)
        {
            btnStart.Text = "断开";
            btnStart.BackColor = Color.Green;
            comboIp_List.Enabled = txtPort.Enabled = false;
        }
    }

    /// <summary>
    /// Stops and disposes the running server, then restores the idle UI state.
    /// </summary>
    private async Task StopServerAsync()
    {
        IMqttServer server = mqttServer;
        if (server != null)
        {
            try
            {
                await server.StopAsync();
                if (server.IsStarted)
                {
                    // Still running: leave the UI in the "running" state.
                    return;
                }
            }
            finally
            {
                if (!server.IsStarted)
                {
                    mqttServer = null;
                    server.Dispose();
                }
            }
        }
        btnStart.Text = "开始";
        btnStart.BackColor = Color.Red;
        comboIp_List.Enabled = txtPort.Enabled = true;
    }

    /// <summary>Logs an unsubscribe and removes the topic from the client's list.</summary>
    private Task MqttServer_ClientUnsubscribedTopicAsync(MqttServerClientUnsubscribedTopicEventArgs arg)
    {
        string msg = $"客户端ID:[{arg.ClientId}] 取消主题:[{arg.TopicFilter}] 订阅";
        AddLog(1, msg);
        lock (clientsLock)
        {
            ClientItem client = clients.FirstOrDefault(item => item.ClientID.Equals(arg.ClientId));
            client?.Topics.RemoveAll(item => item.Topic.Equals(arg.TopicFilter));
        }
        return Task.FromResult(arg);
    }

    /// <summary>Logs a subscription and records the topic on the matching client.</summary>
    private Task MqttServer_ClientSubscribedTopicAsync(MqttServerClientSubscribedTopicEventArgs arg)
    {
        string msg = $"客户端ID:[{arg.ClientId}]订阅主题:[{arg.TopicFilter.Topic}],Qos:[{arg.TopicFilter.QualityOfServiceLevel}]";
        AddLog(0, msg);
        TopicItem topic = new TopicItem { QualityOfServiceLevel = arg.TopicFilter.QualityOfServiceLevel, Topic = arg.TopicFilter.Topic };
        lock (clientsLock)
        {
            ClientItem client = clients.FirstOrDefault(item => item.ClientID.Equals(arg.ClientId));
            if (client != null)
            {
                // Re-subscribing replaces the previous entry instead of duplicating it.
                client.Topics.RemoveAll(item => item.Topic.Equals(topic.Topic));
                client.Topics.Add(topic);
            }
        }
        return Task.FromResult(arg);
    }

    /// <summary>Logs a disconnect, drops the client and refreshes the client list.</summary>
    private Task MqttServer_ClientDisconnectedAsync(MqttServerClientDisconnectedEventArgs arg)
    {
        string msg = $"客户端ID:[{arg.ClientId}] 断开连接,EndPoint:{arg.Endpoint},断开连接类型:{arg.DisconnectType}";
        AddLog(1, msg);
        lock (clientsLock)
        {
            clients.RemoveAll(item => item.ClientID.Equals(arg.ClientId));
        }
        UpdateClientList();
        return Task.FromResult(arg);
    }

    /// <summary>Logs a new connection, registers the client and refreshes the client list.</summary>
    private Task MqttServer_ClientConnectedAsync(MqttServerClientConnectedEventArgs arg)
    {
        string msg = $"客户端ID:[{arg.ClientId}]连接 ,EndPoint:{arg.Endpoint},UserName: {arg.UserName}";
        AddLog(0, msg);
        ClientItem client = new ClientItem
        {
            ClientID = arg.ClientId,
            EndPoint = arg.Endpoint
        };
        lock (clientsLock)
        {
            clients.Add(client);
        }
        UpdateClientList();
        return Task.FromResult(arg);
    }

    /// <summary>Logs every application message the server receives.</summary>
    private Task ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs arg)
    {
        string msg = $"收到消息:发送者ID:[{arg.ClientId}] QoS:{arg.ApplicationMessage.QualityOfServiceLevel} topic:{arg.ApplicationMessage.Topic}, 内容:{arg.ApplicationMessage.ConvertPayloadToString()}";
        AddLog(1, msg);
        return Task.FromResult(arg);
    }

    /// <summary>Logs the stop, forgets all clients and empties the client list box.</summary>
    private Task MqttServer_StoppedAsync(EventArgs arg)
    {
        string msg = $"服务停止";
        AddLog(1, msg);
        lock (clientsLock)
        {
            clients.Clear();
        }
        // Without this the list box keeps showing clients that no longer exist.
        UpdateClientList();
        return Task.FromResult(arg);
    }

    /// <summary>Logs that the server has started.</summary>
    private Task MqttServer_StartedAsync(EventArgs arg)
    {
        string msg = $"服务开启";
        AddLog(0, msg);
        return Task.FromResult(arg);
    }

    /// <summary>Enables the credential boxes only while authentication is switched on.</summary>
    private void ck_UserPwd_CheckedChanged(object sender, EventArgs e)
    {
        txtUserName.Enabled = txtPwd.Enabled = ck_UserPwd.Checked;
    }

    /// <summary>
    /// Appends a timestamped entry to the log list view and refreshes the
    /// connection counter. Safe to call from any thread.
    /// </summary>
    /// <param name="cate">Image index used as the entry's icon.</param>
    /// <param name="msg">Text of the log entry.</param>
    void AddLog(int cate, string msg)
    {
        if (IsDisposed)
        {
            return;
        }
        if (lstview_Note.InvokeRequired)
        {
            // Re-enter on the UI thread; the body below then runs exactly once.
            this.Invoke(new Action(() => AddLog(cate, msg)));
            return;
        }

        ListViewItem item = new ListViewItem();
        item.ImageIndex = cate;
        item.SubItems.Add(DateTime.Now.ToString("HH:mm:ss"));
        item.SubItems.Add(msg);
        lstview_Note.Items.Add(item);
        lstview_Note.EnsureVisible(lstview_Note.Items.Count - 1);

        int count;
        lock (clientsLock)
        {
            count = clients.Count;
        }
        toolStripStatusConnectCount.Text = count.ToString();
    }

    /// <summary>Opens a new client window.</summary>
    private void btnCreateClient_Click(object sender, EventArgs e)
    {
        ClientFrm frm = new ClientFrm();
        frm.Show();
    }

    /// <summary>Shows the topics of the client selected in the client list.</summary>
    private void lst_Client_SelectedIndexChanged(object sender, EventArgs e)
    {
        ClientItem item = lst_Client.SelectedItem as ClientItem;
        if (item == null)
        {
            return;
        }
        string[] topics;
        lock (clientsLock)
        {
            topics = item.Topics.Select(a => $"Topic:{a.Topic},QoS:{a.QualityOfServiceLevel}").ToArray();
        }
        lst_Topic.DataSource = topics;
    }

    /// <summary>
    /// Rebuilds the client list box and the connection counter from a snapshot
    /// of the tracked clients. Safe to call from any thread.
    /// </summary>
    void UpdateClientList()
    {
        if (IsDisposed)
        {
            return;
        }
        this.Invoke(new Action(() =>
        {
            ClientItem[] snapshot;
            lock (clientsLock)
            {
                snapshot = clients.ToArray();
            }
            lst_Client.Items.Clear();
            foreach (var item in snapshot)
            {
                lst_Client.Items.Add(item);
            }
            toolStripStatusConnectCount.Text = snapshot.Length.ToString();
        }));
    }
}
/// <summary>
/// A connected client: its id, its remote endpoint and the topics it is
/// subscribed to. <see cref="ToString"/> supplies the text shown in list controls.
/// </summary>
class ClientItem
{
    public string ClientID { get; set; }

    public string EndPoint { get; set; }

    public List<TopicItem> Topics { get; set; } = new List<TopicItem>();

    /// <summary>Returns the client id and endpoint separated by a semicolon.</summary>
    public override string ToString() => string.Concat(ClientID, ";", EndPoint);
}
/// <summary>
/// A single topic subscription together with the QoS level it was requested at.
/// </summary>
class TopicItem
{
    /// <summary>
    /// Quality-of-service level of the subscription.
    /// QoS 0 (at most once): sent once, so the message may be lost.
    /// QoS 1 (at least once): delivery is ensured, but duplicates are possible.
    /// QoS 2 (exactly once): the message arrives once and only once.
    /// </summary>
    public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }

    /// <summary>Topic (filter) string of the subscription.</summary>
    public string Topic { get; set; }
}
客户端
/// <summary>
/// MQTT client window of the demo. Connects to a broker, subscribes to and
/// unsubscribes from topics, publishes messages and logs everything it receives.
/// </summary>
public partial class ClientFrm : Form
{
    Timer timer = new Timer();
    IMqttClient client;

    /// <summary>
    /// Builds the form, starts the status-bar clock and fills in default settings.
    /// </summary>
    public ClientFrm()
    {
        InitializeComponent();
        timer.Interval = 500;
        timer.Tick += Timer_Tick;
        timer.Start();
        InitSetting();
    }

    /// <summary>Refreshes the date/time label in the status bar.</summary>
    private void Timer_Tick(object sender, EventArgs e)
    {
        toolStripStatusDateTime.Text = DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss");
    }

    /// <summary>
    /// Populates the address list, the default port and credentials, and the QoS choices.
    /// </summary>
    void InitSetting()
    {
        // Every address of this machine except IPv6 link-local ones
        // (other IPv6 addresses are kept).
        string[] ips = Dns.GetHostAddresses(Environment.MachineName)
            .Where(item => !item.IsIPv6LinkLocal)
            .Select(item => item.ToString())
            .ToArray();
        comboIp_List.DataSource = ips;
        txtPort.Text = "8000";
        txtUserName.Text = "sa";
        txtPwd.Text = "1";
        comboQoS.DataSource = Enum.GetNames(typeof(MQTTnet.Protocol.MqttQualityOfServiceLevel));
    }

    /// <summary>
    /// Toggles the connection: connects when the button reads "连接", disconnects
    /// otherwise. Both operations are awaited rather than blocked on, so client
    /// callbacks that marshal to the UI thread cannot deadlock against this handler.
    /// </summary>
    private async void btnConnect_Click(object sender, EventArgs e)
    {
        // Prevent a second click from racing an in-flight connect/disconnect.
        btnConnect.Enabled = false;
        try
        {
            if (btnConnect.Text.Equals("连接"))
            {
                await ConnectClientAsync();
            }
            else
            {
                await DisconnectClientAsync();
            }
        }
        catch (Exception ex)
        {
            // async void: an escaping exception would take the process down.
            AddLog(1, $"操作失败:{ex.Message}");
        }
        finally
        {
            btnConnect.Enabled = true;
        }
    }

    /// <summary>
    /// Validates the inputs, creates a client, wires its handlers and connects.
    /// A failed attempt is logged and the client is released.
    /// </summary>
    private async Task ConnectClientAsync()
    {
        // Only connecting needs a client id; disconnecting must always be possible.
        string clientId = txtClientId.Text.Trim();
        if (string.IsNullOrEmpty(clientId))
        {
            MessageBox.Show("ClientId不能为空!");
            return;
        }
        int port;
        if (!int.TryParse(txtPort.Text.Trim(), out port) || port < 1 || port > 65535)
        {
            MessageBox.Show("端口无效!");
            return;
        }

        MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
            .WithClientId(clientId)
            .WithCommunicationTimeout(TimeSpan.FromMilliseconds(50000))
            .WithTcpServer(comboIp_List.Text.Trim(), port);
        if (ck_UserPwd.Checked)
        {
            builder.WithCredentials(txtUserName.Text.Trim(), txtPwd.Text.Trim());
        }
        IMqttClientOptions option = builder.Build();

        IMqttClient newClient = new MqttFactory().CreateMqttClient();
        // Message received
        newClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceivedAsync);
        // Connected
        newClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(Client_ConnectedAsync);
        // Disconnected
        newClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(Client_DisconnectedAsync);

        client = newClient;
        try
        {
            await newClient.ConnectAsync(option);
        }
        catch (Exception ex)
        {
            // Broker unreachable, credentials rejected, ...: report and clean up.
            client = null;
            newClient.Dispose();
            AddLog(1, $"连接失败:{ex.Message}");
            UpdateStatus(false);
        }
    }

    /// <summary>
    /// Disconnects and disposes the current client and resets the UI to the
    /// disconnected state.
    /// </summary>
    private async Task DisconnectClientAsync()
    {
        IMqttClient current = client;
        if (current != null)
        {
            try
            {
                await current.DisconnectAsync();
            }
            catch (Exception ex)
            {
                AddLog(1, $"断开连接失败:{ex.Message}");
            }
            client = null;
            current.Dispose();
        }
        // The client is gone either way, so the UI must show "disconnected".
        UpdateStatus(false);
    }

    /// <summary>Logs the disconnect and updates the connection state in the UI.</summary>
    private Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
    {
        string msg = $" 断开连接, 返回结果:{arg.Reason}";
        AddLog(1, msg);
        IMqttClient current = client;
        UpdateStatus(current != null && current.IsConnected);
        return Task.FromResult(arg);
    }

    /// <summary>Logs the connect result and updates the connection state in the UI.</summary>
    private Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
    {
        string msg = $"客户端ID 连接,结果:{arg.ConnectResult.ResultCode}";
        AddLog(0, msg);
        IMqttClient current = client;
        UpdateStatus(current != null && current.IsConnected);
        return Task.FromResult(arg);
    }

    /// <summary>Logs every application message delivered to this client.</summary>
    private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        string msg = $"消息ClientId:[{arg.ClientId}] , topic:[{arg.ApplicationMessage.Topic}], 内容:[{arg.ApplicationMessage.ConvertPayloadToString()}],QoS:[{arg.ApplicationMessage.QualityOfServiceLevel}]";
        AddLog(1, msg);
        return Task.FromResult(arg);
    }

    /// <summary>Enables the credential boxes only while authentication is switched on.</summary>
    private void ck_UserPwd_CheckedChanged(object sender, EventArgs e)
    {
        txtUserName.Enabled = txtPwd.Enabled = ck_UserPwd.Checked;
    }

    /// <summary>
    /// Appends a timestamped entry to the log list view. Safe to call from any
    /// thread; does nothing once the window is gone.
    /// </summary>
    /// <param name="cate">Image index used as the entry's icon.</param>
    /// <param name="msg">Text of the log entry.</param>
    void AddLog(int cate, string msg)
    {
        // Invoke throws on a form without a handle; client callbacks can still
        // arrive while the window is closing.
        if (IsDisposed || !IsHandleCreated)
        {
            return;
        }
        this.Invoke(new Action(() =>
        {
            ListViewItem item = new ListViewItem();
            item.ImageIndex = cate;
            item.SubItems.Add(DateTime.Now.ToString("HH:mm:ss"));
            item.SubItems.Add(msg);
            lstview_Note.Items.Add(item);
            lstview_Note.EnsureVisible(lstview_Note.Items.Count - 1);
        }));
    }

    /// <summary>
    /// Returns true when a client exists and is connected; otherwise tells the
    /// user to connect first and returns false.
    /// </summary>
    private bool EnsureConnected()
    {
        if (client?.IsConnected == true)
        {
            return true;
        }
        MessageBox.Show("请先连接服务!");
        return false;
    }

    /// <summary>
    /// Subscribes to the topic in the topic box at the selected QoS and logs the
    /// result of each subscription.
    /// QoS 0 = at most once (may be lost), QoS 1 = at least once (may repeat),
    /// QoS 2 = exactly once.
    /// </summary>
    private async void btnSubscribed_Click(object sender, EventArgs e)
    {
        if (string.IsNullOrWhiteSpace(txtTopic.Text))
        {
            MessageBox.Show("主题不能为空!");
            return;
        }
        if (!EnsureConnected())
        {
            return;
        }
        MqttQualityOfServiceLevel qos;
        if (!Enum.TryParse(comboQoS.Text.Trim(), out qos) || !Enum.IsDefined(typeof(MqttQualityOfServiceLevel), qos))
        {
            MessageBox.Show("QoS无效!");
            return;
        }

        try
        {
            var result = await client.SubscribeAsync(new MqttTopicFilter()
            {
                QualityOfServiceLevel = qos,
                Topic = txtTopic.Text.Trim()
            });
            StringBuilder sb = new StringBuilder();
            foreach (var item in result.Items)
            {
                sb.AppendLine($"订阅主题:[{item.TopicFilter.Topic}],QoS:[{item.TopicFilter.QualityOfServiceLevel}],ReturnCode:[{item.ResultCode}]");
            }
            AddLog(0, sb.ToString());
        }
        catch (Exception ex)
        {
            // async void: an escaping exception would take the process down.
            AddLog(1, $"订阅失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Publishes the content box as a UTF-8 payload to the topic in the message
    /// topic box at QoS 2 and logs the result.
    /// </summary>
    private async void btnPublish_Click(object sender, EventArgs e)
    {
        if (string.IsNullOrWhiteSpace(txtMsgTopic.Text))
        {
            MessageBox.Show("主题不能为空!");
            return;
        }
        if (!EnsureConnected())
        {
            return;
        }

        // Capture the inputs up front so the log matches what was actually sent,
        // even if the user edits the boxes while the publish is in flight.
        string topic = txtMsgTopic.Text.Trim();
        string content = txtMsgContent.Text;
        try
        {
            var result = await client.PublishAsync(new MqttApplicationMessage
            {
                Topic = topic,
                QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce,
                Payload = Encoding.UTF8.GetBytes(content.Trim())
            });
            string msg = $"发布消息:{content} Topic:{topic} 结果:{result.ReasonCode}";
            AddLog(0, msg);
        }
        catch (Exception ex)
        {
            AddLog(1, $"发布失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Switches the connect button and the endpoint inputs between the connected
    /// and disconnected appearance. Safe to call from any thread.
    /// </summary>
    /// <param name="isConnected">True to show the connected state.</param>
    void UpdateStatus(bool isConnected)
    {
        if (IsDisposed || !IsHandleCreated)
        {
            return;
        }
        this.Invoke(new Action(() =>
        {
            if (!isConnected)
            {
                btnConnect.Text = "连接";
                btnConnect.BackColor = Color.Red;
                comboIp_List.Enabled = txtPort.Enabled = true;
            }
            else
            {
                btnConnect.Text = "断开";
                btnConnect.BackColor = Color.Green;
                comboIp_List.Enabled = txtPort.Enabled = false;
            }
        }));
    }

    /// <summary>
    /// Unsubscribes from the topic in the topic box and logs the result of each
    /// unsubscription.
    /// </summary>
    private async void btnUnsubscribed_Click(object sender, EventArgs e)
    {
        if (string.IsNullOrWhiteSpace(txtTopic.Text))
        {
            MessageBox.Show("主题不能为空!");
            return;
        }
        if (!EnsureConnected())
        {
            return;
        }

        try
        {
            var result = await client.UnsubscribeAsync(txtTopic.Text.Trim());
            StringBuilder sb = new StringBuilder();
            foreach (var item in result.Items)
            {
                sb.AppendLine($"取消主题:[{item.TopicFilter}]订阅,ReturnCode:[{item.ReasonCode}]");
            }
            AddLog(0, sb.ToString());
        }
        catch (Exception ex)
        {
            AddLog(1, $"取消订阅失败:{ex.Message}");
        }
    }

    /// <summary>
    /// Releases the clock timer and the MQTT client when the window closes, so a
    /// closed client window does not keep its broker connection alive.
    /// </summary>
    protected override void OnFormClosed(FormClosedEventArgs e)
    {
        timer.Stop();
        timer.Dispose();

        IMqttClient current = client;
        client = null;
        if (current != null)
        {
            // Detach first so late callbacks cannot touch the closing form.
            current.ApplicationMessageReceivedHandler = null;
            current.ConnectedHandler = null;
            current.DisconnectedHandler = null;
            current.Dispose();
        }
        base.OnFormClosed(e);
    }
}
demo链接
https://download.csdn.net/download/lingxiao16888/89616009