- 消息从生产者到Broker,则会触发confirmCallBack回调
- 消息从exchange到Queue,投递失败则会调用returnCallBack
- 用一张表来记录发送到mq的每一条消息,以便发送失败时进行重试。status: 1-正常,0-重试,2-失败。
- 发送消息前,保存消息,并设置status为0和设置重试时间TryTime。
- 发送消息后:
- 如果回调方法confirm接收到为true---表示发送成功,拿到msgId,修改数据库中status为1,表示消息投递成功。
- 如果回调方法confirm接收到为false---失败不做任何操作,此时消息状态status还是0,而0表示重试的状态。
4. 设置一个定时任务,每10秒执行一次:查询状态(status)为0并且重试时间(try_time)< sysdate()(当前时间)的消息,得到列表List<RabbitmqSendLog> rabbitmqSendLogs
5.遍历所有消息rabbitmqSendLogs.forEach
- 如果重试次数count>=3,则直接设置这条消息发送失败status=2
- 如果重试次数count<3, 则根据message_id把这条消息的重试次数count加1,并把update_time更新为当前时间;
- 获取消息体content,并将获取到的json字符串解析成对象,然后通过rabbitTemplate把对象投递到rabbitmq
数据表rabbitmq_send_log
对应的实体类
/**
 * One row of the {@code rabbitmq_send_log} table: a record of a message sent to
 * RabbitMQ, kept so that a message whose delivery was not confirmed can be re-sent.
 */
@Data
@TableName("rabbitmq_send_log")
public class RabbitmqSendLog {
// @TableId(type = IdType.AUTO)
// Message identifier; the sender in this file fills it with a UUID string and
// passes the same value as the CorrelationData id.
private String messageId;
// Message body stored as a JSON string; the retry task parses it back into an object.
private String content;
// Delivery status: 1 = delivered, 0 = waiting for retry, 2 = failed.
private Integer status;
// Routing key the message is published with.
private String routeKey;
// Exchange the message is published to.
private String exchange;
// Retry counter; the retry task gives up once it reaches 3.
private Integer count;
// Time after which the retry task may pick this message up.
private Date tryTime;
// Filled automatically on insert.
@TableField(fill = FieldFill.INSERT)
private Date createTime;
// Filled automatically on insert and on update.
@TableField(fill = FieldFill.INSERT_UPDATE)
private Date updateTime;
}
server:
port: 8071
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: /ems
template:
retry: #重试,消息发送失败会重试
enabled: true # 开启重试
initial-interval: 10000ms #第一次十秒重试
max-interval: 80000ms #最大重试间隔为八十秒
multiplier: 2 #重试翻倍率
publisher-confirms: true #发送者开启 confirm 确认机制
publisher-returns: true # 发送者开启 return 确认机制
datasource:
url: jdbc:mysql://localhost:3306/one?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&AllowPublicKeyRetrieval=True
username: root
password: root
mybatis-plus:
mapper-locations: classpath*:com/test/mapper/xml/*.xml
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
map-underscore-to-camel-case: true
type-aliases-package: com.test.domain
配置类
package com.test.config;
import com.test.mapper.RabbitmqSendLogMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 * Builds the application's {@link RabbitTemplate} and serves as its publisher
 * confirm callback and return callback.
 *
 * <p>On a positive confirm the matching {@code rabbitmq_send_log} row is set to
 * status 1. On a negative confirm nothing is written, so the row keeps status 0
 * and stays eligible for the retry task.
 */
@Slf4j
@Configuration
public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    /** Status written to rabbitmq_send_log once the broker has acknowledged the message. */
    private static final Integer STATUS_DELIVERED = 1;

    @Resource
    private RabbitmqSendLogMapper rabbitmqSendLogMapper;

    /**
     * Creates the template used for publishing: JSON payload conversion, mandatory
     * routing, and this object registered for both callbacks.
     *
     * @param connectionFactory connection factory supplied by the container
     * @return the configured template
     */
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // Outgoing payloads are converted to JSON.
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        return rabbitTemplate;
    }

    /**
     * Converter used to turn received JSON messages back into objects.
     *
     * @return a Jackson based message converter
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Equivalent alternative: inject the template and register the callbacks afterwards.
    // @Autowired
    // private RabbitTemplate rabbitTemplate;
    // @PostConstruct
    // public void init() {
    //     rabbitTemplate.setMandatory(true);
    //     rabbitTemplate.setReturnCallback(this);
    //     rabbitTemplate.setConfirmCallback(this);
    // }

    /**
     * Invoked when the broker confirms (or rejects) a published message.
     *
     * @param correlationData correlation data given at send time; may be {@code null}
     *                        when the message was published without it
     * @param ack             {@code true} if the broker accepted the message
     * @param cause           failure reason reported for a negative confirm
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            // Deliberately no database write: status stays 0, which means "retry".
            log.error("confirm==>发送到broker.exchange失败\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                    correlationData, ack, cause);
            return;
        }
        log.info("confirm==>发送到broker.exchange成功\r\n" +
                        "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                correlationData, ack, cause);
        // Without a correlation id there is no row to update; previously this path
        // threw a NullPointerException inside the callback.
        if (correlationData == null || correlationData.getId() == null) {
            log.warn("confirm==>ack received without a correlation id, rabbitmq_send_log not updated");
            return;
        }
        String msgId = correlationData.getId();
        // update rabbitmq_send_log set status = #{status} where message_id=#{msg}
        rabbitmqSendLogMapper.updateRabbitmqSendLogStatus(msgId, STATUS_DELIVERED);
        System.out.println("消息投递成功->msgId: " + msgId);
    }

    /**
     * Invoked when a message reached the exchange but could not be delivered to a queue.
     * The returned message is only logged here.
     *
     * <p>NOTE(review): the log row is not touched on this path, so a returned message
     * may still end up with status 1 through {@link #confirm} - confirm whether that
     * is intended.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        // A return is a delivery failure, so it is logged at warn rather than info.
        log.warn("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                message, replyCode, replyText, exchange, routingKey);
    }
}
定时任务,每隔10秒从数据库查询,然后再重新投递到mq。
package com.test.task;
import com.alibaba.fastjson.JSON;
import com.test.domain.RabbitmqSendLog;
import com.test.domain.Student;
import com.test.mapper.RabbitmqSendLogMapper;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
/**
* Created by IntelliJ IDEA.
*
* @Author : Yang Kai
* @create 2022/12/21 15:43
*/
/**
 * Scheduled job that re-publishes messages whose delivery has not been confirmed.
 */
