rabbitmq 延时队列

news2025/2/26 15:11:01

要使用 RabbitMQ Delayed Message Plugin 实现延时队列,首先需要确保插件已安装并启用。以下是实现延时队列的步骤和代码示例。

1. 安装 RabbitMQ Delayed Message Plugin

首先,确保你的 RabbitMQ 安装了 rabbitmq-delayed-message-exchange 插件。你可以通过以下命令安装和启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2. 创建交换机和队列

你需要创建一个 延时交换机x-delayed-message)和一个普通队列。我们将在发送消息时指定延迟时间。

3. 发送延迟消息的代码示例

假设你已经在 RabbitMQ 中设置了延时交换机。以下是使用 Java 和 Spring AMQP 发送延迟消息的代码示例。

Maven 依赖

确保你的项目中已经添加了 Spring AMQP 相关依赖:

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.4.6</version>  <!-- 适配你使用的版本 -->
</dependency>

配置延时交换机和队列

你需要配置一个 延时交换机队列,并设置消息的延迟时间。

@Configuration
public class RabbitConfig {

    // 创建一个延时交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        // 设定交换机类型为延时交换机
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, arguments);
    }

    // 创建队列
    @Bean
    public Queue delayedQueue() {
        return new Queue("delayed-queue", true);
    }

    // 将队列绑定到延时交换机
    @Bean
    public Binding binding(Queue delayedQueue, CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed.routing.key").noargs();
    }
}

发送延迟消息

在消息发送时,你需要通过设置消息的属性来指定延迟时间。可以使用 AMQP.BasicProperties 来设置消息的 x-delay 属性,这个值表示延迟的时间(单位:毫秒)。

@Service
public class MessageProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendDelayedMessage(String message, int delayMilliseconds) {
        // 创建消息属性,并设置延迟时间
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(delayMilliseconds);  // 设置延迟时间(毫秒)
        Message messageObj = new Message(message.getBytes(), messageProperties);

        // 发送消息到延时交换机
        amqpTemplate.send("delayed-exchange", "delayed.routing.key", messageObj);
        System.out.println("Sent delayed message: " + message + " with delay: " + delayMilliseconds + " ms");
    }
}

在上面的代码中,setDelay(delayMilliseconds) 方法设置了延迟时间。这个时间会告诉 RabbitMQ 延迟多久后将消息投递到队列中。

监听消息

最后,你需要设置消费者来监听这个延时队列,并处理接收到的消息:

@Service
public class MessageConsumer {

    @RabbitListener(queues = "delayed-queue")
    public void consume(String message) {
        System.out.println("Received delayed message: " + message);
    }
}

4. 测试发送延迟消息

现在,你可以在业务逻辑中调用 sendDelayedMessage 方法发送延时消息。例如,发送一条延迟 10 秒的消息:

@Autowired
private MessageProducer messageProducer;

public void testDelay() {
    // 发送一条延迟10秒的消息
    messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}

5. 启动和测试

  1. 启动你的 Spring Boot 应用。
  2. 调用 testDelay 方法发送延迟消息。
  3. 你将看到消息在队列中延迟指定的时间(例如,10秒)后被消费。

关键点:

  • 通过 x-delayed-message 交换机,设置 x-delayed-typedirecttopic,根据需求选择交换机类型。
  • 使用 setDelay 方法设置延迟时间,单位是毫秒。
  • RabbitMQ 会在指定的时间到达后,将消息投递到目标队列。

总结

通过 RabbitMQ Delayed Message Plugin,你可以非常方便地实现延时队列。只需要创建一个支持延迟的交换机,并通过设置 x-delay 属性来指定消息的延迟时间。

配置并行消费

要启动多个消费者并并行处理 RabbitMQ 中的消息,通常可以通过 Spring AMQPRabbitListener 实现。这将帮助你加快消费速度,提升系统的吞吐量。下面是如何启动多个消费者进行并行消费的代码修改步骤:

1. 配置多个消费者

Spring AMQP 支持使用 @RabbitListener 注解启动多个消费者实例。通过配置 并行消费者,Spring 会为每个消费者实例分配一个独立的线程来处理消息。

2. 增加消费者并发处理能力

为了实现并发消费,我们可以通过以下几种方式:

  • 使用 @RabbitListener 启动多个消费者实例:每个 @RabbitListener 注解的消费者都会独立地消费队列中的消息。
  • 配置 SimpleMessageListenerContainer 的并发设置:通过配置 SimpleMessageListenerContainer,你可以设置多个消费者同时监听队列,从而提高并发消费能力。

3. 代码修改示例

1) 创建并发消费者

首先,创建一个通用的消息监听器,并将 @RabbitListener 注解应用于多个消费者实例上。你可以通过 @RabbitListener 注解中的 concurrency 属性来设置消费者的并发数量。

@Service
public class ConcurrentMessageConsumer {

    // 使用 @RabbitListener 注解配置多个并发消费者,默认启动2个消费者
    @RabbitListener(queues = "delayed-queue", concurrency = "3-5")  // 设置并发消费者数目 3-5 个消费者
    public void consume(String message) {
        System.out.println("Thread: " + Thread.currentThread().getName() + " - Received message: " + message);
    }
}

