消息中间件——RocketMQ(与Kafka、RabbitMQ的对比)

news2025/1/19 14:32:04

RocketMQ、Kafka、RabbitMQ的对比

  • 1.ActiveMQ:Apache出品的比较老的消息中间件

  • 2.Kafka:支持日志消息,监控数据,是一种高吞吐量的分布式发布订阅消息系统,支持百万级别的单机吞吐量,但是可能会造成数据丢失

  • 3.RocketMQ:阿里在使用Kafka之后发现了它的消息系统主要定位于日志传输,并且有可能会造成数据丢失,对于淘宝的一些核心功能,是绝对不允许出现数据丢失的,因此RocketMQ就基于Kafka而诞生,定位于非日志的可靠消息传输;

  • 4.RabbitMQ:由Erlang语言开发的AMQP(高级消息队列协议)的开源实现;它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品,开发语言等条件的限制;

什么是消息队列?

是分布式系统中重要的组件,消息是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象

消息被发送到队列中,消息队列是在消息的传输过程中保存消息的容器,消息队列管理器在将消息从它的原中继到它的目标时充当中间人,队列的主要目的是提供路由,保证消息的传递,如果发送消息时消费者不可用,消息队列会保留消息,直到可以成功传递它

消息队列是一种应用间的通信方式,消息发送可以立即返回,有消息系统来确保信息的可靠传递,消息生产者只管把消息发布到MQ中,而不管谁来取,消息消费者只管从MQ中取而不管是谁发布的,这样生产者和消费者相互都不知道对方是谁;

为什么要使用RocketMQ?

根据阿里的研究,随着队列和虚拟主题的增加,ActiveMQ IO模块达到了一个瓶颈,阿里尽力通过节流,断路器或降级来解决这个问题,但是效果并不理想,于是阿里尝试了流行的消息传递解决方案Kafka,但是由于其高延迟和低可靠性的问题,因此也不能满足要求

因此RocketMQ就诞生了,架构简单,业务功能丰富,具备极强可扩展性,高可用,高伸缩,最终一致性等特点被广泛应用

RocketMQ的优点

异步解耦(广告流水更新)

 当系统间没有实时的数据交换要求,但还需要其他业务信息时,可用通过消息队列来达到系统间解耦的作用,只要发布方定义好消息队列的格式,消费方的任何操作均与发布方无关,减少了不必要的联调和发布冲突等影响;

削峰填谷

特殊场景下,比如秒杀,春晚红包等万亿流量的脉冲式压力下,消息队列可以保护系统免于崩溃

通过高性能的存储和处理能力,将超过系统处理能力的多余流量暂时存储起来,并在系统处理能力内平缓的释放出来,从而达到削峰的效果

分布式缓存同步

例如在支付操作完成之后,将支付结果发到短信通知指定的消息topic下让非核心的操作异步化,从而提高整个业务链路的高效和稳定;

核心概念

 生产者

负责生产消息,一般由业务系统负责生产消息,一个消息生产者会把业务应用系统里面产生的消息发送到Broker服务器,RocketMQ提供多种发送方式,同步发送,异步发送,顺序发送,单向发送

同步和异步方式均需要Broker返回确认信息,单向发送不需要

topic:表示要发送的消息的主题

tag:表示要发送的消息的标签

提示:Topic消息主题,通过Topic对不同的业务消息进行分类,而Tag消息标签,用来进一步区分Topic下的消息分类,消息从生产者发出即带上的属性,因此Topic可以理解为一级分类,Tag为二级分类,Topic和Tag关系如下:

 

body:表示消息的存储内容

properties:表示消息属性

keys:每个消息可以在业务层面设置唯一标识码keys字段,方便将来定位消息丢失问题,Broker端会为每个消息创建索引(哈希索引),应该可以通过topic,key来查询这条消息内容,以及消息被谁消费,由于是hash索引,因此务必保证key的唯一性,避免hash冲突

transactionId:会在事务中使用

MessageQueue队列:为了支持高并发和水平拓展,需要对Topic进行分区,在RocketMQ中这被称为队列,一个Topic可能有多个队列,并且可能分布在不同的Broker上

