RocketMQ 第一章

news2024/9/20 16:53:40

RocketMQ 第一章

1、什么是MQ

Message Queue(消息队列),从字⾯上理解:⾸先它是⼀个队列。FIFO 先进先出的数据结构 —— 队列。消息队列就是所谓的存放消息的队列。

消息队列解决的不是存放消息的队列的⽬的,而是解决所谓的通信问题。

Untitled

⽐如以电商订单系统为例,如果各服务之间使⽤同步通信,不仅耗时较久,且过程中容易受到⽹络波动的影响,不能保证⾼成功率。因此,我们可以使⽤异步的通信⽅式对架构进⾏改造。

Untitled

使⽤异步的通信⽅式对模块间的调⽤进⾏解耦,可以快速提升系统的吞吐量。上游执⾏完消息的发送业务后⽴即获得结果,下游多个服务订阅到消息后各⾃进行消费。通过消息队列,屏蔽底层的通信协议,使得解耦和并⾏消费得以实现。

2、RocketMQ介绍

2.1、RocketMQ的由来

随着使⽤消息队列和虚拟主题的增加,阿⾥巴巴团队最早使⽤的 ActiveMQ IO 模块达到了瓶颈。为此尽⼒通过节流、断路器或降级来解决这个问题,但效果不佳。所以开始关注当时流⾏的消息传递解决⽅案 Kafka。不幸的是,Kafka ⽆法满⾜其要求,尤其是在低延迟和⾼可靠性⽅⾯。在这种情况下,决定发明⼀种新的消息传递引擎来处理更⼴泛的⽤例,从传统的发布/订阅场景到⼤容量实时零丢失交易系统。⽬前 RocketMQ 已经开源给 Apache 基⾦会。如今,已有 100 多家公司在其业务中使⽤开源版本的 RocketMQ。

2.2、RocketMQ vs ActiveMQ vs Kafka

消息产品客户端 SDK协议和规范订购信息预定消息批量消息广播消息消息过滤器服务器触发的重新交付消息存储消息追溯消息优先级高可用性和故障转移消息跟踪配置管理和运营工具
Active MQJava、.NET、C++ 等。推送模型,支持 OpenWire、STOMP、AMQP、MQTT、JMSExclusive Consumer 或 Exclusive Queues 可以保证排序支持的不支持支持的支持的不支持使用 JDBC 和高性能日志支持非常快速的持久化,例如 levelDB、kahaDB支持的支持的支持,取决于存储,如果使用 levelDB 则需要 ZooKeeper 服务器不支持默认配置为低级,用户需优化配置参数支持的
KafkaJava、Scala 等。拉取模型,支持TCP确保分区内消息的排序不支持支持,带有异步生产者不支持支持,可以使用Kafka Streams过滤消息不支持高性能文件存储支持的偏移量指示不支持支持,需要 ZooKeeper 服务器不支持Kafka 使用键值对格式进行配置。这些值可以从文件或以编程方式提供。支持,使用终端命令公开核心指标
RocketMQJava、C++、Go拉取模型,支持 TCP、JMS、OpenMessaging确保消息的严格排序,并且可以优雅地进行横向扩展支持的支持,具有同步模式以避免消息丢失支持的支持基于 SQL92 的属性过滤器表达式支持的高性能和低延迟的文件存储支持时间戳和偏移量两种表示不支持支持主从模型,无需其他套件支持的开箱即用,用户只需注意一些配置支持的、丰富的 Web 和终端命令以公开核心指标

3、RocketMQ的基本概念

3.1、技术架构

Untitled

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的⻆⾊,⽀持分布式集群⽅式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进⾏消息投递,投递的过程⽀持快速失败并且低延迟。
  • Consumer:消息消费的⻆⾊,⽀持分布式集群⽅式部署。⽀持以 push 推,pull 拉两种模式对消息进⾏消费。同时也⽀持集群⽅式和⼴播⽅式的消费,它提供实时消息订阅机制,可以满⾜⼤多数⽤户的需求。
  • NameServer:NameServer 是⼀个⾮常简单的 Topic 路由注册中⼼,其⻆⾊类似 Dubbo 中的 zookeeper,⽀持 Broker 的动态注册与发现。主要包括两个功能:
    • Broker 管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供⼼跳检测机制,检查 Broker 是否还存活;
    • 路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和⽤于客户端查询的队列信息。然后 Producer 和 Conumser 通过NameServer 就可以知道整个 Broker 集群的路由信息,从⽽进⾏消息的投递和消费。NameServer 通常也是集群的⽅式部署,各实例间相互不进⾏信息通讯。Broker 是向每⼀台 NameServer 注册⾃⼰的路由信息,所以每⼀个 NameServer 实例上⾯都保存⼀份完整的路由信息。当某个 NameServer 因某种原因下线了,Broker 仍然可以向其它 NameServer 同步其路由信息,Producer、Consumer 仍然可以动态感知 Broker 的路由信息。
  • BrokerServer:Broker 主要负责消息的存储、投递和查询以及服务⾼可⽤保证,为了实现这些功能,Broker 包含了以下⼏个重要⼦模块。
    • Remoting Module:整个 Broker 的实体,负责处理来⾃ clients 端的请求。
    • Client Manager:负责管理客户端(Producer/Consumer)和维护 Consumer 的 Topic 订阅信息
    • Store Service:提供⽅便简单的 API 接⼝处理消息存储到物理硬盘和查询功能。
    • HA Service:⾼可⽤服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
    • Index Service:根据特定的 Message key 对投递到 Broker 的消息进⾏索引服务,以提供消息的快速查询。

