IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)

news2025/1/22 19:42:28

本系列教程包括:
IOT云平台 simple(0)IOT云平台简介
IOT云平台 simple(1)netty入门
IOT云平台 simple(2)springboot入门
IOT云平台 simple(3)springboot netty实现TCP Server
IOT云平台 simple(4)springboot netty实现简单的mqtt broker
IOT云平台 simple(5)springboot netty实现modbus TCP Master
IOT云平台 simple(6)springboot netty实现IOT云平台基本的架构(mqtt、Rabbitmq)

本章首先简单介绍了IOT云平台最基本的架构,然后基于springboot netty实现IOT Server;最后进行了测试验证。

测试环境:

  1. mqtt终端,这里用MQTT.fx 工具软件模拟;
  2. IOT server:基于springboot netty进行开发;
  3. Rabbitmq broker;本地安装Windows 64位环境;
  4. Rabbitmq consumer,订阅mq message的server,这里用MQTT Assistant工具软件模拟;

1 IOT云平台最基本的架构

本章涉及的IOT云平台最基本架构图:
在这里插入图片描述
说明
1)为了简单,这里只包括mqtt上行链路,即mqtt终端上传数据;
2)这里通过Rabbitmq 进行消息的分发,也可以用其他mq中间件,如kafka。
3)Mqtt terminal:mqtt终端,指的是具体的设备传感器或者mqtt协议网关。

具体流程
第1步,mqtt终端
实现mqtt协议或者其他协议(modbus、wifi、蓝牙)转换为mqtt协议。

第2步,mqtt终端->IOT server
mqtt终端publish message到IOT server;

第3步,IOT server->Rabbitmq broker
IOT Server中mqtt broker的模块收到mqtt message,进行解析,然后通过Rabbitmq producer模块,publish message到Rabbitmq broker;

第4步,Rabbitmq broker->不同server
Rabbitmq broker收到消息存入指定的queue,然后分发到订阅消息的不同server;

第5步,不同server
不同server监听收到消息进行相应的处理;如:存入到时序数据库、进行大数据流的计算、具体业务的处理。

2 集成开发

第1步:POM文件引入netty、Rabbitmq的依赖:

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.63.Final</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>3.0.1</version>
        </dependency>

2.1实现mqtt broker

创建主要的类:
1)TCPServer
server类,实现mqtt broker。
2 )TCPServerStartListener
监听到springboot启动后,启动server。
3)TCPServerChannelInitializer
server channel初始化的类

包括两个server channel处理的类:
1) MqttMessageChannelHandler:
server channel处理的类;实现mqtt消息的解析;

@Component
@Slf4j
@ChannelHandler.Sharable
public class MqttMessageChannelHandler extends ChannelInboundHandlerAdapter  {

    @Autowired
    MessageStrategyManager messageStrategyManager;

    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        MqttMessage mqttMessage = (MqttMessage) msg;
        log.info("--------------------------channelRead begin---------------------------*");
        log.info("from client:" + channelHandlerContext.channel().remoteAddress());
        log.info("receive message:" + mqttMessage.toString());
        try {
            MqttMessageType type = mqttMessage.fixedHeader().messageType();
            MessageStrategy messageStrategy =  messageStrategyManager.getMessageStrategy(type);
            if(messageStrategy!=null){
                messageStrategy.sendResponseMessage(channelHandlerContext,mqttMessage);
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
        log.info("--------------------------channelRead end---------------------------*");

    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // 当出现异常就关闭连接
        cause.printStackTrace();
        ctx.close();
    }
}

2) MqMessageChannelHandler:
server channel处理的类;实现mq消息发布到Rabbitmq broker;

@Component
@Slf4j
public class MqMessageChannelHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    ProducerService producerService;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof MqMessage)) {
            return;
        }
        MqMessage mqMessage = (MqMessage) msg;
        log.info("转发到Rabbitmq Server:" + mqMessage.data);
        producerService.sendData(mqMessage.data);
    }

}

2.2实现Rabbitmq producer

1 定义配置类ProducerConfig:

exchange(交换机):topic_exchange
queue(队列):topic_queue
bindKey(绑定key):project1.station1.*

@Slf4j
@Configuration
public class ProducerConfig {
    String bindKey = "project1.station1.*";

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //开启Mandatory,触发回调函数
        rabbitTemplate.setMandatory(true);
        //ack
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("-----------confirm begin---------------");
                log.info("data:" + correlationData);
                if(ack){
                    log.info("Ack:true");
                }else{
                    log.info("Ack:false");
                }
                log.info("cause:" + cause);
                log.info("-----------confirm end---------------");
            }
        });
        //return
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.info("-----------return begin---------------");
                log.info("message:"+returnedMessage.getMessage());
                log.info("reply code:"+returnedMessage.getReplyCode());
                log.info("reply text:"+returnedMessage.getReplyText());
                log.info("exchange:"+returnedMessage.getExchange());
                log.info("routeKey:"+returnedMessage.getRoutingKey());
                log.info("-----------return end---------------");
            }
        });

        return rabbitTemplate;
    }

    @Bean("topic_exchange")
    public TopicExchange topicExchange() {
        return ExchangeBuilder.topicExchange("topic_exchange").durable(true).build();
    }

    @Bean("topic_queue")
    public Queue topicQueue(){
        return QueueBuilder.durable("topic_queue").build();
    }

    @Bean
    public Binding topicBind(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(bindKey);
    }
}

