Netty应用(七) ----MQTT编解码器

news2025/1/9 13:01:08

目录

  • 0.前言
  • 1. MqttEncoder--编码器
    • 1.1 构造方法
    • 1.2 encodeConnectMessage -- 连接消息
    • 1.3 encodeConnAckMessage - 确认连接
    • 1.4 encodePublishMessage -- 发布消息
    • 1.5 encodeSubscribeMessage - 订阅主题
    • 1.6 encodeUnsubscribeMessage - 取消订阅
    • 1.7 encodeSubAckMessage - 订阅应答
    • 1.8 encodeMessageWithOnlySingleByteFixedHeaderAndMessageId
    • 1.9 encodeMessageWithOnlySingleByteFixedHeader
  • 2. MqttDecoder--解码器
    • 2.1 构造方法
    • 2.2 READ_FIXED_HEADER - 固定报头解码
    • 2.3 READ_VARIABLE_HEADER - 可变报头解码
    • 2.4 READ_PAYLOAD- 有效载荷解码

0.前言

这里梳理下netty中对mqtt协议的编码和解码的处理。一方面对mqtt协议的结构再巩固些,另一方面就是学习下netty中对字节的处理。对于MQTT协议,可以参考前一篇文章MQTT协议详解。

1. MqttEncoder–编码器

1.1 构造方法

对于编码器,构造方法是私有的。我们可以通过其提供的静态常量INSTANCE访问。

    public static final MqttEncoder INSTANCE = new MqttEncoder();

    private MqttEncoder() { }

1.2 encodeConnectMessage – 连接消息

编码器,我们重点看一下doEncode方法。通过固定报头中的消息类型来对不同类型的消息进行特定编码。

    static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {

        switch (message.fixedHeader().messageType()) {
            case CONNECT:
                return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);

            case CONNACK:
                return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);

            case PUBLISH:
                return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);

            case SUBSCRIBE:
                return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);

            case UNSUBSCRIBE:
                return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);

            case SUBACK:
                return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);

            case UNSUBACK:
            case PUBACK:
            case PUBREC:
            case PUBREL:
            case PUBCOMP:
                return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);

            case PINGREQ:
            case PINGRESP:
            case DISCONNECT:
                return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);

            default:
                throw new IllegalArgumentException(
                        "Unknown message type: " + message.fixedHeader().messageType().value());
        }
    }

