基于插件实现RabbitMQ“延时队列“

news2025/1/13 15:57:40

1.官网下载

在添加链接描述下载rabbitmq_delayed_message_exchange 插件,本文以v3.10.0为例
在这里插入图片描述

1.1.上传安装包

scp /Users/hong/资料/rabbitmq_delayed_message_exchange-3.10.0.ez  root@10.211.55.4:/usr/local/software

在这里插入图片描述
在这里插入图片描述

1.2.将文件移入RabbitMQ的安装目录下的plugins目录

mv rabbitmq_delayed_message_exchange-3.10.0.ez /usr/local/software/rabbitmq_server-3.10.0/plugins

在这里插入图片描述
在这里插入图片描述

1.3.安装插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

1.4 重启后验证

rabbitmq-server start

在这里插入图片描述

在这里插入图片描述

2.两种实现方式图解

在这里插入图片描述
在这里插入图片描述

3.基于插件的延迟队列配置类

在这里插入图片描述

package com.hong.springboot.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;

import java.util.HashMap;
import java.util.Map;


/**
 * @Description: 延迟队列配置类
 * @Author: hong
 * @Date: 2024-02-25 20:19
 * @Version: 1.0
 **/
@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    /**
     * 基于延迟插件声明自定义交换机
     * @return
     */
    @Bean
    public CustomExchange delayedExchange(){
        Map<String,Object> map = new HashMap<>();
        map.put("x-delayed-type","direct");
        /**
         * 声明自定义交换机
         * 第1个参数:交换机名称
         * 第2个参数:交换机类型
         * 第3个参数:是否需要持久化
         * 第4个参数:是否需要自动删除
         * 第5个参数:其他参数
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,map);
    }

    @Bean
    public Queue delayedQueue(){
        return  new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                  @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

4.生产者发送消息

    /**
     * 基于延迟插件的发送消息
     * @param message
     * @param delayTime 延迟时间
     */
    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, correlationData -> {
            correlationData.getMessageProperties().setDelay(delayTime);
            return correlationData;
        });
        log.info("当前时间:{},发送一条时长{}毫秒TTL信息给延迟队列delayed.queue:{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , delayTime, message);
    }

5.消费者端代码

package com.hong.springboot.rabbitmq.consumer;

import com.hong.springboot.rabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Description: 基于延迟插件的延迟消费者
 * @Author: hong
 * @Date: 2024-02-25 21:27
 * @Version: 1.0
 **/
@Slf4j
@Component
public class DelayQueueConsumer {
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayMessage(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列信息{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) , msg);
    }
}

http://localhost:8080/ttl/sendDelayMsg/hello rabbitmq 1/20000
http://localhost:8080/ttl/sendDelayMsg/hello rabbitmq 2/2000
在这里插入图片描述
基于插件的延迟与基于死信队列的结果恰好相反更符合预期,因此在实际项目中通常采用延迟插件方式来实现rabbitMQ的延迟队列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1470992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EasyRecovery2024个人免费版本电脑手机数据恢复软件下载

EasyRecovery是一款功能强大的数据恢复软件&#xff0c;能够帮助用户恢复丢失、删除、格式化或损坏的数据。无论是由于误操作、病毒攻击、硬盘故障还是其他原因导致的数据丢失&#xff0c;EasyRecovery都能提供有效的解决方案。 该软件支持从各种存储介质恢复数据&#xff0c;…

linux-并发通信

一.linux-tcp通信框架 1.基础框架 1.1 tcp 服务器框架 1.套接字 #include <sys/socket.h> int socket(int domain, int type, int protocol);
 返回的文件描述符可以指向当前的socket&#xff0c;后续通过对文件描述符的访问就可以配置这个socket 成功时返回文件…

云原生应用测试:挑战与方法

&#x1f60f;作者简介&#xff1a;博主是一位测试管理者&#xff0c;同时也是一名对外企业兼职讲师。 &#x1f4e1;主页地址&#xff1a;【Austin_zhai】 &#x1f646;目的与景愿&#xff1a;旨在于能帮助更多的测试行业人员提升软硬技能&#xff0c;分享行业相关最新信息。…

Linux进程 ----- 信号处理

前言 从信号产生到信号保存&#xff0c;中间经历了很多&#xff0c;当操作系统准备对信号进行处理时&#xff0c;还需要判断时机是否 “合适”&#xff0c;在绝大多数情况下&#xff0c;只有在 “合适” 的时机才能处理信号&#xff0c;即调用信号的执行动作。 一、信号的处理…

万界星空科技MES系统,实现数字化智能工厂

万界星空科技帮助制造型企业解决生产过程中遇到的生产过程不透明&#xff0c;防错成本高&#xff0c;追溯困难&#xff0c;品质不可控&#xff0c;人工效率低下&#xff0c;库存积压&#xff0c;交期延误等问题&#xff0c;从而达到“降本增效”的目标。打通各个信息孤岛&#…

Python性能测试框架Locust实战教程

01、认识Locust Locust是一个比较容易上手的分布式用户负载测试工具。它旨在对网站&#xff08;或其他系统&#xff09;进行负载测试&#xff0c;并确定系统可以处理多少个并发用户&#xff0c;Locust 在英文中是 蝗虫 的意思&#xff1a;作者的想法是在测试期间&#xff0c;放…

推荐一个 Obsidian 的 ChatGPT 插件

