Springboot 整合MQ实现延时队列入门

news2024/9/21 2:42:26

延时队列

  • 添加依赖
  • 配置文件
  • 队列TTL
    • 代码架构图
    • 交换机、队列、绑定配置文件代码
    • 生产者代码
    • 消费者代码
    • 延时队列优化
    • 添加普通队列配置代码
    • 生产者发送消息是进行设置消息的ttl
  • 通过MQ 插件实现延时队列
    • 代码架构图
    • 配置交换机
    • 生产者代码
    • 消费者代码
    • 测试发送

添加依赖

 <!-- rabbitMQ 集成 spring boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
        <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

配置文件

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

队列TTL

代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
在这里插入图片描述

交换机、队列、绑定配置文件代码

package com.wlj.rabbitmq.sbmq.confing;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 *@创建人 wlj
 *@创建时间 2023/8/16
 *@描述 MQ配置
 */
@Configuration
public class TtlQueueConfig {
    //X交换机
    public static final String X_EXCHANGE = "X";
    //QA队列
    public static final String QUEUE_A = "QA";
    //QB队列
    public static final String QUEUE_B = "QB";
    //Y死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //QD死信队列
    public static final String DEAD_LETTER_QUEUE = "QD";

    //声明x交换机
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    //声明y交换机
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明QA队列 ttl为10秒 并绑定对应的死信交换机
    @Bean("queueA")
    public Queue queueA(){
        HashMap<String, Object> args = new HashMap<>();
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }


    //声明QB队列 ttl为40秒 并绑定对应的死信交换机
    @Bean("queueB")
    public Queue queueB(){
        HashMap<String, Object> args = new HashMap<>();
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //设置队列QA 绑定交换机X
    @Bean
    public Binding  queueaBindingX(@Qualifier("queueA")Queue queueA
                                    ,@Qualifier("xExchange")DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //设置队列QB 绑定交换机X
    @Bean
    public Binding  queuebBindingX(@Qualifier("queueB")Queue queueB
            ,@Qualifier("xExchange")DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    //声明死信队列QD
    @Bean("queueD")
    public  Queue queueD(){
       return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }
    //死信队列和死信交换机绑定
    @Bean
    public Binding queuedBind(@Qualifier("queueD") Queue queueD,
                              @Qualifier("yExchange") DirectExchange yExchange){
        return  BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

生产者代码

package com.wlj.rabbitmq.sbmq.confing.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Date;

/**
 *@创建人 wlj
 *@创建时间 2023/8/16
 *@描述 生产者controller
 */
@Slf4j
@RestController
@RequestMapping("ttl")
public class MsgController {
    @Resource
    RabbitTemplate rabbitTemplate;
    @GetMapping("/send/{msg}")
    public void sendMsg(@PathVariable String msg){
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), msg);
        rabbitTemplate.convertAndSend("X","XA","发送的消息,延时10秒: "+msg);
        rabbitTemplate.convertAndSend("X","XB","发送的消息,延时40秒: "+msg);
    }
}

消费者代码

package com.wlj.rabbitmq.sbmq.confing.dead;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 *@创建人 wlj
 *@创建时间 2023/8/16
 *@描述 消费者
 */
@Component
@Slf4j
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = "QD")
    public void getMsg(Message msg, Channel channel){
        System.out.println(new String(msg.getBody()));
        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
    }
}

发送测试
http://localhost:8080/ttl/send/嘻嘻嘻
以上代码声明队列的时候,设置队列的延时时间是10秒和40秒,意味着所有进入队列的消息都是根据队列的延时时间的。这就会有一个问题,如果说业务需要延时20秒、15秒、一分钟、等等等等,难道都需要创建每一种延时队列吗?那岂不是要增加无数个队列才能满足需求。下面就进行优化延时队列

延时队列优化

代码架构图
在这里插入图片描述
声明一个普通的队列,只需要在生产消息的时候设置消息的延时时间即可。

添加普通队列配置代码

