1、Springboot整合RabbitMQ
1、引入场景启动器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
引入AMQP场景启动器之后,RabbitAutoConfiguration
就会自动生效。然后会给容器中自动配置了RabbitTemplate
、AmqpAdmin
、CachingConnectionFactory
、RabbitMessagingTemplate
等来方便使用AMQP。
2、在yml中配置spring.rabbitmq相关信息
spring:
rabbitmq:
host: 192.168.56.10
port: 5672
username: admin
password: admin
virtual-host: my_vhost
2、简单使用
2.1 创建交换机(Exchange)、队列(Queue)和建立绑定关系(Binding)
@SpringBootTest
public class AmqpAdminTest {
@Autowired
private AmqpAdmin amqpAdmin;
/**
* 1、如何创建Exchange、Queue、Binding
* 1)、使用AmqpAdmin进行创建
*/
@Test
public void creatExchange() {
//创建 名为 itcxc.java.direct 的交换机
DirectExchange directExchange = new DirectExchange("itcxc.java.direct");
amqpAdmin.declareExchange(directExchange);
}
@Test
public void creatQueue() {
//创建名为 itcxc.java 的队列
Queue queue = new Queue("itcxc.java");
amqpAdmin.declareQueue(queue);
}
@Test
public void creatBinding() {
//创建绑定关系 将队列itcxc.java绑定到交换机itcxc.java.direct,routingKey为itcxc.java
Binding binding = new Binding("itcxc.java", Binding.DestinationType.QUEUE,
"itcxc.java.direct","itcxc.java",null);
amqpAdmin.declareBinding(binding);
}
}
2.1.1 交换机类型:
direct
:会将消息发送给路由键必须完全匹配的队列中。
fanout
:会将消息发送给所有绑定的队列中,不管路由键是否匹配。
topic
:主体模式其实就是在路由模式的基础上,支持了对key的通配符匹配(星号以及井号),以满足更加复杂的消息分发场景。(#
匹配零个或者多个单词,*
匹配一个单词,每个单词用.
分割)
2.2 发送消息
@SpringBootTest
public class RabbitTemplateTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 2、如何发消息
* 1)、使用rabbitTemplate发送消息
*/
@Test
public void sendMessageTest(){
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setName("哈哈");
orderReturnReasonEntity.setCreateTime(new Date());
//1、发送消息
// 默认情况下,如果发送的消息是一个对象,我们会使用序列化机制,将对象写出去,对象必须实现Serializable接口
// 但是我们可以通过向容器中注入Jackson2JsonMessageConverter转换器将序列化机制改为转JSON
rabbitTemplate.convertAndSend("itcxc.java.direct","itcxc.java", orderReturnReasonEntity);
}
}
2.2.1 替换消息系列化方式
通过观看RabbitTemplate
的源码发现,我们在默认情况下消息系列化方式是JDK序列化方式。那么我们发送的消息如果是一个对象时,这个对象就必须实现Serializable
接口。
如何使用转JSON的方式序列化消息呢?
通过观察RabbitAutoConfiguration
源码发现,在创建RabbitTemplate
的时候,会从容器中拿消息序列化器(MessageConverter)。
所以我们想要将转JSON的方式序列化消息,只需要给容器中放一个Jackson2JsonMessageConverter
就可以了
@Component
public class GulimallRabbitMqConfig {
/**
* 消息转换器 指定消息转换的方式为转为JSON
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
2.3 获取消息
2.3.1 在启动类或者配置类上添加@EnableRabbit注解
使用@RabbitListener
必须开启@EnableRabbit
,如果没有使用@RabbitListener
可以不添加@EnableRabbit
注解。
2.3.2 添加@RabbitListener
@RabbitListener
:可以标注在类和方法上 (监听哪些队列)
@RabbitHandler
:只能标注在方法上 (重载区别不同的消息)
@Service
public class RabbitListeners {
/**
* queues:声明需要监听的所有队列
*
* 接收参数的类型:
* 1、org.springframework.amqp.core.Message
* 2、直接写原来发送的消息类型
* 3、Channel 当前传送数据的通道
*
* @param message
*/
@RabbitListener(queues = {"itcxc.java"})
public void receiveMessage(Message message, OrderReturnReasonEntity orderReturnReasonEntity,
Channel channel){
System.out.println("接收到的消息为:" + orderReturnReasonEntity);
}
}
现在有一个情况,就是我们给同一个消息对象发送的消息是有可能不是同一个类型的。例如:
@Test
public void sendMq(){
for (int i = 0; i < 10; i++) {
if (i % 2 == 0){
OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();
orderReturnReasonEntity.setId(1L);
orderReturnReasonEntity.setName("哈哈");
orderReturnReasonEntity.setCreateTime(new Date());
rabbitTemplate.convertAndSend("itcxc.java.direct","itcxc.java", orderReturnReasonEntity);
} else {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("itcxc.java.direct","itcxc.java", orderEntity);
}
}
}
在这个情况下如果我们还是用原来的方式监听消息的话,就会使发送的消息类型为OrderEntity
的消息丢失。
这个时候我们就可以,将@RabbitListener
标注在类上,然后@RabbitHandler
标注在方法上
@Service
@RabbitListener(queues = {"itcxc.java"})
public class RabbitListeners {
/**
* queues:声明需要监听的所有队列
*
* 接收参数的类型:
* 1、org.springframework.amqp.core.Message
* 2、直接写原来发送的消息类型
* 3、Channel 当前传送数据的通道
*
* @param message
*/
@RabbitHandler
public void receiveMessage(Message message, OrderReturnReasonEntity orderReturnReasonEntity,
Channel channel){
System.out.println("接收到的消息为:" + orderReturnReasonEntity);
}
@RabbitHandler
public void receiveMessage(OrderEntity orderEntity){
System.out.println("接收到的消息为:" + orderEntity);
}
}
3、消息的可靠传递
3.1 发送端确认
为什么会丢失消息?
- 在发送消息到服务端的时候,有可能因为网络等等问题,没有将消息发送到服务端。
- 在
交换机(Exchange)
通过路由键将消息发送给队列(Queue)
的时候有可能没有找到相应的队列(Queue)
,而默认情况下是将消息直接丢弃的。
3.1.1 开启confirm和return机制
spring:
rabbitmq:
# 消息发送到broker后的回调
publisher-confirm-type: correlated
# 没有设置mandatory时生效
publisher-returns: true
# mandatory的优先级高于publisher-returns,只要设置了mandatory,publisher-returns就失效了
template:
mandatory: true
我翻看源码可以发现mandatory
的优先级高于publisher-returns
,只要设置了mandatory
,publisher-returns
就失效了。
但是经过测试,我发现mandatory
,只有在publisher-confirm-type
、publisher-returns
至少有一个设置才会生效。如果mandatory
、publisher-returns
同时存在的话,则mandatory
优先级高于publisher-returns
。
3.1.2 添加回调方法
@Component
@RequiredArgsConstructor
public class RabbitMqCallback {
private final RabbitTemplate rabbitTemplate;
/**
* RabbitMqCallback 创建完成之后,执行这个方法
* @return
*/
@PostConstruct
public RabbitTemplate initCallback() {
/**
* 需要设置spring.rabbitmq.publisher-confirm-type=correlated
* 消息发broker成功回调:发送到broker的exchange是否正确找到
* correlationData:当前消息的唯一关联数据(这个是消息的唯一ID)
* ack:消息是否发送成功
* cause:失败的原因,成功则返回null
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("setConfirmCallback 消息数据:" + correlationData);
if (Objects.nonNull(correlationData)) {
System.out.println("setConfirmCallback 消息数据:" + correlationData.getReturnedMessage());
}
System.out.println("setConfirmCallback 消息确认:" + ack);
System.out.println("setConfirmCallback 原因:" + cause);
System.out.println("-----------------------------------");
});
/**
* 需要设置spring.rabbitmq.template.mandatory=true或spring.rabbitmq.publisher-returns=true 才会有回调
* 消息路由回调:从交换器路由到队列是否正确发送
* message:投递失败消息的详细消息
* replyCode:回应码
* replyText:回应信息
* exchange:当时这个消息发送给的交换器
* routingKey:当时这个消息发送用的路由键
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("setReturnCallback 消息:" + message);
System.out.println("setReturnCallback 回应码:" + replyCode);
System.out.println("setReturnCallback 回应信息:" + replyText);
System.out.println("setReturnCallback 交换器:" + exchange);
System.out.println("setReturnCallback 路由键:" + routingKey);
System.out.println("-----------------------------------");
});
return rabbitTemplate;
}
}
这样我们就可以知道哪些消息发送成功,哪些消息发送失败了,然后就可以做出相应的处理。
3.2 消费端确认
为什么会丢失消息?
默认情况下只要收到消息,客户端会自动确认,然后服务端就会移除这个消息。由于客户端会一次性接收很多的消息。
在这个情况下,就有可能我们接收了10个消息,只处理了前面2个消息,然后服务宕机了,这样就会使得我们有8个消息丢失。
3.2.1 设置ACK应答机制为手动
spring:
rabbitmq:
# 设置ACK应答机制为手动
listener:
simple:
acknowledge-mode: manual
手动模式,只要我们没有明确的告诉MQ,消息被签收(没有ACK),消息就是一直处于unacked状态,即使客户端宕机了,消息也不会丢失,会重新变为ready,下次有新的客户端连接进来就发给新的客户端。
3.2.2 处理完消息之后手动应答
@RabbitListener(queues = {"itcxc.java"})
public void receiveMessage(Message message, OrderReturnReasonEntity orderReturnReasonEntity,
Channel channel) throws IOException {
//deliveryTag通道(channel)内自增的
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("方法一deliveryTag为"+deliveryTag+"接收到的消息为:" + orderReturnReasonEntity);
//确认签收参数说明(deliveryTag,是否批量签收)
channel.basicAck(deliveryTag,false);
//拒绝签收参数说明(deliveryTag,是否批量签收,是否放回队列中)
//channel.basicNack(deliveryTag,false,true);
}