对于消息的编码,我们可以对照着协议来看,这样会更清晰。
固定报头
在这里插入图片描述
可变报头
在这里插入图片描述
encodeConnectMessage方法

    private static ByteBuf encodeConnectMessage(
            ByteBufAllocator byteBufAllocator,
            MqttConnectMessage message) {
        // 1. 有效载荷初始大小设为0字节
        int payloadBufferSize = 0;

        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttConnectVariableHeader variableHeader = message.variableHeader();
        MqttConnectPayload payload = message.payload();
        // 2.public enum MqttVersion {
        //    MQTT_3_1("MQIsdp", (byte) 3),
        //    MQTT_3_1_1("MQTT", (byte) 4);
        // }  对mqtt版本做校验,枚举中只有两种,如果名称和版本不匹配,则报错
        MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
                (byte) variableHeader.version());

        // 3.如果可变报头中,没有用户名称但是有用户密码,则报错
        if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
            throw new DecoderException("Without a username, the password MUST be not set");
        }

        // 4. 校验有效载荷中的clientId,版本MQTT_3_1中clinetId编码字节长度必须为1到23,
        // 在v3.1.1中允许超过23字节或者长度为0的clientId,所以不为null即可。
        String clientIdentifier = payload.clientIdentifier();
        if (!isValidClientId(mqttVersion, clientIdentifier)) {
            throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
        }
        byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);
        // 4.1 connect消息的有效载荷,如果都包含的话必须按这个顺序出现:客户端标识符,遗嘱主题,遗嘱消息,用户名,密码
        // 而这几部分的结构,都是由一个两字节的长度和对应的载荷消息的组成,所以这里都是 2字节 + 消息的字节长度
        payloadBufferSize += 2 + clientIdentifierBytes.length;

        // 5.校验有效载荷中的遗嘱主题和遗嘱消息,如果可变报头中的遗嘱标志为1,则表示这两项存在
        String willTopic = payload.willTopic();
        byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EmptyArrays.EMPTY_BYTES;
        byte[] willMessage = payload.willMessageInBytes();
        byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
        if (variableHeader.isWillFlag()) {
            payloadBufferSize += 2 + willTopicBytes.length;
            payloadBufferSize += 2 + willMessageBytes.length;
        }

        // 6.校验有效载荷中的用户和密码,如果可变报头中的用户名/密码标识为1,则表示用户名/密码存在
        String userName = payload.userName();
        byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EmptyArrays.EMPTY_BYTES;
        if (variableHeader.hasUserName()) {
            payloadBufferSize += 2 + userNameBytes.length;
        }
        byte[] password = payload.passwordInBytes();
        byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
        if (variableHeader.hasPassword()) {
            payloadBufferSize += 2 + passwordBytes.length;
        }

        byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
        // 7. 可变报头的长度 = 2字节的协议名的长度(v3.1为3,v3.1.1值为4) + 4字节的协议名(MQTT) + 1字段协议级别 + 1字节连接标志 + 2字节保持时间
        int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;
        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        // 8.这里计算的是固定头的长度 = 1字节的报文类型(包括类型标志位) + 剩余长度字段,剩余长度=可变报头 + 有效载荷
        // 由于剩余长度字段中每字节的最高位为进制位,所以每个字节表示的最大值为128(0-127),所以(可变报头+有效载荷)/128即剩余长度所占的字字节
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);

        // 9.创建一个这Mqtt消息长度的byteBuf,开始写入消息
        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);

        // 9.1 向byteBuf中写入固定报头中的第一个字节,详细介绍见下段代码
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        // 9.2 向byteBuf中写入固定报头中的剩余长度字段,详细介绍见下段代码
        writeVariableLengthInt(buf, variablePartSize);

        // 9.3 写入两字节的协议名的长度,再写入协议名
        buf.writeShort(protocolNameBytes.length);
        buf.writeBytes(protocolNameBytes);

        // 9.4 1字节协议版本,1字节连接标志,2字节连接时间
        buf.writeByte(variableHeader.version());
        buf.writeByte(getConnVariableHeaderFlag(variableHeader));
        buf.writeShort(variableHeader.keepAliveTimeSeconds());

        // 9.5 最后写入有效载荷
        buf.writeShort(clientIdentifierBytes.length);
        buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
        if (variableHeader.isWillFlag()) {
            buf.writeShort(willTopicBytes.length);
            buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
            buf.writeShort(willMessageBytes.length);
            buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
        }
        if (variableHeader.hasUserName()) {
            buf.writeShort(userNameBytes.length);
            buf.writeBytes(userNameBytes, 0, userNameBytes.length);
        }
        if (variableHeader.hasPassword()) {
            buf.writeShort(passwordBytes.length);
            buf.writeBytes(passwordBytes, 0, passwordBytes.length);
        }
        return buf;
    }
    /**
     * 计算固定报头第一字节
     *
     * @param header
     * @return
     */
    private static int getFixedHeaderByte1(MqttFixedHeader header) {
        int ret = 0;
        // 1. 将消息类型左移四位,因为第一字节的0-3位为类型对应的标志位
        ret |= header.messageType().value() << 4;

        // 2. 如果是重发报文,则重发标志为1,第一字节的第3位
        if (header.isDup()) {
            // 采用或运算,将第一字节的第三位这只为1
            ret |= 0x08;
        }
        // 3. Qos等级为第一字节的第1、2位
        ret |= header.qosLevel().value() << 1;

        // 4.如果要保留消息,则保留标志为1,第一字节的第0位
        if (header.isRetain()) {
            ret |= 0x01;
        }
        return ret;
    }

    /**
     * 写入固定报头中的剩余长度字段
     *
     * @param buf
     * @param num
     */
    private static void writeVariableLengthInt(ByteBuf buf, int num) {
        do {
            // 1. %求余,取前一个字节所表示的数值大小,
            // 比如num表示200字节,第一次这里digit表示72
            // 由于第二次,num=1,大于0成立,这里1 % 128 = 1
            int digit = num % 128;
            // 2. 第一除以128,即去除前一个字节,num表示1,第二次等于0了,表示没有字节了
            num /= 128;
            if (num > 0) {
                // 3.如果还有字节,则最高位进制位设置为1
                digit |= 0x80;
            }
            // 4. 第一字节写入72,第二字节写入1
            buf.writeByte(digit);
        } while (num > 0);
    }

	/**
	* 通过或运算计算连接标志
	*/
    private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
       int flagByte = 0;
       if (variableHeader.hasUserName()) {
           flagByte |= 0x80;
       }
       if (variableHeader.hasPassword()) {
           flagByte |= 0x40;
       }
       if (variableHeader.isWillRetain()) {
           flagByte |= 0x20;
       }
       flagByte |= (variableHeader.willQos() & 0x03) << 3;
       if (variableHeader.isWillFlag()) {
           flagByte |= 0x04;
       }
       if (variableHeader.isCleanSession()) {
           flagByte |= 0x02;
       }
       return flagByte;
   }

