1、前言
前面完成了winform版,wpf版,为什么要搞个cmd版,因为前面介绍了mqtt的报文结构,重点分析了【连接报文】,【订阅报文】,【发布报文】,这节就要就看看实际报文是怎么组装的,这也是之前详细每个报文的结构,含义的目的,使用mqttnet这个组件实现mqtt通信是直接应用,不涉及到底层报文的结构内容,用户是看不到报文内容的,这节的目的就是为加深理解而干的。这节不安装任何mqtt的组件,而是直接使用socket的原始方式通信。
2、报文回顾
一共有14个报文,如下图
可以去看看3个报文的详细介绍,什么固定报头,可变报头,有效载荷这些东东:
C#MQTT编程03--连接报文
C#MQTT编程04--订阅报文
C#MQTT编程05--发布报文
总结出来就是这样的:
连接报文是客户端发1,服务器回2,
订阅报文是客户端发8,服务器回9,
发布报文是客户端发3,服务器回4,
心跳报文是客户端发12,服务器回13。
3、开始卷
1、创建项目方案
2、编写连接报文
完整的连接代码:
/// <summary>
/// 连接
/// </summary>
static void Connection()
{
// MQTT不支持UDP
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect("127.0.0.1", 1869);
//连接报文
List<byte> connBytes = new List<byte>();
#region 第一部分,固定报头
// 第0个字节:固定报头
List<byte> headerBytes = new List<byte>
{
1<<4 //表示连接请求 消息类型
};
//第1个字节:剩余字节长度
//需要后面计算得到
#endregion
#region 第二部分,可变报头
// 第2,3个字节:协议名称MQTT的字节长度
List<byte> bodyBytes = new List<byte>();
string protocolName = "MQTT";
byte[] pnameBytes = Encoding.ASCII.GetBytes(protocolName);//得到“MQTT”的字节数组
bodyBytes.Add((byte)(pnameBytes.Length / 256 % 256));//高4位
bodyBytes.Add((byte)(pnameBytes.Length % 256));//低4位
// 第4,5,6,7个字节:协议名称
bodyBytes.AddRange(pnameBytes);
// 第8个字节: 协议版本
bodyBytes.Add(0x04);
// 第9个字节: 负载是否需要用户名,密码等设置
byte flagByte = 0;
flagByte |= 128; // 1 0 0 0 0 0 0 0 128 // 需要用户
flagByte |= 64; // 0 1 0 0 0 0 0 0 64 // 需要密码
flagByte |= 2; // CleanSession
bodyBytes.Add(flagByte);
// 第10,11个字节: Keep Alive保持连接的时间,高位在前,低位在后
int seconds = 100; // 秒为单位
bodyBytes.Add((byte)(seconds / 256 % 256));
bodyBytes.Add((byte)(seconds % 256));
#endregion
#region 第三部分,载荷
List<byte> loadBytes = new List<byte>();
// 第12,13个字节:ClientID字符长度
string clientID = "x2";
byte[] ciBytes = Encoding.ASCII.GetBytes(clientID);
loadBytes.Add((byte)(ciBytes.Length / 256 % 256));
loadBytes.Add((byte)(ciBytes.Length % 256));
// 第14,115,16,17个字节:ClientID
loadBytes.AddRange(ciBytes);
// 第18,19个字节:用户名长度
string username = "boss";
byte[] unBytes = Encoding.ASCII.GetBytes(username);
loadBytes.Add((byte)(unBytes.Length / 256 % 256));
loadBytes.Add((byte)(unBytes.Length % 256));
// 第20,21,22,23,24个字节:用户名
loadBytes.AddRange(unBytes);
// 第25,26个字节:密码长度
string pwd = "1234";
byte[] pwdBytes = Encoding.ASCII.GetBytes(pwd);
loadBytes.Add((byte)(pwdBytes.Length / 256 % 256));
loadBytes.Add((byte)(pwdBytes.Length % 256));
// 第27,28,29,30,31个字节:密码
loadBytes.AddRange(pwdBytes);
#endregion
//第1个字节:剩余字节长度,从第 2 个字节开始。
headerBytes.Add((byte)(bodyBytes.Count + loadBytes.Count));
//组装成报文
connBytes.AddRange(headerBytes);
connBytes.AddRange(bodyBytes);
connBytes.AddRange(loadBytes);
//发送报文
socket.Send(connBytes.ToArray());
// 异步处理:开始心跳
Task.Run(async () =>
{
byte[] pingBytes = new byte[2] { 12 << 4, 0 };//心跳的字节报文是固定的
while (true)
{
Console.WriteLine("心跳时间:" + DateTime.Now.ToString());
await Task.Delay(1000);//等待1秒
socket.Send(pingBytes);
}
});
//异步处理:服务器返回的报文
Task.Run(() =>
{
//1:请求连接 (C->S)
//2:连接确认 (S->C)
//3:发布消息 (Both)
//4:发布收到确认 (QoS > 0)
//5:发布确认收到
//6:发布释放
//7:发布完成 (QoS 2)
//8:订阅请求 (C->S)
//9:订阅请求确认 (S->C)
//10:取消订阅请求 (C->S)
//11:取消订阅请求确认 (S->C)
//12:心跳请求 (C->S)
//13:心跳确认 (S->C)
//14:客户端断开连接 (C->S)
byte[] respBytes = new byte[1]; //接收MQTT报文类型,报文类型占1个字节
//连接成功,MQTT报文类型(CONNACK),服务器返回2,即0000 0010,高低位交换位置就是返回:0 0 1 0 0 0 0 0 ,转成10进制就是32
//发布成功,MQTT报文类型(PUBACK), 服务器返回4,即0000 0100,高低位交换位置就是返回:0 1 0 0 0 0 0 0 ,转成10进制就是64
//订阅成功,MQTT报文类型(SUBACK), 服务器返回9,即0000 1001,高低位交换位置就是返回:1 0 0 1 0 0 0 0 ,转成10进制就是144
//心跳成功,MQTT报文类型(PINGRESP),服务器返回13,即0000 1101,高低位交换位置就是返回:1 1 0 1 0 0 0 0 ,转成10进制就是208
while (true)//循环接收
{
try
{
socket.Receive(respBytes, 0, 1, SocketFlags.None);
int firstValue = Convert.ToInt32(respBytes[0]);
//Console.WriteLine("第一个字节:" + firstValue);
//根据报文类型进行处理
switch (firstValue)
{
case 32:
Console.WriteLine("连接成功!");
break;
case 64:
Console.WriteLine("发布成功!");
break;
case 144:
Console.WriteLine("订阅成功!");
break;
case 208:
Console.WriteLine("心跳成功!");
break;
}
}
catch (Exception ex)
{
Console.WriteLine("出错了," + ex.Message);
}
}
});
}
特别注意这里的处理
//连接成功,MQTT报文类型(CONNACK),服务器返回2,即0000 0010,高低位交换位置就是返回:0 0 1 0 0 0 0 0 ,转成10进制就是32
//发布成功,MQTT报文类型(PUBACK), 服务器返回4,即0000 0100,高低位交换位置就是返回:0 1 0 0 0 0 0 0 ,转成10进制就是64
//订阅成功,MQTT报文类型(SUBACK), 服务器返回9,即0000 1001,高低位交换位置就是返回:1 0 0 1 0 0 0 0 ,转成10进制就是144
//心跳成功,MQTT报文类型(PINGRESP),服务器返回13,即0000 1101,高低位交换位置就是返回:1 1 0 1 0 0 0 0 ,转成10进制就是208
心跳的处理,它的作用是不断地发送命令,以证明客户端存在
测试连接
先把前面的wpf版程序运行,启动服务器,启动客户端连接服务器
再启动本项目程序,可以看到连接成功,心跳也成功。
3、编写订阅报文
这里设置的qos级别是1,Qos级别-》 0:最多一次的传输,1:至少一次的传输、至多无限次,2:有且仅有一次的传输
完整代码
/// <summary>
/// 订阅
/// </summary>
/// <param name="topics">主题列表</param>
static void Subscription(List<string> topics)
{
List<byte> headerBytes = new List<byte>();
List<byte> bodyBytes = new List<byte>();
//第0个字节:报文类型(10000010)
byte msgType = 8 << 4; // 1000 0000
headerBytes.Add((byte)(msgType | 2));
//第1个字节:剩余字节长度,等后面计算获取后再添加
//第2,3个字节:Package Identifier的长度,表示报文的标识
int pi = random.Next(0, 1000); // Package Identifier的具体值
bodyBytes.Add((byte)(pi / 256 % 256));//高位
bodyBytes.Add((byte)(pi % 256));//低位
//遍历所有主题
foreach (var item in topics)
{
//第8,9个字节:topic字符长度
byte[] itemBytes = Encoding.UTF8.GetBytes(item);
bodyBytes.Add((byte)(itemBytes.Length / 256 % 256));
bodyBytes.Add((byte)(itemBytes.Length % 256));
//第10,11,12,13,14,16个字节:topic字符内容
bodyBytes.AddRange(itemBytes);
//第17个字节:Qos级别-》 0:最多一次的传输,1:至少一次的传输、至多无限次,2:有且仅有一次的传输
bodyBytes.Add(0x01);
}
//第1个字节:剩余字节长度,从第 2 个字节开始。
headerBytes.Add((byte)bodyBytes.Count);
//组成报文
headerBytes.AddRange(bodyBytes);
//发送报文
socket.Send(headerBytes.ToArray());
//接收服务器回应的报文
//byte[] respBytes = new byte[5];
//socket.Receive(respBytes, 0, 5, SocketFlags.None);
//var objSub = respBytes;
}
测试订阅
先让wpf客户端订阅一个主题“shanghai",订阅成功
再看本项目程序订阅主题”shanghai",可以看到订阅成功
4、编写发布报文
完整代码,注释详情
/// <summary>
/// 发布消息:服务级别(Qos1)
/// </summary>
static void Publish_Qos1()
{
#region 方法1
List<byte> headerBytes = new List<byte>();
//报文类型
byte msgType = 3 << 4; // 1000 0000
headerBytes.Add((byte)(msgType | 2)); // QoS-0低4位全为1
List<byte> bodyBytes = new List<byte>();
string topic = "shanghai";
string msg = "hello9098";
// 添加主题长度
byte[] topicBytes = Encoding.UTF8.GetBytes(topic);
bodyBytes.Add((byte)(topicBytes.Length / 256 % 256));
bodyBytes.Add((byte)(topicBytes.Length % 256));
// 添加主题内容
bodyBytes.AddRange(topicBytes);
// 必须添加Package Identifier:只包括它的字节长度
int pi = random.Next(0, 1000); // Package Identifier
//Console.WriteLine(pi);
bodyBytes.Add((byte)(pi / 256 % 256));
bodyBytes.Add((byte)(pi % 256));
// 添加消息长度
byte[] msgBytes = Encoding.UTF8.GetBytes(msg);
bodyBytes.Add((byte)(msgBytes.Length / 256 % 256));
bodyBytes.Add((byte)(msgBytes.Length % 256));
// 添加消息内容
bodyBytes.AddRange(msgBytes);
//添加第1个字节:剩余字节长度
headerBytes.Add((byte)bodyBytes.Count);
// 组装头
headerBytes.AddRange(bodyBytes);
//发送消息
socket.Send(headerBytes.ToArray());
#endregion
//#region 方法2
//string topic = "shanghai";
//string msg = "hello9098";
//int pi = random.Next(0, 1000); // Package Identifier
//List<byte> topicbytes = new List<byte>();
//byte[] topicArray = Encoding.UTF8.GetBytes(topic);
//byte[] payloadArray = Encoding.UTF8.GetBytes(msg);
//topicbytes.Add((byte)((int)topicArray.Length / 256));
//topicbytes.Add((byte)((int)topicArray.Length % 256));
//topicbytes.AddRange(topicArray);
//byte[] id = new byte[] { (byte)(pi / 256 % 256), (byte)(pi % 256) };
//byte[] bufferLen = new byte[] { (byte)(topicbytes.Count + payloadArray.Length + id.Length) };
//using (MemoryStream memoryStream = new MemoryStream())
//{
// memoryStream.WriteByte((3 << 4) | 2 | 1);// 写入消息类型(QoS-1)
// memoryStream.Write(bufferLen, 0, (int)bufferLen.Length);// 写入后续报文长度
// memoryStream.Write(topicbytes.ToArray(), 0, (int)topicbytes.Count);// 写入Topic字节
// memoryStream.Write(id.ToArray(), 0, (int)id.Length);// 写入Package Identifier字节
// memoryStream.Write(payloadArray.ToArray(), 0, (int)payloadArray.Length);// 写入消息
// byte[] sendArray = memoryStream.ToArray();
// socket.Send(sendArray);
//}
接收服务器回应的报文
//byte[] respBytes = new byte[4];
//socket.Receive(respBytes, 0, 4, SocketFlags.None);
//var objSub = respBytes;
//#endregion
}
测试发布
前面的c1订阅了主题“shanghai",现在的x2客户端向shanghai主题发布一个消息,看看c1能不能收到
最后全部完整代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace MQTTNETClientCMD
{
internal class Program
{
static Socket socket;//socket对象
static Random random = new Random();//随机数,用于产生package identifier
static List<string> topic = new List<string> { "shanghai" };//主题
static void Main(string[] args)
{
Console.WriteLine("Hello MQTT!");
Connection();//连接
Subscription(topic);//订阅
Publish_Qos1();// 发布Qos=1
Console.ReadKey();
}
/// <summary>
/// 连接
/// </summary>
static void Connection()
{
// MQTT不支持UDP
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect("127.0.0.1", 1869);
//连接报文
List<byte> connBytes = new List<byte>();
#region 第一部分,固定报头
// 第0个字节:固定报头
List<byte> headerBytes = new List<byte>
{
1<<4 //表示连接请求 消息类型
};
//第1个字节:剩余字节长度
//需要后面计算得到
#endregion
#region 第二部分,可变报头
// 第2,3个字节:协议名称MQTT的字节长度
List<byte> bodyBytes = new List<byte>();
string protocolName = "MQTT";
byte[] pnameBytes = Encoding.ASCII.GetBytes(protocolName);//得到“MQTT”的字节数组
bodyBytes.Add((byte)(pnameBytes.Length / 256 % 256));//高4位
bodyBytes.Add((byte)(pnameBytes.Length % 256));//低4位
// 第4,5,6,7个字节:协议名称
bodyBytes.AddRange(pnameBytes);
// 第8个字节: 协议版本
bodyBytes.Add(0x04);
// 第9个字节: 负载是否需要用户名,密码等设置
byte flagByte = 0;
flagByte |= 128; // 1 0 0 0 0 0 0 0 128 // 需要用户
flagByte |= 64; // 0 1 0 0 0 0 0 0 64 // 需要密码
flagByte |= 2; // CleanSession
bodyBytes.Add(flagByte);
// 第10,11个字节: Keep Alive保持连接的时间,高位在前,低位在后
int seconds = 100; // 秒为单位
bodyBytes.Add((byte)(seconds / 256 % 256));
bodyBytes.Add((byte)(seconds % 256));
#endregion
#region 第三部分,载荷
List<byte> loadBytes = new List<byte>();
// 第12,13个字节:ClientID字符长度
string clientID = "x2";
byte[] ciBytes = Encoding.ASCII.GetBytes(clientID);
loadBytes.Add((byte)(ciBytes.Length / 256 % 256));
loadBytes.Add((byte)(ciBytes.Length % 256));
// 第14,115,16,17个字节:ClientID
loadBytes.AddRange(ciBytes);
// 第18,19个字节:用户名长度
string username = "boss";
byte[] unBytes = Encoding.ASCII.GetBytes(username);
loadBytes.Add((byte)(unBytes.Length / 256 % 256));
loadBytes.Add((byte)(unBytes.Length % 256));
// 第20,21,22,23,24个字节:用户名
loadBytes.AddRange(unBytes);
// 第25,26个字节:密码长度
string pwd = "1234";
byte[] pwdBytes = Encoding.ASCII.GetBytes(pwd);
loadBytes.Add((byte)(pwdBytes.Length / 256 % 256));
loadBytes.Add((byte)(pwdBytes.Length % 256));
// 第27,28,29,30,31个字节:密码
loadBytes.AddRange(pwdBytes);
#endregion
//第1个字节:剩余字节长度,从第 2 个字节开始。
headerBytes.Add((byte)(bodyBytes.Count + loadBytes.Count));
//组装成报文
connBytes.AddRange(headerBytes);
connBytes.AddRange(bodyBytes);
connBytes.AddRange(loadBytes);
//发送报文
socket.Send(connBytes.ToArray());
// 异步处理:开始心跳
Task.Run(async () =>
{
byte[] pingBytes = new byte[2] { 12 << 4, 0 };//心跳的字节报文是固定的
while (true)
{
Console.WriteLine("心跳时间:" + DateTime.Now.ToString());
await Task.Delay(1000);//等待1秒
socket.Send(pingBytes);
}
});
//异步处理:服务器返回的报文
Task.Run(() =>
{
//1:请求连接 (C->S)
//2:连接确认 (S->C)
//3:发布消息 (Both)
//4:发布收到确认 (QoS > 0)
//5:发布确认收到
//6:发布释放
//7:发布完成 (QoS 2)
//8:订阅请求 (C->S)
//9:订阅请求确认 (S->C)
//10:取消订阅请求 (C->S)
//11:取消订阅请求确认 (S->C)
//12:心跳请求 (C->S)
//13:心跳确认 (S->C)
//14:客户端断开连接 (C->S)
byte[] respBytes = new byte[1]; //接收MQTT报文类型,报文类型占1个字节
//连接成功,MQTT报文类型(CONNACK),服务器返回2,即0000 0010,高低位交换位置就是返回:0 0 1 0 0 0 0 0 ,转成10进制就是32
//发布成功,MQTT报文类型(PUBACK), 服务器返回4,即0000 0100,高低位交换位置就是返回:0 1 0 0 0 0 0 0 ,转成10进制就是64
//订阅成功,MQTT报文类型(SUBACK), 服务器返回9,即0000 1001,高低位交换位置就是返回:1 0 0 1 0 0 0 0 ,转成10进制就是144
//心跳成功,MQTT报文类型(PINGRESP),服务器返回13,即0000 1101,高低位交换位置就是返回:1 1 0 1 0 0 0 0 ,转成10进制就是208
while (true)//循环接收
{
try
{
socket.Receive(respBytes, 0, 1, SocketFlags.None);
int firstValue = Convert.ToInt32(respBytes[0]);
//Console.WriteLine("第一个字节:" + firstValue);
//根据报文类型进行处理
switch (firstValue)
{
case 32:
Console.WriteLine("连接成功!");
break;
case 64:
Console.WriteLine("发布成功!");
break;
case 144:
Console.WriteLine("订阅成功!");
break;
case 208:
Console.WriteLine("心跳成功!");
break;
}
}
catch (Exception ex)
{
Console.WriteLine("出错了," + ex.Message);
}
}
});
}
/// <summary>
/// 订阅
/// </summary>
/// <param name="topics">主题列表</param>
static void Subscription(List<string> topics)
{
List<byte> headerBytes = new List<byte>();
List<byte> bodyBytes = new List<byte>();
//第0个字节:报文类型(10000010)
byte msgType = 8 << 4; // 1000 0000
headerBytes.Add((byte)(msgType | 2));
//第1个字节:剩余字节长度,等后面计算获取后再添加
//第2,3个字节:Package Identifier的长度,表示报文的标识
int pi = random.Next(0, 1000); // Package Identifier的具体值
bodyBytes.Add((byte)(pi / 256 % 256));//高位
bodyBytes.Add((byte)(pi % 256));//低位
//遍历所有主题
foreach (var item in topics)
{
//第8,9个字节:topic字符长度
byte[] itemBytes = Encoding.UTF8.GetBytes(item);
bodyBytes.Add((byte)(itemBytes.Length / 256 % 256));
bodyBytes.Add((byte)(itemBytes.Length % 256));
//第10,11,12,13,14,16个字节:topic字符内容
bodyBytes.AddRange(itemBytes);
//第17个字节:Qos级别-》 0:最多一次的传输,1:至少一次的传输、至多无限次,2:有且仅有一次的传输
bodyBytes.Add(0x01);
}
//第1个字节:剩余字节长度,从第 2 个字节开始。
headerBytes.Add((byte)bodyBytes.Count);
//组成报文
headerBytes.AddRange(bodyBytes);
//发送报文
socket.Send(headerBytes.ToArray());
//接收服务器回应的报文
//byte[] respBytes = new byte[5];
//socket.Receive(respBytes, 0, 5, SocketFlags.None);
//var objSub = respBytes;
}
/// <summary>
/// 发布消息:服务级别(Qos1)
/// </summary>
static void Publish_Qos1()
{
#region 方法1
List<byte> headerBytes = new List<byte>();
//报文类型
byte msgType = 3 << 4; // 1000 0000
headerBytes.Add((byte)(msgType | 2)); // QoS-0低4位全为1
List<byte> bodyBytes = new List<byte>();
string topic = "shanghai";
string msg = "hello9098";
// 添加主题长度
byte[] topicBytes = Encoding.UTF8.GetBytes(topic);
bodyBytes.Add((byte)(topicBytes.Length / 256 % 256));
bodyBytes.Add((byte)(topicBytes.Length % 256));
// 添加主题内容
bodyBytes.AddRange(topicBytes);
// 必须添加Package Identifier:只包括它的字节长度
int pi = random.Next(0, 1000); // Package Identifier
//Console.WriteLine(pi);
bodyBytes.Add((byte)(pi / 256 % 256));
bodyBytes.Add((byte)(pi % 256));
// 添加消息长度
byte[] msgBytes = Encoding.UTF8.GetBytes(msg);
bodyBytes.Add((byte)(msgBytes.Length / 256 % 256));
bodyBytes.Add((byte)(msgBytes.Length % 256));
// 添加消息内容
bodyBytes.AddRange(msgBytes);
//添加第1个字节:剩余字节长度
headerBytes.Add((byte)bodyBytes.Count);
// 组装头
headerBytes.AddRange(bodyBytes);
//发送消息
socket.Send(headerBytes.ToArray());
#endregion
//#region 方法2
//string topic = "shanghai";
//string msg = "hello9098";
//int pi = random.Next(0, 1000); // Package Identifier
//List<byte> topicbytes = new List<byte>();
//byte[] topicArray = Encoding.UTF8.GetBytes(topic);
//byte[] payloadArray = Encoding.UTF8.GetBytes(msg);
//topicbytes.Add((byte)((int)topicArray.Length / 256));
//topicbytes.Add((byte)((int)topicArray.Length % 256));
//topicbytes.AddRange(topicArray);
//byte[] id = new byte[] { (byte)(pi / 256 % 256), (byte)(pi % 256) };
//byte[] bufferLen = new byte[] { (byte)(topicbytes.Count + payloadArray.Length + id.Length) };
//using (MemoryStream memoryStream = new MemoryStream())
//{
// memoryStream.WriteByte((3 << 4) | 2 | 1);// 写入消息类型(QoS-1)
// memoryStream.Write(bufferLen, 0, (int)bufferLen.Length);// 写入后续报文长度
// memoryStream.Write(topicbytes.ToArray(), 0, (int)topicbytes.Count);// 写入Topic字节
// memoryStream.Write(id.ToArray(), 0, (int)id.Length);// 写入Package Identifier字节
// memoryStream.Write(payloadArray.ToArray(), 0, (int)payloadArray.Length);// 写入消息
// byte[] sendArray = memoryStream.ToArray();
// socket.Send(sendArray);
//}
接收服务器回应的报文
//byte[] respBytes = new byte[4];
//socket.Receive(respBytes, 0, 4, SocketFlags.None);
//var objSub = respBytes;
//#endregion
}
}
}
讲解不易,分析不易,原创不易,整理不易,伙伴们动动你的金手指,你的支持是我最大的动力。
讲解不易,分析不易,原创不易,整理不易,伙伴们动动你的金手指,你的支持是我最大的动力。