Untitled

3.2、部署架构

Untitled

RocketMQ ⽹络部署特点:

  • NameServer 是⼀个⼏乎⽆状态节点,可集群部署,节点之间⽆任何信息同步。
  • Broker 部署相对复杂,Broker 分为 Master 与 Slave,⼀个 Master 可以对应多个 Slave,但是⼀个 Slave 只能对应⼀个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,⾮ 0 表示 Slave。Master 也可以部署多个。每个 Broker 与NameServer 集群中的所有节点建⽴⻓连接,定时注册 Topic 信息到所有 NameServer。
    • 注意:当前 RocketMQ 版本在部署架构上⽀持 ⼀Master多Slave,但只有 BrokerId = 1 的从服务器才会参与消息的读负载。
  • Producer 与 NameServer 集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建⽴⻓连接,且定时向 Broker Master 发送⼼跳。Producer 完全⽆状态,可集群部署。
  • Consumer 与 NameServer 集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master、Broker Slave 建⽴⻓连接,且定时向 Master、Slave 发送⼼跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,消费者在向 Master 拉取消息时,Master 服务器会根据拉取偏移量与最⼤偏移量的距离(判断是否读⽼消息,产⽣读 I/O),以及从服务器是否可读等因素决定下⼀次是从 Master 还是 Slave 拉取。

