关于RabbitMQ你了解多少?

news2025/1/13 15:43:20

关于RabbitMQ你了解多少?

文章目录

  • 关于RabbitMQ你了解多少?
    • 基础篇
      • 同步和异步
      • MQ技术选型
      • 介绍和安装
      • 数据隔离
      • SpringAMQP
      • 快速入门
      • Work queues
      • 交换机
        • Fanout交换机
        • Direct交换机
        • Topic交换机
      • 声明队列和交换机
      • MQ消息转换器
    • 高级篇
      • 消息可靠性问题
      • 发送者的可靠性
        • 生产者重连
        • 生产者确认
        • SpringAMQP实现生产者确认

RabbitMQ是目前企业中应用非常广泛的高性能的异步通讯组件

基础篇

同步和异步

同步调用:

  • 优势:时效性强,等待到结果后才返回
  • 不足:拓展性差、性能下降、级联失败问题

异步调用:

  • 异步调用方式其实就是基于消息通知的方式,一般包含三个角色
    • 消息发送者:投递消息的人,就是原来的调用方
    • 消息代理:管理、暂存、转发消息
    • 消息接收者:接收和处理消息的人,就是原来的服务提供方
  • 优势:解除耦合,拓展性强、无需等待,性能好、故障隔离、缓存消息,流量削峰填谷
  • 不足:不能立刻得到调用结果,时效性差、不确定下游业务执行是否成功、业务安全依赖于Broker的可靠性

MQ技术选型

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP、XMPP、SMTP、STOMPOpenWire、STOMP、REST、XMPP、AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

介绍和安装

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址

同样基于Docker来安装RabbitMQ,使用下面的命令即可:

docker run \
 -e RABBITMQ_DEFAULT_USER=admin \
 -e RABBITMQ_DEFAULT_PASS=admin123 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network hmall \
 -d \
 rabbitmq:3.8-management
  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

  • 安装完成后访问管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。 登录后即可看到管理控制台总览页面:

    在这里插入图片描述

RabbitMQ对应的整体架构核心概念:

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由、转发消息。生产者发送的消息由交换机决定投递到哪个队列,无存储能力
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

在这里插入图片描述

数据隔离

  • 每个Virtual Host提供了一个独立的消息代理,它们之间完全隔离,相互之间不共享任何资源。通过将不同的应用程序或服务分配到不同的Virtual Host,可以实现数据的隔离。
  • 每个Virtual Host都有自己独立的交换机、队列和绑定规则。这样,消息只能在同一个Virtual Host内进行路由和传递,不会跨越Virtual Host。
  • 通过使用Virtual Hosts,可以将不同的应用程序或服务的消息进行逻辑隔离,确保它们之间不会互相干扰或访问彼此的数据。这在多租户环境下尤为有用,可以确保不同的租户之间的消息数据完全隔离,提高安全性和隐私性。

SpringAMQP

SpringAMQP的官方地址

  • AMQP:Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
  • Spring AMQP:Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

快速入门

  1. 引入spring-amqp依赖,这样publisher和consumer服务都可以使用:

    <!——AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 配置RabbitMQ服务端信息

    spring:
     rabbitmq:
      host: 192.168.100.101 #主机名
      port: 5672 # 端口
      virtual-host: /liner #虚拟主机
      username: admin #用户名
      password: admin123 #密码
    
  3. 发送消息:SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue() {
        //队列名称
        String queueName = "simple.queue";
        //消息
        String message = "hello,spring amqp!";
        //发送消息
        rabbitTemplate.convertAndSend(queueName,message);
    }
    
  4. 接收消息:SpringAMQP提供声明式的消息监听,只需通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法

    @slf4j
    @Component
    public class SpringRabbitListener {
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueueMessage(String msg) throws InterruptedException {
            log.info("spring消费者接收到消息:【"+ msg + "】");
            if (true) {
                throw new MessageconversionException("故意的");
            }
            log.info("消息处理完成");
        }
    }
    

Work queues

