目录
- 0.前言
- 1. MqttEncoder--编码器
- 1.1 构造方法
- 1.2 encodeConnectMessage -- 连接消息
- 1.3 encodeConnAckMessage - 确认连接
- 1.4 encodePublishMessage -- 发布消息
- 1.5 encodeSubscribeMessage - 订阅主题
- 1.6 encodeUnsubscribeMessage - 取消订阅
- 1.7 encodeSubAckMessage - 订阅应答
- 1.8 encodeMessageWithOnlySingleByteFixedHeaderAndMessageId
- 1.9 encodeMessageWithOnlySingleByteFixedHeader
- 2. MqttDecoder--解码器
- 2.1 构造方法
- 2.2 READ_FIXED_HEADER - 固定报头解码
- 2.3 READ_VARIABLE_HEADER - 可变报头解码
- 2.4 READ_PAYLOAD- 有效载荷解码
0.前言
这里梳理下netty中对mqtt协议的编码和解码的处理。一方面对mqtt协议的结构再巩固些,另一方面就是学习下netty中对字节的处理。对于MQTT协议,可以参考前一篇文章MQTT协议详解。
1. MqttEncoder–编码器
1.1 构造方法
对于编码器,构造方法是私有的。我们可以通过其提供的静态常量INSTANCE访问。
public static final MqttEncoder INSTANCE = new MqttEncoder();
private MqttEncoder() { }
1.2 encodeConnectMessage – 连接消息
编码器,我们重点看一下doEncode方法。通过固定报头中的消息类型来对不同类型的消息进行特定编码。
static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {
switch (message.fixedHeader().messageType()) {
case CONNECT:
return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);
case CONNACK:
return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);
case PUBLISH:
return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);
case SUBSCRIBE:
return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);
case UNSUBSCRIBE:
return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);
case SUBACK:
return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);
case UNSUBACK:
case PUBACK:
case PUBREC:
case PUBREL:
case PUBCOMP:
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);
case PINGREQ:
case PINGRESP:
case DISCONNECT:
return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);
default:
throw new IllegalArgumentException(
"Unknown message type: " + message.fixedHeader().messageType().value());
}
}
对于消息的编码,我们可以对照着协议来看,这样会更清晰。
固定报头
可变报头
encodeConnectMessage方法
private static ByteBuf encodeConnectMessage(
ByteBufAllocator byteBufAllocator,
MqttConnectMessage message) {
// 1. 有效载荷初始大小设为0字节
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttConnectVariableHeader variableHeader = message.variableHeader();
MqttConnectPayload payload = message.payload();
// 2.public enum MqttVersion {
// MQTT_3_1("MQIsdp", (byte) 3),
// MQTT_3_1_1("MQTT", (byte) 4);
// } 对mqtt版本做校验,枚举中只有两种,如果名称和版本不匹配,则报错
MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
(byte) variableHeader.version());
// 3.如果可变报头中,没有用户名称但是有用户密码,则报错
if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
throw new DecoderException("Without a username, the password MUST be not set");
}
// 4. 校验有效载荷中的clientId,版本MQTT_3_1中clinetId编码字节长度必须为1到23,
// 在v3.1.1中允许超过23字节或者长度为0的clientId,所以不为null即可。
String clientIdentifier = payload.clientIdentifier();
if (!isValidClientId(mqttVersion, clientIdentifier)) {
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
}
byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);
// 4.1 connect消息的有效载荷,如果都包含的话必须按这个顺序出现:客户端标识符,遗嘱主题,遗嘱消息,用户名,密码
// 而这几部分的结构,都是由一个两字节的长度和对应的载荷消息的组成,所以这里都是 2字节 + 消息的字节长度
payloadBufferSize += 2 + clientIdentifierBytes.length;
// 5.校验有效载荷中的遗嘱主题和遗嘱消息,如果可变报头中的遗嘱标志为1,则表示这两项存在
String willTopic = payload.willTopic();
byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EmptyArrays.EMPTY_BYTES;
byte[] willMessage = payload.willMessageInBytes();
byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
if (variableHeader.isWillFlag()) {
payloadBufferSize += 2 + willTopicBytes.length;
payloadBufferSize += 2 + willMessageBytes.length;
}
// 6.校验有效载荷中的用户和密码,如果可变报头中的用户名/密码标识为1,则表示用户名/密码存在
String userName = payload.userName();
byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EmptyArrays.EMPTY_BYTES;
if (variableHeader.hasUserName()) {
payloadBufferSize += 2 + userNameBytes.length;
}
byte[] password = payload.passwordInBytes();
byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
if (variableHeader.hasPassword()) {
payloadBufferSize += 2 + passwordBytes.length;
}
byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
// 7. 可变报头的长度 = 2字节的协议名的长度(v3.1为3,v3.1.1值为4) + 4字节的协议名(MQTT) + 1字段协议级别 + 1字节连接标志 + 2字节保持时间
int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
// 8.这里计算的是固定头的长度 = 1字节的报文类型(包括类型标志位) + 剩余长度字段,剩余长度=可变报头 + 有效载荷
// 由于剩余长度字段中每字节的最高位为进制位,所以每个字节表示的最大值为128(0-127),所以(可变报头+有效载荷)/128即剩余长度所占的字字节
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
// 9.创建一个这Mqtt消息长度的byteBuf,开始写入消息
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
// 9.1 向byteBuf中写入固定报头中的第一个字节,详细介绍见下段代码
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
// 9.2 向byteBuf中写入固定报头中的剩余长度字段,详细介绍见下段代码
writeVariableLengthInt(buf, variablePartSize);
// 9.3 写入两字节的协议名的长度,再写入协议名
buf.writeShort(protocolNameBytes.length);
buf.writeBytes(protocolNameBytes);
// 9.4 1字节协议版本,1字节连接标志,2字节连接时间
buf.writeByte(variableHeader.version());
buf.writeByte(getConnVariableHeaderFlag(variableHeader));
buf.writeShort(variableHeader.keepAliveTimeSeconds());
// 9.5 最后写入有效载荷
buf.writeShort(clientIdentifierBytes.length);
buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
if (variableHeader.isWillFlag()) {
buf.writeShort(willTopicBytes.length);
buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
buf.writeShort(willMessageBytes.length);
buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
}
if (variableHeader.hasUserName()) {
buf.writeShort(userNameBytes.length);
buf.writeBytes(userNameBytes, 0, userNameBytes.length);
}
if (variableHeader.hasPassword()) {
buf.writeShort(passwordBytes.length);
buf.writeBytes(passwordBytes, 0, passwordBytes.length);
}
return buf;
}
/**
* 计算固定报头第一字节
*
* @param header
* @return
*/
private static int getFixedHeaderByte1(MqttFixedHeader header) {
int ret = 0;
// 1. 将消息类型左移四位,因为第一字节的0-3位为类型对应的标志位
ret |= header.messageType().value() << 4;
// 2. 如果是重发报文,则重发标志为1,第一字节的第3位
if (header.isDup()) {
// 采用或运算,将第一字节的第三位这只为1
ret |= 0x08;
}
// 3. Qos等级为第一字节的第1、2位
ret |= header.qosLevel().value() << 1;
// 4.如果要保留消息,则保留标志为1,第一字节的第0位
if (header.isRetain()) {
ret |= 0x01;
}
return ret;
}
/**
* 写入固定报头中的剩余长度字段
*
* @param buf
* @param num
*/
private static void writeVariableLengthInt(ByteBuf buf, int num) {
do {
// 1. %求余,取前一个字节所表示的数值大小,
// 比如num表示200字节,第一次这里digit表示72
// 由于第二次,num=1,大于0成立,这里1 % 128 = 1
int digit = num % 128;
// 2. 第一除以128,即去除前一个字节,num表示1,第二次等于0了,表示没有字节了
num /= 128;
if (num > 0) {
// 3.如果还有字节,则最高位进制位设置为1
digit |= 0x80;
}
// 4. 第一字节写入72,第二字节写入1
buf.writeByte(digit);
} while (num > 0);
}
/**
* 通过或运算计算连接标志
*/
private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
int flagByte = 0;
if (variableHeader.hasUserName()) {
flagByte |= 0x80;
}
if (variableHeader.hasPassword()) {
flagByte |= 0x40;
}
if (variableHeader.isWillRetain()) {
flagByte |= 0x20;
}
flagByte |= (variableHeader.willQos() & 0x03) << 3;
if (variableHeader.isWillFlag()) {
flagByte |= 0x04;
}
if (variableHeader.isCleanSession()) {
flagByte |= 0x02;
}
return flagByte;
}
1.3 encodeConnAckMessage - 确认连接
可变报头
encodeConnAckMessage
private static ByteBuf encodeConnAckMessage(
ByteBufAllocator byteBufAllocator,
MqttConnAckMessage message) {
// 一共4个字节,两字节的固定头,两字节的可变头,无载荷
ByteBuf buf = byteBufAllocator.buffer(4);
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
// 剩余长度字段固定为2,表示可变报头+载荷=2字节
buf.writeByte(2);
// 服务端如果保存了会话,则置为1,如果没有保存,则置为0,
buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
return buf;
}
1.4 encodePublishMessage – 发布消息
固定报头
可变报头
只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中。
encodePublishMessage
private static ByteBuf encodePublishMessage(
ByteBufAllocator byteBufAllocator,
MqttPublishMessage message) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttPublishVariableHeader variableHeader = message.variableHeader();
ByteBuf payload = message.payload().duplicate();
String topicName = variableHeader.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
// 1. 可变报头长度 = 2字节长度 + topic长度 + 2字节PacketIdentifier(qos=1或qos=2时存在)
int variableHeaderBufferSize = 2 + topicNameBytes.length +
(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);
int payloadBufferSize = payload.readableBytes();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes);
if (mqttFixedHeader.qosLevel().value() > 0) {
buf.writeShort(variableHeader.packetId());
}
buf.writeBytes(payload);
return buf;
}
1.5 encodeSubscribeMessage - 订阅主题
有效载荷
encodeSubscribeMessage
private static ByteBuf encodeSubscribeMessage(
ByteBufAllocator byteBufAllocator,
MqttSubscribeMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttSubscribePayload payload = message.payload();
// 1. 订阅消息的载荷中可以包含多个订阅主题
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
// 2. 每个订阅主题的载荷 = 2字节长度 + topic过滤器的长度 + 1字节的Qos
payloadBufferSize += 2 + topicNameBytes.length;
payloadBufferSize += 1;
}
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// 3. 可变头中包含2字节的packageId
int messageId = variableHeader.messageId();
buf.writeShort(messageId);
// Payload
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
String topicName = topic.topicName();
byte[] topicNameBytes = encodeStringUtf8(topicName);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
buf.writeByte(topic.qualityOfService().value());
}
return buf;
}
1.6 encodeUnsubscribeMessage - 取消订阅
有效载荷示例
encodeUnsubscribeMessage
private static ByteBuf encodeUnsubscribeMessage(
ByteBufAllocator byteBufAllocator,
MqttUnsubscribeMessage message) {
int variableHeaderBufferSize = 2;
int payloadBufferSize = 0;
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
MqttUnsubscribePayload payload = message.payload();
// 1. 要取消的主题列表,每个主题过滤器 = 2字节长度 + 过滤器长度
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
payloadBufferSize += 2 + topicNameBytes.length;
}
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variablePartSize);
// Variable Header
int messageId = variableHeader.messageId();
buf.writeShort(messageId);
// Payload
for (String topicName : payload.topics()) {
byte[] topicNameBytes = encodeStringUtf8(topicName);
buf.writeShort(topicNameBytes.length);
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
}
return buf;
}
1.7 encodeSubAckMessage - 订阅应答
可变报头
可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符。
有效载荷
有效载荷包含一个返回码清单。每个返回码对应等待确认的 SUBSCRIBE 报文中的一个主题过滤器。它们指定了 SUBSCRIBE 请求的每个订阅被授予的最大 QoS 等级。
encodeSubAckMessage
private static ByteBuf encodeSubAckMessage(
ByteBufAllocator byteBufAllocator,
MqttSubAckMessage message) {
// 1. 可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符,所以固定长度为2
int variableHeaderBufferSize = 2;
// 2. SUBSCRIBE报文中的每个过滤器,subAck消息中都要给出对应主题的所能赋予的最大Qos等级
int payloadBufferSize = message.payload().grantedQoSLevels().size();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(message.variableHeader().messageId());
for (int qos : message.payload().grantedQoSLevels()) {
buf.writeByte(qos);
}
return buf;
}
1.8 encodeMessageWithOnlySingleByteFixedHeaderAndMessageId
对于UNSUBACK、PUBACK、PUBREC、PUBREL、PUBCOMP消息,都是一些确认消息。这类消息中的可变报头需要携带对应消息的packetId,无载荷。
private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
ByteBufAllocator byteBufAllocator,
MqttMessage message) {
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
int msgId = variableHeader.messageId();
// 对于此方法,适用于报文结构为: 可变头只有个2字节packetId,并且无有效载荷的
int variableHeaderBufferSize = 2; // variable part only has a message id
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
writeVariableLengthInt(buf, variableHeaderBufferSize);
buf.writeShort(msgId);
return buf;
}
1.9 encodeMessageWithOnlySingleByteFixedHeader
对于PINGREQ、PINGRESP、DISCONNECT消息,既无可变报头,也无有效载荷。
2. MqttDecoder–解码器
2.1 构造方法
对于解码器,netty提供了对外公共的构造方法,无参构造和有参构造。
private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
private final int maxBytesInMessage;
public MqttDecoder() {
this(DEFAULT_MAX_BYTES_IN_MESSAGE);
}
public MqttDecoder(int maxBytesInMessage) {
super(DecoderState.READ_FIXED_HEADER);
this.maxBytesInMessage = maxBytesInMessage;
}
采用无参构造,默认消息的最大字节为8092,有参构造则可以指定最大字节数。同时,将initialState设置为固定报头,作用是后面解码时从固定头开始,这里作者注释也有说明。
/**
* Creates a new instance with the specified initial state.
*/
protected ReplayingDecoder(S initialState) {
state = initialState;
}
/**
* States of the decoder.
* We start at READ_FIXED_HEADER, followed by
* READ_VARIABLE_HEADER and finally READ_PAYLOAD.
*/
enum DecoderState {
READ_FIXED_HEADER,
READ_VARIABLE_HEADER,
READ_PAYLOAD,
BAD_MESSAGE,
}
2.2 READ_FIXED_HEADER - 固定报头解码
解码器,我们重点看一下decode方法。这里通过state()方法判断是解析固定头、可变头、载荷、错误消息。new对象的时候,初始状态为固定头,从固定头开始,注意固定头和可变头case
中没有break
,所以解码顺序固定头 -> 可变头 -> 载荷。在这三个过程中有异常,则 -> BAD_MESSAGE
。
decodeFixedHeader
private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {
// 1. 取无符号第一字节,固定头第一字节,7-4位为消息类型,所以右移4位
short b1 = buffer.readUnsignedByte();
MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
// 1.1 重发标志第3位,值为1,标志重发
boolean dupFlag = (b1 & 0x08) == 0x08;
// 1.2 Qos占1、2位,采用& 0x06获取到这两位值,然后右移一位即Qos的值
int qosLevel = (b1 & 0x06) >> 1;
// 1.3 会话保留标志站0位
boolean retain = (b1 & 0x01) != 0;
int remainingLength = 0;
int multiplier = 1;
short digit;
int loops = 0;
do {
digit = buffer.readUnsignedByte();
// 2.由于每个字节的最高位为进制位,所以&127获取除进制位之外的7位
remainingLength += (digit & 127) * multiplier;
multiplier *= 128;
loops++;
} while ((digit & 128) != 0 && loops < 4);
// 协议规定,剩余长度字段最大为4个字节
if (loops == 4 && (digit & 128) != 0) {
throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
}
MqttFixedHeader decodedFixedHeader =
new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
return validateFixedHeader(resetUnusedFields(decodedFixedHeader));
}
validateFixedHeader
协议中规定,PUBREL、SUBSCRIBE、UNSUBSCRIBE中固定头中的Qos等级必须是1。
static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case PUBREL:
case SUBSCRIBE:
case UNSUBSCRIBE:
if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {
throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");
}
default:
return mqttFixedHeader;
}
}
resetUnusedFields
对于协议中规定,固定报头中标志位保留的,一律置为0。
static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
switch (mqttFixedHeader.messageType()) {
case CONNECT:
case CONNACK:
case PUBACK:
case PUBREC:
case PUBCOMP:
case SUBACK:
case UNSUBACK:
case PINGREQ:
case PINGRESP:
case DISCONNECT:
if (mqttFixedHeader.isDup() ||
mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE ||
mqttFixedHeader.isRetain()) {
return new MqttFixedHeader(
mqttFixedHeader.messageType(),
false,
MqttQoS.AT_MOST_ONCE,
false,
mqttFixedHeader.remainingLength());
}
return mqttFixedHeader;
case PUBREL:
case SUBSCRIBE:
case UNSUBSCRIBE:
if (mqttFixedHeader.isRetain()) {
return new MqttFixedHeader(
mqttFixedHeader.messageType(),
mqttFixedHeader.isDup(),
mqttFixedHeader.qosLevel(),
false,
mqttFixedHeader.remainingLength());
}
return mqttFixedHeader;
default:
return mqttFixedHeader;
}
}
2.3 READ_VARIABLE_HEADER - 可变报头解码
解码和编码是相对应的,这里不针对每种都进行说明。讲一下连接消息,其他的都比较简单。
decodeConnectionVariableHeader
private static MqttDecoder.Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {
// 1. decodeString方法的作用,通过前两个长度字节(MLB,SLB)指定的字节长度来获取数据并装成string
// Connect可变头最开始的两个长度字节用来表示协议名称的长度,所以这里通过此方法获取的就是协议名MQTT
final MqttDecoder.Result<String> protoString = decodeString(buffer);
// 2.注意这里的numberOfBytesConsumed字段,用于表示已经读取的字节个数,等解码完有效载荷后,
// 会比较固定头里的剩余长度字段和此字段是否完全相等
int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
final byte protocolLevel = buffer.readByte();
numberOfBytesConsumed += 1;
final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
// 3.取出连接标志字节,采用&进行解码
final int b1 = buffer.readUnsignedByte();
numberOfBytesConsumed += 1;
final MqttDecoder.Result<Integer> keepAlive = decodeMsbLsb(buffer);
numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;
final boolean hasUserName = (b1 & 0x80) == 0x80;
final boolean hasPassword = (b1 & 0x40) == 0x40;
final boolean willRetain = (b1 & 0x20) == 0x20;
final int willQos = (b1 & 0x18) >> 3;
final boolean willFlag = (b1 & 0x04) == 0x04;
final boolean cleanSession = (b1 & 0x02) == 0x02;
if (mqttVersion == MqttVersion.MQTT_3_1_1) {
final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
if (!zeroReservedFlag) {
throw new DecoderException("non-zero reserved flag");
}
}
final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
mqttVersion.protocolName(),
mqttVersion.protocolLevel(),
hasUserName,
hasPassword,
willRetain,
willQos,
willFlag,
cleanSession,
keepAlive.value);
return new MqttDecoder.Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);
}
decodeString
2.4 READ_PAYLOAD- 有效载荷解码
有效载荷也不针对每种都进行说明。讲一下连接消息的有效载荷解码。
decodeConnectionPayload
private static MqttDecoder.Result<MqttConnectPayload> decodeConnectionPayload(
ByteBuf buffer,
MqttConnectVariableHeader mqttConnectVariableHeader) {
// 1.解码clientId,对于connect消息,载荷里的clientId是必须有的。
final MqttDecoder.Result<String> decodedClientId = decodeString(buffer);
final String decodedClientIdValue = decodedClientId.value;
final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
(byte) mqttConnectVariableHeader.version());
// 1.1 这里前面说过,如果是v3.1.1版本的mqtt,是允许clientId是空字符串或者大于23字节的,但是v3.1的不支持
if (!isValidClientId(mqttVersion, decodedClientIdValue)) {
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
}
int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
// 2.对于遗嘱topic和遗嘱消息、用户名、密码这几部分需要根据可变头中的标志位是否为1来判断是否存在
MqttDecoder.Result<String> decodedWillTopic = null;
MqttDecoder.Result<byte[]> decodedWillMessage = null;
if (mqttConnectVariableHeader.isWillFlag()) {
decodedWillTopic = decodeString(buffer, 0, 32767);
numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
decodedWillMessage = decodeByteArray(buffer);
numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
}
MqttDecoder.Result<String> decodedUserName = null;
MqttDecoder.Result<byte[]> decodedPassword = null;
if (mqttConnectVariableHeader.hasUserName()) {
decodedUserName = decodeString(buffer);
numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
}
if (mqttConnectVariableHeader.hasPassword()) {
decodedPassword = decodeByteArray(buffer);
numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;
}
final MqttConnectPayload mqttConnectPayload =
new MqttConnectPayload(
decodedClientId.value,
decodedWillTopic != null ? decodedWillTopic.value : null,
decodedWillMessage != null ? decodedWillMessage.value : null,
decodedUserName != null ? decodedUserName.value : null,
decodedPassword != null ? decodedPassword.value : null);
return new MqttDecoder.Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);
}