RabbitMQ快速实战

news2025/1/14 1:25:22

目录

什么是消息队列?

消息队列的优势

应用解耦

异步提速

削峰填谷

总结

主流MQ产品特点比较

Rabbitmq快速上手

创建用户admin

Exchange和Queue

Connection和Channel

RabbitMQ中的核心概念总结


什么是消息队列?

       MQ全称Message Queue消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。       

       消息队列是一种在应用程序之间传递消息的技术。它提供了一种异步通信模式,允许应用程序在不同的时间处理消息。消息队列通常用于解耦应用程序,以便它们可以独立地扩展和修改。在消息队列中,消息发送者将消息发送到队列中,然后消息接收者从队列中接收消息。这种模式允许消息接收者按照自己的节奏处理消息,而不必等待消息发送者处理完消息。常见的消息队列包括RabbitMQ、Kafka和ActiveMQ等。

     

         

消息队列的优势

应用解耦

              

       假如用户访问订单系统,而订单系统跟其他系统是强耦合的,如图如果库存系统挂了,那么整个订单系统也都不能用了。   如果这种情况还想要增加新的XX系统进来,那么就只能修改源代码来完成。系统的耦合性越高,容错性就越低,可维护性就越低。

                 

       通过引入MQ做到应用解耦,库存系统出现异常可以等库存系统恢复后去MQ中拿消息,此时不影响别的系统调用,如果还要加入新的系统比如XX系统,那么只需XX系统去MQ中拿取消息进行处理即可。使用MQ可以提升容错性和可维护性。

异步提速

             

 原先用户请求订单系统,需要等到订单系顺序调用其他系统无误后返回,比较耗时。

       

        现在通过引入MQ,订单系统只需要把信息发送到MQ中即可,相当于完成了之前顺序请求其他系统的步骤,时间成本大大减低。

削峰填谷
             

以上场景中激增请求会打垮系统,造成服务不可用。

             

 通过将激增请求先放到MQ当前,然后系统再根据自身情况拉取请求来消费

总结

MQ优势

应用解耦:提高系统容错性和可维护性

异步提速:提升用户体验和系统吞吐量

削峰填谷:提高系统稳定性

MQ劣势

系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息不丢失等情况?


主流MQ产品特点比较

       


Rabbitmq快速上手

百度网盘链接(windows版):https://pan.baidu.com/s/1yEHQvd0VrqJTYmkZvDPf4Q 
提取码:kzzr 

安装完成后,访问本地http://localhost:15672/,账号密码都是guest

        

​       登录控制台后上方就能看到RabbitMQ的主要功能。其中Overview是概述,主要展示RabbitMQ服务的一些整体运行情况。后面Conections、Channels、Exchanges和Queues就是RabbitMQ的核心功能。最后的Admin则是一些管理功能。

创建用户admin

 创建用户对应的虚拟机,以下分配了/和/mirror虚拟机,通过左下方set可以添加虚拟机给用户(前提虚拟机已经创建)。

Exchange和Queue

创建一个队列

Virtual host:                                     虚拟机名称,自己选择                 
Type:          创建队列的类型,有三种Classic经典队列、Quorum仲裁队列、Stream流队列
Name:                                     队列名称
Durability:       是否持久(都是针对没有处理过的消息)化Durable会存到硬盘当中,服务重启队列中的消息还是会保留。Transient消息发过来之后,服务重启则消息丢失。            
Autodelete:                                     是否自动删除    
Arguments:                                   配置队列的多个参数

       有了队列后,我们就可以在这个队列上收发消息,在队列列表中点击刚刚创建的队列,然后就可以看到以下队列功能。

        RabbitMQ中的消息都是通过Queue队列传递的,这个Queue其实就是一个典型的FIFO的队列数据结构。而Exchange交换机则是用来辅助进行消息分发的。Exchange与Queue之间会建立一种绑定关系,通过绑定关系,Exchange交换机里发送的消息就可以分发到不同的Queue上。

      进入Exchanges菜单,可以看到针对每个虚拟机,RabbitMQ都预先创建了多个Exchange交换机。我们选择/mirror下的amq.direct交换机,与test队列做绑定。

               

 发送一条消息,在队列中接收消息

       Exchange交换机既然可以绑定一个队列,当然也可以绑定更多的队列。而Exchange的作用,就是将发送到Exchange的消息转发到绑定的队列上。在具体使用时,通常只有消息生产者需要与Exchange打交道。而消费者,则并不需要与Exchange打交道,只要从Queue中消费消息就可以了。

​        另外,Exchange并不只是简单的将消息全部转发给Queue,在实际使用中,Exchange与Queue之间可以建立不同类型的绑定关系,然后通过一些不同的策略,选择将消息转发到哪些Queue上。这时候,Messaage上几个没有用上的参数,像Routing Key ,Headers,Properties这些参数就能派上用场了。