  //声明普通队列QC代码
    @Bean("queueC")
    public  Queue queueC(){
        HashMap<String, Object> args = new HashMap<>();
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        return  QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }
    //设置QC队列和X交换机绑定
    @Bean
    public  Binding queuecBindX(@Qualifier("queueC")Queue queueC,
                                @Qualifier("xExchange")DirectExchange xExchange){
    return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

生产者发送消息是进行设置消息的ttl

@GetMapping("/send/{msg}/{ttl}")
    public void sendMsgTtl(@PathVariable String msg,@PathVariable String ttl){
       
        rabbitTemplate.convertAndSend("X","XC",msg,correlationData->{
            correlationData.getMessageProperties().setExpiration(ttl);
            return correlationData;});
        log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(),ttl, msg);
    }

发送测试
http://localhost:8080/ttl/send/嘻嘻嘻/20000
http://localhost:8080/ttl/send/哈哈哈/2000

在这里插入图片描述
消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,
如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

通过MQ 插件实现延时队列

Windows 安装MQ延时插件,请查看
Linux 安装MQ延时插件,请查看

代码架构图

延时队列是交换机进行把控消息的ttl。ttl到期才会发送到对应到队列
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下
在这里插入图片描述

配置交换机

package com.wlj.rabbitmq.sbmq.confing.plugins;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;

/**
 *@创建人 wlj
 *@创建时间 2023/8/16
 *@描述 基于插件实现延时消息发送
 */
@Component
public class DelayedQueueConfig {
    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //routingkey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
    //声明队列
    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
    }

    /**
     *   声明交换机
     *   因为交换机的类型没有延时类型 所以使用自定义交换机
     */
    @Bean
    public CustomExchange delayedExchange(){

        HashMap<String, Object> args = new HashMap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        // 对应参数: 交换机的名称 x-delayed-message说明是延时消息交换机 是否序列化 是否自动删除,参数
        return  new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args);
    }

    //进行绑定
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange
                                               delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者代码

 public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
                correlationData ->{
                    correlationData.getMessageProperties().setDelay(delayTime);
                    return correlationData;
                });
        log.info(" 当 前 时 间 : {}, 发送一条延迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(),delayTime, message);
    }

消费者代码

  public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
    }

测试发送

http://localhost:8080/ttl/sendDelayMsg/qwer/20000
http://localhost:8080/ttl/sendDelayMsg/121212/2000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/888418.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023国赛数学建模思路 - 复盘:校园消费行为分析

文章目录 0 赛题思路1 赛题背景2 分析目标3 数据说明4 数据预处理5 数据分析5.1 食堂就餐行为分析5.2 学生消费行为分析 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 赛题背景 校园一卡通是集…

Unity如何控制声音大小(包括静音功能)

一&#xff1a;UGUI制作 1. 首先在【层级】下面创建UI里面的Slider组件。设置好它对应的宽度和高度。 2.调整Slider滑动条的填充颜色。一般声音颜色我黄色&#xff0c;所以我们也调成黄色。 我们尝试滑动Slider里面的value。 a.滑动前。 b.滑动一半。 c.滑动完。 从以上滑动va…

Cat(2):下载与安装

1 github源码下载 要安装CAT&#xff0c;首先需要从github上下载最新版本的源码。 官方给出的建议如下&#xff1a; 注意cat的3.0代码分支更新都发布在master上&#xff0c;包括最新文档也都是这个分支注意文档请用最新master里面的代码文档作为标准&#xff0c;一些开源网站…

8月14-15日上课内容 LVS负载均衡的群集

知识点&#xff1a; 本章结构: 企业群集概述 集群的含义&#xff1a; 1、群集的含义 ①、Cluster、集群、群集 ②、由多台主机构成&#xff0c;但对外只表现为一个整体&#xff0c;只提供一个访问入口&#xff08;域名与IP地址&#xff09;&#xff0c;相当于一台大型计算机。…

TypeScript相关面试题

typeScript 1.什么是TypeScript?是什么&#xff1f;特性&#xff1f;区别&#xff1f; 2.TypeScript数据类型&#xff1f;3.说说你对 TypeScript 中枚举类型的理解&#xff1f;应用场景&#xff1f;4.说说你对 TypeScript 中接口的理解&#xff1f;应用场景&#xff1f;使用方…

面试题. 分割链表

给你一个链表的头节点 head 和一个特定值 x &#xff0c;请你对链表进行分隔&#xff0c;使得所有 小于 x 的节点都出现在 大于或等于 x 的节点之前。 你不需要 保留 每个分区中各节点的初始相对位置。 示例 1&#xff1a; 输入&#xff1a;head [1,4,3,2,5,2], x 3 输出&a…

C#学习....

1.基础 //引用命名空间using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;//项目名或者命名空间 namespace _01_MY_First_Demo {//Program类class Program{//程序的主入口或者Main函数static void Main(S…

大模型是什么?大模型可以在哪些场景应用落地?

大模型是什么&#xff1f;大模型是指模型具有庞大的参数规模和复杂程度的机器学习模型。在深度学习领域&#xff0c;大模型通常是指具有数百万到数十亿参数的神经网络模型。 大模型是指模型具有庞大的参数规模和复杂程度的机器学习模型。在深度学习领域&#xff0c;大模型通常是…

实验三十、压控振荡电路的测量

