一、前言
我们一般的项目过程都是同步通信,及一个服务结束后在执行另一个服务这会让总体时间变得很长,尤其是在高并发的时候用户体验感很不好,且在调用一个服务期间cup内存等都处于空闲状态造成资源浪费 。如果调用其中某一个服务时这个服务挂掉了,这个请求就会一直卡在这里,许多个请求都卡在这就会导致资源耗尽 导致级联失败。且当采用同步调用时,各个服务之间相互依赖,每次加入新的需求或修改现有功能,都需要改动原有代码,导致系统各部分之间的耦合度增高。
综上 同步通信有四个问题:耦合度高、性能下降、资源浪费和级联失败
异步及找一个代理中间者(broker)用于接收 原来调用其他服务的消息 在通知被调用的服务解耦的同时性能也提升了
mq是消息队列就是broker。rabbitmq是异步通信的一种实现
二、核心概念
virtual-host:虚拟主机,起到数据隔离的作用
publisher:消息发送者
consumer:消息的消费者
queue:队列,存储消息
exchange:交换机,负责路由消息
三、常见消息模型
四、springAMQP
①配置
1.引入依赖spring-amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-stater-amqp</artifactId>
</dependency>
2.配置
spring:
rabbitmq:
host: 192.168.88.129
port: 5672
virtual-host: /mq
username: xxxde
password: 123
connection-timeout: 1s
template:
retry:
enabled: true
multiplier: 1s
②代码示例
1.最简单demo
发送者直接发给队列(simple.queue)接收者直接接收
发送者:
接收者:
2.Work Queues
Work queues是任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
创建一个队列work.queue,发送者发50条消息,两个接收者接收。
两个消费者能力一样时(默认),将这50个消息一起消费,一共消费50条,及类似轮询
如果两个中一个的处理速度快,另一个处理的慢,我们想让处理快的多处理一些则在配置文件中设置prefetch: 1(消费者)确保同一时刻最多给消费者投递一条消息,如果此时消费者没有处理完则不投递
队列上绑定多个消费者,可以解决消息堆积问题
spring:
rabbitmq:
host: 192.168.88.129
port: 5672
virtual-host: /hmall
username: hmall
password: 123
listener:
simple:
prefetch: 1
3.Fanout交换机
5.1Fanout:广播
Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
创建一个交换机(hmall.fanout),两个消费者
5.2 Direct:定向
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
规则:
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
创建两个队列,一个队列的bindingkey是blue、red,另一个是yellow、red。交换机hmall.direct
5.3 Topic:话题
TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割。Queue与Exchange指定BindingKey时可以使用通配符 # 代指0个或多个单词 * 代指一个单词
创建两个队列,一个队列的bindingkey是china.#,另一个是#.news。交换机hmall.topic
③代码创建交换机、队列、用户
1.声明队列和交换机(一般在消费者端声明)
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
Queue:用于声明队列,可以用工厂类QueueBuilder构建
Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
定义一个config类 可以用new的方式,也可以用builder
package com.itheima.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfiguration {
//声明FanoutExchange 交换机
@Bean
public FanoutExchange fanoutExchange(){
// ExchangeBuilder.fanoutExchange("").build();
return new FanoutExchange("hmall.fanout2");
}
//声明队列
@Bean
public Queue fanoutQueue3(){
// QueueBuilder.durable("fanout").build();
return new Queue("fanout.queue3");
}
//绑定交换机和队列
@Bean
public Binding fanoutBanding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
}
2.*基于注解声明*
④消息转换器
发送一个对象是 接收会有问题
解决:
在发送者和接收者的启动类上加bean
接收消息