RocketMQ 第一章
1、什么是MQ
Message Queue(消息队列),从字⾯上理解:⾸先它是⼀个队列。FIFO 先进先出的数据结构 —— 队列。消息队列就是所谓的存放消息的队列。
消息队列解决的不是存放消息的队列的⽬的,而是解决所谓的通信问题。
⽐如以电商订单系统为例,如果各服务之间使⽤同步通信,不仅耗时较久,且过程中容易受到⽹络波动的影响,不能保证⾼成功率。因此,我们可以使⽤异步的通信⽅式对架构进⾏改造。
使⽤异步的通信⽅式对模块间的调⽤进⾏解耦,可以快速提升系统的吞吐量。上游执⾏完消息的发送业务后⽴即获得结果,下游多个服务订阅到消息后各⾃进行消费。通过消息队列,屏蔽底层的通信协议,使得解耦和并⾏消费得以实现。
2、RocketMQ介绍
2.1、RocketMQ的由来
随着使⽤消息队列和虚拟主题的增加,阿⾥巴巴团队最早使⽤的 ActiveMQ IO 模块达到了瓶颈。为此尽⼒通过节流、断路器或降级来解决这个问题,但效果不佳。所以开始关注当时流⾏的消息传递解决⽅案 Kafka。不幸的是,Kafka ⽆法满⾜其要求,尤其是在低延迟和⾼可靠性⽅⾯。在这种情况下,决定发明⼀种新的消息传递引擎来处理更⼴泛的⽤例,从传统的发布/订阅场景到⼤容量实时零丢失交易系统。⽬前 RocketMQ 已经开源给 Apache 基⾦会。如今,已有 100 多家公司在其业务中使⽤开源版本的 RocketMQ。
2.2、RocketMQ vs ActiveMQ vs Kafka
消息产品 | 客户端 SDK | 协议和规范 | 订购信息 | 预定消息 | 批量消息 | 广播消息 | 消息过滤器 | 服务器触发的重新交付 | 消息存储 | 消息追溯 | 消息优先级 | 高可用性和故障转移 | 消息跟踪 | 配置 | 管理和运营工具 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Active MQ | Java、.NET、C++ 等。 | 推送模型,支持 OpenWire、STOMP、AMQP、MQTT、JMS | Exclusive Consumer 或 Exclusive Queues 可以保证排序 | 支持的 | 不支持 | 支持的 | 支持的 | 不支持 | 使用 JDBC 和高性能日志支持非常快速的持久化,例如 levelDB、kahaDB | 支持的 | 支持的 | 支持,取决于存储,如果使用 levelDB 则需要 ZooKeeper 服务器 | 不支持 | 默认配置为低级,用户需优化配置参数 | 支持的 |
Kafka | Java、Scala 等。 | 拉取模型,支持TCP | 确保分区内消息的排序 | 不支持 | 支持,带有异步生产者 | 不支持 | 支持,可以使用Kafka Streams过滤消息 | 不支持 | 高性能文件存储 | 支持的偏移量指示 | 不支持 | 支持,需要 ZooKeeper 服务器 | 不支持 | Kafka 使用键值对格式进行配置。这些值可以从文件或以编程方式提供。 | 支持,使用终端命令公开核心指标 |
RocketMQ | Java、C++、Go | 拉取模型,支持 TCP、JMS、OpenMessaging | 确保消息的严格排序,并且可以优雅地进行横向扩展 | 支持的 | 支持,具有同步模式以避免消息丢失 | 支持的 | 支持基于 SQL92 的属性过滤器表达式 | 支持的 | 高性能和低延迟的文件存储 | 支持时间戳和偏移量两种表示 | 不支持 | 支持主从模型,无需其他套件 | 支持的 | 开箱即用,用户只需注意一些配置 | 支持的、丰富的 Web 和终端命令以公开核心指标 |
3、RocketMQ的基本概念
3.1、技术架构
RocketMQ架构上主要分为四部分,如上图所示:
- Producer:消息发布的⻆⾊,⽀持分布式集群⽅式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进⾏消息投递,投递的过程⽀持快速失败并且低延迟。
- Consumer:消息消费的⻆⾊,⽀持分布式集群⽅式部署。⽀持以 push 推,pull 拉两种模式对消息进⾏消费。同时也⽀持集群⽅式和⼴播⽅式的消费,它提供实时消息订阅机制,可以满⾜⼤多数⽤户的需求。
- NameServer:NameServer 是⼀个⾮常简单的 Topic 路由注册中⼼,其⻆⾊类似 Dubbo 中的 zookeeper,⽀持 Broker 的动态注册与发现。主要包括两个功能:
- Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供⼼跳检测机制,检查 Broker 是否还存活;
- 路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和⽤于客户端查询的队列信息。然后 Producer 和 Conumser 通过NameServer 就可以知道整个 Broker 集群的路由信息,从⽽进⾏消息的投递和消费。NameServer 通常也是集群的⽅式部署,各实例间相互不进⾏信息通讯。Broker 是向每⼀台 NameServer 注册⾃⼰的路由信息,所以每⼀个 NameServer 实例上⾯都保存⼀份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer、Consumer 仍然可以动态感知 Broker 的路由信息。
- BrokerServer:Broker 主要负责消息的存储、投递和查询以及服务⾼可⽤保证,为了实现这些功能,Broker 包含了以下⼏个重要⼦模块。
- Remoting Module:整个 Broker 的实体,负责处理来⾃ clients 端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息
- Store Service:提供⽅便简单的 API 接⼝处理消息存储到物理硬盘和查询功能。
- HA Service:⾼可⽤服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
- Index Service:根据特定的 Message key 对投递到 Broker 的消息进⾏索引服务,以提供消息的快速查询。
3.2、部署架构
RocketMQ ⽹络部署特点:
- NameServer 是⼀个⼏乎⽆状态节点,可集群部署,节点之间⽆任何信息同步。
- Broker 部署相对复杂,Broker 分为 Master 与 Slave,⼀个 Master 可以对应多个 Slave,但是⼀个 Slave 只能对应⼀个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,⾮ 0 表示 Slave。Master 也可以部署多个。每个 Broker 与NameServer 集群中的所有节点建⽴⻓连接,定时注册 Topic 信息到所有 NameServer。
- 注意:当前 RocketMQ 版本在部署架构上⽀持 ⼀Master多Slave,但只有 BrokerId = 1 的从服务器才会参与消息的读负载。
- Producer 与 NameServer 集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建⽴⻓连接,且定时向 Broker Master 发送⼼跳。Producer 完全⽆状态,可集群部署。
- Consumer 与 NameServer 集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master、Broker Slave 建⽴⻓连接,且定时向 Master、Slave 发送⼼跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master 服务器会根据拉取偏移量与最⼤偏移量的距离(判断是否读⽼消息,产⽣读 I/O),以及从服务器是否可读等因素决定下⼀次是从 Master 还是 Slave 拉取。
结合部署架构图,描述集群⼯作流程:
- 启动 NameServer,NameServer 起来后监听端⼝,等待 Broker、Producer、Consumer 连上来,相当于⼀个路由控制中⼼。
- Broker 启动,跟所有的 NameServer 保持⻓连接,定时发送⼼跳包。⼼跳包中包含当前 Broker 信息(IP + 端⼝等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
- 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时⾃动创建 Topic。
- Producer 发送消息,启动时先跟 NameServer 集群中的其中⼀台建⽴⻓连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择⼀个队列,然后与队列所在的 Broker 建⽴⻓连接从⽽向 Broker 发送消息。
- Consumer 与 Producer 类似,跟其中⼀台 NameServer 建⽴⻓连接,获取当前订阅的 Topic 存在哪些 Broker 上,然后直接跟 Broker 建⽴连接通道,开始消费消息。
4、快速开始
4.1、下载RocketMQ
下载地址:https://archive.apache.org/dist/rocketmq/
4.2、安装RocketMQ
1、在 /usr/local/
目录下创建 rocketmq
文件夹
cd /usr/local/
mkdir rocketmq
2、上传 rocketmq 安装包并使⽤ unzip
命令解压缩在 /usr/local/rocketmq
⽬录下
cd /usr/local/rocketmq/
unzip rocketmq-all-4.7.1-bin-release.zip
3、配置 rocketmq 的环境变量
vim /etc/profile
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.7.1-bin-release
export PATH=$ROCKETMQ_HOME/bin:$PATH
4、让环境变量⽣效
source /etc/profile
5、修改 bin/runserver.sh
⽂件,由于 RocketMQ 默认设置的 JVM 内存为4G,但个人服务器⼀般没有这么大的内存,因此调整为 512 m。
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
6、修改 bin/runbroker.sh
⽂件,将默认 8G 内存修改为 512 m。
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
3、修改 bin/tools.sh
文件
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
vim tools.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
4.3、启动NameServer
1、启动 RocketMQ 服务需要先启动 NameServer:
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
nohup ./mqnamesrv -n 1.117.115.99:9876 &
2、查看 bin/nohup.out
显示如下内容表示启动成功:
4.4、启动Broker
1、在 conf/broker.conf
⽂件中加⼊如下配置,开启⾃动创建 Topic 功能:
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/conf
vim broker.conf
autoCreateTopicEnable=true
2、以静默⽅式启动Broker:
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
nohup ./mqbroker -n 1.117.115.99:9876 &
3、查看 bin/nohup.out
日志,显示如下内容表示启动成功:
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
cat nohup.out
4.5、使⽤发送和接收消息验证MQ
1、配置 nameserver 的环境变量
在发送/接收消息之前,需要告诉客户端 nameserver 的位置。配置环境变量 NAMESRV_ADDR:
vim /etc/profile
export NAMESRV_ADDR=1.117.115.99:9876
# 让环境变量生效
source /etc/profile
2、使⽤ bin/tools.sh
⼯具验证消息的发送,默认会发送 1000 条消息:
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
./tools.sh org.apache.rocketmq.example.quickstart.Producer
3、发送的消息⽇志:
4、使⽤ bin/tools.sh
⼯具验证消息的接收:
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
5、接收到的消息:
4.6、关闭RocketMQ
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
# 1.关闭NameServer
./mqshutdown namesrv
# 2.关闭Broker
./mqshutdown broker
5、搭建RocketMQ集群
5.1、RocketMQ集群模式
为了追求更好的性能,RocketMQ 的最佳实践⽅式都是在集群模式下完成。RocketMQ 官⽅提供了三种集群搭建⽅式:
1、2主2从的异步通信⽅式
使⽤异步⽅式进⾏主从之间的数据复制,发送消息的吞吐量⼤,但可能会丢消息。
使⽤ conf/2m-2s-async
⽂件夹内的配置⽂件做集群配置。
2、2主2从的同步通信⽅式
使⽤同步⽅式进⾏主从之间的数据复制,保证消息安全投递,不会丢失,但会影响发送消息的吞吐量。
使⽤ conf/2m-2s-sync
⽂件夹内的配置⽂件做集群配置。
3、2主⽆从⽅式
会存在单点故障,且读的性能没有前两种⽅式好。
使⽤ conf/2m-noslave
⽂件夹内的配置⽂件做集群配置。
Dledger⾼可⽤集群
上述三种官⽅提供的集群没办法实现⾼可⽤,即在 master 节点挂掉后,slave 节点没办法⾃动被选举为新的 master,需要⼈⼯实现。
所以 RocketMQ 在 4.5 版本之后引⼊了第三⽅的 Dleger ⾼可⽤集群。
5.2、搭建主从异步集群
准备两台Linux服务器
两台 Linux 服务器中 nameserver 和 broker 之间的关系如下:
服务器 | 服务器IP | NameServer | Broker节点部署 |
---|---|---|---|
服务器1 | 119.91.21.45 | 119.91.21.45:9876 | broker-a(master)、broker-b-s(slave) |
服务器2 | 1.117.115.99 | 1.117.115.99:9876 | broker-b(master)、broker-a-s(slave) |
两台服务器都需要安装 jdk 和 rocketmq,安装步骤参考上⼀章节。
启动两台NameServer
nameserver 是⼀个轻量级的注册中⼼,broker 把⾃⼰的信息注册到 nameserver 上。⽽且 nameserver 是⽆状态的,直接启动即可。两台 nameserver 之间不需要进行通信,⽽是被请求⽅来关联两台 nameserver 的地址。
在每台服务器的
bin
⽬录下执⾏如下命令:
1、服务器1:
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
nohup ./mqnamesrv -n 119.91.21.45:9876 &
2、服务器2:
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
nohup ./mqnamesrv -n 1.117.115.99:9876 &
配置Broker
broker-a
、broker-b-s
这两台 broker 是配置在服务器1上,broker-b
、broker-a-s
这两台 broker 是配置在服务器2上。
需要修改每台 broker 的配置⽂件。
注意:同⼀台服务器上的两个 broker 保存路径不能⼀样
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store-slave
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store-slave/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store-slave/consumequeue
mkdir /usr/local/rocketmq/store/index
mkdir /usr/local/rocketmq/store-slave/index
broker-a 的 master 节点
在服务器 1 上(119.91.21.45),进⼊到 conf/2m-2s-async
⽂件夹内,修改 broker-a.properties
⽂件。
# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-a
# broker所在服务器的ip
brokerIP1=119.91.21.45
# broker的id,0表示master,非0表示slave
brokerId=0
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为master
brokerRole=ASYNC_MASTER
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=119.91.21.45:9876;1.117.115.99:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端⼝
listenPort=10911
# abort⽂件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 消息存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536
# commitLog每个⽂件的⼤⼩默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个⽂件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
broker-a 的 slave节点
在服务器 2 上(1.117.115.99),进⼊到 conf/2m-2s-async
⽂件夹内,修改 broker-a-s.properties
⽂件。
# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-a
# broker所在服务器的ip
brokerIP1=1.117.115.99
# broker的id,0表示master,非0表示slave
brokerId=1
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为slave
brokerRole=SLAVE
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=119.91.21.45:9876;1.117.115.99:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端⼝
listenPort=11011
# abort⽂件存储路径
abortFile=/usr/local/rocketmq/store-slave/abort
# 消息存储路径
storePathRootDir=/usr/local/rocketmq/store-slave
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-slave/index
# checkpoint⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536
broker-b 的 master 节点
在服务器 2 上(1.117.115.99),进⼊到 conf/2m-2s-async
⽂件夹内,修改 broker-b.properties
⽂件。
# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-b
# broker所在服务器的ip
brokerIP1=1.117.115.99
# broker的id,0表示master,非0表示slave
brokerId=0
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为master
brokerRole=ASYNC_MASTER
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=119.91.21.45:9876;1.117.115.99:9876
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端⼝
listenPort=10911
# abort⽂件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 消息存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536
broker-b 的 slave 节点
在服务器 1 上(119.91.21.45),进⼊到 conf/2m-2s-async
⽂件夹内,修改 broker-b-s.properties
⽂件。
# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-b
# broker所在服务器的ip
brokerIP1=119.91.21.45
# broker的id,0表示master,非0表示slave
brokerId=1
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为slave
brokerRole=SLAVE
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=119.91.21.45:9876;1.117.115.99:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端⼝
listenPort=11011
# abort⽂件存储路径
abortFile=/usr/local/rocketmq/store-slave/abort
# 消息存储路径
storePathRootDir=/usr/local/rocketmq/store-slave
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-slave/index
# checkpoint⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536
启动Broker
1、在服务器 1 中(119.91.21.45)启动 broker-a(master)和 broker-b-s(slave)
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
2、在服务器 2 中(1.117.115.99)启动 broker-b(master)和 broker-a-s(slave)
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
5.3、验证集群
使⽤ RocketMQ 提供的 tools ⼯具验证集群是否正常⼯作。
1、在服务器 1 和 服务器 2 上配置环境变量,⽤于被 tools 中的⽣产者和消费者程序读取该变量。
vim /etc/profile
export NAMESRV_ADDR='119.91.21.45:9876;1.117.115.99:9876'
# 让环境变量生效
source /etc/profile
2、启动生产者
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
./tools.sh org.apache.rocketmq.example.quickstart.Producer
3、启动消费者
cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
5.4、安装可视化管理控制平台
RocketMQ 没有提供可视化管理控制平台,可以使⽤第三⽅管理控制平台:https://github.com/apache/rocketmq-dashboard
将其安装在服务器 2 上面(1.117.115.99)
1、在 /usr/local/rocketmq 目录下,创建 rocketmq-dashboard
文件夹
cd /usr/local/rocketmq
mkdir rocketmq-dashboard
2、下载管理控制平台,将其放在 rocketmq-dashboard
目录下,并将其进行解压缩
cd /usr/local/rocketmq/rocketmq-dashboard/
unzip rocketmq-dashboard-master.zip
cd rocketmq-dashboard-master/
3、修改 /usr/local/rocketmq/rocketmq-dashboard/rocketmq-dashboard-master/src/main/resources
目录下的 application.yml
配置文件中的nameserver 地址:
vim /usr/local/rocketmq/rocketmq-dashboard/rocketmq-dashboard-master/src/main/resources/application.yml
4、在 rocketmq-dashboard-master
目录下,执⾏ maven 命令进⾏打包
5、运行 jar 包
cd /usr/local/rocketmq/rocketmq-dashboard/
nohup java -jar rocketmq-dashboard-1.0.1.jar
6、访问所在服务器的 8080 端⼝,查看集群界⾯,可以看到之前部署的集群
6、消息示例
6.1、构建Java基础环境
在 maven 项⽬中构建出 RocketMQ 消息示例的基础环境,即创建⽣产者程序和消费者程序。通过⽣产者和消费者了解 RocketMQ 操作消息的原⽣ API。
1、引入依赖
<!-- 注意依赖的版本要和服务器版本一致 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
2、生产者
public class BaseProducer {
public static void main(String[] args) throws Exception {
// 1.创建生产者
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");
// 2.指定nameserver的地址
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 3.启动生产者
producer.start();
// 4.创建消息
for (int i = 0; i < 100000; i++) {
/**
* 参数一:消息的主题Topic
* 参数二:消息的tag
* 参数三:消息的内容
*/
Message message = new Message("MyTopic1", "TagA", ("hello rocketmq" + i).getBytes(StandardCharsets.UTF_8));
// 5.发送消息
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
// 6.关闭生产者
producer.shutdown();
}
}
3、消费者
public class BaseConsumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");
// 2.指定nameserver的地址
consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 3.订阅主题topic和过滤消息用的tag表达式
consumer.subscribe("MyTopic1", "*");
// 4.创建一个监听器,当broker把消息推过来时调用
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到的消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者
consumer.start();
System.out.println("消费者已启动");
}
}
细节
6.2、简单消息示例
简单消息分成三种:同步消息、异步消息、单向消息。
同步消息
⽣产者发送消息后,必须等待 broker 返回信息才能继续之后的业务逻辑,在 broker 返回信息之前,⽣产者阻塞等待。
应⽤场景:如重要通知消息、短信通知、短信营销系统等。
public class SyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建生产者,并指定生产者所属的生产组
DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
// 2.指定nameserver的地址
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 3.启动生产者
producer.start();
for (int i = 0; i < 100; i++) {
// 4.创建消息
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5.发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 6.关闭生产者
producer.shutdown();
}
}
异步消息
⽣产者发完消息后,不需要等待 broker 的回信,可以直接执⾏后面的业务逻辑。⽣产者提供⼀个回调函数供 broker 调⽤,体现了异步的⽅式。
异步传输⼀般⽤于对响应时间比较敏感的业务场景。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 1.创建生产者,并指定生产者所属的生产组
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 2.指定nameserver的地址
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 3.启动生产者
producer.start();
// 异步消息发送失败后,重试的次数
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
// 4.创建消息
Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5.异步发送消息,需要创建回调函数
producer.send(msg, new SendCallback() {
// 发送成功的回调函数
@Override
public void onSuccess(SendResult sendResult) {
// 让计数减一
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
// 发送失败的回调函数
@Override
public void onException(Throwable e) {
// 让计数减一
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("=============");
// 等待计数归零,最多等待5秒
countDownLatch.await(5, TimeUnit.SECONDS);
// 关闭生产者
producer.shutdown();
}
}
单向消息
⽣产者发送完消息后不需要等待任何回复,直接进⾏之后的业务逻辑,单向传输⽤于需要中等可靠性的情况,例如⽇志收集。
public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 1.创建生产者,并指定生产者所属的生产组
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 2.指定nameserver的地址
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 3.启动生产者
producer.start();
for (int i = 0; i < 100; i++) {
// 4.创建消息
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5.发送单向消息
producer.sendOneway(msg);
}
Thread.sleep(5000);
// 6.关闭生产者
producer.shutdown();
}
}
6.3、顺序消息
顺序消息指的是消费者消费消息的顺序按照发送者发送消息的顺序执⾏。顺序消息分成两种:局部顺序和全局顺序。
局部顺序
局部顺序指的是消费者消费某个 topic 的某个队列中的消息是顺序的。消费者使⽤ MessageListenerOrderly
类做消息监听,实现局部顺序。
生产者
public class OrderProducer {
public static void main(String[] args) throws Exception {
// 1.创建生产者,并指定生产者所属的生产组
MQProducer producer = new DefaultMQProducer("example_group_name");
/**
* 2.指定nameserver的地址
* 名字服务器的地址已经在环境变量中配置好了:NAMESRV_ADDR=172.16.253.101:9876
*/
// 3.启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
int orderId = i;
for (int j = 0; j < 5; j++) {
/**
* 4.创建消息
* 00,01,02,03,04、 10,11,12,13,14、 20,21,22,23,24
*/
Message msg = new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId, ("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 第二个参数表示队列选择器
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
/**
*
* @param mqs 当前主题topic下的所有队列
* @param msg
* @param arg producer.send()方法的第三个参数,这里是orderId
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
}
producer.shutdown();
}
}
消费者
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
// 2.指定nameserver的地址
consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 消费者从第一个队列的offset开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 3.订阅主题topic和过滤消息用的tag表达式
consumer.subscribe("OrderTopicTest", "*");
/**
* 遵循顺序消费的效果
*/
// 4.创建一个顺序消费的监听器(MessageListenerOrderly)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println("消息内容:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5.启动消费者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
全局顺序
消费者要想消费全部消息都是顺序的,那么只能通过⼀个 topic 只有⼀个队列才能实现,这种应⽤场景较少,且性能较差。
乱序消费
消费者消费消息不需要关注消息的顺序。消费者使⽤ MessageListenerConcurrently 类做消息监听。
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
// 2.指定nameserver的地址
consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 消费者从第一个队列的offset开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 3.订阅主题topic和过滤消息用的tag表达式
consumer.subscribe("OrderTopicTest", "*");
/**
* 4.实现乱序消费的效果
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消息内容:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
6.4、广播消息
⼴播消息是向主题(topic)的所有订阅者发送消息。订阅同⼀个 topic 的多个消费者,都能全量收到⽣产者发送的所有消息。
消费者
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
// 1.创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
// 2.指定nameserver的地址
consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 让消费者从第一个队列的offset开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 将消息模式设置成广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消息内容:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
生产者
public class BroadcastProducer {
public static void main(String[] args) throws Exception {
// 1.创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 2.指定nameserver的地址
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 3.启动生产者
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest", "TagA", "OrderID188", ("Hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
6.5、延迟消息
延迟消息与普通消息的不同之处在于,它们要等到指定的时间之后才会被传递。
生产者
延迟等级
RocketMQ 设计了 18 个延迟等级,分别是:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
等级 3 对应的是 10s。系统为这 18 个等级配置了 18 个 topic,⽤于实现延迟队列的效果。
在商业版 RocketMQ 中,我们不仅可以设置延迟等级,还可以设置具体的延迟时间;但是在社区版 RocketMQ 中,只能设置延迟等级。
代码示例:
public class ScheduledProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 将消息发送到等级3的topic,等待10s后会推送给消费者
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
}
}
消费者
public class ScheduledConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
// 指定nameserver的地址
consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// getBornTimestamp:获取当前消息的生产时间
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
6.6、批量消息
批量发送消息提⾼了传递⼩消息的性能。
使用批量消息
public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
producer.shutdown();
}
}
超出限制的批量消息
官⽅建议批量消息的总⼤⼩不应超过1m,实际不应超过4m。如果超过 4m 的批量消息需要进⾏分批处理,同时设置 broker 的配置参数为 4m(在 broker 的配置⽂件中修改:maxMessageSize=4194304
)
public class MaxBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
producer.send(messages);
producer.shutdown();
}
}
解决方法:
public class MaxBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
}
producer.shutdown();
}
}
public class ListSplitter implements Iterator<List<Message>> {
private int sizeLimit = 1000 * 1000;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20;
if (tmpSize > sizeLimit) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
6.7、过滤消息
在⼤多数情况下,标签是⼀种简单⽽有⽤的设计,可以⽤来选择您想要的消息。
tag过滤的⽣产者
public class TagProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("TagFilterTest", tags[i % tags.length], "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
tag过滤的消费者
public class TagConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 消费者只能订阅到TagFilterTest主题里面,打上了TagA或TagC标签的消息
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消费者将收到包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是⼀条消息只能有⼀个标签,这可能不适⽤于复杂的场景。在这种情况下,我们可以使⽤ SQL 表达式来过滤掉消息。
使⽤SQL过滤
SQL 功能可以通过您在发送消息时输⼊的属性进⾏⼀些计算。在 RocketMQ 定义的语法下,可以实现⼀些有趣的逻辑。这是⼀个例⼦:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
语法
RocketMQ 只定义了⼀些基本的语法来⽀持这个特性,也可以轻松扩展它。
1. 数值⽐较,如`>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
2. 字符⽐较,如`=`, `<>`, `IN`;
3. `IS NULL`或`IS NOT NULL`;
4. 逻辑`AND`, `OR`, `NOT`;
常量类型有:
1. 数字,如 123、3.1415;
2. 字符,如'abc',必须⽤单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE`或`FALSE`;
注意:只有推送模式的消费者才可以使⽤ SQL 过滤。拉取模式是⽤不了的。
SQL过滤的⽣产者
public class SQLProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("SqlFilterTest", tags[i % tags.length], ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 给消息设置一个用户属性(key:value)
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
SQL过滤的消费者
public class SQLConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
/**
* TAGS不为空,并且TAGS是属于TagA或TagB标签的,并且a属性不为空,并且a属性的值在0~3之间
* 注意:要想使用SQL过滤,需要在broker的配置文件中添加enablePropertyFilter=true的参数
*/
consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
6.8、事务消息
事务消息的定义
它可以被认为是⼀个两阶段的提交消息实现,以确保分布式系统的最终⼀致性。事务性消息确保本地事务的执⾏和消息的发送可以原⼦地执⾏。
事务消息的三种状态:
TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费该消息。
TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除,不允许消费。
TransactionStatus.Unknown:中间状态,表示需要 MQ 回查才能确定状态。
事务消息的实现流程:
生产者
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 生产者发送事务型消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
本地事务处理-TransactionListener
public class TransactionListenerImpl implements TransactionListener {
/**
* 执行本地事务
*
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagA")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "TagB")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
/**
* 检查本地事务
*
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String tags = msg.getTags();
if (StringUtils.contains(tags, "TagC")) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.contains(tags, "TagD")) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
}
消费者
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
// 1.创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");
// 2.指明nameserver的地址
consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
// 3.订阅主题:topic 和过滤消息用的tag表达式
consumer.subscribe("TopicTest", "*");
// 4.创建一个监听器,当broker把消息推过来时调用
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// System.out.println("收到的消息:"+new String(msg.getBody()));
System.out.println("收到的消息:" + msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5.启动消费者
consumer.start();
System.out.println("消费者已启动");
}
}
使用限制
- 事务性消息不⽀持延迟消息和批量消息。
- 为了避免单个消息被检查太多次而导致半队列消息堆积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
- 事务消息将在 Broker 配置文件中的参数
transactionMsgTimeout
这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性CHECK_IMMUNITY_TIME_IN_SECONDS
来改变这个限制,该参数优先于transactionMsgTimeout
参数。 - ⼀个事务性消息可能不⽌⼀次被检查或消费。
- 提交给⽤户⽬标主题的消息reput可能会失败。⽬前,它取决于⽇志记录。⾼可⽤是由 RocketMQ 本身的⾼可⽤机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使⽤同步双写机制。
- 事务性消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务型消息允许反向查询。MQ 服务器能通过其⽣产者 ID 查询到消费者。