架构师系列-消息中间件(九)- RocketMQ 进阶(三)-消费端消息保障

news2025/1/11 9:06:03
5.2 消费端保障
5.2.1 注意幂等性

应用程序在使用RocketMQ进行消息消费时必须支持幂等消费,即同一个消息被消费多次和消费一次的结果一样,这一点在使用RoketMQ或者分析RocketMQ源代码之前再怎么强调也不为过。

“至少一次送达”的消息交付策略,和消息重复消费是一对共生的因果关系,要做到不丢消息就无法避免消息重复消费,原因很简单,试想一下这样的场景:客户端接收到消息并完成了消费,在消费确认过程中发生了通讯错误,从Broker的角度是无法得知客户端是在接收消息过程中出错还是在消费确认过程中出错,为了确保不丢消息,重发消息是唯一的选择。

有了消息幂等消费约定的基础,RocketMQ就能够有针对性地采取一些性能优化措施,例如:并行消费、消费进度同步机制等,这也是RocketMQ性能优异的原因之一。

5.2.2 消息消费模式

从不同的维度划分,Consumer支持以下消费模式:

  • 广播消费模式下,消息消费失败不会进行重试,消费进度保存在Consumer端;
  • 集群消费模式下,消息消费失败有机会进行重试,消费进度集中保存在Broker端。
5.2.2.1 集群消费

使用相同 Group ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点

 

注意事项

  • 消费端集群化部署, 每条消息只需要被处理一次。
  • 由于消费进度在服务端维护, 可靠性更高。
  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
5.2.2.2 广播消费

广播消费指的是:一条消息被多个consumer消费,即使这些consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义。

 

注意事项

  • 广播消费模式下不支持顺序消息。
  • 广播消费模式下不支持重置消费位点。
  • 每条消息都需要被相同逻辑的多台机器处理。
  • 消费进度在客户端维护,出现重复的概率稍大于集群模式。
  • 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过, 请谨慎选择。
  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
  • 目前仅 Java 客户端支持广播模式。
  • 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
5.2.2.3 集群模式模拟广播

如果业务需要使用广播模式,也可以创建多个 Group ID,用于订阅同一个 Topic。

 

注意事项

  • 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
  • 消费进度在服务端维护,可靠性高于广播模式。
  • 对于一个 Group ID 来说,可以部署一个消费端实例,也可以部署多个消费端实例。当部署多个消费端实例时,实例之间又组成了集群模式(共同分担消费消息)。假设 Group ID 1 部署了三个消费者实例 C1、C2、C3,那么这三个实例将共同分担服务器发送给 Group ID 1 的消息。同时,实例之间订阅关系必须保持一致。
5.2.3 消息消费模式

RocketMQ消息消费本质上是基于的拉(pull)模式,consumer主动向消息服务器broker拉取消息。

  • 推消息模式下,消费进度的递增是由RocketMQ内部自动维护的;
  • 拉消息模式下,消费进度的变更需要上层应用自己负责维护,RocketMQ只提供消费进度保存和查询功能。
5.2.3.1 推模式(PUSH)

我们上面使用的消费者都是PUSH模式,也是最常用的消费模式

由消息中间件(MQ消息服务器代理)主动地将消息推送给消费者;采用Push方式,可以尽可能实时地将消息发送给消费者进行消费。但是,在消费者的处理消息的能力较弱的时候(比如,消费者端的业务系统处理一条消息的流程比较复杂,其中的调用链路比较多导致消费时间比较久。概括起来地说就是“慢消费问题”),而MQ不断地向消费者Push消息,消费者端的缓冲区可能会溢出,导致异常。

实现方式,代码上使用 DefaultMQPushConsumer

consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送(push)过来的。主要用的也是这种方式。

5.2.3.2 拉模式(PULL)

RocketMQ的PUSH模式是由PULL模式来实现的

由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息;采用Pull方式,如何设置Pull消息的频率需要重点去考虑,举个例子来说,可能1分钟内连续来了1000条消息,然后2小时内没有新消息产生(概括起来说就是“消息延迟与忙等待”)。如果每次Pull的时间间隔比较久,会增加消息的延迟,即消息到达消费者的时间加长,MQ中消息的堆积量变大;若每次Pull的时间间隔较短,但是在一段时间内MQ中并没有任何消息可以消费,那么会产生很多无效的Pull请求的RPC开销,影响MQ整体的网络性能。

