Spring AMQP-保证消费者消息的可靠性

news2025/1/13 4:52:15

为什么要保证消息的可靠性?

当MQ向消费者发送一个消息之后需要得到消费者的状态,因为消息并不一定就真的被消费者给消费了,可能在消费的过程中出现了一些意外,比如

1. 网络问题

2. 消息转换有问题

3. 消费者本身的业务处理有问题


消费者确认机制

消费者消息处理状态:

  • ack:消息成功接收,并且成功被处理,MQ将此消息删除
  • nack:消息处理失败,需要MQ重新发送消息
  • reject:消息处理失败并且拒绝该消息,MQ将此消息删除

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject;


在配置文件中通过下面的配置即可设置ACK的处理方式

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: c # none 默认 auto 自动确认 manual 手动确认 

消费者重试机制

消费者接收了一个消息,但是在处理的过程中出现异常了,那么AMQP会不断的重试,直到把资源占完然后崩掉,这个时候就必须要设置重试机制,限制重试的次数,避免无限制重试。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

测试

先别配置重试机制,然后在需要在接收消息的地方手动抛出一个异常,查看控制台就会看见消费者在尝试不断的获取消息,但是一直获取不到无限制的重试

  @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "fanout.hamll.query1", // 队列名称
                    durable = "true"), // 是否持久
            exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT) // 交换机名称
    ))
    public void query2(String message) {
        System.err.println("fanout.hamll.query1 消息内容:" + message); 
        throw  new RuntimeException("故意的错误"); // 抛出异常
    }

配置好重试之后到了三次就会直接停止,这样子就很好的减少了系统资源的消耗


业务的幂等性判断

什么是幂等性?

在Java领域,幂等性是指同一个请求,不管发送多少次执行的结构都是一样的。

比如支付和交易,支付成功之后通知交易服务修改状态。在交易服务需要查询订单并判断订单的状态,这样子不管同一个订单重复发起多少次请求,都不会对业务的结果造成影响。


MQ保证消息的幂等性

MQ中的幂等是说,不管消息是否被重复消费,都不会对业务造成影响、处理的结果都是一致的。


MQ实现业务幂等性

为每个消息都创建一个唯一的MessageId在操作的时候将其存入数据库,然后在进行判断消息是否存在,存在就直接跳过业务的处理,不存在就继续操作。

    @Bean
    public MessageConverter messageConverter(){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jackson2JsonMessageConverter.setCreateMessageIds(true);
        return jackson2JsonMessageConverter;
    }

业务实现幂等性

在业务的操作中,比如支付和交易服务,支付成功之后会通知交易服务修改订单的状态,而在交易服务应该做判断,判断该订单的状态是否未未支付。如果是未支付就继续处理接下来的业务,否则就直接结束。

package com.hmall.trade.listener;

