RabbitMQ在SpringBoot中的高级应用(1)

news2025/1/16 4:46:59

启动RabbitMQ

        1. 在虚拟机中启动RabbitMQ,要先切换到root用户下: su root

        2.关闭防火墙: systemctl stop firewalld  

        3.rabbitmq-server start # 启用服务

        4.rabbitmq-server -detached # 后台启动

1.消息确认机制

  有两种确认的方式:
            自动ACK:RabbitMQ将消息发送给消费者后就会直接将消息删除,前提是消费者程序没有出现异常,有异常会重新发送,直至到达了最大重试次数后抛出异常后不在重试
            手动ACK:通过代码控制决定是否返回确认消息

        1)开启消息确认机制,在核心配置文件中添加以下配置

#  发送者开启simple确认机制
spring.rabbitmq.publisher-confirm-type=simple
#  发送者开启return确认机制
spring.rabbitmq.publisher-returns=true

     

  2)编写配置类

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

//消息确认机制的接口 RabbitTemplate
@Configuration
public class PublishConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
   @Autowired
   private RabbitTemplate rt;
   
   
   @PostConstruct//加载回调方法
   public void initMethod(){
       rt.setConfirmCallback(this);
       rt.setReturnsCallback(this);
   }



    @Override//RabbitTemplate.ConfirmCallback如果消息被正常发送到交换机,则会调用该方法(自动回调)
    /*
    *  CorrelationData相关数据,有一个id属性,表示消息的唯一标识
    * boolean表示当前消息投放到交换机中的状态,trur表示投放成功
    * String表示投送失败的原因
    * */
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                System.out.println("消息发送给交换机成功");
          }else {
                System.err.println("消息发送给交换机失败");
         }
    }

    /**
     * 消息往队列发送时成功,不会调用该方法     失败了会调用
     * @param returnedMessage 返回消息的内容
     *                        message发送的内容
     *                        replyCode响应码
     *                        replyText回应的内容
     *                        exchange交换机
     *                        reotingKey路由键
     * */
    @Override//RabbitTemplate.ReturnsCallback如果消息被正常从交换机发送到队列,则回调该方法(自动回调)
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("消息发送到交换机却没有到队列");
        System.out.println("消息内容"+returnedMessage.getMessage());
        System.out.println("响应码"+returnedMessage.getReplyCode());
        System.out.println("回应的内容"+returnedMessage.getReplyText());
        System.out.println("交换机"+returnedMessage.getExchange());
        System.out.println("路由键"+returnedMessage.getRoutingKey());

    }
}

         这是没有消息确认配置类时的运行数据

         这是添加了消息确认配置类时的运行数据 ,可以看到我们是否将数据成功发送到交换机或队列

 持久化

        1.队列持久化         没有消费者连接该队列的时候,会被RabbitMQ自动删除        autoDelete = "true"     默认为false,不会被自动删除

         

   @RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "log_queue_error",autoDelete = "true"),
                exchange = @Exchange(value = "durable_exchange",type = "direct"),
                key = "error_log_key"
        )
    )
    public void druggConsumerError(String message){
        System.out.println("接收error级别的日志"+message);

    }

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "log_queue_all",autoDelete = "false"),
                    exchange = @Exchange(value = "durable_exchange",type = "direct"),
                    key = {"error_log_key","info_log_key","debug_log_key","waring_log_key"}
            )
    )
    public void drugConsumerAll(String message){
        System.out.println("接收all级别的日志"+message);

    }

      ​​

 

  2.交换机持久化

@RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "email_queue",autoDelete = "true"),
                    exchange = @Exchange(value = "temp_exce",type = "topic",autoDelete="true"),
                    key="em_key"
            )
    )
    public void emailConsumer(Object o){
        System.out.println("接收到邮件:"+o);
    }

         如果当前的交换机没有被任何的队列所映射,会被RabbitMQ自动删除,关闭项目就相当于没有映射

消费端限流

        控制消费端的消费速度,方式数据过大造成服务端宕机,通过编写配置文件,控制一次推送的消息数量,来减少一次数据太大冲垮服务器

        1.编写核心配置文件application.properties