一般来说一个消息如果没有重复发送,则只会存在在Topic的其中一个队列中,消息在队列中按照先进先出的原则存储,每条消息会有自己的位点,每个队列会统计当前消息的总条数,叫最大位点MaxOffset,队列的起始位置对应的位置叫做起始位点MinOffset,队列可以提升消息发送和消费的并发度;

同步发送:

需要收到服务器同步响应之后才会发送下一条消息;

异步发送:

需要实现异步发送回调接口SendCallback处理响应结果;

单向模式发送:

调用sendOneway,但是不会对返回结果有任何等待和处理;

消费者

负责消费消息,一般是后台系统负责异步消费,一个消息消费者会从Broker服务器拉取消息,并将其提供给应用程序,从用户应用角度而言提供了两种消费形式:拉取式消费(Push),推动式消费(Pull)

如果多个消费者设置了相同的Consumer Group我们则认为这些消费者在同一个消费组内;

Push消费

Push消费是服务端主动推送消息给客户端,优点是及时性好,但如果客户端没有做好流控,一旦客户端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃;

流程:

1.创建消费者组

2.设置Name Server

3.订阅指定Topic,并且增加消息过滤条件

4.注册消息监听器(回调接口)编写消费逻辑来处理从Broker中收到的消息

Pull消费(基本不用)

Pull消费是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频率容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时;

集群模式

一个消息只会传给一个消费者消费;

如果在一个消费者组中设置了消息模式,那么只要是该消费者组下的消费者则都会应用到该模式;

因此消费模式的设置是组级别的;

consumer.setMessageModel(MessageModel.CLUSTERING);

广播模式

需要对同一个消息进行不同处理的时候使用,一个消息会发送给消费组中的所有的消费者;

consumer.setMessageModel(MessageModel.BROADCASTING);

并发消费

注册消息监听器的时候传入MessageListenerConcurrently的实现来完成;

顺序消费

在并发消费中,可能会有多个线程同时消费一个队列的消息,因此即使发送端通过发送顺序消息保证在同一个对列中按照FIFO的顺序,也无法保证消息实际被顺序消费,因此提供了顺序消费的方式

注册消息监听器的时候传入MessageListenerConcurrently接口的实现来完成;

消息过滤

指消息生产者向Topic中发送消息的时候,设置消息属性对消息进行分类,消费者订阅Topic的时候,根据消息属性设置过滤条件对消息进行过滤,只有符合条件的消息才会被投递到消费端进行消费,

Tag过滤:

        消费者订阅的Tag和发送者设置的Tag相互匹配,则消息被投递给消费端进行消费

场景:简单过滤场景,一条消息支持设置一个Tag,仅需要对Topic中的消息进行一级分类并过滤的时候可以使用

SQL92过滤:

        发送者设置Tag或者消息属性,消费者订阅满足SQL92过滤表达式的消息被投递给消费端进行消费

场景:复杂过滤场景,一条消息支持设置多个属性,可根据SQL语法自定义组合多种类型的表达式对消息进行多级分类并实现多维度的过滤;

消息重试

        Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常 可以认为有以下几种情况: 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手 机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消 息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即 使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一 条消息,这样可以减轻Broker重试消息的压力。         RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需 要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为 各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置 多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。 RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台 定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试; 达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消 息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信 消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制 台对死信队列中的消息进行重发来使得消费者实例再次进行消费

定时消息

        定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的 topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意, messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可: msg.setDelayLevel(level)。level有以下三种情况:

level == 0,消息为非延迟消息

1<=Level<=maxLevel,消息延迟特定时间,例如Level=1,延迟1s

Level> maxLevel,则Level==maxLevel,例如level==20,延迟2h

        定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的 queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟 的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

         需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

名字服务NameServer

名称服务充当路由消息的提供者,类似于注册中心,生产者或者消费者能够通过名字服务查找各个主题对应的Broker IP列表,多个Name Server实例组成集群,但相互独立,没有信息交换

代理服务器BrokerServer

