物联网中RocketMQ的使用

news2025/1/13 8:05:43

物联网中RocketMQ的使用

1. 背景

       随着物联网行业的发展、智能设备数量越来越多,很多常见的智能设备都进入了千家万户;随着设备数量的增加,也对后台系统的性能提出新的挑战。

       在日常中,存在一些特定的场景,属于高并发请求,瞬时的压力会达到20w/s。为了保证用户的使用体验,提高服务器的处理能力刻不容缓。那应该如何来提高后台服务处理能力呢?

系统访问的两种常见方式:

  • 同步方式,该方式是常用的一种方式,当请求时,就会阻塞当前线程等待返回结果。该方式开发很简单,但是在高并发的情况下,会导致系统负载剧增,处理能力下降,甚至会导致上游服务等待过长产生熔断。
  • 异步方式,当我们请求时,不需要阻塞当前线程(主线程),当时结果处理完之后,再回调告诉主线程;该方式很适合与高并发的场景,但开发比较困难,会导致失败的情况。

2. 缓冲池

       在讲解RocketMQ之前,首先来介绍缓冲池的概念:

       缓冲池是计算机里的概念,很像生活中的蓄水池:从上游来的水并不是直接到达用户家里,而是先存储到一个水池里面;其优点有:

  • 多余的水可以在水池里面存起来,防止压力直接就传到下游;
  • 控制水池的阀门可以调节出水大小,可以根据实际的使用控制水流。

3. RocketMq的组成

       有了缓存池的概念,就很好理解RocketMQ工作的基本原理,在RocketMQ中主要有三部分组成:

  • 生产者(producer),生产者用户生产消息,就像上游的水。
  • brokerbroker就像蓄水池一样,会将生产的消息存储起来。
  • 消费者(consumer),将broker队列里面的消息拉出来消费。

其工作的原理图如下:

在这里插入图片描述

       通过生产者将消息传递到broker存储起来,然后消费者去消费broker里面的消息,其中broker是路由,具体的存储是采用了队列数据结构。

4. RocketMQ在物联网中的应用

       根据它的组成和原理,其非常合适用于异步解耦。比如在某个时间段,大量的请求来到云服务,如果云服务采用同步的方式去控制设备,势必导致服务延迟严重,出现大批量的失败。

       因此为了达到异步解耦,流量削峰,当大量的请求来到云服务的时候,并不是直接去控制设备,而是先将控制消息存入RocketMQ中,然后通过消费者去控制设备。因此具体的控制流程图如下:

在这里插入图片描述

       如图所示,当控制参数来到服务时,现将控制的参数封装为message,然后通过生产者将消息存放到RocketMQbroker存储起来,在消费者端将启动多线程去消费消费消息—即去控制设备,当设备控制完成后,将控制的结果告诉上游服务。

       通过以上的设计,不管多大的流量来,都先将参数存入rocketMQ中,然后慢慢去消费,有效地保护了云端服务,提供了云端的性能,提高了用户的使用体验。

5. 使用RocketMQ搭建一个简单的生产者和消费者demo

  • 前提—RocketMQ已经搭建好

  • 依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
  • 配置文件
server.port: 8085

rocketmq:
  name-server: 10.16.17.246:9876
  producer:
    group: rocket-demo
  • 首先是生产者,这里开发了一个接口:
@RestController()
@RequestMapping("/producer")
public class ProducerController {

    // 注入
    @Resource
    RocketMQTemplate rocketMQTemplate;


    @PostMapping("/push")
    public void producer(){
        ProducerDto producerDto = new ProducerDto();
        Random random = new Random();
        producerDto.setApplianceId(String.valueOf(random.nextInt()));
        producerDto.setReqId(UUID.randomUUID().toString());
		// 将producerDto 封装为消息
        Message<ProducerDto> message = withPayload(producerDto).setHeader(PROPERTY_KEYS, producerDto.getReqId()).build();
        System.out.println("生产的消息为:" + message);
        // 将消息塞入指定的TOPIC
        rocketMQTemplate.syncSend(TOPIC_DEMO, message);
    }
}

其中,ProducerDto消息体有applianceIdreqId两个参数,broker中将消息路由到与topic绑定的队列中。

  • 消费者
@RocketMQMessageListener(
        consumeThreadNumber = 16,
        topic = TOPIC_DEMO,
        consumerGroup = "consumer_demo_group"

)
@Component
public class Consumer implements RocketMQListener<MessageExt> {

    private final String CHARSET = Charset.defaultCharset().name();

    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String str = new String(body, Charset.forName(this.CHARSET));
        System.out.println("消费者消费的消息为: " + str);
    }
}

