消息队列-rabbitmq(生产者.消费者. 消息.可靠性)

news2025/1/11 23:57:05

生产者者的可靠性

为了保证我们生产者在发送消息的时候消息不丢失,我们需要保证发送者的可靠性  

1.生产者重试

假如发送消息的时候消息丢失 ,我们可以使用发送者 重试机制,尝试重新发送消息

实现该机制非常简单,只需要在yml文件中 配置 rabbitmq的配置信息就可以了

2.生产者确认机制

在我们 生产者发送消息到交换机的时候,假如 我们发送到交换机 ,但是 队列没有收到消息,会返回ack,发送到交换机,然后发送到队列,消费者没有接收到消息返回ack,但是发送到交换机失败,会返回nack

也就是说 我们 只要消息成功发送到  mq里面去,无论消息是否成功路由被消费,或者没被消费,他都会 返回ack 

下面我们看具体实现代码

生产者代码

 @Test
    public void publisherConfirm(){
        //获得  相关组件
        CorrelationData correlationData = new CorrelationData();
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("消息发送错误{}",ex.getMessage());
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if(result.isAck()){
                    log.info("发送成功收到{},但是消息可能未路由",result.getReason());
                }else{
                    log.error("发送失败收到{}",result.getReason());
                }
            }
        });
        //我们像一个不的交换机中发送一条消息 验证我们的情况
        rabbitTemplate.convertAndSend("myExchange.direct","my","aaa",correlationData);
    }

消费者代码

@Component
@Slf4j
@RequiredArgsConstructor
public class Consumer2 {
  private  final   RabbitTemplate rabbitTemplate;
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "mySimple.queue",durable = "true",
                arguments = @Argument(name="x-queue-mode",value="lazy")),
        exchange = @Exchange(value = "myExchange.direct",durable = "true"),
        key = "my"
))
public void  consumerMessage(String message){
    log.info("收到消息{}",message);
}

}

mq消息可靠性

1.交换机 队列持久化

我们在设置的时候可以设置交换机队列持久化,这样即使我们rabbitmq重启的话,也不用担心交换机队列丢失

2.消息持久化

我们在设置的时候可以手动设置 一个持久化的 message 对象,当作消息传递,然后 持久化之后消息会直接存储到磁盘中去,防止内存中的消息积压同时也可以为消息设置优先级

3.lazyqueue

我们使用lazyqueue 形式发送消息到队列的时候,消息会直接存储到磁盘中去,一直到被需要消费的时候才会被加载

设置lazyqueue也很简单,我们只需要在代码中简单配置即可

消费者可靠性

1.消费者确认机制

消费者确认消息之后会返回ack,并且删除消息,

消费者对消息处理失败返回nack,mq需要重新发送消息

reject 消费者处理消息失败,并且拒绝该消息,并且删除消息

而怎么确认消息也有三种机制

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject

我们在yml文件中配置即可

2.消息重试策略

当一条消息发送失败的时候,消费者重新尝试消费消息,达到我们重试的次数之后,消费者返回reject,mq直接删除消息

3.处理失败消息

当消息彻底处理失败,我们消费者设置一个新的交换机队列 重新存储这些失败的消息后续再做处理

package com.itheima.consumer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

死信交换机和延迟消息

死信交换机 ,都是假如一个定时消息过期了,或者发送延迟消息我们直接把该消息传递到我们绑定的死信交换机中,跟上文 消息发送失败了返回rejct之后,消息发送到err交换机是两种不同的策略

我们用死信交换机发送延迟消息的策略可以如下图 

而代码也很简单,只需要再声明队列a的时候绑定死信交换机就可以了

dlx就是死信交换机

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1957246.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

The Llama 3 Herd of Models.Llama 3 模型第1,2,3部分全文

现代人工智能(AI)系统是由基础模型驱动的。本文提出了一套新的基础模型,称为Llama 3。它是一组语言模型,支持多语言、编码、推理和工具使用。我们最大的模型是一个密集的Transformer,具有405B个参数和多达128K个tokens的上下文窗口。本文对Llama 3进行了广泛的实证评价。我们…

系统架构师考点--系统架构设计(中)

大家好。今天继续总结一下系统架构设计的一些考点。 一、软件架构复用 软件产品线是指一组软件密集型系统&#xff0c;它们共享一个公共的、可管理的特性集&#xff0c;满足某个特定市场或任务的具体需要&#xff0c;是以规定的方式用公共的核心资产集成开发出来的。即围绕核…

CVPR 2024 最佳论文分享┆物体用体积表示:一种不透明固体图形的随机几何表示方法

本文详细介绍了一篇获得CVPR 2024最佳论文提名的论文《Objects as volumes: A stochastic geometry view of opaque solids》。该论文的作者为Bailey Miller等人。论文提出了一种新的理论框架&#xff0c;从随机几何学的角度解释和改进当前体积表示方法&#xff0c;即将不透明固…

OZON大健康热卖产品,OZON大健康产品有哪些

在俄罗斯&#xff0c;随着全球健康意识的提升&#xff0c;特别是在新冠疫情之后&#xff0c;人们对于增强免疫力和保持健康的关注度显著增加。这种趋势在俄罗斯尤为明显&#xff0c;其中天然食品补剂、家居清洁用品以及个人护理产品等大健康领域的产品需求激增。以下是根据当前…

线上红酒品鉴会:与专业人士面对面交流