在上面的代码中,concurrency = "3-5" 表示 Spring 会启动 3 到 5 个消费者实例来并行处理队列中的消息。消费者数目是动态的,具体数量由 Spring 的消息监听容器控制。

  • "3-5" 表示最低启动 3 个消费者,最多启动 5 个消费者来并行处理消息。
  • 如果消息量很大,Spring 会动态调整消费者的数量,以适应系统的负载。

2) 配置并发消费者的线程池(可选)

为了更好地控制消费者的线程池和消息消费的并发度,你可以通过配置 SimpleMessageListenerContainer 来定义更具体的并发设置。例如,你可以在 Spring 配置类中手动定义消费者容器。

@Configuration
public class RabbitConfig {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,
                                                                 MessageListener messageListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("delayed-queue");
        container.setMessageListener(messageListener);

        // 设置并发消费的最小值和最大值
        container.setConcurrentConsumers(3);  // 最小3个消费者
        container.setMaxConcurrentConsumers(10);  // 最大10个消费者

        return container;
    }
}
  • setConcurrentConsumers(3):设置最小消费者数量。
  • setMaxConcurrentConsumers(10):设置最大消费者数量,Spring 会根据消息的积压情况动态调整消费者的数量。

3) 控制消费者的负载和流量

如果你希望更精细地控制消息消费的负载,可以使用 @RabbitListener 注解中的 acknowledgeMode 设置来调整消息确认模式,确保消息被正确地处理和确认。例如,使用 MANUAL 手动确认消费:

