RabbitMQ高级特性 - 消费者消息确认机制

news2025/1/13 10:03:33

文章目录

  • RabbitMQ 消息确认机制
    • 背景
    • 消费者消息确认机制
      • 概述
      • 手动确认(RabbitMQ 原生 SDK)
      • 手动确认(Spring-AMQP 封装 RabbitMQ SDK)
        • AcknowledgeMode.NONE
        • AcknowledgeMode.AUTO(默认)
        • AcknowledgeMode.MANUAL
        • MANUAL 可能会引发的问题

RabbitMQ 消息确认机制


背景

在这里插入图片描述

上图中可以看出,从生产者发送消息消费者接收到消息并正确处理,这些里路线都可能会出现问题,那么为了保证这些消息最后能被正确处理,RabbitMQ 就提供了消息确认机制.

消费者消息确认机制

概述

在这里插入图片描述
为了保证消息从 队列 到 消费者正确消费,那么就引入了消费者消息确认机制.

a)消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种(以下讲到的方法和参数来自于 RabbitMQ 原生的 SDK,非 Spring 提供).

  • 自动确认:当 autoAck = true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后不管消费者是否真正的消费这些消息,都会从内存中删除.(适合对消息可靠性要求不高的场景).
  • 手动确认:当 autoAck = false 时,RabbitMQ 会等待消费者显示的调用 Basic.Ack 命令(波安排时间哦且确认消息),然后才会从 内存或磁盘 中删除消息.(适合对消息可靠性要求高的场景).

Ps:可靠性高了,性能也就下降了,所以请综合考虑.

b)对于 MQ队列 中的消息,在 MQ管理平台上可以看到以下两种类别:
在这里插入图片描述
Ready:队列已经准备好消息,随时准备发送给消费者 的消息数量(只要消费者来要,就立刻发送).
Unacked:消息已经发送给消费者,但是消费者没有返回消息确认 的消息数量(消息确认包括 ack肯定确认nack否定确认

手动确认(RabbitMQ 原生 SDK)

消费者在收到消息之后,可以选择确认,也可以选择拒绝或者跳过,RabbitMQ因此提供了不同的确认应答方式,消费者客户端可通过调用 channel 的相关方法实现.

a)肯定确认:消费者已经接收到消息,并且成功处理消息,可以将其丢弃了.

Channel.basicAck(long deliveryTag, boolean multiple)
  • deliveryTag:消息的唯一标识(单调递增的 long),特点如下:
    • deliveryTag 是每个 Channel 通道独立维护,所以每个通道上的都是唯一的(生产者 和 Broker 建立一个 channel 会生成一个 deliverTag,消费者 和 Broker 建立一个 channel 会生成一个 deliverTag,这俩 deliverTag 是不同的).
    • 当消费者 ack确认 一条消息时,必须使用对应的通道上 deliveryTag 进行确认.
  • multiple:是否批量确认. 如果值为 true,那么就会一次性 ack确认 所有小于或等于指定的 deliveryTag 的消息,大大减少了网络开销
    • 假设 deliveryTag = 8,multiple = true:那么 deliveryTag <= 8 的消息都会被确认.
    • 假设 deliveryTag = 8,multiple = false:只确认 8.

Ps:deliveryTag 确保了消息传递的可靠性和顺序性.

b)否定确认(单个):用来拒绝这个消息. 被拒绝的消息如何处理,具体要看 requeue 参数.

Channel.basicReject(long deliveryTag, boolean requeue)
  • requeue:标识拒绝后,这条消息如何处理.
    • requeue = true:消息会重新存入队列,将来会发送给下一个订阅的消费者.
    • requeue = false:消息会从队列中移除,因此不会发送给消费者.

c)否定确认(批量):Channel.basicReject 只能拒绝一条消息,如果要批量拒绝消息,就可以使用 Channel.basicNack.

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

multiple:参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息.

d)MQ 的管理平台上也提供了几种确认方式.
在这里插入图片描述

手动确认(Spring-AMQP 封装 RabbitMQ SDK)

Spring-AMQP 对消息确认提供了三种策略:

public enum AcknowledgeMode {
    NONE,
    MANUAL,
    AUTO;
}

这里根 RabbitMQ 原生 SDK 是有些不同的.

AcknowledgeMode.NONE

