1.生产者可靠性消息投递
简单操作参考---------打开主页上篇博客
https://blog.csdn.net/weixin_45810161/article/details/135906602?spm=1001.2014.3001.5501
在使用RabbitMQ的时候,怎么保证保证消息不丢失,RabbitMQ提供了两种不同的方式来控制消息的可靠性投递
1.confirm模式,生产者发送到交换机
2.return模式,交换机发送到队列
2.搭建生产者项目
2.1添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rabbitmq</groupId>
<artifactId>springboot-rabbitmq-demo01</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
2.2配置文件
配置文件开启confirm老版本可能不一样,如图所示,配置为true
server:
port: 19991
spring:
application:
name: rabbitmq-producer
rabbitmq:
host: 192.168.3.123
port: 5672
virtual-host: /mqname1
username: admin
password: admin
#开启confirm模式
publisher-confirm-type: correlated
2.3新建启动类
package com.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @Author: albc
* @Date: 2024/01/30/10:06
* @Description: good good study,day day up
*/
@SpringBootApplication
public class RabbitMqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMqApplication.class, args);
System.out.println("启动成功");
}
}
2.4新建配置类
package com.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Rabbitmq配置类
* @Author: albc
* @Date: 2024/01/30/11:09
* @Description: good good study,day day up
*/
@Configuration
public class RabbitMqConfig {
//创建队列
@Bean
public Queue createqueue(){
return new Queue("springboot_queue");
}
//创建交换机
@Bean
public DirectExchange createExchange(){
return new DirectExchange("springboot_exchange");
}
//创建绑定
@Bean
public Binding createBinding(){
return BindingBuilder.bind(createqueue()).to(createExchange()).with("user.insert");
}
}
2.5新建回调函数
package com.rabbitmq.confirm;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;
/**
* 发送交换机回调函数
* @Author: albc
* @Date: 2024/01/30/11:55
* @Description: good good study,day day up
*/
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
/**
*
* @param correlationData 发送消息信息
* @param ack 确认标识:true,MQ服务器exchange表示已经确认收到消息 false 表示没有收到消息
* @param cause 如果没有收到消息,则指定为MQ服务器exchange消息没有收到的原因,如果已经收到则指定为null
*/
@Override
public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {
if(ack){
System.out.println("发送消息到交换机成功:"+cause);
}else{
System.out.println("发送消息到交换机失败,原因是:"+cause);
}
}
}
2.6测试
备注:如果发送失败,查看是否创建原来的队列导致的,进入rabbitmq客户端,删除交换机和队列
package com.rabbitmq.controller;
import com.rabbitmq.confirm.MyConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: albc
* @Date: 2024/01/30/10:14
* @Description: good good study,day day up
*/
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyConfirmCallback myConfirmCallback;
@RequestMapping("/test1")
public String test1(){
//设置回调函数
rabbitTemplate.setConfirmCallback(myConfirmCallback);
//发送消息
rabbitTemplate.convertAndSend("springboot_exchange", "user.insert", "测试user.insert消息发送");
System.out.println("测试");
return "发送成功";
}
}
项目结构如下
发送消息
发送成功返回null
消费者收到消息
当发送一个不存在的交换机时
返回失败
消费者未收到消息
3.搭建消费者服务
3.1添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rabbitmq</groupId>
<artifactId>springboot-rabbitmq-demo02</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
3.2 创建启动类
package com.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 消费者消费消息
* @Author: albc
* @Date: 2024/01/30/13:41
* @Description: good good study,day day up
*/
@SpringBootApplication
public class ConsumberApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumberApplication.class, args);
System.out.println("启动成功");
}
}
3.3 创建监听类
package com.rabbitmq.common;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者
* @Author: albc
* @Date: 2024/01/30/13:45
* @Description: good good study,day day up
*/
@Component
public class RabbitMqConsumber {
/**
* 消费者监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "springboot_queue")
public void myListener1(String message){
System.out.println("消费者接收到的消息为:" + message);
}
}
3.4配置文件
server:
port: 19992
spring:
application:
name: rabbitmq-consumer
rabbitmq:
host: 192.168.3.123
port: 5672
virtual-host: /mqname1
username: admin
password: admin
4.开启return模式
上面已经开启了confirm模式,可以保证消息发送交换机,但是如果交换机发送成功,消息发送队列的时候发送错误,消息还是会丢,现在需要开启return模式
4.1配置文件开启return模式
#开启return模式
publisher-returns: true
4.2 设置回调函数
发送消息正常不会回调
package com.rabbitmq.returns;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* 开启return模式回调函数
* @Author: albc
* @Date: 2024/01/30/14:31
* @Description: good good study,day day up
*/
@Component
public class MyReturnsCallback implements RabbitTemplate.ReturnCallback {
/**
*
* @param message 退回的消息信息
* @param replyCode 退回的状态码,对应消息信息
* @param replyText 退回的信息
* @param exchange 交换机
* @param routingKey 路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("开启Return模式退回的消息是:"+new String(message.getBody()));
System.out.println("开启Return模式退回的replyCode是:"+replyCode);
System.out.println("开启Return模式退回的replyText是:"+replyText);
System.out.println("开启Return模式退回的exchange是:"+exchange);
System.out.println("开启Return模式退回的routingKey是:"+routingKey);
}
}
4.3测试消息发送
package com.rabbitmq.controller;
import com.rabbitmq.confirm.MyConfirmCallback;
import com.rabbitmq.returns.MyReturnsCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: albc
* @Date: 2024/01/30/10:14
* @Description: good good study,day day up
*/
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyConfirmCallback myConfirmCallback;
@Autowired
private MyReturnsCallback myReturnCallback;
@RequestMapping("/test1")
public String test1(){
//设置回调函数
rabbitTemplate.setConfirmCallback(myConfirmCallback);
//发送消息
rabbitTemplate.convertAndSend("springboot_exchange", "user.insert", "测试user.insert消息发送");
System.out.println("测试");
return "发送成功";
}
@RequestMapping("/test2")
public String test2(){
//设置回调函数
//rabbitTemplate.setConfirmCallback(myConfirmCallback);
rabbitTemplate.setReturnCallback(myReturnCallback);
//发送消息
rabbitTemplate.convertAndSend("springboot_exchange", "user.insert", "测试user.insert-Returns消息发送");
System.out.println("测试");
return "发送成功";
}
}
发送成功
消费者收到消息
测试发送不存在的队列
回调报错
同时开启confirm模式,return模式
发送交换机成功,发送到队列失败
正常消息发送,修改成正确的交换机和队列
发送成功
消费者收到消息
5.手动确认消息ACK
前面实现了发送的消息不丢失,但是消费者可能会丢消息,例如消息者没有接受消息,或者消费者收到消息后出现异常,这种情况下消息会丢失,RabbitMQ提供了一种进行手动确认的消息,ACK机制
ACK有三种模式
自动确认
当消息被消费者收到以后,会自动确认收到消息,消息会从队列中移除,实际业务中,消息可能收到了,但是业务执行过程中出现了异常,这种情况消息就会丢失
手动确认
当收到消息以后,需要手动确认消息收到,调用channel.basicAck手动签收,如果出现异常,调用channel.basicNack按照功能业务处理,例如:重新发送,拒绝签收进入死信队列等
根据异常情况来确认(基本不用)
RabbitMQ判断异常肯定不如我们判断的异常靠谱,基本不用
5.1开启ack模式
修改配置文件
消费者配置文件
不是生产者开启
#开启手动确认
listener:
simple:
acknowledge-mode: manual
生产者发送消息测试
http://127.0.0.1:19991/test/test2
测试接受消息
消费以后发现消息还在,重启以后又再次收到了消息
5.2手动确认消息
修改监听类RabbitMqConsumber
第一种:签收
channel.basicAck()
第二种:拒绝签收 批量处理
channel.basicNack()
第三种:拒绝签收 不批量处理
channel.basicReject()
自动确认代码修改前
package com.rabbitmq.common;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者
* @Author: albc
* @Date: 2024/01/30/13:45
* @Description: good good study,day day up
*/
@Component
public class RabbitMqConsumber {
/**
* 消费者监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "springboot_queue")
public void myListener1(String message){
System.out.println("消费者接收到的消息为:" + message);
}
}
修改后,手动确认消息
package com.rabbitmq.common;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 消费者
* @Author: albc
* @Date: 2024/01/30/13:45
* @Description: good good study,day day up
*/
@Component
public class RabbitMqConsumber {
/**
* 消费者监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "springboot_queue")
public void myListener1(Message message, Channel channel, String msg){
System.out.println("消费者收到了消息:"+msg);
//消息的属性
MessageProperties messageProperties = message.getMessageProperties();
try {
/**
* 手动确认消息
* 1.签收消息的编号
* 2.是否批量签收,如果批量签收,如果收到的消息为1000,则会把小于1000的消息全部签收
* 一般不用
*/
channel.basicAck(messageProperties.getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
当前消息还在
重启后消费,已经确认消息不在了
5.3异常处理
package com.rabbitmq.common;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 消费者
* @Author: albc
* @Date: 2024/01/30/13:45
* @Description: good good study,day day up
*/
@Component
public class RabbitMqConsumber {
/**
* 消费者监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "springboot_queue")
public void myListener1(Message message, Channel channel, String msg){
System.out.println("消费者收到了消息:"+msg);
//消息的属性
MessageProperties messageProperties = message.getMessageProperties();
try {
int i = 1/0;
/**
* 手动确认消息
* 1.签收消息的编号
* 2.是否批量签收,如果批量签收,如果收到的消息为1000,则会把小于1000的消息全部签收
* 一般不用
*/
channel.basicAck(messageProperties.getDeliveryTag(),false);
} catch (Exception e) {
System.out.println("进入异常方法.............");
//保存数据库等
try {
/**
* 1.消息的编号
* 2.是否将消息放回到队列,false:不放回,true:放回队列
*/
channel.basicReject(messageProperties.getDeliveryTag(),false);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
此时没有数据
发送测试
进入异常方法
因为没有放回队列,队列无数据
修改放回队列
会出现一直报错一直放回队列
放回队列代码
package com.rabbitmq.common;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 消费者
* @Author: albc
* @Date: 2024/01/30/13:45
* @Description: good good study,day day up
*/
@Component
public class RabbitMqConsumber {
/**
* 消费者监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "springboot_queue")
public void myListener1(Message message, Channel channel, String msg){
System.out.println("消费者收到了消息:"+msg);
//消息的属性
MessageProperties messageProperties = message.getMessageProperties();
try {
int i = 1/0;
/**
* 手动确认消息
* 1.签收消息的编号
* 2.是否批量签收,如果批量签收,如果收到的消息为1000,则会把小于1000的消息全部签收
* 一般不用
*/
channel.basicAck(messageProperties.getDeliveryTag(),false);
} catch (Exception e) {
System.out.println("进入异常方法.............");
//保存数据库等
try {
/**
* 1.消息的编号
* 2.是否将消息放回到队列,false:不放回,true:放回队列
*/
channel.basicReject(messageProperties.getDeliveryTag(),true);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
批量处理
package com.rabbitmq.common;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 消费者
* @Author: albc
* @Date: 2024/01/30/13:45
* @Description: good good study,day day up
*/
@Component
public class RabbitMqConsumber {
/**
* 消费者监听某个队列的消息
* @param message 接收到的消息
*/
@RabbitListener(queues = "springboot_queue")
public void myListener1(Message message, Channel channel, String msg){
System.out.println("消费者收到了消息:"+msg);
//消息的属性
MessageProperties messageProperties = message.getMessageProperties();
try {
int i = 1/0;
/**
* 手动确认消息
* 1.签收消息的编号
* 2.是否批量签收,如果批量签收,如果收到的消息为1000,则会把小于1000的消息全部签收
* 一般不用
*/
channel.basicAck(messageProperties.getDeliveryTag(),false);
} catch (Exception e) {
System.out.println("进入异常方法.............");
//保存数据库等
try {
/**
* 1.消息的编号
* 2.是否将消息放回到队列,false:不放回,true:放回队列
*/
//channel.basicReject(messageProperties.getDeliveryTag(),true);
/**
* 批量处理
* 1.消息的编号
* 2.是否批量处理
* 3.是否放回到队列
*/
channel.basicNack(messageProperties.getDeliveryTag(),false,false);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}