结合部署架构图,描述集群⼯作流程:

  • 启动 NameServer,NameServer 起来后监听端⼝,等待 Broker、Producer、Consumer 连上来,相当于⼀个路由控制中⼼。
  • Broker 启动,跟所有的 NameServer 保持⻓连接,定时发送⼼跳包。⼼跳包中包含当前 Broker 信息(IP + 端⼝等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  • 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时⾃动创建 Topic。
  • Producer 发送消息,启动时先跟 NameServer 集群中的其中⼀台建⽴⻓连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择⼀个队列,然后与队列所在的 Broker 建⽴⻓连接从⽽向 Broker 发送消息。
  • Consumer 与 Producer 类似,跟其中⼀台 NameServer 建⽴⻓连接,获取当前订阅的 Topic 存在哪些 Broker 上,然后直接跟 Broker 建⽴连接通道,开始消费消息。

4、快速开始

4.1、下载RocketMQ

下载地址:https://archive.apache.org/dist/rocketmq/

Untitled

4.2、安装RocketMQ

1、在 /usr/local/ 目录下创建 rocketmq 文件夹

cd /usr/local/

mkdir rocketmq

2、上传 rocketmq 安装包并使⽤ unzip 命令解压缩在 /usr/local/rocketmq ⽬录下

cd /usr/local/rocketmq/

unzip rocketmq-all-4.7.1-bin-release.zip

3、配置 rocketmq 的环境变量

vim /etc/profile

export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.7.1-bin-release
export PATH=$ROCKETMQ_HOME/bin:$PATH

Untitled

4、让环境变量⽣效

source /etc/profile

5、修改 bin/runserver.sh ⽂件,由于 RocketMQ 默认设置的 JVM 内存为4G,但个人服务器⼀般没有这么大的内存,因此调整为 512 m。

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

vim runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

Untitled

6、修改 bin/runbroker.sh ⽂件,将默认 8G 内存修改为 512 m。

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

vim runbroker.sh

JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"

Untitled

3、修改 bin/tools.sh 文件

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

vim tools.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"

Untitled

4.3、启动NameServer

1、启动 RocketMQ 服务需要先启动 NameServer:

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

nohup ./mqnamesrv -n 1.117.115.99:9876 &

2、查看 bin/nohup.out 显示如下内容表示启动成功:

Untitled

4.4、启动Broker

1、在 conf/broker.conf ⽂件中加⼊如下配置,开启⾃动创建 Topic 功能:

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/conf

vim broker.conf

autoCreateTopicEnable=true

Untitled

2、以静默⽅式启动Broker:

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

nohup ./mqbroker -n 1.117.115.99:9876 &

3、查看 bin/nohup.out 日志,显示如下内容表示启动成功:

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

cat nohup.out

Untitled

4.5、使⽤发送和接收消息验证MQ

1、配置 nameserver 的环境变量

在发送/接收消息之前,需要告诉客户端 nameserver 的位置。配置环境变量 NAMESRV_ADDR:

vim /etc/profile

export NAMESRV_ADDR=1.117.115.99:9876

# 让环境变量生效
source /etc/profile

2、使⽤ bin/tools.sh ⼯具验证消息的发送,默认会发送 1000 条消息:

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

./tools.sh org.apache.rocketmq.example.quickstart.Producer

3、发送的消息⽇志:

Untitled

4、使⽤ bin/tools.sh ⼯具验证消息的接收:

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

./tools.sh org.apache.rocketmq.example.quickstart.Consumer

5、接收到的消息:

Untitled

4.6、关闭RocketMQ

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

# 1.关闭NameServer
./mqshutdown namesrv
# 2.关闭Broker
./mqshutdown broker

Untitled

5、搭建RocketMQ集群

5.1、RocketMQ集群模式

Untitled

为了追求更好的性能,RocketMQ 的最佳实践⽅式都是在集群模式下完成。RocketMQ 官⽅提供了三种集群搭建⽅式:

1、2主2从的异步通信⽅式

使⽤异步⽅式进⾏主从之间的数据复制,发送消息的吞吐量⼤,但可能会丢消息。

使⽤ conf/2m-2s-async ⽂件夹内的配置⽂件做集群配置。

2、2主2从的同步通信⽅式

使⽤同步⽅式进⾏主从之间的数据复制,保证消息安全投递,不会丢失,但会影响发送消息的吞吐量。

使⽤ conf/2m-2s-sync ⽂件夹内的配置⽂件做集群配置。

3、2主⽆从⽅式

会存在单点故障,且读的性能没有前两种⽅式好。

使⽤ conf/2m-noslave ⽂件夹内的配置⽂件做集群配置。

Dledger⾼可⽤集群

上述三种官⽅提供的集群没办法实现⾼可⽤,即在 master 节点挂掉后,slave 节点没办法⾃动被选举为新的 master,需要⼈⼯实现。

所以 RocketMQ 在 4.5 版本之后引⼊了第三⽅的 Dleger ⾼可⽤集群。

5.2、搭建主从异步集群

准备两台Linux服务器

两台 Linux 服务器中 nameserver 和 broker 之间的关系如下:

服务器服务器IPNameServerBroker节点部署
服务器1119.91.21.45119.91.21.45:9876broker-a(master)、broker-b-s(slave)
服务器21.117.115.991.117.115.99:9876broker-b(master)、broker-a-s(slave)

两台服务器都需要安装 jdk 和 rocketmq,安装步骤参考上⼀章节。

启动两台NameServer

nameserver 是⼀个轻量级的注册中⼼,broker 把⾃⼰的信息注册到 nameserver 上。⽽且 nameserver 是⽆状态的,直接启动即可。两台 nameserver 之间不需要进行通信,⽽是被请求⽅来关联两台 nameserver 的地址。

在每台服务器的 bin ⽬录下执⾏如下命令:

1、服务器1:

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

nohup ./mqnamesrv -n 119.91.21.45:9876 &

2、服务器2:

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

nohup ./mqnamesrv -n 1.117.115.99:9876 &

配置Broker

broker-abroker-b-s 这两台 broker 是配置在服务器1上,broker-bbroker-a-s 这两台 broker 是配置在服务器2上。
需要修改每台 broker 的配置⽂件。

注意:同⼀台服务器上的两个 broker 保存路径不能⼀样

mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store-slave
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store-slave/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store-slave/consumequeue
mkdir /usr/local/rocketmq/store/index
mkdir /usr/local/rocketmq/store-slave/index

broker-a 的 master 节点

在服务器 1 上(119.91.21.45),进⼊到 conf/2m-2s-async ⽂件夹内,修改 broker-a.properties ⽂件。

# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-a
# broker所在服务器的ip
brokerIP1=119.91.21.45
# broker的id,0表示master,非0表示slave
brokerId=0
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为master
brokerRole=ASYNC_MASTER
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=119.91.21.45:9876;1.117.115.99:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端⼝
listenPort=10911
# abort⽂件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 消息存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536
# commitLog每个⽂件的⼤⼩默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个⽂件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

broker-a 的 slave节点

在服务器 2 上(1.117.115.99),进⼊到 conf/2m-2s-async ⽂件夹内,修改 broker-a-s.properties ⽂件。


# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-a
# broker所在服务器的ip
brokerIP1=1.117.115.99
# broker的id,0表示master,非0表示slave
brokerId=1
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为slave
brokerRole=SLAVE
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=119.91.21.45:9876;1.117.115.99:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端⼝
listenPort=11011
# abort⽂件存储路径
abortFile=/usr/local/rocketmq/store-slave/abort
# 消息存储路径
storePathRootDir=/usr/local/rocketmq/store-slave
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-slave/index
# checkpoint⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536

broker-b 的 master 节点

在服务器 2 上(1.117.115.99),进⼊到 conf/2m-2s-async ⽂件夹内,修改 broker-b.properties ⽂件。

# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-b
# broker所在服务器的ip
brokerIP1=1.117.115.99
# broker的id,0表示master,非0表示slave
brokerId=0
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为master
brokerRole=ASYNC_MASTER
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=119.91.21.45:9876;1.117.115.99:9876
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端⼝
listenPort=10911
# abort⽂件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 消息存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536

broker-b 的 slave 节点

在服务器 1 上(119.91.21.45),进⼊到 conf/2m-2s-async ⽂件夹内,修改 broker-b-s.properties ⽂件。

# 所属集群名称
brokerClusterName=DefaultCluster
# broker名字
brokerName=broker-b
# broker所在服务器的ip
brokerIP1=119.91.21.45
# broker的id,0表示master,非0表示slave
brokerId=1
# 删除⽂件时间点,默认在凌晨4点
deleteWhen=04
# ⽂件保留时间为48⼩时
fileReservedTime=48
# broker的⻆⾊为slave
brokerRole=SLAVE
# 使⽤异步刷盘的⽅式
flushDiskType=ASYNC_FLUSH
# 名称服务器的地址列表
namesrvAddr=119.91.21.45:9876;1.117.115.99:9876
# 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
defaultTopicQueueNums=4
# 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# broker对外服务的监听端⼝
listenPort=11011
# abort⽂件存储路径
abortFile=/usr/local/rocketmq/store-slave/abort
# 消息存储路径
storePathRootDir=/usr/local/rocketmq/store-slave
# commitLog存储路径
storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store-slave/index
# checkpoint⽂件存储路径
storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
# 限制的消息⼤⼩
maxMessageSize=65536

启动Broker

1、在服务器 1 中(119.91.21.45)启动 broker-a(master)和 broker-b-s(slave)

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &

nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &

2、在服务器 2 中(1.117.115.99)启动 broker-b(master)和 broker-a-s(slave)

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &

nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &

5.3、验证集群

使⽤ RocketMQ 提供的 tools ⼯具验证集群是否正常⼯作。

1、在服务器 1 和 服务器 2 上配置环境变量,⽤于被 tools 中的⽣产者和消费者程序读取该变量。

vim /etc/profile

export NAMESRV_ADDR='119.91.21.45:9876;1.117.115.99:9876'

# 让环境变量生效
source /etc/profile

2、启动生产者

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

./tools.sh org.apache.rocketmq.example.quickstart.Producer

3、启动消费者

cd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin/

./tools.sh org.apache.rocketmq.example.quickstart.Consumer

5.4、安装可视化管理控制平台

RocketMQ 没有提供可视化管理控制平台,可以使⽤第三⽅管理控制平台:https://github.com/apache/rocketmq-dashboard

将其安装在服务器 2 上面(1.117.115.99)

1、在 /usr/local/rocketmq 目录下,创建 rocketmq-dashboard 文件夹

cd /usr/local/rocketmq

mkdir rocketmq-dashboard

2、下载管理控制平台,将其放在 rocketmq-dashboard 目录下,并将其进行解压缩

cd /usr/local/rocketmq/rocketmq-dashboard/

unzip rocketmq-dashboard-master.zip

cd rocketmq-dashboard-master/

3、修改 /usr/local/rocketmq/rocketmq-dashboard/rocketmq-dashboard-master/src/main/resources目录下的 application.yml 配置文件中的nameserver 地址:

vim /usr/local/rocketmq/rocketmq-dashboard/rocketmq-dashboard-master/src/main/resources/application.yml

Untitled

4、在 rocketmq-dashboard-master 目录下,执⾏ maven 命令进⾏打包

5、运行 jar 包

cd /usr/local/rocketmq/rocketmq-dashboard/

nohup java -jar rocketmq-dashboard-1.0.1.jar

6、访问所在服务器的 8080 端⼝,查看集群界⾯,可以看到之前部署的集群

Untitled

6、消息示例

6.1、构建Java基础环境

在 maven 项⽬中构建出 RocketMQ 消息示例的基础环境,即创建⽣产者程序和消费者程序。通过⽣产者和消费者了解 RocketMQ 操作消息的原⽣ API。

Untitled

1、引入依赖

<!-- 注意依赖的版本要和服务器版本一致 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

2、生产者

public class BaseProducer {

    public static void main(String[] args) throws Exception {
        // 1.创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");
        // 2.指定nameserver的地址
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 3.启动生产者
        producer.start();
        // 4.创建消息
        for (int i = 0; i < 100000; i++) {
            /**
             * 参数一:消息的主题Topic
             * 参数二:消息的tag
             * 参数三:消息的内容
             */
            Message message = new Message("MyTopic1", "TagA", ("hello rocketmq" + i).getBytes(StandardCharsets.UTF_8));
            // 5.发送消息
            SendResult sendResult = producer.send(message);
            System.out.println(sendResult);
        }
        // 6.关闭生产者
        producer.shutdown();
    }

}

3、消费者

public class BaseConsumer {

    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");
        // 2.指定nameserver的地址
        consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 3.订阅主题topic和过滤消息用的tag表达式
        consumer.subscribe("MyTopic1", "*");
        // 4.创建一个监听器,当broker把消息推过来时调用
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("收到的消息:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }

}

细节

Untitled

6.2、简单消息示例

简单消息分成三种:同步消息、异步消息、单向消息。

同步消息

⽣产者发送消息后,必须等待 broker 返回信息才能继续之后的业务逻辑,在 broker 返回信息之前,⽣产者阻塞等待。

应⽤场景:如重要通知消息、短信通知、短信营销系统等。

Untitled

public class SyncProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 1.创建生产者,并指定生产者所属的生产组
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");
        // 2.指定nameserver的地址
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 3.启动生产者
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 4.创建消息
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 5.发送消息
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // 6.关闭生产者
        producer.shutdown();
    }

}