@Component
@EnableScheduling
public class RabbitSendTask {

    /** A message whose retry counter has reached this value is given up on. */
    private static final int MAX_RETRY_COUNT = 3;

    /** Status value that marks a message as failed. */
    private static final int STATUS_FAILED = 2;

    @Resource
    private RabbitmqSendLogMapper rabbitmqSendLogMapper;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Runs every ten seconds. Loads the messages that are due for a retry and, for
     * each one, either marks it failed (retry counter exhausted) or increments the
     * counter and publishes the stored payload again under the same message id.
     */
    @Scheduled(cron = "0/10 * * * * ?")
    public void messageSendTask() {
        // select * from rabbitmq_send_log where status=0 and try_time < sysdate()
        List<RabbitmqSendLog> pending = rabbitmqSendLogMapper.getMsgStatus();
        for (RabbitmqSendLog entry : pending) {
            // count is a nullable Integer; a null value is treated as "never retried"
            // instead of failing on auto-unboxing and aborting the rest of the batch.
            int attempts = entry.getCount() == null ? 0 : entry.getCount();
            if (attempts >= MAX_RETRY_COUNT) {
                // update rabbitmq_send_log set status = #{status} where message_id=#{msgId}
                rabbitmqSendLogMapper.updateRabbitmqSendLogStatus(entry.getMessageId(), STATUS_FAILED);
                continue;
            }
            System.out.println("重试消息: " + entry);
            // update rabbitmq_send_log set count=count+1,update_time=#{date} where message_id=#{messageId}
            rabbitmqSendLogMapper.updateCount(entry.getMessageId(), new Date());
            // The stored body is JSON; turn it back into the message object before sending.
            Student student = JSON.parseObject(entry.getContent(), Student.class);
            rabbitTemplate.convertAndSend(entry.getExchange(), entry.getRouteKey(), student,
                    new CorrelationData(entry.getMessageId()));
        }
    }
}
接收rabbitmq消息的类
package com.test.service;
import com.rabbitmq.client.Channel;
import com.test.domain.Student;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Message consumer for the queue {@code queue.name.user}.
 */
@Slf4j
@Component
public class WorkCustomer {

    /**
     * Listens on {@code queue.name.user}, bound to the direct exchange
     * {@code exchange-directs-user} with routing key {@code route.user}, and prints
     * each received payload. The channel and delivery tag are accepted but not used
     * by this method.
     *
     * @param student     the converted message payload
     * @param channel     channel the message arrived on
     * @param deliveryTag delivery tag taken from the message headers
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue.name.user", declare = "true"),
            exchange = @Exchange(type = "direct", name = "exchange-directs-user"),
            key = "route.user"))
    public void receive12211(Student student, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        final String line = "路由模式message1 = " + student;
        System.out.println(line);
    }
}
发送消息
/**
 * Sends one message the "reliable" way: the message is first stored in
 * rabbitmq_send_log with status 0 and a retry time one minute ahead, then published
 * with the same id as correlation data.
 */
@Test
public void contextLoadsx() {
    String msgId = UUID.randomUUID().toString();
    System.out.println("1msgId:"+msgId);

    RabbitmqSendLog rabbitmqSendLog = new RabbitmqSendLog();
    rabbitmqSendLog.setMessageId(msgId);
    rabbitmqSendLog.setExchange("exchange-directs-user");
    rabbitmqSendLog.setRouteKey("route.user");
    rabbitmqSendLog.setCount(1);
    rabbitmqSendLog.setStatus(0);
    // Retry time is one minute from now. The previous expression added
    // 1000 + 60 + 1 milliseconds, i.e. barely more than one second.
    rabbitmqSendLog.setTryTime(new Date(System.currentTimeMillis() + 60 * 1000L));

    Student student = new Student();
    student.setAddress("上海");
    student.setAge("12");
    student.setName("小明");
    rabbitmqSendLog.setContent(JSON.toJSONString(student));

    // Persist before sending so the message can be retried even if publishing fails.
    rabbitmqSendLogMapper.insert(rabbitmqSendLog);
    try {
        rabbitTemplate.convertAndSend("exchange-directs-user", "route.user", student, new CorrelationData(msgId));
    } catch (Exception e) {
        // The row stays at status 0 and will be retried; report the failure instead of
        // swallowing it. A mail or other notification could be sent from here.
        System.err.println("send failed, msgId=" + msgId + ", cause=" + e);
    }
}