MQ专题:消费幂等性

news2024/9/20 22:54:10

一、提要

1.1 通过本文将获得

  1. 消息投递的通用代码
    • 非事务消息的投递
    • 事务消息的投递
    • 任意延迟消息的投递,不依赖于任何MQ
    • 上面这些投递都支持批量的方式
    • 投递失败自动重试的代码
  2. 幂等消费的通用代码
  3. 消费失败,衰减式自动重试的通用代码

1.2本文涉及到的主要技术点

  1. SpringBoot2.7
  2. MyBatisPlus
  3. MySQL
  4. 线程池
  5. java中的延迟队列:DelayQueue
  6. 分布式锁
  7. RabbitMQ

二、消费者幂等性概念

2.1 消费者如何确保消息一定会被消费?

消费者这边可以采用下面的过程,可以确保消息一定会被消费。

  • step1:从MQ中拉取消息,此时不要从mq中删除消息

  • step2:执行业务逻辑(需要做幂等)

  • step3:通知MQ删除这条消息

若上面过程失败了,则采用衰减式的方式进行自动重试,比如第一次消费失败后,延迟10秒后,将消息再次丢入队列,进行消费重试,若还是失败,再延迟20秒后丢入队列,继续重试,但是得有个上限,比如最多50次,达到上限需要进行告警人工干预。

这里的关键技术点就是:幂等+重试+开启消费者手动ack

什么是消费失败后衰减式重试?

失败后,会过一会,再次重试,若还是失败,则过一会,再次重试。
比如累计失败次数在1-5次内,每次失败后会间隔10秒进行重试,在6-10次内,间隔20秒,在11-20次内,间隔30秒,但是有个次数上限,比如50次,达到最大次数,将不再重试,报警,人工干预

衰减重试是如何实现的?

通过延迟消息实现的,消费失败后,会投递一条延迟消息,消息的内容和原本消息的内容是一样的,延迟时间到了后,这个消息会进入消息原本的队列,会触发再次消费。

2.2 什么是消费者手动ack(acknowledgemenet)?

消费者从mq中拉取消息后,mq需要将消息从mq中删除,这个删除有2种方式

方式1:MQ自动删除

消费者从mq中拉取消息后,mq立即就把消息删掉了,此时消费者还未消费。
这种可能会有问题,比如消费者拿到消息后,消费失败了,但是此时消息已经被mq删除了,结果会导致消息未被成功消费。

方式2:消费者通知MQ删除(也叫手动ack)

消费者从mq拉取消息后,做业务处理,业务处理完成之后,通知mq删除消息,这种就叫做消费者手动ack
这种会存在通知mq删除消息失败的情况,会导致同一条消息会被消费者消费多次,消费端需要避免重复消费。

本文中用的是这种ack的方式。

2.3 什么是幂等消费?

同一条消息,即使出现了重复的消息,被同一个消费者消费,也只会成功消费一次。

为什么要考虑幂等消费?
在这里插入图片描述

先看下上面这个图,消息从发送到消费的整个过程,中间涉及到网络通信,网络存在不稳定的因素,这就可能导致下面2个问题

重复投递的情况

生产者投递消息到MQ,由于网络问题,未收到回执,生产者以为消息投递失败了,会重试,这就可能会导致同一条消息被投递多次

消费者ACK失败,消息会被再次消费

消费者拉取消息消费后,会通知MQ中删除此消息,通知MQ删除消息这个过程又涉及网络通信,可能会失败,此时会导致消息被消费者消费了,但是却未从mq中删除,这样消息就会被再次拉取进行消费。

上面2种情况,会导致同一条消息,会被消费者处理多次,消费端若未考虑幂等性,可能导致严重的事故。

三、幂等消费问题的解决方案

如何解决幂等消费的问题?

搞定下面2个问题,幂等消费的问题就解决了。

  1. 如何确定MQ中的多条消息是同一条业务消息?
  2. 消费者如何确保同一条消息只被成功消费一次?

3.1 生产者:如何确定MQ中的多条消息是同一条业务消息?

我们可以定义一种通用的消息的格式,格式如下,生产者发送的所有消息,都必须采用这个格式。

public class Msg<T> {
    /**
     * 生产者名称
     */
    private String producer;
    /**
     * 生产者这边消息的唯一标识
     */
    private String producerBusId;
    /**
     * 消息体,主要是消息的业务数据
     */
    private T body;
}

对于多条消息,通过(producer、producerBusId)这两个字段来判断是否是同一条消息,若他们的这两个字段的值是一样的,则表示他们是同一条消息。

  • producer:可以使用服务名称
  • producerBusId:生产者这边消息的唯一标识,比如可以使用UUID

