RabbitMQ如何保证消息幂等性

news2025/1/11 0:49:28

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生副作用。
举个简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再相应客户端的时候也有可能出现网络中断或者异常等等。
1. RabbitMQ自动重试机制

消费者在消费消息的时候,如果消费者业务逻辑出现程序异常,这个时候我们如何处理?

使用重试机制,RabbitMQ默认开启重试机制。

实现原理:

  • @RabbitHandler注解 底层使用Aop拦截,如果程序(消费者)没有抛出异常,自动提交事务
  • 如果Aop使用异常通知拦截获取到异常后,自动实现补偿机制,消息缓存在RabbitMQ服务器端

注意:

  • 默认会一直重试到消费者不抛异常为止,这样显然不好。我们需要修改重试机制策略,如间隔3s重试一次)

配置:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

spring:

  rabbitmq:

    # 连接地址

    host: 127.0.0.1

    # 端口号

    port: 5672

    # 账号

    username: guest

    # 密码

    password: guest

    # 地址(类似于数据库的概念)

    virtual-host: /admin_vhost

    # 消费者监听相关配置

    listener:

      simple:

        retry:

          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试

          enabled: true

          # 最大重试次数

          max-attempts: 5

          # 重试间隔时间(毫秒)

          initial-interval: 3000

2. 如何合理选择重试机制?

情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试,可能是因为网络原因短暂不能访问

情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试,因为属于程序bug需要重新发布版本

总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job进行健康检查+人工进行补偿

3. 调用第三方接口自动实现补偿机制

我们知道了,RabbitMQ在消费者消费发生异常时,会自动进行补偿机制,所以我们(消费者)在调用第三方接口时,可以根据返回结果判断是否成功:

  • 成功:正常消费
  • 失败:手动抛处一个异常,这时RabbitMQ自动给我们做重试 (补偿)。

4. 如何解决消费者幂等性问题

防止重复消费 (MQ重试机制需要注意的问题)

产生原因:网络延迟传输中,消费者出现异常或者消费者延迟消费,会造成进行MQ重试补偿,在重试过程中,可能会造成重复消费。

面试题:MQ中消费者如何保证幂等性问题,不被重复消费?

伪代码:

生产者核心代码:

请求头设置消息id(messageId)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

@Component

public class FanoutProducer {

 @Autowired

 private AmqpTemplate amqpTemplate;

 public void send(String queueName) {

  String msg = "my_fanout_msg:" + System.currentTimeMillis();

  //请求头设置消息id(messageId)

  Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)

    .setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();

  System.out.println(msg + ":" + msg);

  amqpTemplate.convertAndSend(queueName, message);

 }

}

消费者核心代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

@RabbitListener(queues = "fanout_email_queue")

 public void process(Message message) throws Exception {

  // 获取消息Id

  String messageId = message.getMessageProperties().getMessageId();

  String msg = new String(message.getBody(), "UTF-8");

  //② 判断唯一Id是否被消费,消息消费成功后将id和状态保存在日志表中,我们从(①步骤)表中获取并判断messageId的状态即可

  //从redis中获取messageId的value

  String value = redisUtils.get(messageId)+"";

  if(value.equals("1") ){ //表示已经消费

   return; //结束

  }

  System.out.println("邮件消费者获取生产者消息" + "messageId:" + messageId + ",消息内容:" + msg);

  JSONObject jsonObject = JSONObject.parseObject(msg);

  // 获取email参数

  String email = jsonObject.getString("email");

  // 请求地址

  String emailUrl = "http://127.0.0.1:8083/sendEmail?email=" + email;

  JSONObject result = HttpClientUtils.httpGet(emailUrl);

  if (result == null) {

   // 因为网络原因,造成无法访问,继续重试

   throw new Exception("调用接口失败!");

  }

  System.out.println("执行结束....");

  //① 执行到这里已经消费成功,我们可以修改messageId的状态,并存入日志表(可以存到redis中,key为消息Id、value为状态)

 }

5. SpringBoot整合RabbitMQ应答模式(ACK)

1.修改配置simple下添加 acknowledge-mode: manual:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

spring:

  rabbitmq:

    # 连接地址

    host: 127.0.0.1

    # 端口号

    port: 5672

    # 账号

    username: guest

    # 密码

    password: guest

    # 地址(类似于数据库的概念)

    virtual-host: /admin_vhost

    # 消费者监听相关配置

    listener:

      simple:

        retry:

          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试

          enabled: true

          # 最大重试次数

          max-attempts: 5

          # 重试间隔时间(毫秒)

          initial-interval: 3000

        # 开启手动ack

        acknowledge-mode: manual

2.消费者增加代码:

1

2

Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); 手动ack

channel.basicAck(deliveryTag, false);手动签收

1

2

3

4

5

6

7

8

9

10

11

12

13

14

//邮件队列

@Component

public class FanoutEamilConsumer {

 @RabbitListener(queues = "fanout_email_queue")

 public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {

  System.out

    .println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8")

      + ",messageId:" + message.getMessageProperties().getMessageId());

