一、RocketMQ入门
- 一、RocketMQ 介绍
- 1.1 RocketMQ 是什么?
- 1.2 RocketMQ 应用场景
- 01、应用解耦
- 02、流量削峰
- 03、数据分发
- 1.3 RocketMQ 核心组成
- 01、NameServer
- 02、Broker
- 03、Producer
- 04、Consumer
- 1.6 运转流程
- 1.5 RocketMQ 架构
- 01、NameServer 集群
- 02、Broker 集群
- 03、Producer 集群
- 04、Consumer 集群
- 07、集群工作流程
- 06、集群间的交互方式
- 1.6 RocketMQ 优缺点
- 01、优点
- 02、缺点
- 1.7 各种 MQ 比较
- 二、RocketMQ 安装(Linux 版本)
- 2.1 环境要求
- 2.2 安装步骤
- 01、上传安装包
- 02、解压安装包
- 03、参数配置
- 2.3 目录介绍
- 2.4 启动 RocketMQ
- 2.5 测试 RocketMQ
- 2.6 关闭 RocketMQ
- 三、rocketmq-console 集群监控平台搭建
- 3.1 简介
- 3.2 搭建集群监控平台
- 01、下载
- 02、上传解压
- 03、修改配置参数
- 04、打包
- 05、启动和访问
- 06、问题点
- 四、RocketMQ 发送消息基本样例
- 4.1 普通消息发送
- 4.2 普通消息消费
一、RocketMQ 介绍
1.1 RocketMQ 是什么?
RocketMQ 是一款纯 java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
1.2 RocketMQ 应用场景
消息队列是一种"先进先出"的数据结构,其应用场景主要包含以下三个方面:
01、应用解耦
系统的耦合性越高,容错性就越低。
以电商应用为例:用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。
使用消息队列解耦,系统的耦合性就会提高了。比如:如果物流系统发生故障,需要几分钟才能修复好,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。
02、流量削峰
应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列后,可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提升系统的稳定性和用户体验。
一般情况下,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验。而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样的体验应该要好很多。
处于经济考量的目的:
业务系统正常时段的 QPS 如果是 1000,流量最高峰是 10000,为了应对流量高峰配置高性能的服务器显然不划算,这时就可以考虑使用消息队列对峰值流量削峰。
03、数据分发
通过消息队列可以让数据在多个系统之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可。
1.3 RocketMQ 核心组成
RocketMQ 主要有四大核心组成部分:NameServer、Broker、Producer、Consumer。
Topic:区分消息的种类。一个发送者可以发送消息给一个或多个 Topic,一个消息的接收者可以订阅一个或多个 Topic 消息。
Message Queue:相当于是 Topic 的分区,用于并行发送和接收消息。
01、NameServer
NameServer 是一个几乎无状态节点,可集群部署,节点之间没有任何信息同步。所以 RocketMQ 需要先启动 NameServer 再启动 Broker。
-
作用
NameServer 是整个 RocketMQ 的 "大脑",它相当于是服务注册中心的角色,用来管理 Broker。
举例:各个邮局的管理机构。每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息,生产者和消费者通过 NameServer 去获取整个 Broker 集群的路由信息,从而进行消息的投递和消费。
每个 Broker 在启动的时候会到 NameServer 中注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,进而和 Broker 取得连接。Consumer 也会定时获取 Topic 的路由信息。所以从功能上看应该是和 ZooKeeper 差不多的,但是据说 RocketMQ 的早期版本确实是使用的ZooKeeper ,后来改为了自己实现 NameServer。
-
与 ZooKeeper 的区别
NameServer 和 ZooKeeper 的作用大致是相同的。从宏观上来看,NameServer 做的东西很少,就是保存一些运行数据,NameServer 之间不互相连,这就需要 Broker 端连接所有的 NameServer,运行数据的改动要发送到每一个 NameServer ,从而来保证运行数据的一致性(这个一致性确实有点弱),这样就变成了 NameServer很轻量级,但是 Broker 端就要做更多的东西了。
但是在 ZooKeeper 中,Broker 只需要连接其中的一台机器,运行数据分发、一致性都交给了 ZooKeeper 来完成。
-
高可用保障
Broker 在启动时向所有 NameServer 注册(主要是服务器地址等) ,生产者在发送消息之前先从NameServer 获取 Broker 服务器地址列表(消费者一样),然后根据负载均衡算法从列表中选择一台服务器进行消息发送。
NameServer 与每台 Broker 服务保持长连接,并间隔 30S 检查 Broker 是否存活,如果检测到 Broker 宕机,则从路由注册表中将其移除,这样就可以实现 RocketMQ 的高可用。
02、Broker
Broker 是消息存储中心,主要作用是接收来自 Producer 的消息并存储,Consumer 从这里取得消息。
举例:邮局。
它还存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。从部署结构图中可以看出 Broker 有 Master 和 Slave 两种类型,Master 既可以写又可以读,Slave 只可以读不可以写。
从物理结构上看 Broker 的集群部署方式有四种:单 Master 、多 Master 、多 Master 多 Slave(同步刷盘)、多 Master 多 Slave(异步刷盘)。
-
单 Master
这种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
-
多 Master
所有消息都是 Master,没有 Slave。这种方式优缺点:
- 优点:配置简单,单个 Master 宕机或重启维护对应用无影响。在磁盘配置为 RAID10 时,即使机器宕机不可恢复的情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
-
多 Master 多 Slave(异步复制)
异步:先响应后再存入磁盘。
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备之间有毫秒级消息延迟。这种方式优缺点:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受到影响。同时 Master 宕机后,消费者仍然可以从 Salve 消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
-
多 Master 多 Slave(同步双写)
同步:立刻存入磁盘后响应。
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功。这种模式的优缺点:
- 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
-
高可用保障
每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时(每隔30s)注册 Topic 信息到所有 NameServer。NameServer定时(每隔10s)扫描所有存活 Broker 的连接,如果 NameServer 超过2分钟没有收到心跳,则 NameServer 断开与 Broker 的连接。
03、Producer
Producer 也称为消息发布者,负责生产并发送消息至 Topic。
举例:发信者。生产者向 brokers 发送由业务应用程序系统生成的消息。RocketMQ 提供了发送:同步、异步和单向(one-way)的多种范例。
-
同步发送
同步发送消息指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,比如重要通知邮件、营销短信等。
-
异步发送
异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。假如过一段时间检测到某个信息发送失败,可以选择重新发送。
-
单向发送
单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
-
生产者组
生产者组(Producer Group)是一类 Producer 的集合,这类 Producer 通常发送一类消息并且发送逻辑一致,所以将这些 Producer 分组在一起。从部署结构上看生产者通过 Producer Group 的名字来标记自己是一个集群。
-
高可用保障
Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
Producer 每隔30s(由 ClientConfig 的 pollNameServerInterval )从 Nameserver 获取所有 topic 队列的最新情况,这意味着如果 Broker 不可用,Producer 最多30s能够感知,在此期间内发往 Broker 的所有消息都会失败。
Producer 每隔30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔10s扫描所有存活的连接,如果 Broker 在2分钟内没有收到心跳数据,则关闭与 Producer 的连接。
04、Consumer
Consumer 也称为消息订阅者,负责从 Topic 接收并消费消息。
举例:收信者。消费者从 brokers 里拉取信息并将其输入应用程序中。
-
消费者组
消费者组(Consumer Group)是一类 Consumer 的集合名称,这类 Consumer 通常消费同一类消息并且消费逻辑一致,所以将这些 Consumer 分组在一起。消费者组与生产者组类似,都是将相同角色的分组在一起并命名。
RocketMQ 中的消息有个特点:同一条消息,只能被某一消费组其中的一台机器消费,但是可以同时被不同的消费组消费。
-
高可用保障
Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
Consumer 每隔30s从 Nameserver 获取 topic 的最新队列情况,这意味着 Broker 不可用时,Consumer 最多最需要30s才能感知。
Consumer 每隔30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向所有关联的 broker 发送心跳,Broker 每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该 Consumer Group 的所有 Consumer 发出通知,Group 内的 Consumer 重新分配队列,然后继续消费。
当 Consumer 得到 master 宕机通知后,转向 slave 消费,slave 不能保证 master 的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦 master 恢复,未同步过去的消息会被最终消费掉。
1.6 运转流程
- NameServer 先启动;
- Broker 启动时向 NameServer 注册;
- 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),然后根据负载均衡算法从列表中选择一台 Broker 进行消息发送;
- NameServer 与每台 Broker 服务器保持长连接,并间隔 30S 检测 Broker 是否存活,如果检测到 Broker 宕机(使用心跳机制, 如果检测超120S),则从路由注册表中将其移除;
- 消费者在订阅某个主题的消息之前从 NamerServer 获取 Broker 服务器地址列表(有可能是集群),但是消费者选择从 Broker 中 订阅消息,订阅规则由 Broker 配置决定。
1.5 RocketMQ 架构
RocketMQ 架构图中展示了四个集群:
01、NameServer 集群
提供轻量级的服务发现及路由,每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速存储扩展。
NameServer 是一个功能齐全的服务器,主要包含两个功能:
- Broker 管理,接收来自 Broker 集群的注册请求,提供心跳机制检测 Broker 是否存活;
- 路由管理,每个 NameServer 持有全部有关 Broker 集群和客户端请求队列的路由信息。
02、Broker 集群
通过提供轻量级的 Topic 和 Queue 机制处理消息存储。同时支持推(Push)和拉(Pull)两种模型,包含容错机制。提供强大的峰值填充和以原始时间顺序累积数千亿条消息的能力。此外还提供灾难恢复,丰富的指标统计数据和警报机制,这些都是传统的消息系统缺乏的。
Broker 有几个重要的子模块:
- 远程处理模块,Broker 入口,处理来自客户端的请求;
- 客户端管理(包括消息生产者和消费者),维护消费者的主题订阅;
- 存储服务,提供在物理硬盘上存储和查询消息的简单 API;
- HA 服务,提供主从 Broker 间数据同步;
- 索引服务,通过指定键为消息建立索引并提供快速消息查询。
03、Producer 集群
消息生产者支持分布式部署,分布式生产者通过多种负载均衡模式向 Broker 集群发送消息。
04、Consumer 集群
消息消费者也支持 Push 和 Pull 模型的分布式部署,还支持集群消费和消息广播。提供了实时的消息订阅机制,可以满足大多数消费者的需求。
DefultMQPullConsumer :consumer 定时向 broker 发送请求获取内存数据,避免给 broker 造成巨大的压力。
一般会在本地使用定时任务实现。
DefultMQPushConsumer :consumer 向 broker 发送请求,两者保持长链接的状态。broker 会定时(每 5 秒)去查询 consumer 中是否有要订阅的数据,有就将消息推送给 consumer。
无论是 pull 还是 push,其实本质上都是拉取消息。
07、集群工作流程
- 启动 NameServer,NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。
- Broker 启动后,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP + 端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 与 Broker 的映射关系。
- 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
- Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
- Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。
06、集群间的交互方式
- Broker Master 和 Broker Slave 是主从结构,会执行数据同步 Data Sync
每个 Broker 与 NameServer 集群中所有节点建立长连接,定时注册 Topic 信息到所有 NameServer; - Producer 与 NameServer 集群中的其中一个节点(随机)建立长连接,定期从 NameServer 获取 Topic 路由信息,并与提供 Topic 服务的 Broker Master 建立长连接,定时向 Broker 发送心跳;
- Producer 只能将消息发送到 Broker Master,但是 Consumer 同时和 Broker Master 和 Broker Slave 建立长连接,既可以从 Master 订阅消息,也可以从 Slave 订阅消息。
1.6 RocketMQ 优缺点
01、优点
- 单机吞吐量:十万级
- 可用性:非常高,分布式架构
- 消息可靠性:经过参数优化配置,消息可以做到 0 丢失
- 功能支持:MQ 功能较为完善,还是分布式的,扩展性好
- 支持 10 亿级别的消息堆积,不会因为堆积导致性能下降
- 源码是 Java,方便结合公司自己的业务进行二次开发
- 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
- RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验。
02、缺点
- 支持的客户端语言不多,目前仅支持 Java 及 C++,而且 C++ 还不成熟
- 没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
1.7 各种 MQ 比较
二、RocketMQ 安装(Linux 版本)
安装 RocketMQ 版本:4.5.0,我这里使用的是阿里云服务器,也可以在虚拟机上操作。
2.1 环境要求
- Linux 64 位操作系统
- JDK 1.8
- Maven 3.9.0(maven 版本不固定,但是最好不要使用最高版本)
2.2 安装步骤
01、上传安装包
将下载好的 RocketMQ 安装包上传到服务器上:
02、解压安装包
# 解压
unzip rocketmq-all-4.5.0-bin-release.zip
# 将解压包移动到指定路径下
mv rocketmq-all-4.5.0-bin-release ../software
03、参数配置
这里需要配置三个文件:
-
/conf/broker.conf
指定 broker 的命名空间地址和当前 broker 监听的 IP:
默认情况下,namesrvAddr = 127.0.0.1:9876
,brokerIP1 = 127.0.0.1
。 -
/bin/runserver.sh
RocketMQ 默认的虚拟机内存比较大,启动 Broker 如果因为内存不足失败,就需要编辑这两个配置文件,修改 JVM 内存大小:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
-
/bin/runbroker.sh
同上,修改 JVM 内存大小:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
2.3 目录介绍
- bin:启动脚本,包括 shell 脚本和 cmd 脚本
- conf:实例配置文件,包括 broker 配置文件、logback 配置文件等
- lib:依赖 jar 包,包括 Netty、commons-lang、FastJSON 等
2.4 启动 RocketMQ
启动前先查看进程:
启动命令(在 bin 目录下执行):
# 启动 nameserver
nohup sh mqnamesrv -n 公网IP:9876 &
# 启动 broker
nohup sh mqbroker -n 公网IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &
启动后查看进程,有这两个进程即启动成功:
2.5 测试 RocketMQ
-
模拟生产者发送消息
# 生产者 sh tools.sh org.apache.rocketmq.example.quickstart.Producer
输入这个命令后,控制台会输出很多的信息,不报错就说明发送成功了:
-
模拟消费者接收消息
# 消费者 sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
2.6 关闭 RocketMQ
# 关闭服务
sh mqshutdown namesrv
sh mqshutdown broker
三、rocketmq-console 集群监控平台搭建
3.1 简介
RocketMQ 有个可视化的管理界面,通过可视化界面,我们可以方便地监控 RocketMQ 集群,并实现很多操作。比如:创建管理 Topic,查看和发送 ,essage等等。
3.2 搭建集群监控平台
01、下载
RocketMQ提供了UI管理工具,名为rocketmq-console,项目地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console。
这个是 rocketmq 的扩展,里面不仅包含控制台的扩展,也包含对大数据 flume、hbase 等组件的对接和扩展。
02、上传解压
- 上传
- 解压
# 解压 unzip rocketmq-console.zip # 移动到 software 目录下 mv rocketmq-console ../software
03、修改配置参数
修改 rocketmq-console\src\main\resources\application.properties
配置文件:
04、打包
进入 rocketmq-console 目录下执行命令:
# 打包
mvn clean package ‐Dmaven.test.skip=true
打包完成后,会在 /software/rocketMQ/rocketmq-console 目录下生成一个 target 文件夹
05、启动和访问
-
启动
进入
/rocketmq-console/target
目录下执行:# 指定端口号和命名空间地址 java -jar rocketmq-console-ng-1.0.1.jar --server.port=8088 --rocketmq.config.namesrvAddr=公网IP:9876
-
访问
http://106.15.0.30:8088
06、问题点
-
防火墙
防火墙需要开放访问 RocketMQ 的一系列端口号:
# 查看防火墙状态 systemctl status firewalld # 关闭防火墙 systemctl stop firewalld # 启动防火墙 systemctl start firewalld # 永久开放指定端口号【把用到的端口号都开放】 firewall-cmd --zone=public --add-port=10909/tcp --permanent firewall-cmd --zone=public --add-port=10911/tcp --permanent firewall-cmd --zone=public --add-port=9876/tcp --permanent firewall-cmd --zone=public --add-port=9877/tcp --permanent # 重新加载防火墙 firewall-cmd --reload # 或者重启防火墙 systemctl restart firewalld.service # 查看防火墙信息列表 firewall-cmd --list-all # 只查看防火墙开放端口号列表 firewall-cmd --list-ports
除了这一层防火墙之外,阿里云服务器自己还有一层防火墙
iptables
,是默认配置的,我们也需要关闭这层防火墙或者开放对应的端口号:# 查看防火墙状态出现的问题 service iptables status # 关闭防火墙 service iptables stop
-
安全组
防火墙端口号开放之后,同时也需要在 ECS 服务器安全组中添加端口规则:
四、RocketMQ 发送消息基本样例
引入 jar 包:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.0</version>
</dependency>
4.1 普通消息发送
/**
* 同步发送消息
*
* @author qiaohaojie
* @date 2023/2/20 17:00
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 1. 实例化消息生产者 producer
DefaultMQProducer producer = new DefaultMQProducer("group_producer_demo1");
// 2. 设置 nameServer 的地址
producer.setNamesrvAddr("公网IP:9876");
// 关闭 VIP 通道
// producer.setVipChannelEnabled(false);
producer.setSendMsgTimeout(3000);
// 3. 启动 Producer 实例
producer.start();
for (int i = 0; i < 10; i++) {
// 4. 创建消息 message
Message message = new Message("Topic_Demo", "Tags", "Hello RocketMQ" + i, "hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5. 发送消息
SendResult result = producer.send(message);
System.out.println(result);
}
// 6. 关闭 producer
producer.shutdown();
}
}
这里的 message 实例中有几个参数:
- topic:代表消息的主题
- tags:主要用于消息过滤
- keys:消息的唯一值
- body:消息体,代表消息的内容
4.2 普通消息消费
/**
* 同步发送消息-消费者
*
* @author qiaohaojie
* @date 2023/2/20 21:39
*/
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 创建DefaultMQPushConsumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer_demo1");
// 2. 设置nameServer地址
consumer.setNamesrvAddr("公网IP:9876");
// 设置消息拉取最大数
consumer.setConsumeMessageBatchMaxSize(2);
// 3. 设置subscribe,这里是要读取的主题信息
consumer.subscribe("Topic_Demo", "*");
// 4. 设置消息监听
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 5. 获取消息信息
// 迭代消息信息
for (MessageExt msg : msgs) {
try {
// 获取主题
String topic = msg.getTopic();
// 获取标签
String tags = msg.getTags();
// 获取信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer消费消息--topic:" + topic + ",targs:" + tags + ",result:" + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 6. 返回消息读取状态
// 消息消费完成
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启Consumer
consumer.start();
}
}
消费消息时有两个参数:
- topic:指定要消费的主题
- subExpression:消息过滤规则