3.2 消费者:如何确保同一条消息只被成功消费一次?

3.2.1 使用辅助表建立唯一约束

需要一个幂等辅助表,如下,idempotent_key 添加了唯一约束,多个线程同时向这个表写入数据,若idempotent_key是一样的,则只有一个会成功,其他的会违反唯一约束触发异常,导致失败。

create table if not exists t_idempotent_lesson033
(
    id             varchar(50) primary key comment 'id,主键',
    idempotent_key varchar(500) not null comment '需要确保幂等的key',
    unique key uq_idempotent_key (idempotent_key)
) comment '幂等辅助表';

3.2.2 消费端幂等消费实现代码块

用上面的幂等辅助表,便可实现幂等消费,过程如下

// 这里的幂等key,由消息里面的(producer,producerBusId)加上消费者完整类名组成,也就是同一条消息只能被同一个消费者消费一次
String idempotentKey = (producer,producerBusId,消费者完整类名);

// 幂等表是否存在记录,如果存在说明处理过,直接返回成功
IdempotentPO idempotentPO = select * from t_idempotent_lesson033 where idempotent_key = #{idempotentKey};
if(idempotentPO!=null){
	return "SUCCESS";
}
 --以下是关键理解--
开启Spring事务(这里千万不要漏掉,一定要有事务)

这里放入消息消费的实际业务逻辑,最好是db操作的代码。。。。。