在繁忙的现代生活中&#xff0c;我们时常渴望寻找一个安静的角落&#xff0c;与志同道合的朋友共同品味生活的美好。当红酒的醇香与线上交流的便捷相结合&#xff0c;一场别开生面的线上红酒品鉴会便应运而生。今天&#xff0c;让我们一同走进这场与专业人士面对面交流的线上红…

WSL for Windows

1、安装 超详细Windows10/Windows11 子系统&#xff08;WSL2&#xff09;安装Ubuntu20.04&#xff08;带桌面环境&#xff09;_wsl安装ubuntu20.04-CSDN博客https://blog.csdn.net/weixin_44301630/article/details/122390018 注意&#xff0c;安装之后首次启动 Ubuntu 时&…

当我们谈论前端性能时,我们在谈论什么

前端岗位内推来了 本文结合Google官方工具 Lighthouse 分析最新的前端页面性能评分标准&#xff0c;帮助大家更好地理解各种性能指标&#xff0c;以改进和优化相关前端项目。 前端页面性能一直是大家持续关注的话题&#xff0c;因为用户留存率与页面加载性能密切相关。根据Goog…

全球模块化机器人市场展望与未来增长机遇预测:未来六年CAGR为14.9%

在全球自动化和智能化水平提升的背景下&#xff0c;模块化机器人正成为市场的焦点。本文详细分析了全球模块化机器人市场的现状、增长趋势及未来前景&#xff0c;旨在为投资者和业内人士提供深入的市场洞察和指导。 市场概览 据恒州诚思团队研究分析显示&#xff0c;2023年&am…

zeal 开发者离线文档工具

zeal是一款程序开发者不可或缺的离线文档查看器 下载地址 官网地址&#xff1a; windows版csdn下载(开箱即用含)&#xff1a;https://download.csdn.net/download/xzzteach/89588765 已离线 Android.docset Apache_HTTP_Server.docset Bash.docset Bootstrap_4.docset Bootst…

QT6安装

我是直接使用 qt-online-installer-windows-x64-4.8.0.exe 安装包一键安装的 需要安装包的可以在此路径下载&#xff1a; qt-online-installer-windows-x64-4.8.0.exe&#xff0c;qt6一键安装包资源-CSDN文库

C#编写软件发布公告2——服务端

简单说明 框架&#xff1a;.NET 6.0 MVC 数据库&#xff1a;sqlLite3(当然这是为考虑本地数据简单&#xff0c;可以考虑使用大型数据库) 一、界面效果展示 1、启动主页面 2、记录摘要界面 3、对应版本详细 二、实现代码逻辑 1、启动主页面 //关联日志文件写 builder.Loggi…

音频处理过程

1、音频 &#xff08;1&#xff09;打开设备 &#xff08;2&#xff09;从音频设备中读取数据 &#xff08;3&#xff09;将音频设备中读取的数据写入文件夹中 &#xff08;4&#xff09; 通过界面控制开始录制和结束录制&#xff08;使用多线程和状态码控制&#xff09; &…

Spring监听器不同的注册方式下带来的监听范围的变化

事件监听注册的几种方式 ApplicationContext下面简称AC 1.构建SpringApplication时注册&#xff08;可以监听AC启动阶段事件&#xff09; // 方式一: //写法1 SpringApplication application new SpringApplicationBuilder().listeners(new ApplicationPidFileWriter()).bu…

网课录制新技能,声画同步,三款录屏软件助力教师高效授课

在数字化教育的浪潮中&#xff0c;教师和培训讲师们越来越依赖于录制网课来提升教学效果。无论是PPT课件的深入讲解&#xff0c;Word文档的详细演示&#xff0c;还是操作手册的直观展示&#xff0c;一款出色的录屏软件都能使这一过程更加生动和高效。今天&#xff0c;我将为大家…

【C++/STL】:哈希 -- 线性探测哈希桶

目录 &#x1f4a1;前言一&#xff0c;unordered系列容器二&#xff0c;哈希2.1 哈希的概念2.2 哈希函数2.3 哈希冲突 三&#xff0c;哈希冲突解决(重点)3.1 开放定址法3.2 哈希桶(重点) 四&#xff0c;线性探测的实现4.1 线性探测的基本框架4.2 插入操作4.3 查找操作4.4 删除操…

【C++】类和对象——Lesson1

Hi~&#xff01;这里是奋斗的小羊&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f4a5;&#x1f4a5;个人主页&#xff1a;奋斗的小羊 &#x1f4a5;&#x1f4a5;所属专栏&#xff1a;C &#x1f680;本系列文章为个人学习笔记…

【Golang 面试 - 基础题】每日 5 题(十)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…

关联映射和缓存机制学习笔记

学习视频&#xff1a;4001 关联映射概述_哔哩哔哩_bilibili~4007 案例&#xff1a;商品的类别_哔哩哔哩_bilibili 目录 1.关联映射概述 1.1关联映射关系 一对一关系 一对多关系 多对多关系 Java对象如何描述事物之间的关系 1.2一对一查询 元素 a.嵌套查询方式 b.嵌套结果方…

Spring Cache常用注解

依赖代码如下&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-cache</artifactId></dependency> 常用注解详解 1. Cacheable 作用&#xff1a;主要用于配置方法&#xff0c;使其…

第九届全球渲染大赛来了!CG爱好者准备好了吗!

在CG界的日历上&#xff0c;二月和八月总是特别繁忙的月份。这两个月&#xff0c;全球CG艺术界最盛大的赛事——全球渲染大赛&#xff0c;都会开放报名&#xff0c;吸引着世界各地的CG艺术家和爱好者参与。备受期待的第九届全球渲染大赛&#xff0c;已经定于2024年8月3日在美国…