1.同步通讯和异步通讯
- 同步通讯:如果举个例子来说,同步通讯就像是两个人在打电话,一方说的话,能够立马传给另一方,消息的时效性非常高,但是相对的,只能是给一个人通讯,如果这个时候,有另一个人想和你建立联系进行通讯,那么抱歉,你必须得先结束当下的通讯,才能和其他人通讯
- 异步通讯:如果把同步通讯比作打电话,那么异步通讯就像是在发微信,我发给另外一个人消息,但是这个人不一定会立马看见并且回复我,通信的时效性不强,但是,它可以同时给多方发送消息,试想一下,我们平时发微信的时候,是可以同时给多个人发送消息,并且也能够同时接收多个人发过来的消息
同步通讯——打电话
异步通讯——发微信
2.在某些场景中使用同步通讯的缺陷
比如微服务直接相互调用的feign中间件,使用的就是同步调用,具体来看缺陷有以下几点
- 这是一个简单的用户支付模块牵扯的服务,用户在支付完之后,后台需要调用如订单服务、仓储服务、短信服务之类的模块来完善这个支付功能,由于是支付功能来直接调用的其他服务,所以服务之间的耦合性较高,当我们要再添加一个积分服务,来让支付服务调用,那么就需要修改支付服务中的代码,麻烦!
- 在支付业务调用别的服务的时候,由于是同步通讯,所以只能一个一个调用,当前服务调用完之后,才能调用别的服务,比如上面这个情况,支付服务调用完订单服务之后,才能调用仓储服务,调用完仓储服务后,才能调用短信服务,耗时太长了!
- 假如仓储服务突然挂掉了,那么支付服务在调用这个仓储服务时就会停滞在这里,那么当有更多的用户来进行支付,那么就有越来越多的请求卡在这一块,当支付服务的资源被耗尽之后,也就会挂掉,像这样一条链上的服务,有其中一个服务挂掉,相应的也会影响别的服务跟着挂掉,就是级联失效问题。
3.异步调用方案解决相关问题
还是上面的这个例子,俗话说,在web架构中没有什么是加一层解决不了的,所以我们在支付服务和其他被调用的服务之间加一个代理人(Broker), 你可以理解成,当我们去吃饭的时候,和我们交涉的前台人员,我们向前台点餐,而不会直接向厨房点餐(当然,路边小摊只有老板一个人做饭+点餐,当我没说)那么此时这种情况,我们就属于客户端,厨房大厨就是服务端,而这个负责接待我们的前台人员就是代理人,他来代替我们和厨房之间通信
当用户支付成功之后,支付服务就会通知代理人BROKER,说:“有一个顾客已经成功下单了,订单是1001”,那么这个BROKER就会通知其余服务,说:“客户订单1001已经支付完成了,你们赶紧来处理一下!”此时,这些服务就会来完成各自的业务。(但是要注意的是,被调用的服务要通过BROKER订阅事件,相当于让这些服务有能力接收到BROKER发布的事件,不接收到BROKER发布的事件,自然是无法处理这个事件的)
- 在整个过程中,支付服务在用户支付成功之后,就只会通知一下BROKER,发布一个用户完成支付的事件,其余的,比如调用其他业务,它都不再伸手了,联想下现实生活,当我们跟前台点完餐之后,也确实不会再和后台的厨师交流什么了!所以这样就解除了服务与服务之间的耦合了
- 当支付服务被用户使用并且成功支付之后,除了发布事件以外,它就不再干别的了,所以又能继续接待新用户的支付,至于后台的别的服务,什么时候去接收处理这个订单事件,就不是用户和支付服务管的了,这样数据的吞吐量就增高了 ,试想一下我们平时淘宝购物的时候,我们只要支付成功了并且前台给我们回复支付成功的消息之后,是不是就不会管人家后台的事情了,倘若是同步通讯,那我们要等到后台将这个事件处理完毕之后才会得到答复,在此之间我们就会一直占着这个支付服务,是不是太麻烦了!
- 由于异步通讯服务之间的依赖性不强,所以当有其中一个服务挂掉了之后,也不会影响其他服务的正常运行,比如,仓储服务挂了,他并不会有影响支付服务的正常运行,不用担心级联失败的问题
- 随着业务量增加,请求越来越多,假如一下子来了十个订单完成的服务,但是订单服务、仓储服务这些只能在同一时间处理一个事件,这时,BROKER就会起到一个缓冲的作用,就像水库里的水坝一样,他可以将事件排成队列,当其他服务处理完一个事件之后,再放给他们下一个事件,这种操作称之为流量削峰
同样的,有优点就有缺点,这样加一个BROKER,那么基本上中间通讯就全部依赖这个BROKER了,那么就吃这个中间代理人的可靠性、安全性、吞吐能力了,不得不说这个服务员是真不好干。
4.RabbitMQ
MQ(MessageQueue),消息队列。用来存放消息也就是刚刚在上面提到的事件。这个MQ相当于就是事件驱动框架里的BROKER
MQ的具体实现有很多种,例如RabbitMQ,ActiveMQ,RocketMQ和Kafka,下面来重点介绍一下RabbitMQ
4.1.安装RabbitMQ
这里我们采用docker来部署安装,这里我们可以使用docker命令直接从仓库拉取,也可以自己上传rabbitmq的压缩包,我采用的是后者,安装包小伙伴可以自行上官网下载
- 下载好了压缩包,我们直接将其拖入finalshell
- 拖入之后会自行下载,然后使用docker load命令下载rabbitmq的镜像
- 然后使用docker run命令启动容器就ok了,具体命令如下
docker run \
> -e RABBITMQ_DEFAULT_USER=itcast \
> -e RABBITMQ_DEFAULT_PASS=123321 \
> --name mq \
> --hostname mq1 \
> -p 15672:15672 \
> -p 5672:5672 \
> -d \
> rabbitmq:3-management
稍微看下这个docker run命令,首先设置了两个环境变量一个用户名,一个密码,然后设置了一个主机名,这个是集群部署时会用到,然后映射了两个端口,第一个15672是rabbitmq自己的可视化终端的端口,第二个5672是事件传输的端口
访问一下rabbitmq的可视化终端:
看一下rabbitMQ的服务框架:
从这个框架我们可以看出,消息事件的生产者将事件转发给exchange,这个exchange担任一个路由转发的角色,它将事件路由给不同的queue,也就是存放消息的队列,最后消费者再从这个队列中取出事件来处理,其中要留意的是virtual host(虚拟主机),它类似docker的容器,具有隔离效果,每个用户绑定一个独立的虚拟主机,达到互不干扰的效果
4.2.SpringAMQP
AMQP(Advanced Message Queuing Protocol):高级消息协议,是一个进程间传递异步消息的网络协议,而SpringAMQP就是基于这种协议的消息中间件框架,它提供了一个简单的API来发送和接收异步、可靠的消息,其主要特点有以下几点:
- 提供监听器容器,用来异步处理发送来的消息
- 提供RabbitTemplate来接收和发送消息
- 提供RabbitAdmin用于自动声明 queues、exchanges 和 bindings
总而言之,这个SpringAMQP框架大大减少了使用RabbitMQ的成本
怎么使用这个SpringAMQP呢?请往下看!
4.3.一个简单的消息队列的demo
这个demo分了两个服务,一个是consumer消费者,另一个是publisher生产(发布)者,我们要做的首先是引入SpringAMQP的依赖,下面引入了一套完整的依赖,包含springboot,springamqp和junit测试(这些依赖是引入在父工程里的,子工程就不用引入了)
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
紧接着,在生产者publisher服务中编写配置文件,来告诉spring我们要使用rabbitmq来进行消息的发布和接收,其中我们告诉了spring我们要使用的rabbitmq的地址,端口,用户名,密码和虚拟主机名称,host地址是我们虚拟机的地址,port端口号是我们用docker启动rabbitmq容器设置的,一般来说都是5672,用户名和密码也是在那个时候设置的,虚拟主机是rabbitmq默认的,我们可以在可视化界面里更改,这里不做演示了
spring:
rabbitmq:
host: 192.168.88.128
port: 5672
username: itcast
password: 123321
virtual-host: /
然后就是编写测试单元来发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testMessageToSimpleQueue(){
String queueName = "simplequeue";
String message = "hello AMQP";
rabbitTemplate.convertAndSend(queueName,message);
}
}
注意:这是一个springboot的测试类,所以要加上SpringBootTest的注解,由于我们要使用注入的类(这里用autowired自动注入的RabbitTemplate),所以还要添加上RunWith的注解,这个注解在Junit5中是被携带上的,在Junit4中没有,所以我们得单另声明下
使用RabbitTemplate类的convertAndSent方法来完成消息的发送,参数有两个,一个是队列的名称,另一个是消息内容,这里的队列的名称,我是事先创建好的,没有的可以去rabbitmq可视化界面创建一个
运行测试单元之后:
点击队列名称,我们进入到这个队列中查看一下消息内容:
发现正是我们刚刚发送的消息内容
好了,到这里发送消息的服务已经看完了,我们再来看看接收消息的服务
和发送消息的步骤类似:
- 优先引入springamqp的依赖,不过刚刚说了,我们是在父工程中引入的,所以子工程就不用引入了
- 然后就是编写配置文件,和消息发送方的一样
- 最后一步,就是编写接收的代码
刚刚在上面说了springamqp的特性了,它提供一个专门来监听消息的容器,我们只需要把容器创建好,就能接收发送来的消息了,代码如下:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simplequeue")
public void listenSimpleQueue(String msg){
System.out.println("接收到消息了:" + msg);
}
}
创建一个消息监听类,并把它注册到spring的容器中,指定好监听的队列,这里监听的是simplequeue这个队列,创建一个接收消息的方法,注意参数类型和发送的消息类型是一致的
成功接收到消息:
注意:Queue中存放的消息,一旦被消费者处理了之后,就销毁了,有种阅后即焚的感觉,所以这个消息被消费者接收到之后,我们再看Queue队列中,发现已经没有消息了
5.五种消息发布模型
- 基本消息队列(Basic Queue)
- 工作消息队列(Work Queue)
- 发布订阅模式(Publish、Subscribe)其中根据交换机的不同分了三种
- 广播(Fanout Exchange)
- 路由(Direct Exchange)
- 主题(Topic Exchange)
5.1.基本消息队列
基本消息队列就是刚刚4.3中讲的那个demo, 由发布者publisher,队列queue,消费者consumer组成
5.2.工作消息队列
工作消息队列可以有更多的消费者来同时监听同一个队列,让消息处理的效率提高
剩下的三种模式我们之后再介绍,现在我们来重点讲讲这个工作消息队列的处理机制
- 假如现在有一种情况,同一个队列绑定了两个消费者,但这两个消费者处理消息的能力不一样,第一个消费者处理能力强于第二个消费者,我们来看看会不会处理能力强的消费者会处理更多的消息
/**
* 发送50次消息,总共一秒钟发完
* @throws InterruptedException
*/
@Test
public void testMessageToSimpleQueue() throws InterruptedException {
String queueName = "simplequeue";
String message = "hello AMQP";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName,message);
Thread.sleep(20);
}
}
消息发送方,一共发送50次消息,共1秒发送完
@RabbitListener(queues = "simplequeue")
public void listenSimpleQueue1(String msg){
System.out.println("消费者1接收到消息了:" + msg + LocalDateTime.now());
}
@RabbitListener(queues = "simplequeue")
public void listenSimpleQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到消息了:" + msg + LocalDateTime.now());
Thread.sleep(100);
}
消息接收方,明显可以看到,接收者2的能力要差于接收者1
运行看看结果:
运行结果我没有截全,这里说一下,总共发送了50次消息,但是两个消费者是把50次消息平分成两部分,每人处理25条消息,但是由上面截图可以看到,消费者2处理速度明显要慢些
- 这就奇了怪了,明明消费者2的能力不够,居然还是和消费者1处理一样的消息,这是为什么呢?
- 其实是rabbitmq默认的消息预取机制造成的,有多个消息来时,Queue会把这些消息预先投递给两个消费者,先不管能不能处理,先一人一个把消息分完,然后再各自处理分到的消息,所以性能好的消费者早早就把消息处理完了,而能力差的消费者就要慢些
那么我们怎么控制这个消息预期机制呢?让能力强的消费者多处理一些消息
在配置中,有个prefetch属性,可以控制消息预取数量的上限, 刚刚在上面的例子中,没有设置,那么就默认预取数量上限是无限,来多少消息,我就拿多少,拿完再处理,导致能力强和能力弱的消费者都能分到一样的数量。
那么我们将这个值设置成1,每个消费者预取数量上限是1,表示每个消费者最多预取1条消息,处理完了,再预取下一条消息,这样处理消息数量的多少就完全看消费者的能力了
5.3.发布订阅模式
发布订阅模式
发布订阅模式引入了交换机的概念,它可以控制消息发送给指定的队列,框架模式图如下:
交换机只负责消息的路由,不负责消息的存储,意味着一旦消息从交换机中发送失败之后,消息就会丢失
5.3.1.广播
广播(Fanout Exchange)
刚刚在上面说了,根据交换机的不同,发布订阅模式分成了三种,广播模式就是其中之一;其功能是,将消息发送给每一个与该交换机绑定的队列,可以理解成up主和粉丝的关系,up主一旦推出了新的视频,那么就会推送给每一个关注了该up主的粉丝
该怎么实现?
之前说过,publisher只管发送消息,其他的不管,所以这里的交换机、队列的创建都在消费者中完成
创建一个配置类:
@Configuration
public class FanoutConfig {
/**
* 创建一个交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
//创建并指定交换机的名称
return new FanoutExchange("itcast.fanout");
}
/**
* 创建队列1
* @return
*/
@Bean
public Queue fanoutQueue1(){
//创建并指定队列1的名称
return new Queue("fanout.queue1");
}
/**
* 创建队列2
* @return
*/
@Bean
public Queue fanoutQueue2(){
//创建并指定队列2的名称
return new Queue("fanout.queue2");
}
/**
* 将队列1绑定到交换机上
* @param fanoutQueue1
* @param fanoutExchange
* @return
*/
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
/**
* 将队列2绑定到交换机上
* @param fanoutQueue2
* @param fanoutExchange
* @return
*/
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
在配置类中,我们创建了两个队列,一个交换机,还进行了两个队列和交换机的绑定,其中调用了三个类,FanoutExchange、Queue、Binding,都将其注册到spring中,spring就会帮我们自动进行装配,当服务一启动之后,spring就会将这些带有Bean注解的方法创建成bean然后由spring统一管理,当spring容器启动之后,会自动创建这些bean的对象,还有一个点由Bean注解的方法,其创建的bean的id是方法名
创建监听容器,接收队列里的消息:
//监听队列fanout.queue1
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到fanout.queue1的消息了:" + msg);
}
//监听队列fanout.queue2
@RabbitListener(queues = "fanout.queue2")
public void ListenFanoutQueue2(String msg){
System.out.println("消费者2接收到fanout.queue2的消息了:" + msg);
}
在publisher中发送消息:
@Test
//给交换机FanoutExchange发送消息
public void testMessageToFanoutExchange(){
String exchangeName = "itcast.fanout";
String message = "你好,每一个消费者";
//指定发送的交换机,key,消息内容
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
在这里发送消息的方法里多了一个参数,需要注意,该参数是routingkey,只不过这里是null,在下一个模型中会讲到
测试:
publish发送给交换机消息, 然后交换机将消息路由给每一个与之绑定的队列,之后comsumer再处理
5.3.2.路由
路由(Routing Exchange)
刚刚介绍的广播是将消息全部发送给与之绑定的队列,那么这个路由就是将消息发送给指定的队列,指定的准则就是刚刚在上面提到的routingkey
该怎么实现呢?
方式一:
1.创建一个配置类,像5.3.1一样,在其中创建队列和交换机,并且进行绑定
@Configuration
public class DirectConfig {
//创建交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("itcast.direct");
}
//创建队列queue1
@Bean
public Queue queue1(){
return new Queue("direct.queue1");
}
//创建队列queue2
@Bean
public Queue queue2(){
return new Queue("direct.queue2");
}
//将queue1绑定在交换机上
@Bean
public Binding directBinding1(DirectExchange directExchange,Queue queue1){
return BindingBuilder.bind(queue1).to(directExchange).with("blue");
}
//将queue2绑定在交换机上
public Binding directBinding2(DirectExchange directExchange,Queue queue2){
return BindingBuilder.bind(queue2).to(directExchange).with("yellow");
}
}
需要注意的是,在绑定队列和交换机的时候,要添加上RoutingKey
2.创建监听容器
@RabbitListener(queues = "direct.queue1")
public void ListenDirectMessage1(String msg){
System.out.println("blue接收到了消息:"+msg);
}
@RabbitListener(queues = "direct.queue2")
public void ListenDirectMessage2(String msg){
System.out.println("yellow接收到了消息:"+msg);
}
3.发送消息
@Test
public void testMessageToDirectExchange1(){
String exchangeName = "itcast.direct";
String message = "你好,被指定的消费者";
rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}
需要注意的是,在发送的时候也要指定RoutingKey,也就是指定交换机向哪一个队列发送消息
方式二:
说是方式二,其实也就是摒弃了在配置类中创建交换机、队列和绑定关系,换成了在创建监听容器的时候使用注解的方式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //创建交换机并指定类型
key = {"blue","red"} //指定队列的key
))
public void ListenDirectQueue1(String msg){
System.out.println("blue接收到消息了:" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
key = {"yellow","red"}
))
public void ListenDirectQueue2(String msg){
System.out.println("yellow接受到消息了:" + msg);
}
这样的方式看起来有点乱,但好在它在创建key的时候支持数组类型,可以一次性创建多个key
5.3.3.话题
话题(Topic Exchange)
TopicExchange与DirectExchange类似,区别在于RoutingKey,TopicExchange的RoutingKey必须必须是多个单词的列表,且以 . 隔开
比如: China.news China.vagetables 等等
- 这样把RoutingKey更加的细分,像是一个话题一样,因此得名TopicExchange
- Queue与Exchange指定BindingKey时可以使用通配符
- # 代指0个或者多个单词
- * 代指一个单词
- 比如:当我给两个Queue分别指定Key叫做*.news和China.*然后我在发送消息的时候,我指定Key是China.news那么就意味着,这两个Queue都要接收到我发送的消息,因为China.news对于上面两个Key都是符合的
6.消息转换器
刚刚在上面的例子中,我们发送的消息都是String字符串类型的,但其实发送消息的方法参数是Object,也就是说它是支持发送对象的,那么我们来试一试
这次发送一个Map类型的消息,看看队列里接收到的消息是什么样的:
我们可以看到,我们发送的消息是被springamqp底层的消息转换器给序列化了,想要具体了解的可以看看这个博主的文章:
RabbitMQ发送对象之消息序列化(必踩坑的一个点)_rabbitmq传输对象序列化-CSDN博客
采用这种序列化工具,性能不太好,而且转换出来的字节太冗杂,所以这里我们可以换一种消息转换器
引入依赖:
<!--Json的序列化工具-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
在配置类中创建这个序列化工具的实例并让spring托管:
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
将这个json格式转换器的对象让spring托管之后,在底层就会顶替掉原来默认的那种序列化工具,现在我们再来发送一次消息看看
可以看到发送的消息已经被转换成Json格式了,接收消息操作也是如此,导入一样的依赖,在配置类里将序列化工具托管给spring即可