源码地址&#xff1a;https://github.com/nhaouari/obsidian-textgenerator-plugin Text Generator 是目前我使用过的最好的 Obsidian 中的 ChatGPT 功能插件。它旨在智能生成内容&#xff0c;以便轻松记笔记。它不仅可以在 Obsidian 中直接使用 ChatGPT&#xff0c;还提供了优…

Vue+SpringBoot打造衣物搭配系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 衣物档案模块2.2 衣物搭配模块2.3 衣物收藏模块 三、系统设计3.1 用例设计3.2 E-R图设计3.3 数据库设计3.3.1 衣物档案表3.3.2 衣物搭配表3.3.3 衣物收藏表 四、系统实现4.1 登录页4.2 衣物档案模块4.3 衣物搭配模块4.4…

力扣用例题:2的幂

此题的解题方法在于根据用例调整代码 bool isPowerOfTwo(int n) {if(n1){return true;}if(n<0){return false;}while(n>2){if(n%21){return false;}nn/2; }if(n1){return false;}return true;}

RDMA内核态函数ib_post_send()源码分析

最近调用linux内核下RDMA的Verb API ib_post_send()出现了问题&#xff0c;因此从源码分析一下这个函数的调用过程。 我使用的内核版本为5.15.0-94 这是函数ib_post_send的头文件定义&#xff0c;这个函数的意义是向发送队列提交发送请求&#xff0c;他会调用qp对应设备的post_…

C# EF Core迁移数据库

现象&#xff1a; 在CodeFirst时&#xff0c;先写字段与表&#xff0c;创建数据库后&#xff0c;再添加内容 但字段与表会变更&#xff0c;比如改名删除增加等 需求&#xff1a; 当表字段变更时&#xff0c;同时变更数据库&#xff0c;执行数据库迁移 核心命令 Add-Migrat…

一种基于道路分类特性的超快速车道检测算法

摘要&#xff1a; 本文介绍了一种新颖、简单但有效的车道检测公式。 车道检测是自动驾驶和高级驾驶员辅助系统 (ADAS) 的基本组成部分&#xff0c;在实际高阶驾驶辅助应用中&#xff0c;考虑车道保持、转向、限速等相关的控制问题&#xff0c;这种方式通常是通过受限的车辆计算…

java——多线程基础

目录 线程的概述多线程的创建方式一&#xff1a;继承Thread类方式二&#xff1a;实现Runnable接口方式三&#xff1a;利用Callable接口、FutureTask类来实现。Thread常用的方法 线程安全问题线程安全问题概述线程安全问题案例取钱案例描述模拟代码如下&#xff1a;执行结果 线程…

2024-02-25 Unity 编辑器开发之编辑器拓展7 —— Inspector 窗口拓展

文章目录 1 SerializedObject 和 SerializedProperty2 自定义显示步骤3 数组、List 自定义显示3.1 基础方式3.2 自定义方式 4 自定义属性自定义显示4.1 基础方式4.2 自定义方式 5 字典自定义显示5.1 SerizlizeField5.2 ISerializationCallbackReceiver5.3 代码示例 1 Serialize…

【Activiti7系列】Activi7简介和基于Spring Boot整合Activiti7(流程设计器)

本文将介绍Activiti7基础概念及基于Spring Boot整合Activiti7(流程设计器)的具体步骤。 作者&#xff1a;后端小肥肠 1. 前言 在企业级应用中&#xff0c;业务流程的管理和执行是至关重要的一环。Activiti7是一个强大的开源工作流引擎&#xff0c;它提供了灵活的流程定义、任务…

linux---安使用nginx

目录 一、编译安装Nginx 1、关闭防火墙&#xff0c;将安装nginx所需要软件包传到/opt目录下 ​编辑2、安装依赖包 3、创建运行用户、组 4、编译安装nginx 5、创建软链接后直接nginx启动 ​编辑 6、创建nginx自启动文件 ​编辑6.1 重新加载配置、设置开机自启并开启服务…

Kafka之Producer源码

Producer源码解读 在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢&a…

unity发布webGL压缩方式的gzip,使用nginx作为web服务器时的配置文件

unity发布webGL压缩方式的gzip&#xff0c;使用nginx作为web服务器时的配置文件 Unity版本是&#xff1a;2021.3 nginx的版本是&#xff1a;nginx-1.25.4 Unity发布webgl时的测试 设置压缩方式是gzip nginx配置文件 worker_processes 1;events {worker_connections 102…

SpringBoot实现热插拔AOP

热插拔AOP执行核心逻辑 Advice&#xff1a;“通知”&#xff0c;表示 Aspect 在特定的 Join point 采取的操作。包括 “around”, “before” and “after 等 Advice&#xff0c;大体上分为了三类&#xff1a;BeforeAdvice、MethodInterceptor、AfterAdviceAdvisor&#xff1a…

STM32存储左右互搏 QSPI总线FATS文件读写FLASH W25QXX

STM32存储左右互搏 QSPI总线FATS文件读写FLASH W25QXX FLASH是常用的一种非易失存储单元&#xff0c;W25QXX系列Flash有不同容量的型号&#xff0c;如W25Q64的容量为64Mbit&#xff0c;也就是8MByte。这里介绍STM32CUBEIDE开发平台HAL库Quad SPI总线实现FATS文件操作W25Q各型号…