# 消费端限流实现
#开启手动签收(手动ACK)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#一次接收3条消息(在单个请求中处理的消息个数)
spring.rabbitmq.listener.simple.prefetch=3
#消费者最小数量
spring.rabbitmq.listener.simple.concurrency=1
#消费者最大数量
spring.rabbitmq.listener.simple.max-concurrency=10

        2.创建交换机和队列

  //  channel标识信道,封装了RabbitMQ通过的相关配置信息,如果当前的消息被成功消费,通过信道进行标记,
    //获取到相应ACK的确认信息
     public void currentLimitingConsumer(Message message, Channel channel) throws InterruptedException, IOException {
        Thread.sleep(3000);//睡眠3秒钟
        long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息的标记,消息的唯一标识
        //手动确认消息是否接收,通过消息的id来指定该条消息被成功处理
        channel.basicAck(deliveryTag,true);//true表示对应这条消息被消费了
        String s = new String(message.getBody());
        System.out.println("消费者1::接收到的消息"+s);

    }
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue("current_limiting_queue"),
                    exchange = @Exchange(value = "cle",type = "topic"),
                    key = "current.limiting.#"
            )
    )
    public void currentLimitingConsumer2(Message message, Channel channel) throws InterruptedException, IOException {
        Thread.sleep(3000);//睡眠3秒钟
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,true);
        String s = new String(message.getBody());
        System.out.println("消费者2::接收到的消息"+s); 
    }

        3.创建消息发送者,我们将100条消息发送给cle交换机

 @Test//消息限流
    public void currentLimitingPushLisher(){
        for (int i = 1; i <101 ; i++) {
            re.convertAndSend("cle","current.limiting.xm","消息限流:"+i);
        }
    }

         正如我们的配置配置文件所写,一次性只能接收三条消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/735242.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一些有意思的耗尽型MOS恒流源阻抗对比

貌似没有什么管子能超过DN2540&#xff0c;测试的环境差别不大&#xff0c;LD1014D因为本身耐压太低&#xff08;25V&#xff09;&#xff0c;而且达不到1mA这个值&#xff0c;因此&#xff0c;测试的时候相应降低了电压&#xff0c;选择了2mA的电流&#xff0c;并将负载电阻减…

Pytorch-ResNet50-MINIST Classify 网络实现流程

分两个文件讲解&#xff1a;1、train.py训练文件 2、test.py测试文件. 1、train.py训练文件 1&#xff09;从主函数入口开始&#xff0c;设置相关参数 # 主函数入口 if __name__ __main__:# ----------------------------## 是否使用Cuda# 没有GPU可以设置成Fasle# -…

IDEA+SpringBoot+mybatis+bootstrap+jquery+Mysql车险理赔管理系统

IDEASpringBootmybatisbootstrapjqueryMysql车险理赔管理系统 一、系统介绍1.环境配置 二、系统展示1. 管理员登录2.编辑个人信息3.用户管理4.添加用户5.申请理赔管理6.赔偿金发放管理7.待调查事故保单8.已调查记录9.现场勘察管理10.勘察记录11.我的保险管理12.我的理赔管理 三…

Atcoder Beginner Contest 309——D-F讲解

前言 由于最近期末考试&#xff0c;所以之前几场都没打&#xff0c;给大家带了不便&#xff0c;非常抱歉。 这个暑假&#xff0c;我将会持续更新&#xff0c;并给大家带了更好理解的题解&#xff01;希望大家多多支持。 由于&#xff0c; A ∼ C A\sim C A∼C 题比较简单&am…

现代C++新特性 扩展的聚合类型(C++17 C++20)(PC浏览效果更佳)

文字版PDF文档链接&#xff1a;现代C新特性(文字版)-C文档类资源-CSDN下载 1.聚合类型的新定义 C17标准对聚合类型的定义做出了大幅修改&#xff0c;即从基类公开且非虚继承的类也可能是一个聚合。同时聚合类型还需要满足常规条件。 1&#xff0e;没有用户提供的构造函数。…

用C语言写一个压缩文件的程序

本篇目录 数据在计算机中的表现形式huffman 编码将文件的二进制每4位划分&#xff0c;统计其值在文件中出现的次数构建二叉树搜索二叉树的叶子节点运行并输出新的编码文件写入部分写入文件首部写入数据部分压缩运行调试解压缩部分解压缩测试为可执行文件配置环境变量总结完整代…

23数字图像置乱技术(matlab程序)

1.简述 一、引言 所谓“置乱”&#xff0c;就是将图像的信息次序打乱&#xff0c;a像素移动到b像素位置上&#xff0c;b像素移动到c像素位置上&#xff0c;……&#xff0c;使其变换成杂乱无章难以辨认的图片。数字图像置乱技术属于加密技术&#xff0c;是指发送发借助数学或者…