Work queues,任务模型。让多个消费者绑定到一个队列,共同消费队列中的消息

publisher
queue
consumer1
consumer2

消费者消息推送限制

默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息

spring:
 rabbitmq:
  listener:
   simple:
    prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息

work模型的使用:

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:

  • Fanout:广播
  • Direct:定向
  • Topic:话题
publisher
exchange
queue1
queue2
consumer1
consumer2
consumer3

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
Fanout交换机

Fanout Exchange:会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

在这里插入图片描述

Direct交换机

Direct Exchange :会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

在这里插入图片描述

Topic交换机

TopicExchange:与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 “.” 分割。

Queue与Exchange指定BIndingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

在这里插入图片描述

声明队列和交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

在这里插入图片描述

@Configuration
public class FanoutConfiguration{
    @Bean
    public FanoutExchange fanoutExchange(){
        //ExchangeBuilder.fanoutExchange("").build();
        return new FanoutExchange("liner.fanout");
    }

    @Bean
    public Queue fanoutQueue(){
        //QueueBuilder.durable("").build();
        return new Queue("fanout.queue");
    }

    @Bean
    public Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }


    @Bean
    public Binding fanoutBinding2(){
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }

}

SpringAMQP还提供了基于 @RabbitListener 注解来声明队列和交换机的方式:

@RabbitListener(bindings = @QueueBinding(
    value = Queue (name = "direct.queue",durable = "true"),exchange = @Exchange(name = "liner.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue(String msg){
    System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}

MQ消息转换器

Spring的消息发送代码接收的消息体是一个Object:在这里插入图片描述

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 默认情况下Spring采用的序列化方式是JDK序列化。存在问题:数据体积过大、有安全漏洞、可读性差

在这里插入图片描述

因此建议采用JSON序列化代替默认的JDK序列化

  • publisherconsumer两个服务中都引入依赖:
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

  • 配置消息转换器,在publisherconsumer两个服务的启动类中添加Bean:
@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断

在这里插入图片描述

高级篇

消息可靠性问题

发送者的可靠性

生产者重连

有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:
 rabbitmq:
  connection-timeout: 1s #设置MQ的连接超时时间
  template:
   retry:
    enabled: true #开启超时重试机制
    initial-interval: 1000ms #失败后的初始等待时间
    multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
    max-attempts: 3 #最大重试次数

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者确认

RabbitMQ有Publisher ConfirmPublisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败
SpringAMQP实现生产者确认
  1. 在publisher这个微服务的application.yml中添加配置:

    spring:
     rabbitmq:
      publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型
      publisher-returns: true #开启publisher return机制
      
    #这里publisher-confirm-type有三种模式可选:
    #  none:  关闭confirm机制
    #  simple:  同步阻塞等待MQ的回执消息
    #  correlated:  MQ异步回调方式返回回执消息
    
  2. 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware {
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            //获取RabbitTemplate
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            //设置ReturnCallback
            rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
                log.info("消息发送失败,应答码{{},原因{},交换机{},路由键{},消息{}",
                         replyCode,replyText,exchange,routingKey,message.toString());
            });
        }
    }
    
  3. 发送消息,指定消息ID、消息ConfirmCallback

    @Test
    void testPublisherConfirm() throws InterruptedException {
        //1.创建CorrelationData
        CorrelationData cd = new CorrelationData();
        //2.给Future添加ConfirmCallback
        cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                // 2.1.Future发生异常时的处理逻辑,基本不会触发
                log.error("handle message ack fail", ex);
            }
            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
                if(result.isAck()){	 // result.isAck(), boolean类型, true代表ack回执,false代表nack回执
                    log.debug("发送消息成功,收到ack!");
                }else{	// result.getReason(), String类型,返回nack时的异常描述
                    log.error("发送消息失败,收到nack,reason:{}",result.getReason());
                }
            }
        });
        //3.发送消息
        rabbitTemplate.convertAndSend("liner.direct","red","hello",cd);
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1052834.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

泡泡玛特城市乐园即将开园 解锁“文化+科技”潮流空间