1.3 encodeConnAckMessage - 确认连接

可变报头
在这里插入图片描述
encodeConnAckMessage

    private static ByteBuf encodeConnAckMessage(
            ByteBufAllocator byteBufAllocator,
            MqttConnAckMessage message) {
        // 一共4个字节,两字节的固定头,两字节的可变头,无载荷
        ByteBuf buf = byteBufAllocator.buffer(4);
        buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
        // 剩余长度字段固定为2,表示可变报头+载荷=2字节
        buf.writeByte(2);
        // 服务端如果保存了会话,则置为1,如果没有保存,则置为0,
        buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
        buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
        return buf;
    }

1.4 encodePublishMessage – 发布消息

固定报头
在这里插入图片描述

可变报头
只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中
在这里插入图片描述
encodePublishMessage

    private static ByteBuf encodePublishMessage(
            ByteBufAllocator byteBufAllocator,
            MqttPublishMessage message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttPublishVariableHeader variableHeader = message.variableHeader();
        ByteBuf payload = message.payload().duplicate();

        String topicName = variableHeader.topicName();
        byte[] topicNameBytes = encodeStringUtf8(topicName);

        // 1. 可变报头长度 = 2字节长度 + topic长度 + 2字节PacketIdentifier(qos=1或qos=2时存在)
        int variableHeaderBufferSize = 2 + topicNameBytes.length +
                (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);
        int payloadBufferSize = payload.readableBytes();
        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);

        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);
        buf.writeShort(topicNameBytes.length);
        buf.writeBytes(topicNameBytes);
        if (mqttFixedHeader.qosLevel().value() > 0) {
            buf.writeShort(variableHeader.packetId());
        }
        buf.writeBytes(payload);

        return buf;
    }

1.5 encodeSubscribeMessage - 订阅主题

有效载荷
在这里插入图片描述
encodeSubscribeMessage

    private static ByteBuf encodeSubscribeMessage(
            ByteBufAllocator byteBufAllocator,
            MqttSubscribeMessage message) {
        int variableHeaderBufferSize = 2;
        int payloadBufferSize = 0;

        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttMessageIdVariableHeader variableHeader = message.variableHeader();
        MqttSubscribePayload payload = message.payload();

        // 1. 订阅消息的载荷中可以包含多个订阅主题
        for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
            String topicName = topic.topicName();
            byte[] topicNameBytes = encodeStringUtf8(topicName);
            // 2. 每个订阅主题的载荷 = 2字节长度 + topic过滤器的长度 + 1字节的Qos
            payloadBufferSize += 2 + topicNameBytes.length;
            payloadBufferSize += 1;
        }

        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);

        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);

        // 3. 可变头中包含2字节的packageId
        int messageId = variableHeader.messageId();
        buf.writeShort(messageId);

        // Payload
        for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
            String topicName = topic.topicName();
            byte[] topicNameBytes = encodeStringUtf8(topicName);
            buf.writeShort(topicNameBytes.length);
            buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
            buf.writeByte(topic.qualityOfService().value());
        }

        return buf;
    }

