RabbitMQ“延时队列“

news2024/11/16 17:43:38

1.RabbitMQ"延时队列"

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
注意RabbitMQ并没有延时队列慨念,其实是通过死信实现
应用场景:订单30分钟未支付取消…

2.整合springboot

创建springboot Maven项目,其pom文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hong</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq</name>
    <description>springboot整合Rabbitmq</description>
    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.4.2</version>
            </plugin>
        </plugins>
    </build>

</project>

3.配置文件application.properties

spring.rabbitmq.host=10.211.55.4
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

4.添加Swagger配置类

package com.hong.springboot.rabbitmq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

/**
 * @Description: Swagger配置类
 * @Author: hong
 * @Date: 2024-01-24 10:38
 * @Version: 1.0
 **/
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder()
                .title("rabbitmq接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("JAVA小生不才", "https://blog.csdn.net/qq_41596346?spm=1011.2423.3001.5343", "hst1406959716@163.com"))
                .build();
    }
}

5.队列TTL配置列

在这里插入图片描述

package com.hong.springboot.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description: TTL队列配置类
 * @Author: hong
 * @Date: 2024-01-24 09:57
 * @Version: 1.0
 **/

@Configuration
public class TTLQueeueConfig {
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";

    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //死信队列
    public static final String DEAD_LETTER_QUEUE = "QD";

    // 声明 xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    // 声明 死信队列交换机
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明队列 A ttl 为 10s 并绑定到对应的死信交换机
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    // 声明队列 A 绑定 X 交换机
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //声明死信队列 QD
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    //声明队列 B 绑定 X 交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }



    //声明死信队列 QD 绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

6.消息生产者

package com.hong.springboot.rabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Description: 消息生产者
 * @Author: hong
 * @Date: 2024-01-28 10:15
 * @Version: 1.0
 **/
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMessageController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10S的QA队列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为10S的QB队列: " + message);
    }
}

7.消息消费者代码

package com.hong.springboot.rabbitmq.consumer;

import com.hong.springboot.rabbitmq.config.TTLQueeueConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Description: 死信消费者
 * @Author: hong
 * @Date: 2024-01-28 10:22
 * @Version: 1.0
 **/
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg);
    }

}

启动项目,发起http://localhost:8080/ttl/sendMsg/JAVA%E5%B0%8F%E7%94%9F%E4%B8%8D%E6%89%8D请求
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

8.代码优化

QA和QB的ttl都写死啦,若是再来1个时间不同的咋处理

8.1.TTLQueeueConfig优化

代码中添加如下代码

    //QC 不设置ttl  消息自带ttl
    public static final String QUEUE_C = "QC";

    //声明队列 C 死信交换机    
    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //没有声明 TTL 属性
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    //声明队列C绑定 X 交换机
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

8.2.生产者代码优化

添加如下方法

    /**
     * 延时队列优化
     * @param message 消息
     * @param ttlTime 延时的毫秒
     */
    @GetMapping("sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
        rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
            correlationData.getMessageProperties().setExpiration(ttlTime);
            return correlationData;
        });
        log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列C:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , ttlTime, message);
    }

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1418355.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图片中的水印怎么去掉?教你三个去水印方法

在拍摄照片时&#xff0c;有时候会遇到不期而遇的路人出现在镜头中&#xff0c;给照片带来不必要的干扰。这时候我们就需要把路人给去掉&#xff0c;让照片变的更加完美。下面我将给大家分享三个把照片中的路人去掉的小妙招。 一、水印云 水印云是一款非常实用的图片处理工具…

机器学习3-简单线性回归

需求&#xff1a; 现在要根据学生的学习时间来预测学习成绩&#xff0c;给出现有数据&#xff0c;用来训练模型并预测新数据。 分析&#xff1a; 使用线性回归模型。 代码&#xff1a; import pandas as pd import matplotlib.pyplot as plt from sklearn.model_selection i…

Day02-课后练习2(数据类型和运算符)

参考答案博客链接跳转 文章目录 巩固题1、案例&#xff1a;今天是周2&#xff0c;100天以后是周几&#xff1f;2、案例&#xff1a;求三个整数x,y,z中的最大值3、案例&#xff1a;判断今年是否是闰年4、分析如下代码的计算结果5、分析如下代码的计算结果6、分析如下代码的计算…

SpringBoot之JWT登录

JWT JSON Web Token&#xff08;JSON Web令牌&#xff09; 是一个开放标准(rfc7519)&#xff0c;它定义了一种紧凑的、自包含的方式&#xff0c;用于在各方之间以JSON对象安全地传输信息。此信息可以验证和信任&#xff0c;因为它是数字签名的。jwt可以使用秘密〈使用HNAC算法…

做人力RPO蓝海项目需要有人力资源工作经验吗?

在当前的商业环境中&#xff0c;人力资源外包服务已经成为了许多企业的选择。其中&#xff0c;人力RPO(招聘流程外包)作为人力资源外包的一种形式&#xff0c;尤其在蓝海项目中备受瞩目。那么&#xff0c;对于想要涉足人力RPO领域的个人或企业来说&#xff0c;是否需要具备丰富…

uniapp 解决键盘弹出页面内容挤压问题

