MQ高级篇---消息可靠性

news2025/1/12 10:46:37

MQ的一些常见问题


在这里插入图片描述
后面内容基于springboot 2.3.9.RELEASE

消息可靠性


在这里插入图片描述

生产者确认机制

在这里插入图片描述

  • 在publisher微服务中application.yml中添加
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

在这里插入图片描述

  • 每个RabbitTemplate只能配置一个ReturnCallback, 因此需要在项目启动过程中配置
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("消息发送到队列失败, 响应码:{}, 失败原因: {}, 交换机: {}, 路由key: {}, 消息: {}",
                    replyCode, replyText, exchange, routingKey, message);
        });
    }
}
  • 发送消息, 指定消息ID,消息ConfirmCallBack
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;


@Slf4j
@SpringBootTest
public class PublishTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void name() throws InterruptedException {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(result -> {
            if(result.isAck()){
                // ACK
                log.debug("消息成功投递到交换机! 消息ID: {}", correlationData.getId());
            }else {
                // NACK
                log.error("消息投递到交换机失败! 消息ID: {}", correlationData.getId());
            }
        }, ex -> {
            log.error("消息发送失败!", ex);
        });
        rabbitTemplate.convertAndSend("high.topic", "high.#", "hello amqp", correlationData);
    }
}

在这里插入图片描述

消息持久化

声明队列和交换机时指定durabletrue,为持久化

spring amqp中交换机、队列、消息默认都是持久的

消费者消息确认

在这里插入图片描述
消费者业务添加配置

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto 

失败重试机制

在这里插入图片描述

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto
        prefetch: 1
        retry:
          enabled: true     # 开启消费者失败重试
          initial-interval: 1000  # 初始的失败等待时长1秒
          multiplier: 1  # 下次失败的等待时长倍数
          max-attempts: 3   # 最大重试次数
          stateless: true  # true无状态, false有状态, 如果业务中包含事务, 这里改为false

配置说明:

初始等待时长1秒,倍数为2, 则等待时长为 1秒 2秒 4秒 8秒 …

消费者失败消息处理策略

在这里插入图片描述

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

死信交换机


在这里插入图片描述
这个是由队列投递

TTL

