RabbitMQ高可用延迟消息惰性队列

news2024/11/24 0:57:46

目录

生产者确认

消息持久化

消费者确认

TTL延迟队列

TTL延迟消息

惰性队列


生产者确认

生产者确认就是:发送消息的人,要确保消息发送给了消息队列,分别是确保到了交换机,确保到了消息队列这两步。

1、在发送消息服务的application.yml中添加配置

spring:
  rabbitmq:
    publisher-confirm-type: correlated  # 异步回调
    publisher-returns: true
    template:
      mandatory: true

2、确保消息到交换机

package cn.zsh.mq.spring;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testConfirmCallBack() {
        // 1、定义消息
        String message = "ABC";

        // 设置一个消息的唯一ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 3、confirm-ack
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("消息发送异常:" + ex.toString());
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                if (result.isAck()) {
                    // 说明到了交换机
                    System.out.println("publish-confirm:ack==消息发送成功:" + correlationData.getId());
                } else {
                    // 消息没有到交换机
                    System.out.println("publish-confirm:nack==消息发送失败:" + correlationData.getId());
                }
            }
        });

        // 4、消息发送
        rabbitTemplate.convertAndSend("191exchange","191",message,correlationData);
    }


}

3、确保消息从交换机路由到队列

创建公开CommonConfig类

package cn.zsh.mq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;

/**
 * 发送消息到交换机没有到消息队列
 */
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 1、获取RabbitTemplate(获取启动中的Bean的方式)
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

        // 2、设置回调函数
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.error("发送消息失败,没到队列===消息:{}, 交换机:{}, 路由Key:{}, 响应CODE:{}, 相应内容:{}", message,exchange,routingKey,replyCode,replyText);
            }
        });
    }
}

消息持久化

消息持久化就是:确保消息不会在交换机或者队列中丢失。

案例:

使用SpringAMQP创建出来的交换机和队列,默认就是做了持久化的

package cn.zsh.mq.config;

import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * 创建交换机与队列
 */
@Component
public class FoundQueue {

    @Bean
    public DirectExchange qiuExchange(){
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new DirectExchange("qiu.deirect",true,false);
    }

    @Bean
    public Queue piqiuQueue(){
        // 使用QueueBuilder构建队列,durable就是持久化的
        return QueueBuilder.durable("piqiu.queue").build();
    }
}

消费者确认

消费者确认就是:消费者把消息从队列中获取出来,然后要消费成功,队列中的消息才能被删除掉。

方案一:消费者确认

加入这个配置以后,消费者消费失败,会直接重试或者删除,具体取决于设置的是none还是auto。

默认是none,不建议设置为auto模式因为会一直不断地尝试,这样会导致服务器压力很大。

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # none:投递完立马删除  auto:失败后让你再次重试(重新投递到队列)知道成功

方案二:消费者失败重试,重试固定次数后,则删除当前消息

加入这个配置以后,消费者消费失败会重试固定的次数,然后将消息删除。

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000  # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # ture:无状态    false:有状态。如果业务中包含事务,这里改成false

方案三:消费者失败重试,重试固定次数后,将当前消息发送给error交换机路由给error队列

加入这个配置之后,重试固定次数后,会将这条消费失败的消息发送给error交换机,路由给error队列。

1、在消费者(消息接收者)中加入配置

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000  # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * initial-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # ture:无状态    false:有状态。如果业务中包含事务,这里改成false

2、创建error交换机和队列并绑定

package cn.zsh.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ErrorConfig {

    /**
     * 定义交换机
     * @return
     */
    @Bean
    public DirectExchange errorExchange2(){
        return new DirectExchange("error.direct");
    }

    /**
     * 定义队列
     * @return
     */
    @Bean
    public Queue errorQueue2(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding bindErrorQueue(DirectExchange errorExchange2,Queue errorQueue2){
        return BindingBuilder.bind(errorQueue2).to(errorExchange2).with("error");
    }
}

3、在启动类或者配置类中加入配置

package cn.zsh.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct","error");
    }
}

