前言
QBM之前使用的消息中间件是ActiveMQ,后续需要升级为RocketMQ。
MQ广泛应用于很多业务场景中,主要的作用
- 异步解耦
- 削峰
- …
常用MQ中间件对比,参考官方文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis
协议和特点 | 消息有序性 | 定时消息 | 批量消息 | 广播消息 | 消息过滤 | 服务器触发的重新投递 | 消息存储 | ||
---|---|---|---|---|---|---|---|---|---|
ActiveMQ | Push model, support OpenWire, STOMP, AMQP, MQTT, JMS | Exclusive (独自)Consumer or Exclusive Queues can ensure ordering | Supported | Not Suppored | Supported | Supported | Not Supported | Supports very fast persistence using JDBC along with a high performance journal,such as levelDB, kahaDB | |
Kafka | Pull model, support TCP | Ensure ordering of messages within a partition | Not Supported | Supported, with async producer | Not Supported | Supported, you can use Kafka Streams to filter messages | Not Supported | High performance file storage | |
RocketMQ | Pull model, support TCP, JMS, OpenMessaging | Ensure strict ordering of messages,and can scale out gracefully | Supported | Supported, with sync mode to avoid message loss | Supported | Supported, property filter expressions based on SQL92 | Supported | High performance and low latency file storage |
通过学习并结合业务知识,重点思考的问题:
- 顺序性消费?顺序消费场景某个消息失败导致消息挤压?
- 消息的挤压?如何根据业务划分topic和tag?相同tag分group?,同一业务消息有相同的key?
- 消息消费的多线程问题?某个业务场景比如财务需要单线程消费?
- …
目录
- 概述
- 实践
- 原理分析
一、概述
RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
基于最基础的发布订阅模型,而在实际的应用中,结构会更复杂。例如为了支持高并发和水平扩展,中间的消息主题需要进行分区(对应Message Queue),同一个Topic会有多个生产者,同一个信息会有多个消费者,消费者之间要进行负载均衡等。
ps:存储消息Topic的 代理服务器( Broker ),是实际部署过程对应的代理服务器。
核心概念
- Group:一类生产者或者消费者,每个包含GroupID
- Producer:消息发布者,RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。
- Consumer:消息订阅者,从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
- Message Queue:每个Topic下会由一个到多个队列来存储消息,RocketMQ 对 Topic 进行了分区,这种操作被称为队列(MessageQueue)。
- 死信队列:用于处理无法被正常消息的消息,当一条信息初次消费失败,消息队列RocketMQ会自动进行消息重试,达到重试最大次数仍然失败的话,若消费仍然失败,该消息不会被丢弃,而是直接发到设置的该Consumer对应的死信队列里面。
- Topic:消息主题
- Message:生产者向Topic发送并最终传给消费者的数据消息载体,生产者为消息定义的属性成为消息属性,包含Message Key和Tag,MQ本身会生成一个Message ID。
- Message Key:消息的业务标识,由消息生产者设置,唯一标识某个业务逻辑。
- Message ID:消息的全局唯一标识,由RocketMQ系统自动生成。
- Tag:消息标签,Topic下的进一步区分。
- Topic Partition:分区,物理上的概念,每个Topic包含一个或者多个分区。
- 消费位点:每个Topic会有很多分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset,分区开始的消费位点为MinOffset。
- 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅Topic的消费进度,设置完成后Consumer将接受设定时间点之后由Producer发送到消息队列服务端的消息。
其他消息类型相关概念
- 事务性消息:Exactly-Once:Consumer消费一次仅能消费一次。
- 集群消息:相同的ConsumerGroup下的消息消费,每个Consumer按照GroupID均分消费。
- 广播消息:相同的ConsumerGroup下的消息消费,每个Consumer按照GroupID全量消费。
- 定时消息:Producer将消息发送到RocketMQ服务端,不期望消息被马上投递,而是推迟到当前时间点之后某一个时间点才发给Consumer。
- 延时消息:Producer将消息发送到RocketMQ服务端,不期望消息被马上投递,而是推迟一定时间才发给Consumer。
- 事务消息:类似X/Open XA的分布式事务功能。
- 顺序消息:一种按照顺序进行发布和消费的消息模型,分为全局顺序消息和分区顺序消息。
- 全局顺序消息:对于指定的一个Topic,所有的消息按照严格的FIFO的顺序进行发布和消费。
- 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一分区内的消息按照严格的FIFO顺序进行发布的消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。
其他消息相关概念
- 消息堆积:消息被堆积在了RocketMQ的服务端,Consumer没有能力消费来不及消费。
- 消息过滤:Consumer可以根据Tag对消息进行过滤,确保Consumer最终只能接受到被过滤后的消息,过滤操作是在服务端完成的。
- 消息轨迹:消息从生产到消费过程的链路追钟,方便定位排查问题。
参考:
- https://rocketmq.apache.org/docs/quickStart/01quickstart
- https://developer.aliyun.com/article/780968
二、实践
安装RocketMQ参考:https://rocketmq.apache.org/docs/quickStart/01quickstart
容器安装RocketMQ,需要分开安装Nameserver容器和Broker容器以及控制台Console容器,其中Nameserver和Broker的连接通过broker.conf
这样做是为了解耦和方便管理:https://juejin.cn/post/7218438764100108325
开发测试直接使用docker安装
# 拉取镜像
docker pull rocketmqinc/rocketmq
# 一、启动NameServer容器,创建一个新的容器并指定 RocketMQ 的镜像
docker run -d \
--name rmqnamesrv \
-p 9876:9876 \
-v /home/docker/mydata/rocketmq/conf:/root/config \
-v /home/docker/mydata/rocketmq/logs:/root/logs \
-e "JAVA_OPTS=-Duser.home=/opt" \
rocketmqinc/rocketmq \
sh mqnamesrv
# 参数说明:
-d 以守护线程方式启动
--name rmqnamesrv 设置容器名称
-p 9876:9876 端口映射
-v 把容器内的/root/logs日志路径挂载到宿主机的自定义路径中(需根据自己的路径自行创建)
-v 把容器内的/root/store数据存储目录挂载到宿主机的自定义目录(需根据自己的路径自行创建)
rocketmqinc/rocketmq 使用镜像的名称
sh mqnamesrv 执行name server脚本
# 进入容器
docker exec -it d60b /bin/bash
# 修改broker.conf文件,设置通信的brokerIP
vi ... /conf/broker.conf,然后添加brokerIP1 = xxx.xxx.xxx.xxx,内容为宿主机的IP
# broker.conf的其他配置项
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 回到宿主机,将broker.conf拷贝到宿主机
# nameserver容器内配置文件/opt/rocketmq-4.4.0/conf
docker cp d60b:/opt/rocketmq-4.4.0/conf/broker.conf /home/docker/mydata/rocketmq/conf/broker.conf
# 二、启动Broker容器
docker run -d \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v /home/docker/mydata/rocketmq/broker/logs:/root/logs \
-v /home/docker/mydata/rocketmq/broker/store:/root/store \
-v /home/docker/mydata/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
rocketmqinc/rocketmq \
sh mqbroker -c ../conf/broker.conf
# 参数说明
--link rmqnamesrv:namesrv 和rmqnamesrv容器通信
-p 10911:10911 把容器的非vip通道端口挂载到宿主机
-p 10909:10909 把容器的vip通道端口挂载到宿主机
-e “NAMESRV_ADDR=namesrv:9876” 指定namesrv的地址为本机namesrv的ip地址:9876
-e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker 指定broker服务的最大堆内存(暂未配置)
sh mqbroker -c ../conf/broker.conf 读取../conf/broker.conf配置并启动broker
# 三、安装控制台
docker pull styletang/rocketmq-console-ng
docker run -d \
-p 8081:8080 \
-e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=120.46.82.131:9876 -Drocketmq.config.isVIPChannel=false" \
styletang/rocketmq-console-ng
# 四、访问控制台(别忘了开8081防火墙)
xxx.xxx.xxx.xxx:8081
三、原理分析
参卡:官网文档:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis
3.1、RocketMQ部署模型
Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?
RocketMQ部署架构上主要分为四个部分
- 生产者Producer:通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
- 消费者Consumer:消费消息角色,支持推Push、拉Pull两种模式对消息消费,同时也支持集群方式和广播方式的消费。
- 域名服务器NameServer:一个简单的Topic路由注册中心,支持Topic、Broker的动态注册与发现。一般集群部署,各实例间相互不进行信息通讯,集群中的每个NameServer都全量的保存完整的路由信息,某个NameServer下线也不影响可用性。主要包括两个功能
- Broker管理:接受Broker集群的注册信息并保存下来作为路由信息,然后提供心跳检测机制,检测Broker是否还存活。
- 路由信息管理:保存关于Broker集群的整个路由信息 和 用于客户端查询的队列信息。
- **代理服务器Broker:**主要负责消息的存储、投递和查询以及服务高可用保证。因为各个Broker中的信息不一样,不能简单像NameServer那样直接集群部署,而是需要采取主从模式集群架构。Broker采取Master-Slave结构,通过指定相同的BrokerName、不同的BrokerId来区分主(BrokerId = 0)、从(BrokerId = 1)
小结
- Broker注册:每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
- Producer注册:Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
- Consumer注册:Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。
// TODO