消息中转角色,负责存储消息和转发消息,代理服务器在RocketMQ系统中负责接收从生产者发送过来的消息并存储,同时为消费者的拉取请求做准备,代理服务器也存储消息相关的元数据,包括消费者组,消费进度便宜和主题以及队列消息等

消息内容Message

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题,RocketMQ中每个消息拥有唯一的MessageID,并且可以携带具有业务标识的keys;

SpringBoot集成RocketMQ

这里以发送异步消息为例

导入依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.4</version>
        </dependency>

生产者端

1.注入RocketMQTemplate

创建Message类实现SendCallBack接口:

@Slf4j
public class MQMessage implements SendCallback {

    private String tag;
    private Object data;

    public MQMessage(String tag,Object data){
        this.tag = tag;
        this.data=data;
    }

    @Override
    public void onSuccess(SendResult sendResult) {
        log.info("[创建订单]异步下单发送消息成功-----------目标标签"+tag);
    }

    @Override
    public void onException(Throwable throwable) {
        log.error("[创建订单]消息发送失败----------",throwable);
    }
}

发送异步消息

   //创建订单
        OrderMessage message = new OrderMessage(time,seckillId,token,userInfo.getPhone());
        MQMessage sendCallBack = new MQMessage("创建订单", message);
        rocketMQTemplate.asyncSend(MQConstant.ORDER_PEDDING, message,sendCallBack);

消费者端

注意:不是在Controller层进行处理,而是在监听器层面,需要另外创建一个监听器类。

1.实现RocketMQListener接口,泛型是要接收的消息的类型,也就是OrderMessage;

 2.贴注解@RocketMQMessageListener去指定消费者组,主题以及标签:

@RocketMQMessageListener(
        consumerGroup = MQConstant.ORDER_PEDDING_CONSUMER_GROUP,
        topic = MQConstant.ORDER_PEDDING_TOPIC,
        selectorExpression = MQConstant.ORDER_PEDDING_TAG
)

 这里我是全部封装在一个类里面的,可以给大家看看,主题和标签的关系需要用:来标记

最后在OnMessage方法中进行消费(消费者端的业务逻辑实现):

    @Override
    public void onMessage(OrderMessage orderMessage) {
        log.info("[创建订单....],对应秒杀商品id为:"+orderMessage.getSeckillId());
        OrderMQResult message = new OrderMQResult(orderMessage.getTime(),orderMessage.getSeckillId(),null,orderMessage.getToken(),"订单创建成功", Result.SUCCESS_CODE);
        MQMessage sendCallBack = new MQMessage("创建订单", message);
        String topic = MQConstant.ORDER_RESULT_SUCCESS_DEST;
        try {
            String orderNo = orderInfoService.doSeckill(orderMessage.getUserPhone(), orderMessage.getSeckillId());
            // 订单创建成功
            log.info("[订单创建成功------]");
            message.setOrderNo(orderNo);

            // 发送延迟消息
            rocketMQTemplate.asyncSend(MQConstant.ORDER_PAY_TIMEOUT, MessageBuilder.withPayload(message).build(),new MQMessage("延迟消息",message.getOrderNo()+""),1000,9);

        } catch (Exception e) {
            e.printStackTrace();
            log.error("[订单创建失败------]",e);
            message.setMsg(SeckillCodeMsg.SECKILL_ERROR.getMsg());
            message.setCode(SeckillCodeMsg.SECKILL_ERROR.getCode());
            topic = MQConstant.ORDER_RESULT_FAIL_DEST;
            // 订单创建失败
        }finally {
            rocketMQTemplate.asyncSend(topic,message,sendCallBack);
        }
    }

更改配置后启动:mqbroker -c ../conf/broker.conf

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/554464.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot+jsp+java流浪动物猫狗领养救助网站367hp

本流浪猫狗领养救助网站共包含14个表:分别是宠物类型信息表&#xff0c;配置文件信息表&#xff0c;流浪宠物评论表信息表&#xff0c;活动类型信息表&#xff0c;领养宠物信息表&#xff0c;领养中心信息表&#xff0c;流浪宠物信息表&#xff0c;宠物知识信息表&#xff0c;收…

高压功率放大器在木结构的螺栓连接松动检测系统中的应用

