使用RabbitMQ实现流量削峰填谷

news2025/3/14 20:11:50

原理

流量削峰填谷是指在面对突发的高流量时,通过消息队列将瞬时大量请求暂时存储起来,并逐步处理这些请求,从而避免系统过载。RabbitMQ 作为消息中间件可以很好地支持这一需求,特别是结合其延时消息插件(rabbitmq_delayed_message_exchange),可以在处理消息时加入延时逻辑,进一步优化系统的负载。

  1. 生产者发送消息:当有大量请求涌入时,生产者将这些请求转化为消息并发送到 RabbitMQ 队列中。
  2. 消费者异步处理:消费者从队列中异步获取消息并进行处理。由于消息是逐步被消费的,因此即使短时间内有大量的请求进入系统,也不会导致系统崩溃。
  3. 延时消息处理:对于某些需要延时处理的消息,可以通过 RabbitMQ 的延时消息插件来设置消息的延时时间,使得这些消息在指定的时间后才被消费者处理。

详细步骤

一、环境准备

1. 安装 RabbitMQ 并启用延时消息插件

首先确保你已经安装了 RabbitMQ,并启用了管理插件以便通过 Web 界面进行管理。如果还没有安装 RabbitMQ,可以通过 Docker 快速启动一个 RabbitMQ 实例:

# 启动 RabbitMQ 容器
docker run -d --name rabbitmq 
-p 5672:5672 -p 15672:15672 
rabbitmq:3-management //指向特定的 3.x 版本并包含管理插件

访问 http://localhost:15672 进入 RabbitMQ 的管理界面,默认用户名和密码都是 guest

接下来,安装并启用 RabbitMQ 延时消息插件:

# 下载插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez -P /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 重启 RabbitMQ 服务
systemctl restart rabbitmq-server

二、RabbitMQ 配置与队列设置

1. 创建队列和交换机

我们使用延时插件的延时交换机来处理延时消息。以下是使用命令行工具 rabbitmqadmin 创建队列和交换机的示例:

# 使用 RabbitMQ 管理插件或命令行工具创建队列
rabbitmqadmin declare queue name=order_queue durable=true arguments='{"x-max-length": 10000, "x-overflow": "drop-head"}'

# 创建延时交换机
rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message durable=true arguments='{"x-delayed-type": "direct"}'

# 绑定队列到交换机
rabbitmqadmin declare binding source=delayed_exchange destination=order_queue routing_key=order_routing_key

三、Spring Boot 应用程序配置

1. pom.xml 添加依赖

在 Spring Boot 项目中添加 RabbitMQ 和相关依赖:

<dependencies>
    <!-- Spring Boot Starter for AMQP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Jackson for JSON processing -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>

    <!-- Lombok for reducing boilerplate code -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
2. application.yml 配置文件

src/main/resources/application.yml 中配置 RabbitMQ 连接信息:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认模式
        concurrency: 10           # 消费者并发数
        max-concurrency: 20       # 最大消费者并发数
3. RabbitMQConfig.java 配置类

定义 RabbitMQ 的配置类,用于声明队列、交换机和绑定关系:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable("order_queue")
                .withArgument("x-max-length", 10000)  // 设置最大长度为10000
                .withArgument("x-overflow", "drop-head")  // 当队列满时丢弃最早的未消费消息
                .build();
    }

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding binding(Queue orderQueue, CustomExchange delayExchange) {
        return BindingBuilder.bind(orderQueue).to(delayExchange).with("order_routing_key").noargs();
    }
}

四、生产者发送消息

1. OrderProducer.java

编写生产者代码,将订单信息作为消息发送到 order_queue 中:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderProducer {

    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public OrderProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendOrder(String orderData) {
        System.out.println(" [x] Sent order: " + orderData);
        rabbitTemplate.convertAndSend("delayed_exchange", "order_routing_key", orderData);
    }

    public void sendDelayedOrder(String orderData, long delayTime) {
        System.out.println(" [x] Sent delayed order: " + orderData + " with delay: " + delayTime + " ms");

        // 设置消息后处理器,添加延迟时间
        MessagePostProcessor messagePostProcessor = message -> {
            message.getMessageProperties().setHeader("x-delay", delayTime);
            return message;
        };

        rabbitTemplate.convertAndSend("delayed_exchange", "order_routing_key", orderData, messagePostProcessor);
    }
}
2. OrderController.java