1.6 encodeUnsubscribeMessage - 取消订阅

有效载荷示例
在这里插入图片描述
encodeUnsubscribeMessage

    private static ByteBuf encodeUnsubscribeMessage(
            ByteBufAllocator byteBufAllocator,
            MqttUnsubscribeMessage message) {
        int variableHeaderBufferSize = 2;
        int payloadBufferSize = 0;

        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttMessageIdVariableHeader variableHeader = message.variableHeader();
        MqttUnsubscribePayload payload = message.payload();
        
        // 1. 要取消的主题列表,每个主题过滤器 = 2字节长度 + 过滤器长度
        for (String topicName : payload.topics()) {
            byte[] topicNameBytes = encodeStringUtf8(topicName);
            payloadBufferSize += 2 + topicNameBytes.length;
        }

        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);

        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variablePartSize);

        // Variable Header
        int messageId = variableHeader.messageId();
        buf.writeShort(messageId);

        // Payload
        for (String topicName : payload.topics()) {
            byte[] topicNameBytes = encodeStringUtf8(topicName);
            buf.writeShort(topicNameBytes.length);
            buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
        }

        return buf;
    }

1.7 encodeSubAckMessage - 订阅应答

可变报头
可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符。
有效载荷
有效载荷包含一个返回码清单。每个返回码对应等待确认的 SUBSCRIBE 报文中的一个主题过滤器。它们指定了 SUBSCRIBE 请求的每个订阅被授予的最大 QoS 等级。
在这里插入图片描述
encodeSubAckMessage

    private static ByteBuf encodeSubAckMessage(
            ByteBufAllocator byteBufAllocator,
            MqttSubAckMessage message) {
        // 1. 可变报头包含等待确认的 SUBSCRIBE 报文的报文标识符,所以固定长度为2
        int variableHeaderBufferSize = 2;
        // 2. SUBSCRIBE报文中的每个过滤器,subAck消息中都要给出对应主题的所能赋予的最大Qos等级
        int payloadBufferSize = message.payload().grantedQoSLevels().size();
        int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
        buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
        writeVariableLengthInt(buf, variablePartSize);
        buf.writeShort(message.variableHeader().messageId());
        for (int qos : message.payload().grantedQoSLevels()) {
            buf.writeByte(qos);
        }

        return buf;
    }

1.8 encodeMessageWithOnlySingleByteFixedHeaderAndMessageId

对于UNSUBACK、PUBACK、PUBREC、PUBREL、PUBCOMP消息,都是一些确认消息。这类消息中的可变报头需要携带对应消息的packetId,无载荷。

    private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
            ByteBufAllocator byteBufAllocator,
            MqttMessage message) {
        MqttFixedHeader mqttFixedHeader = message.fixedHeader();
        MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
        int msgId = variableHeader.messageId();
        
        // 对于此方法,适用于报文结构为: 可变头只有个2字节packetId,并且无有效载荷的
        int variableHeaderBufferSize = 2; // variable part only has a message id
        int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
        ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
        buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
        writeVariableLengthInt(buf, variableHeaderBufferSize);
        buf.writeShort(msgId);

        return buf;
    }

1.9 encodeMessageWithOnlySingleByteFixedHeader

对于PINGREQ、PINGRESP、DISCONNECT消息,既无可变报头,也无有效载荷。

2. MqttDecoder–解码器

2.1 构造方法