TTL延迟队列

延迟队列:

延迟队列就是消息发送到当前队列,会延迟一段时间,然后进行处理,具体如下图:

发送消息给指定队列,然后消息回延迟固定的时间,这个延迟时间是在对应的延迟消息队列中设置的。经过延迟以后,会将消息发送给其他的交换机,然后再路由给对应的消息队列,再进行消费,实现延迟的效果。

使用案例:

1、创建处理延迟消息的队列和交换机

当前交换机名称为:dl.direct

当前消息队列名称为:dl.queue

RoutingKey是:dl

package cn.zsh.mq.config;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 创建,处理延迟消息的,交换机和队列
 */
@Component
public class TtlConfig {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue",durable = "true"),    // 处理延迟消息的队列名称
            exchange = @Exchange(name = "dl.direct",durable = "true"),    // 处理延迟消息的交换机名称
            key = "dl"    // 当前的RoutingKey
    ))
    public void consumerDdlMessage(String message){
        System.out.println("接收到延迟消息:" + message);
    }
}

2、创建延迟交换机、队列,并绑定

package cn.zsh.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class publisherTtlConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl.direct");    // 延迟交换机名称
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder.durable("ttl.queue")    // 延迟队列名称
                .ttl(10000) // 延迟队列的延迟时间(当前为10秒)
                .deadLetterExchange("dl.direct") // 设置延时时间到了以后发送到哪个交换机(处理延迟消息的交换机)
                .deadLetterRoutingKey("dl") // 设置具体到那个交换机的具体队列(处理延迟消息队列的RoutingKey)
                .build();
    }

    /**
     * 绑定延迟队列与交换机
     * @return
     */
    @Bean
    public Binding bingTtlQueue(){
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");    // 将延迟队列与交换机绑定,并设置RoutingKey(这个RoutingKey是当前延迟消息队列的RoutingKey)
    }
}

3、发送任意消息到当前延迟队列(ttl.queue)即可实现延迟效果。

延时时间到了以后,会将消息发送给(dl.direct)交换机,路由给RoutingKey为(dl)的消息队列dl.queue。有绑定了dl.queue的队列进行消息的最终处理。

TTL延迟消息

延迟消息:

延迟消息是给消息设置延迟时间,然后将消息发送给延迟队列,可以实现延迟。

注意!!!延迟消息的延迟时间,与延迟队列的延迟时间,哪个时间短,就使用哪个延迟时间。

例1:延迟消息设置延迟时间为5秒,延迟队列设置延迟时间为10秒,则消息发送给延迟队列后,延迟5秒然后就会被处理。

例2:延迟消息设置延迟时间为20秒,延迟队列设置延迟时间为10秒,则消息发送给延迟队列后,延迟10秒然后就会被处理。

使用案例:

延迟消息必须发送给延迟队列,因为延迟时间按最短的执行。发送给没有设置延迟时间的消息队列,会直接被消费。

package cn.zsh.mq.spring;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTTLMessage(){
        // 创建消息
        Message message = MessageBuilder
                .withBody("这是一条延时5秒后执行的消息".getBytes(StandardCharsets.UTF_8))
                .setExpiration("5000")// 延时时间
                .build();

        // 消息ID,需要封装到CorrelationData中
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        // 发送消息(这个消息必须要发给延时的消息队列)
        rabbitTemplate.convertAndSend("ttl.direct","ttl",message,correlationData);
    }

}

惰性队列

惰性队列是为了防止消息大量积压的一种队列。

消息队列中的消息一般都存在内存中,而消息大量积压,就会产生很多问题,这时候可以使用惰性队列,惰性队列的消息保存在磁盘中。

创建惰性队列:

方案一:

在代码中声明

package cn.itcast.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CommonConfig {

    @Bean
    public Queue lazyQueue(){
        return QueueBuilder
                .durable("lazy.queue")
                .lazy()
                .build();
    }
}