编写控制器以接收 HTTP 请求并调用生产者发送消息:

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {

    private final OrderProducer orderProducer;

    @Autowired
    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

    @PostMapping("/submitOrder")
    public ResponseEntity<String> submitOrder(@RequestBody String orderData) {
        orderProducer.sendOrder(orderData);
        return ResponseEntity.ok("Order submitted successfully!");
    }

    @PostMapping("/submitDelayedOrder")
    public ResponseEntity<String> submitDelayedOrder(@RequestBody String orderData, @RequestParam long delayTime) {
        orderProducer.sendDelayedOrder(orderData, delayTime);
        return ResponseEntity.ok("Delayed order submitted successfully!");
    }
}

五、消费者处理消息

1. OrderConsumer.java

编写消费者代码,从队列中获取消息并处理订单:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class OrderConsumer {

    private static final Logger logger = LoggerFactory.getLogger(OrderConsumer.class);

    @RabbitListener(queues = "order_queue")
    public void receiveOrder(String orderData) {
        logger.info("Received order: {}", orderData);
        processOrder(orderData);
    }

    private void processOrder(String orderData) {
        logger.info("Processing order: {}", orderData);
        try {
            // 模拟订单处理逻辑
            Thread.sleep(2000);  // 模拟处理时间
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

六、测试与验证

1. 启动 RabbitMQ 服务
docker start rabbitmq
2. 编译并启动 Spring Boot 应用程序
mvn spring-boot:run
3. 使用 Postman 或 curl 测试订单提交接口
提交普通订
提交延时订单
curl -X POST 
http://localhost:8080/submitDelayedOrder?delayTime=5000 
-H "Content-Type: application/json" 
-d '{"user_id": 12345, "product_id": 67890, "quantity": 2, "price": 199.99}'


1. curl
curl 是一个用于在不同协议之间传输数据的命令行工具。它支持多种协议,包括HTTP、HTTPS、FTP等。在这个例子中,它被用来发送HTTP请求。
2. -X POST
-X 参数允许您指定HTTP请求的方法。这里使用的是POST方法,通常用于向服务器提交数据或更新资源。
3. http://localhost:8080/submitDelayedOrder?delayTime=5000
这是目标URL,表示请求将被发送到运行在本地机器(localhost)上的服务,监听端口为8080。路径/submitDelayedOrder指定了API的具体端点,而查询参数delayTime=5000可能指示该订单将会延迟5秒(5000毫秒)处理。
4. -H "Content-Type: application/json"
-H 标志用于添加HTTP头信息。这里的头信息指定了内容类型为application/json,意味着请求体中的数据将以JSON格式进行编码。
5. -d '{"user_id": 12345, "product_id": 67890, "quantity": 2, "price": 199.99}'
-d 参数用于指定HTTP请求的数据体。在这个例子中,数据是以JSON格式提供的,包含用户ID、产品ID、购买数量和单价的信息。

查看控制台输出,确认消息被发送到队列并由消费者处理。

七、优化与扩展

1. 动态调整消费者数量

为了动态调整消费者的数量,可以使用 Kubernetes 的 Horizontal Pod Autoscaler (HPA) 来根据队列长度自动扩展消费者实例。例如:

Kubernetes Deployment YAML 文件

apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-consumer
spec:
  replicas: 3  # 初始副本数
  selector:
    matchLabels:
      app: order-consumer
  template:
    metadata:
      labels:
        app: order-consumer
    spec:
      containers:
      - name: order-consumer
        image: your-order-consumer-image
        env:
        - name: SPRING_RABBITMQ_HOST
          value: "rabbitmq-service"
        resources:
          requests:
            memory: "64Mi"
            cpu: "250m"
          limits:
            memory: "128Mi"
            cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-service
spec:
  selector:
    app: rabbitmq
  ports:
    - protocol: TCP
      port: 5672
      targetPort: 5672

自动扩展策略

使用 Horizontal Pod Autoscaler (HPA) 来根据队列长度自动扩展消费者实例:

kubectl autoscale deployment order-consumer --min=3 --max=10 --cpu-percent=80
2. 监控与报警

为了确保系统的稳定性,还需要对 RabbitMQ 进行监控和报警。可以使用 Prometheus 和 Grafana 来监控 RabbitMQ 的状态,并设置报警规则。

Prometheus Adapter 配置

为了监控 RabbitMQ 队列长度,需要安装 Prometheus Adapter:

# custom-metrics-config-map.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: adapter-config
data:
  config.yaml: |
    rules:
      - seriesQuery: '{__name__="rabbitmq_queue_messages", queue="order_queue"}'
        seriesFilters: []
        resources:
          overrides:
            namespace:
              resource: namespace
            pod:
              resource: pod
        name:
          matches: ""
          as: "rabbitmq_queue_length"
        metricsQuery: sum(rabbitmq_queue_messages{queue="order_queue"})

应用配置并更新 HPA 规则:

kubectl apply -f custom-metrics-config-map.yaml
kubectl apply -f hpa-custom-metrics.yaml

其中,hpa-custom-metrics.yaml 文件定义了如何基于自定义指标进行扩展:

apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: order-consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-consumer
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Pods
    pods:
      metric:
        name: rabbitmq_queue_length
      target:
        type: AverageValue
        averageValue: 5000
3. 日志记录与分析

为了更好地排查问题和优化系统性能,建议启用日志记录和分析功能。可以使用 ELK Stack(Elasticsearch, Logstash, Kibana)来进行日志管理和分析。

OrderConsumer.java 中添加日志记录

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service
public class OrderConsumer {

    private static final Logger logger = LoggerFactory.getLogger(OrderConsumer.class);

    @RabbitListener(queues = "order_queue")
    public void receiveOrder(String orderData) {
        logger.info("Received order: {}", orderData);
        processOrder(orderData);
    }

    private void processOrder(String orderData) {
        logger.info("Processing order: {}", orderData);
        try {
            // 模拟订单处理逻辑
            Thread.sleep(2000);  // 模拟处理时间
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

使用 ELK Stack 进行日志分析

ELK Stack(Elasticsearch, Logstash, Kibana)是一个强大的日志管理和分析工具集。你可以将日志集中存储在 Elasticsearch 中,并通过 Kibana 进行可视化分析。

八、高级特性与最佳实践

1. 消息确认机制

为了确保消息可靠传递,可以使用手动确认机制。修改 OrderConsumer.java 中的消息监听器:

@RabbitListener(queues = "order_queue", ackMode = "MANUAL")
public void receiveOrder(Channel channel, Message message, String orderData) throws IOException {
    logger.info("Received order: {}", orderData);
    try {
        processOrder(orderData);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}
2. 消息重试机制

当消费者处理失败时,可以设置重试机制。可以在 application.yml 中配置重试策略:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          max-interval: 10000
          multiplier: 2
3. 消息持久化

为了防止 RabbitMQ 重启导致消息丢失,可以将消息设置为持久化:

MessagePostProcessor messagePostProcessor = message -> {
    message.getMessageProperties().setHeader("x-delay", delayTime);
    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    return message;
};

总结

通过上述步骤,我们实现了基于 RabbitMQ 的流量削峰填谷系统,并利用 RabbitMQ 的延时消息插件处理延迟消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2315056.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【TES817】基于XCZU19EG FPGA的高性能实时信号处理平台

板卡概述 TES817是一款基于ZU19EG FPGA的高性能实时信号处理平台&#xff0c;该平台采用1片高性能的FPGA&#xff1a;XCZU19EG-2FFVC1760I作为主处理器&#xff0c;FPGA的PL端外挂1组72位DDR4 SDRAM&#xff0c;用来实现超大容量数据缓存&#xff0c;FPGA的PS端外挂1组72位的D…

Python 进程与线程-分布式进程

目录 分布式进程 小结 分布式进程 在Thread和Process中&#xff0c;应当优选Process&#xff0c;因为Process更稳定&#xff0c;而且&#xff0c;Process可以分布到多台机器上&#xff0c;而Thread最多只能分布到同一台机器的多个CPU上。 Python的multiprocessing模块不但支…

初阶数据结构(C语言实现)——5.2 二叉树的顺序结构及堆的实现

1.二叉树的顺序结构及实现 1.1 二叉树的顺序结构 普通的二叉树是不适合用数组来存储的&#xff0c;因为可能会存在大量的空间浪费。而完全二叉树更适合使用顺序结构存储。现实中我们通常把堆(一种二叉树)使用顺序结构的数组来存储&#xff0c;需要注意的是这里的堆和操作系统…

ArcGIS Pro 车牌分区数据处理与地图制作全攻略

在大数据时代&#xff0c;地理信息系统&#xff08;GIS&#xff09;技术在各个领域都有着广泛的应用&#xff0c;而 ArcGIS Pro 作为一款功能强大的 GIS 软件&#xff0c;为数据处理和地图制作提供了丰富的工具和便捷的操作流程。 车牌数据作为一种重要的地理空间数据&#xf…

文件解析漏洞靶场通关合集

一、IIS解析漏洞 &#xff08;一&#xff09;iis6的目录解析漏洞(.asp目录中的所有文件都会被当做asp文件执行) 第一步&#xff1a;在网站根目录下创建了一个x.asp文件夹&#xff0c;并在文件夹中创建一个名为1.txt的文本文档 第二步&#xff1a;文本文档中输入<% now()%&…

塔能IVO-SCY智能机箱:点亮智慧城市的电力“智慧核芯”

在智慧城市建设的宏大征程中&#xff0c;稳定且智能的电力供应犹如坚固基石&#xff0c;支撑着各类设备高效、稳定地运行。塔能科技的IVO-SCY智能机箱&#xff0c;凭借其卓越的电源管理系统&#xff0c;当之无愧地成为了整个智慧城市电力保障体系中的“智慧心脏”&#xff0c;源…

【Oracle】19c数据库控制文件多路径配置

一、关闭数据库&#xff08;2个节点实例都要关闭&#xff09; srvctl stop database -d ora19c 二、多路径控制文件 打开其中一个节点到nomount状态 sqlplus / as sysdba startup nomount; [oracleora19c1:/home/oracle]$ rman target / RMAN> restore controlfile to…

Android Media3 ExoPlayer 开发全攻略:从基础集成到高级功能实战

目录 1. 引言 2. 添加依赖 3. 初始化ExoPlayer并播放视频 3.1 XML 布局 3.2 初始化ExoPlayer 4. 控制播放 5. 监听播放状态 6. 播放网络流&#xff08;HLS / DASH / RTSP&#xff09; 7. ExoPlayer 进阶 7.1 手动切换功能 7.2 DRM 保护 8. 释放播放器资源 9. 从旧…

Trae与Builder模式初体验

说明 下载的国际版&#xff1a;https://www.trae.ai/ 建议 要选新模型 效果 还是挺不错的&#xff0c;遇到问题反馈一下&#xff0c;AI就帮忙解决了&#xff0c;真是动动嘴&#xff08;打打字就行了&#xff09;&#xff0c;做些小的原型效果或演示Demo很方便呀&#xff…

如何通过修改hosts文件、启动Apache服务器、修改httpd.conf文件、配置虚拟主机、创建站点目录和文件等步骤来配置虚拟主机并发布PHP站点

Web服务器配置——修改hosts文件&#xff0c;将域名解析到本地 核心内容&#xff1a;介绍了如何通过修改hosts文件来实现将任意域名解析到本地&#xff0c;以便在开发过程中使用自定义域名访问本地站点。步骤&#xff1a; 打开位于C:\Windows\System32\drivers\etc的hosts文件…

政策助力,3C 数码行业数字化起航

政策引领&#xff0c;数字经济浪潮来袭 在当今时代&#xff0c;数字经济已成为全球经济发展的核心驱动力&#xff0c;引领着新一轮科技革命和产业变革的潮流。我国深刻洞察这一发展趋势&#xff0c;大力推进数字化经济发展战略&#xff0c;为经济的高质量发展注入了强大动力。 …

MySQL数据库复制

文章目录 MySQL数据库复制一、复制的原理二、复制的搭建1.编辑配置文件2.在主库上创建复制的用户3.获取主库的备份4.基于从库的恢复5.建立主从复制6.开启主从复制7.查看主从复制状态 MySQL数据库复制 MySQL作为非常流行的数据库&#xff0c;支撑它如此出彩的因素主要有两个&am…

101.在 Vue 3 + OpenLayers 使用 declutter 避免文字标签重叠

1. 前言 在使用 OpenLayers 进行地图开发时&#xff0c;我们经常需要在地图上添加点、线、区域等图形&#xff0c;并给它们附加文字标签。但当地图上的标注较多时&#xff0c;文字标签可能会发生重叠&#xff0c;导致用户无法清晰地查看地图信息。 幸运的是&#xff0c;OpenL…

uniapp移动端图片比较器组件,仿英伟达官网rtx光追图片比较器功能

组件下载地址&#xff1a;https://ext.dcloud.net.cn/plugin?id22609 已测试h5和微信小程序&#xff0c;理论支持全平台 亮点&#xff1a; 简单易用 使用js计算而不是resize属性&#xff0c;定制化程度更高 组件挂在后可播放指示线动画&#xff0c;提示用户可以拖拽比较图片…

深度学习与大模型-矩阵

矩阵其实在我们的生活中也有很多应用&#xff0c;只是我们没注意罢了。 1. 矩阵是什么&#xff1f; 简单来说&#xff0c;矩阵就是一个长方形的数字表格。比如你有一个2行3列的矩阵&#xff0c;可以写成这样&#xff1a; 这个矩阵有2行3列&#xff0c;每个数字都有一个位置&a…

搭建基于chatgpt的问答系统

一、语言模型&#xff0c;提问范式与 Token 1.语言模型 大语言模型&#xff08;LLM&#xff09;是通过预测下一个词的监督学习方式进行训练的&#xff0c;通过预测下一个词为训练目标的方法使得语言模型获得强大的语言生成能力。 a.基础语言模型 &#xff08;Base LLM&…

LuaJIT 学习(2)—— 使用 FFI 库的几个例子

文章目录 介绍Motivating Example: Calling External C Functions例子&#xff1a;Lua 中调用 C 函数 Motivating Example: Using C Data StructuresAccessing Standard System FunctionsAccessing the zlib Compression LibraryDefining Metamethods for a C Type例子&#xf…

解锁 AI 开发的无限可能:邀请您加入 coze-sharp 开源项目

大家好&#xff01;今天我要向大家介绍一个充满潜力的开源项目——coze-sharp&#xff01;这是一个基于 C# 开发的 Coze 客户端&#xff0c;旨在帮助开发者轻松接入 Coze AI 平台&#xff0c;打造智能应用。项目地址在这里&#xff1a;https://github.com/zhulige/coze-sharp&a…

全面解析与实用指南:如何有效解决ffmpeg.dll丢失问题并恢复软件正常运行

在使用多媒体处理软件或进行视频编辑时&#xff0c;你可能会遇到一个常见的问题——ffmpeg.dll文件丢失。这个错误不仅会中断你的工作流程&#xff0c;还可能导致软件无法正常运行。ffmpeg.dll是FFmpeg库中的一个关键动态链接库文件&#xff0c;负责处理视频和音频的编码、解码…

Python----计算机视觉处理(opencv:像素,RGB颜色,图像的存储,opencv安装,代码展示)

一、计算机眼中的图像 像素 像素是图像的基本单元&#xff0c;每个像素存储着图像的颜色、亮度和其他特征。一系列像素组合到一起就形成 了完整的图像&#xff0c;在计算机中&#xff0c;图像以像素的形式存在并采用二进制格式进行存储。根据图像的颜色不 同&#xff0c;每个像…