(四)RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

news2025/1/23 7:26:50

Lison <dreamlison@163.com>, v1.0.0, 2023.06.23

RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列

文章目录

  • RabbitMQ高级特性(消费端限流、利用限流实现不公平分发、消息存活时间、优先级队列
    • 消费端限流
    • 利用限流实现不公平分发
    • 消息存活时间
    • 优先级队列

消费端限流

在这里插入图片描述

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

1、生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

2、消费端配置限流机制

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 1233456
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
       prefetch: 5

3、消费者监听队列

@Component
public class QosConsumer{
    @RabbitListener(queues = "my_queue")
    public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
        // 1.获取消息
        System.out.println(new String(message.getBody()));
        // 2.模拟业务处理
        Thread.sleep(3000);
        // 3.签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
   }
}

利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以 采用不公平分发,即谁处理的快,谁处理的消息多

1、生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

端配置不公平分发

spring:
 rabbitmq:
   host: 127.0.0.1
   port: 5672
   username: admin
   password: 1233456
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
       prefetch: 1

编写两个消费者

@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "my_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception
     {
        //1.获取消息
        System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
     }
    
    // 消费者2
    @RabbitListener(queues = "my_queue")
    public void listenMessage2(Message message, Channel channel) throws Exception
     {
        //1.获取消息
        System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(3000);// 消费者2处理慢
        //3. 手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息存活时间

1、在创建队列时设置其存活时间:

@Configuration
public class RabbitConfig2 {
    private final String EXCHANGE_NAME="my_topic_exchange2";
    private final String QUEUE_NAME="my_queue2";
    // 1.创建交换机
    @Bean("bootExchange2")
    public Exchange getExchange2(){
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.创建队列
    @Bean("bootQueue2")
    public Queue getMessageQueue2(){
        return QueueBuilder
               .durable(QUEUE_NAME)
               .ttl(10000) //队列的每条消息存活10s
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,@Qualifier("bootQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2、生产者批量生产消息,测试存活时间

@Test
public void testSendBatch2() throws InterruptedException {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
       rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);
   }
}

设置单条消息存活时间

@Test
public void testSendMessage() {
    //设置消息属性
    MessageProperties messageProperties = new MessageProperties();
    //设置存活时间
    messageProperties.setExpiration("10000");
    // 创建消息对象
    Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);
    // 发送消息
    rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}

注意:

1 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。

2 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setExpiration(“10000”);
// 3.创建消息对象
Message message = new Message((“send message…” +i).getBytes(),messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, message);
} else {
rabbitTemplate.convertAndSend(“my_topic_exchange”, “my_routing”, “sendmessage…” + i);
}
}
}
在以上案例中,i=5的消息才有过期时间,10s后消息并没有 马上被移除,但该消息已经不会被消费了,当它到达队列顶 端时会被移除。

优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

1、创建队列和交换机

