7.微服务项目实战---Rocketmq--消息驱动

news2025/1/23 4:40:15

7.1 MQ简介

7.1.1 什么是MQ

MQ Message Queue )是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。

7.1.2 MQ的应用场景

7.1.2.1 异步解耦
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:

此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续的注册短信和邮件不是即时需要关注的步骤。
所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:

异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将 较耗时而且不需要即时(同步)返回结果 的操作作为消息放入消息队列。同时,由于使用了消息队列MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。
7.1.2.2 流量削峰
流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购 ( 高并发 ) 活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ

秒杀处理流程如下所述:
  1. 用户发起海量秒杀请求到秒杀业务处理系统。
  2. 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ
  3. 下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
  4. 用户收到秒杀成功的通知。

7.1.3 常见的MQ产品

目前业界有很多 MQ 产品,比较出名的有下面这些:
  • ZeroMQ  号称最快的消息队列系统,尤其针对大吞吐量的需求场景。扩展性好,开发比较灵活,采用C语言 实现,实际上只是一个socket库的重新封装,如果做为消息队列使用,需要开发大量的代码。ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。
  • RabbitMQ  使用erlang语言开发,性能较好,适合于企业级的开发。但是不利于做二次开发和维护。
  • ActiveMQ  历史悠久的Apache开源项目。已经在很多产品中得到应用,实现了JMS1.1规范,可以和springjms轻松融合,实现了多种协议,支持持久化到数据库,对队列数较多的情况支持不好。
  • RocketMQ  阿里巴巴的MQ中间件,由java语言开发,性能非常好,能够撑住双十一的大流量,而且使用起来很简单。
  • Kafka  Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

7.2 RocketMQ入门

RocketMQ是阿里巴巴开源的分布式消息中间件,现在是 Apache 的一个顶级项目。在阿里内部使用非常广泛,已经经过了" 11" 这种万亿级的消息流转。

7.2.1 RocketMQ环境搭建

接下来我们先在 linux 平台下安装一个 RocketMQ 的服务
7.2.1.1 环境准备
下载 RocketMQ
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
环境要求
  • Linux 64位操作系统
  • 64bit JDK 1.8+
7.2.1.2 安装 RocketMQ

1.上传文件到Linux系统

2.解压到安装目录

7.2.1.3 启动 RocketMQ

1.切换到安装目录

2.启动NameServer

 3.启动Broker

​​​​​​​

7.2.1.4 测试 RocketMQ
1 测试消息发送
2 测试消息接收
7.2.1.5 关闭 RocketMQ

7.2.2 RocketMQ的架构及概念

