毕业后工作半年,在自己的讲课中需要介绍消息队列,以前在大学也有经常接触message queen,但却还不够深入了解掌握,这次写个专门针对mq的文章理清头绪。
以下是学习mq的知识框架,我会不定时更新补充
RabbitMQ概念_MQ
消息队列
MQ全称Message Queue(消息队列),是在消息的传输过程中保
存消息的容器。多用于系统之间的异步通信
同步通信相当于两个人当面对话,你一言我一语。必须及时回复。
异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系。
消息
两台计算机间传送的数据单位。消息可以非常简单,例如只包含文
本字符串;也可以更复杂,可能包含嵌入对象。
队列
数据结构中概念。在队列中,数据先进先出,后进后出。
RabbitMQ概念_MQ的优势
应用解耦
在电商平台中,用户下订单需要调用订单系统,此时订单系统还需
要调用库存系统、支付系统、物流系统完成业务。此时会产生两个
问题:
- 如果库存系统出现故障,会造成整个订单系统崩溃。
- 如果需求修改,新增了一个X系统,此时必须修改订单系统的代码。
如果在系统中引入MQ,即订单系统将消息先发送到MQ中,MQ再
转发到其他系统,则会解决以下问题:
- 由于订单系统只发消息给MQ,不直接对接其他系统,如果库存系统出现故障,不影响整个订单。
- 如果需求修改,新增了一个X系统,此时无需修改订单系统的代码,只需修改MQ将消息发送给X系统即可。
异步提速
如果订单系统同步访问每个系统,则用户下单等待时长如下:
如果引入MQ,则用户下单等待时长如下:
削峰填谷
假设我们的系统每秒只能承载1000请求,如果请求瞬间增多到每秒
5000,则会造成系统崩溃。此时引入mq即可解决该问题
使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期
产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消
息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持
在1000,直到消费完积压的消息,这就叫做“填谷”。
RabbitMQ概念_MQ的劣势
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
系统复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,则会造成数据处理的不一致。
# RabbitMQ概念_MQ应用场景
抢红包、秒杀活动、抢火车票等
这些业务场景都是短时间内需要处理大量请求,如果直接连接系统处理业务,会耗费大量资源,有可能造成系统瘫痪
而使用MQ后,可以先让用户将请求发送到MQ中,MQ会先保存请求消息,不会占用系统资源,且MQ会进行消息排序,先请求的秒杀成功,后请求的秒杀失败。
消息分发
如电商网站要推送促销信息,该业务耗费时间较多,但对时效性要求不高,可以使用MQ做消息分发。
数据同步
假如我们需要将数据保存到数据库之外,还需要一段时间将数据同步到缓存(如Redis)、搜索引擎(如Elasticsearch)中。此时可以将数据库的数据作为消息发送到MQ中,并同步到缓存、搜索引擎中。
异步处理
在电商系统中,订单完成后,需要及时的通知子系统(进销存系统发货,用户服务积分,发送短信)进行下一步操作。为了保证订单系统的高性能,应该直接返回订单结果,之后让MQ通知子系统做其他非实时的业务操作。这样能保证核心业务的高效及时。
离线处理
在银行系统中,如果要查询近十年的历史账单,这是非常耗时的
操作。如果发送同步请求,则会花费大量时间等待响应。此时使
用MQ发送异步请求,等到查询出结果后获取结果即可。
RabbitMQ概念_AMQP
RabbitMQ是由Erlang语言编写的基于AMQP的MQ产品。
AMQP
即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受不同中间件产品,不同开发语言等条件的限制。2006年AMQP规范发布,类比HTTP。
AMQP工作过程
生产者(Publisher)将消息发布到交换机(Exchange),交换机根据规
则将消息分发给交换机绑定的队列(Queue),队列再将消息投递给
订阅了此队列的消费者。
RabbitMQ概念_RabbitMQ工作原理
Producer
消息的生产者。也是一个向交换机发布消息的客户端应用程序。
Connection
连接。生产者/消费者和RabbitMQ服务器之间建立的TCP连接。
Channel
信道。是TCP里面的虚拟连接。例如:Connection相当于电缆,Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。
Broker
消息队列服务器实体。即RabbitMQ服务器
Virtual host
虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件
划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的
RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机
制。当多个不同的用户使用同一个RabbitMQ服务器时,可以划
分出多个虚拟主机。RabbitMQ默认的虚拟主机路径是 /
Exchange
交换机。用来接收生产者发送的消息,并根据分发规则,将这些
消息分发给服务器中的队列中。不同的交换机有不同的分发规
则。
Queue
消息队列。用来保存消息直到发送给消费者。它是消息的容器,
也是消息的终点。消息一直在队列里面,等待消费者链接到这个
队列将其取走。
Binding
消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定
信息保存到交换机的路由表中,作为消息的分发依据。
Consumer
消息的消费者。表示一个从消息队列中取得消息的客户端应用程
序。
RabbitMQ为什么使用信道而不直接使用TCP连接通信?
TCP连接的创建和销毁开销特别大。创建需要3次握手,销毁需
要4次分手。高峰时每秒成千上万条TCP连接的创建会造成资源
巨大的浪费。而且操作系统每秒处理TCP连接数也是有限制的,
会造成性能瓶颈。而如果一条线程使用一条信道,一条TCP链接
可以容纳无限的信道,即使每秒成千上万的请求也不会成为性
能的瓶颈。
RabbitMQ安装_安装Erlang
RabbitMQ安装_安装RabbitMQ
RabbitMQ安装_账户管理
RabbitMQ安装_管控台
RabbitMQ安装_Docker安装
我这里就不教了,大家自行百度哈
ps:大家如果实在不会,又比较需要学习怎么安装,那就多@我,留评论我再来补充(手动狗头),毕竟这篇文章是偏概念和理清mq是个啥的
RabbitMQ简单模式_概念
RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用,课程不对此模式进行讲解)
首先我们讲解简单模式:
特点:
- 一个生产者对应一个消费者,通过队列进行消息传递。
- 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。
RabbitMQ简单模式_项目搭建
接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行
工作。
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则
——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包
创建项目
1.启动RabbitMQ
# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached
2.创建普通maven项目,添加RabbitMQ依赖:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqpclient</artifactId>
<version>5.14.0</version>
</dependency>
</dependencies>
RabbitMQ简单模式_编写生产者
接下来我们编写生产者代码创建消息:
// 生产者
public class Producer {
public static void main(String[] args)
throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory
= new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
7 connectionFactory.setPort(5672);
8
connectionFactory.setUsername("itbaizhan");
9
connectionFactory.setPassword("itbaizhan");
10
connectionFactory.setVirtualHost("/");
11 // 2.创建连接
12 Connection connection =
connectionFactory.newConnection();
13 // 3.建立信道
14 Channel channel =
connection.createChannel();
15 // 4.创建队列,如果队列已存在,则使用该队列
16 /**
17 * 参数1:队列名
18 * 参数2:是否持久化,true表示MQ重启后队列
还在。
19 * 参数3:是否私有化,false表示所有消费者
都可以访问,true表示只有第一次拥有它的消费者才能访问
20 * 参数4:是否自动删除,true表示不再使用队
列时自动删除队列
21 * 参数5:其他额外参数
22 */
23
channel.queueDeclare("simple_queue",false,f
alse,false,null);
24 // 5.发送消息
25 String message = "hello!rabbitmq!";
26 /**
27 * 参数1:交换机名,""表示默认交换机
28 * 参数2:路由键,简单模式就是队列名
29 * 参数3:其他额外参数
30 * 参数4:要传递的消息字节数组
31 */
32
channel.basicPublish("","simple_queue",null
,message.getBytes());
33 // 6.关闭信道和连接
34 channel.close();
35 connection.close();
36 System.out.println("===发送成功===");
37 }
38 }
运行生产者后,我们可以看到在RabbitMQ中创建了队列,队列中
已经有了消息。
RabbitMQ简单模式_编写消费者
接下来我们编写消费者代码消费消息:
// 消费者
public class Consumer {
public static void main(String[] args)
throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory
= new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
9
connectionFactory.setPassword("itbaizhan");
10
connectionFactory.setVirtualHost("/");
11 // 2.创建连接
12 Connection connection =
connectionFactory.newConnection();
13 // 3.建立信道
14 Channel channel =
connection.createChannel();
15 // 4.监听队列
16 /**
17 * 参数1:监听的队列名
18 * 参数2:是否自动签收,如果设置为false,
则需要手动确认消息已收到,否则MQ会一直发送消息
19 * 参数3:Consumer的实现类,重写该类方法
表示接受到消息后如何消费
20 */
21
channel.basicConsume("simple_queue",true,ne
w DefaultConsumer(channel){
22 @Override
23 public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
24 String message = new
String(body, "UTF-8");
25 System.out.println("接受消息,
消息为:"+message);
26 }
27 });
}
}
运行消费者后,我们可以看到在RabbitMQ中的消息已经被消费。
RabbitMQ工作队列模式_概念
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下
- 一个队列对应多个消费者.
- 一条消息只会被一个消费者消费。
- 消息队列默认采用轮询的方式将消息平均发送给消费者。
RabbitMQ工作队列模式_编写生产者
接下来我们编写生产者代码创建大量消息:
public class Producer {
public static void main(String[] args)
throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory
= new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection =
connectionFactory.newConnection();
// 3.建立信道
Channel channel =
connection.createChannel();
// 4.创建队列,持久化队列
channel.queueDeclare("work_queue",true,fals
e,false,null);
// 5.发送大量消息,参数3表示该消息为持久化
消息,即除了保存到内存还会保存到磁盘中
for (int i = 1; i <= 100; i++) {
channel.basicPublish("","work_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
("你好,这是今天的第"+i+"条
消息").getBytes());
}
// 6.关闭资源
channel.close();
connection.close();
}
}
效果如下:
RabbitMQ工作队列模式_编写消费者
接下来我们编写三个消费者监听同一个队列:
// 消费者1
public class Consumer1 {
public static void main(String[] args)
throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory
= new ConnectionFactory();
6
connectionFactory.setHost("192.168.0.162");
7 connectionFactory.setPort(5672);
8
connectionFactory.setUsername("itbaizhan");
9
connectionFactory.setPassword("itbaizhan");
10
connectionFactory.setVirtualHost("/");
11 // 2.创建连接
12 Connection connection =
connectionFactory.newConnection();
13 // 3.建立信道
14 Channel channel =
connection.createChannel();
15 // 4.监听队列,处理消息
16
channel.basicConsume("work_queue",true,new
DefaultConsumer(channel){
17 @Override
18 public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
19 String message = new
String(body, "UTF-8");
20 System.out.println("消费者1消
费消息,消息为:"+message);
21 }
22 });
}
24 }
25
26 // 消费者2
27 public class Consumer2 {
28 public static void main(String[] args)
throws IOException, TimeoutException {
29 // 1.创建连接工厂
30 ConnectionFactory connectionFactory
= new ConnectionFactory();
31
connectionFactory.setHost("192.168.0.162");
32 connectionFactory.setPort(5672);
33
connectionFactory.setUsername("itbaizhan");
34
connectionFactory.setPassword("itbaizhan");
35
connectionFactory.setVirtualHost("/");
36 // 2.创建连接
37 Connection connection =
connectionFactory.newConnection();
38 // 3.建立信道
39 Channel channel =
connection.createChannel();
40 // 4.监听队列,处理消息
41
channel.basicConsume("work_queue",true,new
DefaultConsumer(channel){
42 @Override
43 public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
44 String message = new
String(body, "UTF-8");
45 System.out.println("消费者2消
费消息,消息为:"+message);
46 }
47 });
48 }
49 }
50
51 // 消费者3
52 public class Consumer3 {
53 public static void main(String[] args)
throws IOException, TimeoutException {
54 // 1.创建连接工厂
55 ConnectionFactory connectionFactory
= new ConnectionFactory();
56
connectionFactory.setHost("192.168.0.162");
57 connectionFactory.setPort(5672);
58
connectionFactory.setUsername("itbaizhan");
59
connectionFactory.setPassword("itbaizhan");
60
connectionFactory.setVirtualHost("/");
61 // 2.创建连接
62 Connection connection =
connectionFactory.newConnection();
63 // 3.建立信道
64 Channel channel =
connection.createChannel();
65 // 4.监听队列,处理消息
66
channel.basicConsume("work_queue",true,new
DefaultConsumer(channel){
67 @Override
68 public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
69 String message = new
String(body, "UTF-8");
70 System.out.println("消费者3消
费消息,消息为:"+message);
71 }
72 });
73 }
74 }
效果如下:
RabbitMQ发布订阅模式_概念
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)
特点
- 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
- 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。
RabbitMQ发布订阅模式_编写生产者
接下来我们编写发布订阅模式的生产者:
// 生产者
public class Producer {
public static void main(String[] args)
throws IOException, TimeoutException {
4 // 1.创建连接工厂
5 ConnectionFactory connectionFactory
= new ConnectionFactory();
6
connectionFactory.setHost("192.168.0.162");
7 connectionFactory.setPort(5672);
8
connectionFactory.setUsername("itbaizhan");
9
connectionFactory.setPassword("itbaizhan");
10
connectionFactory.setVirtualHost("/");
11 // 2.创建连接
12 Connection connection =
connectionFactory.newConnection();
13 // 3.建立信道
14 Channel channel =
connection.createChannel();
15 // 4.创建交换机
16 /**
17 * 参数1:交换机名
18 * 参数2:交换机类型
19 * 参数3:交换机持久化
20 */
21
channel.exchangeDeclare("exchange_fanout",
BuiltinExchangeType.FANOUT,true);
22 // 5.创建队列
23
channel.queueDeclare("SEND_MAIL",true,false
,false,null);
24
channel.queueDeclare("SEND_MESSAGE",true,fa
lse,false,null);
25
channel.queueDeclare("SEND_STATION",true,fa
lse,false,null);
26 // 6.交换机绑定队列
27 /**
28 * 参数1:队列名
29 * 参数2:交换机名
30 * 参数3:路由关键字,发布订阅模式写""即可
31 */
32
channel.queueBind("SEND_MAIL","exchange_fan
out","");
33
channel.queueBind("SEND_MESSAGE","exchange_
fanout","");
34
channel.queueBind("SEND_STATION","exchange_
fanout","");
35 // 7.发送消息
36 for (int i = 1; i <= 10 ; i++) {
37
channel.basicPublish("exchange_fanout","",n
ull,
38 ("你好,尊敬的用户,秒杀商品
开抢
了!"+i).getBytes(StandardCharsets.UTF_8));
39 }
40 // 8.关闭资源
41 channel.close();
connection.close();
}
}
42
效果如下
RabbitMQ发布订阅模式_编写消费者
接下来编写三个消费者,分别监听各自的队列
// 站内信消费者
public class CustomerStation {
public static void main(String[] args)
throws IOException, TimeoutException {
4 // 1.创建连接工厂
5 ConnectionFactory connectionFactory
= new ConnectionFactory();
6
connectionFactory.setHost("192.168.0.162");
7 connectionFactory.setPort(5672);
8
connectionFactory.setUsername("itbaizhan");
9
connectionFactory.setPassword("itbaizhan");
10
connectionFactory.setVirtualHost("/");// 默
认虚拟机
11 //2.创建连接
12 Connection conn =
connectionFactory.newConnection();
13 //3.建立信道
14 Channel channel =
conn.createChannel();
15 // 4.监听队列
16 channel.basicConsume("SEND_STATION",
true, new DefaultConsumer(channel) {
17 @Override
18 public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
19 String message = new
String(body, "utf-8");
System.out.println("发送站内
信:"+message);
21 }
22 });
23 }
24 }
25
26 // 邮件消费者
27 public class Customer_Mail {
28 public static void main(String[] args)
throws IOException, TimeoutException {
29 // 1.创建连接工厂
30 ConnectionFactory connectionFactory
= new ConnectionFactory();
31
connectionFactory.setHost("192.168.0.162");
32 connectionFactory.setPort(5672);
33
connectionFactory.setUsername("itbaizhan");
34
connectionFactory.setPassword("itbaizhan");
35
connectionFactory.setVirtualHost("/");// 默
认虚拟机
36 //2.创建连接
37 Connection conn =
connectionFactory.newConnection();
38 //3.建立信道
39 Channel channel =
conn.createChannel();
40 // 4.监听队列
channel.basicConsume("SEND_MAIL",
true, new DefaultConsumer(channel) {
42 @Override
43 public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
44 String message = new
String(body, "utf-8");
45 System.out.println("发送邮
件:"+message);
46 }
47 });
48 }
49 }
50
51 // 短信消费者
52 public class Customer_Message {
53 public static void main(String[] args)
throws IOException, TimeoutException {
54 // 1.创建连接工厂
55 ConnectionFactory connectionFactory
= new ConnectionFactory();
56
connectionFactory.setHost("192.168.0.162");
57 connectionFactory.setPort(5672);
58
connectionFactory.setUsername("itbaizhan");
59
connectionFactory.setPassword("itbaizhan");
60
connectionFactory.setVirtualHost("/");// 默
认虚拟机
61 //2.创建连接
62 Connection conn =
connectionFactory.newConnection();
63 //3.建立信道
64 Channel channel =
conn.createChannel();
65 // 4.监听队列
66 channel.basicConsume("SEND_MESSAGE",
true, new DefaultConsumer(channel) {
67 @Override
68 public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
69 String message = newString(body, "utf-8");
70 System.out.println("发送短信:"+message);
71 }
72 });
73 }
74 }