不管消费者是否成功处理了消息,RabbitMQ 都会自动确认消息,然后从 队列 中移除消息.

a)配置手动确认

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: none

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(
   val rabbitTemplate: RabbitTemplate
) {

    @RequestMapping("/ack")
    fun ack(): String {
        rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
        return "ok"
    }

}

c)消费者

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset

@Component
class AckListener {

    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
        //业务处理...
        println("业务逻辑处理完成")
    }

}

在这里插入图片描述

d)效果演示
触发接口之后,回到 MQ 管理平台,可以看到队列中消息已经被删除.
在这里插入图片描述

AcknowledgeMode.AUTO(默认)

分为以下情况:

  • 消费者处理消息过程中没有抛出异常,则自动确认消息,然后从 队列 中移除消息.
  • 消费者处理消息过程中抛出异常,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).

a)配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: auto

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(
   val rabbitTemplate: RabbitTemplate
) {

    @RequestMapping("/ack")
    fun ack(): String {
        rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
        return "ok"
    }

}

c)消费者(正常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset

@Component
class AckListener {

    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
        //业务处理...
        println("业务逻辑处理完成")
    }

}

效果如下:
在这里插入图片描述
在这里插入图片描述
d)消费者(异常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset

@Component
class AckListener {

    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, ${message.messageProperties.deliveryTag}")
        //业务处理...
        val a = 1 / 0
        println("业务逻辑处理完成")
    }

}

效果如下:
消息未被确认,会不断重返队列,进行重试,因此 IDEA 中会循环报错输出.
在这里插入图片描述

AcknowledgeMode.MANUAL

分为以下情况:

  • 消费者在处理完消息后显示调用 basicAck 方法 来确认消息,然后从 队列 中移除消息.
  • 消费者在处理完消息后显示调用 basicNack 方法 来否定确认消息,是否从队列中移除消息需要看 requeue 参数的值
    • requeue = true:重返队列,不断重试.
    • requeue = false:丢弃消息.
  • 消费者在处理完消息后什么都不做,则不会确认消息,消息会重返队列,并且不断重试(MQ 管理平台中 Unacked +1).

a)配置文件

spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
    listener:
      simple:
        acknowledge-mode: manual

b)生产者接口

@RestController
@RequestMapping("/mq")
class MQApi(
   val rabbitTemplate: RabbitTemplate
) {

    @RequestMapping("/ack")
    fun ack(): String {
        rabbitTemplate.convertAndSend(MQConst.ACK_EXCHANGE, MQConst.ACK_BINDING, "ack msg 1")
        return "ok"
    }

}

c)消费者(异常处理消息,requeue = true)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset

@Component
class AckListener {

    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        val deliveryTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
            //业务处理...
            val a = 1 / 0
            println("业务逻辑处理完成")
            channel.basicAck(deliveryTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliveryTag, false, true) //requeue: true
        }
    }

}

由于消息处理异常,发送 nack,并且 requeue = true,因此消息会重返队列,不断重试.
在这里插入图片描述
在这里插入图片描述

d)消费者(异常处理消息,requeue = false)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset

@Component
class AckListener {

    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        val deliveryTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
            //业务处理...
            val a = 1 / 0
            println("业务逻辑处理完成")
            channel.basicAck(deliveryTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliveryTag, false, false) //requeue: false
        }
    }

}

由于消息处理异常,发送 nack,并且 requeue = false,因此消息不会重返队列,消息被丢弃.
在这里插入图片描述
d)消费者(正常处理消息)

import com.cyk.rabbitmq.constants.MQConst
import com.rabbitmq.client.Channel //注意这里的依赖
import org.springframework.amqp.core.Message //注意这里的依赖
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
import java.nio.charset.Charset

@Component
class AckListener {

    @RabbitListener(queues = [MQConst.ACK_QUEUE])
    fun handMessage(
        message: Message,
        channel: Channel,
    ) {
        val deliveryTag = message.messageProperties.deliveryTag
        try {
            println("接收到消息: ${String(message.body, Charset.forName("UTF-8"))}, $deliveryTag")
            //业务处理...
            println("业务逻辑处理完成")
            channel.basicAck(deliveryTag, false)
        } catch (e: Exception) {
            channel.basicNack(deliveryTag, false, false) //requeue: false
        }
    }

}

消息被正常处理,返回 ack.
在这里插入图片描述