​         在这个过程中,我们都是通过页面操作完成的消息发送与接收。在实际应用时,其实就是通过RabbitMQ提供的客户端API来完成这些功能。但是整个执行的过程,其实跟页面操作是相同的。


Connection和Channel

​       这两个概念实际上是跟客户端应用的对应关系。一个Connection可以理解为一个客户端应用。而一个应用可以创建多个Channel,用来与RabbitMQ进行交互。

1、创建一个Maven项目,在pom.xml中引入RabbitMQ客户端的依赖:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>

2、然后就可以创建一个消费者实例,尝试从RabbitMQ上的test1这个队列上拉取消息。

public class FirstConsumer {
    private static final String HOST_NAME="127.0.0.1";
    private static final int HOST_PORT=5672;
    private static final String QUEUE_NAME="test2";
    public static final String USER_NAME="admin";
    public static final String PASSWORD="123456";
    public static final String VIRTUAL_HOST="/mirror";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);
        factory.setPort(HOST_PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost(VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * 声明一个对列。几个参数依次为: 队列名,durable是否实例化;exclusive:是否独占;autoDelete:是否自动删除;arguments:参数
         * 这几个参数跟创建队列的页面是一致的。
         * 如果Broker上没有队列,那么就会自动创建队列。
         * 但是如果Broker上已经由了这个队列。那么队列的属性必须匹配,否则会报错。
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //每个worker同时最多只处理一个消息
        channel.basicQos(1);
        //回调函数,处理接收到的消息
        Consumer myconsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("========================");
                String routingKey = envelope.getRoutingKey();
                System.out.println("routingKey >"+routingKey);
                String contentType = properties.getContentType();
                System.out.println("contentType >"+contentType);
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("deliveryTag >"+deliveryTag);
                System.out.println("content:"+new String(body,"UTF-8"));
                // (process the message components here ...)
                channel.basicAck(deliveryTag, false);
            }
        };
        //从test1队列接收消息
        channel.basicConsume(QUEUE_NAME, myconsumer);
    }
}

虽然我们这次测试只用到了消费者,在此也将生产者代码补充

 

public class FirstProducer {

    private static final String HOST_NAME="127.0.0.1";
    private static final int HOST_PORT=5672;
    private static final String QUEUE_NAME="test2";
    public static final String USER_NAME="admin";
    public static final String PASSWORD="admin";
    public static final String VIRTUAL_HOST="/mirror";

    public static void main(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);
        factory.setPort(HOST_PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost(VIRTUAL_HOST);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * 声明一个对列。几个参数依次为: 队列名,durable是否实例化;exclusive:是否独占;autoDelete:是否自动删除;arguments:参数
         * 这几个参数跟创建队列的页面是一致的。
         * 如果Broker上没有队列,那么就会自动创建队列。
         * 但是如果Broker上已经由了这个队列。那么队列的属性必须匹配,否则会报错。
         */
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        String message = "message";
        channel.basicPublish("", QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

        channel.close();
        connection.close();
    }
}

         执行完消费者应用程序后,就会在RabbitMQ上新创建一个test2的队列(如果你之前没有创建过的话),并且启动一个消费者,处理test2队列上的消息。这时,我们可以从管理平台页面上往test2队列发送一条消息,这个消费者程序就会及时消费消息。

可以看到Connection和Channel已经建立完成。

现在通过test2队列发送消息,我们客户端消费者会进行消费。

控制台打印出了消息内容


RabbitMQ中的核心概念总结

1、服务主机Broker

  ​     一个搭建RabbitMQ Server的服务器称为Broker。这个并不是RabbitMQ特有的概念,但是却是几乎所有MQ产品通用的一个概念。未来如果需要搭建集群,就需要通过这些Broker来构建。

2、虚拟主机 virtual host

​        RabbitMQ出于服务器复用的想法,可以在一个RabbitMQ集群中划分出多个虚拟主机,每一个虚拟主机都有全套的基础服务组件,可以针对每个虚拟主机进行权限以及数据分配。不同虚拟主机之间是完全隔离的,如果不考虑资源分配的情况,一个虚拟主机就可以当成一个独立的RabbitMQ服务使用。

2、连接 Connection

​       客户端与RabbitMQ进行交互,首先就需要建立一个TPC连接,这个连接就是Connection。既然是通道,那就需要尽量注意在停止使用时要关闭,释放资源。

3、信道 Channel

​       一旦客户端与RabbitMQ建立了连接,就会分配一个AMQP信道 Channel。每个信道都会被分配一个唯一的ID。也可以理解为是客户端与RabbitMQ实际进行数据交互的通道,我们后续的大多数的数据操作都是在信道 Channel 这个层面展开的。

​       RabbitMQ为了减少性能开销,也会在一个Connection中建立多个Channel,这样便于客户端进行多线程连接,这些连接会复用同一个Connection的TCP通道,所以在实际业务中,对于Connection和Channel的分配也需要根据实际情况进行考量。

4、交换机 Exchange

​       这是RabbitMQ中进行数据路由的重要组件。消息发送到RabbitMQ中后,会首先进入一个交换机,然后由交换机负责将数据转发到不同的队列中。RabbitMQ中有多种不同类型的交换机来支持不同的路由策略。从Web管理界面就能看到,在每个虚拟主机中,RabbitMQ都会默认创建几个不同类型的交换机来。

