目录
1、前置知识
1.1、AMQP怎么理解
1.2、Spring AMQP是什么
1.3、为什么要了解Spring-AMQP?
2、使用Spring-AMQP实现一个发消息案例
3、Work模型
问题:
优化:
小结:Work模型的使用:
1、前置知识
1.1、AMQP怎么理解
- 全称:Advance Message Queuing Protocol
- 用途:用于在应用程序之间传递业务消息的开放标准;
- 该协议与语言、平台无关,更符合微服务中独立性的要求
1.2、Spring AMQP是什么
- Spring AMQP是基于AMQP协议定义的一套API规范,提供了模版来发送和接收消息;
- 包含两部分,其中spring-amqp是基础抽象(接口),spring-rabbit是底层的默认实现(实现)
也就是说,你在使用中,只需要调用Spring AMQP提供的接口就可以了,而Spring AMQP的底层是使用AMQP的(可以理解为AMQP是一种思想,Spring AMQP是它的实现);
1.3、为什么要了解Spring-AMQP?
RabbitMq给java提供的原生的一些使用方法,过于的复杂不便于日常开发的使用,而Spring-AMQP对RabbitMQ进行了一层封装,让我们在使用中更加的简洁了~
2、使用Spring-AMQP实现一个发消息案例
案例 - 黑马课程中的一个简单的微服务~
需求如下:
- 利用控制台创建队列demo1.queue
- 在publisher服务中,利用SpringAMQP直接向demo1.queue发送消息
- 在consumer服务中,利用SpringAMQP编写消费者,监听demo1.queue队列
- 这个案例先不考虑交换机~
准备一个项目,我的项目目录如下:
步骤一:在控制台中新建一个demo1.queue队列
注:不会创建的可以看我的上一篇文章~
步骤二:父工程中引入AMQP的依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
注:父工程中引入了,他的两个子工程都可以使用~
步骤三:配置RabbitMQ服务端信息
需要在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ,配置如下:
spring:
rabbitmq:
host: env-base
port: 5672
virtual-host: /
username: root
password: 1111
具体信息,需要根据你自己电脑的信息修改哦~
步骤四:发送消息
正常是在业务中发送消息,我们这里为了便于快速看到结果,就在publisher下的单元测试中模拟发送(效果一样)~
@SpringBootTest
public class AMQPTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testdemo1(){
//队列名
String queueName = "demo1.queue";
//消息
String message = "are you ok ?";
//发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
步骤五:接收消息
接收消息是需要长期监控着队列,因此我们写在consumer服务下业务代码中即可~
@Component
public class ListenAMQP {
@RabbitListener(queues = "demo1.queue")
public void listenDemo1(String msg){
System.out.println("接收到消息:" +msg);
}
}
步骤六:启动项目(consumer) - 启动后再执行publisher下的单元测试
观察到的结果如下:
3、Work模型
我们使用案例来理解work模型~
模拟WorkQueue,实现一个队列绑定多个消费者
需求如下:
- 在RabbitMQ的控制台创建一个队列 - work.queue
- 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
- 在consumer服务中定义两个消息监听者,都监听work.queue队列
- 消费者1秒处理50条消息,消费者2每秒处理5条消息
创建队列就不说了,我们来看代码:
publisher中的代码(同上,使用单元测试来模拟):
@Test
public void testwork() throws InterruptedException {
//队列名
String queueName = "work.queue";
for (int i = 1; i <= 50 ;i++){
rabbitTemplate.convertAndSend(queueName, "work消息 --- " + i);
Thread.sleep(20);
}
}
consumer代码:
@RabbitListener(queues = "work.queue")
public void listenwork1(String msg){
System.out.println("接收到消息:" + msg);
}
@RabbitListener(queues = "work.queue")
public void listenwork2(String msg){
System.err.println("接收到消息:" + msg);
}
结果:
上面打印时,我将两个消费者使用不同的颜色打印:
我们会看到消息是一人一个,很均匀有序的划分~
我们把每个消费者处理的速度控制一下:
再观察结果:
虽然顺序不太一样了,还好像依然是一人一个的划分,即使其中一个消费者比另一个消费的更快~
问题:
消费者的消息推送有一定限制,在默认情况下,RabbitMQ会将消息依次投递给绑定在队列上的每一个消费者,但是这并没有考虑到消费者是否已经处理完消息,可能会出现消息堆积
优化:
修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息,一条处理完了,才会收到下一条~
运行结果,能者多劳:
小结:Work模型的使用:
- 多个消费者绑定到一个队列,可以加快消息处理速度(解决消息堆积问题)
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一题,实现能者多劳