MANUAL 可能会引发的问题

在这里插入图片描述
如果这里捕获的不是 Exception 异常,那么消费者处理消息的时候,可能会引发一些不会被捕获的异常,就会导致没有返回 nack.
也就意味着,没有进行确认应答,那么 mq管理平台 上就会显示 Unacked 数值 +1.

Ps:具体还是需要根据业务场景而定

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1980133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

思源笔记软件的优缺点分析

在过去一年里&#xff0c;我用了很多款笔记&#xff0c;从word文档到onenote到语雀再到思源&#xff0c;最后坚定的选择了思源笔记 使用感受 首先是用word文档来记笔记&#xff0c;主要是开始时不知道笔记软件怎么好用&#xff0c;等到笔记越来越膨胀的时候我发现&#xff0c…

2024死磕小红书,一定能赚到钱!

​2024死磕小红书&#xff0c;一定能赚到钱&#xff01;在文末领取小红书运营完全指南电子书 从2023年起&#xff0c;小红书这股热乎劲儿就像开了挂&#xff0c;突然间就成了人人想蹭的“显学”。大伙儿都想趁着平台红利期&#xff0c;分一杯羹。说来惭愧&#xff0c;我从2020年…

C语言指针·高级用法超详解(指针运算、野指针、悬空指针、void类型指针、二级以及多级指针)

目录 1. 指针的运算 2. 野指针和悬空指针 2.1 野指针 2.2 悬空指针 3. void类型指针 4. 二级指针和多级指针 4.1 命名规则 4.2 作用 4.2.1 二级指针可以操作一级指针记录的地址 4.2.2 利用二级指针获取变量中记录的数据 1. 指针的运算 文章开始前可以先了…

基于强化学习的Deep-Qlearning网络玩cartpole游戏

1、环境准备&#xff0c;gym的版本为0.26.2 2、编写网络代码 # 导入必要的库 import gym import torch import torch.nn as nn import torch.optim as optim import numpy as np from collections import deque import random# 定义DQN网络 class DQN(nn.Module):def __init__…

《深入浅出WPF》学习笔记五.Mvvm设计模式

《深入浅出WPF》学习笔记五.Mvvm设计模式 背景 在通过视频学习wpf的过程中&#xff0c;讲师花了不少篇幅来讲Mvvm。特地在此用自己的语言总结一番,方便以后面试回答&#xff0c;如有理解不对&#xff0c;欢迎指正哈。 Mvvm结构 Mvvm指的是ModelViewViewModel 为什么要使用…

《网络安全自学教程》- MySQL匿名用户的原理分析与实战研究

《网络安全自学教程》 低版本的MySQL数据库在安装时会创建一个用户名和密码为空的账户&#xff0c;也就是匿名账户。即使升级到高版本&#xff0c;匿名账户仍然会存在。 MySQL匿名账户 1、检查是否存在匿名账户2、检查用户权限3、创建匿名账户4、使用匿名账户登录5、删除匿名账…

医院管理系统

医院管理系统 本文所涉及所有资源均在传知代码平台可获取 文章目录 医院管理系统概述使用技术核心功能1. 登录与注册2. 管理员系统3. 患者系统&#xff08;医院电子平台&#xff09;4. 医生系统&#xff08;坐诊系统&#xff09; 部署与启动适用场景 概述 本项目是一个专为大学…

读零信任网络:在不可信网络中构建安全系统09用户信任

1. 用户信任 1.1. 将设备身份和用户身份混为一谈会导致一些显而易见的问题 1.1.1. 特别是当用户拥有多台设备时&#xff0c;而这种情况很普遍 1.1.2. 应该针对不同类型的设备提供相匹配的凭证 1.1.3. 在存在共用终端设备的情况下&#xff0c;所有的这些问题将更加凸显 1.2…

打造未来交互新篇章:基于AI大模型的实时交互式流媒体数字人项目

在当今数字化浪潮中,人工智能(AI)正以前所未有的速度重塑我们的交互体验。本文将深入探讨一项前沿技术——基于AI大模型的实时交互式流媒体数字人项目,该项目不仅集成了多种先进数字人模型,还融合了声音克隆、音视频同步对话、自然打断机制及全身视频拼接等前沿功能,为用…

Python中使用正则表达式

