RabbitMQ解决消息积压的方法

news2025/1/12 22:24:11

目录

减少发送mq的消息体内容

增加消费者数量

批量消费消息

临时队列转移

监控和预警机制

分阶段实施

最后还有一个方法就是开启队列的懒加载


这篇文章总结一下自己知道的解决消息积压得方法。

减少发送mq的消息体内容

像我们没有必要知道一个的中间状态,只需知道一个最终状态就可以了。
发送的消息体只用包含:id和状态等关键信息,不用发送一个完整的对象内容。
消费者收到消息之后,通过id调用原服务再将完整的消息对象内容查询出来即可,最后再进行消费处理。

增加消费者数量

采用动态增加消费者的数量

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        
        // 设置并发消费者数量
        factory.setConcurrentConsumers(5);        // 初始消费者数量
        factory.setMaxConcurrentConsumers(20);    // 最大消费者数量
        
        // 动态调整消费者数量
        factory.setConsumerTagStrategy(queue -> "consumer-" + UUID.randomUUID());
        
        return factory;
    }
}
@Service
public class ConsumerManagerService {
    @Autowired
    private RabbitListenerEndpointRegistry registry;
    
    public void adjustConsumerCount(String queueName, int count) {
        MessageListenerContainer container = 
            registry.getListenerContainer(queueName);
            
        if (container instanceof SimpleMessageListenerContainer) {
            SimpleMessageListenerContainer simpleContainer = 
                (SimpleMessageListenerContainer) container;
            simpleContainer.setConcurrentConsumers(count);
        }
    }
}

批量消费消息

@Service
public class BatchMessageConsumer {
    
    @RabbitListener(queues = "myQueue", containerFactory = "batchFactory")
    public void processBatch(List<Message> messages, Channel channel) {
        try {
            // 批量处理消息
            List<MessageDTO> dtos = messages.stream()
                .map(this::convertToDTO)
                .collect(Collectors.toList());
                
            // 批量保存到数据库
            batchSaveToDatabase(dtos);
            
            // 获取最后一条消息的deliveryTag
            long lastDeliveryTag = messages.get(messages.size() - 1)
                .getMessageProperties()
                .getDeliveryTag();
                
            // 批量确认
            channel.basicAck(lastDeliveryTag, true);
            
        } catch (Exception e) {
            // 批量拒绝
            handleBatchError(messages, channel);
        }
    }
}

// 配置批量消费
@Configuration
public class BatchConsumerConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory batchFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        
        // 启用批量消费
        factory.setBatchListener(true);
        // 批量大小
        factory.setBatchSize(100);
        // 批量超时时间
        factory.setReceiveTimeout(1000L);
        
        return factory;
    }
}

临时队列转移

@Service
public class MessageTransferService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void transferMessages(String sourceQueue, String tempQueue, int batchSize) {
        while (true) {
            // 从源队列批量获取消息
            List<Message> messages = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                Message message = rabbitTemplate.receive(sourceQueue);
                if (message == null) break;
                messages.add(message);
            }
            
            if (messages.isEmpty()) break;
            
            // 转移到临时队列
            messages.forEach(msg -> 
                rabbitTemplate.send(tempQueue, msg));
        }
    }
}

// 临时队列的消费者
@Component
public class TempQueueConsumer {
    
    @RabbitListener(queues = "#{tempQueue.name}")
    public void processMessage(Message message) {
        // 使用更高效的处理方式
        fastProcessMessage(message);
    }
    
    @Bean
    public Queue tempQueue() {
        return new Queue("temp-queue-" + UUID.randomUUID(), false, false, true);
    }
}

监控和预警机制

@Service
@Slf4j
public class QueueMonitorService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Scheduled(fixedRate = 60000) // 每分钟执行一次
    public void monitorQueueSize() {
        String queueName = "myQueue";
        
        // 获取队列信息
        Properties properties = rabbitTemplate.execute(channel -> 
            channel.queueDeclarePassive(queueName));
            
        // 获取消息数量
        int messageCount = properties.getMessageCount();
        
        // 检查消息堆积
        if (messageCount > threshold) {
            // 发送告警
            sendAlert(queueName, messageCount);
            
            // 动态调整消费者
            adjustConsumers(queueName, messageCount);
        }
    }
    
    private void adjustConsumers(String queueName, int messageCount) {
        // 根据消息数量动态调整消费者数量
        int newConsumerCount = calculateConsumerCount(messageCount);
        consumerManagerService.adjustConsumerCount(queueName, newConsumerCount);
    }
}

