提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
RocketMQ
- 一、RocketMQ概念~一览无余
- 1.消息队列有啥用?能干啥?消息队列的应用场景?
- 2.常见的消息队列有哪些?如何进行消息队列的技术选型?
- 3.RocketMQ的架构中的角色:
- 4.RocketMQ使用长轮询
- 5.RocketMQ集群:
- 二、RocketMQ实操~整一点
- 1.一言不合就看官网
- 2.使用RocketMQ
- 3.RocketMQ与SpringBoot的整合:
- 巨人的肩膀
一、RocketMQ概念~一览无余
我感觉消息队列或者说消息中间件就像是一个藏咱们私房钱的地方。呀呀呀,这个月的私房钱太多了花不了,先存到这个容器里面,当咱们需要用钱时从里面取出来消息去用
- 【为什么又叫消息中间件,是因为这个消息队列更多的指的是各个微服务以及系统内部各个组件/模块之间的一种通信方式,相当于是一个中间桥梁,所以也可以叫中间件】
- RocketMQ:阿里推出的,放到Apache中孵化
- 中间件:
- 消息中间件【RocketMQ、kafka】、文件中间件【FastDFS】、缓存中间件【Redis】、搜索中间件【ES】
消息的本质就是数据或者待处理的命令
1.消息队列有啥用?能干啥?消息队列的应用场景?
- 通常来说,使用消息队列能为我们的系统带来下面几点好处:
- 应用解耦:让应用之间不在相互依赖,降低系统耦合性
- 之前咱们all in one拆开后,都在一个JVM中,方便
- SOA【服务之间相互调用】中,A调B服务,B宕机了,怎么办
- 服务降级,弃车保帅
- 对B做集群,相当于做备份,一个调不通就调第二个,要是B集群中所有节点都宕机了怎么办-------->
这种情况也说明A模块和B模块之间是强耦合【一个坏了会影响另一个,这就叫强耦合,不是说单一或者分布式咋咋咋】
- 找中间商、中介、第三方来解耦和,A想干啥,把你的意图先告诉第三方,B有活力有空了来满足你------>异步调用
- 流量削峰,或者说削峰限流【流量削峰中接入层跟第三方这俩得足够强大,这俩不能拉垮】
- 先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。
- 之前没用RocketMQ时,碰到双11这种流量高峰时,经常使用限流算法【令牌桶、漏桶、计数器】
- 但用限流算法之后,你想呀,你害怕处理不了你限定,让你的餐厅连锁店一天只能进100个人,要是本来能容纳一万人你得亏多少呀,对不对,你这得丢失多少用户呀【之前用户请求来了,到接入层,你直接用service来接接入层送来的流量,此时为了安全你service上得有限流算法,这样会丢失用户】
- 并且令牌桶你会把数据缓存进来,但是这个服务拉垮了其他服务是不能帮忙处理的
- 使用了消息队列之后,可以使用消息中间件来缓冲大量的请求,匀速消费,
当消息队列中堆积消息过多时,我们可以动态上线增加消费端
,来保证不丢失重要请求- 显得很灵活【保证第三方里的消息不会堆积太多,之后用了Docker后,比如服务是Docker中一个一个镜像实例化出来的容器,那么第三方里流量多了我集群中处理的节点【Docker镜像实例化的容器】就增加,流量少了集群中减少【Docker镜像实例化的容器】,这就更灵活了,足够健壮【安全】】
- 搞个接入层,不管你多少请求流量来,我都能接住你全部这么多请求,接住流量之后再送给第三方,再由service从第三方那里消费这些请求
- 大数据处理:消息中间件可以把各个模块中产生的管理员操作日志、用户行为、系统状态等数据文件作为消息收集到主题中,数据使用方可以订阅自己感兴趣的数据内容互不影响,进行消费
- 异构系统:跨语言
- 并且可以
通过异步处理提高系统性能(减少响应所需时间)
。因为当系统整个的请求量太大时,也就是系统太忙时,将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费(请求数据在后续的业务校验、写数据库等操作中可能失败) - 异步消息:
想要快速发消息,并且可以保证消息不丢失。send方法不会阻塞去等待broker的确认,而是会采用事件监听方式接受broker返回的确认,这不就是异步嘛
producer.send(message,new SendCallback() { public void onSuccess(SendResult sendResult) { // TODO Auto-generated method stub System.out.println("ok"); } public void onException(Throwable e) { // TODO Auto-generated method stub e.printStackTrace(); System.out.println("err"); } });
- 有时候上下文消息有关联时,用异步消息可能来不及,人家可能等不及你回调
- 并且可以
- 应用解耦:让应用之间不在相互依赖,降低系统耦合性
2.常见的消息队列有哪些?如何进行消息队列的技术选型?
- kafka。开源的一个分布式流式处理平台,已经成为 Apache 顶级项目,早期被用来用于处理海量的日志,后面才慢慢发展成了一款功能全面的高性能消息队列
- 流式处理平台常见的关键功能有:作为消息队列、容错的持久方式存储记录消息流、作为一个流式处理平台在消息发布时进行处理
- Kafka 官网:http://kafka.apache.org/
- RocketMQ 是阿里开源的一款云原生“消息、事件、流”实时数据处理平台,借鉴了 Kafka,已经成为 Apache 顶级项目。
- RocketMQ 官网:
https://rocketmq.apache.org/
(文档很详细,推荐阅读)
- RocketMQ 官网:
- RabbitMQ:RabbitMQ 官网:https://www.rabbitmq.com/
- RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 、RocketMQ 和 Pulsar,但是由于它基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 Erlang 开发,所以国内很少有公司有实力做 Erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这几种消息队列中,RabbitMQ 或许是你的首选。
- Pulsar 官网:https://pulsar.apache.org/
3.RocketMQ的架构中的角色:
- NameServer集群:相当于一个Topic路由
注册中心【来管教broker的,也就是支持Broker的动态注册与发现】
,提供注册发现功能,- 特点:
是一个无状态节点
,【无状态指的是不存数据,什么属性都没有,每个节点都长得一样】。从而利用允许不同NameServer之间数据不同步来避免各节点数据强一致性带来的额外性能消耗。- zookeeper是有状态的,每个节点存的数据是一样的,节点间会相互通信同步,nameserver各个节点不相互通信,各玩各的
- 那既然不做数据交互,如何保证数据一致性,其中broker启动时会向所有的nameserver节点建立长连接上报自己的topic信息和queue信息【只要你上线nameserver后】,间接的由这种方法来维持数据间的一致性,来弥补生产者和消费者未进行的数据一致性操作带来的缺陷。
- NameServer不保证数据一致性,AP,性能极高【数据不持久化都在内存里,因为这里面的数据一般变化快】
- 底层由netty实现,提供了路由管理、服务注册、服务发现的功能
- 客户端【比如咱们写的一个Producer类不就算是一个客户端嘛】应该先连向nameserver,由nameserver给我找一个或者说分配一个broker给我客户端,然后客户端再向broker发起消息发送请求
- nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除
- nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用
- NameServer集群间互不通信,没有主备的概念
- nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化
- 主要的功能:
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳机制,检查Broker是否存活;
- 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息【Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息】和用于Producer及Consumer这些个客户端查询的队列信息。然后Producer及Consumer这些个客户端通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的生产投递和消费了。
- 当某个NameServer因为某种原因下线了时,Broker仍然可以向集群中的其他NameServer节点同步其路由信息
- 特点:
- broker集群:
- 每个Broker节点在启动时,都会遍历NameServer列表【不管你是真的注册中心还是高度模拟注册中心,人家到你这来注册,你不得把人家和人家要存的信息映射关系给存下来,这是你的责任】,与每个NameServer建立长连接,注册自己的信息,之后定时上报
- 每个Broker与nameserver集群中的所有节点建立长连接,不仅向nameserver上报自己broker上线相关信息,还得定时注册Topic信息【我目前这个broker中有哪些Topic】到所有nameserver
- 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
- Master与Slave的对应关系通过指定相同的BrokerName不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave
- producer发给broker消息之后,就跟producer没关系了,broker会持久化到内存,然后可以刷盘到硬盘。而consumer跟broker不管是推还是拉都是有消息成功则ack确认的。
- 每个Broker节点在启动时,都会遍历NameServer列表【不管你是真的注册中心还是高度模拟注册中心,人家到你这来注册,你不得把人家和人家要存的信息映射关系给存下来,这是你的责任】,与每个NameServer建立长连接,注册自己的信息,之后定时上报
- Producer:
- 去连接nameserver,问nameserver我想要发送的这个Topic在哪个broker上。然后producer才能发【客户端【比如咱们写的一个Producer类不就算是一个客户端嘛】应该先连向nameserver,由nameserver给我找一个或者说分配一个broker给我客户端,然后客户端再向broker发起消息发送请求】
- 比如咱们模拟一个Producer,代码如下:
package com.aiminhu.rocketmq; import org.apache.rocketmq.client.exception.MQBrokerException; import ...... /** * Created by HuHongBo on 2022/12/21. * 消息发送方 */ public class Producer { public static void main(String[] args) { /** * 客户端【比如咱们写的一个Producer类不就算是一个客户端嘛】应该先连向nameserver,由nameserver给我找一个或者说分配一个broker给我客户端,然后客户端再向broker发起消息发送请求 */ DefaultMQProducer producer = new DefaultMQProducer("xxooGroup"); /** * 设置nameserver的地址和端口 */ producer.setNamesrvAddr("192.168.1.165:9876"); try { producer.start(); /** * Message的第一个参数Topic表示消息将要发送的地址 * body表示Message中真正发的消息体,真实的数据 */ Message message = new Message("myTopic001", "first message".getBytes()); producer.send(message); } catch (MQClientException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
- 然后在rocketMq的rocketmq-console的web监控页面就能看到Topic
- 然后在rocketMq的rocketmq-console的web监控页面就能看到Topic
- producer.sendOneWay(message);只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别
- 有可能会丢失消息的
- 一般你没回调你也没同步,非常容器丢失消息,UDP就是因为没有回调或者没有同步
- 批量消息发送:发送多条消息,可以把消息们放到列表中打包到一起
- Consumer
- 一个consumer只能关注一个Topic,消费消息时有没有消费到会反馈ack给broker
- RocketMQ消费模式有几种?
消费模型由consumer决定,消费维度为Topic
- 集群消费:默认的消费模式,消息只被只消费一次
consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setMessageModel(MessageModel.CLUSTERING);
- 一组consumer同时消费一个topic,可以分配消费负载均衡策略分配consumer对应消费topic下的哪些queue
- 多个group同时消费一个topic时,每个group都会消费到数据
- 一条消息只会被一个group中的consumer消费
- 广播消费
- 消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。
- 当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次
- 只广播一次,你没收到我不会再发
- 集群消费:默认的消费模式,消息只被只消费一次
- Topic和Queue:
Topic是一个逻辑上的概念【Topic表示消息将要发送的地址,是个字符串】,实际上Message是在每个Broker上以Queue的形式记录
- 一个Topic不够你可以搞集群
RocketMQ的Topic是一组Message Queue的集合 ConsumeQueue,ConsumerQueue是通过消息偏移量建立的消息索引
。一条消息是广播消息还是队列消息由客户端消费决定
- 针对每个Topic创建,消费逻辑队列,存储位置信息,用来快速定位CommitLog中的数据位置。
- indexFile:消息的Key和时间戳索引
CommitLog:包含的是消息体,存储消息的详细内容,按照消息收到的顺序,所有消息都存储在一起。每个消息存储后都会有一个offset,代表在commitLog中的偏移量
。
- 启动后会被加载到内存中,加快查找消息速度
- 以Topic作为文件名称,每个Topic下又以queue id作为文件夹分组
- 针对每个Topic创建,消费逻辑队列,存储位置信息,用来快速定位CommitLog中的数据位置。
- Group
- Message
- RocketMQ消息存储机制:
- 很多使用文件系统存储的高性能中间件都是用了零拷贝技术来发送文件数据,比如Nginx。RocketMQ需要使用内存映射MappedByteBuffer这个类来实现零拷贝。
- 消息过滤:大数据过滤时特别重要。Message实例化时可以通过添加tag参数来过滤消费
- 在Producer中使用Tag:Message msg = new Message(“TopicTest”,“TagA” ,("Hello RocketMQ " ).getBytes(RemotingHelper.DEFAULT_CHARSET));
- 在Consumer中订阅Tag:consumer.subscribe(“TopicTest”, “TagA||TagB”);// * 代表订阅Topic下的所有消息
- 也可以用SQL表达式过滤:
MessageSelector selector = MessageSelector.bySql("order > 5"); consumer.subscribe("xxoo3", selector);
- RocketMQ事务消息:
- 用了MQ消息中间件之后,本地事务如果commit了,那么发到MQ的这条消息会被标识为真正可用,这边的consumer再从MQ拿消息消费
- 本地事务如果rollback了,那么发到MQ的这条消息会被撤回
- 消息重试:
- producer端:
- consumer端:
- 消费超时,单位分钟:consumer.setConsumeTimeout()
- 发送ack,消费失败:RECONSUME_LATER
- broker投递:只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息不重试
- 重投使用messageDelayLevel
- producer端:
- 保证消息的顺序消费:
- MessageListenerOrderly对每个queue开启一个线程,在同一个线程中进行消费,消费(消息)是有序的【每个queue中保证了顺序】
- 或者说同一个Topic同一个queue,发消息时启动一个线程去发消息,消费时一个线程去消费,多个queue只能保证单个queue中的顺序
- RocketMQ消息存储机制:
- 刷盘机制:Producer把消息发到broker中后,broker中会刷盘。在CommitLog初始化时,判断配置文件 加载相应的service
4.RocketMQ使用长轮询
- RocketMq使用介于轮询和长连接之间的方式,也就是长轮询【client掌握主动权,如果没有消息供你消费则先将连接挂起来,这个消息玩完了我再去拉下一条来消费,我拉垮我就多用点时间消费消息,我牛B就少用点时间消费这条消息,而不用你server无脑推送。(server先起来,然后client再起来,跟broker建立连接,如果client有消息产生到broker,broker将消息推给server进行消费,消费完成后再建立连接请求,等待消息来,如果有消息产生进去由broker路由转发进行消费)】
- 因为轮询太浪费资源。【优点是无延迟,响应能被及时送达】
- 轮询最典型的实现是HTTP协议,每次请求都会建立一次三次握手连接
- 因为长连接得维护客户端状态【信息】,还有一个重要原因是RocketMQ嫌弃Server不知道客户端这边对消息的消费速度或者说消费能力,我server发了这么多过去你嫌多还是嫌少呀,多了那你能不能消化掉这么多消息呀,容易产生消息堆积,
- socket
- 因为轮询太浪费资源。【优点是无延迟,响应能被及时送达】
5.RocketMQ集群:
- 单Master模式:
- 只有一个 Master节点,配置简单,方便部署。这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
- 多Master模式:
- 一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
- 优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。多 Master 多 Slave 模式,异步复制
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响
- 一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
- 多Master多Slave模式(异步复制)
- 每个 Master 配置一个 Slave,有多对Master-Slave, HA,采用异步复制方式,主备有短暂消息延迟,毫秒级。
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
- 缺点: Master 宕机,磁盘损坏情况,会丢失少量消息。
- 多Master多Slave模式(同步双写)
- 每个 Master 配置一个 Slave,有多对Master-Slave, HA采用同步双写方式,主备都写成功,向应用返回成功。
- 优点:数据与服务都无单点, Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
- 缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能
二、RocketMQ实操~整一点
1.一言不合就看官网
- RocketMQ
- 官网:https://rocketmq.apache.org/
- https://github.com/apache/rocketmq
2.使用RocketMQ
单纯简单使用MQ,用activeMQ、消息特别多流量特别大、低延迟时用面向集群cluster的RocketMQ
- 编译安装RocketMQ、按照QuickStart运行HelloWorld
- 学到了你该安装什么软件,该解压该配环境变量然后source /etc/profile,该弄就弄,
弄完了比如咱们现在用人家RocketMQ这个软件,就得先编译一下人家的源码,编译命令是:mvn -Prelease-all -DskipTests clean install -U
- 再强调,只有这个项目有pom.xml文件,这个项目才能被maven编译,所以看清楚找找这个pom.xml
- java ?,里面查询之后,有个java -verbose可以查询到咱们java的安装环境
- 编译RocketMq的源码后,进入他的bin目录下,./mqnamesrv和./mqbroker分别启动【这个namesrv相当于一个注册中心,这个broker相当于一个服务】mqnamesrv和mqbroker,mqnamesrv启动时会调用执行runserver.sh这个脚本,而mqbroker启动时会调用执行runbroker.sh这个脚本【改一下rubbroker.sh中Xms和Xma,改小一点】
- 学到了你该安装什么软件,该解压该配环境变量然后source /etc/profile,该弄就弄,
3.RocketMQ与SpringBoot的整合:
- 找官方的starter或者没有官方的starter就用下面的pom.xml
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.6.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.6.1</version> </dependency>
- Producer配置
- Config配置类:用于在系统启动时初始化producer参数并启动
package com.....rmq.controller; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { public static final Logger LOGGER = LoggerFactory.getLogger(MQConfig.class); @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; @Bean public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); try { producer.start(); System.out.println("start...."); LOGGER.info(String.format("producer is start ! groupName:[%s],namesrvAddr:[%s]", this.groupName, this.namesrvAddr)); } catch (MQClientException e) { LOGGER.error(String.format("producer is error {}", e.getMessage(), e)); } return producer; } }
- Service消息发送类
package com.....rmq.service; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MQService { @Autowired DefaultMQProducer producer; public Object sendMsg(String string) { for (int i = 0; i < 1; i++) { Message message = new Message("tpk02", "xx".getBytes()); try { return producer.send(message); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } return null; } }
- 配置文件
spring.application.name=mq01 rocketmq.producer.namesrvAddr=192.168.150.131:9876 rocketmq.producer.groupName=${spring.application.name} server.port=8081
- Config配置类:用于在系统启动时初始化producer参数并启动
- Consumer配置
- Config配置类
package com.....rmq.controller; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { public static final Logger logger = LoggerFactory.getLogger(MQConfig.class); @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.topics}") private String topics; @Bean public DefaultMQPushConsumer getRocketMQConsumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.subscribe(topics, "*"); consumer.registerMessageListener(new MyMessageListener() ); consumer.start(); return consumer; } }
- 消息处理类
package com.....rmq.controller; import java.util.List; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; public class MyMessageListener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("来啦!!22!"); for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody()));; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
- 配置文件
spring.application.name=mq02 rocketmq.producer.namesrvAddr=192.168.150.131:9876 rocketmq.producer.groupName=${spring.application.name} rocketmq.consumer.topics=tpk02
- Config配置类
巨人的肩膀
RocketMQ官网
码农翻身
javaGuide
凤凰架构
官网:https://rocketmq.apache.org/