目录
一、简单模型
1、首先控制台创建一个队列
2、父工程导入依赖
3、生产者配置文件
4、写测试类
5、消费者配置文件
6、消费者接收消息
二、WorkQueues模型
1、在控制台创建一个新的队列
2、生产者生产消息
3、创建两个消费者接收消息
4、能者多劳充分利用每一个消费者的能力
三、交换机
四、Fanout交换机
1、 声明队列
2、 创建交换机
编辑 3、 绑定交换机
4、示例
五、Diect交换机
1、 声明队列
2、创建交换机
3、绑定交换机
4、示例
六、Topic交换机
1、创建队列
2、创建交换机
3、绑定队列
4、示例
7、、声明队列交换机
1、SpringAMQP提供的类声明
2、基于注解声明
七、消息转换器
配置JSON转换器
一、简单模型
创建一个父工程和两个子工程consumer和publisher
1、首先控制台创建一个队列
命名为simple.queue
2、父工程导入依赖
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
3、生产者配置文件
spring:
rabbitmq:
host: 192.168.200.129 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
4、写测试类
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, rabbitmq!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
查看消息
5、消费者配置文件
spring:
rabbitmq:
host: 192.168.200.129 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
6、消费者接收消息
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
结果
二、WorkQueues模型
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了
1、在控制台创建一个新的队列
命名为work.queue
2、生产者生产消息
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "work.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
3、创建两个消费者接收消息
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】");
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】");
}
结果
如果消费者睡眠时间不同
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】");
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】");
Thread.sleep(200);
}
- 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息
- 消费者2 sleep了200毫秒,相当于每秒处理5个消息
消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。
4、能者多劳充分利用每一个消费者的能力
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢。而最终总的执行耗时也大大提升。 正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。
三、交换机
在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,不再发送消息到队列中,而是发给交换机
- Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
- Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
- Consumer:消费者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
- Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
四、Fanout交换机
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。 在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
1、 声明队列
创建两个队列fanout.queue1
和fanout.queue2
,绑定到交换机hmall.fanout
2、 创建交换机
创建一个名为fanout
的交换机,类型是Fanout
3、 绑定交换机
4、示例
生产者
/**
* Fanout交换机
*/
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "mq.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消费者
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
五、Diect交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
1、 声明队列
首先在控制台声明两个队列direct.queue1
和direct.queue2
2、创建交换机
3、绑定交换机
一个队列绑定两个RoutingKey
4、示例
生产者:RoutingKey为red
/**
* Direct交换机
*/
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "mq.direct";
// 消息
String message = "hello direct red";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
消费者
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
两个队列都收到消息
RoutingKey为blue
/**
* Direct交换机
*/
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "mq.direct";
// 消息
String message = "hello direct blue";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
只有队列2收到消息
六、Topic交换机
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。 只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!
BindingKey
一般都是有一个或多个单词组成,多个单词之间以.
分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
示例
publisher发送的消息使用的RoutingKey
共有四种:
china.news
代表有中国的新闻消息;china.weather
代表中国的天气消息;japan.news
则代表日本新闻japan.weather
代表日本的天气消息;
解释:
topic.queue1
:绑定的是china.#
,凡是以china.
开头的routing key
都会被匹配到,包括:china.news
china.weather
topic.queue2
:绑定的是#.news
,凡是以.news
结尾的routing key
都会被匹配。包括:china.news
japan.news
1、创建队列
2、创建交换机
3、绑定队列
4、示例
生产者:RoutingKey为
china.news
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "mq.topic";
// 消息
String message = "hello topis china.news";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
消费者
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
RoutingKey为
china.people
/**
* topicExchange
*/
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "mq.topic";
// 消息
String message = "hello topis china.people";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.people", message);
}
只有消费者1收到消息
7、、声明队列交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
1、Queue:用于声明队列,可以用工厂类QueueBuilder构建
2、Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
3、Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
1、SpringAMQP提供的类声明
示例:创建Fanout交换机队列
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange2(){
return new FanoutExchange("mq.fanout2");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue3(){
return new Queue("fanout.queue3");
}
/**
* 绑定队列和交换机1
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue3, FanoutExchange fanoutExchange3){
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue4(){
return new Queue("fanout.queue4");
}
/**
* 绑定队列和交换机2
*/
@Bean
public Binding bindingQueue2(){
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange2());
}
}
direct示例
@Configuration
public class DirectConfig {
/**
* 声明交换机
* @return Direct类型交换机
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange("mq.direct2").build();
}
/**
* 第1个队列
*/
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}
/**
* 第2个队列
*/
@Bean
public Queue directQueue2(){
return new Queue("direct.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("red");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}
}
direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding
2、基于注解声明
/**
* 注解声明交换机
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3"),//队列名称
exchange = @Exchange(name = "mq.direct", //交换机名称
type = ExchangeTypes.DIRECT),//交换机类型
key = {"red", "blue"}//RoutingKey
))
public void listenDirectQueue3(String msg){
System.out.println("消费者3接收到direct.queue3的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue4"),
exchange = @Exchange(name = "mq.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue4(String msg){
System.out.println("消费者4接收到direct.queue4的消息:【" + msg + "】");
}
队列
交换机
七、消息转换器
@Test
public void testSendMap() throws InterruptedException {
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name", "张三");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}
当发送的数据为Objiet类型时会出现乱码现象,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化
配置JSON转换器
在publisher
和consumer
两个服务中都引入依赖:
<dependencies>
<!--mq消息转换为json-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
</dependencies>
注意:如果项目中引入了spring-boot-starter-web
依赖,则无需再次引入Jackson
依赖。
配置消息转换器,在publisher
和consumer
两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
结果
消费者接收Object
我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:
@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {
System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}