RabbitMQ消息可靠性等机制详解(精细版三)

news2025/1/10 11:45:02

目录

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

7.1.2 Return机制(保证发送可靠)

7.1.3 编写配置文件

7.1.4 开启Confirm和Return

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件

7.2.2 手动ack

7.3 避免消息重复消费

7.3.1 导入依赖

7.3.2 编写配置文件

7.3.3 修改生产者

7.3.4 修改消费者


 官方文档  RabbitMQ Documentation | RabbitMQ

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

RabbitMQ是一个Erlang开发的AMQP(高级消息排队 协议)(英文全称:Advanced Message Queuing Protocol )的开源实现。-------------接上章 

七 RabbitMQ的其他操作

7.1 消息的可靠性(发送可靠)

7.1.1 confim机制(保证发送可靠)

RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。

RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。

消息传递可靠性

7.1.2 Return机制(保证发送可靠)

Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

而且exchange是不能持久化消息的,queue是可以持久化消息。

采用Return机制来监听消息是否从exchange送到了指定的queue中

消息传递可靠性

在消息发送方项目上加入下面内容:

7.1.3 编写配置文件
spring:
  rabbitmq:
    host: 你的地址
    port: 5672
    virtual-host: /tingyi
    username: test
    password: test
    publisher-confirms: true
    publisher-returns: true

7.1.4 开启Confirm和Return
package com.tingyi.rabbitmq.config;
​
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
/**
 * @author 听忆
 */
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    @PostConstruct  // init-method
    public void initMethod(){
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(this);
​
        //指定 ReturnCallback
        rabbitTemplate.setReturnCallback(this);
    }
​
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("消息已经送达到Exchange");
        }else{
            System.out.println("消息没有送达到Exchange");
        }
    }
​
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息没有送达到Queue");
    }
}

7.2 手动Ack(保证接收可靠)

7.2.1 添加配置文件
  • 在消费方application.yml文件添加下面配置, 改为手动应答机制.

spring:
  rabbitmq:
    host: 你的地址
    port: 5672
    virtual-host: /tingyi
    username: test
    password: test
    listener:
      simple:
        acknowledge-mode: manual

7.2.2 手动ack
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
​
/**
 * @author 听忆
 */
