RabbitMQ系列(18)--RabbitMQ基于插件实现延迟队列

news2024/11/20 11:24:52

1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件

RabbitMQ官网下载插件的网址:https://www.rabbitmq.com/community-plugins.html

2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明) 

 3、把这个插件传输到服务器上

4、根据官网的指示把插件放到RabbitMQ指定的文件夹下

RabbitMQ官网指示安装插件步骤的网址:https://www.rabbitmq.com/installing-plugins.html

我这里安装RabbitMQ的系统是CentOS,所以放在

5、拷贝插件到指定的目录下

例:

cp rabbitmq_delayed_message_exchange-3.10.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.0/plugins/

效果图:

 6、安装延迟队列插件

输入以下命令安装延迟队列插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

效果图:

7、重启RabbitMQ

输入以下命令重启RabbitMQ

systemctl restart rabbitmq-server.service

效果图:

8、查看插件是否安装成功

 进入RabbitMQ的管理页面,进入Exchange的管理页面,新增Exchange,在Type里面可以看到x-delayed-message的选项,证明延迟队列插件安装成功

9、基于插件实现延迟队列的原理示意图

原先我们没下插件之前实现延迟队列是基于图下这种方式实现的

但我们下载插件后就能通过交换机延迟消息的方式来实现消息的延迟了(由步骤8可见,我们验证插件是否安装成功是从Exchange进去的,而不是从Queues进去的)

10、基于插件延迟队列的代码实现

(1)在config包里新建一个名为DelayedQueueConfig的类用于编写配置队列延迟的代码

代码如下:

package com.ken.springbootrqbbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedQueueConfig {

    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed_queue";

    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "DELAYED_EXCHANGE";

    //交换机
    public static final String DELAYED_ROUTING_KEY = "delayed";

    //声明延迟队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //声明延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>(3);
        //设置延迟类型
        arguments.put("x-delayed-type","direct");
        /**
         * 声明自定义交换机
         * 第一个参数:交换机的名称
         * 第二个参数:交换机的类型
         * 第三个参数:是否需要持久化
         * 第四个参数:是否自动删除
         * 第五个参数:其他参数
         */
        return new CustomExchange(DELAYED_QUEUE_NAME,"x-delayed-message",true,false,arguments);
    }

    //绑定队列和延迟交换机
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") Exchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}

 (2)在SendMsgController类里写一个接口,让其能往延迟队列里发送消息

代码如下:

package com.ken.springbootrqbbitmq.controller;

import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * 发送延迟消息
 */
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {

    @Autowired(required = false)
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date(),message);
        rabbitTemplate.convertAndSend("normal_exchange","normal01","消息来着ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("normal_exchange","normal02","消息来着ttl为40s的队列:" + message);
    }

    @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的TTL消息给normal03队列:{}", new Date(),ttlTime,message);
        rabbitTemplate.convertAndSend("normal_exchange","normal03",message,msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

    /**
     * 给延迟队列发送消息
     * @param message
     * @param delayTime
     */
    @GetMapping("/sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
        log.info("当前时间:{},发送一条时长{}毫秒的消息给延迟队列:{}", new Date(),delayTime,message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_QUEUE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
            //发送消息的时候延迟时长
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }

}

(3)在consumer包里新建一个名为DelayQueueConsumer的类用于编写消费延迟队列的消费者代码

效果图:

代码如下:

package com.ken.springbootrqbbitmq.consumer;

import com.ken.springbootrqbbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 延迟队列消费者
 */
@Slf4j
@Component
public class DelayQueueConsumer {

    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    private void receiveDelayQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间{},收到延迟队列的消息",new Date(),msg);
    }

}

(4)启动项目,往浏览器输入接口地址和参数,从而调用接口

[1]第一条消息

http://localhost:8080/ttl/sendDelayMsg/我是第一条消息/20000

[2]第二条消息

http://localhost:8080/ttl/sendDelayMsg/我是第二条消息/2000

效果图:

结论:基于测试发现在使用延迟插件的情况下,延迟时间短的消息会被先消费,这证明基于插件的延迟消息达到预期效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/726139.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式锁:RedLock

https://mp.weixin.qq.com/s/8XHvt8vw2pai-QIujM2oxQ 为什么利用 setnx 实现分布式锁只能使用于单Redis实例&#xff0c;不支持Redis集群&#xff1f; 参考&#xff1a; https://blog.csdn.net/weixin_45525272/article/details/126562119

17 回归法做图像变化检测——建立图的回归等式(matlab程序)

1.简述 回归是确定两种或两种以上的变量间相互依赖的定量关系的方法。映射到本文就是用我们图像数据去预测该图像上衣服的价格。说直白点&#xff0c;就是通过X与Y确立函数关系式&#xff0c;只不过X换成了图片罢了。 2.代码 %% 建立图的回归等式 image1imread(11.TIF); image…

单词拆分00

题目链接 单词拆分 题目描述 注意点 s 和 wordDict[i] 仅由小写英文字母组成wordDict 中的所有字符串 互不相同不要求字典中出现的单词全部都使用字典中的单词可以重复使用 解答思路 最初想到用深度右边遍历来做&#xff0c;实现了功能但是由于做了很多重复判断时间复杂度…

OpenCV读取两张图像将下半部分(从中间行开始)的所有像素值设置为0黑色

#include <iostream> #include <opencv2/imgcodecs.hpp> #include <opencv2/opencv.hpp> #include

AI智能语音机器人的功能和作用都有哪些?

智能语音机器人是一种能够使用自然语言处理技术和人工智能算法&#xff0c;通过声音与用户进行交互的机器人。它可以回答用户提出的问题、处理用户的投诉、提供产品或服务的相关信息等等。 实现一个智能语音机器人需要涉及多个技术领域&#xff0c;包括自然语言处理、语音识别…

C#,ALGLIB(01)——支持C#开发、跨平台的、优秀的、开源的数值分析和数据处理库ALGLIB简介

1 关于ALGLIB ALGLIB是一个跨平台的数值分析和数据处理库。 它支持五种编程语言&#xff08;C&#xff0c;C#&#xff0c;Java&#xff0c;Python&#xff0c;Delphi&#xff09;和几种操作系统&#xff08;Windows和POSIX&#xff0c;包括Linux&#xff09;。 ALGLIB的功能包…

十一、Docker网络(Docker network)

学习参考&#xff1a;尚硅谷Docker实战教程、Docker官网、其他优秀博客(参考过的在文章最后列出) 目录 一、什么是docker network1.1 介绍1.2 docker 启动后的网络情况1.3 能干什么&#xff1f; 二、Docker网络相关命令2.1 查看网络2.2 查看网络源数据2.3 创建网络2.4 删除网络…

【RuoYi-Cloud-Plus】学习笔记 08 - Sentinel(三)流量控制知识整理

文章目录 前言参考目录版本说明学习笔记1、概述2、基于调用关系的流量控制&#xff08;流控模式&#xff09;2.1、流量规则 FlowRule2.2、选择节点3、基于QPS/并发数的流量控制&#xff08;流控效果&#xff09;3.1、默认方式&#xff08;直接拒绝&#xff09;3.2、冷启动 Warm…

数通王国历险记之数据从发出到接收的细节介绍{封装与解封装}

系列文章目录 数通王国历险记&#xff08;5&#xff09; 目录 前言 一&#xff0c;数据封装的全过程 1.1&#xff0c;应用层的封装形式 1.2&#xff0c;传输层的封装形式 理解&#xff1a; 1.3&#xff0c;网络层的封装形式 理解&#xff1a; 1.4&#xff0c;数据链路层…

Sublime Text,灵感犹如星辰,点亮创作之路

目录 引言Sublime Text的优点Sublime Text的缺点总结 Sublime Text 官方网站 引言 在这个快速发展的数字时代&#xff0c;创作者们面临着越来越多的选择&#xff0c;以提高他们的生产力和工作效率。而在众多的编辑软件中&#xff0c; Sublime Text 独树一帜&#xff0c;被誉为创…