String idempotentId = "";
// 这里是关键一步,向 t_idempotent 插入记录,如果有并发过来,只会有一个成功,其他的会报异常导致事务回滚
insert into t_idempotent_lesson033 (id, idempotent_key) values (#{idempotentId}, #{idempotentKey});

提交spring事务

四、案例

4.1 下面先看案例

会模拟电商中下单后,投递一条订单消息,然后会搞一个消费者来消费这个消息。

本案例会用到RabbitMQ,大家先安装rabbitmq,然后修改lesson033/src/main/resources/application.yml中rabbitmq相关配置。

RabbitMQ的安装可以参考:https://blog.csdn.net/qq_30166465/article/details/139612362

4.2 会有3个案例代码

  • 投递普通订单消息,模拟消费
  • 投递延迟订单消息,延迟5秒,模拟消费
  • 投递普通消息,模拟消费失败,自动重试的情况

4.3 案例中会用到5个表

先不用记,大概有个印象,知道每个表是干什么用的就行了

-- 创建订单表,业务相关
drop table if exists t_order_lesson033;
create table if not exists t_order_lesson033
(
    id    varchar(32)    not null primary key comment '订单id',
    goods varchar(100)   not null comment '商品',
    price decimal(12, 2) comment '订单金额'
) comment '订单表';

-- 创建本地消息表,用来存储事务消息和延迟消息
drop table if exists t_msg_lesson033;
create table if not exists t_msg_lesson033
(
    id               varchar(32) not null primary key comment '消息id',
    exchange         varchar(100) comment '交换机',
    routing_key      varchar(100) comment '路由key',
    body_json        text        not null comment '消息体,json格式',
    status           smallint    not null default 0 comment '消息状态,0:待投递到mq,1:投递成功,2:投递失败',
    expect_send_time datetime    not null comment '消息期望投递时间,大于当前时间,则为延迟消息,否则会立即投递',
    actual_send_time datetime comment '消息实际投递时间',
    create_time      datetime comment '创建时间',
    fail_msg         text comment 'status=2 时,记录消息投递失败的原因',
    fail_count       int         not null default 0 comment '已投递失败次数',
    send_retry       smallint    not null default 1 comment '投递MQ失败了,是否还需要重试?1:是,0:否',
    next_retry_time  datetime comment '投递失败后,下次重试时间',
    update_time      datetime comment '最近更新时间',
    key idx_status (status)
) comment '本地消息表';


-- 创建消息和消费者关联表,(producer, producer_bus_id, consumer_class_name)相同时,此表只会产生一条记录,就是同一条消息被同一个消费者消费,此表只会产生一条记录
drop table if exists t_msg_consume_lesson033;
create table if not exists t_msg_consume_lesson033
(
    id              varchar(32)  not null primary key comment '消息id',
    producer        varchar(100) not null comment '生产者名称',
    producer_bus_id varchar(100) not null comment '生产者这边消息的唯一标识',
    consumer_class_name        varchar(300) not null comment '消费者完整类名',
    queue_name      varchar(100) not null comment '队列名称',
    body_json       text         not null comment '消息体,json格式',
    status          smallint     not null default 0 comment '消息状态,0:待消费,1:消费成功,2:消费失败',
    create_time     datetime comment '创建时间',
    fail_msg        text comment 'status=2 时,记录消息消费失败的原因',
    fail_count      int          not null default 0 comment '已投递失败次数',
    consume_retry   smallint     not null default 1 comment '消费失败后,是否还需要重试?1:是,0:否',
    next_retry_time datetime comment '投递失败后,下次重试时间',
    update_time     datetime comment '最近更新时间',
    key idx_status (status),
    unique uq_msg (producer, producer_bus_id, consumer_class_name)
) comment '消息和消费者关联表';

drop table if exists t_msg_consume_log_lesson033;
-- 消息消费的日志
create table if not exists t_msg_consume_log_lesson033
(
    id              varchar(32)  not null primary key comment '消息id',
    msg_consume_id        varchar(32) not null comment '消息和消费者关联记录',
    status          smallint     not null default 0 comment '消费状态,1:消费成功,2:消费失败',
    create_time     datetime comment '创建时间',
    fail_msg        text comment 'status=2 时,记录消息消费失败的原因',
    key idx_msg_consume_id (msg_consume_id)
) comment '消息消费日志';

-- 幂等辅助表
drop table if exists t_idempotent_lesson033;
create table if not exists t_idempotent_lesson033
(
    id             varchar(50) primary key comment 'id,主键',
    idempotent_key varchar(500) not null comment '需要确保幂等的key',
    unique key uq_idempotent_key (idempotent_key)
) comment '幂等辅助表';

4.4 代码

关键消费代码

继承了AbstractIdempotentConsumer则拥有了幂等消费的功能

在这里插入图片描述

消费成功或失败时,对t_msg_consume和t_msg_consume_log进行更新
在这里插入图片描述

消费失败时,会重新投递一条相同的延迟消息,触发消费重试
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2098437.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新一代网络研发利器——开物™,让用户每一行代码都贡献在核心创新

随着云计算、人工智能、大数据的快速发展&#xff0c;现有的IT系统越来越复杂&#xff0c;传统网络技术难以满足新的业务需求&#xff0c;DPU技术开始崭露头角&#xff0c;越来越多的企业与研究机构认识到高性能网络处理技术带来的巨大价值&#xff0c;并积极开展创新与实践&am…

985硕闭着眼都有15k以上的月薪

985 闭眼 今天在牛客上看到一篇直呼好家伙的帖子&#xff1a; 这位同学指出&#xff1a;论坛里个个 985 的硕士闭着眼睛都有 15k 以上的月薪&#xff0c;还天天嚷嚷着研究生白读了&#xff0c;天天嚷嚷着反向读研了 ... 通常这样的帖子&#xff0c;都会被评论区喷成筛子。 结果…

微积分复习笔记 Calculus Volume 1 - 1.3Trigonometric Functions

1.3 Trigonometric Functions - Calculus Volume 1 | OpenStax

ELK日志服务收集SpringBoot日志案例

第一步&#xff1a;准备docker-compose文件 首先准备 docker-compose.yaml 文件 version: "3" services:elasticsearch:image: elasticsearch:7.17.2container_name: elasticsearch-servernetworks:- hostenvironment:- "cluster.nameelasticsearch" #设…

NeRF原理学习

一个2020年的工作我现在才来学习并总结它的原理&#xff0c;颇有种“时过境迁”的感觉。这篇总结是基于NeRF原文 NeRF: Representing Scenes as Neural Radiance Fields for View Synthesis 阅读理解后写的&#xff0c;作用是以后如果记不太清了可以回忆。 目的&应用 先说…

80.动态申请内存

目录 一.malloc函数 二.其他注意事项 三.视频教程 在定义变量的时候会在内存中申请空间。除了在定义变量的时候申请内存空间&#xff0c;也可以使用库函数动态申请内存&#xff0c; 一.malloc函数 作用&#xff1a;动态申请一块连续的任意尺寸的内存空间。 函数原型&…

【前端9】手风琴v1.0版本:使用插槽实现动态内容插入的Vue组件

【前端9】手风琴&#xff1a;使用插槽实现动态内容插入的Vue组件 写在最前面一、插槽的基本概念1.默认插槽2.具名插槽 二、实现一个折叠面板组件0.关键点和注意事项1.父组件 App.vue2.子组件 Collapse.vue总结 3.详细解读&#xff08;可以略过&#xff09;父组件子组件 三、小结…

通用后台管理系统实战演示(Vue3 + element-plus)汇总篇三

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

Python任务编排和工作流管理库之prefect使用详解

概要 在数据工程和科学的世界中,任务编排和工作流管理变得越来越重要。随着数据处理任务的复杂性增加,开发人员需要一种可靠且易于使用的工具来设计、监控和管理这些任务。Prefect 是一个用于构建、监控和管理数据管道的 Python 库,它简化了工作流的创建和执行,同时提供了…

028、架构_高可用_主从原理

MySQL半同步复制概览 MySQL主从复制是一个异步的复制过程,主库发送更新事件到从库,从库读取更新记录,并执行更新记录,使得从库的内容与主库保持一致。主从复制的基本过程如下图所示: 主从复制的完成通过以下三个进程实现的主库 binary log dump 线程:当从库连接主库时,…

22. K8S及DevOps

22. K8S及DevOps 一. 章节简介二. DevOps1. 简介2. CICD三. Kubernetes1. [官网](https://kubernetes.io/zh-cn/)--------------------------------------------------------------------------------------------------------一. 章节简介 二. DevOps 1. 简介 2. CICD

【科研新手必备】如何高效、高质量、科学的科研?

文献下载网站—英文写作小工具—SCI选刊 1、文献下载网站 中国知网 sci-hub 大木虫学术导航 学术资源搜索 2、英文写作小工具 DeepL 学术短语库 SCI写作辅导材料 赛特新思 3、SCI选刊 Journal Finder SPRINGER NATURE

除了黑神话,探索3A游戏大作:不可错过的经典与录屏软件推荐

在游戏的天地里&#xff0c;3A 大作凭借其美轮美奂的画面、丰富多彩的内容以及意蕴深邃的剧情&#xff0c;俘获了无数玩家的倾心。除了广受瞩目的《黑神话&#xff1a;悟空》&#xff0c;还有诸多其他的 3A 游戏大作值得您去尝试。此类游戏不但在视觉与技术方面臻至行业的巅峰水…

Linux中如何查看一个进程?如何杀死一个进程?如何查看某个端口有没有被占用?

在Linux中 如何查看一个进程&#xff1f; 使用 ps 命令 ps aux这会显示所有正在运行的进程&#xff0c;可以使用 grep 来过滤特定的进程 ps aux | grep process_name使用 top 命令 top这个命令会实时的显示系统重正在运行的进程 如何杀死一个进程&#xff1f; 使用 kill …

C++ | 泛型编程:模板初阶与函数模板深度解析

文章目录 C 泛型编程&#xff1a;模板初阶与函数模板深度解析1. 泛型编程&#xff1a;实现代码的通用性2. 函数模板&#xff1a;代码的模具2.1 什么是函数模板&#xff1f;2.2 函数模板的格式2.3 函数模板的原理2.4 函数模板的实例化2.5 模板参数的匹配原则 3. 类模板&#xff…

【maven】阿里云和apache仓库配置

阿里云公共仓库的配置看起来有多种类型的仓库: 配置指南 我的maven是idea 自带的:D:\Program Files\JetBrains\IntelliJ IDEA 2022.3.1\plugins\maven\lib\maven3\</

校园跑腿系统小程序开发需求分析

校园跑腿系统小程序的开发需求分析是一个综合性的过程&#xff0c;旨在确保系统能够满足校园内用户的实际需求&#xff0c;并具备良好的用户体验。以下是对校园跑腿系统小程序开发需求分析的详细阐述&#xff1a; 一、功能需求分析 注册与登录&#xff1a;支持多种注册方式&am…

WebSphereMQ中间件监控指标解读

监控易是一款功能全面的IT监控系统&#xff0c;能够实时监控各种IT设施的性能和状态&#xff0c;帮助企业及时发现并解决潜在问题。在本次解读中&#xff0c;我们将聚焦于WebSphereMQ&#xff08;现更名为IBM MQ&#xff09;中间件的监控指标&#xff0c;深入剖析其含义和作用。…

[Linux] 权限

标题&#xff1a;[Linux] 权限 水墨不写bug 目录 一、Linux下对用户的分类 二、Linux的文件访问者的分类 三、修改文件的属性 1.修改文件的权限 &#xff08;只有拥有者和root可以修改某一个文件的权限&#xff09; 2.修改文件的拥有者 3.修改文件的所属组 四、文件创建…

Pytorch中不同的Norm归一化详细讲解

在做项目或者看论文时&#xff0c;总是能看到Norm这个关键的Layer&#xff0c;但是不同的Norm Layer具有不同的作用&#xff0c;准备好接招了吗&#xff1f;&#xff08;本文结论全部根据pytorch官方文档得出&#xff0c;请放心食用&#xff09; 一. LayerNorm LayerNorm的公…