对于解码器,netty提供了对外公共的构造方法,无参构造和有参构造。

    private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;

    private final int maxBytesInMessage;

    public MqttDecoder() {
      this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    public MqttDecoder(int maxBytesInMessage) {
        super(DecoderState.READ_FIXED_HEADER);
        this.maxBytesInMessage = maxBytesInMessage;
    }

采用无参构造,默认消息的最大字节为8092,有参构造则可以指定最大字节数。同时,将initialState设置为固定报头,作用是后面解码时从固定头开始,这里作者注释也有说明。

    /**
     * Creates a new instance with the specified initial state.
     */
    protected ReplayingDecoder(S initialState) {
        state = initialState;
    }

    /**
     * States of the decoder.
     * We start at READ_FIXED_HEADER, followed by
     * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
     */
    enum DecoderState {
        READ_FIXED_HEADER,
        READ_VARIABLE_HEADER,
        READ_PAYLOAD,
        BAD_MESSAGE,
    }

2.2 READ_FIXED_HEADER - 固定报头解码

解码器,我们重点看一下decode方法。这里通过state()方法判断是解析固定头、可变头、载荷、错误消息。new对象的时候,初始状态为固定头,从固定头开始,注意固定头和可变头case中没有break,所以解码顺序固定头 -> 可变头 -> 载荷。在这三个过程中有异常,则 -> BAD_MESSAGE
decodeFixedHeader

    private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {
        // 1. 取无符号第一字节,固定头第一字节,7-4位为消息类型,所以右移4位
        short b1 = buffer.readUnsignedByte();
        MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
        // 1.1 重发标志第3位,值为1,标志重发
        boolean dupFlag = (b1 & 0x08) == 0x08;
        // 1.2 Qos占1、2位,采用& 0x06获取到这两位值,然后右移一位即Qos的值
        int qosLevel = (b1 & 0x06) >> 1;
        // 1.3 会话保留标志站0位
        boolean retain = (b1 & 0x01) != 0;

        int remainingLength = 0;
        int multiplier = 1;
        short digit;
        int loops = 0;
        do {
            digit = buffer.readUnsignedByte();
            // 2.由于每个字节的最高位为进制位,所以&127获取除进制位之外的7位
            remainingLength += (digit & 127) * multiplier;
            multiplier *= 128;
            loops++;
        } while ((digit & 128) != 0 && loops < 4);
        // 协议规定,剩余长度字段最大为4个字节
        if (loops == 4 && (digit & 128) != 0) {
            throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
        }
        MqttFixedHeader decodedFixedHeader =
                new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
        return validateFixedHeader(resetUnusedFields(decodedFixedHeader));
    }

validateFixedHeader
协议中规定,PUBREL、SUBSCRIBE、UNSUBSCRIBE中固定头中的Qos等级必须是1。
在这里插入图片描述

    static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) {
        switch (mqttFixedHeader.messageType()) {
            case PUBREL:
            case SUBSCRIBE:
            case UNSUBSCRIBE:
                if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {
                    throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");
                }
            default:
                return mqttFixedHeader;
        }
    }

resetUnusedFields
对于协议中规定,固定报头中标志位保留的,一律置为0。

    static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
        switch (mqttFixedHeader.messageType()) {
            case CONNECT:
            case CONNACK:
            case PUBACK:
            case PUBREC:
            case PUBCOMP:
            case SUBACK:
            case UNSUBACK:
            case PINGREQ:
            case PINGRESP:
            case DISCONNECT:
                if (mqttFixedHeader.isDup() ||
                        mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE ||
                        mqttFixedHeader.isRetain()) {
                    return new MqttFixedHeader(
                            mqttFixedHeader.messageType(),
                            false,
                            MqttQoS.AT_MOST_ONCE,
                            false,
                            mqttFixedHeader.remainingLength());
                }
                return mqttFixedHeader;
            case PUBREL:
            case SUBSCRIBE:
            case UNSUBSCRIBE:
                if (mqttFixedHeader.isRetain()) {
                    return new MqttFixedHeader(
                            mqttFixedHeader.messageType(),
                            mqttFixedHeader.isDup(),
                            mqttFixedHeader.qosLevel(),
                            false,
                            mqttFixedHeader.remainingLength());
                }
                return mqttFixedHeader;
            default:
                return mqttFixedHeader;
        }
    }

2.3 READ_VARIABLE_HEADER - 可变报头解码