import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class PayStatusListener {
    @Autowired
    IOrderService orderService;
    @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "trade.pay.success.queue", durable = "true"),
            exchange = @Exchange(value = "pay.direct", type = ExchangeTypes.DIRECT),
            key = "pay.success"
    ))
    public void paySuccess(Long orderId) {
        log.info("支付成功,订单号:{}", orderId);
        //查询当前订单 判断幂等性
        Order order = orderService.getById(orderId);
        //判断状态以及对象是否存在
        if (order == null || order.getStatus() != 1) {
            return;
        }
        orderService.markOrderPaySuccess(orderId);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2275809.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AI刷题-数位长度筛选问题、数值生成与运算问题

目录 一、数位长度筛选问题 问题描述 测试样例 解题思路: 问题理解 数据结构选择 算法步骤 关键点 最终代码: 运行结果: 二、数值生成与运算问题 问题描述 测试样例 解题思路: 问题理解 数据结构选择 算法步骤…

算法学习(24)—— BFS解决拓扑排序

关于拓扑排序 ①有向无环图(DAG图) 就跟它的名字一样,有方向但是没有环的图,如下图:我们了解下入度和出度,二者都是针对一个点来说的,就以上图为例入度:表示有多少条边指向一个点&am…

【深度学习入门_基础篇】概率论

开坑本部分主要为基础知识复习,新开坑中,学习记录自用。 学习目标: 随机事件与概率、随机变量及其分布、多维随机变量及其分布、大数定律与中心极限定理。 强烈推荐此视频: 概率论_麻省理工公开课 废话不多说,直接…

Gitlab-Runner配置

原理 Gitlab-Runner是一个非常强大的CI/CD工具。它可以帮助我们自动化执行各种任务,如构建、测试和部署等。Gitlab-Runner和Gitlab通过API通信,接收作业并提交到执行队列,Gitlab-Runner从队列中获取作业,并允许在不同环境下进行作…

多并发发短信处理(头条项目-07)

1 pipeline操作 Redis数据库 Redis 的 C/S 架构: 基于客户端-服务端模型以及请求/响应协议的 TCP服务。客户端向服务端发送⼀个查询请求,并监听Socket返回。通常是以 阻塞模式,等待服务端响应。服务端处理命令,并将结果返回给客…

OSPF - 1类LSA(Router-LSA)

前篇博客有对常用LSA的总结 1类LSA是OSPF计算最原始的材料,他会泛洪发给所有的路由器 LSA是包含在LSU中的,一条LSU能够携带多条LSA options位所有LSA都会有,用于标记起源于什么类型的区域,具体查看文章【邻居建立】 flags位是一…

pdf提取文本,表格以及转图片:spire.pdf

文章目录 🐒个人主页:信计2102罗铠威🏅JavaEE系列专栏📖前言:🎀 1. pdfbox1.1导入pdfbox 的maven依赖1.1 提取文本1.2 提取文本表格(可自行加入逻辑处理)1.3 pdf转换成图片代码&…

_STM32关于CPU超频的参考_HAL

MCU: STM32F407VET6 官方最高稳定频率:168MHz 工具:STM32CubeMX 本篇仅仅只是提供超频(默认指的是主频)的简单方法,并未涉及STM32超频极限等问题。原理很简单,通过设置锁相环的倍频系数达到不同的频率&am…

图片和短信验证码(头条项目-06)

1 图形验证码接口设计 将后端⽣成的图⽚验证码存储在redis数据库2号库。 结构: {img_uuid:0594} 1.1 创建验证码⼦应⽤ $ cd apps $ python ../../manage.py startapp verifications # 注册新应⽤ INSTALLED_APPS [django.contrib.admin,django.contrib.auth,…

解决idea中无法拖动tab标签页的问题

1、按 Ctrl Alt S 打开设置,找到路径 File | Settings | Appearance & Behavior | Appearance 2、去掉勾选 Drag-and-drop with Alt pressed only 即可

单片机(MCU)-简单认识

简介: 内部集成了CPU,RAM,ROM,定时器,中断系统,通讯接口等一系列电脑的常用硬件功能。 单片机的任务是信息采集(依靠传感器),处理(依靠CPU)&…

QT c++ 样式 设置 按钮(QPushButton)的渐变色美化

上一篇文章中描述了标签的渐变色美化,本文描述按钮的渐变色美化。 1.头文件 #ifndef WIDGET_H #define WIDGET_H #include <QWidget> //#include "CustomButton.h"#include <QVBoxLayout> #include <QLinearGradient> #include <QPushButton&…

【物流管理系统 - IDEAJavaSwingMySQL】基于Java实现的物流管理系统导入IDEA教程

有问题请留言或私信 步骤 下载项目源码&#xff1a;项目源码 解压项目源码到本地 打开IDEA 左上角&#xff1a;文件 → 新建 → 来自现有源代码的项目 找到解压在本地的项目源代码文件&#xff0c;点击确定&#xff0c;根据图示步骤继续导入项目 查看项目目录&#xff…

【数据结构-堆】【二分】力扣3296. 移山所需的最少秒数

给你一个整数 mountainHeight 表示山的高度。 同时给你一个整数数组 workerTimes&#xff0c;表示工人们的工作时间&#xff08;单位&#xff1a;秒&#xff09;。 工人们需要 同时 进行工作以 降低 山的高度。对于工人 i : 山的高度降低 x&#xff0c;需要花费 workerTimes…

如何用SQL语句来查询表或索引的行存/列存存储方式|OceanBase 用户问题集锦

一、问题背景 自OceanBase 4.3.0版本起&#xff0c;支持了列存引擎&#xff0c;允许表和索引以行存、纯列存或行列冗余的形式创建&#xff0c;且这些存储方式可以自由组合。除了使用 show create table命令来查看表和索引的存储类型外&#xff0c;也有用户询问如何通过SQL语句…

CDA数据分析师一级经典错题知识点总结(3)

1、SEMMA 的基本思想是从样本数据开始&#xff0c;通过统计分析与可视化技术&#xff0c;发现并转换最有价值的预测变量&#xff0c;根据变量进行构建模型&#xff0c;并检验模型的可用性和准确性。【强调探索性】 2、CRISP-DM模型Cross Industry Standard Process of Data Mi…

算法题(32):三数之和

审题&#xff1a; 需要我们找到满足以下三个条件的所有三元组&#xff0c;并存在二维数组中返回 1.三个元素相加为0 2.三个元素的下标不可相同 3.三元组的元素不可相同 思路&#xff1a; 混乱的数据不利于进行操作&#xff0c;所以我们先进行排序 我们可以采取枚举的方法进行解…

【设计模式】介绍常见的设计模式

&#x1f970;&#x1f970;&#x1f970;来都来了&#xff0c;不妨点个关注叭&#xff01; &#x1f449;博客主页&#xff1a;欢迎各位大佬!&#x1f448; 文章目录 ✨ 介绍一下常见的设计模式✨ Spring 中常见的设计模式 这期内容主要是总结一下常见的设计模式&#xff0c;可…

单通道串口服务器(三格电子)

一、产品介绍 1.1 功能简介 SG-TCP232-110 是一款用来进行串口数据和网口数据转换的设备。解决普通 串口设备在 Internet 上的联网问题。 设备的串口部分提供一个 232 接口和一个 485 接口&#xff0c;两个接口内部连接&#xff0c;同 时只能使用一个口工作。 设 备 的网 口…

【蓝牙】win11 笔记本电脑连接 hc-06

文章目录 前言步骤 前言 使用电脑通过蓝牙添加串口 步骤 设置 -> 蓝牙和其他设备 点击 显示更多设备 更多蓝牙设置 COM 端口 -> 添加 有可能出现卡顿&#xff0c;等待一会 传出 -> 浏览 点击添加 hc-06&#xff0c;如果没有则点击 再次搜索 确定 添加成…