RabbitMQ_消息确认机制

news2024/11/17 1:26:59

消息确认机制分为消息发送确认机制消息消费确认机制

消息发送确认机制

消息发送确认机制:消息由producer发送后,确认其是否到达broker,又是否被exchange转发至对应queue的机制

该机制分为两部分:producer---broker,exchange---queue

前者的实现依靠ConfirmCallback机制,后者的实现依靠ReturrnsCallback机制

ConfirmCallback

实现ConfirmCallback接口,并重写confirm方法

confirm方法参数含义:

correlationData:CorrelationData类只有一个 id 属性 用于唯一标识该消息
public CorrelationData() {
    this.id = UUID.randomUUID().toString();
}

ack:消息是否成功传输到 broker (true表示成功传输 false表示传输失败)

cause:传输失败的原因

当消息传输至broker后就会触发ConfirmCallback回调,无论传输是否成功,可根据传输的结果进行后续处理

@Component
// ConfirmCallback 用于确认消息是否到达 broker(rabbitmq服务器)
// 实现 ConfirmCallback 接口 重写confirm()方法
public class ConfirmCallbackComponent implements RabbitTemplate.ConfirmCallback {

    /*
    correlationData:CorrelationData类只有一个 id 属性 用于唯一标识该消息
    public CorrelationData() {
        this.id = UUID.randomUUID().toString();
    }

    ack:消息是否成功传输到 broker (true表示成功传输 false表示传输失败)

    cause:传输失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            System.out.println("消息发送异常");
        } else {
            System.out.println("消息发送成功" + " correlationData=" + correlationData.getId() + " ack=" + ack + " cause=" + cause);
        }
    }
}

ReturrnsCallback

实现ReturnsCallback接口,并重写returnedMessage方法

当消息转发失败后就会触发ReturrnsCallback,会将消息返回给生产者,同时会返回与消息转发失败的相关信息(包含在参数returned内),可对此采取后续处理

@Component
// 实现ReturnCallback接口 重写returnedMessage()方法
public class ReturnsCallbackComponent implements RabbitTemplate.ReturnsCallback {

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("ReturnCallback: replyCode=" + returned.getReplyCode() + " replyText=" + returned.getReplyText() + " message= " + returned.getMessage() + " exchange=" + returned.getExchange() + " routingKey=" + returned.getRoutingKey());
    }
}

配置文件

注:生产者端配置文件

ConfirmCallback

publisher-confirm-type: correlated
    #NONE:
      #禁用发布确认 是默认值。
    #CORRELATED:
      #发布消息后 交换机会触发回调方法。
    #SIMPLE:
      #有两种效果:
        #1:和CORRELATED一样会触发回调方法
        #2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果
    #根据返回结果来判定下一步的逻辑: waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel 则接下来无法发送消息到 broker

ReturnsCallback

    template:
      mandatory: true # 设置当交换机分发消息失败时 将消息返回至生产者(否则直接丢弃)
    publisher-returns: true # 允许消息返回至生产者

消息消费确认机制

生产者Service

此处需要调用ConfirmCallback接口与ReturnsCallback接口的实现类实例

@Service
public class WorkService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackComponent confirmCallbackComponent;

    @Autowired
    private ReturnsCallbackComponent returnsCallbackComponent;

    public void sendMessage(String exchange, String routingKey, Object msg) {

        // 消息被手动ack时的处理
        rabbitTemplate.setConfirmCallback(confirmCallbackComponent);

        // 消息重返队列时的处理
        rabbitTemplate.setReturnsCallback(returnsCallbackComponent);

        // 发送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                // 是否持久化消息
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                    return message;
                },
                // 实现ConfirmCallback接口 重写其confirm方法时 作为confirm方法的参数
                new CorrelationData(UUID.randomUUID().toString()));
    }
}

消费者Service

deliveryTag:表示消息投递序号 接收消息后deliveryTag++
手动确认模式下 我们可以对指定deliveryTag的消息进行ack、nack、reject等操作

multiple:是否批量确认消息 值为true则会一次性ack所有小于当前消息deliveryTag的消息
举个栗子:
        假设已发送三条消息 deliveryTag分别是1、2、3 但均未被确认
        此时发送第四条消息 其deliveryTag为4 且该消息被确认
        若multiple被设置为true 则会将1、2、3、4的消息全部进行确认

requeue:消息是否重入队列 true为重入

方法参数: 
basicAck: deliveryTag multiple
basicReject: deliveryTag requeue
basicNack: deliveryTag multiple requeue
@Service
@RabbitListener(queues = "work_confirm_queue")
public class WorkerService {

    @RabbitHandler
    public void workerMessage(String msg, Channel channel, Message message) throws IOException {

        /*
        deliveryTag:表示消息投递序号 接收消息后deliveryTag++
        手动确认模式下 我们可以对指定deliveryTag的消息进行ack、nack、reject等操作

        multiple:是否批量确认消息 值为true则会一次性ack所有小于当前消息deliveryTag的消息

        举个栗子:
                假设已发送三条消息 deliveryTag分别是1、2、3 但均未被确认
                此时发送第四条消息 其deliveryTag为4 且该消息被确认
                若multiple被设置为true 则会将1、2、3、4的消息全部进行确认

        requeue:消息是否重入队列 true为重入

        方法参数:
        basicAck: deliveryTag multiple
        basicReject: deliveryTag requeue
        basicNack: deliveryTag multiple requeue
         */
        try {
            System.out.println("worker收到消息: " + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认消息
        }  catch (Exception e) {
            // 判断消息是否已重返过队列
            if (message.getMessageProperties().getRedelivered()) {
                System.out.println("worker再次接收消息失败 队列拒绝消息的重返 " + msg);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息重返
            } else {
                System.out.println("worker接收消息失败 消息将返回队列 " + msg);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 消息重入队列
            }
        }
    }
}

