rabbitmq-----黑马资料

news2025/1/13 15:35:04

rabbit的三种发送订阅模式

消息从发送,到消费者接收,会经理多个过程:

在这里插入图片描述

其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
  • 生产者发送的消息未送达exchange
  • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案。也就是面试题常问的如何rabbitmq如何保证消息的可靠性?

  1. 生产者确认机制:两种回调函数
  2. mq持久化
  3. 消费者确认机制
  4. 失败重试机制

生产者确认机制(解决生产者->交换机->队列)

步骤
1.修改生产者配置

spring:
  rabbitmq:
    publisher-confirm-type: correlated 
    publisher-returns: true
    template:
      mandatory: true
  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
  • simple:同步等待confirm结果,直到超时
  • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

2.定义回调函数(每个RabbitTemplate只能配置一个ReturnCallback)
消息从交换机到队列的过程不通就会回调这个函数

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
    }
}

3.定义ConfirmCallback 消息每次发送前都要写

public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息体
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
        result -> {
            if(result.isAck()){
                // 3.1.ack,消息成功
                log.debug("消息发送成功, ID:{}", correlationData.getId());
            }else{
                // 3.2.nack,消息失败
                log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
            }
        },
        ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 4.发送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

    // 休眠一会儿,等待ack回执
    Thread.sleep(2000);
}

mq持久化

  • 交换机持久化

  • 队列持久化

  • 消息持久化
    RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。

  • 交换机持久化

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);
}
  • 队列持久化
@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

消息持久化
发送时,可以设置消息的属性(MessageProperties),指定delivery-mode:

消费者确认机制

直接配置即可

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 关闭ac
  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

手动ack

    @RabbitListener(queues = "fanout.queue1")