解码和编码是相对应的,这里不针对每种都进行说明。讲一下连接消息,其他的都比较简单。
decodeConnectionVariableHeader

    private static MqttDecoder.Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {
        // 1. decodeString方法的作用,通过前两个长度字节(MLB,SLB)指定的字节长度来获取数据并装成string
        // Connect可变头最开始的两个长度字节用来表示协议名称的长度,所以这里通过此方法获取的就是协议名MQTT
        final MqttDecoder.Result<String> protoString = decodeString(buffer);
        // 2.注意这里的numberOfBytesConsumed字段,用于表示已经读取的字节个数,等解码完有效载荷后,
        // 会比较固定头里的剩余长度字段和此字段是否完全相等
        int numberOfBytesConsumed = protoString.numberOfBytesConsumed;

        final byte protocolLevel = buffer.readByte();
        numberOfBytesConsumed += 1;

        final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);

        // 3.取出连接标志字节,采用&进行解码
        final int b1 = buffer.readUnsignedByte();
        numberOfBytesConsumed += 1;

        final MqttDecoder.Result<Integer> keepAlive = decodeMsbLsb(buffer);
        numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;
        final boolean hasUserName = (b1 & 0x80) == 0x80;
        final boolean hasPassword = (b1 & 0x40) == 0x40;
        final boolean willRetain = (b1 & 0x20) == 0x20;
        final int willQos = (b1 & 0x18) >> 3;
        final boolean willFlag = (b1 & 0x04) == 0x04;
        final boolean cleanSession = (b1 & 0x02) == 0x02;
        if (mqttVersion == MqttVersion.MQTT_3_1_1) {
            final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
            if (!zeroReservedFlag) {
                throw new DecoderException("non-zero reserved flag");
            }
        }

        final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
                mqttVersion.protocolName(),
                mqttVersion.protocolLevel(),
                hasUserName,
                hasPassword,
                willRetain,
                willQos,
                willFlag,
                cleanSession,
                keepAlive.value);
        return new MqttDecoder.Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);
    }

decodeString
在这里插入图片描述
在这里插入图片描述

2.4 READ_PAYLOAD- 有效载荷解码

有效载荷也不针对每种都进行说明。讲一下连接消息的有效载荷解码。
decodeConnectionPayload

    private static MqttDecoder.Result<MqttConnectPayload> decodeConnectionPayload(
            ByteBuf buffer,
            MqttConnectVariableHeader mqttConnectVariableHeader) {
        // 1.解码clientId,对于connect消息,载荷里的clientId是必须有的。
        final MqttDecoder.Result<String> decodedClientId = decodeString(buffer);
        final String decodedClientIdValue = decodedClientId.value;
        final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
                (byte) mqttConnectVariableHeader.version());
        // 1.1 这里前面说过,如果是v3.1.1版本的mqtt,是允许clientId是空字符串或者大于23字节的,但是v3.1的不支持
        if (!isValidClientId(mqttVersion, decodedClientIdValue)) {
            throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
        }
        int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;

        // 2.对于遗嘱topic和遗嘱消息、用户名、密码这几部分需要根据可变头中的标志位是否为1来判断是否存在
        MqttDecoder.Result<String> decodedWillTopic = null;
        MqttDecoder.Result<byte[]> decodedWillMessage = null;
        if (mqttConnectVariableHeader.isWillFlag()) {
            decodedWillTopic = decodeString(buffer, 0, 32767);
            numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
            decodedWillMessage = decodeByteArray(buffer);
            numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
        }
        MqttDecoder.Result<String> decodedUserName = null;
        MqttDecoder.Result<byte[]> decodedPassword = null;
        if (mqttConnectVariableHeader.hasUserName()) {
            decodedUserName = decodeString(buffer);
            numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
        }
        if (mqttConnectVariableHeader.hasPassword()) {
            decodedPassword = decodeByteArray(buffer);
            numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;
        }

        final MqttConnectPayload mqttConnectPayload =
                new MqttConnectPayload(
                        decodedClientId.value,
                        decodedWillTopic != null ? decodedWillTopic.value : null,
                        decodedWillMessage != null ? decodedWillMessage.value : null,
                        decodedUserName != null ? decodedUserName.value : null,
                        decodedPassword != null ? decodedPassword.value : null);
        return new MqttDecoder.Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1322263.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