分阶段实施

@Service
public class MessageHandlingStrategy {
    
    public void handleMessageBacklog() {
        // 1. 首先增加消费者数量
        adjustConsumerCount();
        
        // 2. 如果仍然堆积,启用批量处理
        if (isStillBacklogged()) {
            enableBatchProcessing();
        }
        
        // 3. 如果问题持续,使用临时队列
        if (isEmergency()) {
            transferToTemporaryQueue();
        }
    }
}

最后还有一个方法就是开启队列的懒加载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2275676.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何优雅地绘制时序图

说到时序图&#xff0c;相信所有从事嵌入式开发的伙伴都非常熟悉&#xff0c;在各种元器件手册以及处理器说明书中&#xff0c;但凡涉及到通信、接口、交互等内容&#xff0c;都会涉及到时序图。时序图可以非常详细且明确地描述硬件及软件接口中各个信号的时序关系&#xff0c;…

SpringCloud系列教程:微服务的未来(十一)服务注册、服务发现、OpenFeign快速入门

本篇博客将通过实例演示如何在 Spring Cloud 中使用 Nacos 实现服务注册与发现&#xff0c;并使用 OpenFeign 进行服务间调用。你将学到如何搭建一个完整的微服务通信框架&#xff0c;帮助你快速开发可扩展、高效的分布式系统。 目录 前言 服务注册和发现 服务注册 ​编辑 …

WebGIS在应急灾害中对村庄、风景区、机场的影响范围应用-以日喀则市定日县地震为例

目录 前言 一、关于影响范围 1、震中距离5公里 2、震中20公里范围 3、20到80公里范围 二、空间查询知识 1、相关数据介绍 2、空间数据查询 三、前后端数据查询以及web可视化实现 1、后台API实现 2、WebGIS前端实现 四、Web成果展示 1、空间位置分析 2、包含风景区…

使用网页版Jupyter Notebook和VScode打开.ipynb文件

目录 正文 1、网页版Jupyter Notebook查看 2、VScode查看 因为总是忘记查看文件的网址&#xff0c;收藏了但分类众多每次都找不到……当个记录吧&#xff08;/捂脸哭&#xff09;&#xff01; 正文 此处以gitub中的某个仓库为例&#xff1a; https://github.com/INM-6/mu…

景联文科技提供高质量多模态数据处理服务,驱动AI新时代

在当今快速发展的AI时代&#xff0c;多模态数据标注成为推动人工智能技术进步的关键环节。景联文科技作为行业领先的AI数据服务提供商&#xff0c;专注于为客户提供高质量、高精度的多模态数据标注服务&#xff0c;涵盖图像、语音、文本、视频及3D点云等多种类型的数据。通过专…

Python在Excel工作表中创建数据透视表

在数据处理和分析工作中&#xff0c;Excel作为一个广泛使用的工具&#xff0c;提供了强大的功能来管理和解析数据。当面对大量复杂的数据集时&#xff0c;为了更高效地总结、分析和展示数据&#xff0c;创建数据透视表成为一种不可或缺的方法。通过使用Python这样的编程语言与E…

django基于Python的电影推荐系统

Django 基于 Python 的电影推荐系统 一、系统概述 Django 基于 Python 的电影推荐系统是一款利用 Django 框架开发的智能化应用程序&#xff0c;旨在为电影爱好者提供个性化的电影推荐服务。该系统通过收集和分析用户的观影历史、评分数据、电影的属性信息&#xff08;如类型…

GPT-SoVITS学习01

1.什么是TTS TTS&#xff08;Text-To-Speech&#xff09;这是一种文字转语音的语音合成。类似的还有SVC&#xff08;歌声转换&#xff09;、SVS&#xff08;歌声合成&#xff09;等。 2.配置要求 GPT-SoVITS对电脑配置有较高的要求。 训练&#xff1a;对于Windows电脑&#…

计算机网络 (36)TCP可靠传输的实现