摘要&#xff1a; 正则表达式&#xff0c;又称为规则表达式&#xff0c;它不是某种编程语言所特有的&#xff0c;而是计算机科学的一个概念&#xff0c;通常被用来检索和替换某些规则的文本。 一.正则表达式的语法 ①行定位符 行定位符就是用来描述字符串的边界。"^&qu…

第十三节、人物属性及伤害计算

一、碰撞器层级剔除 选中player和敌人&#xff0c;即可去除 若勾选触发器&#xff0c;则会取消掉碰撞效果&#xff0c;物体掉落 二、人数属性受伤计算 1、创建代码 将两个代码挂载到玩家和敌人身上 2、调用碰撞物体的方法 3、伤害值 开始&#xff1a;最大血量即为当前血量…

Arduino PID库 (6):初始化

Arduino PID库 &#xff08;6&#xff09;&#xff1a;初始化 参考&#xff1a;手把手教你看懂并理解Arduino PID控制库——初始化 Arduino PID库 &#xff08;5&#xff09;&#xff1a;开启或关闭 PID 控制的影响 问题 在上一节中&#xff0c;我们实现了关闭和打开PID的功…

最小二乘法求解线性回归问题

本文章记录通过矩阵最小二乘法&#xff0c;求解二元方程组的线性回归。 假设&#xff0c;二维平面中有三个坐标&#xff08;1&#xff0c;1&#xff09;、&#xff08;2&#xff0c;2&#xff09;、&#xff08;3&#xff0c;2&#xff09;&#xff0c;很显然该三个坐标点不是…

React(三):PDF文件在线预览(简易版)

效果 依赖下载 https://mozilla.github.io/pdf.js/getting_started/ 引入依赖 源码 注意&#xff1a;pdf文件的预览地址需要配置代理后才能显示出来 import ./index.scss;function PreviewPDF() {const PDF_VIEWER_URL new URL(./libs/pdfjs-4.5.136-dist/web/viewer.html, im…

12.SpringDataRedis

介绍 SpringData是Spring中数据操作的模块&#xff0c;包含对各种数据库的集成&#xff0c;其中redis的集成模块就叫做SpringDataRedis。 spring的思想从来都不是重新生产&#xff0c;而是整合其他技术。 SpringDataRedis的特点 1.提供了对不同redis客户端的整合&#xff08…

8.4 day bug

bug1 忘记给css变量加var 复制代码到通义千问&#xff0c;解决 bug2 这不是我的bug&#xff0c;是freecodecamp的bug 题目中“ 将 --building-color2 变量的颜色更改为 #000” “ 应改为” 将 #000 变量的颜色更改为 --building-color2 “ bug3 又忘记加var(–xxx) 还去问…

渗透小游戏,各个关卡的渗透实例

Less-1 首先&#xff0c;可以看见该界面&#xff0c;该关卡主要是SQL注入&#xff0c;由于对用户的输入没有做过滤&#xff0c;使查询语句进入到了数据库中&#xff0c;查询到了本不应该查询到的数据 首先&#xff0c;如果想要进入内部&#xff0c;就要绕过&#xff0c;首先是用…

C#中的TCP和UDP

TcpClient TCP客户端 UDP客户端 tcp和udp的区别 TCP&#xff08;传输控制协议&#xff09;和UDP&#xff08;用户数据报协议&#xff09;是两种在网络通信中常用的传输层协议&#xff0c;它们在C#或任何其他编程语言中都具有相似的特性。下面是TCP和UDP的主要区别&#xff1a;…

MySQL的基本使用

文章目录 MySQL的基本使用什么是SQLSQL学习目标SQL的SELECT语句SQL的INSERT INTO语句 SQL的UPDATE语句SQL的DELETE语句 SQL的WHERE子句可在WHERE子句中使用的运算符SQL的AND和OR运算符SQL的ORDER BY子句SQL的COUNT(*)函数 在项目中操作数据库的步骤安装mysql模块配置mysql模块测…

微服务设计原则——易维护

文章目录 1.充分必要2.单一职责3.内聚解耦4.开闭原则5.统一原则6.用户重试7.最小惊讶8.避免无效请求9.入参校验10.设计模式11.禁用 flag 标识12.分页宜小不宜大参考文献 1.充分必要 不是随便一个功能都需要开发个接口。 虽然一个接口应该只专注一件事&#xff0c;但并不是每个…