2 定义ProducerService类,实现发送消息的功能:
这里发送:
exchange(交换机):topic_exchange
routeKey (路由key):project1.station1.device1

@Slf4j
@Component
public class ProducerService {
    String exchange = "topic_exchange";
    String routeKey = "project1.station1.device1";
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void sendData(String data) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("------------topic producer begin------------");
        rabbitTemplate.convertAndSend(exchange, routeKey, data);
        log.info("exchange:"+exchange);
        log.info("routeKey:"+routeKey);
        log.info("send data:"+data);
        log.info("------------topic producer end------------");
    }
}

3 测试验证

第1步:mqtt终端发送消息:temperature is 26, 2023.1.14 17:00。
在这里插入图片描述
第2步:IOT Server接收到mqtt终端发送的消息;然后转发到Rabbitmq Server;
在这里插入图片描述
第3步:Rabbitmq Server的topic_queue中有1条消息;
在这里插入图片描述
第4步:MQTT Assistant中从topic_queue中消费消息:
在这里插入图片描述
可见,对于物联网的数据,实现了从端到平台(解析、分发、存储)的整个流程。

代码详见:
https://gitee.com/linghufeixia/iot-simple
code5

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/141030.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

告别Whitelabel Error Page!

相信在JavaWeb开发中不少小伙伴会遇到这个页面吧&#xff0c;特别是初学者基础不扎实不牢固然后网上说的一大堆莫名其妙的解法&#xff0c;千万不要盲目跟着改&#xff0c;建议多读几篇博客&#xff0c;再根据自己的知识分析一下开发流程。首先status404&#xff0c;肯定是我访…

Unity联网多人游戏技术方案调研

关于联网方案 Listen Server (Host) 和 Relay转发服务器游戏包同时包含客户端和服务端逻辑&#xff0c;联网时一个客户端开主&#xff0c;称为Host&#xff0c;其他客户端连入。局域网和互联网都支持。互联网需要有一个匹配服务器帮助找到不同人建立的主机。如果不使用Relay服…

校招前端二面常考react面试题(边面边更)

高阶组件 高阶函数&#xff1a;如果一个函数接受一个或多个函数作为参数或者返回一个函数就可称之为高阶函数。 高阶组件&#xff1a;如果一个函数 接受一个或多个组件作为参数并且返回一个组件 就可称之为 高阶组件。 react 中的高阶组件 React 中的高阶组件主要有两种形式…

verilog学习笔记- 6)verilog基础知识

目录 Verilog 的逻辑值: Verilog 的标识符&#xff08;类似C中的变量名&#xff09;: 1) 定义: 2) 规范建议: Verilog 的数字进制格式: Verilog 的数据类型: 1) 寄存器类型&#xff1a; 2) 线网类型&#xff1a; 3) 参数类型&#xff1a; Verilog 的运算符&#xff1a…

Logistic Regression 逻辑斯蒂回归

文章目录5、Logistic Regression 逻辑斯蒂回归5.1 回归任务5.1.1 MNIST Dataset5.1.2 CIFAR-10 Dataset5.2 Regression vs Classification 回归 vs 分类5.3 Sigmoid functions5.3.1 Logistic Function [0, 1]5.3.2 Other Functions [-1, 1]5.4 Model 模型5.5.1 torch.sigmoid()…

Mybatis基本使用

Mybatis1、Mybatis简介1.1、什么是MyBatis1.2、持久化1.3、持久层1.4、为什么需要Mybatis2、MyBatis第一个程序2.1、代码演示3、CRUD操作3.1、namespace3.2、select3.3、insert3.4、update3.5、delete3.6、思考题4、配置解析4.1、核心配置文件4.2、environments元素4.3、mapper…

点进详情巩固 react-router-dom v6