近年来&#xff0c;泡泡玛特以潮玩IP为核心&#xff0c;不断拓展业务版图&#xff0c;推进国际化布局同时实现集团化运营&#xff0c;而泡泡玛特首个城市乐园即将开业。 据了解&#xff0c;泡泡玛特城市乐园是由泡泡玛特精心打造的沉浸式IP主题乐园&#xff0c;占地约4万平方米…

四、浏览器渲染过程,DOM,CSSDOM,渲染,布局,绘制详细介绍

知识点&#xff1a; 1、为什么不能先执行 js文件&#xff1f;&#xff1f; 我们不能先执行JS文件&#xff0c;必须等到CSSOM构建完成了才能执行JS文件&#xff0c;因为前面已经说过渲染树是需要DOM和CSSOM构建完成了以后才能构建&#xff0c;而且JS是可以操控CSS样式的&#…

【Java 进阶篇】深入理解 JDBC:Java 数据库连接详解

数据库是现代应用程序的核心组成部分之一。无论是 Web 应用、移动应用还是桌面应用&#xff0c;几乎都需要与数据库交互以存储和检索数据。Java 提供了一种强大的方式来实现与数据库的交互&#xff0c;即 JDBC&#xff08;Java 数据库连接&#xff09;。本文将深入探讨 JDBC 的…

Linux系统编程(七):线程同步

参考引用 UNIX 环境高级编程 (第3版)黑马程序员-Linux 系统编程 1. 同步概念 所谓同步&#xff0c;即同时起步、协调一致。不同的对象&#xff0c;对 “同步” 的理解方式略有不同 设备同步&#xff0c;是指在两个设备之间规定一个共同的时间参考数据库同步&#xff0c;是指让…

从 低信噪比陆上地震记录 解决办法收集 到 走时层析反演中的折射层析调研

目录 (前言1) 关于背景的回答:(前言2) 现有的降低噪声, 提高信噪比的一些特有方法的论文资料 (传统策略):1. 关于波形反演与走时层析反演2. 折射层析3. 用一个合成数据来解释折射层析反演的思路4. 其他层析反演方法:5. 关于层析反演的一些TIPS (可补充)参考文献: 降噪有关资料参…

ElementUI之CUD+表单验证

目录 前言&#xff1a; 增删改查 表单验证 前言&#xff1a; 继上篇博客来写我们的增删改以及表单验证 增删改查 首先先定义接口 数据样式&#xff0c;我们可以去elementUI官网去copy我们喜欢的样式 <!-- 编辑窗体 --><el-dialog :title"title" :visib…

国庆《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书行将售罄

国庆《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书行将售罄 国庆《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书行将售罄

leetcode-----二叉树习题

目录 前言 1. 二叉树的中序遍历 2. 相同的树 3. 二叉树的最大深度 4. 二叉树的最小深度 5.二叉树的前序遍历 6. 二叉树的后序遍历 7. 对称二叉树 前言 前面我们学习过了二叉树的相关知识点&#xff0c;那么今天我们就做做练习&#xff0c;下面我会介绍几道关于二叉树的…

如何实现一个业务系统的自动化框架搭建

1、框架结构 我在该项目采用的是关键字驱动测试的框架类型。首先创建如下几个目录common&#xff08;公共模块&#xff09;、config&#xff08;公共配置&#xff09;、logs&#xff08;运行日志&#xff09;、reports&#xff08;测试报告&#xff09;、resources&#xff08…

前端的多种克隆方式和注意事项

克隆的意义和常见场景: 意义: 保证原数据的完整性和独立性常见场景: 复制数据, 函数入参, class构造函数等 浅克隆: 对象常用的浅克隆 es6扩展运算符...Object.assign 数组常用的浅克隆 es6的扩展运算符...slice>arr.slice(0)[].concat 深度克隆: 克隆对象的每个层级如…

YOLOv8改进算法之添加CA注意力机制

