如何在网页端实现MQTT消息的发布和订阅?
- 实现MQTT功能,可以发布和订阅主题
- 通过WebSocket协议将MQTT消息转发给对应的网页端
带着这个实现思路,采用C#控制台程序实现MQTT服务端功能,web端可以直接使用websocket插件与服务端双向通讯。
- 新建C#控制台程序,.net framework框架版本4.5.2
- 引用Fleck框架,版本1.2.0
- 引用MQTTnet框架,版本4.3.1.873
1、WebSocket功能实现
socketDic 字典key值记录每个客户端,value记录每个客户端对应的SN,代表设备的序列号;
每当有网页端客户端连接的时候,自动将客户端套接字加入字典中;
每当网页关闭,服务端自动断开客户端套接字连接,并从缓存字典中移除此客户端套接字;
当网页端客户端套接字发送包含SN:开头的字符,表示网页端发送了设备的SN,服务端接收此消息,并解析设备SN,并与对应的客户端套接字绑定对应关系,在缓存字典中维护对应关系。
class Server
{
static Dictionary<IWebSocketConnection, string> socketDic = new Dictionary<IWebSocketConnection, string>();
static void Main()
{
FleckLog.Level = LogLevel.Debug;
var server = new WebSocketServer("ws://0.0.0.0:8181");
//var socketDic = new Dictionary<IWebSocketConnection, string>();
server.Start(socket =>
{
socket.OnOpen = () =>
{
Console.WriteLine("Open!");
socketDic.Add(socket, null);
};
socket.OnClose = () =>
{
Console.WriteLine("Close!");
socketDic.Remove(socket);
};
socket.OnMessage = message =>
{
if (message.Contains("SN:") && socketDic.ContainsKey(socket))
{
socketDic[socket] = message.Replace("SN:", "");
}
};
});
Client();
var input = Console.ReadLine();
while (input != "exit")
{
foreach (var socket in socketDic.ToList())
{
socket.Key.Send(input);
}
input = Console.ReadLine();
}
}
}
2、MQTT功能实现
MQTT服务器采用EMQX公共服务器测试,服务器域名:broker.emqx.io,端口:1883,用户名:emqx_test,密码:emqx_test
订阅主题名:123、12345
MQTT接收消息事件ApplicationMessageReceivedAsync,判断消息主题与缓存字典中的套接字是否对应,如消息主题与缓存字典SN存在包含关系,则认为此消息与网页端套接字存在关联关系,则将消息推送给此套接字。
具体实现代码:
static IMqttClient client;
public static async void Client()
{
try
{
client = new MqttFactory().CreateMqttClient() as MqttClient;
var build = new MqttClientOptionsBuilder()
.WithClientId(Guid.NewGuid().ToString().Replace("-", "").ToUpper())
.WithCredentials("emqx_test", "emqx_test")
.WithTcpServer("broker.emqx.io", 1883)
.WithCleanSession(true)
.WithKeepAlivePeriod(TimeSpan.FromSeconds(100.5));
client.ConnectedAsync += _mqttClient_ConnectedAsync;
await client.ConnectAsync(build.Build());
client.DisconnectedAsync += _mqttClient_DisconnectedAsync;
client.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync;
await client.SubscribeAsync(
new MqttClientSubscribeOptions
{
TopicFilters = new List<MqttTopicFilter> {new MqttTopicFilter() //订阅消息对象
{
Topic = "123", //订阅消息主题
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce //消息类型
},new MqttTopicFilter() //订阅消息对象
{
Topic = "12345", //订阅消息主题
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce //消息类型
}
}
});
}
catch (Exception e)
{
Console.WriteLine($"连接失败");
}
}
private static Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
var msg = arg.ApplicationMessage.ConvertPayloadToString();
foreach (var item in socketDic)
{
if (item.Value!=null && arg.ApplicationMessage.Topic.Contains(item.Value))
{
item.Key.Send($"hello {item.Value},{msg}");
}
}
Console.WriteLine(arg.ApplicationMessage.ConvertPayloadToString());
return Task.FromResult("");
}
private static Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
Console.WriteLine($"客户端“{client.Options.ClientId}”已断开MQTT服务器!");
return Task.FromResult("");
}
private static Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
Console.WriteLine($"客户端“{client.Options.ClientId}”已连接MQTT服务器!");
return Task.FromResult("");
}
3、网页端实现
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN">
<html>
<head>
<title>websocket client</title>
<script type="text/javascript">
var start = function () {
var inc = document.getElementById('incomming');
var wsImpl = window.WebSocket || window.MozWebSocket;
var form = document.getElementById('sendForm');
var input = document.getElementById('sendText');
inc.innerHTML += "connecting to server ..<br/>";
// create a new websocket and connect
window.ws = new wsImpl('ws://localhost:8181/');
// when data is comming from the server, this metod is called
ws.onmessage = function (evt) {
inc.innerHTML += evt.data + '<br/>';
};
// when the connection is established, this method is called
ws.onopen = function () {
inc.innerHTML += '.. connection open<br/>';
ws.send("SN:12345");
};
// when the connection is closed, this method is called
ws.onclose = function () {
inc.innerHTML += '.. connection closed<br/>';
}
//form.addEventListener('submit', function(e){
// e.preventDefault();
// var val = input.value;
// ws.send(val);
// input.value = "";
//});
}
window.onload = start;
</script>
</head>
<body>
<form id="sendForm">
<input id="sendText" placeholder="Text to send" />
</form>
<pre id="incomming"></pre>
</body>
</html>
4、测试
启动WebSocket服务器控制台应用程序;
运行MQTT.fx工具,新建连接
给12345主题发布消息,
观察网页端是否收到消息,