🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ板块正式开始更新,欢迎订阅~
感谢点赞与关注~~~
目录
- 1. RabbitMQ介绍
- 1.1 什么是MQ
- 1.2 MQ的作用
- 2. RabbitMQ安装
- 2.1 安装相关软件包与插件
- 2.2 启动服务并访问
- 3. RabbitMQ快速上手
- 3.1 RabbitMQ核心概念与工作流程
- 3.1.1 producer和consumer
- 3.1.2 Connection和Channel
- 3.1.3 Virtual host
- 3.1.4 Queue
- 3.1.5 Exchange
- 3.1.6 RabbitMQ的工作流程(常见面试题)
- 3.2 AMQP(了解)
- 3.3 web管理界面相关操作
- 3.3.1 用户相关操作
- 3.3.2 虚拟机相关操作
- 4. RabbitMQ简单生产者消费者编写
- 4.1 引入依赖
- 4.2 编写生产者代码
- 4.3 编写消费者代码
1. RabbitMQ介绍
首先,我们要知道一下什么是RabbitMQ.
1.1 什么是MQ
其中,MQ是MessageQueue的缩写,也就是消息队列的缩写,顾名思义,消息队列就是一个队列,有关队列的知识我们在之前数据结构的部分学习过,他的特点就是先入先出,只不过现在队列中放入的是消息而已,消息可以非常简单,比如只包含字符串,也可以很复杂,比如内嵌对象.
MQ通常用于分布式系统之间的通信.
系统之间的通信通常有以下两种方式:
- 同步通信
直接调用对方的服务,数据从一端发出之后就立即传递到另一端.
- 异步通信
数据从一端发出之后,首先进入一个容器中进行存储,达到某种条件之后,再由这个容器发送给另一端.这个容器的具体实现就是MQ.
1.2 MQ的作用
MQ的最主要的作用是接收并转发消息.在不同的业务场景之下有着不同的作用.
- 异步解耦
在业务流程中,⼀些操作可能非常耗时,但并不需要即时返回结果.可以借助MQ把这些操作异步化. - 流量削峰
在访问量剧增的时候,应用任然需要发挥作用,扛得住压力,如果一味地在硬件方面投入,无疑是巨大的浪费,所以我们就需要使用MQ来控制流量,将请求排队处理,使得一些关键组件不会因为突然增长的访问压力而崩溃.比如12306放票. - 消息分发
当多个系统需要对同一个数据的更新做出响应的时候,可以使用MQ进行消息分发.订阅队列消息的系统都会收到通知.比如支付成功之后,支付系统可以直接向MQ发送信息,其他系统订阅该消息,它们会直接收到消息,无需轮询数据库. - 延迟通知
需要在特定的时间之后发送通知的场景中,可以使用MQ的延迟消息功能,比如在电子商务平台中,如果用户在下单之后一定时间之内未支付,可以使用延迟队列在超时后自动取消订单.
还有一些其他的用途,这里不再展示.
2. RabbitMQ安装
2.1 安装相关软件包与插件
这里我们只展示在Ubuntu环境之下安装RabbitMQ.
- 首先我们要知道,RabbitMQ是使用Erlang语言开发的,所以我们需要在Ubuntu的环境之下安装Erlang语言环境.在安装环境之前,我们先需要对Ubuntu中的软件仓库进行更新.
sudo apt-get update
sudo apt-get install erlang
在安装Erlang环境的时候,出现粉色弹窗的时候,直接按ESC键退出即可.
- 安装RabbitMQ
sudo apt-get install rabbitmq-server
之后我们使用systemctl status rabbitmq-server
指令查看rabbitmq的服务状态.
如果出现以下界面,说明RabbitMQ服务正在运行.
- 安装RabbitMQ管理界面
rabbitmq-plugins enable rabbitmq_management
在管理RabbitMQ的时候,我们需要在浏览器的图形化界面上进行管理,所以我们需要安装管理界面插件的.这个插件默认是不安装的.
2.2 启动服务并访问
- 启动服务
sudo service rabbitmq-server start
在安装RabbitMQ之后,默认是启动服务的,所以这一步可以省略.
- 通过IP地址与端口号访问界面
在访问图形化界面的时候,我们需要在云服务上开放一些端口号.首先我们需要开放的端口号是15672,这个端口号是RabbitMQ的图形化管理界面的访问端口号,其次是5672端口号,这个端口号开放之后,我们就可以通过java代码来操作RabbitMQ.
下面我们以阿里云的控制台为例:- 首先选择控制台
- 点击左上角的菜单,点击服务器ECS进入服务器控制台
- 选择要操作的服务器
- 选择安全组,选择管理规则
- 选择手动添加,目的端口添加15672和5672,授权对象选择所有IPv4.
之后我们就可以通过服务器公网IP和服务器端口号访问到RabbitMQ的管理界面了.
- 首先选择控制台
- 添加管理员用户
//添加用户
rabbitmqctl add_user jiangruijia qwe123524
//设置指定用户为管理员
rabbitmqctl set_user_tags jiangruijia administrator
其中,RabbitMQ中,用户有以下的角色可以选择:
1. Administrator超级管理员,可登陆管理控制台(启用managementplugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作
2. Monitoring监控者,可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)。
3. Policymaker策略制定者,可登陆管理控制台(启用management plugin的情况下),同时可以对policy进行管理。但无法查看节点的相关信息.
4. Management普通管理者,仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理.
5. Impersonator模拟者,无法登录管理控制台。
6. None其他用户,无法登陆管理控制台,通常就是普通的生产者和消费者。
4. 登录管理员用户
3. RabbitMQ快速上手
3.1 RabbitMQ核心概念与工作流程
在上面的RabbitMQ中的管理界面中,我们看到了导航栏中有6个部分,这6个部分分别是什么意思呢.我们首先要了解RabbitMQ的工作流程.
RabbitMQ是一个消息中间件,也是一个生产者消费者模型.他负责接收存储并转发消息.下面我们就来对上面这个图做出详细的解释.
3.1.1 producer和consumer
producer:生产者,是RabbitMQ Server的客户端,向RabbitMQ的服务器中发送消息.
consumer:消费者,是RabbitMQ Server的客户端,向RabbitMQ的服务器中发送消息.
broker:RabbitMQ的服务器,负责接收发送和存储消息.
- 生产者
生产者负责创建消息,然后发布到RabbitMQ中.在实际业务中,生产者生产出来的消息往往具有一定的逻辑结构,比如json字符串,而且带有一定的标签,服务器会根据标签对消息进行路由,把消息发送给感兴趣的消费者. - 消费者
消费者连接到RabbitMQ服务器,就可以消费消息了,消费的过程中,标签会被丢掉,消费者只会收到消息,并不知道消息的生产者是谁,当然消费者也不需要知道.
就像学校给我们发送通知,我们在收到学校的通知的时候,并不需要知道这个通知是哪个老师发的一样.
- broker
是RabbitMQ的核心部分,可以简单地看做RabbitMQ的服务结点,或者RabbitMQ的服务实例.
3.1.2 Connection和Channel
Connection: 连接.是客户端和RabbitMQ服务器之间的一个TCP连接.他负责传输客户端和服务器之间的所有数据和控制信息.
Channel: 通道,信道.Channel是在Connection之上的⼀个抽象层.在RabbitMQ中,一个TCP连接可以有多个Channel,每个Channel都是独立的虚拟连接.消息的发送和接收都是基于Channel的.
Channel属于Connection的一部分.通道最主要的作用就是把消息的读写都重复到同一个TCP连接上,这样可以减少建立和关闭连接的开销.这样的模式和我们之前学习多线程时候的线程池或者之前学习Spring原理的时候的数据库连接池非常类似.
3.1.3 Virtual host
Virtual host: 虚拟主机,一个RabbitMQ Server中有多个虚拟主机,它为消息队列(queue)提供了逻辑上的隔离机制.当多个不同的用户使用同⼀个RabbitMQ Server提供的服务时,可以虚拟划分出多个vhost,每个用户在自己的vhost创建exchange/queue等.
类似与我们之前学习MySQL的时候,一个MySQL服务器中创建多个database一样,每一个database都是相互隔离的.它们之间互不影响.
3.1.4 Queue
Queue: 队列,是RabbitMQ的内部对象,用来存储消息.
多个消费者可以订阅一个队列,一个消费者也可以订阅多个队列.它们是多对多的关系.
3.1.5 Exchange
Exchange: 交换机,message到达broker的第⼀站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到⼀个或多个Queue列中.交换机起到的是消息路由的作用,它会根据类型和规则来确定如何转发接收到的消息给队列.
类似与我们发快递之后,快递在驿站打单揽收之后,快递分拣站就会根据快递单的不同地址分拣到不同的快递货车.
3.1.6 RabbitMQ的工作流程(常见面试题)
理解了上面的概念之后,我们就可以来总结一下RabbitMQ是如何把一条消息从生产者的手中转发到消费者的手中的.
- producer生产了一条信息.
- producer连接到RabbitMQBroker,建立一个Connection,并开通一个信道.
- producer声明一个交换机,用于把消息路由到指定的交换机中.声明队列,用于把消息存储到指定的队列中.这些信息会在消息的标签中声明.
- producer经过Channel,发送消息到RabbitMQBroker中.
- RabbitMQBroker接收消息,并按照消息的标签路由到对应的交换机中,交换机再次按照标签存储到对应的队列中.
- 出队列之后,消息会经过Channel到达消费者客户端中.
3.2 AMQP(了解)
AMQP是Advanced Message Queuing Protocol的缩写,是一种高级消息队列协议.AMQP定义了⼀套确定的消息交换功能,包括交换器(Exchange),队列(Queue)等.这些组件共同工作,使得生产者能够将消息发送到交换器.然后由队列接收并等待消费者接收.AMQP还定义了⼀个网络协议,允许客户端应用通过该协议与消息代理和AMQP模型进行交互通信.
RabbitMQ遵循的就是AMQP协议,换句话说,RabbitMQ就是AMQP协议的Erlang实现.当然RabbitMQ中不仅仅只有这一种协议,还包含一些其他的协议.
3.3 web管理界面相关操作
3.3.1 用户相关操作
- 添加用户
admin->add user
我们需要为用户设置用户名,密码,权限.完成之后点击add user
用户添加成功. - 用户相关操作
- 为用户添加对虚拟机的操作权限.
- 更新/删除用户
- 退出登录当前用户
- 为用户添加对虚拟机的操作权限.
3.3.2 虚拟机相关操作
- 添加虚拟机
admin -> Virtual Host -> Add a new virtual host
设置虚拟机名称
添加成功
4. RabbitMQ简单生产者消费者编写
在这之前,我们需要创建一个Maven项目.
4.1 引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
4.2 编写生产者代码
- 首先我们需要创建与Connection的连接,我们使用java操作RabbitMQ的默认端口号是5672.
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 配置IP,端口号,用户信息,以及该用户所要使用的虚拟机
factory.setHost("39.105.137.64");
factory.setPort(5672);
factory.setUsername("jiangruijia");
factory.setPassword("qwe123524");
factory.setVirtualHost("/");
//3. 与Connection建立连接
Connection connection = factory.newConnection();
- 之后,我们开通Connection中的Channel.
//4. 创建信道
Channel channel = connection.createChannel();
- 声明一个队列Queue(如果不声明交换机,交换机使用默认交换机)
//5. 声明队列
channel.queueDeclare("hello",true,false,false,null);
参数解释:
1. Queue: 队列名称,如果不存在,自动创建
2. durable: 是否持久化.如果设置持久化之后,队列中的消息在重新启动服务之后,不会丢失.
3. exclusive: 是否限制独占,如果设置独占,只能有一个消费者监听队列.
4. autoDelete: 是否自动删除,当队列没有对应的消费者消费信息的时候,自动销毁.
5. argument: 相关参数
- 发送信息
//6. 发送信息
String s = "hello world";
channel.basicPublish("","hello",null,s.getBytes());
参数说明:
1. exchange: 交换机名称,在简单的一对一模式中,我们会使用默认交换机"".
2. routingKey: 生产者声明的路由关键字,即需要发送到的队列.
3. props: 配置信息.
4. body: 发送的信息
- 释放资源
先释放信道,再释放连接
//7. 释放资源
channel.close();
connection.close();
这个虽然不是必须的,但是他是一种好习惯.
- 运行代码观察结果
在右上角选择虚拟机
我们发现队列中已经有一个消息了
4.3 编写消费者代码
- 消费者代码和之前生产者的代码是一样的,就是把第四部改为消费当前队列
//6. 使用默认消费者,并其中的重写方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息:" + new String(body));
}
};
//7. 消费消息,并调用defaultConsumer中的逻辑
channel.basicConsume("hello",true,defaultConsumer);
参数解释:
1. DefaultConsumer 是RabbitMQ提供的⼀个默认消费者,实现了Consumer接口.其中构造方法中传入的是需要消费的队列.
2. 重写方法handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
▪ consumerTag :消费者标签,通常是消费者在订阅队列时指定的.
▪ envelope :包含消息的封包信息,如队列名称,交换机等.
▪ properties :⼀些配置信息
▪ body :消息的具体内容
3. basicConsume(String queue, boolean autoAck, Consumer callback)
.Queue是需要消费的队列名称.atuoAck,是否自动确认,如果消费者收到消息之后,会自动和MQ确认.callback,在接收到消息之后执行的逻辑,传入defaultConsumer对象代表的就是在消费队列之后,就执行defaultConsume中handleDelivery方法中对应的逻辑.
- 释放资源
//休眠5秒,确保消费者接收消息完成
Thread.sleep(5000);
channel.close();
connection.close();
- 运行程序,观察结果
后端接收到消息
队列中的信息被消费