一、消息队列介绍
消息队列是什么
对于 MQ 来说,其实不管是 RocketMQ、Kafka 还是其他消息队列,它们的本质都是:一发一存一消费。
将 MQ 掰开了揉碎了来看,都是「一发一存一消费」,再直白点就是一个「转发器」。生产者先将消息投递一个叫做「队列」的容器中,然后再从这个容器中取出消息,最后再转发给消费者,仅此而已。
上面这个图便是消息队列最原始的模型,它包含了两个关键词:消息和队列。
1、消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。
2、队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。
MQ的应用场景
目前,MQ 的应用场景非常多,大家能倒背如流的是:系统解耦、异步通信和流量削峰。除此之外,还有延迟通知、最终一致性保证、顺序消息、流式处理等等。
高可用松耦合架构设计
举一个实际例子,比如说电商业务中最常见的「订单支付」场景:在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。
引入 MQ 后,订单支付现在只需要关注它最重要的流程:更新订单状态即可。其他不重要的事情全部交给 MQ 来通知。这便是 MQ 解决的最核心的问题:系统解耦。
削峰填谷
诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,削峰填谷是解决该问题的最佳方式;
分布式事务消息
电商的交易系统、支付红包等场景需要确保数据的最终一致性,大量引入 MQ 的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
RocketMQ介绍
RocketMQ是一款分布式、队列模型的消息中间件,是阿里巴巴集团自主研发的专业消息中间件,借鉴参考了JMS规范的MQ实现,更参考了优秀的开源消息中间件KAFKA,实现了业务消峰、分布式事务的优秀框架。
产品基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。MQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。
特点:
-
- 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
- Producer、Consumer、队列都可以分布式
- Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
- 能够保证严格的消息顺序
- 支持拉(pull)和推(push)两种消息模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 支持多种消息协议,如 JMS、OpenMessaging 等
- 较少的依赖
消息队列的选择(参考):该如何选择消息队列? - 武培轩 - 博客园
消息中间件的对比
二、核心概念
生产者Producer
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。
RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
同步发送:同步发送指消息发送⽅发出数据后会在收到接收⽅发回响应之后才发下⼀个数据包。⼀般⽤于重要通知消息,例如重要通知邮件、营销短信。
异步发送:异步发送指发送⽅发出数据后,不等接收⽅发回响应,接着发送下个数据包,⼀般⽤于可能链路耗时较⻓⽽对响应时间敏感的业务场景,例如⽤户视频上传后通知启动转码服务。
单向发送:单向发送是指只负责发送消息⽽不等待服务器回应且没有回调函数触发,适⽤于某些耗时⾮常短但对可靠性要求并不⾼的场景,例如⽇志收集。
生产者组Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。
消费者Consumer
与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。
push consumer:消费者端设置Listener
pull consumer:应用可主动从Broker获取消息,主动拉取会存在消费记录位置问题(如果不记录位置,会出现重复消费)
消费者组Consumer Group
消费者组,和生产者类似,消费同一类消息的多个 Consumer 实例组成一个消费者组。
通过组机制(Group),RocketMQ可以天然支持分布式(如下所示)。
名字服务Name Server
名称服务充当路由消息的提供者。是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。在消息队列 RocketMQ 中提供命名服务,更新和发现 Broker 服务。
NameServer即名称服务,两个功能:
-
- 接收broker的请求,注册broker的路由信息
- 接收client(producer/consumer)的请求,根据某个topic获取其到broker的路由信息NameServer没有状态,可以横向扩展。每个broker在启动的时候会到NameServer注册;Producer在发送消息前会根据topic到NameServer获取路由(到broker)信息;Consumer也会定时获取topic路由信息。
ps:无状态服务
无状态服务(stateless service)对单次请求的处理,不依赖其他请求,也就是说,处理一次请求所需的全部信息,要么都包含在这个请求里,要么可以从外部获取到(比如说数据库),服务器本身不存储任何信息 。
nameServer与consumer、producer以及broker之间的联系(参考):RocketMQ nameserver、broker、生产者和消费者之间的关系_LinYaoGai的博客-CSDN博客_rocketmq broker topic 关系
代理服务器Broker Server
消息中转角色,负责存储消息,转发消息。可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker
是RocketMQ的核心,它不能挂的,所以需要保证broker
的高可用。
broker分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。
Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。
消息主题Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
标签Tag
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
消息队列MessageQueue
对于每个Topic都可以设置一定数量的消息队列用来进行数据的读取,可以理解成ConsumerQueue。里面存入包括topic、起始偏移量、消息长度等内容,消费者消费信息时,根据topic查询consumerqueue文件,找到对应的topic,开始读取消息,此时读取到的数据是消息的起始偏移量和消息长度,根据消息的起始偏移量从commitlog中查找对应的偏移量位置,然后根据消息长度取commitlog中的数据,即取到了指定的消息内容。
三、源码包结构说明
rocketmq-acl:权限相关管理
rocketmq-broker:包含主要的业务逻辑,消息收发,主从同步,pagecache
rocketmq-client:一般我们使用的Consumer和Producer,都是这个包下的
rocketmq-common:公共数据结构等等
rocketmq-distribution:编译模块,编译输出
rocketmq-example:官方示例
rocketmq-filter:过滤
rocketmq-logappender:日志相关模块
rocketmq-logging:日志相关模块
rocketmq-namesrv:namesrv服务,用于服务协调
rocketmq-openmessaging:提供消息接口
rocketmq-remoting:通信组件模块,封装netty底层通信
rocketmq-srvutil:提供了一些工具类方法
rocketmq-store:消息存储
rocketmq-test:测试模块
rocketmq-tools:消息管理工具,例如mqadmin工具
四、环境搭建(基于Docker)
RocketMQ环境搭建
查找rocketmq镜像
docker search rocketmq
[root@localhost ~]# docker search rocketmq INDEX NAME DESCRIPTION STARS OFFICIAL AUTOMATED docker.io docker.io/foxiswho/rocketmq rocketmq 54 docker.io docker.io/rocketmqinc/rocketmq Image repository for Apache RocketMQ 51 docker.io docker.io/styletang/rocketmq-console-ng rocketmq-console-ng 34 docker.io docker.io/apacherocketmq/rocketmq Docker Image for Apache RocketMQ 19 docker.io docker.io/laoyumi/rocketmq 10 [OK]
以第一个为例,查找这个镜像下的版本,选择一个broker和server下载
查看镜像版本
curl https://registry.hub.docker.com/v1/repositories/foxiswho/rocketmq/tags\ | tr -d '[\[\]" ]' | tr '}' '\n'\ | awk -F: -v image='foxiswho/rocketmq' '{if(NR!=NF && $3 != ""){printf("%s:%s\n",image,$3)}}'
为了管理上的方便,rocketmq console也是必不可少的工具
docker pull docker.io/styletang/rocketmq-console-ng
启动nameserver、broker以及rocketmq-console
docker run -d -p 9876:9876 --name rmqserver foxiswho/rocketmq:server-4.3.2 docker run -d -p 10911:10911 -p 10909:10909 --name rmqbroker --link rmqserver:namesrv -e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m" foxiswho/rocketmq:broker-4.3.2 docker run -d --name rmqconsole -p 8180:8080 --link rmqserver:namesrv\ -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876\ -Dcom.rocketmq.sendMessageWithVIPChannel=false"\ -t styletang/rocketmq-console-ng
访问:http://192.168.106.129:8180/#/,可见环境搭建成功。
ps:
- broker服务ip需要修改下,进入broker容器内部,修改 etc/rocketmq/broker.conf 里的brokerIP1=192.168.106.129,否则连接错误
项目集成RocketMQ
引入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency>
测试发送消息
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.106.129:9876"); producer.start(); Message message = new Message("test","生产者发送消息".getBytes()); producer.send(message); producer.shutdown(); }
测试消费消息
public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer"); consumer.setNamesrvAddr("192.168.106.129:9876"); //3 绑定消息的主题(主题订阅) consumer.subscribe("test","*"); //4 消费者监听处理消息的方法 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println("消费程:"+Thread.currentThread().getName()+ ",消息ID:"+msg.getMsgId()+ ",消息内容:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); }
五、RocketMQ基础使用
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency>
发送消息的三种方式
同步
必须要等消息持久化到磁盘中以后,rocketmq会给生产者返回一个信息,持久化完成。
异步
不需要等消息持久化到磁盘中,就可以执行后面的业务逻辑,可以获得到执行结果。
producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } @Override public void onException(Throwable throwable) { System.out.println("发送失败"); } }); // 注:发送完之后不要立即执行producer.shutdown();否则发不出去
单向
把消息发送到消息中间件中,不关心返回的结果的。
producer.sendOneway(message);
RocketMQ消费模式
集群消费模式
CLUSTERING,可以理解为同组公共消费。公共资源我拿了你就没有。即同一 Topic 下,一个 ConsumerGroup 下如果有多个实例(可以是多个进程,或者多个机器),那么这些实例会均摊消费这些消息,但我消费了这条消费你就不会再消费。消费者默认是集群消费方式。适用于大部分消息业务。
广播消费模式
BROADCASTING,可以理解为同组各自消费。即同一 Topic 下,同一消息会被多个实例各自都消费一次。所以,广播 消费模式中的 ConsumerGroup 概念没有太大的意义。这适用于一些分发消息的场景。
集群模式,一条消息只能被消费一次;广播模式,一条消息可以被多个服务消费。
//设置广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //设置集群模式(默认) consumer.setMessageModel(MessageModel.CLUSTERING);
RocketMQ消费方式
(1)pull方式
由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能;
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("consumerGroup");
(2)push方式
由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常;
使用代码:
public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer"); consumer.setNamesrvAddr("192.168.106.129:9876"); //3 绑定消息的主题 consumer.subscribe("test","tag1||taga"); // consumer.setMessageModel(MessageModel.BROADCASTING); //4 消费者监听处理消息的方法 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { System.out.println("消费程:"+Thread.currentThread().getName()+ ",消息ID:"+msg.getMsgId()+ ",消息内容:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); }
@Component @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.groupName}", topic = "test") public class MyMQListener implements RocketMQListener { @Override public void onMessage(Object o) { System.out.println("接收到消息,开始消费"+o.toString()); } }
长轮询机制:
RocketMQ的消费方式都是基于拉模式拉取消息的,而在这其中有一种长轮询机制(对普通轮询的一种优化),来平衡上面Push/Pull模型的各自缺点。基本设计思路是:消费者如果第一次尝试Pull消息失败(比如:Broker端没有可以消费的消息),并不立即给消费者客户端返回Response的响应,而是先hold住并且挂起请求(将请求保存至pullRequestTable本地缓存变量中),然后Broker端的后台独立线程—PullRequestHoldService会从pullRequestTable本地缓存变量中不断地去取,具体的做法是查询待拉取消息的偏移量是否小于消费队列最大偏移量,如果条件成立则说明有新消息达到Broker端。
ps:想知道push实现过程可以参考这个:RocketMQ的push模式机制 - 知乎
延时消息
RocketMQ提供了延时消息类型,简单来说就是生产者在发送消息的时候指定一个延时时间,当到达延时时间之后消息才能够被投送到消费者。目前指定的延时时间间隔有1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h,用等级来表示时间间隔。
延时消息实现原理图
业务代码commitLog.java-->asyncPutMessage
// 判断消息类型
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// 把消息存到SCHEDULE_TOPIC_XXXX这个topic中
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
延时消息处理
RocketMQ提供了定时任务服务ScheduleMessageService,通过定时任务的方式不断的读取topic为SCHEDULE_TOPIC_XXXX何queueId为延时等级的消息进行消息还原处理,这样消息被还原之后消费者就可以拉取消息了。
public void executeOnTimeup() {
//获取消费者消息
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) {
//读取消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
// 到了延时时间还原消息
if (countdown <= 0) {
//获取消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
//还原消息信息topic名称等
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
//重新将消息持久化到commitlog中
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
* XXX: warn and notify me
*/
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
// 没到延时时间,进行下一次定时任务
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
发送消息示例
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.106.129:9876");
producer.start();
int totalMessageSend = 10;
for (int i = 0;i<totalMessageSend;i++){
Message message = new Message("test","生产者发送消息".getBytes());
// 设置延时等级
message.setDelayTimeLevel(i);
producer.send(message);
}
producer.shutdown();
}
顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
生产者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.106.129:9876");
producer.start();
int totalMessageSend = 20;
for (int i = 0;i<totalMessageSend;i++){
Message message = new Message("test2","*",("生产者发送消息sync"+i).getBytes());
//message.setDelayTimeLevel(0);
producer.send(message,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //可以根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, 1L); // 这个1可以是订单id
}
// Thread.sleep(1000);
producer.shutdown();
}
消费者:
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
consumer.setNamesrvAddr("192.168.106.129:9876");
//3 绑定消息的主题
consumer.subscribe("test2","*");
// consumer.setMessageModel(MessageModel.BROADCASTING);
//4 消费者监听处理消息的方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println("消费程:"+Thread.currentThread().getName()+
",消息ID:"+msg.getMsgId()+
",消息内容:"+new String(msg.getBody())+"队列id:"+msg.getQueueId());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}
执行结果:
消息过滤
Tag标签过滤
producer
Message message = new Message("test","taga",("生产者发送消息sync"+i).getBytes());
consumer
consumer.subscribe("test","tag1||taga");
sql92(参考官方样例)
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它,只有使用push模式的消费者才能用使用SQL92标准的sql语句。。
-
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
-
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
样例:
生产者
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
springboot整合RocketMQ
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency>
可以使用 RocketMQTemplate rocketMQTemplate进行发送消息
@Component
public class RocketMqHelper {
/**
* 日志
*/
private static final Logger LOG = LoggerFactory.getLogger(RocketMqHelper.class);
/**
* rocketmq模板注入
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostConstruct
public void init() {
LOG.info("---RocketMq助手初始化---");
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
*/
public void asyncSend(Enum topic, Message<?> message) {
asyncSend(topic.name(), message, getDefaultSendCallBack());
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
*/
public void asyncSend(Enum topic, Message<?> message, SendCallback sendCallback) {
asyncSend(topic.name(), message, sendCallback);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
*/
public void asyncSend(String topic, Message<?> message) {
rocketMQTemplate.asyncSend(topic, message, getDefaultSendCallBack());
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback) {
rocketMQTemplate.asyncSend(topic, message, sendCallback);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout);
}
/**
* 发送异步消息
*
* @param topic 消息Topic
* @param message 消息实体
* @param sendCallback 回调函数
* @param timeout 超时时间
* @param delayLevel 延迟消息的级别
*/
public void asyncSend(String topic, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
rocketMQTemplate.asyncSend(topic, message, sendCallback, timeout, delayLevel);
}
/**
* 发送顺序消息
*
* @param message
* @param topic
* @param hashKey
*/
public void syncSendOrderly(Enum topic, Message<?> message, String hashKey) {
syncSendOrderly(topic.name(), message, hashKey);
}
/**
* 发送顺序消息
*
* @param message
* @param topic
* @param hashKey
*/
public void syncSendOrderly(String topic, Message<?> message, String hashKey) {
LOG.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
/**
* 发送顺序消息
*
* @param message
* @param topic
* @param hashKey
* @param timeout
*/
public void syncSendOrderly(String topic, Message<?> message, String hashKey, long timeout) {
LOG.info("发送顺序消息,topic:" + topic + ",hashKey:" + hashKey + ",timeout:" + timeout);
rocketMQTemplate.syncSendOrderly(topic, message, hashKey, timeout);
}
public void sendFilterProperty(String topic, String message, Map<String,Object> map){
rocketMQTemplate.convertAndSend(topic,message,map);
}
/**
* 默认CallBack函数
*
* @return
*/
private SendCallback getDefaultSendCallBack() {
return new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
LOG.info("---发送MQ成功---");
}
@Override
public void onException(Throwable throwable) {
LOG.error("---发送MQ失败---"+throwable.getMessage(), throwable.getMessage());
}
};
}
@PreDestroy
public void destroy() {
LOG.info("---RocketMq助手注销---");
}
}
六、事务消息
Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC(两阶段提交)的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:
Half(Prepare) Message——半消息(预处理消息)
半消息是一种特殊的消息类型,该状态的消息暂时不能被Consumer消费。当一条事务消息被成功投递到Broker上,但是Broker并没有接收到Producer发出的二次确认时,该事务消息就处于"暂时不可被消费"状态,该状态的事务消息被称为半消息。
Message Status Check——消息状态回查
由于网络抖动、Producer重启等原因,可能导致Producer向Broker发送的二次确认消息没有成功送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态(Commit 或 Rollback)。可以看出,Message Status Check主要用来解决分布式事务中的超时问题。
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
half消息
在RocketMQ事务消息的主要流程中,一阶段的消息如何对用户不可见。其中,事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的。那么,如何做到写入消息但是对用户不可见呢?RocketMQ事务消息的做法是:如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。(ps:类似延时消息原理)
七、消息持久化
我们知道 RocketMQ 是一款高性能、高可靠的分布式消息中间件,高性能和高可靠是很难兼得的。因为要保证高可靠,那么数据就必须持久化到磁盘上,将数据持久化到磁盘,那么可能就不能保证高性能了。
RocketMQ 在持久化的设计上,采取的是「消息顺序写、随机读的策略」,利用磁盘顺序写的速度,让磁盘的写速度不会成为系统的瓶颈。并且采用 MMPP 这种“零拷贝”技术,提高消息存盘和网络发送的速度。极力满足 RocketMQ 的高性能、高可靠要求。
消息存储架构
RocketMQ持久化机制架构图
在 RocketMQ 持久化机制中,涉及到了三个角色:
-
- 「CommitLog」:消息真正的存储文件,所有消息都存储在 CommitLog 文件中。
- 「ConsumeQueue」:消息消费逻辑队列,类似数据库的索引文件。
- 「IndexFile」:消息索引文件,主要存储消息 Key 与 offset 对应关系,提升消息检索速度。
CommitLog 文件是存放消息数据的地方,所有的消息都将存入到 CommitLog 文件中。生产者将消息发送到 RocketMQ 的 Broker 后,Broker 服务器会将「消息顺序写入到 CommitLog 文件中」,这也就是 RocketMQ 高性能的原因,因为我们知道磁盘顺序写特别快,RocketMQ 充分利用了这一点,极大的提高消息写入效率。
但是消费者消费消息的时候,可能就会遇到麻烦,每一个消费者只能订阅一个主题,消费者关心的是订阅主题下的所有消息,但是同一主题的消息在 CommitLog 文件中可能是不连续的,那么「消费者消费消息的时候,需要将 CommitLog 文件加载到内存中遍历查找订阅主题下的消息,频繁的 IO 操作,性能就会急速下降」。
为了解决这个问题,RocketMQ 引入了 Consumequeue 文件。「Consumequeue 文件可以看作是索引文件,类似于 MySQL 中的二级索引」。在存放了同一主题下的所有消息,消费者消费的时候只需要去对应的 Consumequeue 组中取消息即可。Consumequeue 文件不会存储消息的全量信息,原理类似MySQL 索引。
关于indexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
ps:上图针对的是集群模式下的消费者消费信息,那么广播模式怎么实现的,一个消费这消费所有消息。
简单说就是两方面:
1:广播模式消费场景下的rebalance过程相比集群模式而言不存在分配MessageQueue的过程,每个consumer负责订阅的topic下的所有MessageQueue。
参考源码:RebalanceImpl
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
// 负载均衡的过程中没有进行AllocateMessageQueueStrategy进行分配
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
2:针对广播模式,我们使用本地文件LocalFileOffsetStore来存储消费位移。
参考源码:DefaultMQPushConsumerImpl
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 使用本地存储
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
消息刷盘
在 RocketMQ 中提供了「同步刷盘」和「异步刷盘」两种刷盘方式,可以通过 Broker 配置文中中的 flushDiskType 参数来设置(SYNC_FLUSH、ASYNC_FLUSH)。
「异步刷盘方式(默认)」:消息写入到内存的 PAGECACHE中,就立刻给客户端返回写操作成功,当 PAGECACHE 中的消息积累到一定的量时,触发一次写操作,将 PAGECACHE 中的消息写入到磁盘中。这种方式「吞吐量大,性能高,但是 PAGECACHE 中的数据可能丢失,不能保证数据绝对的安全」。
「同步刷盘方式」:消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。这种方式「可以保证数据绝对安全,但是吞吐量不大」
欢迎关注公众号