  // 手动ack

  Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);

  // 手动签收

  channel.basicAck(deliveryTag, false);

 }

}

RabbitMQ 如何保证幂等性,数据一致性

mq的作用主要是用来解耦,削峰,异步,

增加MQ,系统的复杂性也会增加很多,

也会带来其他的问题,比如MQ挂了怎么办,怎么保持数据的幂等性

幂等性问题通俗点讲就是保证数据不被重复消费,同时数据也不能少,

也就是数据一致性问题。

下面是MQ丢失的3种情况

 

1,生产者发送消息至MQ的数据丢失

解决方法:在生产者端开启comfirm 确认模式,你每次写的消息都会分配一个唯一的 id,

然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了

2,MQ收到消息,暂存内存中,还没消费,自己挂掉,数据会都丢失

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中

3,消费者刚拿到消息,还没处理,挂掉了,MQ又以为消费者处理完

解决方式:用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

数据重复的问题简单的多,就是在消费端判断数据是否已经被消费过

 

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
  • 参考文章
  • RabbitMQ 如何解决消息幂等性的问题_java_脚本之家
  • RabbitMQ 之 幂等性_rabbitmq幂等性_$驽马十驾$的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/655873.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DJ8-4 shell 语句的分类、shell 的结构性语句

目录 8.7 shell 编程 8.7.1 shell 编程的基本过程 8.7.2 实例 8.7.3 shell 程序和语句 8.8 说明性语句和功能性语句 8.8.1 说明性语句&#xff08;注释行&#xff09; 8.8.2 常用的功能性语句 8.9 结构性语句 8.9.1 条件语句 if 8.9.2 测试语句 test 8.9.…

什么牌子的电容笔质量好耐用?平板第三方电容笔了解下

苹果的电容笔和普通的电容笔有何区别&#xff1f;其实&#xff0c;就书写情况而言&#xff0c;两者相差不多。只是苹果电容笔生在重量上&#xff0c;更加的沉重&#xff0c;而且还配备了一个特殊的重力传感器&#xff0c;能够准确的感觉到重力对线条的粗细变化。由于苹果这款产…

手机操作系统的沉浮往事(上)

移动终端操作系统&#xff0c;也就是指手机、平板电脑等设备所使用的操作系统。 在移动互联网高度发达的今天&#xff0c;我们使用移动终端操作系统的时长&#xff0c;可能已经远远超过了Windows等桌面操作系统。 那么&#xff0c;你真正了解这些移动终端操作系统吗&#xff1f…

抖音seo源码开发部署技术解析

抖音seo源码开发是一项非常重要的技术&#xff0c;开发需要深入了解抖音平台的特点和用户需求&#xff0c;积累丰富的SEO经验&#xff0c;并不断学习和更新SEO技能&#xff0c;才能不断提高视频在搜索引擎中的曝光率和播放量。 抖音seo开发需要哪些技术 了解抖音的算法和规则&…

浏览器被2345劫持了怎么搞

起因我下载了某些修改东西&#xff0c;然后就被2345篡改了浏览器的数据。我是在虚拟机里下载的&#xff0c;但是虚拟机其实也是物理机的一部分&#xff0c;实际上下载的还是到了物理机里面&#xff0c;于是浏览器打开就变成了2345的导航页面 1 解决方案&#xff1a; 浏览器主页…

DJ8-2 shell 的命令形式、shell 的变量、shell 的内部命令

目录 8.3 shell 可识别的命令形式 8.3.1 单条命令 8.3.2 多条命令 8.3.3 复合命令 8.3.4 后台命令 8.4 shell 变量和引用符 8.4.1 环境变量 plus. echo 命令的使用 8.4.2 系统变量 8.4.3 局部变量&#xff08;用户变量&#xff09; 8.4.4 单引号、双引号、…

跨境电商领域的ChatGPT使用攻略

今天分享一个电商领域的ChatGPT应用指南! 一、写谷歌广告词 提示词: 现在你是一名谷歌广告的编写人员&#xff0c;你需要为xxx产品写10条谷歌广告标题和谷歌广告描述。要求是: 1.用英文输出你的答案 2.广告的标题和广告描述的字数等要符合谷歌的标准 3.广告要引人入胜&#xf…

OceanBase—01(入门篇——使用docker安装OceanBase以及介绍连接OB的几种方式)

OceanBase—01&#xff08;入门篇——使用docker安装OceanBase以及介绍连接OB的几种方式&#xff09; 1. 前言1.1 安装部署参考1.1.1 安装前提1.1.2 参考 1.1 修改数据库用户名密码1.2 总结常见连接命令 2. 安装部署OceanBase2.1 启动 OceanBase 数据库实例2.1.1 默认拉取最新版…

代码审计——XSS详解

为方便您的阅读&#xff0c;可点击下方蓝色字体&#xff0c;进行跳转↓↓↓ 01 漏洞描述02 审计要点03 漏洞特征04 漏洞案例05 修复方案 01 漏洞描述 跨站脚本攻击&#xff08;Cross Site Script&#xff09;是一种将恶意JavaScript代码插入到其他Web用户页面里执行以达到攻击…