异步消息

⽣产者发完消息后,不需要等待 broker 的回信,可以直接执⾏后面的业务逻辑。⽣产者提供⼀个回调函数供 broker 调⽤,体现了异步的⽅式。

异步传输⼀般⽤于对响应时间比较敏感的业务场景。

Untitled

public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        // 1.创建生产者,并指定生产者所属的生产组
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 2.指定nameserver的地址
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 3.启动生产者
        producer.start();
        // 异步消息发送失败后,重试的次数
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                // 4.创建消息
                Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 5.异步发送消息,需要创建回调函数
                producer.send(msg, new SendCallback() {
                    // 发送成功的回调函数
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        // 让计数减一
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    // 发送失败的回调函数
                    @Override
                    public void onException(Throwable e) {
                        // 让计数减一
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("=============");
        // 等待计数归零,最多等待5秒
        countDownLatch.await(5, TimeUnit.SECONDS);
        // 关闭生产者
        producer.shutdown();
    }

}

单向消息

⽣产者发送完消息后不需要等待任何回复,直接进⾏之后的业务逻辑,单向传输⽤于需要中等可靠性的情况,例如⽇志收集。

Untitled

public class OnewayProducer {

    public static void main(String[] args) throws Exception {
        // 1.创建生产者,并指定生产者所属的生产组
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 2.指定nameserver的地址
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 3.启动生产者
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 4.创建消息
            Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 5.发送单向消息
            producer.sendOneway(msg);
        }

        Thread.sleep(5000);
        // 6.关闭生产者
        producer.shutdown();
    }

}

