rabbit MQ的延迟队列处理模型示例(基于SpringBoot死信模式)

news2025/1/23 22:31:55

在这里插入图片描述

说明:
生产者P 往交换机X(type=direct)会发送两种消息:一、routingKey=XA的消息(消息存活周期10s),被队列QA队列绑定入列;一、routingKey=XB的消息(消息存活周期40s),被队列Q B队列绑定入列。QA、QB两个队列消息在失活(变成死信消息)以routingKey=YD发送到交换机Y(type=direct)。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。
这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。

这里用SpringBoot maven:
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>

在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类,这里对案例图所用到组件器声明注解出来。
在这里插入图片描述

框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定:

package com.esint.configs;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * TTL延迟队列配置文件类
 *
 */
@Configuration
public class TtlQueueConfig {
    //
    //普通交换机的名称 X
    public static final String X_EXCHANGE = "X";

    //死信交换机名称 Y
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    //普通队列QA QB
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信队列名称QD
    public static final String DEAD_LETTER_QUEUE = "QD";
    //
    //声明X_EXCHANGE
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    //声明死信交换Y_DEAD_LETTER_EXCHANGE
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    //声明队列 QA
    @Bean("queueA")
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey (死信后充当了消费者的发送路由)
        arguments.put("x-dead-letter-routing-key","YD");
        //消息过期时间
        arguments.put("x-message-ttl",10000);

        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();

    }

    //声明队列 QB
    @Bean("queueB")
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(3);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信RoutingKey (死信后充当了消费者的发送路由)
        arguments.put("x-dead-letter-routing-key","YD");
        //消息过期时间
        arguments.put("x-message-ttl",40000);

        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();

    }
    //声明死信队列QD
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }
    //捆绑
    //绑定队列QA与交换机X_EXCHANGE
    @Bean
    public Binding queueABingXExchange(@Qualifier("queueA") Queue queueA,
                                      @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    //绑定队列QB与交换机X_EXCHANGE
    @Bean
    public Binding queueBBingXExchange(@Qualifier("queueB") Queue queueB,
                                      @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //绑定队列QD与交换机Y_Exchange
    @Bean
    public Binding queueDBingYExchange(@Qualifier("queueD") Queue queueD,
                                       @Qualifier("yExchange")DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生产者与交换机X:这里方便测试 我们把生产者放在一个Controller逻辑里
package com.esint.controller;

//发送延迟消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMesController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/senMsg/{message}")
    public void sendMes(@PathVariable String message){
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);

        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
    }
}

消费者与死信队列创建一个监听者示例:
package com.esint.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 队列TTL消费者
 */

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接受消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception{

        String msg = new String(message.getBody());
        log.info("当前时间:{},收到私信队列的消息:{}",new Date().toString(),msg);
    }
}

rabbitmq的配置文件:

spring:
  rabbitmq:
    host: *.*.*.*
    port: 5672
    username: guest
    password: guest

接下来可以启动SpringBoot: 启动后,配置方法类会把交换机/队列/绑定器初始化配置

队列:
在这里插入图片描述

交换机:
在这里插入图片描述
点开详细后,也能考到他们之间的绑定关系:

在这里插入图片描述

在这里插入图片描述

消息发布测试:

生产者发送消息:

浏览器:
http://127.0.0.1:19092/ttl/senMsg/nice

通过生产者发送:nice

当前时间:Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列:nice

消费者在10s后和40秒分别收到了消息:
在这里插入图片描述


拓展:是不是有一种可能,如果再队列中不设置过期时间,在生产者发送消息时设置过期时间 来实现过期时间自由设定,而延迟自由?

结论是不能:
rabbitMQ队列只会检查第一个消息是否过期。举例如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。

示例验证:

增加一个无过期时间约束的队列,以routing-key为XC绑定X交换机,过期后以routing-key为YD绑定Y交换机。
过期时间放生产者发送时设定。