如上图所示,整体可以分成 4 个角色,分别是: NameServer Broker Producer Consumer
  • Broker(邮递员)  Broker是RocketMQ的核心,负责消息的接收,存储,投递等功能
  • NameServer(邮局)  消息队列的协调者,Broker向它注册路由信息,同时ProducerConsumer向其获取路由信息
  • Producer(寄件人)  消息的生产者,需要从NameServer获取Broker信息,然后与Broker建立连接,向Broker发送消息
  • Consumer(收件人)  消息的消费者,需要从NameServer获取Broker信息,然后与Broker建立连接,从Broker获取消息
  • Topic(地区 用来区分不同类型的消息,发送和接收消息前都需要先创建Topic,针对Topic来发送和接收消息
  • Message Queue(邮件 为了提高性能和吞吐量,引入了Message Queue,一个Topic可以设置一个或多个Message Queue,这样消息就可以并行往各个Message Queue发送消息,消费者也可以并行的从多个Message Queue读取消息
  • Message  Message 是消息的载体。
  • Producer Group  生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
  • Consumer Group  消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组。

7.2.3 RocketMQ控制台安装

1 下载
2 修改配置文件

3 打成 jar 包,并启动

4 访问控制台

7.3 消息发送和接收演示

接下来我们使用 Java 代码来演示消息的发送和接收

7.3.1 发送消息

消息发送步骤 :
  1. 创建消息生产者, 指定生产者所属的组名
  2. 指定Nameserver地址
  3. 启动生产者
  4. 创建消息对象,指定主题、标签和消息体
  5. 发送消息
  6. 关闭生产者

7.3.2 接收消息

消息接收步骤 :
  1. 创建消息消费者, 指定消费者所属的组名
  2. 指定Nameserver地址
  3. 指定消费者订阅的主题和标签
  4. 设置回调函数,编写处理消息的方法
  5. 启动消息消费者

7.4 案例

接下来我们模拟一种场景 : 下单成功之后,向下单用户发送短信。设计图如下:

7.4.1 订单微服务发送消息

1 shop - order 中添加 rocketmq 的依赖

2 添加配置
3 编写测试代码

7.4.2 用户微服务订阅消息

1 修改 shop - user 模块配置

2 修改主类

3 修改配置文件

4 编写消息接收服务

5 启动服务,执行下单操作,观看后台输出

7.5 发送不同类型的消息

7.5.1 普通消息

RocketMQ 提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。
  • 可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方
式。
此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
  • 可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送
方通过回调接口接收服务器响应,并对响应结果进行处理。
异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知
启动转码服务,转码完成后通知推送转码结果等。
  • 单向发送
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不
等待应答。
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

三种发送方式的对比

 7.5.2 顺序消息

顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。

 

7.5.3 事务消息

RocketMQ 提供了事务消息,通过事务消息就能达到分布式事务的最终一致。
事务消息交互流程 :
两个概念 :
  • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成暂不能投递状态,处于该种状态下的消息即半事务消息。
  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失, RocketMQ服务端通过扫描发现某条消息长期处于半事务消息时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
事务消息发送步骤:
  1. 发送方将半事务消息发送至RocketMQ服务端。
  2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤
  1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。

7.6 消息消费要注意的细节

RocketMQ 支持两种消息模式 :
  • 广播消费: 每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理;
  • 集群消费: 一条消息只能被一个消费者实例消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/466834.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎么注册Google账号(使用国内手机号注册)

怎么注册Google账号(使用国内手机号注册) 记录一下如何用 国内的手机号 注册Google账号 文章目录 怎么注册Google账号(使用国内手机号注册)进入Google官网创建账号注册信息填写手机号(踩坑版)填写手机号&am…

MySQL——超详细数据库触发器教程

文章目录 一、触发器的概念 二、创建触发器 三、查看触发器 四、删除触发器 一、触发器的概念 在实际开发中往往会碰到这样的情况: 当我们对一个表进行数据操作时,需要同步对其它的表执行相应的操作,正常情况下,如果我们使用s…

C语言字符串函数,字符函数,内存操作函数

提示: 本篇文章涉及到以下内容: 求字符串长度 strlen 长度不受限制的字符串函数(被VS认为不安全,就像scanf)–>非法也要完成任务 strcpy 拷贝(将原字符串内容和\0全拷贝过去) strcat 追加(先找到目标空间中的\0,然后把原字符串中的内容直到\0全拷贝过去,原字符串…

【RPA开发】lxml 库之 etree 使用详解

通过 requests.get 方法获得 html 源代码后,可以通过 etree 进行解析,进而从源代码中提取关键信息。etree 同 Beautiful Soup 一样均可以解析 xml 和 html,两者不同之处在于:etree主要通过 xpath 进行定位,而 Beautifu…

基于Spring Boot+Vue 的校园健康系统设计与实现(附源码,文档)

一 简介 校园健康系统本质上是一个健康知识浏览和在线咨询的平台,从用户角度,系统包括大学生、医生和管理员。 二.主要技术 技术名作用Springboot后端框架Vue前端框架MySQL数据库 三 功能介绍 校园健康系统为用户提供医生预约服务,系统…

Clickhouse分布式表引擎(Distributed)查询核心原理解析

Clickhouse分布式表引擎(Distributed)查询核心原理解析 Clickhouse分布式表引擎(Distributed)写入核心原理解析Clickhouse分布式表引擎(Distributed)查询核心原理解析 与分布式数据写入时可以选择写分布式…

有哪家台灯好又便宜的适合学生党使用?真正合格的小学生台灯

都说眼睛是心灵的窗户,但是现在很多小朋友还没上初中,可能就早早的近视了。究其原因,除了和频繁观看电子屏幕密不可分之外,不良的用眼习惯也是一大关键。孩子写作业时不时揉眼睛的动作,其实只要时间一长,眼…

MYSQL prefer_order_index 的罪责

开头还是介绍一下群,如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题,有需求都可以加群群内有各大数据库行业大咖,CTO,可以解决你的问题。加群请联系 liuaustin3 ,在新加的朋友会分到2群(共…

Linux运维之初识shell

一.补充知识点 1.系统定时任务 系统定时任务需要用到crontab命令,但是使用此命令有一个前提,即需要打开crond服务。为了不那么复杂,可以直接使用我之前学的systemctl命令重新启动crond服务。 语法:crontab [-e -l -r] 选项&am…

安装zsh-theme oh-my-zsh

安装zsh yum install zsh切换到zsh chsh -s /bin/zsh exec /bin/zsh重启并且查看 echo $SHELL//查看当前shell,如果显示/bin/zsh,则配置成功 安装oh my zsh sh -c "$(wget https://raw.github.com/ohmyzsh/ohmyzsh/master/tools/install.sh -O -)"到…

浅谈Java线程

大家好,我是易安!今天我们简单聊下Java线程这个话题。 在Java领域,实现并发程序的主要手段就是多线程。线程是操作系统里的一个概念,虽然各种不同的开发语言如Java、C#等都对其进行了封装,但是万变不离操作系统。Java语…

您的天气类APP会泄露隐私吗?

不知您是否有这样的习惯,在早上出门前、或是在规划次日的行程时,都会不自觉地掏出手机、点开天气类APP进行查看。此类APP有的是智能手机自带的,有的是从应用商店里下载并获取的第三方应用。无论是哪种,它们往往都有着一个共性&…

网络安全合规-汽车行业数据合规

个人信息,是指以电子或者其他方式记录的与已识别或者可识别的车主、驾驶人、乘车 人、车外人员等有关的各种信息,不包括匿名化处理后的信息。 敏感个人信息,是指一旦泄露或者非法使用,可能导致车主、驾驶人、乘车人、车外人员等受…

神策营销云时效性升级,秒级营销即刻开启

信息化时代,时效性成为企业营销与管理的重要竞争力之一。高时效营销能够帮助企业提高决策效率、降低成本,“争分夺秒”留住用户并给用户带来更好的体验,它是促成企业成功营销的关键。 为了帮助企业全面提升营销时效性,神策营销云即…

一次修改jar包中字节码文件内容的尝试

目录 背景解决办法确定修改位置得到字节码文件修改字节码文件组合jar包 背景 最近想实现按照分节符拆分doc / docx文档的功能&#xff0c;然后就找到了这篇文章Java 按节拆分 Word 文档&#xff0c;用的依赖是&#xff1a; <dependency><groupId>e-iceblue</g…

Arduno ESP8266接入OneNET实时显示DHT11数据

Arduno ESP8266接入OneNET实时显示DHT11数据 📌相关篇《OneNET云平台数据APP端查看说明》📍《Arduno ESP8266接入中移OneNet动态显示实时数据》✨上面一篇主要是验证数据上传可行性,这次采用DHT11温湿度传感器上传真实数据到云平台进行检测,同时使用SSD1306屏幕进行数据显…

简单聊聊目标检测新范式RT-DETR的骨干:HGNetv2

【前言】 本文版权属于GiantPandaCV&#xff0c;未经许可&#xff0c;请勿转账&#xff01; 前几天疯狂刷屏的RT-DETR赚足了眼球&#xff0c;在精度和速度上体现的优势和性价比远远高于YOLO&#xff0c;而今年ChatGPT、Sam的出现&#xff0c;也让一些吃瓜群众知乎CNN没有未来了…

第8章:树

1.树是什么 一种分层数据的抽象模型前端工作中常见的树包括&#xff1a;DOM树&#xff0c;级联选择(省市区)&#xff0c;树形控件&#xff0c;…javascript中没有树&#xff0c;但是可以用Object和Array构建树 4.树的常用操作&#xff1a;深度/广度优先遍历&#xff0c;先中后…

【传统方式部署zookeeper集群与迁移至k8s】

zookeeper简介&#xff1a; zk主要服务于分布式系统、配置管理、注册中心、集群管理等&#xff1b;为什么要迁移Zookeeper集群&#xff1b;存储kafka什么数据&#xff1a;kafka有多少节点、topic名称、协调kafka正常运行。ELKKafka收集k8s日志&#xff1b;一、传统方式部署zook…

浙江省区块链数字资产登记中心筹备会议顺利举行

4月25日下午&#xff0c;由浙江省区块链技术应用协会主办、西溪谷管委会、西湖区网联会协办的“浙江省区块链数字资产登记中心筹备会议”在西湖蚂蚁小镇多功能厅顺利举行。 出席本次筹备会议的有中国电子技术标准化研究院区块链研究室主任、IEEE 计算机 协会区块链和分布式记帐…