6.3、顺序消息

顺序消息指的是消费者消费消息的顺序按照发送者发送消息的顺序执⾏。顺序消息分成两种:局部顺序和全局顺序。

局部顺序

局部顺序指的是消费者消费某个 topic 的某个队列中的消息是顺序的。消费者使⽤ MessageListenerOrderly 类做消息监听,实现局部顺序。

生产者

Untitled

public class OrderProducer {

    public static void main(String[] args) throws Exception {
        // 1.创建生产者,并指定生产者所属的生产组
        MQProducer producer = new DefaultMQProducer("example_group_name");
        /**
         * 2.指定nameserver的地址
         * 名字服务器的地址已经在环境变量中配置好了:NAMESRV_ADDR=172.16.253.101:9876
         */
        // 3.启动生产者
        producer.start();
        for (int i = 0; i < 10; i++) {
            int orderId = i;

            for (int j = 0; j < 5; j++) {
                /**
                 * 4.创建消息
                 * 00,01,02,03,04、 10,11,12,13,14、 20,21,22,23,24
                 */
                Message msg = new Message("OrderTopicTest", "order_" + orderId, "KEY" + orderId, ("order_" + orderId + " step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 第二个参数表示队列选择器
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    /**
                     *
                     * @param mqs 当前主题topic下的所有队列
                     * @param msg
                     * @param arg producer.send()方法的第三个参数,这里是orderId
                     * @return
                     */
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }
        }
        producer.shutdown();
    }

}

消费者

Untitled

public class OrderConsumer {

    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        // 2.指定nameserver的地址
        consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 消费者从第一个队列的offset开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 3.订阅主题topic和过滤消息用的tag表达式
        consumer.subscribe("OrderTopicTest", "*");

        /**
         * 遵循顺序消费的效果
         */
        // 4.创建一个顺序消费的监听器(MessageListenerOrderly)
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    System.out.println("消息内容:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5.启动消费者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

}

全局顺序

消费者要想消费全部消息都是顺序的,那么只能通过⼀个 topic 只有⼀个队列才能实现,这种应⽤场景较少,且性能较差。

Untitled

乱序消费

消费者消费消息不需要关注消息的顺序。消费者使⽤ MessageListenerConcurrently 类做消息监听。

public class OrderConsumer {

    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        // 2.指定nameserver的地址
        consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 消费者从第一个队列的offset开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 3.订阅主题topic和过滤消息用的tag表达式
        consumer.subscribe("OrderTopicTest", "*");

        /**
         * 4.实现乱序消费的效果
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息内容:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5.启动消费者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}

6.4、广播消息

⼴播消息是向主题(topic)的所有订阅者发送消息。订阅同⼀个 topic 的多个消费者,都能全量收到⽣产者发送的所有消息。

消费者

public class BroadcastConsumer {

    public static void main(String[] args) throws Exception {
        // 1.创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        // 2.指定nameserver的地址
        consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 让消费者从第一个队列的offset开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 将消息模式设置成广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息内容:" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Broadcast Consumer Started.%n");
    }

}

生产者

public class BroadcastProducer {

    public static void main(String[] args) throws Exception {
        // 1.创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // 2.指定nameserver的地址
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 3.启动生产者
        producer.start();

        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest", "TagA", "OrderID188", ("Hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }
}

6.5、延迟消息

延迟消息与普通消息的不同之处在于,它们要等到指定的时间之后才会被传递。

生产者

延迟等级

RocketMQ 设计了 18 个延迟等级,分别是:

Untitled

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

等级 3 对应的是 10s。系统为这 18 个等级配置了 18 个 topic,⽤于实现延迟队列的效果。

Untitled

在商业版 RocketMQ 中,我们不仅可以设置延迟等级,还可以设置具体的延迟时间;但是在社区版 RocketMQ 中,只能设置延迟等级。

代码示例:

public class ScheduledProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // 将消息发送到等级3的topic,等待10s后会推送给消费者
            message.setDelayTimeLevel(3);
            producer.send(message);
        }
        producer.shutdown();
    }

}

消费者

public class ScheduledConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // 指定nameserver的地址
        consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        consumer.subscribe("TestTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // getBornTimestamp:获取当前消息的生产时间
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }

}

6.6、批量消息

批量发送消息提⾼了传递⼩消息的性能。

使用批量消息

public class BatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();
        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>();
        messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
        producer.send(messages);
        producer.shutdown();
    }

}

超出限制的批量消息

官⽅建议批量消息的总⼤⼩不应超过1m,实际不应超过4m。如果超过 4m 的批量消息需要进⾏分批处理,同时设置 broker 的配置参数为 4m(在 broker 的配置⽂件中修改:maxMessageSize=4194304

public class MaxBatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        producer.start();

        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>(100 * 1000);
        for (int i = 0; i < 100 * 1000; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }
        producer.send(messages);
        producer.shutdown();
    }

}

Untitled

解决方法:

public class MaxBatchProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        producer.start();

        String topic = "BatchTest";
        List<Message> messages = new ArrayList<>(100 * 1000);
        for (int i = 0; i < 100 * 1000; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }

        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List<Message> listItem = splitter.next();
            producer.send(listItem);
        }
        producer.shutdown();
    }

}
public class ListSplitter implements Iterator<List<Message>> {

    private int sizeLimit = 1000 * 1000;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20;
            if (tmpSize > sizeLimit) {
                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

}

6.7、过滤消息

在⼤多数情况下,标签是⼀种简单⽽有⽤的设计,可以⽤来选择您想要的消息。

tag过滤的⽣产者

public class TagProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        for (int i = 0; i < 15; i++) {
            Message msg = new Message("TagFilterTest", tags[i % tags.length], "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }

}

tag过滤的消费者

public class TagConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 消费者只能订阅到TagFilterTest主题里面,打上了TagA或TagC标签的消息
        consumer.subscribe("TagFilterTest", "TagA || TagC");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}

消费者将收到包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是⼀条消息只能有⼀个标签,这可能不适⽤于复杂的场景。在这种情况下,我们可以使⽤ SQL 表达式来过滤掉消息。

使⽤SQL过滤

SQL 功能可以通过您在发送消息时输⼊的属性进⾏⼀些计算。在 RocketMQ 定义的语法下,可以实现⼀些有趣的逻辑。这是⼀个例⼦:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

语法

RocketMQ 只定义了⼀些基本的语法来⽀持这个特性,也可以轻松扩展它。

1. 数值⽐较,如`>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
2. 字符⽐较,如`=`, `<>`, `IN`;
3. `IS NULL``IS NOT NULL`4. 逻辑`AND`, `OR`, `NOT`;

常量类型有:

1. 数字,如 1233.14152. 字符,如'abc',必须⽤单引号;
3. `NULL`,特殊常数;
4. 布尔值,`TRUE``FALSE`

注意:只有推送模式的消费者才可以使⽤ SQL 过滤。拉取模式是⽤不了的。

SQL过滤的⽣产者

public class SQLProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 15; i++) {
            Message msg = new Message("SqlFilterTest", tags[i % tags.length], ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 给消息设置一个用户属性(key:value)
            msg.putUserProperty("a", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    }

}

SQL过滤的消费者

public class SQLConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        /**
         * TAGS不为空,并且TAGS是属于TagA或TagB标签的,并且a属性不为空,并且a属性的值在0~3之间
         * 注意:要想使用SQL过滤,需要在broker的配置文件中添加enablePropertyFilter=true的参数
         */
        consumer.subscribe("SqlFilterTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}

6.8、事务消息

事务消息的定义

它可以被认为是⼀个两阶段的提交消息实现,以确保分布式系统的最终⼀致性。事务性消息确保本地事务的执⾏和消息的发送可以原⼦地执⾏。

事务消息的三种状态:

TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费该消息。
TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除,不允许消费。
TransactionStatus.Unknown:中间状态,表示需要 MQ 回查才能确定状态。

事务消息的实现流程:

Untitled

生产者

public class TransactionProducer {

    public static void main(String[] args) throws Exception {
        // 创建事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 生产者发送事务型消息
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }

}

本地事务处理-TransactionListener

public class TransactionListenerImpl implements TransactionListener {

    /**
     * 执行本地事务
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String tags = msg.getTags();
        if (StringUtils.contains(tags, "TagA")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.contains(tags, "TagB")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    /**
     * 检查本地事务
     *
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String tags = msg.getTags();
        if (StringUtils.contains(tags, "TagC")) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.contains(tags, "TagD")) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

}

消费者

public class TransactionConsumer {

    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");
        // 2.指明nameserver的地址
        consumer.setNamesrvAddr("119.91.21.45:9876;1.117.115.99:9876");
        // 3.订阅主题:topic 和过滤消息用的tag表达式
        consumer.subscribe("TopicTest", "*");
        // 4.创建一个监听器,当broker把消息推过来时调用
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                for (MessageExt msg : msgs) {
//                    System.out.println("收到的消息:"+new String(msg.getBody()));
                    System.out.println("收到的消息:" + msg);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

使用限制

  • 事务性消息不⽀持延迟消息和批量消息。
  • 为了避免单个消息被检查太多次而导致半队列消息堆积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  • 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  • ⼀个事务性消息可能不⽌⼀次被检查或消费。
  • 提交给⽤户⽬标主题的消息reput可能会失败。⽬前,它取决于⽇志记录。⾼可⽤是由 RocketMQ 本身的⾼可⽤机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使⽤同步双写机制。
  • 事务性消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务型消息允许反向查询。MQ 服务器能通过其⽣产者 ID 查询到消费者。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/359998.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AcWing1015.摘花生

AcWing 1015. 摘花生Hello Kitty想摘点花生送给她喜欢的米老鼠。她来到一片有网格状道路的矩形花生地(如下图)&#xff0c;从西北角进去&#xff0c;东南角出来。地里每个道路的交叉点上都有种着一株花生苗&#xff0c;上面有若干颗花生&#xff0c;经过一株花生苗就能摘走该它…

《FPGA学习》->蜂鸣器播放

&#x1f34e;与其担心未来&#xff0c;不如现在好好努力。在这条路上&#xff0c;只有奋斗才能给你安全感。你若努力&#xff0c;全世界都会为你让路。蜂鸣器的发声原理由振动装置和谐振装置组成&#xff0c;而蜂鸣器又分为无源他激型与有源自激型。本实验采用无源蜂鸣器&…

嵌入物理(PINN)还是基于物理(AD)?

文章目录1. 传统"反演问题"1.1 反演问题是什么1.2 常见反演问题1.3 传统反演问题的困境2. 深度学习优势3. AD inversion 例子3.1 ADsurf3.2 ADseismic关于PINN的内容大家可以直接google PINN (Physical-informed neural network),其主要的目的是用一个神经网络拟合物…

K8S 部署 Jenkins

本文使用 bitnami 镜像部署 Jenkins 官方文档&#xff1a;https://github.com/bitnami/charts/tree/main/bitnami/jenkins 添加 bitnami 仓库 helm repo add bitnami https://charts.bitnami.com/bitnami自定义 values.yaml storageClass&#xff1a;集群的存储类&#xff…

(考研湖科大教书匠计算机网络)第五章传输层-第八节1:TCP连接管理理论部分(三次握手与四次挥手)

获取pdf&#xff1a;密码7281专栏目录首页&#xff1a;【专栏必读】考研湖科大教书匠计算机网络笔记导航此部分内容借鉴博主【小林coding】 &#xff0c;其对计算机网络内容的图解可以说是深入浅出&#xff0c;尤其是三次握手和四次挥手这一部分&#xff0c;堪称全网最佳。所这…

OpenEuler安装软件方法

在树莓派上烧录好OpenEuler后上面是什么软件都没有的&#xff0c;像一些gcc的环境都需要自己进行配置。官方提供的安装命令是yum&#xff0c;但是执行yum是找不到命令的&#xff1a;   这个其实是因为OpenEuler中默认的安装软件使用了dnf而不是yum&#xff0c;所以软件的安装…

智能小车红外跟随原理

红外跟随电路红外跟随电路由电位器R17&#xff0c;R28&#xff1b;发光二极管D8&#xff0c;D9&#xff1b;红外发射管 D2&#xff0c;D4和红外接收管D3&#xff0c;D5和芯片LM324等组成,LM234用于信号的比较&#xff0c;并产生比较结果输出给单片机进行处理。智能小车红外跟随…

OpenGL学习日志之纹理

引言 为了使我们渲染的模型拥有更多细节&#xff0c;我们可以添加足够多的顶点&#xff0c;然后给每一个顶点都添加顶点颜色。但是这样就会产生很多额外的开销&#xff0c;因此就出现了纹理映射技术&#xff0c;我们通过纹理采样为物体的表面添加更多的细节。 纹理定义 通俗…

超25亿全球月活,字节依然没有流量

&#xff08;图片来源于网络&#xff0c;侵删&#xff09; 文|螳螂观察 作者| 搁浅虎鲸 注意看&#xff0c;这个男人叫梁汝波&#xff0c;是字节跳动的联合创始人&#xff0c;也是接棒张一鸣的新任CEO。 在字节跳动十周年之际&#xff0c;他发表了激情昂扬的演讲。“激发创…

【Datawhale图机器学习】图嵌入表示学习

图嵌入表示学习 学习视频&#xff1a;https://www.bilibili.com/video/BV1AP4y1r7Pz/ 如何把节点映射成D维向量&#xff1f; 人工特征工程&#xff1a;节点重要度、集群系数、Graphlet图表示学习&#xff1a;通过随机游走构造自监督学习任务。DeepWalk、Node2Vec矩阵分解深度…

win10字体模糊怎么办?看下面4种宝藏解决方法

最近很多用户反映电脑安装了Win10系统后出现字体发虚&#xff0c;模糊不清的问题&#xff0c;这看起来让人非常难受。win10字体模糊怎么办&#xff1f;来看下面4种宝藏解决方法&#xff01;下面的方法适用于各类台式电脑以及笔记本电脑哦&#xff01; 操作环境&#xff1a; 演示…

ESP开发环境搭建

一、windows中搭建 esp-idf tool(可选),下载连接如下:https://dl.espressif.com/dl/esp-idf/?idf4.4 下载安装tools后进入vscode进行插件安装&#xff08;未离线下载idf工具也可以通过第二步通过插件下载安装&#xff09; 1. vscode安装编译环境 ESP-IDF 需要安装一些必备工…

高并发系统设计之负载均衡

本文已收录至Github&#xff0c;推荐阅读 &#x1f449; Java随想录 文章目录DNS负载均衡Nginx负载均衡负载均衡算法负载均衡配置超时配置被动健康检查与主动健康检查LVS/F5Nginx当我们的应用单实例不能支撑用户请求时&#xff0c;此时就需要扩容&#xff0c;从一台服务器扩容到…

【matplotlib】可视化解决方案——如何设置轴标签的透明度和大小

概述 Axes 标签对于读者理解图表非常重要&#xff0c;它描述了图表中展现的数据内容。通过向 axes 对象添加标签&#xff0c;可以有效理解图表所表达的内容。首先来了解一下 matplotlib 是如何组织图表的。最上层是一个 Figure 实例&#xff0c;包含绘图中所有可见和不可见的内…

北斗导航 | 2023 PTTI会议论文 2023 ITM会议论文 2022 ION GNSS+ 会议论文下载:ION 美国导航学会

===================================================== github:https://github.com/MichaelBeechan CSDN:https://blog.csdn.net/u011344545 ===================================================== 2023 PTTI会议论文 2023 ITM会议论文 2022 ION GNSS+ 论文下载百度云链…

Teradata当年数据仓库的“一哥”为何突然退出中国市场:苦撑了3年,员工早有预料

2月15日&#xff0c;Teradata天睿公司官宣即将撤离中国市场。 又是一个艰难的决定&#xff0c;听着似乎很熟悉。Teradata为什么突然宣布结束在中国的直营&#xff1f;其实&#xff0c;回顾Teradata在中国市场的发展状况&#xff0c;一点也不突然。 多年前&#xff0c;我曾经与…

Excel表格自动转Json数据

Excel表格转JSON格式在实际工作中&#xff0c;我们常常使用Excel记录各种数据&#xff0c;但在各种应用系统传输数据却使用JSON格式&#xff0c;这就需要把Excel转为JSON。如果能把数据转换传输过程自动化就更完美了。Excel转JsonXX公司生产日报表为例&#xff0c;生产工人用Ex…

JSR303基本使用以及整合springboot统一异常处理

目录 一、前言 什么是JSR303 二、JSR303基本使用&#xff08;普通使用&#xff09; 1&#xff09;、引入jar包 2&#xff09;、实体类对需要校验的数据进行校验 3)、对前端传递过来的参数进行限制 三、JSR303基本使用&#xff08;分组校验&#xff09; 1)、创建分组 2)…

leaflet 根据两个坐标值,设置arc弧线和Marker(079)

第069个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+leaflet中根据提供的两个点,绘制出marker,同时将两点间绘制出一条弧线。 直接复制下面的 vue+openlayers源代码,操作2分钟即可运行实现效果. 文章目录 示例效果配置方式示例源代码(共88行)安装插件相关API参考…

最全面的SpringBoot教程(六)——SpringBoot运行原理分析

前言 本文为 最全面的SpringBoot教程&#xff08;六&#xff09;——SpringBoot运行原理分析 相关知识&#xff0c;下边将对SpringBoot运行原理以及自动配置原理进行详尽的分析介绍~ &#x1f4cc;博主主页&#xff1a;小新要变强 的主页 &#x1f449;Java全栈学习路线可参考…