前言 TCP&#xff08;传输控制协议&#xff09;是一种面向连接的、可靠的、基于字节流的传输层通信协议。TCP通过多种机制实现可靠传输&#xff0c;这些机制主要包括连接管理、序列号和确认应答机制、重传机制、流量控制、拥塞控制等。 一、连接管理 TCP使用三次握手&#xff0…

视频编辑最新SOTA!港中文Adobe等发布统一视频生成传播框架——GenProp

文章链接&#xff1a;https://arxiv.org/pdf/2412.19761 项目链接&#xff1a;https://genprop.github.io 亮点直击 定义了一个新的生成视频传播问题&#xff0c;目标是利用 I2V 模型的生成能力&#xff0c;将视频第一帧的各种变化传播到整个视频中。 精心设计了模型 GenProp&…

make工程管理器与Makefile

目录 一、介绍 1、make工程管理器 2、Makefile 二、Makefile语法规则 1、Makefile语法格式 2、Makefile中特殊处理与伪目标 3、变量、规则与函数 (1)自定义变量使用示例 (2)自动变量使用示例 一、介绍 1、make工程管理器 定义&#xff1a; make是一个命令工具&…

【git】-2 分支管理

目录 一、分支的概念 二、查看、创建、切换分支 1、查看分支-git branch 2、创建分支- git branch 分支名 3、切换分支- git checkout 分支名 三、git指针 -实现分支和版本间的切换 四、普通合并分支 git merge 文件名 五、冲突分支合并 ​​​​​​【git】-初始gi…

3DGabor滤波器实现人脸特征提取

import cv2 import numpy as np# 定义 Gabor 滤波器的参数 kSize 31 # 滤波器核的大小 g_sigma 3.0 # 高斯包络的标准差 g_theta np.pi / 4 # Gabor 函数的方向 g_lambda 10.0 # 正弦波的波长 g_gamma 0.5 # 空间纵横比 g_psi np.pi / 2 # 相位偏移# 生成 Gabor 滤…

接口项目架构流程图-thinkphp6-rabbitmq

一、整个系统流程 第一步&#xff1a;平台在创建好后开启消息队列&#xff1b; 第二步&#xff1a;平台为需要服务的客户开好账号并传输对应的公私钥文件&#xff1b; 第三步&#xff1a;客户通过平台分享的接口连接地址采用开户时的手机号查看&#xff1b; 第四步&#xff1a;…

Vue3初学之组件通信

一起进行学习&#xff1a; 在 Vue 3 中&#xff0c;组件通信是一个非常重要的概念&#xff0c;它决定了如何在父子组件之间、兄弟组件之间以及跨层级组件之间传递数据和事件。以下是 Vue 3 中常见的组件通信方式&#xff1a; 父子组件通信 1.1 父组件向子组件传递数据&#x…

2025年第三届“华数杯”国际大学生数学建模竞赛【A题】Problem A: Can He Swim Faster

问题1&#xff1a;运动员的出色比赛表现通常得益于艰苦且持续的专业训练&#xff0c;这不仅提升了游泳技能&#xff0c;也增强了生理储备。比赛中&#xff0c;科学控制游泳速度是关键&#xff0c;包括保持个人节奏、寻求最佳身体状态节奏和合理分配体力。针对自由泳项目&#x…

【计算机网络】lab4 Ipv4(IPV4的研究)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;计算机网络_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前言 2.…

ELFK日志采集实战

一、日志分析概述 日志分析是运维工程师解决系统故障&#xff0c;发现问题的主要手段 日志主要包括系统日志、应用程序日志和安全日志 系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因 经常分析日志可以了解服务器的负荷&#x…

辅助--Inspector

辅助–Inspector 1.Introduction This manual explains how to use the Inspector. 1.1.Overview Inspector is a Qt-based library that provides functionality to interactively inspect low-level content of the OCAF data model, OCCT viewer and Modeling Data. Thi…

如何播放视频文件

文章目录 1. 概念介绍2. 使用方法2.1 实现步骤2.2 具体细节3. 示例代码4. 内容总结我们在上一章回中介绍了"如何获取文件类型"相关的内容,本章回中将介绍如何播放视频.闲话休提,让我们一起Talk Flutter吧。 1. 概念介绍 播放视频是我们常用的功能,不过Flutter官方…