       交换机多用来与生产者打交道。生产者发送的消息通过Exchange交换机分配到各个不同的Queue队列上,而对于消息消费者来说,通常只需要关注自己感兴趣的队列就可以了。

5、队列 Queue

​       Queue是实际保存数据的最小单位。Queue不需要Exchange也可以独立工作,只不过通常在业务场景中,会增加Exchange实现更复杂的消息分配策略。Queue结构天生就具有FIFO的顺序,消息最终都会被分发到不同的Queue当中,然后才被消费者进行消费处理。这也是最近RabbitMQ功能变动最大的地方。最为常用的是经典队列Classic。RabbitMQ 3.8.X版本添加了Quorum队列,3.9.X又添加了Stream队列。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1417183.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从零学习Linux操作系统 第二十二部分 企业域名解析服务的部署及安全优化

# 一、dns的主要信息 关于dns的名词解释&#xff1a;dns: domain name service(域名解析服务) 关于客户端: /etc/resolv.conf dns指向文件 A记录 ##ip地址叫做域名的Address 记录 SOA ##授权起始主机 关于服务端 bind安装包named服务名称/etc/named.conf主配置文件/var/na…

【深度学习:多关节嵌入模型】 Meta 解释的 ImageBind 多关节嵌入模型

【深度学习&#xff1a;多关节嵌入模型】 Meta 解释的 ImageBind 多关节嵌入模型 Meta 发布开源人工智能工具的历史分段任何模型DINOv2 什么是多模态学习&#xff1f;什么是嵌入&#xff1f;什么是 ImageBind&#xff1f;集成在 ImageBind 中的模式图像绑定架构特定模式编码器跨…

window下如何安装ffmpeg(跨平台多媒体处理工具)

ffmpeg是什么? FFmpeg是一个开源的跨平台多媒体处理工具&#xff0c;可以用于录制、转换和流媒体处理音视频。它包含了几个核心库和工具&#xff0c;可以在命令行下执行各种音视频处理操作&#xff0c;如剪辑、分割、合并、媒体格式转换、编解码、流媒体传输等。FFmpeg支持多…

java设计模式:工厂模式

1&#xff1a;在平常的开发工作中&#xff0c;我们可能会用到不同的设计模式&#xff0c;合理的使用设计模式&#xff0c;可以提高开发效率&#xff0c;提高代码质量&#xff0c;提高系统的可拓展性&#xff0c;今天来简单聊聊工厂模式。 2&#xff1a;工厂模式是一种创建对象的…

Java TemporalAdjusters 时间调节器

提供了非常多处理日期相关的函数&#xff1a; 使用示例&#xff1a; /*** JCccc* param args*/public static void main(String[] args) {DateTimeFormatter pattern DateTimeFormatter.ofPattern("yyyy-MM-dd");LocalDateTime now LocalDateTime.now();//获取当月…

备战蓝桥杯---二分(入门)

话不多说&#xff0c;先来个模板题来回顾一下上次讲的&#xff1a; 下面是AC代码&#xff1a; 下面进入正题&#xff1a; 本题对1&#xff0c;2行与3&#xff0c;4行组合&#xff0c;再用二分查找即可实现n^2logn的复杂度。 下面是AC代码&#xff1a; 接题&#xff1a; 让我们…

基于springboot校园交友网站源码和论文

随着信息技术和网络技术的飞速发展&#xff0c;人类已进入全新信息化时代&#xff0c;传统管理技术已无法高效&#xff0c;便捷地管理信息。为了迎合时代需求&#xff0c;优化管理效率&#xff0c;各种各样的管理系统应运而生&#xff0c;各行各业相继进入信息管理时代&#xf…

QT学习日记 | QT的环境搭建

目录 前言 一、QT概述 二、QT的环境搭建 1、QT SDK安装 2、环境变量的配置 前言 本系列为小编新开的一个系列&#xff0c;主要记录小编学习QT的过程&#xff0c;作为笔记仅供各位参考&#xff1b; 一、QT概述 Qt是一个跨平台C图形应用界面框架&#xff1b;简单来说&#x…

Android 13.0 SystemUI下拉状态栏定制二 锁屏页面横竖屏时钟都居中功能实现二

1.前言 在13.0的系统rom定制化开发中,在关于systemui的锁屏页面功能定制中,由于在平板横屏锁屏功能中,时钟显示的很大,并且是在左旁边居中显示的, 由于需要和竖屏显示一样,所以就需要用到小时钟显示,然后同样需要居中,所以就来分析下相关的源码,来实现具体的功能 如图…

C++:异常体系

异常体系 异常1.C语言传统的处理错误的方式2.C异常概念3.异常的使用3.1异常的抛出和捕获3.2 异常的重新抛出3.3异常安全3.4 异常规范 4.C标准库的异常体系5.异常的优缺点 异常 1.C语言传统的处理错误的方式 终止程序&#xff0c;如assert&#xff0c;缺陷&#xff1a;用户难以…

鸿蒙 ArkTs初识

前提&#xff1a;基于官网3.1/4.0文档。参考官网文档 基于Android开发体系来进行比较和思考。&#xff08;或有偏颇&#xff0c;自行斟酌&#xff09; 吐槽&#xff1a;官网上的案例只有代码和文档解释&#xff0c;没有可以直接运行查看效果的模拟器&#xff0c;这一点上&#…

Qt|大小端数据转换

后面打算写Qt关于网络编程的博客&#xff0c;网络编程就绕不开字节流数据传输&#xff0c;字节流数据的传输一般是根据协议来定义对应的报文该如何组包&#xff0c;那这就必然牵扯到了大端字节序和小端字节序的问题了。不清楚的大小端的可以看一下相关资料&#xff1a;大小端模…

【RTP】webrtc 学习3: webrtc对h264的rtp解包

rtp_rtcp\source\video_rtp_depacketizer_h264.cc【RTP】webrtc 学习2: webrtc对h264的rtp打包 中分析了打包过程的代码,这样再来看解析过程的源码就容易多了:本代码主要基于m79,m98类似。这里注明了jitterbuffer 会再次 做 解析stap-a 变为NAL units解析ParseFuaNalu 第一…

Jmeter 分布式测试

Jmeter单机进行压测&#xff0c;受到单台机器的性能影响&#xff0c;Jmeter支持分布式测试&#xff0c;用一个控制节点去控制多个工作节点去模拟更多的用户。 版本信息 内容版本号JDK1.8Jmeter5.6.2 分布式测试原理 jmeter 官网对分布式测试有说明&#xff0c;jmeter分布式…

RabbitMQ 死信交换机的详述➕应用

&#x1f973;&#x1f973;Welcome 的Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于RabbitMQ的相关操作吧 目录 &#x1f973;&#x1f973;Welcome 的Huihuis Code World ! !&#x1f973;&#x1f973; 一.什么是死信交换机 二. 死信队列…

说说你对vue的mixin的理解,有什么应用场景

mixin是什么 Vue中的mixin 局部混入全局混入注意事项: 使用场景源码分析Vue 的几种类型的合并策略 替换型合并型队列性叠加型小结 此文章&#xff0c;来源于印客学院的资料&#xff0c;这里只是分享&#xff0c;便于查漏补缺。 mixin是什么 Mixin 是 面向对象程序设计语言中…

一文理清楚-Docker 容器如何工作

Docker 容器如何工作 集装箱什么是虚拟机&#xff1f;虚拟化如何运作&#xff1f;什么是容器&#xff1f;什么是 Docker&#xff1f;总结 五星上将麦克阿瑟曾经说过&#xff1a;在docker面前&#xff0c;虚拟机就是个弟弟 集装箱 《盒子&#xff1a;集装箱如何让世界变得更小&…

车载电子电器架构 —— 多核处理器刷写策略

车载电子电器架构 —— 多核处理器刷写策略 我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 屏蔽力是信息过载时代一个人的特殊竞争力&#xff0c;任何消…

内存管理(mmu)/内存分配原理/多级页表

1.为什么要做内存管理&#xff1f; 随着进程对内存需求的扩大&#xff0c;和同时调度的进程增加&#xff0c;内存是比较瓶颈的资源&#xff0c;如何更好的高效的利于存储资源是一个重要问题。 这个内存管理的需求也是慢慢发展而来&#xff0c;早期总线上的master是直接使用物…

C++:STL - string

C&#xff1a;STL - string basic_stringstringstring的常见构造string的输入输出operator<<c_stroperator>>getline string访问及遍历operator[ ] & atfont & back迭代器begin & endrbegin & rend 范围for string的容量操作size & lengthmax_…