在消费者类上加上@RocketMQMessageListener、@Component注解即可实现消费。

  • 结果, 实现了消息的生产和消费
生产的消息为:GenericMessage [payload=ProducerDto(reqId=086a5666-922a-4345-b408-956ba6bc18d9, applianceId=2140322229), headers={KEYS=086a5666-922a-4345-b408-956ba6bc18d9, id=12816bba-1
259-469a-d0ff-e42311a27fe7, timestamp=1676713480448}]
消费者消费的消息为: {"reqId":"086a5666-922a-4345-b408-956ba6bc18d9","applianceId":"2140322229"} 

6. 结语

以上是关于RocketMQ的简单介绍,只是皮毛而已,里面还有很多的功能可以挖掘。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/354804.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java数据类型、基本与引用数据类型区别、装箱与拆箱、a=a+b与a+=b区别

文章目录1.Java有哪些数据类型2.Java中引用数据类型有哪些&#xff0c;它们与基本数据类型有什么区别&#xff1f;3.Java中的自动装箱与拆箱4.为什么要有包装类型&#xff1f;5.aab与ab有什么区别吗?1.Java有哪些数据类型 8种基本数据类型&#xff1a; 6种数字类型(4个整数型…

java ssm志愿者信息服务平台springmvc

志愿者服务平台&#xff0c;采用ssm框架技术&#xff0c;java语言&#xff0c;jsp前端页面以及js&#xff0c;DIVCSS&#xff0c;html5等技术综合应用开发实现。志愿者服务平台&#xff0c;采用了前台后台的模式开发&#xff0c;前台用于志愿者在线项目培训&#xff0c;视频学习…

CUDA编程接口

编程接口 文章目录编程接口3.1利用NVCC编译3.1.1编译流程3.1.1.1 离线编译3.1.1.2 即时编译3.1.2 Binary 兼容性注意&#xff1a;仅桌面支持二进制兼容性。 Tegra 不支持它。 此外&#xff0c;不支持桌面和 Tegra 之间的二进制兼容性。3.1.3 PTX 兼容性3.1.4 应用程序兼容性3.1…

Kafka框架快速入门及异步通知

文章目录1、异步通信原理1.1 观察者模式1.2 生产者消费者模式1.3 缓冲区1.4 数据单元2、消息系统原理2.1 点对点消息传递2.2 发布订阅消息传递3、Kafka简介3.1 设计目标3.2 Kafka的优点4、Kafka系统架构4.1 Broker4.2 Topic4.3 Partition4.4 Leader4.5 Follower4.6 replication…

ACL与NAT

ACL---访问控制列表&#xff0c;是一种策略控制工具 功能&#xff1a;1.定义感兴趣流量&#xff08;数据层面 &#xff09; 2.定义感兴趣路由&#xff08;控制层面&#xff09; ACL 条目表项组成&#xff1a; 编号规则&#xff1a;步数或者跳数默认值为5&#xff0c;…

Rancher 部署 MySQL

文章目录创建 pvc部署 MySQL前置条件&#xff1a;安装 rancher&#xff0c;可参考文章 docker 部署 rancher 创建 pvc MySQL 数据库是需要存储的&#xff0c;所以必须先准备 pvc 创建 pvc 自定义 pvc 名称选择已经新建好的 storageclass&#xff0c;storageclass 的创建可参考…

Mac os如何安装绿盾客户端

环境&#xff1a; Apple Mac mini 八核M1芯片 8G 256G Mac os 11.0 问题描述&#xff1a; Mac os如何安装绿盾客户端 解决方案&#xff1a; 一、关闭系统保护 1.关机电脑&#xff0c;按住“开机键”不放直到屏幕上出现“选项” 点击“继续”&#xff0c;等待进入恢复模…

github 使用

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录一、git与github二、出错的地方1.GitHub没有css样式2、git clone出现错误3、明明创建了responsibility 但git 不显示一、git与github 这个博客写的很好&#xff01;…

剔除绿化后的比较可靠的评价模型

sheet4权重实际评分&#xff08;越高越不好&#xff09;&#xff08;0-5分&#xff09;正向化&#xff08;5-X&#xff09;后分别与权重相乘得到该地的总分总分高&#xff0c;排名高为好&#xff0c;即污染小排序后

【Kubernetes】第三篇 - ci-server 构建节点 Docker、Jenkins 环境搭建