@Configuration
public class RabbitConfig3 {
    private final String EXCHANGE_NAME="priority_exchange";
    private final String QUEUE_NAME="priority_queue";
    // 1.创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange priorityExchange(){
        return ExchangeBuilder
               .topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.创建队列
    @Bean(QUEUE_NAME)
    public Queue priorityQueue(){
        return QueueBuilder
               .durable(QUEUE_NAME)
                //设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
               .maxPriority(10)
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2、编写生产者

@Test
public void testPriority() {
    for (int i = 0; i < 10; i++) {
        if (i == 5) {
            // i为5时消息的优先级较高
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setPriority(9);
            Message message = new Message(("send message..." +i).getBytes(StandardCharsets.UTF_8), messageProperties);
            rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
       } else {
           rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);
       }
   }
}

3、编写消费者

@Component
public class PriorityConsumer {
    @RabbitListener(queues = "priority_queue")
    public void listenMessage(Message message, Channel channel) throws Exception
     {
        //获取消息
        System.out.println(new String(message.getBody()));
        //手动签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/789663.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(一)认识InfluxDB

以下内容来自 尚硅谷&#xff0c;写这一系列的文章&#xff0c;主要是为了方便后续自己的查看&#xff0c;不用带着个PDF找来找去的&#xff0c;太麻烦&#xff01; 第 1 章 认识InfluxDB 1.1 InfluxDB的使用场景 InfluxDB是一种时序数据库&#xff0c;时序数据库通常被用在监…

MySQL笔记——表的修改查询相关的命令操作

系列文章目录 MySQL笔记——MySQL数据库介绍以及在Linux里面安装MySQL数据库&#xff0c;对MySQL数据库的简单操作&#xff0c;MySQL的外接应用程序使用说明 文章目录 系列文章目录 一 表的修改操作 1.1 修改表的名字 1.2 添加一列score 1.3 修改列名称 1.4 修改新增列的…

containerd

Containerd是一个开源的容器运行时&#xff08;Container Runtime&#xff09;&#xff0c;它是Kubernetes和Docker等容器平台的基础组件之一。它旨在提供容器的生命周期管理和基本的运行时功能&#xff0c;使得容器的创建、启动、停止、删除等操作变得简单且高效。 Container…

Apipost使用教程

Apipost是一款集API调试、生成文档、Mock、测试于一体的协同工具。单个工具可以同时满足接口测试、生成/分享文档、Mock、流程测试等功能&#xff0c;还有超实用的多人多角色间实时协作的功能。将前端、后端、测试三种角色串联起来&#xff0c;从而实现工作流程无缝衔接、提高研…

C#中简单Winform程序编译(待验证)

1、文件架构 2、MainWindow.xaml <Window x:Class"WpfApp1.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d"http://schemas.microsoft.…

vue中的异步请求Axios(个人学习笔记五)

目录 友情提醒第一章、传统的jQuery方式获取数据1.1&#xff09;后端controller层代码1.2&#xff09;传统的jQuery获取数据1.3&#xff09;使用vue对象和jQuery获取异步数据 第二章、使用Axios获取数据2.1&#xff09;axios简介2.2&#xff09;axios两种使用方式2.3&#xff0…

WEB:easyphp

背景知识 php弱类型比较 MD5碰撞 题目 进行代码审计 <?php highlight_file(__FILE__); $key1 0;//值赋值 $key2 0;$a $_GET[a];//get方法获取值 $b $_GET[b];if(isset($a) && intval($a) > 6000000 && strlen($a) < 3){ //a的值需要大于 60000…

Seaborn中怎样绘制双变量分布图?

两个变量的二元分布可视化也很有用。在 Seaborn中最简单的方法是使用 jointplot()函数&#xff0c;该函数可以创建一个多面板图形&#xff0c;比如散点图、二维直方图、核密度估计等&#xff0c;以显示两个变量之间的双变量关系及每个变量在单坐标轴上的单变量分布。 jointplo…

Linux--Block group

Block Group&#xff1a;ext2文件系统会根据分区的大小划分为数个Block Group。而每个Block Group都有着相 同的结构组成。政府管理各区的例子 超级块&#xff08;Super Block&#xff09;&#xff1a;存放文件系统本身的结构信息。记录的信息主要有&#xff1a;bolck 和 inod…

App测试流程及测试点

1 APP测试基本流程 1.1流程图 1.2测试周期 测试周期可按项目的开发周期来确定测试时间&#xff0c;一般测试时间为两三周&#xff08;即15个工作日&#xff09;&#xff0c;根据项目情况以及版本质量可适当缩短或延长测试时间。正式测试前先向主管确认项目排期。 1.3测试资源…

测试覆盖率 JVM 字节码测试运用 - 远程调试、测试覆盖、影子数据库

目录 前言&#xff1a; 简介 基础使用方式介绍 工具特性 前言&#xff1a; 在软件开发中&#xff0c;测试覆盖率是一个非常重要的指标&#xff0c;它表示代码中所有的测试用例是否都已经被覆盖到。JVM 字节码测试是一种比较新的测试方法&#xff0c;它可以对 JVM 字节码进…

【雕爷学编程】Arduino动手做(86)---4*4位 WS2812 全彩模块4

37款传感器与执行器的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止这37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&am…

【Postman】Postman接口测试进阶用法详解:断言、全局与环境变量、关联、批量执行用例、读取外部文件实现参数化

文章目录 一、Postman断言1、断言位置2、Postman的常用断言3、操作实例 二、全局变量与环境变量1、二者区分2、设置全局变量3、设置环境变量 三、Postman接口关联1、概念2、操作步骤 四、批量执行测试用例1、操作步骤2、查看结果 五、读取外部文件实现参数化1、使用场景2、操作…

云服务器远程nacos服务注册失败/不健康Client not connected, current status:STARTING

文章目录 Nacos报错docker安装不用 docker安装 Nacos报错 docker安装 使用docker在云服务器安装Nacos之后出现Client not connected, current status:STARTING 使用docker 安装之后需要添加映射端口 docker run -e JAVA_OPTS"-Xms256m -Xmx256m"-e MODEstandalone…

7.24 作业

1.自己封装vector template<typename T> class Myverctor {T* first;T* last;T* end; public:Myverctor():first(NULL),last(NULL),end(NULL){}Myverctor(int num,T data):first(new T[num]){last end first num;for(int i 0;i<num;i) first[i] data;}Myverctor…

【ROS2 Foxy】Rviz2 不支持可视化压缩图像消息

这里我通过订阅话题&#xff0c;压缩图像消息是存在的&#xff1a; ros2 topic echo /hk_camera/rgb/compressed从官方代码库的 issue 中了解到&#xff0c;在 Foxy 版本的 Rviz2 是不支持压缩图像消息的可视化的&#xff0c;现在 Foxy 也已经停止维护了&#xff0c;以后更不太…

redis 1

shell 1&#xff1a;安装1. 源码安装&#xff08;CENTOS&#xff09; 2.999:可能会出现得问题1. 编译出错 1&#xff1a;安装 1. 源码安装&#xff08;CENTOS&#xff09; 官方下载源码包 wget https://download.redis.io/redis-stable.tar.gz # 安装依赖 yum install gcc解压…

node 版本管理器 nvm

文章目录 一、卸载node二、nvm 下载三、nvm 安装四、检测环境变量是否一致五、nvm常见问题 一、卸载node 打开cmd命令行窗口&#xff0c;输入npm cache clean --force 回车执行 打开控制面板&#xff0c;在控制面板中把Node.js卸载 二、nvm 下载 nvm全英文也叫node.js ve…

7.25 Qt

制作一个登陆界面 login.pro文件 QT core guigreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any Qt feature that has been marked deprecated (the exact warnings # depend on …

cocosCreator 之 ScrollView

版本&#xff1a;3.4.0 参考&#xff1a;ScrollView组件 简介 ScrollView组件作为滚动容器来使用&#xff0c;它的实现通过ScrollBar组件来展示内容的位置和Mask组件显示指定区域&#xff0c;来保证有限的区域内显示更多的内容。 它的构成部分&#xff1a; ScrollBar滚动条相…