public void onMessage(Message message, Channel channel) throws Exception {
    try {
        // 处理消息
        processMessage(message);
        // 手动确认消息已经被处理完成
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 发生异常时可以选择拒绝消息,重新放回队列或者进入死信队列等
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

失败重试机制

1.生产者发送消息失败配置
默认是消息重新回到队列,但是如果一直失败,就会一直循环的失败回到队列再重试,频率太高太消耗资源了

可以本地重试也就是重写发送消息

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

经典面试题:如何保证消息不会重复消费呢?
1.消费端手动ACK确认机制
在消费端使用RabbitMQ提供的手动ACK确认机制,在消费者成功处理消息后,手动将消息从队列中删除。这样可以确保消息只会被处理一次,避免了重复消费的问题。
2.消费端去重保证机制
可以在消费端处理每条消息之前,通过分布式锁或者数据库唯一索引等方式,判断当前消息是否已经被处理过。如果已经处理过,则忽略该消息;否则正常处理,并将消息标记为已处理。

死信队列

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称
  • 死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。

在这里插入图片描述

// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .deadLetterExchange("dl.direct") // 指定死信交换机
        .build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
    return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
    return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
    return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

死信交换机的使用场景是什么?

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

延迟队列

允许消息在设置的时间内未被消费,如果到期,未消费发送到死信队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
上面这种方式不推荐。
官方推荐
使用插件,
RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html
其中包含各种各样的插件,包括我们要使用的DelayExchange插件:
大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9这个对应RabbitMQ的3.8.5以上版本

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1081800.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Python进行食品配送时间预测

一般的食品配送服务需要显示交付订单所需的准确时间,以保持与客户的透明度。这些公司使用机器学习算法来预测食品配送时间,基于配送合作伙伴过去在相同距离上花费的时间。 食品配送时间预测 为了实时预测食物的交付时间,我们需要计算食物准…

[安洵杯 2019]easy_web - RCE(关键字绕过)+md5强碰撞+逆向思维

[安洵杯 2019]easy_web 1 解题流程1.1 阶段一1.2 阶段二2 思考总结1 解题流程 1.1 阶段一 1、F12发现提示md5 is funny ~;还有img标签中,有伪协议和base64编码 2、url地址是index.php?img=TXpVek5UTTFNbVUzTURabE5qYz0&cmd=   这就有意思了,这里的img明显是编码后的…

如何给苹果ipa和安卓apk应用APP包体修改手机屏幕上logo图标iocn?

虽然修改应用文件图标是一个简单的事情,但是还是有很多小可爱是不明白的,你要是想要明白的话,那我就让你今天明白明白,我们今天采用的非常规打包方式,常规打包方式科技一下教程铺天盖地,既然小弟我出马&…

阿里云华中1(武汉)本地地域公网带宽价格表

阿里云华中1(武汉)地域上线,本地地域只有一个可用区A,高可用需要多可用区部署的应用,不建议选择本地地域,可以选择上海或杭州地域,阿里云服务器华中1(武汉)地域公网带宽价…

windows下开启Telnet功能并访问百度

Telnet 是一个实用的远程连接命令,采用的是 TCP/IP 协议。它为用户提供了在本地计算机上完成远程主机工作的能力,在终端使用者的电脑上使用 Telnet 程序,用它连接到服务器。终端使用者可以在 Telnet 程序中输入命令,这些命令会在服…

并购交易:纽交所上市公司Alamo Group宣布收购皇家卡车设备公司

来源:猛兽财经 作者:猛兽财经 猛兽财经获悉,纽交所上市公司Alamo Group(ALG)周二宣布已收购皇家卡车设备公司(Royal Truck & Equipment)。 皇家卡车设备公司在2022年的年收入接近4000万美元,截至2023年8月底,该公…

Linux python运维

Python 是一种高级编程语言,它具有简单易学、可移植性强、丰富的第三方库等特点,因此成为了广泛应用于各个领域的编程语言之一。而在 Linux 系统中,Python 的使用也十分普遍。本文将介绍如何在 Linux 系统中执行 Python 脚本并传入参数&#…

Ajax使用流程

Ajax在不刷新页面的情况下,进行页面局部更新。 Ajax使用流程: 创建XmlHttpReqeust对象发送Ajax请求处理服务器响应 1. 创建XmlHttpReqeust对象 XmlHttpReqeust对象是Ajax的核心,使用该对象发起请求,接收响应 不同的浏览器创建…

【网路安全 --- Linux,window常用命令】网络安全领域,Linux和Windows常用命令,记住这些就够了,收藏起来学习吧!!

一,Linux 1-1 重要文件目录 1-1-1 系统运行级别 /etc/inittab 1-1-2 开机启动配置文件 /etc/rc.local /etc/rc.d/rc[0~6].d## 当我们需要开机启动自己的脚本时,只需要将可执行脚本丢在 /etc/init.d 目录下,然后在 /etc/rc.d/rc*.d 中建…

集成学习的小九九

集成学习(Ensemble Learning)是一种机器学习的方法,通过结合多个基本模型的预测结果来进行决策或预测。集成学习的目标是通过组合多个模型的优势,并弥补单个模型的不足,从而提高整体性能。 集成学习的主要策略 在集成…

docker 部署 xxl-job SpringBoot 整合 xxl-job 执行任务

概述 XXL-JOB是一个轻量级的分布式任务调度平台,具有以下特点: 调度模块:负责管理调度信息,发出调度请求,支持可视化和动态的操作,监控调度结果和日志,支持执行器Failover 执行模块&#xff1…

地产三维实景vr展示的功能及特点

随着科技的不断发展,VR(虚拟现实)技术也越来越成熟。VR技术的广泛应用,已经逐渐渗透到各个领域,其中引人注目的就是虚拟展馆。虚拟展馆是一种利用VR技术构建的线上展示空间,让观众可以在家中就能参观展览,带来了极大地…

ctfshow萌新计划web1-5(正则匹配绕过)

目录 web1 web2 web3 web4 web5 web1 题目注释相当于已经审计了代码 存在一个对id的检测,并且告诉我们flag在id100 这里讲三个方法 (1)payload:?id10*100 (2)使用or来绕过 payload:?…

Jetpack:005-文本组件的扩展

文章目录 1. 概念介绍2. 使用方法2.1 可以选择的文本2.2 可以点击的文本2.3 多种形式的文本 3. 示例代码4. 内容总结 我们在上一章回中介绍了Jetpack中文本组件的使用方法,本章回中主要介绍 文本组件的扩展。闲话休提,让我们一起Talk Android Jetpack吧…

【深度学习实验】循环神经网络(二):使用循环神经网络(RNN)模型进行序列数据的预测

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. RNN模型 a. 初始化__init__ b. 前向传播方法forward 2. 训练和预测 a. 超参数 b. 创建模型实例 c. 模型训练 d. 预测结果可视化 3. 代码整合 经验是智慧之父…

云服务器带宽对上传下载速度的影响

简单来说就是 云服务器收到数据代表入&#xff0c;带宽大小 < 10时&#xff0c;入带宽大小10 带宽大小 > 10时&#xff0c;出入带宽上限 等于实际购买时候的大小

2023Linux C/C++全栈开发知识技术合集(基础入门到高级进阶)

C/Linux服务器开发」别名可以叫「C后台开发」,目前BAT里面都是有大量的C开发岗位&#xff0c;鹅厂在c后台开发岗都是急需。虽然岗位对技术要求难度系数较高&#xff0c;但是有大厂情结的朋友们还是可以冲一冲的。 很多有c/c语言基础的朋友&#xff0c;在面试后台岗的时候都会有…

Response Status Code 301、302

目录 Information Django redirect Influence Information HTTP状态码301、302和304分别表示以下情况&#xff1a; codeinformation301&#xff08;Moved Permanently&#xff09; 永久重定向。当请求的资源已经被永久地移动到了一个新的URI时&#xff0c;服务器会返回这个…

登录认证,登录校验

一、基础登录功能 1.Controller层 import com.itheima.pojo.Emp; import com.itheima.pojo.Result; import com.itheima.service.EmpService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework…

ChatGLM2-6B微调实践-QLora方案

ChatGLM2-6B微调实践-QLora方案 环境部署Lora微调项目部署准备数据集修改训练脚本adapter推理模型合并与量化合并后的模型推理 参数调优微调过程中遇到的问题参考&#xff1a; 环境部署 申请阿里云GPU服务器&#xff1a; CentOS 7.6 64Anaconda3-2023.07-1-Linux-x86_64Pytho…