IOT云平台 simple(4)springboot netty实现简单的mqtt broker

news2024/11/17 13:33:10

常见的开源mqttBroker很多,如:
Mosquitto、emqx;
这里简单的介绍了mqtt,然后利用springboot netty实现了简单的mqtt Broker。

mqtt Broker:springboot netty实现;
mqtt client:MQTT.fx工具软件;

1 开发

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议。
在这里插入图片描述
mqtt是运行在TCP/IP之上的应用层,所以mqtt Broker基本就是个TCP Server。
创建主要的类:
1) MqttBrokerChannelHandler:
server channel处理的类;
2 )MqttBrokerChannelInitializer
server channel初始化的类
3)MqttBroker
server类。
4)MqttClientStartListener:监听到springboot启动后,启动MqttBroker。

mqtt数据协议:
数据结构包括:
1)固定报头(Fixed header);
2)可变报头(Variable header);
3)有效载荷(Payload)。
在这里插入图片描述

其中,MqttBrokerChannelHandler类中实现了mqtt数据的解析处理。

	    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        MqttMessage mqttMessage = (MqttMessage) msg;
        log.info("--------------------------begin---------------------------*");
        log.info("来自终端:" + channelHandlerContext.channel().remoteAddress());
        log.info("接收消息:" + mqttMessage.toString());
        try {
            MqttMessageType type = mqttMessage.fixedHeader().messageType();
            MessageStrategy messageStrategy =  messageStrategyManager.getMessageStrategy(type);
            if(messageStrategy!=null){
                messageStrategy.sendResponseMessage(channelHandlerContext.channel(),mqttMessage);
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
        log.info("--------------------------end---------------------------*");
    }

在这里我们定义一个策略接口,实现对mqtt 消息的解析返回。

public interface MessageStrategy {
    void sendResponseMessage(Channel channel, MqttMessage mqttMessage);
}

然后我们就可以实现不同类型消息的策略处理,如对Connect消息的处理:

@Slf4j
public class ConnectAckMessageStrategy implements MessageStrategy{

    @Override
    public void sendResponseMessage(Channel channel, MqttMessage mqttMessage) {
        MqttConnectMessage mqttConnectMessage = (MqttConnectMessage)mqttMessage;

        /*---------------------------解析接收的消息----------------------------*/
        MqttFixedHeader mqttFixedHeader = mqttConnectMessage.fixedHeader();
        MqttConnectVariableHeader mqttConnectVariableHeader = mqttConnectMessage.variableHeader();

        /*---------------------------构建返回的消息---------------------------*/
        //	构建返回报文, 固定报头
        MqttConnAckVariableHeader mqttConnAckVariableHeader =new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeader.isCleanSession());
        MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeader.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeader.isRetain(), 0x02);

        //	构建CONNACK消息体
        MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeader);
        log.info("返回消息:"+connAck.toString());
        channel.writeAndFlush(connAck);
    }
}

最后我们定义了MessageStrategyManager类实现对不同类型消息的策略处理:

@Slf4j
@Component
public class MessageStrategyManager {
    public Map<MqttMessageType, MessageStrategy> messageStrategyMap = new HashMap<>();

    //根据消息类型获取对应的策略类
    public  MessageStrategy getMessageStrategy(MqttMessageType messageType){
        switch (messageType){
            case CONNECT:
                return new ConnectAckMessageStrategy();
            case PUBLISH:
                return new PublishAckMessageStrategy();
            case PUBREL:
                return new PublishCompleteMessageStrategy();
            case SUBSCRIBE:
                return new SubscribeAckMessageStrategy();
            case UNSUBSCRIBE:
                return new UnSubscribeAckMessageStrategy();
            case PINGREQ:
                return new PingMessageStrategy();
            default:
                return null;
        }
    }

    //根据消息类型获取返回消息的类型
    private  static MqttMessageType getResMqttMessageType(MqttMessageType messageType){
        switch (messageType){
            case CONNECT:
                return MqttMessageType.CONNACK;
            case PUBLISH:
                return MqttMessageType.PUBACK;
            case PUBREL:
                return MqttMessageType.PUBLISH;
            case SUBSCRIBE:
                return MqttMessageType.SUBACK;
            case UNSUBSCRIBE:
                return MqttMessageType.UNSUBACK;
            case PINGREQ:
                return MqttMessageType.PINGRESP;
            default:
                return null;
        }
    }
}

2 验证

2.1 MQTT.fx连接broker