@RabbitListener(queues = "delayed-queue", ackMode = "MANUAL")
public void consumeWithAck(Message message, Channel channel) throws IOException {
    try {
        // 消费消息
        System.out.println("Consumed message: " + new String(message.getBody()));
        // 手动确认消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理异常,手动拒绝消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

通过手动确认,你可以更好地控制消息的确认和失败重试机制,防止在消费者挂掉的情况下丢失消息。

4. 测试并发消费

你可以通过调用 testDelay 方法或者其他方式,发送延时消息来验证并发消费是否生效。发送的消息会被多个消费者并行处理,输出的日志中会显示哪个线程消费了哪个消息,从而验证消费者的并发能力。

@Autowired
private MessageProducer messageProducer;

public void testDelay() {
    // 发送一条延迟10秒的消息
    messageProducer.sendDelayedMessage("Hello, delayed world!", 10000);
}

5. 总结

通过配置多个并发消费者来加速消息消费,有以下几个要点:

  • 使用 @RabbitListener(concurrency = "3-5") 注解来启动多个并发消费者。
  • 配置 SimpleMessageListenerContainer 来更灵活地管理消费者线程池。
  • 使用手动确认模式(ackMode = "MANUAL")可以更精细地控制消息确认和失败重试。

通过这些配置,你可以根据消息量的大小和系统负载动态调整消费者数量,以达到加快消费速度的目的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2306427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ReentrantLock 用法与源码剖析笔记

&#x1f4d2; ReentrantLock 用法与源码剖析笔记 &#x1f680; 一、ReentrantLock 核心特性 &#x1f504; 可重入性&#xff1a;同一线程可重复获取锁&#xff08;最大递归次数为 Integer.MAX_VALUE&#xff09;&#x1f527; 公平性&#xff1a;支持公平锁&#xff08;按等…

java进阶专栏的学习指南

学习指南 java类和对象java内部类和常用类javaIO流 java类和对象 类和对象 java内部类和常用类 java内部类精讲Object类包装类的认识String类、BigDecimal类初探Date类、Calendar类、SimpleDateFormat类的认识java Random类、File类、System类初识 javaIO流 java IO流【…

架构思维:架构的演进之路

文章目录 引言为什么架构思维如此重要架构师的特点软件架构的知识体系如何提升架构思维大型互联网系统架构的演进之路一、大型互联网系统的特点二、系统处理能力提升的两种途径三、大型互联网系统架构演化过程四、总结 引言 在软件开发行业中&#xff0c;有很多技术人可能会问…

vue3:vue3项目安装并引入Element-plus

一、安装Element-plus 1、安装语句位置 安装 | Element Plushttps://element-plus.org/zh-CN/guide/installation.html根据所需进行安装&#xff0c;这里使用npm包 2、找到项目位置 找到项目位置&#xff0c;在路径上输入cmd回车打开“运行”窗口 输入安装语句回车完成安装 …

java.2.25

1. 注释 ​ 注释是对代码的解释和说明文字。 Java中的注释分为三种&#xff1a; 单行注释&#xff1a; // 这是单行注释文字多行注释&#xff1a; /* 这是多行注释文字 这是多行注释文字 这是多行注释文字 */ 注意&#xff1a;多行注释不能嵌套使用。文档注释&#xff1a;…

VScode 开发

目录 安装 VS Code 创建一个 Python 代码文件 安装 VS Code VSCode&#xff08;全称&#xff1a;Visual Studio Code&#xff09;是一款由微软开发且跨平台的免费源代码编辑器&#xff0c;VSCode 开发环境非常简单易用。 VSCode 安装也很简单&#xff0c;打开官网 Visual S…

A Large Recurrent Action Model: xLSTM Enables Fast Inference for Robotics Tasks

奥地利林茨约翰开普勒大学机器学习研究所 ELLIS 小组&#xff0c;LIT 人工智能实验室奥地利林茨 NXAI 有限公司谷歌 DeepMind米拉 - 魁北克人工智能研究所 摘要 近年来&#xff0c;强化学习&#xff08;Reinforcement Learning, RL&#xff09;领域出现了一种趋势&#xff0c;…

计算机毕业设计SpringBoot+Vue.js学科竞赛管理系统(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

Deep Seek-编码器

1. DeepSeek Coder 简介 DeepSeek Coder 由一系列代码语言模型组成,每个模型都在 2T 令牌上从头开始训练,其中 87% 的代码和 13% 的自然语言在中英文中组成。我们提供各种大小的代码模型,从 1B 到 33B 版本。每个模型都通过采用 16K 的窗口大小和额外的填空任务在项目级代码…

Android平台轻量级RTSP服务模块技术对接说明

一、技术背景 随着内网无纸化办公、电子教室等应用场景对超低延迟音视频传输需求的日益增长&#xff0c;为避免用户或开发者单独部署 RTSP 或 RTMP 服务&#xff0c;大牛直播 SDK 推出了轻量级 RTSP 服务 SDK。该 SDK 能够将本地音视频数据&#xff08;如摄像头、麦克风等&…

RoCEv2 高性能传输协议与 Lossless 无损网络

目录 文章目录 目录RoCERoCEv2 v.s. IBRoCEv2 协议栈RoCEv2 需要 Lossless NetworkLossless Network 拥塞控制技术网络拥塞的原因PFC 基于优先级的流量控制PFC Unfairness &#xff08;带宽分配不公平&#xff09;的问题PFC HOL&#xff08;队头拥塞&#xff09;的问题PFC Dead…

联想 SR590 服务器 530-8i RAID 控制器更换损坏的硬盘

坏了的硬盘会自动亮黄灯。用一个空的新盘来替换&#xff0c;新盘最好不要有东西。但是有东西可能也没啥&#xff0c;因为我看 RAID 控制器里有格式化的选项 1. 从 IPMI 把服务器关机&#xff0c;电源键进入绿色闪烁状态 2. 断电&#xff0c;推开塑料滑块拉出支架&#xff0c;…

城电科技|会追日的智能花,光伏太阳花开启绿色能源新篇章

当艺术与科技相遇&#xff0c;会碰撞出怎样的火花&#xff1f;城电科技推出的光伏太阳花&#xff0c;以其独特的设计与智能化的功能&#xff0c;给出了答案。这款产品不仅具备太阳能发电的实用功能&#xff0c;更是一件充满科技属性的艺术性光伏产品&#xff0c;吸引了广泛关注…

基于YOLO11深度学习的苹果叶片病害检测识别系统【python源码+Pyqt5界面+数据集+训练代码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

多智能体框架

多个不同的角色的Agent&#xff0c;共同完成一份复杂的工作。由一个统筹管理的智能体&#xff0c;自主规划多个智能体分别做什么&#xff0c;以及执行的顺序。 agent 应该包含的属性 执行特定任务 根据其角色和目标做出决策 能够使用工具来实现目标 与其他代理沟通和协作 保留…

C#中级教程(1)——解锁 C# 编程的调试与错误处理秘籍

一、认识错误&#xff1a;编程路上的 “绊脚石” 在 C# 编程中&#xff0c;错误大致可分为两类&#xff1a;语法错误和语义错误&#xff08;逻辑错误&#xff09;。语法错误就像是写作文时的错别字和病句&#xff0c;编译器一眼就能识别出来&#xff0c;比如变量名拼写错误、符…

Jmeter接口并发测试

Apache JMeter 是一款开源的性能测试工具&#xff0c;广泛用于接口并发测试、负载测试和压力测试。以下是使用 JMeter 进行接口并发测试的详细步骤&#xff1a; 一、准备工作 安装 JMeter 下载地址&#xff1a;Apache JMeter 官网 确保已安装 Java 环境&#xff08;JMeter 依…

MySQL-增删改查

一、Create(创建) &#x1f4d6; 语法&#xff1a; INSERT INTO table_name(value_list); 当我们使用表的时候&#xff0c;就可以使用这个语法来向表中插入元素~ 我们这边创建一个用于示范的表(Student)~ create table student( id int, name varchar(20), chinese int, math…

开源堡垒机 JumpServer 社区版实战教程:发布机的配置与Website资产配置使用

文章目录 开源堡垒机 JumpServer 社区版实战教程&#xff1a;发布机的配置与Website资产配置使用一、功能简述二、应用发布机2.1 版本要求2.2 创建应用发布机2.2.1 通过WinRM的协议进行应用发布机的创建2.2.2 通过OpenSSH的协议进行应用发布机的创建2.2.2.1 下载OpenSSH2.2.2.2…