方案二:

在浏览器中MQ的控制台声明

第一步:

第二步:

额外补充:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2246335.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

将django+vue项目发布部署到服务器

1.部署django后端服务 部署架构 1.1 下载依赖插件 pip3.8 freeze > requirements.txt1.2 安装依赖插件 pip3 install -r requirements.txt1.3 安装mysql数据库 apt install mysql-server初始化数据库 CREATE USER admin% IDENTIFIED WITH mysql_native_password BY 123…

论文阅读:SIMBA: single-cell embedding along with features

Chen, H., Ryu, J., Vinyard, M.E. et al. SIMBA: single-cell embedding along with features. Nat Methods 21, 1003–1013 (2024). 论文地址&#xff1a;https://doi.org/10.1038/s41592-023-01899-8 代码地址&#xff1a;https://github.com/pinellolab/simba. 摘要 大多…

商业物联网:拥抱生产力的未来

在现代商业格局中&#xff0c;数据占据至高无上的地位。物联网&#xff08;IoT&#xff09;站在这场数字革命的前沿&#xff0c;将以往模糊不清的不确定因素转变为可衡量、可付诸行动的深刻见解。物联网技术为日常物品配备传感器与连接功能&#xff0c;使其能够实时收集并传输数…

【FRP 内网穿透 从0到1 那些注意事项】

【摘要】 最近跟第三方团队调试问题&#xff0c;遇到一个比较烦的操作。就是&#xff0c;你必须要发个版到公网环境&#xff0c;他们才能链接到你的接口地址&#xff0c;才能进行调试。按理说&#xff0c;也没啥&#xff0c;就是费点时间。但是&#xff0c;在调试的时候&#…

最新Kali安装详细版教程(附安装包,傻瓜式安装教程)

本文主要详细介绍 kali 的安装过程&#xff0c;以及安装完成后的基本设置&#xff0c;比如安装增强工具&#xff0c;安装中文输入法以及更新升级等操作。 文章目录 实验环境准备工作步骤说明安装虚拟机安装 Kali安装增强工具安装中文输入法更新升级 实验环境 VMware &#x…

【山大909算法题】2014-T1

文章目录 1.原题2.算法思想3.关键代码4.完整代码5.运行结果 1.原题 为带表头的单链表类Chain编写一个成员函数Reverse&#xff0c;该函数对链表进行逆序操作&#xff08;将链表中的结点按与原序相反的顺序连接&#xff09;&#xff0c;要求逆序操作就地进行&#xff0c;不分配…

论文浅尝 | MindMap:知识图谱提示激发大型语言模型中的思维图(ACL2024)

笔记整理&#xff1a;和东顺&#xff0c;天津大学硕士&#xff0c;研究方向为软件缺陷分析 论文链接&#xff1a;https://aclanthology.org/2024.acl-long.558/ 发表会议&#xff1a;ACL 2024 1. 动机 虽然大语言模型&#xff08;LLMs&#xff09;已经在自然语言理解和生成任务…

Win11 22H2/23H2系统11月可选更新KB5046732发布!

系统之家11月22日报道&#xff0c;微软针对Win11 22H2/23H2版本推送了2024年11月最新可选更新补丁KB5046732&#xff0c;更新后&#xff0c;系统版本号升至22621.4541和22631.4541。本次更新后系统托盘能够显示缩短的日期和时间&#xff0c;文件资源管理器窗口很小时搜索框被切…

SpringSecurity创建一个简单的自定义表单的认证应用

1、SpringSecurity 自定义表单 在 Spring Security 中创建自定义表单认证应用是一个常见的需求&#xff0c;特别是在需要自定义登录页面、认证逻辑或添加额外的表单字段时。以下是一个详细的步骤指南&#xff0c;帮助你创建一个自定义表单认证应用。 2、基于 SpringSecurity 的…

Cloud Native 云原生后端的开发注意事项