Python实现PSO粒子群优化算法优化Catboost分类模型(CatBoostClassifier算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 PSO是粒子群优化算法&#xff08;Particle Swarm Optimization&#xff09;的英文缩写&#xff0c;是一…

《低代码指南》——轻流5.0发布,无代码引擎矩阵全面升级

7月6日,由轻流主办「无代码无边界 202376Day|轻流无代码探索者大会」于上海顺利举行。轻流也在会上重磅发布了更加开放、灵活、低门槛的轻流5.0,和全面升级的专有轻流。 轻流5.0全面迭代升级了轻流的无代码引擎矩阵(表单引擎、流程引擎、报表引擎、门户引擎、数据引擎)。…

软件测试项目实战,电商项目测试实例 - 业务测试(重点)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 支付功能怎么测试…

pytest自动化测试实战之执行参数

上一篇介绍了如何运行pytest代码&#xff0c;以及用例的一些执行规则&#xff0c;执行用例发现我们中间print输出的内容&#xff0c;结果没有给我们展示出来&#xff0c;那是因为pytest执行时&#xff0c;后面需要带上一些参数。 参数内容 我们可以在cmd中通过输入 pytest -h…

域名捡漏的好方法,希望能够帮到你:域霸扫描器 V0.44 绿色免费版,供大家学习研究参考

高速扫描域名的工具&#xff0c;一均程序每小时五万条。 扫描域名是否注册&#xff0c;注册商是谁&#xff0c;域名的注册日期与过期日期。 供大家学习研究参考&#xff01; 下载&#xff1a;https://download.csdn.net/download/weixin_43097956/88025564

【SpringBoot——Error记录】

IDEA正常安装后&#xff0c;运行按钮为灰色解决方法尝试 解决方法一&#xff08;本人适用&#xff09;解决方法二 解决方法一&#xff08;本人适用&#xff09; 检查创建项目时JDK是否添加&#xff0c;版本是否正确。 解决方法二 点击左下角的Structure 参考链接&#xff1…

回归预测 | MATLAB实现WOA-CNN-LSTM鲸鱼算法优化卷积长短期记忆神经网络多输入单输出回归预测

回归预测 | MATLAB实现WOA-CNN-LSTM鲸鱼算法优化卷积长短期记忆神经网络多输入单输出回归预测 目录 回归预测 | MATLAB实现WOA-CNN-LSTM鲸鱼算法优化卷积长短期记忆神经网络多输入单输出回归预测预测效果基本介绍模型描述程序设计学习总结参考资料 预测效果 基本介绍 回归预测 …

node中的数据持久化之mongoDB

一、什么是mongoDB MongoDB是一种开源的非关系型数据库&#xff0c;正如它的名字所表示的&#xff0c;MongoDB支持的数据结构非常松散&#xff0c;是一种以bson格式&#xff08;一种json的存储形式&#xff09;的文档存储方式为主&#xff0c;支持的数据结构类型更加丰富的NoS…

mysql多表查询练习题

创建表及插入数据 create table if not exists dept3( deptno varchar(20) primary key , -- 部门号 name varchar(20) -- 部门名字 ); -- 创建员工表 create table if not exists emp3( eid varchar(20) primary key , -- 员工编号 ename varchar(20), -- 员工名字 age int, -…

换零钱——最小钱币张数(贪心算法)

贪心算法&#xff1a;根据给定钱币面值列表&#xff0c;输出给定钱币金额的最小张数。 (本笔记适合学完python基本数据结构&#xff0c;初通 Python 的 coder 翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣…

CS EXE上线主机+文件下载上传键盘记录

前言 书接上文&#xff0c;CobaltStrike_1_部署教程及CS制作office宏文档钓鱼教程&#xff0c;该篇介绍【使用CS生成对应exe木马&#xff0c;上线主机&#xff1b;对上线主机进行&#xff0c;文件下载&#xff0c;文件上传&#xff0c;键盘记录】。 PS&#xff1a;文章仅供学习…

unseping

代码审计 <?php highlight_file(__FILE__);class ease{private $method;private $args;function __construct($method, $args) {$this->method $method;$this->args $args;}function __destruct(){if (in_array($this->method, array("ping"))) {call…

关于 colab Tutorial的介绍

&#xff08;一&#xff09;常用的快捷键 (二) 网上环境的配置 按照官网上所给的提示一步一步操作即可 注意&#xff1a;此平台需要科学的上网