RocketMQ、Kafka、RabbitMQ的对比
-
1.ActiveMQ:Apache出品的比较老的消息中间件
-
2.Kafka:支持日志消息,监控数据,是一种高吞吐量的分布式发布订阅消息系统,支持百万级别的单机吞吐量,但是可能会造成数据丢失
-
3.RocketMQ:阿里在使用Kafka之后发现了它的消息系统主要定位于日志传输,并且有可能会造成数据丢失,对于淘宝的一些核心功能,是绝对不允许出现数据丢失的,因此RocketMQ就基于Kafka而诞生,定位于非日志的可靠消息传输;
-
4.RabbitMQ:由Erlang语言开发的AMQP(高级消息队列协议)的开源实现;它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品,开发语言等条件的限制;
什么是消息队列?
是分布式系统中重要的组件,消息是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象
消息被发送到队列中,消息队列是在消息的传输过程中保存消息的容器,消息队列管理器在将消息从它的原中继到它的目标时充当中间人,队列的主要目的是提供路由,保证消息的传递,如果发送消息时消费者不可用,消息队列会保留消息,直到可以成功传递它
消息队列是一种应用间的通信方式,消息发送可以立即返回,有消息系统来确保信息的可靠传递,消息生产者只管把消息发布到MQ中,而不管谁来取,消息消费者只管从MQ中取而不管是谁发布的,这样生产者和消费者相互都不知道对方是谁;
为什么要使用RocketMQ?
根据阿里的研究,随着队列和虚拟主题的增加,ActiveMQ IO模块达到了一个瓶颈,阿里尽力通过节流,断路器或降级来解决这个问题,但是效果并不理想,于是阿里尝试了流行的消息传递解决方案Kafka,但是由于其高延迟和低可靠性的问题,因此也不能满足要求
因此RocketMQ就诞生了,架构简单,业务功能丰富,具备极强可扩展性,高可用,高伸缩,最终一致性等特点被广泛应用
RocketMQ的优点
异步解耦(广告流水更新)
当系统间没有实时的数据交换要求,但还需要其他业务信息时,可用通过消息队列来达到系统间解耦的作用,只要发布方定义好消息队列的格式,消费方的任何操作均与发布方无关,减少了不必要的联调和发布冲突等影响;
削峰填谷
特殊场景下,比如秒杀,春晚红包等万亿流量的脉冲式压力下,消息队列可以保护系统免于崩溃
通过高性能的存储和处理能力,将超过系统处理能力的多余流量暂时存储起来,并在系统处理能力内平缓的释放出来,从而达到削峰的效果
分布式缓存同步
例如在支付操作完成之后,将支付结果发到短信通知指定的消息topic下让非核心的操作异步化,从而提高整个业务链路的高效和稳定;
核心概念
生产者
负责生产消息,一般由业务系统负责生产消息,一个消息生产者会把业务应用系统里面产生的消息发送到Broker服务器,RocketMQ提供多种发送方式,同步发送,异步发送,顺序发送,单向发送
同步和异步方式均需要Broker返回确认信息,单向发送不需要
topic:表示要发送的消息的主题
tag:表示要发送的消息的标签
提示:Topic消息主题,通过Topic对不同的业务消息进行分类,而Tag消息标签,用来进一步区分Topic下的消息分类,消息从生产者发出即带上的属性,因此Topic可以理解为一级分类,Tag为二级分类,Topic和Tag关系如下:
body:表示消息的存储内容
properties:表示消息属性
keys:每个消息可以在业务层面设置唯一标识码keys字段,方便将来定位消息丢失问题,Broker端会为每个消息创建索引(哈希索引),应该可以通过topic,key来查询这条消息内容,以及消息被谁消费,由于是hash索引,因此务必保证key的唯一性,避免hash冲突
transactionId:会在事务中使用
MessageQueue队列:为了支持高并发和水平拓展,需要对Topic进行分区,在RocketMQ中这被称为队列,一个Topic可能有多个队列,并且可能分布在不同的Broker上
一般来说一个消息如果没有重复发送,则只会存在在Topic的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,叫最大位点MaxOffset,队列的起始位置对应的位置叫做起始位点MinOffset,队列可以提升消息发送和消费的并发度;
同步发送:
需要收到服务器同步响应之后才会发送下一条消息;
异步发送:
需要实现异步发送回调接口SendCallback处理响应结果;
单向模式发送:
调用sendOneway,但是不会对返回结果有任何等待和处理;
消费者
负责消费消息,一般是后台系统负责异步消费,一个消息消费者会从Broker服务器拉取消息,并将其提供给应用程序,从用户应用角度而言提供了两种消费形式:拉取式消费(Push),推动式消费(Pull)
如果多个消费者设置了相同的Consumer Group我们则认为这些消费者在同一个消费组内;
Push消费
Push消费是服务端主动推送消息给客户端,优点是及时性好,但如果客户端没有做好流控,一旦客户端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃;
流程:
1.创建消费者组
2.设置Name Server
3.订阅指定Topic,并且增加消息过滤条件
4.注册消息监听器(回调接口)编写消费逻辑来处理从Broker中收到的消息
Pull消费(基本不用)
Pull消费是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频率容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时;
集群模式
一个消息只会传给一个消费者消费;
如果在一个消费者组中设置了消息模式,那么只要是该消费者组下的消费者则都会应用到该模式;
因此消费模式的设置是组级别的;
consumer.setMessageModel(MessageModel.CLUSTERING);
广播模式
需要对同一个消息进行不同处理的时候使用,一个消息会发送给消费组中的所有的消费者;
consumer.setMessageModel(MessageModel.BROADCASTING);
并发消费
注册消息监听器的时候传入MessageListenerConcurrently的实现来完成;
顺序消费
在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证在同一个对列中按照FIFO的顺序,也无法保证消息实际被顺序消费,因此提供了顺序消费的方式
注册消息监听器的时候传入MessageListenerConcurrently接口的实现来完成;
消息过滤
指消息生产者向Topic中发送消息的时候,设置消息属性对消息进行分类,消费者订阅Topic的时候,根据消息属性设置过滤条件对消息进行过滤,只有符合条件的消息才会被投递到消费端进行消费,
Tag过滤:
消费者订阅的Tag和发送者设置的Tag相互匹配,则消息被投递给消费端进行消费
场景:简单过滤场景,一条消息支持设置一个Tag,仅需要对Topic中的消息进行一级分类并过滤的时候可以使用
SQL92过滤:
发送者设置Tag或者消息属性,消费者订阅满足SQL92过滤表达式的消息被投递给消费端进行消费
场景:复杂过滤场景,一条消息支持设置多个属性,可根据SQL语法自定义组合多种类型的表达式对消息进行多级分类并实现多维度的过滤;
消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常 可以认为有以下几种情况: 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手 机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消 息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即 使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一 条消息,这样可以减轻Broker重试消息的压力。 RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需 要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为 各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置 多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。 RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台 定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试; 达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消 息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信 消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制 台对死信队列中的消息进行重发来使得消费者实例再次进行消费
定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的 topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意, messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可: msg.setDelayLevel(level)。level有以下三种情况:
level == 0,消息为非延迟消息
1<=Level<=maxLevel,消息延迟特定时间,例如Level=1,延迟1s
Level> maxLevel,则Level==maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的 queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟 的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
名字服务NameServer
名称服务充当路由消息的提供者,类似于注册中心,生产者或者消费者能够通过名字服务查找各个主题对应的Broker IP列表,多个Name Server实例组成集群,但相互独立,没有信息交换
代理服务器BrokerServer
消息中转角色,负责存储消息和转发消息,代理服务器在RocketMQ系统中负责接收从生产者发送过来的消息并存储,同时为消费者的拉取请求做准备,代理服务器也存储消息相关的元数据,包括消费者组,消费进度便宜和主题以及队列消息等
消息内容Message
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题,RocketMQ中每个消息拥有唯一的MessageID,并且可以携带具有业务标识的keys;
SpringBoot集成RocketMQ
这里以发送异步消息为例
导入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
生产者端
1.注入RocketMQTemplate
创建Message类实现SendCallBack接口:
@Slf4j
public class MQMessage implements SendCallback {
private String tag;
private Object data;
public MQMessage(String tag,Object data){
this.tag = tag;
this.data=data;
}
@Override
public void onSuccess(SendResult sendResult) {
log.info("[创建订单]异步下单发送消息成功-----------目标标签"+tag);
}
@Override
public void onException(Throwable throwable) {
log.error("[创建订单]消息发送失败----------",throwable);
}
}
发送异步消息
//创建订单
OrderMessage message = new OrderMessage(time,seckillId,token,userInfo.getPhone());
MQMessage sendCallBack = new MQMessage("创建订单", message);
rocketMQTemplate.asyncSend(MQConstant.ORDER_PEDDING, message,sendCallBack);
消费者端
注意:不是在Controller层进行处理,而是在监听器层面,需要另外创建一个监听器类。
1.实现RocketMQListener接口,泛型是要接收的消息的类型,也就是OrderMessage;
2.贴注解@RocketMQMessageListener去指定消费者组,主题以及标签:
@RocketMQMessageListener(
consumerGroup = MQConstant.ORDER_PEDDING_CONSUMER_GROUP,
topic = MQConstant.ORDER_PEDDING_TOPIC,
selectorExpression = MQConstant.ORDER_PEDDING_TAG
)
这里我是全部封装在一个类里面的,可以给大家看看,主题和标签的关系需要用:来标记
最后在OnMessage方法中进行消费(消费者端的业务逻辑实现):
@Override
public void onMessage(OrderMessage orderMessage) {
log.info("[创建订单....],对应秒杀商品id为:"+orderMessage.getSeckillId());
OrderMQResult message = new OrderMQResult(orderMessage.getTime(),orderMessage.getSeckillId(),null,orderMessage.getToken(),"订单创建成功", Result.SUCCESS_CODE);
MQMessage sendCallBack = new MQMessage("创建订单", message);
String topic = MQConstant.ORDER_RESULT_SUCCESS_DEST;
try {
String orderNo = orderInfoService.doSeckill(orderMessage.getUserPhone(), orderMessage.getSeckillId());
// 订单创建成功
log.info("[订单创建成功------]");
message.setOrderNo(orderNo);
// 发送延迟消息
rocketMQTemplate.asyncSend(MQConstant.ORDER_PAY_TIMEOUT, MessageBuilder.withPayload(message).build(),new MQMessage("延迟消息",message.getOrderNo()+""),1000,9);
} catch (Exception e) {
e.printStackTrace();
log.error("[订单创建失败------]",e);
message.setMsg(SeckillCodeMsg.SECKILL_ERROR.getMsg());
message.setCode(SeckillCodeMsg.SECKILL_ERROR.getCode());
topic = MQConstant.ORDER_RESULT_FAIL_DEST;
// 订单创建失败
}finally {
rocketMQTemplate.asyncSend(topic,message,sendCallBack);
}
}
更改配置后启动:mqbroker -c ../conf/broker.conf