使用几段代码,再次巩固一下 v6 的使用 0. 安装 npm i react-router-dom1. 配置路由 import {StrictMode } from "react" import ReactDOM from "react-dom/client" import App from "./App" import {HashRouter

Word处理控件Aspose.Words功能演示:使用 C# 将 Word 转换为 HTML

Aspose.Words 是一种高级Word文档处理API&#xff0c;用于执行各种文档管理和操作任务。API支持生成&#xff0c;修改&#xff0c;转换&#xff0c;呈现和打印文档&#xff0c;而无需在跨平台应用程序中直接使用Microsoft Word。此外&#xff0c; Aspose API支持流行文件格式处…

实现自定义springboot的starter

引言 学过springboot的肯定用过各种starter&#xff0c;通过这些starter我们可以节省很多没有必要的配置&#xff0c;让项目更简洁&#xff0c;配置起来也更简单。那么starter是怎么开发的呢&#xff1f;这里我通过一个简单的案例演示如何创建一个自己的starter。 初始化项目…

Java毕业生就业系统学生就业统计系统

简介 本项目主要是为了统计毕业生就业情况&#xff08;就业方向分为四种&#xff1a;参加工作&#xff0c;考研&#xff0c;自主创业&#xff0c;待就业&#xff09;&#xff0c;教师可登入该系统查看学生就业情况&#xff0c;包括&#xff1a;考研、职业领域、工作城市&#…

黑马C++ 05 核心项目 —— 职工管理系统

文章目录1. 管理系统需求2. 创建管理类2.1 创建文件 workerManager.h 与 workerManager.cpp2.2 头文件实现 workerManager.h —— 只做函数声明不做实现2.3 源文件实现 workerManager.cpp —— 对声明的函数进行实现3. 菜单功能3.1 添加成员函数 —— workerManager.h添加 void…

Tic-Tac-Toe人机对弈程序(python实现)

目录 1. 前言 2. 处理流程 3. 代码 4. 代码说明 4.1 棋盘显示 4.2 初始化 4.3 人类棋手的下一步 4.4 AI棋手的下一步 4.5 终局及胜负判断 5. 棋局示例 1. 前言 前面几篇博客&#xff08;以循序渐进的方式&#xff09;实现了Tic-Tac-Toe游戏的棋局搜索、遍历以及所有可…

植物大战僵尸:寻找阳光掉落Call调用

通过遍历阳光产生的时间,寻找阳光产生的本地Call,使用代码注入器注入,自定义生成阳光 阳光CALL遍历技巧&#xff1a; 进入植物大战僵尸-> 当出现阳光后->马上搜索未知初始数值返回游戏-> 马上切回CE-> 搜索减少的数值-> 掉一点搜一点最后排查出它的掉落地址-&…

Eureka Server 开启Spring Security Basic认证

概 述 Eureka Server 在实际使用过程中必须考虑安全问题&#xff0c;比如 未认证的用户 不允许其随意调用 Eureka Server的 API&#xff1b;还有一个则是 未认证的 Eureka Client 也禁止其注册到 Eureka Server中来&#xff0c;这些都是可以在工程中进行配置的&#xff0c;当然…

蓝桥杯寒假集训第七天(修改数组)

没有白走的路&#xff0c;每一步都算数&#x1f388;&#x1f388;&#x1f388; 题目描述&#xff1a; 给定一个已知长度的数组&#xff0c;要求出由其变换而来的一组没有重复数据的数组。假定有一个数组A[0,1,2,3,4]。要求如果A[i]在之前的数组A[0,1,2,3..i-1]之中若出现过&…

关于win11 21H2 升22H2及安装安卓子系统的记载

前言 电脑是i7 7700的&#xff0c;没有TPM2.0。但是喜欢折腾&#xff0c;喜欢win11任务栏的居中&#xff0c;之前win10的时候&#xff0c;会用插件折腾。既然有原生的了&#xff0c;自然更好了。 win11系统升级 关于win11系统下载 直接百度搜索win11系统下载&#xff0c;然…

【数据结构】(牛客)链表的回文结构,LeetCode相交链表,LeetCode环形链表

目录 一、链表的回文结构 1、题目说明 2、题目解析 二、相交链表 1、题目说明 2、题目解析 三、环形链表 1、题目说明 2、题目解析 一、链表的回文结构 1、题目说明 题目链接&#xff1a;链表的回文结构 对于一个链表&#xff0c;请设计一个时间复杂度为O(n),额外空间复杂度…

RHCE-ssh服务设置

目录 要求&#xff1a; 1.两台机器&#xff1a;第一台机器作为客户端&#xff0c;第二台机器作为服务器&#xff0c;在第一台使用rhce用户免密登录第二台机器 2.禁止root用户远程登录和设置三个用户sshuser1, sshuser2, sshuser3 3&#xff0c; 只允许sshuser3登录&#xff…

Java开发入门到精通之Java的数据库访问

一、前言 在应用程序开发中&#xff0c;需要使用数据库管理和存储各种数据。在Java中&#xff0c;提供了一个JDBC技术(Java Database Connectivity&#xff0c;JDBC&#xff0c;Java数据库连接)&#xff0c;它的作用是连接数据库并访问。接下来小编带大家一起来学习JDBC技术! …

天猫汽车商详页的SSR改造实践

由于汽车业务的特殊性&#xff0c;天猫汽车基于 Rax 多页应用自建了商品详情的 H5 页面。自定义商详承载了众多业务能力和投放场景。随着业务的发展和页面承载内容的增多&#xff0c;开始出现白屏时间太长等体验问题。前端性能优化算是个老生常谈的问题&#xff0c;我们的页面已…