@Component
public class Consumer {
​
    @RabbitListener(queues = "boot-queue")
    public void getMessage(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到消息:" + msg);
        try {
            int i = 1 / 0;
            /**
             * 消费者发起成功通知
             * 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号
             * 第二个参数:是否开启批量处理 false:不开启批量
             * 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,
             *          当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            e.printStackTrace();
            /**
             * 返回失败通知
             * 第一个参数: DeliveryTag,消息的唯一标识  channel+消息编号
             * 第二个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝
             * 第三个boolean true消息接收失败重新回到原有队列中
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
        }
​
    }
}

7.3 避免消息重复消费

重复消费消息,会对非幂等行操作造成问题

重复消费消息的原因是,消费者没有给RabbitMQ一个ack

重复消费

  1. 为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,

  2. id-0(正在执行业务)

  3. id-1(执行业务成功)

  4. 然后使用ack给RabbitMQ返回消息

  5. 如果RabbitMQack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

  6. 极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。

备注: java中的方法叫做setIfAbsent, redis中的命令叫做setnx

       作用:
            如果为空就set值,并返回1, true

​ 如果存在(不为空)不进行操作,并返回0, false​

7.3.1 导入依赖

生产者和消费者都加入下面依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.4.5</version>
</dependency>

7.3.2 编写配置文件
spring:
  redis:
    host: 你的地址
    port: 6379

7.3.3 修改生产者
@Test
public void contextLoads() throws IOException {
    CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
    //第四个参数: 设置消息唯一id
    rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","你看听忆哇",messageId);
    System.in.read();
}

7.3.4 修改消费者
package com.tingyi.rabbitmq.topic;
​
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
​
import java.io.IOException;
import java.util.concurrent.TimeUnit;
​
/**
 * @author 听忆
 */
/**
         * java中的方法叫做setIfAbsent, redis中的命令叫做setnx
         * 作用:
         *      如果为空就set值,并返回1, true
         *      如果存在(不为空)不进行操作,并返回0, false
         */
@Component
public class Consumer {
​
    @Autowired
    private StringRedisTemplate redisTemplate;
​
    @RabbitListener(queues = "boot-queue")
    public void getMessage(String msg, Channel channel, Message message) throws IOException {
        //0. 获取MessageId, 消息唯一id
        String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
        //1. 设置key到Redis
        if(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
​
            //2. 消费消息
            System.out.println("接收到消息:" + msg);
​
            //3. 设置key的value为1
            redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
​
            //4.  手动ack
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
​
        }else {
​
            //5. 获取Redis中的value即可 如果是1,手动ack
            if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }
        }
​
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1882958.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python爬虫】Python爬取喜马拉雅,爬虫教程!

一、思路设计 &#xff08;1&#xff09;分析网页 在喜马拉雅主页找到自己想要的音频&#xff0c;得到目标URL&#xff1a;https://www.ximalaya.com/qinggan/321787/ 通过分析页面的网络抓包&#xff0c;最终的到一个比较有用的json数据包 通过分析&#xff0c;得到了发送json…

《梦醒蝶飞:释放Excel函数与公式的力量》7.3 RIGHT函数

第七章&#xff1a;文本处理函数 第三节&#xff1a;7.3 RIGHT函数 7.3.1. RIGHT函数简介 RIGHT函数用于从文本字符串的末尾提取指定数量的字符&#xff0c;适合在需要从文本中提取特定后缀或处理固定格式的数据时使用。 语法&#xff1a; RIGHT(text, [num_chars]) text…

1974Springboot医院远程诊断管理系统idea开发mysql数据库web结构java编程计算机网页源码maven项目

一、源码特点 springboot医院远程诊断管理系统是一套完善的信息系统&#xff0c;结合springboot框架和bootstrap完成本系统&#xff0c;对理解JSP java编程开发语言有帮助系统采用springboot框架&#xff08;MVC模式开发&#xff09;&#xff0c;系统具有完整的源代码和数据库…

提高软件测试效率的7个技巧

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 软件测试是保证软件质量的重要环节&#xff0c;也是软件开发过程中不可或缺的一部分。 实际工作…

图片识别的3款神器!码住这篇!

在数字化飞速发展的今天&#xff0c;图片识别技术已经成为我们日常生活和工作中不可或缺的一部分。无论是学习、工作还是娱乐&#xff0c;图片识别软件都为我们带来了极大的便利。接下来&#xff0c;就让我们一起了解三款备受推崇的图片识别软件&#xff0c;看看它们是如何在我…

【密码学基础】对随机不经意传输(Random Oblivious Transfer)的理解

ROT在offline阶段生成大量的OT对&#xff0c;在online阶段通过one-pad方式高效加密&#xff0c;并且只需要简单的异或运算就能实现OT过程&#xff08;去随机化&#xff09;。 在ROT中&#xff0c;有一个关键点是&#xff1a;需要考虑offline阶段的选择比特和online阶段的选择比…

第15周:RNN心脏病预测

目录 前言 二、前期准备 2.1 设置GPU 2.2 导入数据 2.2.1 数据介绍 2.2.2 导入代码 2.2.3 检查数据 三、数据预处理 3.1 划分训练集与测试集 3.2 标准化 四、构建RNN模型 4.1 基本概念 4.2 搭建代码 五、编译模型 六、训练模型 七、模型评估 总结 前言 &#…

2024年文化传播与对外交流国际学术会议(ICCCFE 2024)

2024年文化传播与对外交流国际学术会议&#xff08;ICCCFE 2024&#xff09; 2024 International Conference on Cultural Communication and Foreign Exchange(ICCCFE 2024) 会议简介&#xff1a; 2024年文化传播与对外交流国际学术会议&#xff08;ICCCFE 2024&#xff09;定…

Go线程实现模型-P

P 概述 P是G能够在M中运行关键。Go的运行时系统会适时地让P与不同的M建立或断开关联&#xff0c;以使P中的那些可运行的G能够及时获得&#xff0c;这与操作系统内核在CPU之上实时切换不同进程或线程的情况类似 改变P的数量 改变单个Go程序间拥有的P的最大数量有两种方法 调…

《塔瑞斯世界》国服震撼登场!AOC助力玩家开启游戏新征途!

一款真正高画质、重机制、轻数值的MMORPG大作&#xff01; 你是否厌倦了在MMORPG游戏中被“氪金大佬”碾压&#xff1f;你是否渴望一个纯粹依靠技术和策略就能获得成就感的游戏世界&#xff1f;如果你对这两个问题的答案都是肯定的&#xff0c;那么《塔瑞斯世界》或许值得你一…

docker-compose搭建minio对象存储服务器

docker-compose搭建minio对象存储服务器 最近想使用oss对象存储进行用户图片上传的管理&#xff0c;了解了一下例如aliyun或者腾讯云的oss对象存储服务&#xff0c;但是呢涉及到对象存储以及经费有限的缘故&#xff0c;决定自己手动搭建一个oss对象存储服务器&#xff1b; 首先…

前端git约定式规范化提交-commitizen

当使用commitizen进行代码提交时&#xff0c;commitizen会提示你在提交代码时填写所必填的提交字段信息内容。 1、全局安装commitizen npm install -g commitizen4.2.4 2、安装并配置 cz-customizeable 插件 2.1 使用 npm 下载 cz-customizeable npm i cz-customizeable6.…

30秒就能完成3D翻页画册的工具

​在数字化时代&#xff0c;将传统画册转化为电子版&#xff0c;并赋予其3D翻页的动态效果&#xff0c;不仅能够增强视觉效果&#xff0c;还能提高资料的传播效率。对于需要在短时间内完成3D翻页画册制作的用户&#xff0c;这里推荐一款能迅速实现这一目标的在线工具。 首先&am…

Web3 游戏周报(6.23 - 6.29)

区块链游戏热度不减&#xff0c;你是否掌握了上周的重要动态&#xff1f; 回顾上周区块链游戏动态&#xff0c;查看 Footprint Analytics 与 ABGA 的最新数据报告。 【6.23 - 6.29】Web3 游戏行业动态&#xff1a; 继 Notcoin 之后&#xff0c;另一款 Telegram 游戏 Hamster …

沙箱在“一机两用”新规下的价值体现

在数字化时代&#xff0c;随着企业信息化建设的深入&#xff0c;数据安全问题愈发凸显其重要性。一机两用新规的出台&#xff0c;旨在通过技术创新和管理手段&#xff0c;实现终端设备的安全可控&#xff0c;确保敏感数据的安全存储与传输。SDC沙箱技术作为一种创新的安全防护手…

期权交易指南:为什么要交易场外个股期权?

今天带你了解期权交易指南&#xff1a;为什么要交易场外个股期权&#xff1f;随着金融市场的发展和创新&#xff0c;投资者寻求更多的工具来管理风险和获得更高的回报。场外期权交易应运而生&#xff0c;成为一种重要的金融衍生品交易方式。 简单来说就是期权是一种合约&#…

如何用程序批量下载小红书的图片?

如何使用MediaCrawler快速下载图片 作为一名图像算法工程师&#xff0c;怎么能没有图片资源呢&#xff1f;今天&#xff0c;我要介绍一个能快速下载图片的方法&#xff0c;仅供学习使用&#xff0c;请勿用于其他用途。 下载项目 首先&#xff0c;从GitHub下载项目&#xff1…

fastapi swagger在线接口文档报错

fastapi swagger在线接口文档报错 1、报错信息 Unable to render this definition The provided definition does not specify a valid version field. Please indicate a valid Swagger or OpenAPI version field. Supported version fields are swagger: “2.0” and those …

无线领夹麦克风哪个品牌音质最好,直播用领夹麦克风还是声卡麦

随着社交媒体的兴起&#xff0c;直播和Vlog已经成为内容创作的新趋势&#xff0c;这些变化不仅改变了人们分享生活的方式&#xff0c;也带动了音频设备市场的增长。无线领夹麦克风&#xff0c;以其便携性和卓越的录音品质&#xff0c;迅速成为视频制作者的重要工具。它们在直播…