第十一章 RabbitMQ之消费者确认机制

news2024/11/25 14:42:48

目录

一、介绍

二、演示三种ACK方式效果

2.1. none: 不处理

2.1.1. 消费者配置代码

2.1.2. 生产者主要代码 

2.1.3. 消费者主要代码 

2.1.4. 运行效果 

2.2. manual:手动模式

2.3. auto:自动模式 


一、介绍

消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:

ack:成功处理消息,RabbitMQ从队列中删除该消息

nack:消息处理失败,RabbitMQ需要再次投递消息

reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

none:不处理 消息投递给消费者后立刻ack 消息立刻从MQ删除(非常不安全不建议使用)

manual:手动模式 即手动ack或reject,需要在业务代码结束后,调用api发送ack,但是这种有代码入侵,不建议使用。

auto:自动模式 SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack。当业务出现异常时,根据异常判断返回不同结果:

1. 如果是业务异常,会自动返回nack

2. 如果是消息处理或校验异常,自动返回reject

Spring默认未我们设定的是auto 自动模式,符合我们实际项目的需求。 

二、演示三种ACK方式效果

2.1. none: 不处理

2.1.1. 消费者配置代码

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack


2.1.2. 生产者主要代码 

package com.example.publisher;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@Slf4j
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void test() {
        rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");
    }
}

2.1.3. 消费者主要代码 

package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SimpleListener {

    @RabbitListener(queues = "simple.queue")
    public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");
        throw new Exception();
    }
}

2.1.4. 运行效果 

 

我们可以看到,当生产者投递到MQ的那一刻,会立刻返回ACK,此刻消费者的业务逻辑未执行完。

2.2. manual:手动模式

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack


我们定义了一个SimpleMessageListenerContainer,并为它设置了一个ChannelAwareMessageListener。在监听器内部,我们实现了消息的接收和处理,并在处理完成后使用channel.basicAck方法手动发送一个确认消息给RabbitMQ,表明消息已被消费。如果在处理消息时发生异常,我们可以使用channel.basicReject方法拒绝该消息,以便RabbitMQ可以将其重新排队或者进行其他配置的处理。 

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class RabbitMQConfig {
 
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("yourQueueName"); // 设置监听的队列名称
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                try {
                    // 消息处理逻辑
                    System.out.println("Received message: " + new String(message.getBody()));
 
                    // 确认消息已被成功处理
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e) {
                    // 出现异常,拒绝该消息
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                }
            }
        });
        return container;
    }
}

2.3. auto:自动模式 

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack


当生产者投递到MQ后消费者在消费过程中发生业务异常,MQ会将它标记为Unacked,后续会一直投递该消息,直到消费成功为止。

 

下图看到有两条消息,其中一条是第一次投递失败重新投递的消息: 

至此我们思考一下,实际项目中我们推荐采用Spring AMQP为我们实现的auto 自动模式确认机制,虽然看上去我们的系统设计简单了,但是对于如果我们业务代码出现异常,消息在消费过程中执行一直失败,那么RabbitMQ后续会一直投递该消息,这期间异常消息如果一直消费不了,循环投递就会给我们系统造成极大的压力负担,这该怎么解决?下一章将给大家讲解失败消息的处理策略!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2207701.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

物流大数据底盘建设方案

1、现状及目标 1.1、离线数仓现状及目标 1.2、实时数仓现状及目标 2、建设方向 2.1、建设概览 2.2、数仓架构重建 2.3、数据架构 2.4、作业优化 2.5、具体作业优化-运营 2.6、具体作业优化-财经 2.7、数据血缘依赖重构 2.8、事实表建设思路 2.9、公共维表建设思路 2.10、数据…

springboot-网站开发-使用slf4j实现网站异常错误的及时跟踪定位

springboot-网站开发-使用slf4j实现网站异常错误的及时跟踪定位!项目部署,开发好后,部署到远程服务器上面了,运行过程中,难免会遇到一些错误和异常情况,我们需要借助一些插件来帮助我们及时捕捉这类错误和异…

【中文版】深度学习 deep learning 花书 pdf下载 2017.09.04

中文版pdf:https://pan.baidu.com/s/1s93yluQGSly5uBDAIVAlNg?pwdx6xy github:https://github.com/exacity/deeplearningbook-chinese 目录 第一章 前言第二章 线性代数第三章 概率与信息论第四章 数值计算第五章 机器学习基础第六章 深度前馈网络第七…

一道Fortran题(Fortran)

题目 代码 选择排序法 结果 我勒个!

【项目管理】如何根据 PV、EV、AC 计算 CPI 和 SPI?