没网络的CentOS7的Docker容器安装Java诊断神器Arthas

操作过程 1. 先把jar包下载到本地的windwos2. 打包复制到服务器3. 启动容器设置4.重启容器并使用Arthas 1. 先把jar包下载到本地的windwos 下载地址 下载好后jar&#xff0c;然后CMD执行命令 java -jar arthas-boot.jar 然后随便进入某个jvm进程查看&#xff0c;会见到Conso…

【技术新趋势】面向图像文档的版面智能分析与理解

目录 一、什么是OCR&#xff1f;什么是版面分析理解&#xff1f;二、文档版面分析2.1、版面布局类型2.2、面向文档图像版面分析的实例分割2.3、逻辑结构分析 三、文档版面理解3.1、位置嵌入3.2、表格数据提取 四、智能文档处理技术新解决方案 人类撰写文档是为了记录和保存信息…

Zoho Books助力跨境贸易!深入了解其多币种处理功能

对于跨境行业而言&#xff0c;合作不同的客户以当地货币收取付款是一个不简单的任务。现在&#xff0c;Zoho Books 推出了新的高级多币种处理功能&#xff0c;让多货币付款或收款不再困扰。&#xff08;注意&#xff1a;此功能在Zoho Books的专业版&#xff0c;高级版&#xff…

使用 ChatGPT 创建 APP 的最佳实践

导读&#xff1a;如果你想用用ChatGPT创建应用程序来赚钱&#xff0c;这是你需要知道的。 本文字数&#xff1a;2900&#xff0c;阅读时长大约&#xff1a;18分钟 如果你想用ChatGPT创建应用程序来赚钱&#xff0c;这是你需要知道的。 我最好先说出坏消息。如果你认为可以两手…

【后端开发】尚硅谷 SpringCloud 学习笔记

文章目录 一、cloud组件二、环境搭建2.1 创建父工程2.2 支付模块构建2.3 消费者模块构建2.3.1 引入RestTemplate2.3.2 远程调用支付模块 三、Eureka3.1 基础知识3.2 单机版Eureka安装3.3 服务注册3.4 Eureka集群3.4.1 Eureka端配置3.4.2 微服务端配置3.4.3 restTemplate负载均衡…

如何让ChatGPT制作XMind思维导图

一、使用ChatGPT辅助生成内容 给大家一个思路&#xff0c;比如我想制作《股神巴菲特给儿女的一生忠告》相关的思维导图&#xff0c;那我们可以在ChatGPT上提问“请使用markdown格式写出股神巴菲特给儿女的一生忠告的思维导图&#xff0c;以代码格式输出”。 生成后&#xff0…

teleport堡垒机的一些问题

teleport文件下载&#xff0c;将teleport服务映射到公网&#xff0c;权限已经分派好了&#xff0c;但无法ssh&#xff0c;这是什么原因呢&#xff1f; 注意teleport助手的版本要跟部署的是一致的&#xff0c;否则会检测不到状态 出现下面的问题&#xff0c;应该还是在防火墙的端…

奇舞周刊第496期:ChatGPT 的工作原理,这篇文章说清楚了!

记得点击文章末尾的“ 阅读原文 ”查看哟~ 下面先一起看下本期周刊 摘要 吧~ 奇舞推荐 ■ ■ ■ ChatGPT 的工作原理&#xff0c;这篇文章说清楚了&#xff01; ChatGPT 能够自动生成一些读起来表面上甚至像人写的文字的东西&#xff0c;这非常了不起&#xff0c;而且出乎意料。…

DM3E,雷赛步进驱动器

0x6040&#xff1a; 0x6041&#xff1a; 状态流&#xff1a; 0x60608; //设置伺服模式 8CSP&#xff0c;6回零模式&#xff0c;3速度模式6040流&#xff1a; 00初始》06上电》07使能》0F待命&#xff08;可操作&#xff09; 快停流&#xff1a; 02快停》0F命令生效 参数保…

Vue全家桶(三):Vuex状态管理(State、Getters、Mutations、Actions)

目录 Vuex1. 理解Vuex1.1 组件之间共享数据的方式1.2 Vuex是什么1.2 什么时候使用Vuex1.3 Vuex的工作原理图 2 使用Vuex2.1 搭建Vuex环境 2.2 Vuex基本使用2.2.1 State2.2.2 Getters2.2.3 Mutations2.2.4 Actions2.2.5 Modules 模块化命名空间 3 求和案例3.1 使用纯vue编写3.2 …

Property ‘code‘ does not exist on type ‘AxiosResponse<any, any>‘ 的解决办法

原文链接 : Property ‘xxx’ does not exist on type ‘AxiosResponse<any, any>’ 的解决办法 vue3 ts 中 调用接口时&#xff1a; const loginOut () > {loginOutApi().then(res > {const { code } resif(code 0){ }})}报了如下错误&#xff1a; Property…