mac怎么把m4a转换成mp3?

mac怎么把m4a转换成mp3&#xff1f;大家都知道m4a是苹果公司专属的音频文件格式&#xff0c;因此它是可以直接在mac电脑上打开播放的&#xff0c;但这并不代表m4a音频文件可以在其他播放器或者播放设备上直接打开和使用&#xff0c;相信这个问题大家都遇到过&#xff0c;造成这…

【Vivado那些事儿】动态时钟的使用

时钟是每个 FPGA 设计的核心。如果我们正确地设计时钟架构、没有 CDC 问题并正确进行约束设计&#xff0c;就可以减少与工具斗争的时间。 但对于某些应用&#xff0c;我们希望能够更改某些IP中的时钟频率。其中一个例子是在图像处理管道中&#xff0c;输出分辨率可以动态变化&a…

单片机电机控制编程操作系统环境编程与裸机编程的比较

随着单片机技术的不断发展&#xff0c;单片机在电机控制领域中的应用越来越广泛。在单片机编程中&#xff0c;有两种主要的方法&#xff1a;操作系统编程和裸机编程。本文将比较这两种方法在电机控制中的优缺点。 操作系统编程 操作系统编程需要使用操作系统&#xff0c;例如F…

Python GUI编程利器:Tkinker中的滚动条和框架(8)

小朋友们好&#xff0c;大朋友们好&#xff01; 我是猫妹&#xff0c;一名爱上Python编程的小学生。 和猫妹学Python&#xff0c;一起趣味学编程。 今日目标 实现下面效果&#xff1a; 滚动条(Scrollbar类) 滚动条用于调整一些控件的可见范围&#xff0c;根据方向分为水平滚…

[LeetCode]2178.拆分成最多数目的偶整数之和

2178.拆分成最多数目的偶整数之和 题目 思路 首先&#xff0c;奇数是不可拆分成多数目的偶整数&#xff0c;这种情况返回一个空数组。 累加2以组合一个最多不同数目偶整数&#xff0c;当拆分的最后一个偶整数&#xff0c;总和大于原数时&#xff0c;将差值累加到最后一位偶整…

【算法设计与分析】工作分配问题——设计一个算法,对于给定的工作费用,计算最佳工作分配方案,使总费用达到最小。

目录 一、问题描述二、问题分析三、运行结果四、源代码 一、问题描述 设有n件工作分配给n个人。将工作i分配给第j个人所需要的费用是。试设计一个算法&#xff0c;为每个人都分配1件不同的工作&#xff0c;并使总费用达到最小。设计一个算法&#xff0c;对于给定的工作费用&…

LeetCode_BFS_中等_1466.重新规划路线

目录 1.题目2.思路3.代码实现&#xff08;Java&#xff09; 1.题目 n 座城市&#xff0c;从 0 到 n-1 编号&#xff0c;其间共有 n-1 条路线。因此&#xff0c;要想在两座不同城市之间旅行只有唯一一条路线可供选择&#xff08;路线网形成一颗树&#xff09;。去年&#xff0c…

Java Maven安装及环境配置教程

一、安装 1、安装包 apache-maven-3.6.3 安装包下载地址 2、下载安装包然后直接解压就行。 注意&#xff1a;文件的位置路径不能有中文。 二、环境配置 1、用户变量 双击Path&#xff0c;点击新建&#xff0c;将如下复制进去&#xff0c;然后点击确定&#xff1a; %MAVEN_HO…

轻量服务器域名无法解析怎么排查?

​  轻量服务器域名无法解析是指在DNS(域名系统)解析过程中&#xff0c;无法将域名转换为相应的IP地址。DNS可帮助该域名与代表该网站在互联网上的位置的数字 IP 地址相关联&#xff0c;帮助我们找到并连接到目标网站。因此&#xff0c;当我们无法解析域名时&#xff0c;就无…