在云原生后端开发里&#xff0c;数据管理和存储这块得好好弄。数据库选型得综合考虑&#xff0c;像关系型数据有复杂查询需求就选 MySQL、PostgreSQL&#xff0c;海量非结构化数据就可以考虑 MongoDB、Cassandra 这些。设计数据库得遵循规范化原则&#xff0c;像设计电商订单表…

通达OA前台submenu.php存在SQL注入漏洞(CVE-2024-10600)

通达OA前台submenu.php存在SQL注入漏洞(CVE-2024-10600) pda/appcenter/submenu.php 未包含inc/auth.inc.php且 $appid 参数未用’包裹导致前台SQL注入 影响范围 v2017-v11.6 fofa app"TDXK-通达OA" && icon_hash"-759108386"poc http://url…

TCP连接(三次握手)(四次挥手)

建立TCP连接&#xff08;三次握手&#xff09; 以下是简单阐述 在确定目标服务器 IP 地址后&#xff0c;则经历三次握手建立TCP 连接 三次握手 代表客户端和服务端 之间的传递信息有三次 A说&#xff1a;我想和你聊天 &#xff0c;你能听到我说话吗 B说&#xff1a;我可以听到…

【MySQL实战45讲笔记】基础篇——事务隔离

系列文章 基础篇——MySQL 的基础架构 基础篇——redo log 和 binlog 目录 系列文章1. 事务隔离1.1 隔离性与隔离级别1.2 如何实现事务隔离1.3 事务的启动方式1.4 思考&#xff1a; 使用什么方案来避免长事务 1. 事务隔离 简单来说&#xff0c;事务就是要保证一组数据库操作&…

upload-labs-master第12关详细教程

成功了别忘了回来留下你的评论哦&#xff0c;嘻嘻 目录 环境配置闯关 环境配置 需要的东西 phpstudy-2018 链接&#xff1a; https://pan.baidu.com/s/1D9l13XTQw7o6A8CSJ2ff9Q 提取码&#xff1a;0278 32位 vc9和11运行库 链接&#xff1a; https://pan.baidu.com/s/1pBV3W…

Mac 修改默认jdk版本

当前会话生效 这里演示将 Java 17 版本降低到 Java 8 查看已安装的 Java 版本&#xff1a; 在终端&#xff08;Terminal&#xff09;中运行以下命令&#xff0c;查看已安装的 Java 版本列表 /usr/libexec/java_home -V设置默认 Java 版本&#xff1a; 找到 Java 8 的安装路…

uniapp奇怪bug汇总

H5端请求api文件夹接口报错 踩坑指数&#xff1a;5星 小程序、APP之前都是用api文件夹的接口引用调用&#xff0c;在h5端启动时报错&#xff0c;研究半天&#xff0c;发现把api文件夹名字改成apis就能调用&#xff0c;就像是关键字一样无法使用。 import authApi from /api/…

交换机配置从IP(Switch Configuration from IP)

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 本人主要分享计算机核心技…

大语言模型---ReLU函数的计算过程及其函数介绍

文章目录 1. 概要2. ReLU定义 1. 概要 **ReLU 作用&#xff1a;**主要用于为神经网络引入非线性能力&#xff0c;作用是将输入中的整数保留原值&#xff0c;负数置为 0。 从而在层与层之间引入非线性&#xff0c;使神经网络能够拟合复杂的非线性关系。 **ReLU使用场景&#xf…

QT如何共享文件+拷贝文件

QString sharedFolderPathImg "\\\\" IP "/profileImage/"; // 更换为你的共享文件夹路径QDir dirImg(sharedFolderPathImg);dirImg.setFilter(QDir::NoDotAndDotDot | QDir::AllEntries);QVector<QString> curFileEntryArrayImg dirImg.entryList…

sourceTree无效的源路径问题解决

1.点击工具 2.点击选项 3.修改ssh客户端为OpenSSH 4.点击确定&#xff0c;然后重新打开软件