配置文件

注:消费者端配置文件

    listener:
      simple:
        prefetch: 1 # 消费者一次性可以消费的最大消息数
        acknowledge-mode: manual # 开启手动应答
        # none 一律视为应答
        # manual 手动应答
        # auto 自动应答(与none区别在于有应答条件)
        retry:
          enabled: true # 开启重试
          max-attempts: 10 # 最大重试数(若使用try-catch 则该设置失效)
          initial-interval: 1000ms # 重试间隔

测试

关于sleep方法:单元测试运行完毕后即关闭,而调用方法与进行通信需要时间,为了确保能收到消费者端的应答,需要保证信道处于开启状态,故sleep

    @Autowired
    WorkService workService;
    @Test
    void workQueuesOrders() throws InterruptedException {
        workService.sendMessage("", "work_confirm_queue", "hello");

        TimeUnit.SECONDS.sleep(5);
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/63682.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android 性能优化之内存优化——重识内存

我们知道,手机的内存是有限的,如果应用内存占用过大,轻则引起卡顿,重则导致应用崩溃或被系统强制杀掉,更严重的情况下会影响应用的留存率。因此,内存优化是性能优化中非常重要的一部分。但是,很…

66-86-javajvm-堆

66-javajvm-堆: 堆的核心概述 堆与进程、线程 一个进程对应一个JVM实例一个JVM实例对应一个堆空间进程包含多个线程,所以线程之间共享同一个堆空间 对堆的认识 一个JVM实例只存在一个堆内存,堆也是Java内存管理的核心区域。Java堆区在JVM启动…

HashMap原理

在Java编程语言中,最基本的结构就是两种,一种是数组,一种是模拟指针(引用),所有的数据结构都可以用这两个基本结构构造,HashMap也一样。当程序试图将多个 key-value 放入 HashMap 中时,以如下代码片段为例:…

P1182 数列分段 Section II——二分答案

数列分段 Section II 题目描述 对于给定的一个长度为N的正整数数列 A1∼NA_{1\sim N}A1∼N​,现要将其分成 MMM(M≤NM\leq NM≤N)段,并要求每段连续,且每段和的最大值最小。 关于最大值最小: 例如一数列…

NCTF web总结与复现

前言 打完NCTF休息了一下,总体感觉还行,学到了很多。 calc 这一题也卡了我很久,因为复现过DASCTF三月赛,一直在想着有没有可以替代反引号或绕过的方法,搞了好久都没出,在学长的提示下学到了一个方法&…

最新出炉的阿里巴巴面试题及答案汇总(513页)

前言 秋招已经结束了,不知道各位有没有拿到自己心仪的offer?最近有不少粉丝去阿里巴巴面试了,回来之后我整理成了一份手册java面试时常用到的面试题(附答案)那么今天分享给大家,祝愿大家都能找到满意的工作…

HTML期末作业课程设计期末大作业——我的美丽家乡湛江 海鲜之都HTML+CSS+JavaScript

家乡旅游景点网页作业制作 网页代码运用了DIV盒子的使用方法,如盒子的嵌套、浮动、margin、border、background等属性的使用,外部大盒子设定居中,内部左中右布局,下方横向浮动排列,大学学习的前端知识点和布局方式都有…

python爬虫实战之逆向分析酷狗音乐

文章目录前言一、请求分析二、逆向思路三、全部代码总结前言 声明:本文章只是用于学习逆向知识,仅供学习,未经作者同意禁止转载 对于爬虫而言,不管是什么类型的都会遵循这几个步骤 获取目标url分析请求数据逆向解密数据伪造请求清…

算法日常训练12.5

首先有个很大的进步,看见困难题我没选择做逃兵跑路,这点起码是进步了,虽然算法能力还是那么拉,但是起码敢不自量力地分析一下。。。还能看题解理解下。 先找题解中最简单地一种超时方法开始理解,使用动态规划&#xff…

线程基础概念

1.线程基础 现代软件系统中,除了进程之外,线程也是一个十分重要的概念。特别是随着CPU频率增长开始出现停滞,而开始向多核方向发展。多线程,作为实现软件并发执行的一个重要的方法,也开始具有越来越重要的地位。 什么…

[本人毕业设计] 别踩白块_计算机科学与技术_前端H5游戏毕设

摘 要 本文详细介绍了网页版躲避白色钢琴块音乐游戏的设计和实现。由于游戏软件安装占据较大的空间与安装时间,而且步骤繁琐,用常规的游戏安装方法不能取得便捷的游戏安装体验。网页游戏是一种基于在网络游戏中被广泛应用,网页游戏更具有便捷…

【Tensorflow深度学习】实现手写字体识别、预测实战(附源码和数据集 超详细)

需要源码和数据集请点赞关注收藏后评论区留言私信~~~ 一、数据集简介 下面用到的数据集基于IAM数据集的英文手写字体自动识别应用,IAM数据库主要包含手写的英文文本,可用于训练和测试手写文本识别以及执行作者的识别和验证,该数据库在ICDAR1…

对副业的选择无论是自媒体还是 Python接单 ,始终绕不开IT行业。

前言 这个年代,成年人的日子活成了一部苦情戏。十年前,5000块钱工资还能过的自由自在;今天,估计连车贷,房贷,信用卡都不够还。所以一些想要改变现状的朋友,选择了副业这种形式,副业…

【Linux】Shell脚本详解

目录一.概述二.Linux提供的Shell解析器三.Shell入门1.执行一个简单的shell脚本2.脚本常用的执行方法四.变量1.系统预定义变量2.自定义变量3.特殊变量五.运算符六.条件判断1.单条件判断2.多条件判断七.流程控制(重点)1.if判断2.case语句3.for循环4.while循环八.read读取控制台输…

【论文简述】 Point-MVSNet:Point-Based Multi-View Stereo Network(ICCV 2019)

一、论文简述 1. 第一作者:Rui Chen、Songfang Han 2. 发表年份:2019 3. 发表期刊:ICCV 4. 关键词:MVS、深度学习、点云、迭代改进 5. 探索动机:很多传统方法通过多视图光度一致性和正则化优化迭代更新&#xff…

C语言实例|使用C程序优雅地杀掉其它程序进程

C语言文章更新目录 C语言学习资源汇总,史上最全面总结,没有之一 C/C学习资源(百度云盘链接) 计算机二级资料(过级专用) C语言学习路线(从入门到实战) 编写C语言程序的7个步骤和编程…

FPGA 20个例程篇:18.SD卡存放音频WAV播放(中)

第七章 实战项目提升,完善简历 18.SD卡存放音频WAV播放(中) 如图1所示是WM8731中11个寄存器功能说明概况图,我们需要对照手册,再去深入了解WM8731中的11个寄存器,怎么去配置这些寄存器达到预期的效果&…

了解3dmax坐标系

3dmax具有多种坐标系,其类别如下;默认的是View坐标系; 新建一个茶壶,此时默认是View坐标系; 切换到屏幕坐标系,看一下如下图;要保持视口区域激活; 根据资料,屏幕坐标系&a…

园区如何快速实现数据可视化分析?

对于园区运营方来说,如果没有专业针对性的管理方案以及管理系统辅助的话,实现园区可视化管理的难度非常大,而且操作成本会很高。但如果园区运营方选择引进快鲸智慧楼宇推出的园区数据孪生可视化管理系统的话就会简单很多。 快鲸智慧楼宇数据孪…

视频学习|Springboot在线学习系统

作者主页:编程千纸鹤 作者简介:Java、前端、Pythone开发多年,做过高程,项目经理,架构师 主要内容:Java项目开发、毕业设计开发、面试技术整理、最新技术分享 收藏点赞不迷路 关注作者有好处 文末获得源码 …