实验名称&#xff1a;功率放大器在面向木结构的螺栓连接松动检测系统中的应用 实验设备&#xff1a; 计算机、压电传感器PZT、D型数显扭矩扳手、NIELVISII&#xff0b;数据采集卡、ATA-2021H功率放大器等。 实验过程&#xff1a; 设计了一种基于压电时间反演法的木材连接螺栓松…

2023年认证杯SPSSPRO杯数学建模C题(第一阶段)心脏危险事件全过程文档及程序

2023年认证杯SPSSPRO杯数学建模 C题 心脏危险事件 原题再现&#xff1a; 心脏的每一次搏动都伴随着心脏的电生理活动。心脏的起博点通过放电&#xff0c;使电流传导到每个心肌纤维&#xff0c;接收到电信号后&#xff0c;相应的心肌纤维完成一次收缩&#xff0c;心脏也就随之…

SpringBoot【开发实用篇】---- 整合第三方技术(监控)

SpringBoot【开发实用篇】---- 整合第三方技术&#xff08;监控&#xff09; 1. 监控的意义2. 可视化监控平台3. 监控原理 在说监控之前&#xff0c;需要回顾一下软件业的发展史。最早的软件完成一些非常简单的功能&#xff0c;代码不多&#xff0c;错误也少。随着软件功能的逐…

在Window10和Window11系统,WPF使用Viewport3D 渲染失败问题解决方案

最近遇到个棘手的问题&#xff1a;在供应商提供的戴尔optiplex 3000的12代处理器主机的集成显卡Intel(R) UHD Graphics 770上使用Viewport3D 渲染失败&#xff08;3D模型显示不了&#xff0c;或者是显示不全&#xff09;&#xff0c;之前开发验证使用的是集成显卡Intel(R) UHD …

【FOSS】新一代绿色节能对象存储

01 背景概述 2020年9月中国明确了“碳达峰、碳中和”目标&#xff0c;2021年&#xff0c;碳达峰、碳中和被首次写入政府工作报告。该事件标志着中国对促进经济高质量发展&#xff0c;社会繁荣和生态环境保护的决心。 据IDC白皮书预测&#xff0c;中国将在2025年成为全球最大数…

团队数千人,苹果XR头显核心高管大曝光

上周&#xff0c;彭博社Mark Gurman从参与研发的相关人士了解到的消息&#xff0c;阐述了苹果XR头显开发简史。本周&#xff0c;继续公布了参与到苹果XR头显研发工作的一些关键岗位或高管人士。相关阅读&#xff1a;《苹果XR头显简史&#xff1a;现实困境与未来预期》 Mark Gu…

Flutter Overlay 你用上了么

Flutter Overlay 你用上了么 前言 Flutter中的Overlay是一个用于在屏幕上显示浮层的组件。它可以用来在应用程序中创建弹出窗口、提示框、菜单、对话框等等。 Overlay通常用于在用户与应用程序交互时显示临时性的UI元素&#xff0c;例如&#xff1a;用户点击按钮时显示下拉菜单…

3dMax一键窗户可入库插件使用方法详解

3dMax一键窗户(可入库)插件使用教程 3dMax一键窗户(可入库)插件,支持在选中的多边形上创建窗户模型,并可以自定义窗户形状,保存到库里下次使用。 【安装方法】 1.复制“窗户样本”文件夹到D盘根目录(D:\窗户样本) 2.拖动插件脚本到3dmax视口中打开即可。 【创建窗户】…

SolidWorks装配体中让弹簧随装配体运动的方法

弹簧是我们日常设计中最常用的几种零部件之一&#xff0c;但是弹簧不跟螺栓一样装好之后是相对静止的&#xff0c;弹簧在装配好后需要进行运动&#xff0c;在SolidWorks装配体中可以让弹簧跟随其他物体运动&#xff0c;操作分为三大步&#xff1a; 一、创建弹簧&#xff08;使…

微服务: Seata AT 分布式事务配置出现异常解决(相当全面)(下篇)

