为什么使用消息队列MQ?
因为使用消息队列有多个好处:可以实现系统服务的解耦、异步和削峰:
- 异步通信:消息队列提供了一种异步通信的方式,发送方可以将消息发送到队列中,然后继续执行其他任务,而不需要等待接收方的响应。这种异步通信方式可以提高系统的响应速度和吞吐量。
- 解耦系统:通过引入消息队列,不同的系统或模块可以通过消息进行通信,而不直接依赖于彼此的实时可用性。这种解耦可以降低系统之间的耦合度,使系统更加灵活、可维护和可扩展。
- 削峰填谷:在高并发场景下,消息队列可以作为一个缓冲层,平衡生产者和消费者之间的速度差异。当生产者的请求量暴增时,消息队列可以暂时存储消息,使消费者可以按照自身的处理能力逐渐消费这些消息,从而避免系统崩溃或过载。
A服务直接把某个功能的消息发给消息队列,B服务从消息队列中取出消息,然后执行任务,这样完成了项目的解耦,同时异步执行了这个任务。
特点:
- 灵活的消息路由:RabbitMQ支持多种灵活的消息路由方式,包括直接交换、主题交换、扇形交换等,可以根据消息的特征将消息路由到不同的队列。
- 可靠性:RabbitMQ提供持久化机制,可以将消息持久化到磁盘上,确保消息不会丢失。同时,它还支持发布确认和消费确认机制,保证消息的可靠传递。
- 高可用性:RabbitMQ支持集群部署,可以将多个节点组成一个集群,提供高可用性和负载均衡。
- 多语言支持:RabbitMQ提供了多种编程语言的客户端库,使得开发者可以使用各种语言来与RabbitMQ进行交互。
- 可扩展性:RabbitMQ提供了插件机制,可以根据需要添加新的功能和特性。
基本概念:
- Producer(生产者):负责向RabbitMQ发送消息。
- Consumer(消费者):负责接收和处理RabbitMQ中的消息。
- Queue(队列):消息在RabbitMQ中的存储区域,消息发送到交换机后最终被投递到队列中等待消费者消费。
- Exchange(交换机):接收生产者发送的消息,并根据规则将消息路由到一个或多个队列中。
- Binding(绑定):用于将交换机和队列之间建立关联关系,定义了消息如何从交换机路由到队列。
- Routing Key(路由键):生产者在发送消息时,可以指定一个路由键,交换机根据路由键将消息路由到相应的队列。
使用:
1、引入依赖:
<!-- 消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、引入配置
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
3、创建交换机和队列
public static void doInit(){
try{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("host");
connectionFactory.setPort(5672);
connectionFactory.setUsername("name");
connectionFactory.setPassword("pass");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String EXCHANGE_NAME = "code_exchange"; //交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
// 创建队列,写一个队列名
String queueName = "code_queue";
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"my_routingKey");
log.info("创建MQ成功");
}catch (Exception e){
log.error("创建MQ失败");
e.printStackTrace();
}
}
- 生产者代码
package com.stukk.stuojbackendquestionservice.message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @Author: stukk
* @Description:
* @DateTime: 2023-12-07 21:08
**/
@Component
public class MyMessageProducer {
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange,String routingKey, String message){
rabbitTemplate.convertAndSend(exchange,routingKey,message);
}
}
- 消费者代码
package com.stukk.stuojbackendjudgeservice.message;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @Author: stukk
* @Description: 消费者
* @DateTime: 2023-12-07 21:17
**/
@Component
@Slf4j
public class MyMessageConsumer {
// 指定监听的队列和确认机制
@SneakyThrows
@RabbitListener(queues = {"code_queue"}, ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliverTag){
log.info("获取到消息:{}", message);
// 根据传递的消息来执行需要执行的业务
channel.basicAck(deliverTag,false);
}
}
确定好要传递的消息,就可以执行你需要的业务了。