MQTT.fx发送消息的类型:CONNECT
broker返回消息的类型:CONNACK
在这里插入图片描述

2.2 MQTT.fx向broker发送心跳

MQTT.fx发送消息的类型:PINGREQ
broker返回消息的类型:PINGRESP
在这里插入图片描述

2.3 MQTT.fx向broker发布topic

首先MQTT.fx发布topic为:

weather/beijing

发布内容为:

The low temperature is 20C and the high temperature is 25 C

在这里插入图片描述
从log看到broker已经收到topic:
MQTT.fx发送消息的类型:PUBLISH
这里qosLevel=AT_MOST_ONCE。
在这里插入图片描述

2.4 MQTT.fx断开连接broker

MQTT.fx发送消息的类型:DISCONNECT
在这里插入图片描述

代码详见:
https://gitee.com/linghufeixia/iot-simple
code3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/11813.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java Tomcat内存马——filter内存马

目录 前言&#xff1a; &#xff08;一) 内存马简介 0X01 原理 0X02 内存马类型 2.1 servlet-api类 2.2 spring类 2.3 Java Instrumentation类 &#xff08;二&#xff09; filter 内存马 &#xff08;三&#xff09;Tomcat Filter 流程分析 0x01 项目搭建 0x02 在访…

【Spring】——5、@Lazy懒加载

&#x1f4eb;作者简介&#xff1a;zhz小白 公众号&#xff1a;小白的Java进阶之路 专业技能&#xff1a; 1、Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理 2、熟悉Java基础&#xff0c;并精通多线程的开发&#xff0c;熟悉JVM原理&#xff0c;具备⼀定的线…

【BOOST C++ 12 函数式编程】(5) Boost.Lambda

一、说明Boost.Lambda 在 C11 之前&#xff0c;您需要使用像 Boost.Lambda 这样的库来利用 lambda 函数。从 C11 开始&#xff0c;这个库可以被视为已弃用&#xff0c;因为 lambda 函数现在是编程语言的一部分。如果您在不支持 C11 的开发环境中工作&#xff0c;您应该在转向 B…

大空间享大智慧 奇瑞新能源奇瑞大蚂蚁

在亲子消费市场上家庭消费已经成为了主力军。亲子消费的重心已经从以饮食、服装为主向教育、游乐等方向多元化发展。而在出行方面汽车的品质与驾乘感受也是如今消费者选择的主要需求。所以实惠、安全、环保的新能源大空间SUV成为了越来越多二胎、三胎家庭的最终选择。奇瑞新能源…

线程池使用

转载&#xff1a;线程池详解&#xff08;通俗易懂超级好&#xff09;_拉格朗日(Lagrange)的博客-CSDN博客_线程池 目录 基本概念 什么是线程池 线程池优点 线程池源码 ThreadPoolExecutor 参数解释 具体使用 线程池的工作原理 线程池的参数 任务队列&#xff08;w…

元数据管理-解决方案调研一:元数据概述

一、元数据概述 1.1、定义 元数据定义&#xff1a;描述数据的数据&#xff0c;对数据及信息资源的描述性信息。小编认为元数据不仅仅是关于数据的数据&#xff0c;它还是一种上下文&#xff0c;赋予信息更加丰富的身份。 以图片为例&#xff0c;其图片本身是一种数据&#xf…

操作系统的内存究竟是怎么一回事?

摘要&#xff1a;操作系统的内存究竟是怎么一回事&#xff1f;带你完整复习一遍《操作系统》一书中有关内存的所有知识点本文分享自华为云社区《操作系统的内存究竟是怎么一回事&#xff1f;带你完整复习一遍《操作系统》一书中有关内存的所有知识点》&#xff0c;作者&#xf…

【图神经网络】使用DGL框架实现简单图分类任务

使用DGL框架实现简单图分类任务简单图分类任务实现过程打包一个图的小批量定义图分类器图卷积读出和分类准备和训练核心代码参考资料图分类&#xff08;预测图的标签&#xff09;是图结构数据里一类重要的问题。它的应用广泛&#xff0c;可见于生物信息学、化学信息学、社交网络…

aws xray ec2环境搭建和基础用法

参考资料 https://docs.amazonaws.cn/en_us/xray/latest/devguide/xray-daemon.html https://docs.aws.amazon.com/xray-sdk-for-nodejs/latest/reference/ https://github.com/aws/aws-xray-sdk-node https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/ba…

联想集团:长期前景稳定,业务转型正在提高盈利能力