目录 1. 文章传送门 -> 上篇传送门: 微服务: Seata AT 分布式事务以及配置方式(上篇) -> 中篇传送门: 微服务: Seata AT springCloud整合分布式事务以配置方式(中篇) 2. 异常总结分类: 3. 解决上述问题: -> 解决上述问题一: 1. no available service null f…

面了一个测试工程师要求月薪23K,总感觉他藏了很多面试题...

最近有朋友去华为面试&#xff0c;面试前后进行了20天左右&#xff0c;包含4轮电话面试、1轮笔试、1轮主管视频面试、1轮hr视频面试。 据他所说&#xff0c;80%的人都会栽在第一轮面试&#xff0c;要不是他面试前做足准备&#xff0c;估计都坚持不完后面几轮面试。 其实&…

七人拼团系统开发模式,如何做到短短几个月就销售额上亿?

随着经济的迅速发展&#xff0c;市场上的商业模式也是层出不穷&#xff0c;而且各具特色&#xff0c;看得人眼花缭乱。最近又新出了一个七人拼团商业模式&#xff0c;不仅能够助力企业快速裂变获客&#xff0c;还能迅速提升产品销量&#xff0c;达到短短几个月就销售额上亿的“…

MariaDB 主从同步配置

1 服务器结构 角色ip地址安装教程主节点192.168.31.102CentOS-7 安装 MariaDB-10.8从节点192.168.31.103 2 原理&#xff1a; 原理&#xff1a; &#xff08;1&#xff09;master服务器将数据的改变记录到二进制binlog日志&#xff0c;当master上的数据发生改变时&#xff0c…

【连续介质力学】张量场

张量场 张量场表示张量 T ( x ⃗ , t ) T(\vec x, t) T(x ,t)在空间 x ⃗ \vec x x 和时间 t t t中如何变化&#xff0c;将张量场视为可微函数 如果一个张量场不依赖于时间&#xff0c;则此张量场称为定常场&#xff0c;例如 T T ( x ⃗ ) T T(\vec x) TT(x )&#xff1b;相…

如此优秀的低代码平台,佬们一起来体验一把!

前言&#xff1a;低代码平台是一种新兴的应用开发技术&#xff0c;将可视化建模、自动生成代码和开发者编写的代码结合在一起&#xff0c;使应用程序的开发变得更加快速、简单且高效。低代码平台的基本思想是通过消除繁琐的手动编码工作&#xff0c;来让开发者更好地专注于业务…

基于 Spring Boot + MyBatis Plus + Vue Element 实现的后台管理系统 + 微信小程序

管理后台的 Vue3 版本采用 vue-element-plus-admin &#xff0c;Vue2 版本采用 vue-element-admin 管理后台的移动端采用 uni-app 方案&#xff0c;一份代码多终端适配&#xff0c;同时支持 APP、小程序、H5&#xff01; 后端采用 Spring Boot、MySQL MyBatis Plus、Redis …

UnityVR--组件3--Line Renderer--线性渲染

目录 前言 Line Renderer组件介绍 Trail Renderer组件介绍 使用Line Renderer绘制线段 使用系统工具或自定义工具绘制线段 Trail Renderer简单制作子弹拖尾效果 前言 Line Renderer线性渲染组件用于在3D中渲染线段&#xff0c;如之前在小游戏中做过的激光门伤害&#xff0…

Axure设计—动态条形图(中继器)

本文将教大家如何用AXURE中的中继器动态条形图。 一、效果介绍 如图&#xff1a; 预览地址&#xff1a;https://i7x7i9.axshare.com 下载地址&#xff1a;https://download.csdn.net/download/weixin_43516258/87807039?spm1001.2014.3001.5503 二、功能介绍 简单填写中继…

PyQt5桌面应用开发(17):类结构+QWebEngineView

本文目录 PyQt5桌面应用系列PyQt5学习PyQt5类结构和帮助速查实现与解释最终界面和完整源代码界面完整的代码 总结 PyQt5桌面应用系列 PyQt5桌面应用开发&#xff08;1&#xff09;&#xff1a;需求分析 PyQt5桌面应用开发&#xff08;2&#xff09;&#xff1a;事件循环 PyQt5桌…