本章目录:
- 何为发布订阅模式
- FanoutExchange具体使用
一、何为发布订阅模式
在上一篇文章中,我们创建了Work Queue并且发送任务,在Work Queue中,每个任务只会被一个消费者消费,任务消费后就被清除了。
而在本篇中,我们将做一些完全不同的事情——我们将向多个消费者传递消息。这种模式被称为“发布/订阅”。
我们先看看其模式图
该模式核心思想如下:
是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。
相反,生产者只能向交换器发送消息(图片上的蓝色圆圈)。交换是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,它将消息推送到队列。交换器必须确切地知道如何处理接收到的消息。它应该被附加到特定的队列中吗?它应该被添加到多个队列中吗?还是应该被丢弃?其规则由交换类型定义。
本章主要学习FanoutExchange,它只是将接收到的所有消息广播给它所知道的所有队列,也称为广播
二、 FanoutExchange具体使用
在消费者服务内新建config配置类,我们需要声明
- 交换机
- 队列
- 交换机与队列绑定
config如下:
@Configuration
public class FanoutConfig {
///声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("brrbaii.fanout");
}
//声明队列
@Bean("queueA")
public Queue fanoutQueueA(){
return new Queue("brrbaii.queueA");
}
@Bean("queueB")
public Queue fanoutQueueB(){
return new Queue("brrbaii.queueB");
}
//绑定队列A
@Bean
public Binding fanoutBindingA(@Qualifier("queueA")Queue qA,FanoutExchange fanoutExchange){
//我们将绑定多个队列,这里按byName注入
return BindingBuilder
.bind(qA)
.to(fanoutExchange);
}
//绑定队列A
@Bean
public Binding fanoutBindingB(@Qualifier("queueB")Queue qB,FanoutExchange fanoutExchange){
//我们将绑定多个队列,这里按byName注入
return BindingBuilder
.bind(qB)
.to(fanoutExchange);
}
}
接着声明消费者,分别接听两个队列
@Component
public class SpringRabbitMQListener {
@RabbitListener(queues = "brrbaii.queueA")
public void listenFanoutQueueA(String msg) {
System.out.println("queueA:"+msg);
}
@RabbitListener(queues = "brrbaii.queueB")
public void listenFanoutQueueB(String msg) {
System.out.println("queueB:"+msg);
}
发送消息
观察结果,两个队列都收到了消息,完成了向多个消费者传递消息。
我们也可以不声明配置类,采用注解完成队列和交换机的绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "brrbaii.queueA"),
exchange = @Exchange(name = "brrbaii.fanout")
))
public void lA(String msg){
System.out.println("queueA"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "brrbaii.queueB"),
exchange = @Exchange(name = "brrbaii.fanout")
))
public void lB(String msg){
System.out.println("queueA"+msg);
}