MQ
目录
- MQ
- 一、同步通讯和异步通讯
- 1. 同步通讯
- 2. 异步通讯
- 二、RabbitMQ
- 1. 部署
- 2. 架构
- 3. 常见消息模型
- 3.1 基本消息队列(Basic Queue)
- 3.2 工作消息队列(Work Queue)
- 3.3 发布订阅(Publish、Subscribe)
- 4. 消息转换器
一、同步通讯和异步通讯
1. 同步通讯
优点
- 时效性强,立即获取结果
缺点
- 耦合度高
- 性能和吞吐能力不如异步
- 额外资源消耗
- 级联失败问题
2. 异步通讯
优点
- 服务解耦
- 性能提升,吞吐量提高
- 服务没有强依赖,不担心级联问题
- 流量削峰
缺点
- 依赖Broker的可靠性、安全性、吞吐能力
- 架构复杂的情况下,业务没有明显的流程线,不好追踪管理
MQ即是事件驱动架构中的Broker。
二、RabbitMQ
1. 部署
直接docker拉一个:
# 拉取镜像
docker pull rabbitmq:3-management
#启动容器
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
# 15672是管理口
2. 架构
几个概念:
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtual host:虚拟主机,对queue、exchange等资源的逻辑分组
3. 常见消息模型
3.1 基本消息队列(Basic Queue)
-
依赖
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
amqp是高级消息队列协议,springAMQP则是一种实现。
-
配置
spring: rabbitmq: host: 190.92.246.107 # 主机名 port: 5672 # 端口 virtual-host: / # 虚拟主机 username: root password: 123456
-
实现
-
发布者
public class PublisherTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp"; rabbitTemplate.convertAndSend(queueName, message); } }
-
消费者
配置都是一样的
@Component public class SpringRabbitListener { @RabbitListener(queues = {"simple.queue"}) public void listenSimpleQueue(String msg) { System.out.println(msg); } }
启动main函数,成功:
-
3.2 工作消息队列(Work Queue)
两个消费者合作处理消息,避免消息堆积。
AMQP有一个消息预取机制,预取多少条消息是可以配置的。
spring:
rabbitmq:
host: 190.92.246.107 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root
password: 123456
listener:
simple:
prefetch: 1
-
发布者:
@Test public void testSimpleQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, spring amqp"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
-
消费者
@Component public class SpringRabbitListener { @RabbitListener(queues = {"simple.queue"}) public void listenSimpleQueue1(String msg) throws InterruptedException { System.out.println("消费者1" + "【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = {"simple.queue"}) public void listenSimpleQueue2(String msg) throws InterruptedException { System.err.println("消费者2" + "【" + msg + "】" + LocalTime.now()); Thread.sleep(200); } }
如果消息预取机制不设置,意味着不设限,那么在这个例子中每个消费者无论处理能力如何,都会处理25条消息,设置为1后,则按照能力分配。
3.3 发布订阅(Publish、Subscribe)
和之前不同的是,可以将一条消息发送给多个消费者,实现方式是加入了交换机。
根据交换机类型不同分为三种:广播、路由和主题
-
Fanout Exchange 广播
这个交换机会将消息路由到每一个和它绑定的队列
-
发布者
不同的是,我们发送消息到交换机
@Test public void testSendFanoutExchange() { String exchangeName = "root.fanout"; String message = "hello everyone"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
-
订阅者
首先创建交换机和队列,并将队列绑定到交换机上(有注解的写法,像后文路由模式那样)
@Configuration public class FanoutConfig { @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("root.fanout"); } @Bean public Queue fanoutQueue1() { return new Queue("fanout.queue1"); } @Bean public Queue fanoutQueue2() { return new Queue("fanout.queue2"); } @Bean public Binding bindQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } @Bean public Binding bindQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
然后监听队列:
@RabbitListener(queues = {"fanout.queue1"}) public void listenFanoutQueue1(String msg) throws InterruptedException { System.out.println("fanout.queue1消费者" + "【" + msg + "】" + LocalTime.now()); } @RabbitListener(queues = {"fanout.queue2"}) public void listenFanoutQueue2(String msg) throws InterruptedException { System.err.println("fanout.queue2消费者" + "【" + msg + "】" + LocalTime.now()); }
启动测试:
-
-
Direct Exchange 路由
特点:
- 每个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息Routingkey一致的队列
接下来就可以测试一下:
有一个交换机,两个队列,两个消费者分别有两个BindingKey。
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT), key = { "blue", "red" } )) public void listenDirectQueue1(String msg) { System.err.println("direct.queue1消费者" + "【" + msg + "】" + LocalTime.now()); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "root.direct", type = ExchangeTypes.DIRECT), key = { "yellow", "red" } )) public void listenDirectQueue2(String msg) { System.err.println("direct.queue2消费者" + "【" + msg + "】" + LocalTime.now()); }
发布者:
@Test public void testSendDirectExchange() { String exchangeName = "root.direct"; String message = "hello red"; rabbitTemplate.convertAndSend(exchangeName, "red", message); }
不断更换routingKey,观察订阅者日志。
-
Topic Exchange 主题
和路由模式类似,区别是这个模式的key是多个单词的列表,以 “ . ” 分割。
在指定BIndingKey时可以使用通配符。例如:#代表0个或多个单词,*代表一个单词。
-
订阅
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC), key = { "china.#" } )) public void listenTopicQueue1(String msg) { System.err.println("topic.queue1消费者" + "【" + msg + "】" + LocalTime.now()); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "root.topic", type = ExchangeTypes.TOPIC), key = { "#.news" } )) public void listenTopicQueue2(String msg) { System.err.println("topic.queue2消费者" + "【" + msg + "】" + LocalTime.now()); }
-
发布
@Test public void testSendTopicExchange() { String exchangeName = "root.topic"; String message = "hello world"; rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
改变routingkey,观察日志。
-
4. 消息转换器
我们不仅仅可以发送字符串消息,还可以发送对象,默认情况下,需要传统的序列化方式,对象需要实现Serializable接口,不太方便,我们使用json。
-
引入依赖
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
-
自定义MessageConverter
@Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }
这个时候发送的消息就会经过json序列化了。
-
测试
创建队列
@Bean public Queue fanoutExchange() { return new Queue("object.queue"); }
消费者(需要像发布者一样的,引入jackson,然后定义messageConverter)
@RabbitListener(queues = "object.queue") public void listenObjectQueue(Map<String, Object> msg) { System.err.println("object.queue消费者" + "【" + msg.get("name") + "】" + LocalTime.now()); System.err.println("object.queue消费者" + "【" + msg.get("date") + "】" + LocalTime.now()); }
发布消息
@Test public void testSendObj() { String queue = "object.queue"; Map<String, Object> msg = new HashMap<>(); msg.put("name", "root"); msg.put("date", new Date()); rabbitTemplate.convertAndSend(queue, msg); }
成功: