1.极端情况下 rabbitMQ需要重启,导致消息投递失败(生产者发消息全部丢失)(交换机或者队列出问题)
生产者需要把数据放到缓存,用定时任务重新发送
解决方法:
0.必须配置文件写
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
- correlationDate数据 ack是否确定消息发送成功,cause是失败的原因
@Component
class xxx 实现 RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback//由于是只是他rabbit的内部类,需要手动设置
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct //@Autowired>@PostConstruct在构造器之后执行这个方法
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
-----2.生产者 加个发送的id和信息,加上发送信息的最后一个参数
CorrelationData correlationData1=new CorrelationData("1");//代表传入的id
------3.故意改错交换机名和队列名导致失败
------队列挂了(消息也丢失,需要交换机退回消息)-------
-------4.类 继续实现 ReturnCallback接口(只有路由不成功才调用)(还是要注入rabbit)
--------完整代码--------
//配置类 通知被退回的消息和id,可以自己存到数据库保存,等MQ正常后在生产
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if (ack){
log.info("收到消息id为{}",id);
}else {
log.info("没有收到消息,id为{},原因为{}",id,cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("路由失败,消息为{},退回原因{},交换机名{},routingKey:{}",new String(message.getBody()),replyText,exchange,routingKey);
}
}
//生产者
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "business.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
//指定消息 id 为 1
CorrelationData correlationData1=new CorrelationData("1");
String routingKey="rk.001";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
CorrelationData correlationData2=new CorrelationData("2");
routingKey="key2";
//这样只知道交换机有问题,而不知道路由有问题
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);//设置好corelation的id数据
log.info("发送消息内容:{}",message);
}
}
// !!!也可以用下面的解决方案,如果不喜欢用整合其他框架
2.备份交换机(也是队列不成功,上面的解决方案) 可以不用重新发送,走备份交换机到消费者消费和报警消费者失败
工作流程如下图mq6
1.队列转发到另外的交换机
2.交换机比路由先出错会先报交换机的错误,路由的错误不报
注意!!!需要在业务交换机声明 备份交换机
//核心步骤 声明业务 Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
ExchangeBuilder exchangeBuilder=ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME); //翻译为可变交换机,出错了改发送的交换机
return (DirectExchange) exchangeBuilder.build();
}
------完整代码------
-------声明交换机和队列-----
```java
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "confirm.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
//声明业务 Exchange
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
ExchangeBuilder exchangeBuilder=ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange) exchangeBuilder.build();
}
// 声明确认队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
//声明备份 Exchange
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
// 声明备份队列
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
// 声明警告队列
@Bean("warningQueue")
public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE_NAME).build();}
// 声明backup队列绑定关系
@Bean
public Binding backupQueueBindingBackup(@Qualifier("backupQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明warning队列绑定关系
@Bean
public Binding warningQueueBindingBackup(@Qualifier("warningQueue") Queue queue,
@Qualifier("backupExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
}
-------交换机和队列路由的错误回调-----
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if (ack){
log.info("收到消息id为{}",id);
}else {
log.info("没有收到消息,id为{},原因为{}",id,cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("路由失败,消息为{},退回原因{},交换机名{},routingKey:{}",new String(message.getBody()),replyText,exchange,routingKey);
}
}
-------生产者-----
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
//指定消息 id 为 1
CorrelationData correlationData1=new CorrelationData("1");
String routingKey="key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
CorrelationData correlationData2=new CorrelationData("2");
routingKey="key2"; //这里是错误的routingKey,会报错
//这样只知道交换机有问题,而不知道路由有问题
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);
log.info("发送消息内容:{}",message);
}
}
-------消息消费者(消息正常消费)-------
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues =CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg=new String(message.getBody());
log.info("接受到队列 confirm.queue 消息:{}",msg);
}
}
-------警告消费者(消费失败转发到备份交换机,和队列进行消费)-------
@Component
@Slf4j
public class WarningConsumer {
public static final String WARNING_QUEUE_NAME = "warning.queue";
@RabbitListener(queues = WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}", msg);
}
}
3.其他知识点
1.幂等性问题,消息被重复消费,需要先判断全局唯一id是否消费后,再消费
1.使用mysql,唯一id,有性能瓶颈,不推荐
2.使用redis原子性 执行setnx命令天然有幂等性
4.优先队列(在队列加优先级) 0-255 越大越优先(如天猫的大客户是小米,消息优先被消费发短信通知)
0.最好用最大优先级为0-10,因为消费者让mq排序需要时间消耗cpu和内存
1.排队后,在生产者设置最大入队后出队参数,不然全部一个一个消费完,不能看到效果
//生产者设置消息优先级 mq界面会出现Pri
//设置优先队列的方法
map(发消息channel):
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-max-priority",10);
channel.queueDeclare(QUEUE_NAME,Duration,false,false,arg);
//或者
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().priority(10).build();
完整代码
------生产者----------
public class AckPriorMsg {
public static final String QUEUE_NAME="hello1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = MQRabbitUtil.getChannel();
channel.confirmSelect();//代表要确认磁盘中mq已经存储了数据,先写
boolean Duration=true;
//队列名
//队列消息是否持久化到磁盘,默认在内存中
//队列是否只供一个消费者消费(不共享)
//是否自动删除,开新队列
//其他参数
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-max-priority",10);
channel.queueDeclare(QUEUE_NAME,Duration,false,false,arg);
AMQP.BasicProperties properties = new
AMQP.BasicProperties().builder().priority(10).build();
for (int i = 1; i <11; i++) {
String message = "info" + i;
if (i == 5) { //默认优先级是5,这里相当于是发送了不同优先级的消息
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
} else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//交换机
//队列名
//设置消息持久化
//二进制
boolean flag = channel.waitForConfirms();
if (flag) {
System.out.println("消息已经写入磁盘的确认");
}
}
}
}
------消费者----------
public class ConsumerPrior1 {
public static final String QUEUE_NAME="hello1";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQRabbitUtil.getChannel();
Map<String, Object> params = new HashMap();
//设置了才是优先队列
params.put("x-max-priority", 10);
channel.basicQos(1);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
DeliverCallback deliverCallback=(consumerTag, delivery)->{
SleepUtils.sleep(1);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("消息处理快接收:"+new String(delivery.getBody(),"UTF-8"));
System.out.println(consumerTag);
//确认的标志
//是否批量应答
};
CancelCallback cancelCallback=(var1)->{
System.out.println("应答失败");
};
//消费, 项目队列名,是否自动应答!!!(就是要手动处理还是被动处理)
//失败的回调,消费者取消消费的回调(都要lambda表达式)
boolean IsAck=false;
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
System.out.println("worker1正在等待接收消息");
}
}
–重要------------springboot版本--------
—优先队列springboot配置文件要设置为手动确认和交换机是direct----
//写个方便设置优先级的工具类
@Component
@AllArgsConstructor
public class FileMessageSender {
private static final String EXCHANGE = "priority-exchange";
private static final String ROUTING_KEY_PREFIX = "priority.queue.test";
@Autowired
private final RabbitTemplate rabbitTemplate;
/**
* 发送设置有优先级的消息
*
* @param priority 优先级
*/
public void sendPriorityMessage(String content, Integer priority) {
rabbitTemplate.convertAndSend(EXCHANGE, ROUTING_KEY_PREFIX , content,
message -> {
message.getMessageProperties().setPriority(priority);
return message;
});
}
}
-------------下面是不用工具类的代码,由于我看了许多博客才写成的下面的代码------------
----配置类定义配置—
@Configuration
public class ConfirmConfig {
//----------优先队列-----------
public static final String BUSINESS_ROUTING_KEY = "rk.001";
public static final String BUSINESS_QUEUE = "business.queue";
public static final String BUSINESS_EXCHANGE = "business.exchange";
/**
* 声明业务队列的交换机
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(BUSINESS_EXCHANGE);
}
/**
* 声明业务队列
*/
@Bean
public Queue priorityQueue() {
Map<String, Object> args = new HashMap<>();
// 设置队列最大优先级
args.put("x-max-priority", 10);
args.put("x-queue-mode", "lazy"); //随便定义惰性队列(在磁盘取数据,而不是内存) 可以和优先队列一起使用
return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
}
/**
* 声明业务队列和业务交换机的绑定关系
*/
@Bean
public Binding businessBinding(Queue priorityQueue, DirectExchange directExchange) {
return BindingBuilder.bind(priorityQueue).to(directExchange).with(BUSINESS_ROUTING_KEY);
}
-------生产者设置队列优先级--------
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "business.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
//指定消息 id 为 1
// CorrelationData correlationData1=new CorrelationData("1");
String routingKey="rk.001";
//
//rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData1);
// CorrelationData correlationData2=new CorrelationData("2");
// routingKey="key2";
// //这样只知道交换机有问题,而不知道路由有问题
// rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData2);
// log.info("发送消息内容:{}",message);
for (int i = 0; i < 10000; i++) {
if(i==5){
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, "消息来自 ttl 为 xS 的队列: "+message+i, correlationData ->{
correlationData.getMessageProperties().setPriority(100);
return correlationData;
});
}else if(i==1){
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, "消息来自 ttl 为 xS 的队列: "+message+i, correlationData ->{
correlationData.getMessageProperties().setPriority(6);
return correlationData;
});
}else {
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, "消息来自 ttl 为 xS 的队列: "+message+i, correlationData ->{
correlationData.getMessageProperties().setPriority(1);
return correlationData;
});
}
}
}
}
-------消费者消费------
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "business.queue";
@RabbitListener(queues =CONFIRM_QUEUE_NAME)
public void receiveMsg(String msg,Channel channel,Message message) throws IOException {
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-max-priority",10); //配置类消费者都设置最大优先级
arg.put("x-queue-mode", "lazy"); //配置类消费者都设置惰性队列,可以和优先队列一起使用
channel.queueDeclare(CONFIRM_QUEUE_NAME,true,false,false,arg);
log.info("接受到队列 confirm.queue 消息:{}",msg);
}
}
}
5.惰性队列(消息保存在磁盘中,正常放在内存) 应用场景怕内存不够宕机,存在磁盘可以大量存储,缺点是性能不好(springboot完整代码如上)
内存只有索引,管理界面查看只是使用几kb内存,差不多完全存在磁盘了
//生产者和消费者都加上参数
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-max-priority",10);
//惰性队列,存在磁盘中
arg.put("x-queue-mode", "lazy");
channel.queueDeclare(QUEUE_NAME,Duration,false,false,arg);