page.json 配置 加 “app-plus”: { “softinputMode”: “adjustResize” } {"path": "pages/jxx/xx","style": {"navigationBarTitleText": "贺卡DIY","enablePullDownRefresh": false,"app-plus": {…

Mysql查询数据

1 基本查询语句 MySQL从数据表中查询数据的基本语句为SELECT语句。SELECT语句的基本格式是&#xff1a; 2 单表查询 2.1 查询所有字段 SELECT * FROM 表名; 2.2 在SELECT语句中指定所有字段 SELECT f_id, s_id ,f_name, f_price FROM fruits; 2.3 查询单个字段 SELECT 列名FR…

字节8年经验之谈 —— 如何编写出色的接口测试用例?

简介&#xff1a;在所有的开发测试中&#xff0c;接口测试是必不可少的一项。有效且覆盖完整的接口测试&#xff0c;不仅能保障新功能的开发质量&#xff0c;还能让开发在修改功能逻辑的时候有回归的能力&#xff0c;同时也是能优雅地进行重构的前提。编写接口测试要遵守哪些原…

双非本科准备秋招(9.3)—— JVM2

学这个JVM还是挺抽象的&#xff0c;不理解的东西我尽量记忆了&#xff0c;毕竟刚接触两天&#xff0c;也没遇到过实际应用场景&#xff0c;所以学起来还是挺费劲的&#xff0c;明天再补完垃圾回收这块的知识点。U•ェ•*U 先补一下JVM运行时的栈帧结构。 线程调用一个方法的执…

【第十九课】BFS:广度优先搜索 (acwing-844走迷宫 / 含过程演示的视频推荐 / c++代码)

目录 BFS思路 可能需要看的视频和博客 代码如下 输出最短路径途径点 关于这种类型的题&#xff0c;我是有点印象的。。。当时蓝桥杯校内选拔就有这种题&#xff0c;当时还没学算法hhh BFS思路 对应上图来理解BFS的方式还是很容易的&#xff0c;只是如何在题目中应用BFS的思…

【C++】2024.01.29 克隆机

题目描述 有一台神奇的克隆机&#xff0c;可以克隆任何东西。将样品放进克隆机&#xff0c;可以克隆出一份一样的“复制品”。小明得到了 k 种珍贵的植物种子&#xff0c;依次用 A,B,C,D,...,Z 表示&#xff08;1≤k≤26&#xff09;。一开始&#xff0c;每种植物种子只有…

如何在Shopee波兰站点进行选品和销售策略

在Shopee波兰站点进行选品时&#xff0c;卖家需要考虑一些热销品类和策略&#xff0c;以提高产品的市场竞争力和销售业绩。以下是一些值得考虑的关键品类和策略&#xff0c;帮助卖家在波兰市场取得成功。 先给大家推荐一款shopee知虾数据运营工具知虾免费体验地址&#xff08;…

DS:经典算法OJ题(1)

创作不易&#xff0c;友友们给个三连呗&#xff01;&#xff01; 本文为经典算法OJ题练习&#xff0c;大部分题型都有多种思路&#xff0c;每种思路的解法博主都试过了&#xff08;去网站那里验证&#xff09;是正确的&#xff0c;大家可以参考&#xff01;&#xff01; 一、移…

【docker】Docker-compose单机容器集群编排

一、Compose的相关知识 1. Compose的相关概念 Compose是单机编排容器集群或者是分布式服务容器的应用工具。通过Compose&#xff0c;可以使用YAML文件来配置应用程序的服务。然后&#xff0c;使用一个命令&#xff0c;就可以从配置中创建并启动所有服务。 Docker-Compose是一…

VScode通过SSH连接远程服务器

一. 在VScode上安装SSH插件 直接在VScode应用商店搜索安装即可: 二. 登陆服务器的root用户 使用命令"su -"或者"sudo -i -u root"都可以。 三.用vim编辑器打开服务器的SSH配置文件,把PasswordAuthentication后面的no改为yes&#xff0c;表示SSH允许远程密…

MySql的使用方法

一.什么是MySql MySql是一种数据库管理系统&#xff0c;是用来存储数据的&#xff0c;可以有效的管理数据&#xff0c;数据库的存储介质为硬盘和内存。 和文件相比&#xff0c;它具有以下优点&#xff1a; 文件存储数据是不安全的&#xff0c;且不方便数据的查找和管理&#xf…

Java项目:基于SSM框架实现的高校毕业生就业管理系统(ssm+B/S架构+源码+数据库+毕业论文)

一、项目简介 本项目是一套ssm817基于SSM框架实现的高校毕业生就业管理系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调…

江科大stm32学习笔记6——GPIO输入准备

一、按键消抖 由于按键内部使用的是机械式弹簧片&#xff0c;所以在按下和松开时会产生5~10ms的抖动&#xff0c;需要通过代码来进行消抖。 二、滤波电容 在电路中&#xff0c;如果见到一端接在电路中&#xff0c;一端接地的电容&#xff0c;则可以考虑它的作用为滤波电容&am…

1.Mybatis入门

目录 前言 1入门 1.1 入门程序实现 1.2 数据准备 ​编辑 1.3 配置Mybatis 1.4 编写SQL语句 1.5 单元测试 1.6 解决SQL警告与提示 2. JDBC介绍(了解) 2.1 介绍 2.2 代码 2.3 问题分析 2.4 技术对比 3. 数据库连接池 3.1 介绍 3.2 产品 4. lombok 4.1 介绍 4.…

【李宏毅机器学习】Transformer 内容补充

视频来源&#xff1a;10.【李宏毅机器学习2021】自注意力机制 (Self-attention) (上)_哔哩哔哩_bilibili 发现一个奇怪的地方&#xff0c;如果直接看ML/DL的课程的话&#xff0c;有很多都是不完整的。开始思考是不是要科学上网。 本文用作Transformer - Attention is all you…