一&#xff0c;前言 上一篇&#xff0c;主要介绍了阿里云服务器的采购和简单配置&#xff1a; 三台服务器规划如下&#xff1a; 服务配置内网IP外网IP说明ci-server2c4g172.17.178.104182.92.4.158Jenkins Nexus Dockerk8s-master2c4g172.17.178.10547.93.9.45Kubernetes …

ARM uboot 源码分析5 -启动第二阶段

一、start_armboot 解析6 1、console_init_f (1) console_init_f 是 console&#xff08;控制台&#xff09;的第一阶段初始化。_f 表示是第一阶段初始化&#xff0c;_r 表示第二阶段初始化。有时候初始化函数不能一次一起完成&#xff0c;中间必须要夹杂一些代码&#xff0c;…

ccc-pytorch-回归问题(1)

文章目录1.简单回归实战&#xff1a;2.手写数据识别1.简单回归实战&#xff1a; 用 线性回归拟合二维平面中的100个点 公式&#xff1a;ywxbywxbywxb 损失函数&#xff1a;∑(yreally−y)2\sum(y_{really}-y)^2∑(yreally​−y)2 迭代方法&#xff1a;梯度下降法&#xff0c;…

【QA】[Vue/复选框全选] v-model绑定每一项的赋初值问题

发生场景&#xff1a;不只是复选框的状态改变&#xff0c;还有的功能要用到复选框的选中状态&#xff0c;比如&#xff1a;购物车计算总价&#xff0c;合计等等。 引入&#xff1a;复选框 checkbox 在使用时&#xff0c;需要用v-model绑定布尔值&#xff0c;来获取选中状态&…

一台电脑安装26个操作系统(windows,macos,linux,chromeOS,Android,静待HarmonyOS)

首先看看安装了哪些操作系统1-4: windows系统 四个5.Ubuntu6.deepin7.UOS家庭版8.fydeOS9.macOS10.银河麒麟11.红旗OS12.openSUSE Leap13.openAnolis14.openEuler(未安装桌面UI)15.中标麒麟&#xff08;NeoKylin&#xff09;16.centos17.debian Edu18.fedora19.oraclelinux(特别…

Rust Web入门(一):TCP 和 HTTP Server

本教程笔记来自 杨旭老师的 rust web 全栈教程&#xff0c;链接如下&#xff1a; https://www.bilibili.com/video/BV1RP4y1G7KF?p1&vd_source8595fbbf160cc11a0cc07cadacf22951 学习 Rust Web 需要学习 rust 的前置知识可以学习杨旭老师的另一门教程 https://www.bili…

【原创】java+swing+mysql图书管理系统设计与实现

图书管理系统是一个比较常见的系统&#xff0c;今天我们主要介绍如何使用javaswiingmysql去开发一个cs架构的图书管理系统&#xff0c;方便学生进行图书借阅。 功能分析&#xff1a; 宿舍报修管理系统的使用角色&#xff0c;一般分为管理员和学生&#xff0c;管理员主要进行学…

学习OpenGL图形2D/3D编程

环境&#xff1a;WindowsVisual Studio 2019最流行的几个库&#xff1a;GLUT&#xff0c;SDL&#xff0c;SFML和GLFWGLFWGLAD库查看显卡OPENGL支持情况VS2019glfwgladopenGL3.3顶点着色器片段着色器VAO-VBO-(EBO)->渲染VAO-VBO-EBO->texture纹理矩阵matrix对图形transfor…

jmx prometheus引起的一次cpu飙高

用户接入了jmx agent进行prometheus监控后&#xff0c;在某个时间点出现cpu飙高 排查思路&#xff1a; 1、top&#xff0c;找到java进程ID 2、top -Hp 进程ID&#xff0c;找到java进程下占用高CPU的线程ID 3、jstack 进程ID&#xff0c;找到那个高CPU的线程ID的堆栈。 4、分析堆…

jenkins基础部署

一、jenkins是什么1.Jenkins的前身是Hudson&#xff0c;采用JAVA编写的持续集成开源工具。Hudson由Sun公司在2004年启动&#xff0c;第一个版本于2005年在java.net发布。2007年开始Hudson逐渐取代CruiseControl和其他的开源构建工具的江湖地位。在2008年的JavaOne大会上在开发者…

【Vue3源码】第二章 effect功能的完善下

【Vue3源码】第二章 effect功能的完善下 前言 上一章节我们实现了effect函数的runner 和 scheduler&#xff0c;这一章我们继续完善effect函数的功能&#xff0c;stop和onstop。 1、实现effect的stop功能 顾名思义&#xff0c;stop就是让effect停下来的函数。那么怎么才能让…