在这里插入图片描述

  • 声明死信交换机
	@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue", durable = "true"),
            exchange = @Exchange(name = "dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg){
        log.info("消费者接收到了dl.queue的延迟消息: {}", msg);
    }
  • 声明TTL交换机和队列
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLMessageConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl.direct");
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                .ttl(10000)   // 指定时间10秒
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }
    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }
}
  • 发送消息
	@Test
    void name() {
        MessageBuilder.withBody("hello ttl".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setExpiration("5000")
                .build();
        rabbitTemplate.convertAndSend("ttl.direct", "ttl", "ttl message");
    }

也可以指定消息的过期时间, 两者都指定时, 以短的为准

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1542188.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vscode配置c/c++调试环境

本文记录win平台使用vscode远程连接ubuntu server服务器下,如何配置c/c调试环境。 过程 1. 服务器配置编译环境 这里的前置条件是vscode已经能够连接到服务器,第一步安装编译构建套件(gcc、g、make、链接器等)和调试器&#xf…

Vue2(十):全局事件总线、消息订阅与发布、TodoList的编辑功能、$nextTick、动画

一、全局事件总线!! 任意组件间通信 比如a想收到别的组件的数据,那么就在a里面给x绑定一个demo自定义事件,所以a里面就得有一个回调函数吧,然后我要是想让d组件给a穿数据,那就让d去触发x的自定义事件&…

使用html做一个2048小游戏

下载地址: https://pan.xunlei.com/s/VNtiF13HxmmE4gglflvS1BUhA1?pwdvjrt# 提取码:vjrt”

C#执行命令行

效果图 主要代码方法 private Process p;public List<string> ExecuteCmd(string args){System.Diagnostics.Process p new System.Diagnostics.Process();p.StartInfo.FileName "cmd.exe";p.StartInfo.RedirectStandardInput true;p.StartInfo.RedirectSta…

STM32--RC522学习记录

一&#xff0c;datasheet阅读记录 1.关于通信格式 2.读寄存器 u8 RC522_ReadReg(u8 address) {u8 addr address;u8 data0x00;addr((addr<<1)&0x7e)|0x80;//将最高位置一表示read&#xff0c;最后一位按照手册建议变为0Spi_Start();//选中从机SPI2_ReadWriteByte(ad…

C++实现FFmpeg音视频实时拉流并播放

1.准备工作: 下载rtsp流媒体服务器rtsp-simple-server,安装go开发环境并编译 编译好后启动流媒体服务器 准备一个要推流的mp4视频文件,如db.mp4 使用ffmpeg开始推流 推流命令: ffmpeg -re -stream_loop -1 -i db.mp4 -c copy -rtsp_transport tcp -f rtsp rtsp://192.168.16…

JVM快速入门(2)HotSpot和堆、新生区、永久区、堆内存调优、JProfiler工具分析OOM原因、GC(垃圾回收)、JVM经典面试笔试题整理

5.6 HotSpot和堆 5.6.1 Hotspot 三种JVM&#xff1a; Sun公司&#xff0c;HotspotBEA&#xff0c;JRockitIBM&#xff0c;J9 VM&#xff0c;号称是世界上最快的Java虚拟机 我们一般学习的是&#xff1a;HotSpot 5.6.2 堆 Heap&#xff0c;一个JVM只有一个堆内存&#xff0c…

基于51单片机数控直流电压源proteus仿真LCD显示+程序+设计报告+讲解视频

基于51单片机数控直流电压源proteus仿真LCD显示( proteus仿真程序设计报告讲解视频&#xff09; 仿真图proteus7.8及以上 程序编译器&#xff1a;keil 4/keil 5 编程语言&#xff1a;C语言 设计编号&#xff1a;S0072 讲解视频 基于51单片机数控直流电压源proteus仿真程序…

t-rex2开放集目标检测

论文链接&#xff1a;http://arxiv.org/abs/2403.14610v1 项目链接&#xff1a;https://github.com/IDEA-Research/T-Rex 这篇文章的工作是基于t-rex1的工作继续做的&#xff0c;核心亮点&#xff1a; 是支持图片/文本两种模态的prompt进行输入&#xff0c;甚至进一步利用两…

八股文(1)

管道 匿名管道和命名管道 命名管道的使用是什么&#xff1f;在linux系统如何实现 命名管道&#xff08;Named Pipe&#xff09;&#xff0c;也称为FIFO&#xff08;First In First Out&#xff09;&#xff0c;是一种在UNIX和Linux系统中用于进程间通信&#xff08;IPC&…

【PyQt】17.1-日历控件 不同风格的日期和时间、以及高级操作

日历控件puls版本 前言一、日历控件中不同风格的日期和时间1.1 代码1.2 注意事项格式设置m的大小写问题QTime和QDateTime的区别 1.3 运行结果 二、高级操作2.1 成倍调整2.2 下拉出日历2.3 事件函数2.4 获取设置的日期和时间 完整代码 前言 1、不同风格的日期和时间展示 2、高级…

Django下载使用、文件介绍

【一】下载并使用 【1】下载框架 &#xff08;1&#xff09;注意事项 计算机名称不要出现中文python解释器版本不同可能会出现启动报错项目中所有的文件名称不要出现中文多个项目文件尽量不要嵌套,做到一项一夹 &#xff08;2&#xff09;下载 Django属于第三方模块&#…

无人机摄影测量---倾斜摄影测量原理与关键技术

《无人机航空摄影测量精品教程》专栏&#xff1a;无人机航测外业作业流程&#xff08;像控点布设、航线规划、仿地飞行、航拍&#xff09;和内业数据处理软件&#xff08;Pix4d、CC、EPS、PhotoScan、Globalmapper&#xff09;像控点权重调配、空三加密、DOM、DSM、DEM&#xf…

ubuntu18安装opensips3.4,开启ws/wss/http接口模块

、如果是centos 7安装则使用yum 命令。 添加库地址注意系统类型&#xff0c;选择对应的系统类型和版本 curl https://apt.opensips.org/opensips-org.gpg -o /usr/share/keyrings/opensips-org.gpg echo "deb [signed-by/usr/share/keyrings/opensips-org.gpg] https:/…

芯片工程系列(5)2.5D 3D封装

0 英语缩写 硅通孔&#xff08;Through Silicon Via&#xff0c;TSV&#xff09;硅中介层&#xff08;Silicon Interposer&#xff09;物理气象沉淀法&#xff08;Physical Vapor Deposition&#xff0c;PVD&#xff09;DRIE、CVD、PVD、CMP等设备CoWoS&#xff08;Chip on Wa…

政安晨:【深度学习实践】【使用 TensorFlow 和 Keras 为结构化数据构建和训练神经网络】(五)—— Dropout和批归一化

政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: TensorFlow与Keras实战演绎 希望政安晨的博客能够对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff01; Dropout和批归一化是深度学习领域中常用的正则化技术…

pytorch如何向tensor结尾添加元素或维度--torch.cat()、torch.unsqueeze()的用法

目录 示例1 矢量后增加元素 示例2 tensor维度增加1 示例3 另一种替代unsqueeze的方法 示例1 矢量后增加元素 使用torch.cat()函数 ptorch.Tensor([1,5,0]) ptorch.cat((p, torch.Tensor([4])), 0) 结果&#xff1a; 这里&#xff0c;cat的第一个输入变量用()包绕&#xf…

中断(NVIC)的使用--EXTI--TIM

目录 中断是什么 轮询 中断 中断调用情况 中断的分类 内部中断&#xff08;TIM、UART等&#xff09; tim.c tim.h 外部中断EXTI exti.c exti.h 中断是什么 在处理事件的时候有两种方式&#xff1a;轮询和中断。 轮询 顾名思义&#xff0c;就是每轮都询问一次。比如…

笔记本和台式机主板内部结构分析

笔记本和态势机主板内存接口以及配件安装位置 笔记本主板 1 以thinkpad L-490为例,使用拆机小工具拆机&#xff0c;打开后面板&#xff0c;内部结构示意图如下 台式机主板 以技嘉-B660M-AORUS-PRO-AX型号主板为例 笔记本电脑和台式机电脑的相同之处 CPU&#xff1a;笔记本…

Python爬虫-批量爬取星巴克全国门店

前言 本文是该专栏的第22篇,后面会持续分享python爬虫干货知识,记得关注。 本文笔者以星巴克为例,通过Python实现批量爬取目标城市的门店数据以及全国的门店数据。 具体的详细思路以及代码实现逻辑,跟着笔者直接往下看正文详细内容。(附带完整代码) 正文 地址:aHR0cHM…