姿态识别、目标检测和跟踪的综合应用

引言&#xff1a; 近年来&#xff0c;随着人工智能技术的不断发展&#xff0c;姿态识别、目标检测和跟踪成为了计算机视觉领域的热门研究方向。这三个技术的综合应用为各个行业带来了巨大的变革和机遇。本文将分别介绍姿态识别、目标检测和跟踪的基本概念和算法&#xff0c;并探…

打开VScode时不打开上次使用的文件夹

是不是很烦VScode 打开新的文件夹&#xff0c;每次都打开上次使用过的文件夹&#xff0c;只需在设置里面改一个设置就可以避免了。 Ctrl &#xff0c;打开设置&#xff0c;搜索 window.restoreWindows 通过这种设置就可以让VScode 每次打开新的文件夹而不打开上次的文件夹。

保护您的Android应用程序:Android应用程序安全一览

保护您的Android应用程序&#xff1a;Android应用程序安全一览 我们都知道Android是为所有人设计的——开放、面向开发者、面向用户&#xff0c;这种开放性为今天和明天的移动技术提供了很多便利。然而&#xff0c;开放性也带来了需要妥善处理的安全风险。 安全是我们所有人都…

Linux的SSH(远程登录)

SSH定义&#xff1a; SSH&#xff08;Secure Shell 的缩写&#xff09;是一种网络协议&#xff0c;用于加密两台计算机之间的通信&#xff0c;并且支持各种身份验证机制。 实务中&#xff0c;它主要用于保证远程登录和远程通信的安全&#xff0c;任何网络服务都可以用这个协议…

什么是企业年报?

企业年报是指企业按照规定向相关部门报送的一种年度财务报告&#xff0c;它反映了企业在一年内的经营状况、财务状况、经营成果和现金流量等信息。对于投资者、债权人、政府部门等利益相关者来说&#xff0c;企业年报是非常重要的信息来源。下面就展开讲讲。 一、什么是年报&am…

什么牌子猫粮比较好?质量口碑较好的主食冻干猫粮分享

由于猫咪是肉食动物&#xff0c;对蛋白质的需求很高&#xff0c;如果摄入的蛋白质不足&#xff0c;就会影响猫咪的成长。而冻干猫粮本身因为制作工艺的原因&#xff0c;能保留原有的营养成分和营养元素&#xff0c;所以冻干猫粮蛋白含量比较高&#xff0c;营养又高&#xff0c;…

智能优化算法应用:基于适应度相关算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于适应度相关算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于适应度相关算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.适应度相关算法4.实验参数设定5.算法…

Pycharm中如何使用Markdown?只需装这个插件!

一、前言 由于Markdown的轻量化、易读易写特性&#xff0c;并且对于图片&#xff0c;图表、数学式都有支持&#xff0c;许多网站都广泛使用Markdown来撰写帮助文档或是用于论坛上发表消息。 如GitHub、Reddit、Diaspora、Stack Exchange、OpenStreetMap 、SourceForge、简书等…

SpringMVC---详细介绍+使用

文章目录 什么是SpringMVC&#xff1f;使用SpringMVCSpringMVC创建和连接创建连接RequestMapping的基础使用 获取参数返回数据返回静态页面返回非页面的普通数据&#xff08;text/html&#xff09;返回JSON对象请求转发或者请求重定向 什么是SpringMVC&#xff1f; SpringMVC它…

【MTK平台】BLE链接参数和功耗的关系

一 描述 BLE即低功耗蓝牙,是专为智能设备设计的一种低功耗、低延迟,小数据传输的蓝牙技术。目前广泛应用到手机,平板,及智能穿戴式设备中。 二 解决方案 BLE的数据传输都是发生在Connection Event之间,客户可以根据具体需求来调节链接参数(Connection Parameters)…

OpenAI 偷偷在训练 GPT-4.5!?