【项目管理】如何根据 PV、EV、AC 计算 CPI 和 SPI? 一、PV(计划价值 - Planned Value)二、EV(挣值 - Earned Value)三、AC(实际成本 - Actual Cost)四、CV(成本偏差 - Cost Varianc…

C# WPF 仿 Android Toast 效果

转载请注明出处: https://blog.csdn.net/hx7013/article/details/142860084 主职Android, 最近需要写一些WPF的程序作为上位机,目前WPF的MessageBox过于臃肿,且想找一个内置的非阻塞的简单提示一直找不到,想到了Android的Toast所以写了这个扩…

Kafka之生产者

本章内容将整理下Kafka体系结构中的生产者相关的一些知识。 1. 生产者客户端 生产者客户端在Kafka的发展历程当中一共有两个重大版本: 一个是基于Scala语言开发的版本,称为Old Producer或Scala版的生产者客户端。一个是Kafka0.9.x版本之后以Java语言开发…

《深度学习》OpenCV 光流估计 原理、案例解析

目录 一、光流估计 1、什么是光流估计 2、原理 3、光流估计算法 1)基于局部方法 2)和基于全局方法 4、光流估计的前提 1)亮度恒定 2)小运动 3)空间一致 二、案例实现 1、读取视频 2、特征检测 3、处理每…

Python | Leetcode Python题解之第474题一和零

题目: 题解: class Solution:def findMaxForm(self, strs: List[str], m: int, n: int) -> int:count10 []for s in strs:count10.append([0,0])for c in s:if c 0: count10[-1][0]1else: count10[-1][1]1dp [[0]*(n1) for _ in range(m1)]for i …

十一、数据库的设计规范

文章目录 1. 为什么需要数据库设计2. 范式2.1 范式介绍2.2 范式都包括哪些2.3 键和相关属性的概念2.4 第一范式(1st NF)2.5 第二范式(2nd NF)2.6 第三范式(3rd NF)2.7 小结3. 反范式化3.1 概述3.2 应用举例3.3 反范式的新问题3.4 反范式的使用场景3.4.1 增加冗余字段的建议3.…

windows系统更新升级node指定版本【避坑篇!!!亲测有效】(附带各版本node下载链接)一定看到最后!不用删旧版!

Node.js 是一个开源、跨平台的 JavaScript 运行时环境,广泛应用于服务器端和网络应用的开发。随着 Node.js 版本的不断更新,我们可能需要升级到特定版本以满足项目需求或修复安全漏洞。又或者是学习开发另外一个新项目,新项目对Node版本要求更…

上交大全华班复现o1旅程式学习下的深思考

因篇幅限制不重复原研究内容,建议访问原技术报告链接精读,这里主要向大伙表示我对上交大本此研究所涉三方面的价值认同及更进一步的延展思考。 价值认同: ① 深刻洞察:系统性研究并阐释旅程式学习; ② 行业促进&…

SQL Injection | MySQL 数据库概述

关注这个漏洞的其他相关笔记:SQL 注入漏洞 - 学习手册-CSDN博客 0x01:MySQL 数据库简介 MySQL 是一个流行的关系型数据库管理系统(RDBMS),它基于 SQL (Structured Query Language)进行操作。My…

Django项目的创建及说明(详细图解版)

Django项目的创建及说明 1、安装Django2、创建项目2.1、利用终端创建项目2.2、利用Pycharm企业版创建项目 3、默认文件介绍 1、安装Django 在终端输入下述命令行。 pip install django安装成功后执行如下命令查看Django是否安装好,若正确显示出Django版本号则安装…

[实时计算flink]应用场景

本文将以部门场景和技术领域场景为例,为您介绍实时计算Flink版的大数据是实时化场景。 背景信息 作为流式计算引擎,Flink可以广泛应用于实时数据处理领域,例如ECS在线服务日志,IoT场景下传感器数据等。同时Flink还能订阅云上数据…

进程的那些事--进程间的通信(重点说明管道和共享内存)

目录 前言 一、初始进程间通信 二、管道 1.匿名管道 2.命名管道 三、共享内存 四、消息队列(了解) 五、信号量(了解) 前言 提示:这里可以添加本文要记录的大概内容: 进程是一个能够独立运行&#…

什么情况下数据库和缓存不一致?

首先,在非并发的场景中,出现不一致的问题大家都能比较容易的理解,因为缓存的操作和数据库的操作是存在一定的时间差的。而生两个操作是没办法保证原子些的,也就是说,是有可能一个操作功,一个操作失败的。所…

C语言-数据结构 折半查找

在折半查找中,刚开始学可能会在下标处产生困惑,例如奇数个长度的数组怎么处理,偶数个长度的数组怎么处理,不需要修改代码吗?并且下标我从1开始算和0开始算影响代码吗?其实都可以用一样的代码,产…

【含文档】基于Springboot+Vue的失物招领系统(含源码+数据库+lw)

1.开发环境 开发系统:Windows10/11 架构模式:MVC/前后端分离 JDK版本: Java JDK1.8 开发工具:IDEA 数据库版本: mysql5.7或8.0 数据库可视化工具: navicat 服务器: SpringBoot自带 apache tomcat 主要技术: Java,Springboot,mybatis,mysql,vue 2.视频演示地址 3.功能 系统定…

如何替换OCP节点(一):使用oat | OceanBase应用实践

前言: OceanBase Cloud Platform(简称OCP),是 OceanBase数据库的专属企业级数据库管理平台。 在实际生产环境中,OCP的安装通常是第一步,先搭建OCP平台,进而依赖OCP来创建、管理和监控我们的生…