来源;猛兽财经 作者&#xff1a;猛兽财经 由疫情驱动的个人电脑需求正在减弱 在经历了两年的个人电脑销售强劲增长之后&#xff0c;随着全球对疫情封锁限制的放松&#xff0c;由疫情引发的远程工作和在线学习趋势带来的全球个人电脑需求正在减弱。根据IDC的数据&#xff0c;20…

文件之间的拷贝(拷贝图片实例)java.io.FileNotFoundException: G:\dad (拒绝访问。)通过绝对路径获取各种文件名

1.报错解决 :java.io.FileNotFoundException: G:\dad (拒绝访问。) 参考文献:(364条消息) java.io.FileNotFoundException:(拒接访问&#xff09;_corelone2的博客-CSDN博客_java.io.filenotfoundexception 2.code 代码参考地址:(364条消息) java中文件拷贝的几种方式_babar…

深入理解New操作符

前言 当我们对函数进行实例化时&#xff0c;需要用new操作符来实现。那么&#xff0c;对于它的底层实现原理你是否清楚呢&#xff1f;本文就跟大家分享下它的原理并用一个函数来模拟实现它&#xff0c;欢迎各位感兴趣的开发者阅读本文。 原理分析 我们通过一个具体的例子来看…

MySQL——数据库基础

文章目录什么叫做数据库&#xff1f;主流数据库基本使用服务器、数据库、表之间的关系MySQL逻辑结构MySQL架构MySQL分类存储引擎什么叫做数据库&#xff1f; 软件角度&#xff1a; 为用户或者用户程序提供更加方便的数据管理的软件&#xff0c;通过SQL语句进行&#xff01; 数…

【PostgreSQL-14版本snapshot的几点优化】

最近在分析PostgreSQL-14版本性能提升的时候&#xff0c;关注到了Snapshots的这一部分。发现在PostgreSQL-14版本&#xff0c;连续合入了好几个和Snapshots相关的patch。 并且&#xff0c;Andres Freund也通过这些改进显著减少了已确定的快照可扩展性瓶颈&#xff0c;从而改进了…

【C++】C/C++内存管理

众所周知&#xff0c;C/C没有内存&#xff08;垃圾&#xff09;回收机制&#xff0c;所以写C/C程序常常会面临内存泄漏等问题。这一节我们一起来学习C/C的内存管理机制&#xff0c;深入了解这套机制有利于我们之后写出更好的C/C程序。 在那些看不到太阳的日子里&#xff0c;别忘…

Spring(九)- Spring自定义命名空间整合第三方框架原理解析

文章目录一、Spring通过命名空间整合第三方框架1. Dubbo 命名空间2. Context 命名空间二、Spring自定义命名空间原理解析三、手写自定义命名空间标签与Spring整合一、Spring通过命名空间整合第三方框架 1. Dubbo 命名空间 Spring 整合其他组件时就不像MyBatis这么简单了&#…

电影影院购票管理系统

1、项目介绍 电影影院购票管理系统拥有两种角色&#xff1a;管理员和用户 管理员&#xff1a;用户管理、影片管理、影厅管理、订单管理、影评管理、排片管理等 用户&#xff1a;登录注册、个人中心、查看电影票、电影选座、下单支付、发布影评、查看票房统计等 2、项目技术 …

14、Horizontal Pod Autoscal

一、为何进行缩扩容&#xff1f; 在实际生产中&#xff0c;经常会遇到某个服务需要扩容的场景&#xff0c;可能会遇到由于资源紧张或者工作负载降低而需要减少服务实例数量的场景。可以利用Deployment/RC的Scale机制来完成这些工作。二、缩扩容模式 Kubernetes 对 Pod 扩容与缩…

mysql-Innodb解析

一.计算机不同介质操作速度 相对于CPU和内存操作&#xff0c; 我们可以看到磁盘的操作延时明显要大得多&#xff0c; 一次磁盘搜索的延时需要10ms。 假入我们某一个业务操作进行了大量磁盘读写&#xff0c; 那可以预料到这个服务的性能肯定是非常差的&#xff0c; 那么到底是什…

3.2文法与语言

1、文法生成语言 推导 定义&#xff1a;当αAβ直接推导出αγβ&#xff0c;即αAβ⇒αγβ&#xff0c;仅当A→γ是一个产生式&#xff0c;且α,β∈(VT∪VN)*。 注&#xff1a;按照我的理解是两个字符串的推导。如果α1⇒α2⇒…⇒αn,则我们称这个序列是从α1到αn的一个…