5.2.3.3 注意事项

注意:RocketMQ 4.6.0版本后将弃用DefaultMQPullConsumer

DefaultMQPullConsumer方式需要手动管理偏移量,官方已经被废弃,将在2022年进行删除

 

 

DefaultLitePullConsumer

该类是官方推荐使用的手动拉取的实现类,偏移量提交由RocketMQ管理,不需要手动管理

5.2.4 消息确认机制

consumer的每个实例是靠队列分配来决定如何消费消息的,那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)

为了保证数据不被丢失,RocketMQ支持消息确认机制,即ack。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

5.2.4.1 确认消费

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
        execute();//执行真正消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
})
5.2.4.2 消费异常

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
        execute();//执行真正消费
        return ConsumeConcurrentlyStatus.RECONSUME_LATER
    }
})

 

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,应用可以监控死信队列来做人工干预。

5.2.5 消息重试机制
5.2.5.1 顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ版会自动不断地进行消息重试(每次间隔时间为1秒),这时,应用会出现消息消费被阻塞的情况,因此,建议您使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

5.2.5.2 无序消息的重试

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

5.2.5.3 重试次数

消息队列RocketMQ版默认允许每条消息最多重试16次,每次重试的间隔时间如下。

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
110秒97分钟
230秒108分钟
31分钟119分钟
42分钟1210分钟
53分钟1320分钟
64分钟1430分钟
75分钟151小时
86分钟162小时

如果消息重试16次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个时间范围消息将不再重试投递。

5.2.5.4 和生产端重试区别

消费者和生产者的重试还是有区别的,主要有两点

  • 默认重试次数:Product默认是2次,而Consumer默认是16次
  • 重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。它照1S,5S,10S,30S,1M,2M····2H进行重试。

注意:Product在异步情况重试失效,而对于Consumer在广播情况下重试失效。

5.2.5.5 重试配置方式

需要重试

消费失败后,重试配置方式,集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 方式1:返回RECONSUME_LATER(推荐)
  • 方式2:返回Null
  • 方式3:抛出异常

无需重试

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回Action.CommitMessage,此后这条消息将不会再重试。

