RabbitMq的高级特性--RabbitMQ高级特性_消息存活时间

news2024/11/23 12:56:39

RabbitMQ高级特性_消费端限流   , [解耦, 限流,降低压力,发送消息]

通过消费端限流的 方式限制消息的拉取速度,达到保护消费端的目的。

下面我们新建springboot项目进行测试:

新建项目myproducer

依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

对application.yml进行配置:

# 配置RabbitMQ
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启确认模式
    publisher-confirm-type: correlated
    # 开启退回模式
    publisher-returns: true

先手动创建一个交换机(代码创建在下面)

 我们先做个不做任何限流的操作进行查看

package com.pb.demo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;、
@SpringBootTest
class MyproducerApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSearchBatch() {
        for(int i = 1; i <=10; i++) {
            rabbitTemplate.convertAndSend("springboot_exchange", "my_routing", "send message....." + i);
        }
    }
}

然后创建交换机 springboot_exchange

创建队列my_queue 并绑定路由my_routing

 绑定

然后启动测试看看数据能不能发送到队列中去

 然后创建消费端项目myconsumer

 对application.yml配置文件进行配置

# 配置RabbitMQ
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual

创建包myconsumer并新建类OosConsumer我们来消费消息

package com.pb.demo.myconsumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class OosConsumer {
    @RabbitListener(queues = "my_queue")
    public void linsenerConsumer(Message message, Channel channel) throws IOException, InterruptedException {
        System.out.println("收到了消息:" + new String(message.getBody()));
        //睡眠下
        Thread.sleep(3000L);
        //签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

启动主启动类进行测试:

我们可以看rabbitmq的控制台

 启动Consumer的启动类  在启动producer的测试类  可以看到 上面的情况

我么发现消息已经被消费掉了

我们会发现他把所有的消息都堆到unacked中,那么就说明所有的消息都会堆到消费者中,因为我们这里并没有开启限流操作,如果我们有10万条消息那么就会造成消费者的内存溢出或者内存泄漏的问题


下面我们来开启限流注意如果我们开启限流必须是手动签收

修改application.yml在消费者的项目中:

# 配置RabbitMQ
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取5条消息进行消费,签收后消费端不满5条才会继续拉取消息
        prefetch: 5

RabbitMQ高级特性_利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采 用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以 采用不公平分发,即谁处理的快,谁处理的消息多。

首先我们模拟公平分发的操作,或者说看看平均分发的弊端

项目myproducer中的生产者的代码不需要动

我们只需要改变项目myconsumer的代码,在myconsumer包中创建类UnfairConsumer,在内部创建两个消费者

记得把OosConsumer类中的消费者注释掉

package com.pb.demo.myconsumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "my_queue")
    public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException {
        // 1.获取消息
        System.out.println("消费者1:"+new String(message.getBody()));
        // 2.模拟业务处理
        Thread.sleep(500); // 消费者1处理快
        // 3.签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }

    // 消费者2
    @RabbitListener(queues = "my_queue")
    public void listenMessage2(Message message, Channel channel) throws IOException, InterruptedException {
        // 1.获取消息
        System.out.println("消费者2:"+new String(message.getBody()));
        // 2.模拟业务处理
        Thread.sleep(3000); // 消费者2处理快
        // 3.签收消息
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    }
}

我们要消费端的限流prefetch: 5 先注释掉

# 配置RabbitMQ
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取5条消息进行消费,签收后消费端不满5条才会继续拉取消息
        #prefetch: 5

好启动所有项目进行对消费者控制台的查看

我们来实现不公平分发,谁处理的快就让他多处理

在消费端项目中修改application.yml配置文件:

# 配置RabbitMQ
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        # 限流机制必须开启手动签收
        acknowledge-mode: manual
        # 消费端最多拉取5条消息进行消费,签收后消费端不满5条才会继续拉取消息
        #prefetch: 5
        # 消费端最多拉取1条消息进行消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
        prefetch: 1

 从新启动项目进行测试:

来查看消费者的控制台:

 



RabbitMQ高级特性_消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间【需要在创建队列的时候操作】,也可以对某条消息设置存活时间【需要在发送的的时候进行设置】

设置队列所有消息存活时间

在这里我们需要创建新的交换机和队列:

在项目myproducer中创建配置类在包com.pb.demo下新建配置类RabbitConfig

package com.pb.demo;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    private final String EXCHANGE_NAME="my_topic_exchange2";
    private final String QUEUE_NAME="my_queue2";


    // 1.创建交换机
    @Bean("bootExchange2")
    public Exchange getExchange(){
        return ExchangeBuilder
                .topicExchange(EXCHANGE_NAME) // 交换机类型
                .durable(true) // 是否持久化
                .build();
    }
    @Bean("bootExchange2")
    public Exchange getExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    // 2.创建队列
    @Bean("bootQueue2")
    public Queue getMessageQueue(){
        return QueueBuilder
                .durable(QUEUE_NAME) // 队列持久化
                .ttl(10000)    //对了的存活时间
                .build();
    }

    // 3.将队列绑定到交换机
    @Bean
    public Binding bindMessageQueue(@Qualifier("bootExchange2") Exchange exchange, @Qualifier("bootQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
    }
}

然后在myproducer项目的测试类发送10条信息

@Test
public void testSearchBatch2() {
    for(int i = 1; i <=10; i++) {
        rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message....." + i);
    }
}

 没有消费 的情况下:

设置单条消息存活时间【主要是在发送消息的时候设置单条消息的存活时间即可】

在myproducer项目的测试类进行操作:

@Test
public void testSendMessage() {
    // 1.创建消息属性
    MessageProperties messageProperties = new MessageProperties();
    // 2.设置存活时间
    messageProperties.setExpiration("10000");
    // 3.创建消息对象
    Message message = new Message("send message...".getBytes(), messageProperties);
    // 4.发送消息
    rabbitTemplate.convertAndSend("springboot_exchange", "my_routing", message);
}

然后测试该方法看看能不能给发送的单个消息设置时间

10秒再次查看

我们发现消息已经被清除掉了

注意:

1 如果设置了单条消息的存活时间,也设置了队列的存活时 间,以时间短的为准。
2 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。

下面我们来测试下

@Test
public void testSendMessage2() {
    for(int i = 0; i < 10; i++) {
        //当i==5的时候我们来设置单条信息的时间
        if(i == 5) {
            // 1.创建消息属性
            MessageProperties messageProperties = new MessageProperties();
            // 2.设置存活时间
            messageProperties.setExpiration("10000");
            // 3.创建消息对象
            Message message = new Message("send message...".getBytes(), messageProperties);
            // 4.发送消息
            rabbitTemplate.convertAndSend("springboot_exchange", "my_routing", message);
        }
        else {   //否则发送普通消息
            rabbitTemplate.convertAndSend("springboot_exchange" , "my_routing", "send message" + i);
        }
    }
}

发现并没有被立即删除,但是我们一定要注意第5条消息现在已经不能被消费了,因为mq默认他已经被删除了如果在10秒以后,因为他已经过期了

我们测试下,启动任何一个消费端进行查看:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/600260.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能python:Python关联算法:从数据挖掘到推荐系统

Python关联算法&#xff1a;从数据挖掘到推荐系统 Python编程语言已经成为各行各业中数据科学家和工程师的首选语言&#xff0c;其中包括处理数据集合的关联算法。 什么是关联算法&#xff1f; 数学上&#xff0c;关联算法是指在大型和复杂数据集合中&#xff0c; 寻找数据之…

干货 | 携程10个有效降低客户端超时的方法

作者简介 Wen&#xff0c;携程资深后端开发工程师&#xff0c;专注系统性能、稳定性、交易系统等领域。 一、背景 在现今的信息时代&#xff0c;微服务技术已成为一种重要的解决方案&#xff0c;微服务技术可以使系统的规模和功能变的更加灵活&#xff0c;从而获得更高的可扩展…

docker基本命令学习 | Docker网络、Docker镜像发布

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; docker安装、卸载 docker安装使用 卸载旧版本docker或者环境 [rootiZf8zdcobr3fw7vn0p3538Z /]# yum remove docker \ > docker-client \ >…

打破逢节降价桎梏!海尔智家:满足用户,全网第一

又是一年618&#xff0c;每到这个上半年最重要的消费节点&#xff0c;许多品牌卖家纷纷掀起价格战。 他们使出满减、满赠、满返等五花八门的策略&#xff0c;为了压制对手进行冲量&#xff0c;这也一度让“逢节降价”成为主流。 在市场天平偏向卖家的时代&#xff0c;这些策略…

SVN服务端visualsvn5.1.4下载安装(windows环境)(实操)

Apache Subversion 通常被缩写成 SVN&#xff0c;是一个开放源代码的版本控制系统&#xff0c;Subversion 在 2000 年由 CollabNet Inc 开发&#xff0c;现在发展成为 Apache 软件基金会的一个项目&#xff0c;同样是一个丰富的开发者和用户社区的一部分。 SVN相对于的RCS、CVS…

做自己喜欢的事

这两天沸沸扬扬的消息说稚辉君公司拿到了百度投资 稚晖君刚拿了百度投资&#xff0c;估值被曝已超独角兽 然后昨晚上小孩发烧&#xff0c;我陪床不敢死睡&#xff0c;跟大佬聊了下拿到投资的感受。 然后说到搞技术好玩这个事情&#xff0c;我就跟他分享了我前天到经历 我前天到…

Jenkins概念及安装配置教程(二)

如何安装Jenkins&#xff1f; Jenkins 安装程序也可以作为通用 Java 包 (.war) 使用。如果您将 Jenkins 与 Selenium 一起用于执行跨浏览器测试&#xff0c;我们建议使用 .war 文件&#xff0c;因为您可以通过在非无头模式下在浏览器上执行的自动化测试来见证测试场景的执行。…

【C# 10 和 .NET 6】使用MVC模式构建网站(笔记1)

Building Websites Using the Model-View-Controller Pattern 使用模型-视图-控制器模式构建网站 本章介绍使用 Microsoft ASP.NET Core MVC 在服务器端构建具有现代 HTTP 架构的网站&#xff0c;包括构成 ASP.NET Core MVC 项目的启动配置、身份验证、授权、路由、请求和响应管…

如何设计和使用文档模板 | 技术写作什么鬼

今天看到叶伟民老师的一篇文章&#xff0c;瞬间泪目&#xff1a;叶老师&#xff0c;您是懂人性的啊。在我整天鞭策自己“不能再拖了”的关键时刻&#xff0c;及时分享经验&#xff1a; 是的&#xff0c;这篇文章实在是拖了太久&#xff0c;了太久&#xff0c;太久&#xff0c;久…

web前端 --- javascript(03) -- 函数、内置对象

函数&#xff08;function&#xff09; 具有名称的&#xff0c;为了实现特定功能的代码集合体 &#xff08;1&#xff09;javascript如何定义函数&#xff1a;function关键字定义 function 函数名称 &#xff08;[ 参数列表 ]&#xff09;{ // 函数体 // [return 返回值]…

【Springboot】发送QQ邮件

系列文章目录 文章目录 系列文章目录前言添加Maven依赖QQ邮箱开启POP服务配置application.properties文件Controller层编写 vue前端&#xff08;也可以直接省略&#xff09; 前言 这篇博客用于简单实现SpringBoot中使用Controller发送邮件请求&#xff0c;用户可以收到邮件。 …

Python读写access数据库的实战代码

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

chatgpt赋能python:使用Python关闭端口的方法

使用Python关闭端口的方法 在网络安全中&#xff0c;关闭端口是非常重要的一项任务。一旦一个端口被打开并暴露给互联网&#xff0c;恶意用户就可能通过它们的攻击进入您的服务器或计算机系统。Python是一种流行的编程语言&#xff0c;也可以用来关闭端口。下面介绍一些常用的…

JSONSQL:使用SQL过滤JSON类型数据(支持多种数据库常用查询、统计、平均值、最大值、最小值、求和语法)...

1. 简介 在开发中&#xff0c;经常需要根据条件过滤大批量的JSON类型数据。如果仅需要过滤这一种类型&#xff0c;将JSON转为List后过滤即可&#xff1b;如果相同的条件既想过滤数据库表中的数据、也想过滤内存中JSON数据&#xff0c;甚至想过滤Elasticsearch中的数据&#xff…

chatgpt赋能python:Python内置变量:掌握这些变量,让你的编程更高效

Python内置变量&#xff1a;掌握这些变量&#xff0c;让你的编程更高效 Python作为一门优秀的编程语言&#xff0c;自然不会缺少重要的内置变量。这些内置变量可以帮助程序员轻松地实现各种编程功能&#xff0c;提高编程效率。在本文中&#xff0c;我们将介绍Python内置变量的…

chatgpt赋能python:Python内置函数使用指南

Python内置函数使用指南 Python是一种高级编程语言&#xff0c;得益于其简单易学的语法、强大的标准库和丰富的第三方模块&#xff0c;现已成为全球最受欢迎的编程语言之一。其中&#xff0c;Python内置函数是Python编程的重要组成部分&#xff0c;本文将为您介绍这些内置函数…

四种主要的IO模型

基本概念 基本概念阻塞IO指的是需要内核IO操作彻底完成后&#xff0c;才返回到用户空间执行用户的操作。阻塞指的是用户空间程序的执行状态。传统的IO模型都是同步阻塞IO。在Java中&#xff0c;默认创建的socket都是阻塞的。简单来说&#xff1a;阻塞是指用户空间&#xff08…

尚硅谷-云尚办公-项目复盘

尚硅谷-云尚办公-项目复盘 资料地址本文介绍问题汇总问题1.knife4j无法下载 视频4问题2.dev等含义 视频5问题3.wrapper继承/实现图 视频8问题4.修改统一返回结果 视频11问题5.修改后新增也变修改 视频29问题6.redis中key值乱码 视频55-60问题7.RangeError: Maximum call stack …

高完整性系统工程(六):INTRODUCING ADA

目录 1. ADA的历史 2. ADA的特点 2.1 Strong, Static Typing 强语言、强静态类型语言 2.1.1 ADA is Strong, Static Typing 2.1.2 C is Weak, Static Typing 2.2 Module System 2.3 Portable 2.3.1 ADA 2.3.2 C 2.3.3 Cost of Runtime Checking 2.4 Readability …

IPython使用学习笔记

学习《利用python进行数据分析》第三章 IPython:一种交互式计算和开发环境的笔记&#xff0c;共享给大家&#xff0c;同时为自己作为备忘用。 安装ipython用pip即可。ps.博主用的是win7系统&#xff0c;所以接下来的都是在windows系统下操作的。 一.Ipython基础 启动&#xff…