在的rabbitMQ配置类中增加QC 绑定前(X routing-key=XC)后(Y routing-key=YD)交换机:

    // 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时
    public static final  String QUEUE_C ="QC";
    
    //声明队列 QC 优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时
    @Bean("queueC")
    public Queue queueC(){

        Map<String,Object> arguments = new HashMap<>(2);
        //设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置routing-key
        arguments.put("x-dead-letter-routing-key","YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
    }
//绑定队列QC与交换机X_EXCHANGE   优化新增队列 队列不设置TTL过期时间 把过期时间放到生产者发送消息时
    @Bean
    public Binding queueCBindXExchange(@Qualifier("queueC") Queue queueC,
                                       @Qualifier("xExchange")DirectExchange xExchange){
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

生产者:


    @GetMapping("/sendttl/{message}/{ttlTime}")
    public void sendMes(@PathVariable String message,@PathVariable String ttlTime){
/**
 * 死信队列做延迟时的缺陷:
 * rabbitMQ只会检查第一个消息是否过期带来的问题就是,如果第一个消息的ttl为30s,第二个消息ttl为3s。第二个消息不会再3s后到达,而是会在第一个过期后,再第二个到达。
 */
        log.info("当前时间:{},发送一条ttl为{}ms的消息给QC队列:{}",new Date().toString(),ttlTime,message);
        rabbitTemplate.convertAndSend("X","XC",message,mes->{
            mes.getMessageProperties().setExpiration(ttlTime);
            return mes;
        });
    }

消费者不变,启动服务!

生产者发送消息:第一条 3000ms
http://127.0.0.1:19092/ttl/sendttl/第一条30000ms消息/30000
http://127.0.0.1:19092/ttl/sendttl/第二条3000ms消息/3000

在这里插入图片描述
结论:第二条虽然早早过期,它依然需要等待第一条过期后,才能排到他。rabbitMQ的队列过期检查机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1238025.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

unity自制循环匀速动画

动画制作&#xff0c;有循环匀速要求时&#xff0c;需要调节Curves&#xff0c;将其节点的Tangent改为Linear

python 水质日历热力图

利用日历热力图可以方便的查看站点水质全年的变化情况。 接口获取站点数据 这一步根据自己实际情况&#xff0c;也可以读取excel、MySQL读取数据。这里把API地址已隐去。 import numpy as np import calendar import requests import json import pandas as pd import time f…

『亚马逊云科技产品测评』活动征文|通过Lightsail搭建个人笔记

提示&#xff1a;授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 文章目录 前言实践知识储备Lightsail介绍Leanote介绍实践…

如何用CHAT写励志文章?

问CHAT&#xff1a;写一篇以《过了60岁要积极面对身体疾病的坎儿》为题目&#xff0c;写一篇300字励志文章 CHAT回复&#xff1a; 标题&#xff1a;《过了60岁要积极面对身体疾病的坎儿》 人生&#xff0c;有时会像一趟不期而遇的旅程&#xff0c;各自带着乐观或悲观、阳光或…

Transmit v5.10.3(FTP客户端)

Transmit 5是一款由Panic开发的功能强大的FTP(文件传输协议)客户端软件&#xff0c;专为 macOS 平台设计。它提供了简单、直观的界面和丰富的功能&#xff0c;使用户能够轻松地管理和传输文件。 在文件传输和同步方面&#xff0c;Transmit 5提供了强大的文件同步功能&#xff…

基于Android校园交流uniAPP+vue 微信小程序v7e1

本系统结合现今XX校园交流APP的功能模块以及设计方式进行分析&#xff0c;使用Android平台和Ssm框架进行开发设计&#xff0c;具体研究内容如下&#xff1a; (1) 系统管理员主要对用户管理、类型管理、娱乐天地管理、投诉举报管理、学习平台、我的收藏管理、系统管理等功能进…

小米智能摄像机云台版pro 拆解教程

拆解原因 因为设备提示无内存卡&#xff0c;摄像头手动调整方向到最上面&#xff0c;就可以看到内存卡插槽 但是这个摄像头因为内存卡弹出来了&#xff0c;导致无法插入也无法取出&#xff0c;所以决定拆开重新安装 第一步&#xff0c;拆开后即可拔出底座&#xff0c;拔掉摄像…

2023软件测试的4个技术等级,你在哪个级别?

最近&#xff0c;我们讨论了软件测试工程的的分级&#xff0c;大家都贡献了自己的想法&#xff0c;对于大家来说&#xff0c;软件测试人的分级其实也代表了我们的进阶方向&#xff0c;职业发展。总体来说&#xff0c;测试工程师未来发展有三个方向&#xff1a; 技术精英 行业专…

RK3568开发板在工控工业物联网网关方面的应用

在数字化转型的浪潮中&#xff0c;工控物联网关产品扮演着重要的角色。这些产品通过连接工业设备和网络&#xff0c;为数据传输和分析提供了便利。而迅为RK3568核心板作为一款高性能的芯片&#xff0c;为工控物联网关产品的性能提升和功能扩展提供了强大的支持。 迅为RK3568核心…

【Java】智慧工地管理系统源代码,支持二次开发,SaaS模式

智慧工地系统围绕工程现场人、机、料、法、环及施工过程中质量、安全、进度、成本等各项数据满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效。 一、行业现状 1、施工现场管理难&#xff1a;安全事故频发&#xff0c;人工巡检难度大&#xff0c;质量进度协同难等…

自动化发展趋势以及自动化测试常见问题解析

前言 ⾃动化接⼝测试会越来越受到重视 在移动互联⽹时代&#xff0c;对于质量的要求⽐PC时代⾼的多&#xff0c;⽽投⼊产出⽐最⾼的⾃动化接⼝测试&#xff0c;将会是⼤部分公司的⾸选⽅向&#xff0c;但需要严格掌握⼀门语⾔ 持续集成是⽬前⾮常流⾏的开发⽅式&#xff0c;…

在有springSecurity或者若依项目中获取当前系统登录的用户信息

方法一&#xff08;springSecurity自带的&#xff09; AuthenticationPrincipal 是 Spring Security 框架中的一个注解&#xff0c;用于获取当前已认证用户的 principal&#xff08;即用户身份信息&#xff09;。 方法二&#xff08;若依项目自带的&#xff09; &#xff08;1…

最新AIGC创作系统ChatGPT网站源码,Midjourney绘画系统,支持最新GPT-4-Turbo模型,支持DALL-E3文生图

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…

七、通过libfdk_aac编解码器实现aac音频和pcm的编解码

前言 测试环境&#xff1a; ffmpeg的4.3.2自行编译版本windows环境qt5.12 AAC编码是MP3格式的后继产品&#xff0c;通常在相同的比特率下可以获得比MP3更高的声音质量&#xff0c;是iPhone、iPod、iPad、iTunes的标准音频格式。 AAC相较于MP3的改进包含&#xff1a; 更多的采…

【Python】Vscode解决Python中制表符和空格混用导致的缩进问题

【Python】Vscode解决Python中制表符和空格混用导致的缩进问题 文章目录 【Python】Vscode解决Python中制表符和空格混用导致的缩进问题1. 问题来源2. 解决Reference 1. 问题来源 在python中使用缩进来进行代码块的分区&#xff0c;通常来说python的一个缩进包含4个空格&#…

微服务学习|Feign:快速入门、自定义配置、性能优化、最佳实践

RestTemplate方式调用存在的问题 先来看我们以前利用RestTemplate发起远程调用的代码 存在下面的问题 代码可读性差&#xff0c;编程体验不统一 参数复杂URL难以维护 Feign的介绍 Feign是一个声明式的http客户端&#xff0c;官方地址: https://github.com/OpenFeign/feign …

轻松整理文件夹,将视频文件全部归类到另一个文件夹!

如果你需要整理文件夹中的文件&#xff0c;将同一类别的文件归纳到一起&#xff0c;可以更加方便地管理和查找。现在&#xff0c;我们有一个简单而实用的方法&#xff0c;可以将文件夹中的所有视频文件归类到另一个文件夹中&#xff0c;让你的文件管理更加有序和高效。 首先&am…

《向量数据库指南》——向量数据库Milvus Cloud搭建Excel公式编辑器助手

引言 在日常工作中,Excel是我们经常使用的办公工具,而熟练应用Excel公式对于提高工作效率非常重要。然而,有时候我们会遇到一些复杂的需求,需要用到较为专业的Excel公式,而这正是Excel公式编辑器助手的用武之地。本文将介绍如何利用向量数据库Milvus Cloud搭建GPT大模型和…

【顺序表的应用-通讯录的实现】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、顺序表的应用 1. 基于动态顺序表实现通讯录 1、功能要求 2、代码实现 二、通讯录的代码实现 1.通讯录的底层结构(顺序表) (1)思路展示 (2)底层代码实现(顺序表…

如何有效解决UDP协议传输问题实现快速安全的文件传输

随着互联网技术的不断发展&#xff0c;UDP协议作为一种快速、简单的传输协议被广泛应用于文件传输领域。然而&#xff0c;UDP协议传输过程中也存在着一些问题&#xff0c;如传输速度不稳定、数据丢失等&#xff0c;这些问题会影响到文件传输的效率和安全性。本文将介绍UDP协议传…