1. CA注意力机制 CA&#xff08;Coordinate Attention&#xff09;注意力机制是一种用于加强深度学习模型对输入数据的空间结构理解的注意力机制。CA 注意力机制的核心思想是引入坐标信息&#xff0c;以便模型可以更好地理解不同位置之间的关系。如下图&#xff1a; 1. 输入特…

【RocketMQ】【源码】Dledger日志复制源码分析

消息存储 在 【RocketMQ】消息的存储一文中提到&#xff0c;Broker收到消息后会调用CommitLog的asyncPutMessage方法写入消息&#xff0c;在DLedger模式下使用的是DLedgerCommitLog&#xff0c;进入asyncPutMessages方法&#xff0c;主要处理逻辑如下&#xff1a; 调用serial…

leetCode 122.买卖股票的最佳时机 II 动态规划 + 状态转移 + 状态压缩

122. 买卖股票的最佳时机 II - 力扣&#xff08;LeetCode&#xff09; 给你一个整数数组 prices &#xff0c;其中 prices[i] 表示某支股票第 i 天的价格。 在每一天&#xff0c;你可以决定是否购买和/或出售股票。你在任何时候 最多 只能持有 一股 股票。你也可以先购买&…

006:连续跌三天,第四天上涨的概率--用python统计

我们已经可以获取到K线信息了&#xff0c;然后我们来进行一些统计&#xff0c;就统计连续三天下跌&#xff0c;第四天上涨的概率。 我们用宁波银行&#xff08;002142&#xff09;最近三年的数据来统计。先用上一篇的程序下载到K线数据&#xff0c;得到文件002142.csv。然后在…

Spring修炼之旅(4)静态/动态代理模式与AOP

一、代理模式概述 代理模式 为什么要学习代理模式&#xff0c;因为AOP的底层机制就是动态代理&#xff01; 代理模式&#xff1a; 静态代理 动态代理 学习aop之前 , 我们要先了解一下代理模式&#xff01; 1.1静态代理 静态代理角色分析 抽象角色 : 一般使用接口或者抽象…

【数据结构练习】二叉树相关oj题集锦二

目录 前言 1.平衡二叉树 2.对称二叉树 3.二叉树遍历 4.层序遍历 5.判断一棵树是不是完全二叉树 前言 编程想要学的好&#xff0c;刷题少不了&#xff0c;我们不仅要多刷题&#xff0c;还要刷好题&#xff01;为此我开启了一个弯道超车必做好题锦集的系列&#xff0c;此为…

2023/9/30 使用消息队列完成进程间通信

发送方 ​ #include <myhead.h> //消息结构体 typedef struct {long msgtype; //消息类型char data[1024]; //消息正文 }Msg_ds;#define SIZE sizeof(Msg_ds) - sizeof(long) //正文大小 int main(int argc, const char *argv[]) {//1.创建key值key_t key ;if((key …

中断向量控制器(NVIC)

1. 什么是中断 在处理器中&#xff0c;中断是一个过程&#xff0c;即CPU在正常执行程序的过程中&#xff0c;遇到外部/内部的紧急事件需要处理&#xff0c;暂时中止当前程序的执行&#xff0c;转而去为处理紧急的事件&#xff0c;待处理完毕后再返回被打断的程序处继续往下执行…

Spring MVC 中的国际化和本地化

Spring MVC 中的国际化和本地化 国际化&#xff08;Internationalization&#xff0c;简称i18n&#xff09;和本地化&#xff08;Localization&#xff0c;简称l10n&#xff09;是构建多语言应用程序的重要概念。Spring MVC提供了丰富的支持&#xff0c;使开发人员能够轻松地处…

Python 笔记06(Mysql数据库)

一 基础 1.1 安装 MySQL下载参考&#xff1a;MySQL8.0安装配置教程【超级详细图解】-CSDN博客 测试是否安装并正确配置环境变量&#xff1a; 1.2 查看服务器是否正常运行 1.3 显示数据库 show databases; 1.4 退出 exit 1.5 python 连接 1.6 查主机IP ipconfig