文章目录
- 基本概念
- 详细介绍
- 主题(Topic)
- 消息类型(MessageType)
- 消息队列(MessageQueue)
- 消息(Message)
- 消息视图(MessageView)
- 消息标签(MessageTag)
- 消息位点(MessageQueueOffset)
- 消费位点(ConsumerOffset)
- 消息索引(MessageKey)
- 生产者(Producer)
- 事务检查器(TransactionChecker)
- 事务状态(TransactionResolution)
- 消费者分组(ConsumerGroup)
- 消费者(Consumer)
- 消费结果(ConsumeResult)
- 订阅关系(Subscription)
- 消息过滤
- 重置消费位点
- 消息轨迹
- 消息堆积
- 事务消息
- 定时/延时消息
- 顺序消息
- 功能原理
- 参数约束
- RocketMQ 的架构
- RocketMQ 的使用方法
- 生产者
- 消费者
- RocketMQ 的实战应用
- 发送普通消息
- 延时消息(定时消息)
- 事务消息
- 顺序消息
- SDK相关
RocketMQ 是一款性能强劲、可靠性高的分布式消息中间件,在大型分布式系统中得到了广泛的应用。本文将从入门到原理简介扼要讲解 RocketMQ 的原理、架构、使用方法和实战应用。
Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
基本概念
在学习 RocketMQ 之前,需要了解一些基本概念:
- Producer:消息的生产者,负责向 Broker 发送消息。
- Broker:消息的中间件,负责存储和传递消息。
- Consumer:消息的消费者,负责从 Broker 消费消息。
- Topic:消息的主题,是消息的逻辑概念,用于标识一类消息。
- Message:消息的实体,包括消息的内容、标签、属性等信息。
- NameServer:RocketMQ 的注册中心,用于保存 Broker 的元数据信息。
详细介绍
本文介绍 Apache RocketMQ 的基本概念,以便您更好地理解和使用 Apache RocketMQ 。
主题(Topic)
Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。更多信息,请参见主题(Topic)。
消息类型(MessageType)
Apache RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。 Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
Apache RocketMQ 从5.0版本开始,支持强制校验消息类型,即每个主题Topic只允许发送一种消息类型的消息,这样可以更好的运维和管理生产系统,避免混乱。但同时保证向下兼容4.x版本行为,强制校验功能默认关闭,推荐通过服务端参数 enableTopicMessageTypeCheck 手动开启校验。
消息队列(MessageQueue)
队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。更多信息,请参见队列(MessageQueue)。
消息(Message)
消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。更多信息,请参见消息(Message)。
消息视图(MessageView)
消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。
消息标签(MessageTag)
消息标签是Apache RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。更多信息,请参见消息过滤。
消息位点(MessageQueueOffset)
消息是按到达Apache RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。更多信息,请参见消费进度管理。
消费位点(ConsumerOffset)
一条消息被某个消费者消费完成后不会立即从队列中删除,Apache RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。更多信息,请参见消费进度管理。
消息索引(MessageKey)
消息索引是Apache RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。
生产者(Producer)
生产者是Apache RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。更多信息,请参见生产者(Producer)。
事务检查器(TransactionChecker)
Apache RocketMQ 中生产者用来执行本地事务检查和异常事务恢复的监听器。事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。更多信息,请参见事务消息。
事务状态(TransactionResolution)
Apache RocketMQ 中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。更多信息,请参见事务消息。
消费者分组(ConsumerGroup)
消费者分组是Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。更多信息,请参见消费者分组(ConsumerGroup)。
消费者(Consumer)
消费者是Apache RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。更多信息,请参见消费者(Consumer)。
消费结果(ConsumeResult)
Apache RocketMQ 中PushConsumer消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
订阅关系(Subscription)
订阅关系是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。更多信息,请参见订阅关系(Subscription)。
消息过滤
消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在Apache RocketMQ
的服务端完成。更多信息,请参见消息过滤。
重置消费位点
以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到Apache RocketMQ 服务端的消息。更多信息,请参见重置消费位点。
消息轨迹
在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由Apache RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。
消息堆积
生产者已经将消息发送到Apache RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。
事务消息
事务消息是Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
分布式事务的诉求
分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。 事务消息诉求
以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:
主分支订单系统状态更新:由未支付变更为支付成功。
物流系统状态新增:新增待发货物流记录,创建订单物流记录。
积分系统状态变更:变更用户积分,更新用户积分表。
购物车系统状态变更:清空购物车,更新用户购物车记录。
定时/延时消息
定时/延时消息是Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
任务超时处理 超时任务处理
以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 Apache RocketMQ 定时消息可以实现超时任务的检查触发。
基于定时消息的超时任务处理具备如下优势:
精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。 Apache RocketMQ 的定时消息具有高并发和水平扩展的能力。
顺序消息
顺序消息是Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 Apache RocketMQ 的顺序消息可以有效保证数据传输的顺序性。
典型场景一:撮合交易 交易撮合
以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。
典型场景二:数据实时增量同步
普通消息
顺序消息顺序消息
以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。
功能原理
什么是顺序消息
顺序消息是 Apache RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。
Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。
基于消息组的顺序判定逻辑,支持按照业务逻辑做细粒度拆分,可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。
如何保证消息的顺序性
Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。
-
生产顺序性 :
Apache RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。
如需保证消息生产的顺序性,则必须满足以下条件:
- 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
- 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:
- 相同消息组的消息按照先后顺序被存储在同一个队列。
- 不同消息组的消息可以混合在同一个队列中,且不保证连续。
如上图所示,消息组1和消息组4的消息混合存储在队列1中, Apache RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。
-
消费顺序性 :
Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。
如需保证消息消费的顺序性,则必须满足以下条件:
-
投递顺序
Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。
备注
消费者类型为PushConsumer时, Apache RocketMQ 保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。消费者类型的具体信息,请参见消费者分类。
-
有限重试
Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。
对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。
-
生产顺序性和消费顺序性组合
如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。
一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,您可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。例如发送顺序消息,但使用非顺序的并发消费方式来提高吞吐能力。更多组合方式如下表所示:
生产顺序 | 消费顺序 | 顺序性效果 |
---|---|---|
设置消息组,保证消息顺序发送。 | 顺序消费 | 按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。 |
设置消息组,保证消息顺序发送。 | 并发消费 | 并发消费,尽可能按时间顺序处理。 |
未设置消息组,消息乱序发送。 | 顺序消费 | 按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。 |
未设置消息组,消息乱序发送。 | 并发消费 | 并发消费,尽可能按照时间顺序处理。 |
顺序消息生命周期
- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
- 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。
参数约束
Apache RocketMQ 系统中存在很多自定义参数和资源命名,您在使用 Apache RocketMQ 时建议参考如下说明规范系统设置,避对某些具体参数设置不合理导致应用出现异常。
参数 | 建议范围 | 说明 |
---|---|---|
Topic名称 | 字符建议:字母a~z或A~Z、数字0~9以及下划线()、短划线(-)和百分号(%)。 长度建议:1~64个字符。 系统保留字符:Topic名称不允许使用以下保留字符或含有特殊前缀的字符命名。 保留字符: TBW102 * BenchmarkTest * SELF_TEST_TOPIC * OFFSET_MOVED_EVENT * SCHEDULE_TOPIC_XXXX * RMQ_SYS_TRANS_HALF_TOPIC * RMQ_SYS_TRACE_TOPIC * RMQ_SYS_TRANS_OP_HALF_TOPIC 特殊前缀: * rmq_sys %RETRY%_ %DLQ%_ rocketmq-broker- | Topic命名应该尽量使用简短、常用的字符,避免使用特殊字符。特殊字符会导致系统解析出现异常,字符过长可能会导致消息收发被拒绝。 |
ConsumerGroup名称 | 字符建议:支持字母a~z或A~Z、数字0~9以及下划线(_)、短划线(-)和百分号(%)。 长度建议:1~64个字符。 系统保留字符:ConsumerGroup不允许使用以下保留字符或含有特殊前缀的字符命名。 保留字符: * DEFAULT_CONSUMER * DEFAULT_PRODUCER * TOOLS_CONSUMER * FILTERSRV_CONSUMER * _MONITOR_CONSUMER * CLIENT_INNER_PRODUCER * SELF_TEST_P_GROUP * SELF_TEST_C_GROUP * CID_ONS-HTTP-PROXY * CID_ONSAPI_PERMISSION * CID_ONSAPI_OWNER * CID_ONSAPI_PULL * CID_RMQ_SYS_TRANS * 特殊字符 * CID_RMQ_SYS * CID_HOUSEKEEPING | 无。 |
ACL Credentials | 字符建议:AK(AccessKey ID)、SK(AccessKey Secret)和Token仅支持字母a~z或A~Z、数字0~9。 长度建议:不超过1024个字符。 | 无。 |
请求超时时间 | 默认值:3000毫秒。 取值范围:该参数为客户端本地行为,取值范围建议不要超过30000毫秒。 | 请求超时时间是客户端本地同步调用的等待时间,请根据实际应用设置合理的取值,避免线程阻塞时间过长。 |
消息大小 | 默认值:不超过4 MB。不涉及消息压缩,仅计算消息体body的大小。 取值范围:建议不超过4 MB。 | 消息传输应尽量压缩和控制负载大小,避免超大文件传输。若消息大小不满足限制要求,可以尝试分割消息或使用OSS存储,用消息传输URL。 |
消息自定义属性 | 字符限制:所有可见字符。 长度建议:属性的Key和Value总长度不超过16 KB。 系统保留属性:不允许使用以下保留属性作为自定义属性的Key。 保留属性Key | 无。 |
MessageGroup | 字符限制:所有可见字符。 长度建议:1~64字节。 | MessageGroup是顺序消息的分组标识。一般设置为需要保证顺序的一组消息标识,例如订单ID、用户ID等。 |
消息发送重试次数 | 默认值:3次。 取值范围:无限制。 | 消息发送重试是客户端SDK内置的重试策略,对应用不可见,建议取值不要过大,避免阻塞业务线程。 如果消息达到最大重试次数后还未发送成功,建议业务侧做好兜底处理,保证消息可靠性。 |
消息消费重试次数 | 默认值:16次。 | 消费重试次数应根据实际业务需求设置合理的参数值,避免使用重试进行无限触发。重试次数过大容易造成系统压力过量增加。 |
事务异常检查间隔 | 默认值:60秒。 | 事务异常检查间隔指的是,半事务消息因系统重启或异常情况导致没有提交,生产者客户端会按照该间隔时间进行事务状态回查。 间隔时长不建议设置过短,否则频繁的回查调用会影响系统性能。 |
半事务消息第一次回查时间 | 默认值:取值等于[事务异常检查间隔] * 最大限制:不超过1小时。 | 无。 |
半事务消息最大超时时长 | 默认值:4小时。 * 取值范围:不支持自定义修改。 | 半事务消息因系统重启或异常情况导致没有提交,生产者客户端会按照事务异常检查间隔时间进行回查,若超过半事务消息超时时长后没有返回结果,半事务消息将会被强制回滚。 您可以通过监控该指标避免异常事务。 |
PushConsumer本地缓存 | 默认值: * 最大缓存数量:1024条。 * 最大缓存大小:64 M。 取值范围:支持用户自定义设置,无限制。 | 消费者类型为PushConsumer时,为提高消费者吞吐量和性能,客户端会在SDK本地缓存部分消息。缓存的消息的数量和大小应设置在系统内存允许的范围内。 |
PushConsumer重试间隔时长 | 默认值: * 非顺序性投递:间隔时间阶梯变化,具体取值,请参见PushConsumer消费重试策略。 * 顺序性投递:3000毫秒。 | 无。 |
PushConsumer消费并发度 | 默认值:20个线程。 | 无。 |
获取消息最大批次 | 默认值:32条。 | 消费者从服务端获取消息时,一次获取到最大消息条数。建议按照实际业务设置合理的参数值,一次获取消息数量过大容易在消费失败时造成大批量消息重复。 |
SimpleConsumer最大不可见时间 | 默认值:用户必填参数,无默认值。 取值范围建议:最小10秒;最大12小时。 |
RocketMQ 的架构
RocketMQ 的架构分为两部分:消息存储和消息传输。
消息存储部分包括:
- NameServer:用于保存 Broker 的元数据信息,例如 Broker 的地址、主题等。
- Broker:用于存储和传递消息,包括消息的存储、消息的订阅和推送等功能。
消息传输部分包括:
- Producer:消息的生产者,负责向 Broker 发送消息。
- Consumer:消息的消费者,负责从 Broker 消费消息。
RocketMQ 的消息传输采用的是异步传输方式,生产者将消息发送到 Broker 后立即返回,不需要等待消息被消费者消费。消费者从 Broker 拉取消息后再进行消费,消费完成后向 Broker 发送确认信息,告诉 Broker 该消息已经被消费。
RocketMQ 的使用方法
RocketMQ 的使用方法分为两部分:生产者和消费者。
生产者
生产者负责将消息发送到 Broker,其主要步骤如下:
- 创建 DefaultMQProducer 对象,并设置 NameServer 的地址。
- 设置 Producer 的 Group 名称,用于区分不同的 Producer。
- 启动 Producer。
- 创建 Message 对象,设置消息的主题、标签和内容等信息。
- 调用 send 方法发送消息。
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("topic", "tag", "Hello World".getBytes());
SendResult result = producer.send(message);
producer.shutdown();
消费者
消费者负责从 Broker 消费消息,其主要步骤如下:
- 创建 DefaultMQPushConsumer 对象,并设置 NameServer 的地址。
- 设置 Consumer 的 Group 名称,用于区分不同的 Consumer。
- 注册 MessageListener 监听器,用于处理消费到的消息。
- 启动 Consumer。
- 订阅感兴趣的消息主题。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
consumer.subscribe("topic", "*");
RocketMQ 的实战应用
RocketMQ 在实际应用中有很多场景,例如:
- 日志收集:将分布式系统的日志收集到一个中心化的地方进行分析和处理。
- 消息通知:例如订单状态变更、支付成功等消息通知。
- 分布式事务:RocketMQ 提供了事务消息的功能,支持分布式事务的一致性。
以消息通知为例,实现步骤如下:
- 创建消息生产者,向指定主题发送消息。
- 创建消息消费者,订阅指定主题,并注册消息监听器。
- 在消息监听器中处理消费到的消息,例如发送邮件、短信等通知。
示例代码:
// 生产者
DefaultMQProducer producer = new DefaultMQProducer("notification_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("notification_topic", "order_status_changed", "Order #123 status changed to shipped".getBytes());
SendResult result = producer.send(message);
producer.shutdown();
// 消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("notification_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 处理消息,例如发送邮件、短信等通知
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
consumer.subscribe("notification_topic", "order_status_changed");
以上示例代码仅为演示代码,实际应用中还需要进行错误处理、重试机制等。同时,RocketMQ 还支持集群部署、负载均衡、消息过滤等功能,需要根据具体业务场景进行配置和使用。
总的来说,RocketMQ 的使用方法较为简单,但在实际应用中需要考虑到各种场景和问题,需要进行深入的学习和实践。
发送普通消息
package com.aliyun.openservices;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
public class Demo {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云内网 VPC 访问,建议填写 VPC 接入点。
* 如果是在本地公网访问,或者是线下 IDC 环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
public static final String ENDPOINT = "rmq-cn-pe33aqsu40k-vpc.cn-hangzhou.rmq.aliyuncs.com:8080";
public static final String TOPIC_NAME = "rmqfctopic_nomal_retry";
public static final String TAG = "";
public static final String KEY = "";
public static final String BODY = "";
public static void main(String[] args) throws ClientException, IOException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder configBuilder = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT);
/**
* 如果是使用公网接入点访问,configuration 还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
* 如果是在阿里云内网 VPC 中访问,无需填写该配置,服务端会根据内网 VPC 信息智能获取。
*/
// configBuilder.setCredentialProvider(
// new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")
// );
ClientConfiguration configuration = configBuilder.build();
/**
* 初始化 Producer 时直接配置需要使用的 Topic 列表,实现提前检查错误配置、拦截非法配置启动。
* 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的 Topic 是否合法。
* 注意!!!事务消息 Topic 必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。
*/
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(configuration)
.setTopics(TOPIC_NAME)
.build();
MessageBuilder builder = provider.newMessageBuilder()
// 为当前消息设置 Topic。
.setTopic(TOPIC_NAME)
// 消息体。
.setBody(BODY.getBytes(StandardCharsets.UTF_8));
if (!KEY.isEmpty()) {
// 设置消息索引键,可根据关键字精确查找某条消息。
builder.setKeys(KEY);
}
if (!TAG.isEmpty()) {
// 设置消息 Tag,用于消费端根据指定 Tag 过滤消息。
builder.setTag(TAG);
}
// 配置消息的自定义属性
// builder.addProperty("key", "value");
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
final SendReceipt sendReceipt = producer.send(builder.build());
System.out.println("Send mq message success! Topic is:" + TOPIC_NAME + " msgId is: "
+ sendReceipt.getMessageId().toString());
} catch (Throwable t) {
System.out.println("Send mq message failed! Topic is:" + TOPIC_NAME);
t.printStackTrace();
}
// 如果不需要再使用,可关闭该进程。
producer.close();
}
}
PushConsumer 方式消费
package com.aliyun.openservices;
import java.util.Collections;
import java.io.IOException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
public class Demo {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云内网 VPC 访问,建议填写 VPC 接入点。
* 如果是在本地公网访问,或者是线下 IDC 环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
public static final String ENDPOINT = "rmq-cn-pe33aqsu40k-vpc.cn-hangzhou.rmq.aliyuncs.com:8080";
public static final String TOPIC_NAME = "rmqfctopic_nomal";
public static final String FILTER_EXPRESSION = "*";
public static final String CONSUMER_GROUP_ID = "";
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder configBuilder = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT);
/**
* 如果是使用公网接入点访问,configuration 还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
* 如果是在阿里云内网 VPC 中访问,无需填写该配置,服务端会根据内网 VPC 信息智能获取。
*/
// configBuilder.setCredentialProvider(
// new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")
// );
ClientConfiguration configuration = configBuilder.build();
// 订阅消息的过滤规则。* 代表订阅全部消息。
FilterExpression filterExpression = new FilterExpression(FILTER_EXPRESSION, FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(configuration)
// 为消费者指定所属的消费者分组,Group 需要提前在控制台创建,如果不创建直接使用会返回报错。
.setConsumerGroup(CONSUMER_GROUP_ID)
// 指定需要订阅哪个目标 Topic,并设置预绑定的订阅关系。Topic 需要提前在控制台创建,如果不创建直接使用会返回报错。
.setSubscriptionExpressions(Collections.singletonMap(TOPIC_NAME, filterExpression))
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
System.out.println("Consume message=" + messageView.toString());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该进程。
// pushConsumer.close();
}
}
延时消息(定时消息)
package com.aliyun.openservices;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
public class Demo {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云内网 VPC 访问,建议填写 VPC 接入点。
* 如果是在本地公网访问,或者是线下 IDC 环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
public static final String ENDPOINT = "rmq-cn-pe33aqsu40k-vpc.cn-hangzhou.rmq.aliyuncs.com:8080";
public static final String TOPIC_NAME = "rmqfctopic_delay";
public static final String TAG = "";
public static final String KEY = "";
public static final String BODY = "";
public static final String DELAY_SECONDS = "10";
public static final String DELIVERY_TIMESTAMP = "";
public static void main(String[] args) throws ClientException, IOException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder configBuilder = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT);
/**
* 如果是使用公网接入点访问,configuration 还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
* 如果是在阿里云内网 VPC 中访问,无需填写该配置,服务端会根据内网 VPC 信息智能获取。
*/
// configBuilder.setCredentialProvider(
// new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")
// );
ClientConfiguration configuration = configBuilder.build();
/**
* 初始化 Producer 时直接配置需要使用的 Topic 列表,实现提前检查错误配置、拦截非法配置启动。
* 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的 Topic 是否合法。
* 注意!!!事务消息 Topic 必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。
*/
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(configuration)
.setTopics(TOPIC_NAME)
.build();
MessageBuilder builder = provider.newMessageBuilder()
// 为当前消息设置 Topic。
.setTopic(TOPIC_NAME)
// 消息体。
.setBody(BODY.getBytes(StandardCharsets.UTF_8));
if (!KEY.isEmpty()) {
// 设置消息索引键,可根据关键字精确查找某条消息。
builder.setKeys(KEY);
}
if (!TAG.isEmpty()) {
// 设置消息 Tag,用于消费端根据指定 Tag 过滤消息。
builder.setTag(TAG);
}
if (!DELIVERY_TIMESTAMP.isEmpty()) {
// 设置您期望的投递消息的时间的 Unix 时间戳。
builder.setDeliveryTimestamp(Long.parseLong(DELIVERY_TIMESTAMP));
} else if (!DELAY_SECONDS.isEmpty()) {
// 设置您期望的投递消息的时间的 Unix 时间戳。
builder.setDeliveryTimestamp(
System.currentTimeMillis() + Duration.ofSeconds(Long.parseLong(DELAY_SECONDS)).toMillis());
}
// 配置消息的自定义属性
// builder.addProperty("key", "value");
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
final SendReceipt sendReceipt = producer.send(builder.build());
System.out.println("Send mq message success! Topic is:" + TOPIC_NAME + " msgId is: "
+ sendReceipt.getMessageId().toString());
} catch (Throwable t) {
System.out.println("Send mq message failed! Topic is:" + TOPIC_NAME);
t.printStackTrace();
}
// 如果不需要再使用,可关闭该进程。
producer.close();
}
}
事务消息
package com.aliyun.openservices;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
public class Demo {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云内网 VPC 访问,建议填写 VPC 接入点。
* 如果是在本地公网访问,或者是线下 IDC 环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
public static final String ENDPOINT = "rmq-cn-pe33aqsu40k-vpc.cn-hangzhou.rmq.aliyuncs.com:8080";
public static final String TOPIC_NAME = "rmqfctopic_tx";
public static final String TAG = "";
public static final String KEY = "";
public static final String BODY = "";
public static void main(String[] args) throws ClientException, IOException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder configBuilder = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT);
/**
* 如果是使用公网接入点访问,configuration 还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
* 如果是在阿里云内网 VPC 中访问,无需填写该配置,服务端会根据内网 VPC 信息智能获取。
*/
// configBuilder.setCredentialProvider(
// new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")
// );
ClientConfiguration configuration = configBuilder.build();
/**
* 初始化 Producer 时直接配置需要使用的 Topic 列表,实现提前检查错误配置、拦截非法配置启动。
* 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的 Topic 是否合法。
* 注意!!!事务消息 Topic 必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。
*/
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(configuration)
.setTopics(TOPIC_NAME)
/**
* 事务检查器,用于检查确认异常半事务的中间状态。事务检查器一般是根据业务的 ID 去检查本地事务是否正确提交还是回滚。
* 例如,如果在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
*/
.setTransactionChecker(messageView -> {
System.out.println("Receive transactional message check, message=" + messageView.toString());
return TransactionResolution.COMMIT;
})
.build();
// 开启事务分支。
final Transaction transaction = producer.beginTransaction();
MessageBuilder builder = provider.newMessageBuilder()
// 为当前消息设置 Topic。
.setTopic(TOPIC_NAME)
// 消息体。
.setBody(BODY.getBytes(StandardCharsets.UTF_8));
if (!KEY.isEmpty()) {
// 设置消息索引键,可根据关键字精确查找某条消息。
builder.setKeys(KEY);
}
if (!TAG.isEmpty()) {
// 设置消息 Tag,用于消费端根据指定 Tag 过滤消息。
builder.setTag(TAG);
}
// 一般事务消息都会设置一个本地事务关联的唯一 ID,用来做本地事务回查的校验。
builder.addProperty("OrderId", "xxx");
// 配置消息的自定义属性
// builder.addProperty("key", "value");
try {
// 发送半事务消息
final SendReceipt sendReceipt = producer.send(builder.build(), transaction);
System.out.println("Send mq message success! Topic is:" + TOPIC_NAME + " msgId is: "
+ sendReceipt.getMessageId().toString());
} catch (Throwable t) {
System.out.println("Send mq message failed! Topic is:" + TOPIC_NAME);
t.printStackTrace();
}
/**
* 执行本地事务,并确定本地事务结果。
* 1. 如果本地事务提交成功,则提交消息事务。
* 2. 如果本地事务提交失败,则回滚消息事务。
* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
*
*/
transaction.commit();
// transaction.rollback();
// 如果不需要再使用,可关闭该进程。
producer.close();
}
}
顺序消息
package com.aliyun.openservices;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
public class Demo {
/**
* 实例接入点,从控制台实例详情页的接入点页签中获取。
* 如果是在阿里云内网 VPC 访问,建议填写 VPC 接入点。
* 如果是在本地公网访问,或者是线下 IDC 环境访问,可以使用公网接入点。使用公网接入点访问,必须开启实例的公网访问功能。
*/
public static final String ENDPOINT = "rmq-cn-pe33aqsu40k-vpc.cn-hangzhou.rmq.aliyuncs.com:8080";
public static final String TOPIC_NAME = "rmqfctopic_fifo";
public static final String TAG = "";
public static final String KEY = "";
public static final String BODY = "";
public static final String MESSAGE_GROUP = "";
public static void main(String[] args) throws ClientException, IOException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder configBuilder = ClientConfiguration.newBuilder().setEndpoints(ENDPOINT);
/**
* 如果是使用公网接入点访问,configuration 还需要设置实例的用户名和密码。用户名和密码在控制台实例详情页获取。
* 如果是在阿里云内网 VPC 中访问,无需填写该配置,服务端会根据内网 VPC 信息智能获取。
*/
// configBuilder.setCredentialProvider(
// new StaticSessionCredentialsProvider("Instance UserName", "Instance Password")
// );
ClientConfiguration configuration = configBuilder.build();
/**
* 初始化 Producer 时直接配置需要使用的 Topic 列表,实现提前检查错误配置、拦截非法配置启动。
* 针对非事务消息 Topic,也可以不配置,服务端会动态检查消息的 Topic 是否合法。
* 注意!!!事务消息 Topic 必须提前配置,以免事务消息回查接口失败,具体原理请参见事务消息。
*/
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(configuration)
.setTopics(TOPIC_NAME)
.build();
MessageBuilder builder = provider.newMessageBuilder()
// 为当前消息设置 Topic。
.setTopic(TOPIC_NAME)
// 消息体。
.setBody(BODY.getBytes(StandardCharsets.UTF_8));
if (!KEY.isEmpty()) {
// 设置消息索引键,可根据关键字精确查找某条消息。
builder.setKeys(KEY);
}
if (!TAG.isEmpty()) {
// 设置消息 Tag,用于消费端根据指定 Tag 过滤消息。
builder.setTag(TAG);
}
if (!MESSAGE_GROUP.isEmpty()) {
// 顺序消息的顺序关系通过消息组来判定和识别,发送顺序消息时需要为每条消息设置归属的消息组。
builder.setMessageGroup(MESSAGE_GROUP);
}
// 配置消息的自定义属性
// builder.addProperty("key", "value");
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
final SendReceipt sendReceipt = producer.send(builder.build());
System.out.println("Send mq message success! Topic is:" + TOPIC_NAME + " msgId is: "
+ sendReceipt.getMessageId().toString());
} catch (Throwable t) {
System.out.println("Send mq message failed! Topic is:" + TOPIC_NAME);
t.printStackTrace();
}
// 如果不需要再使用,可关闭该进程。
producer.close();
}
}
SDK相关
本文上面的示例是基于RockMQ云消息服务提供。
服务端端版本和客户端SDK版本的兼容情况如下,为了获得更完善的产品功能和更高的性能,建议您使用5.x版本实例,并优先通过RocketMQ 5.x系列SDK接入服务端。
https://help.aliyun.com/document_detail/441918.html