//注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        //消息处理逻辑抛出异常,消息将重试。
        try {
            doConsumeMessage(list);
        }catch (Exception e){
            //捕获消费逻辑中的所有异常,并返回Action.CommitMessage;
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        //业务方正常消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

 

5.3 死信队列

在正常情况下无法被消费(超过最大重试次数)的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列就称为死信队列(Dead-Letter Queue)

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次 数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

5.3.1 死信特性
5.3.1.1 死信消息特性
  • 不会再被消费者正常消费
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除,故死信消息应在产生的 3 天内及时处理
5.3.1.2 死信队列特性
  • 一个死信队列对应一个消费者组,而不是对应单个消费者实例
  • 一个死信队列包含了对应的 Group ID 所产生的所有死信消息,不论该消息属于哪个 Topic
  • 若一个 Group ID 没有产生过死信消息,则 RocketMQ 不会为其创建相应的死信队列

6. Redis 轮询队列

redis队列中存放车辆信息,调度系统从队列中获取车辆信息,打车完成后再将车辆信息放回队列中

 

6.1 相关代码
6.1.1 redis获取车辆

从list左侧弹出一个车辆

 

/**
  * 从Redis List列表中拿取一个车辆ID
  * 如果没有获取到延时10S
  *
  * @return
  */
public String takeVehicle() {
    //从Redis List列表中拿取一个车辆ID
    return redisTemplate.opsForList().leftPop(DispatchConstant.VEHICLE_QUEUE, 1, TimeUnit.SECONDS);
}
6.1.2 redis压入车辆

检查车辆状态,并从右侧压入车辆

/**
  * 设置车辆状态为Ready
  *
  * @param vehicleId
  */
public void readyDispatch(String vehicleId) {
    //检查车辆状态
    DispatchConstant.DispatchType vehicleDispatchType = taxiVehicleStatus(vehicleId);
    //如果车辆时运行状态
    if (vehicleDispatchType.isRunning() || vehicleDispatchType.isReady()) {
        redisTemplate.opsForValue().set(DispatchConstant.VEHICLE_STATUS_PREFIX + vehicleId, DispatchConstant.DispatchType.READY.toString());
        //从右侧压入车辆
        redisTemplate.opsForList().rightPush(DispatchConstant.VEHICLE_QUEUE, vehicleId);
    }
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1624163.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

不墨迹,向媒体投稿不讲攻略,直接上方法

作为一名单位信息宣传员,我曾深陷于向媒体投稿的泥沼之中,饱尝了费时费力、审核严苛、出稿缓慢的苦涩,承受着领导急切期盼与自我压力交织的煎熬。然而,当我有幸接触到智慧软文发布系统,这一切困境如同阴霾散去,取而代之的是便捷流畅的投稿流程,以及领导满意、团队轻松的工作氛围…

详解Qt中的鼠标事件

在Qt中&#xff0c;处理鼠标事件是构建交互式界面的关键。Qt提供了一系列与鼠标相关的事件处理函数&#xff0c;允许开发者捕获鼠标的各种动作&#xff0c;如按下、释放、移动、双击等。以下是鼠标事件的使用方法、技巧以及注意事项&#xff0c;并附带C代码示例。 基础使用方法…

Node.js 22 发布,原生支持 WebSocket 客户端

昨日&#xff0c;Node.js 官方博客正式宣布 Node.js 22 的发布&#xff01;新版本亮点包括 require() ES 模块、WebSocket 客户端、V8 JavaScript 引擎的更新等&#xff01; Node.js 22 将在 10 月进入长期支持 (LTS)&#xff0c;但在此之前&#xff0c;它将是接下来六个月的 …

问题-MySQL将较大的SQL文件导入MySQL

迁移数据的时候&#xff0c;我们有时候会用sqlyog等数据库工具导入到新数据库。可能插入的SQL语句太大&#xff0c;出现导入一半失败的情况。明明代码没错&#xff0c;这让人摸不着头脑。 对于大文件导入&#xff0c;有几种方法&#xff1a; 方法1&#xff1a;使用命令行&…

总体设计(下)

启发规则 描绘软件结构的图形工具 面向数据流的设计方法

暴雨亮相CCBN2024 助力广电行业数智化转型

4月23日&#xff0c;第三十届中国国际广播电视信息网络展览会&#xff08;简称CCBN2024&#xff09;在北京开展&#xff0c;本次展览会由国家广播电视总局指导、广播电视科学研究院主办&#xff0c;作为国内广电视听领域首个综合性、专业化、引领性、国际化科技产业盛会&#x…

搭建强化学习的机械臂MuJoCo环境以及urdf转xml文件方法

一、背景 基于强化学习的机械臂应用日趋广泛&#xff0c;摆脱了基于模型到达固定点的束缚。基于强化学习算法&#xff0c;机械臂可以完成拧魔方、推抓任务&#xff08;Andy Zeng的经典论文&#xff09;&#xff0c;暂且想一下如果用传统方法完成此类复杂任务是何等困难。 强化…

Chisel 入门(2)运算符

Chisel 入门(2) 运算符 逻辑运算符 ChiselExplanationwidth!x逻辑非1x && y逻辑与1x||y逻辑或1 位操作运算符 ChiselExplanationwidthin Verilog~x位反w(x)~ signal_xx & y位与max(w(x), w(y))signal_x & signal_yx | y位或max(w(x), w(y))signal_x | sign…

操作系统:进程间通信 | System V IPC

目录 前言&#xff1a; 1.共享内存 1.1.什么是共享内存 1.2.共享内存使用接口 shmget函数 shmat函数 shmdt函数 shmctl函数 2.共享内存实现通信 2.1.代码实现 comm.hpp server,cpp client.cpp 2.2.共享内存的缺点 2.3.实现通信的同步化 2.4共享内存通信的优势 3.…

Nginx下载安装,什么是nginx,什么是反向代理,Windows下、linux下安装nginx(保姆级教程)

文章目录 一、Nginx简介为什么要使用NginxNginx的特点Nginx的相关概念正向代理反向代理动静分离负载均衡 二、Nginx安装1. Windows安装2. Linux安装 一、Nginx简介 Nginx 是一个高性能的 HTTP&#xff08;静态资源服务器&#xff09; 和 反向代理 Web 服务器。 为什么要使用N…

【以奖代补】诗情画意润童心 书香课堂志愿行

中华古诗词历史源远流长&#xff0c;名篇佳作数不胜数。为弘扬民族文化精髓&#xff0c;丰富乡村儿童假期生活。2024年4月21日上午&#xff0c;襄州区社会工作者协会联合襄州区张家集镇社工站、张集村“童叟乐园”志愿服务队在张集村开展“诗情画意润童心 书香课堂志愿行”志愿…

数据库安全如何保障?YashanDB有妙招(上篇)

数据库作为信息系统的核心&#xff0c;不仅承载着海量的关键数据&#xff0c;还负责向各类用户提供高效、可靠的信息服务&#xff0c;数据库的安全性显得尤为关键&#xff0c;已成为信息安全体系的重中之重。 什么是数据库安全&#xff1f; 数据库安全是数据安全的一个子集&…

Linux——界面和用户

本篇文章所写的都是基于centos 7 64位&#xff08;通过虚拟机运行&#xff09;。 一、Linux的界面 Linux操作系统提供了多种用户界面&#xff0c;主要分为图形用户界面&#xff08;GUI&#xff09;和命令行界面&#xff08;CLI&#xff09;。 1、图形用户界面(GUI)&#xff…

第十五届蓝桥杯省赛第二场C/C++B组D题【前缀总分】题解(AC)

暴力解法 O ( 26 n 5 ) O(26n^5) O(26n5) 枚举将第 i i i 个字符串的第 j j j 个字符改为 c c c 的所有方案&#xff0c;时间复杂度 O ( 26 n 2 ) O(26n^2) O(26n2)&#xff0c;修改并计算总分&#xff0c; O ( n 3 ) O(n^3) O(n3)。 暴力优化 O ( 26 n 3 log ⁡ n ) O…

IDEA使用技巧(常用设置、快捷键等)

IDEA使用技巧 一、IDEA常用基本设置设置代码背景颜色/主题/字体Ctrl鼠标滚轮缩放字体大小设置字符编码左右两侧的Project&#xff0c;Structure&#xff0c;Maven等按钮消失新增类似sout,psvm的模版切换某个模块编译的JDK版本 二、常用快捷键CtrlAltT包裹代码Alt回车联想补全Ct…

linux系统下载huggingface文件教程

文章目录 准备工作添加SSH Key生成Access Token 模型下载公开模型下载&#xff08;bert-base-chinese为例&#xff09;非公开模型下载&#xff08;Llama3为例&#xff09;权限申请官网预训练模型下载huggingface仓库下载 准备工作 添加SSH Key # 本地机器生成ssh key # step1…

Spring 5源码学习

文章目录 一. 访问[spring官网], 找到Spring Framework&#xff0c;点击红色标记github仓库&#xff0c;下载对应的分支代码&#xff0c;本人下载5.1.x二. 安装gradle三. 调整spring-framework配置四. 开始编译五.导入idea 一. 访问[spring官网], 找到Spring Framework&#xf…

Linux离线安装Harbor镜像仓库

一、Harbor简介 Harbor是一个开源的企业级Docker Registry管理项目&#xff0c;由VMware公司开源。它提供了比Docker官方公共镜像仓库更为丰富和安全的功能&#xff0c;尤其适合企业环境使用。Harbor的关键特性包括权限管理&#xff08;RBAC&#xff09;、LDAP集成、日志审计、…

《动手学深度学习(Pytorch版)》Task02:预备知识——4.25打卡

《动手学深度学习&#xff08;Pytorch版&#xff09;》Task02&#xff1a;预备知识——4.25打卡 数据操作N维数组——张量创建数组访问元素入门初始化矩阵 运算符广播机制索引和切片节省内存转换为其他Python对象转换为NumPy张量ndarray张量转换为Python标量 数据预处理安装pan…

2023平航杯——介质取证部分复现

闻早起的电脑 教徒“闻早起”所使用的笔记本电脑使用何种加密程式&#xff1f; VeraCrypt 教徒“闻早起”所使用的笔记本电脑中安装了一款还原软件&#xff0c;其版本号为&#xff1f;【标准格式&#xff1a;1.2.3.4】 8.71.020.5734 教徒“闻早起”所使用的笔记本电脑中登…