RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第二天 高级
- 7 RabbitMQ 高级特性
- 7.2 Consumer Ack
- 7.2.1 Consumer Ack
- 7.2.2 Consumer Ack 小结
- 7.2.3 消息可靠性总结
第二天 高级
7 RabbitMQ 高级特性
7.2 Consumer Ack
7.2.1 Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
- 自动确认:acknowledge=“none”
- 手动确认:acknowledge=“manual”
- 根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)【啊这…】
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。
但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
【试试】
【编写消费者】
先来一个 新的模块工程
直接创建
pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dingjiaxiong</groupId>
<artifactId>rabbitmq-consumer-spring</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
OK
rabbitmq.properties
rabbitmq.host=43.138.50.253
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=/
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
/>
</beans>
OK,这样环境就准备好了
【定义监听器】
<!-- 包扫描 -->
<context:component-scan base-package="com.dingjiaxiong.listener"/>
再来一个 监听器 容器
<!-- 定义监听器容器 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>
OK,定义监听器类
package com.dingjiaxiong.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;
/**
* ClassName: AckListener
* date: 2022/11/16 21:16
*
* @author DingJiaxiong
*/
@Component
public class AckListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println(new String(message.getBody()));
}
}
OK,再来一个 死循环的测试类
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* ClassName: ConsumerTest
* date: 2022/11/16 21:18
*
* @author DingJiaxiong
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test(){
while (true){
}
}
}
OK, 直接运行这个 测试
OK,队列中的所有消息 都拿出来了
也识别 到了有一个消费者,OK,接下来就 是编写 ACK 的逻辑
package com.dingjiaxiong.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* ClassName: AckListener
* date: 2022/11/16 21:16
*
* @author DingJiaxiong
*/
/**
* Consumer ACK机制:
* 1. 设置手动签收【默认 就是自动】acknowledge="manual"
* 2. 让监听器类实现 ChannelAwareMessageListener 接口
* 3. 如果消息成功处理,则调用channel 的basicAck() 签收
* 4. 如果消息处理失败,则调用channel 的basicNack() 拒绝签收,让 broker重新发送 consumer
*/
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1. 接收转换消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
//3. 手动签收
channel.basicAck(deliveryTag, true);
} catch (IOException e) {
// e.printStackTrace();
// 如果出现异常 4. 拒绝签收
// 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker 会重新发送给该消息给消费端
channel.basicNack(deliveryTag,true,true);
}
}
}
OK,重新启动消费者
OK,在监听了
打开生产者,发送一条消息
OK,消息发送成功, 查看消费者控制台
这是正常的情况,手动 签收
一个消费者,消息都消费光了
【模拟一个错误】
OK,重新启动消费者
发送一条消息进去
效果就是 会一直重发,然后消费者 就一直处理,始终不能签收
而且消息的 状态
一直Unacked ,不能签收
7.2.2 Consumer Ack 小结
- 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
- 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
- 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
OK
7.2.3 消息可靠性总结
- 持久化
- exchange要持久化
- queue要持久化
- message要持久化
- 生产方确认Confirm
- 消费方确认Ack
- Broker高可用