最近看到有人已经套路出 ChatGPT 当前的版本&#xff0c;回答居然是 gpt-4.5-turbo&#xff1a; 实际试验下&#xff0c;用 starflow.tech&#xff0c;切换到小星 4 全能版&#xff08;同等官网最新 GPT-4&#xff09;&#xff0c;复制下面这段话问它&#xff1a; What is the…

STM32F407-14.3.12-01使用断路功能

使用断路功能 使用断路功能时&#xff0c;根据其它控制位&#xff08;TIMx_BDTR 寄存器中的 MOE⑨、OSSI⑪ 和 OSSR⑩ 位以及 TIMx_CR2 寄存器中的 OISx⑰ 和 OISxN⑱ 位&#xff09;修改输出使能信号和无效电平。任何情况下&#xff0c;OCx③ 和 OCxN④ 输出都不能同时置为有效…

C#经典面试题:冒泡算法的使用

Hi i,m JinXiang ⭐ 前言 ⭐ 本篇文章主要介绍C#经典面试题&#xff1a;冒泡算法的使用以及部分理论知识 &#x1f349;欢迎点赞 &#x1f44d; 收藏 ⭐留言评论 &#x1f4dd;私信必回哟&#x1f601; &#x1f349;博主收将持续更新学习记录获&#xff0c;友友们有任何问题可…

PIC单片机项目(6)——基于PIC16F877A的心率血氧检测装置

1.功能设计 使用PIC16F877A单片机&#xff0c;检测心率和血氧浓度&#xff0c;通过了protues仿真。仿真中&#xff0c;使用NE555芯片&#xff0c;构成一个振荡装置&#xff0c;振荡频率可调&#xff0c;用于模拟人体心率的变化。血氧传感器&#xff0c;则使用一个滑动变阻来模拟…

Log打印自动打印编译时间版本号打印方法

Log打印自动打印编译时间版本号打印方法 是否需要申请加入数字音频系统研究开发交流答疑群(课题组)&#xff1f;可加我微信hezkz17, 本群提供音频技术答疑服务&#xff0c;群赠送蓝牙音频&#xff0c;DSP音频项目核心开发资料,

scipy库的label函数|标记图像连通域

邻域 label函数标记连通区域 默认以 4 邻域划分区域 from scipy.ndimage import label import numpy as npa np.array([[0,0,1,1,0,0],[0,0,0,1,0,0],[1,1,0,0,1,0],[0,0,0,1,0,0]]) labels, N label(a) print(labels)[[0 0 1 1 0 0][0 0 0 1 0 0][2 2 0 0 3 0][0 0 0 4 0 …

Postman解决批量执行接口中包含文件上传接口断言错误

文章目录 前言一、问题描述二、解决方法一1.点击设置图标 → 选择 "Settings"2.打开允许读取工作目录外的文件开关3.重新批量执行接口&#xff08;问题完美解决&#xff09; 三、解决方法二1.点击设置图标 → 选择 "Settings"2.查看文件存储默认位置3.将要…

23级新生C语言周赛(6)(郑州轻工业大学)

题目链接:ZZULIOJ 3110: 数(shu)数(shu)问题 分析: 看到这个题第一步想的是 先把每个平方数给求出来 然后枚举 但是时间复杂度大于1e8 交了一下TLE 但后来打表发现,好数太多了要是枚举的话 注定TLE 能不能间接的去做呢? 把不是的减去,那不就是好数了吗? 这个时候又是打表,会…

Python轻松匹配文件:详解文件匹配和搜索技巧

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 文件匹配和搜索是日常编程中不可避免的任务&#xff0c;Python 提供了多种强大的工具来轻松应对这些需求。本文将深入探讨 Python 中文件匹配的不同方法&#xff0c;并通过丰富的示例代码演示如何灵活应用这些技…

Apache Doris 在奇富科技的统一 OLAP 场景探索实践

导读&#xff1a;随着消费信贷规模快速增长&#xff0c;个人信贷市场呈现场景化、体验感强的特征&#xff0c;精准营销、精细化风险管理以及用户使用体验的优化愈发重要。作为中国卓越的由人工智能驱动的信贷科技服务平台&#xff0c;奇富科技选择将 Apache Doris 作为整体 OLA…