一、题目 利用 Multisim 分析图1所示电路&#xff0c;测试各项指标参数。 图 1 压控振荡电路 图1\,\,压控振荡电路 图1压控振荡电路 二、仿真电路 仿真电路如图2(a)所示&#xff0c;其中 A 1 \textrm A_1 A1​ 采用通用型集成运放 LM324AJ&#xff0c; A 2 \textrm A_2 A2​…

Java进阶(2)——结合源码深入理解final关键字,修饰数据,方法,类

目录 引出深入理解final关键字final修饰数据基本数据类型对象的引用空白final final修饰方法final修饰类总结 引出 1.在java源码中较多使用final修饰数据&#xff0c;比如ArrayList的初始长度&#xff1b; 2.final关键字修饰对象的引用的特点理解&#xff1b; 3.了解一下空白fi…

地址在数据线和地址线上怎么传?

如下图所示&#xff0c;对于地址总线&#xff0c;其传输方向是单向的&#xff0c;是只能由CPU发出&#xff0c;即只能用于CPU选择主存地址或I/O端口地址&#xff0c;并不能从主存或IO端口发到CPU。 相关题目&#xff1a; 在系统总线的数据线上&#xff0c;不可能传输的是&am…

试卷去痕迹app分享,轻松擦除答案痕迹

在考试中&#xff0c;不小心写错答案是常有的事情。如果你是用铅笔写的&#xff0c;那么你可以直接用橡皮擦擦除。但如果你是用钢笔或圆珠笔写的&#xff0c;该怎么办呢&#xff1f;现在有一些APP可以帮助你擦除答案&#xff0c;以下是一些值得尝试的APP分享。 1.拍试卷 拍试卷…

堆叠聚合模型与单独的逻辑回归模型处理非平衡数据的比较

堆叠聚合模型与单独的逻辑回归模型处理非平衡数据的比较 堆叠聚合模型的设计是通过训练多个模型&#xff0c;然后使用原模型&#xff0c;将多个模型的输出结果整合在一起以实现更准确的预测。这叠聚合模型在多个临床场景上都表现出优于单一模型的效能[1]。是构建临床预测模型过…

java之juc二

JMM 请你谈谈对Volatile的理解 Volatile是jvm提供的轻量级的同步机制&#xff08;和synchronized差不多&#xff0c;但是没有synchronized那么强大&#xff09; 保证可见性不保证原子性禁止指令重排 什么是JMM JMM&#xff1a;java内存模型&#xff0c;不存在的东西&#…

UE_移动端测试使用

教程流程&#xff1a; 参照官方文档-android篇&#xff1a; https://docs.unrealengine.com/5.1/zh-CN/android-development-requirements-for-unreal-engine/https://docs.unrealengine.com/5.1/zh-CN/android-development-requirements-for-unreal-engine/ AS下载&#xf…

点云滤波介绍

一、介绍 1、Filtering a PointCloud using a PassThrough filter 2、Downsampling a PointCloud using a VoxelGrid filter 3、Removing sparse outliers using StatisticalOutlierRemoval 4、Projecting points using a parametric model 数据集&#xff1a;链接&#x…

mysql mysql 容器 忽略大小写配置

首先能够连接上mysql&#xff0c;然后输入下面这个命令查看mysql是否忽略大小写 show global variables like %lower_case%; lower_case_table_names 0&#xff1a;不忽略大小写 lower_case_table_names 1&#xff1a;忽略大小写 mysql安装分为两种&#xff08;根据自己的my…

移动端身份证识别技术的应用,告别手动录入证件信息

随着移动互联网的的发展&#xff0c;越来越多的公司都推出了自己的移动APP&#xff0c;这些APP多数都涉及到个人身份证信息的输入认证&#xff08;即实名认证&#xff09;&#xff0c;如果手动去输入身份证号码和姓名&#xff0c;速度非常慢&#xff0c;且用户体验非常差。为了…

MotionBERT:人体运动表征

MotionBERT&#xff1a;A Unified Perspective on Learning Human Motion Representations解析 摘要1. 简介2. Related Work2.1 学习人体运动表征2.2 3D人体姿态估计2.3 基于骨骼的动作识别2.3 人体网格恢复 3. Method3.1 Overview3.2 网络架构Spatial BlockTemporal BlockDual…

MybatisPlus整合p6spy组件SQL分析

目录 p6spy java为什么需要 如何使用 其他配置 p6spy p6spy是一个开源项目&#xff0c;通常使用它来跟踪数据库操作&#xff0c;查看程序运行过程中执行的sql语句。 p6spy将应用的数据源给劫持了&#xff0c;应用操作数据库其实在调用p6spy的数据源&#xff0c;p6spy劫持到…