在接触RammitMQ时,好多文章都说在配置中设置属性
# rabbitmq 配置
rabbitmq:
host: xxx.xxx.xxx.xxx
port: xxxx
username: xxx
password: xxxxxx
## 生产端配置
# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除
#确认消息已发送到队列(Queue)
publisher-returns: true
#确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
listener: #消费者 端配置
retry:
enabled: true # 是否支持重试
default-requeue-rejected: false
max-attempts: 5 #最大重试次数
initial-interval: 3000 # 重试时间间隔
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
消息接收消息失败时,可以重复调用5次;按照此操作,发现没有重复调用。
----------------------------------正确思路---------------------------------------------------------------------------------
设置完配置文件属性后,在代码中利用redis与channel.basicNack联合使用,将错误记录保存至数据库,方便查找原因;
---------------------------------------代码
package com.charg.listener;
import com.charg.common.constant.CacheConstants;
import com.charg.common.constant.Constants;
import com.charg.common.utils.JsonUtils;
import com.charg.common.utils.redis.RedisUtils;
import com.charg.constant.RabbitConstants;
import com.charg.product.domain.bo.ProductDeviceBo;
import com.charg.product.domain.bo.RabMsgLogBo;
import com.charg.product.service.IProductDeviceService;
import com.charg.product.service.IRabMsgLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.Duration;
/**
* rabbitmq 监听
*/
@Slf4j
@Component
public class RabbitQueueListener {
/**
* 最大重试次数
*/
private static int maxReconsumeCount = 3;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 监听 队列的处理器
*
* @param message
*/
@RabbitListener(queues = "队列名称")
@RabbitHandler
public void onMessage(Message message, Channel channel) {
//唯一标识
String messageId = message.getMessageProperties().getMessageId();
try {
//判断messageId在redis中是否存在
if (verificationMessageId(messageId)) {
log.error("消息已重复处理,拒绝再次接收...");
// 拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {//不存在 则处理消息
// 接收消息
if (StringUtils.isNotBlank(new String(message.getBody()))) {
//修改业务逻辑
if (!false) {
log.error("消息即将再次返回队列处理...逻辑错误");
// 处理最大回调次数
getMaximumNumber(message, channel);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//加入缓存
addMessageId(message);
}
} else {
log.info("消息为空拒绝接收...");
// 拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
} catch (Exception e) {
e.printStackTrace();
try {
if (message.getMessageProperties().getRedelivered()) {
log.error("消息已重复处理,拒绝再次接收----...");
// 拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
log.error("消息即将再次返回队列处理...");
// 处理最大回调次数
getMaximumNumber(message, channel);
}
} catch (Exception exception) {
exception.printStackTrace();
}
}
}
/**
* 记录消息最大次数
*
* @param message
* @param channel
* @throws IOException
*/
private void getMaximumNumber(Message message, Channel channel) {
try {
int recounsumeCounts = RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()) == null ? 0 : RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId());
if (maxReconsumeCount > recounsumeCounts) {
log.info("maxMessageId(message.getMessageProperties().getMessageId())=" + recounsumeCounts);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
// 记录重试次数
maxMessageId(message.getMessageProperties().getMessageId());
} else {
log.info("次数达到三次了呢---------" + RedisUtils.getCacheObject(CacheConstants.MESSAGE_MAX_KEY + message.getMessageProperties().getMessageId()));
// 将消息重新放回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
// 清除缓存
RedisUtils.deleteObject("messageMaxKey" + message.getMessageProperties().getMessageId());
//重试三次后,还是失败 需记录到数据库
addRabMsgLog(message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 设置消息的最大重试次数
*/
public void maxMessageId(String messageId) {
String messageMax ="messageMaxKey"+ messageId;
// 存入缓存,用来记录该消息重试了几次
if (RedisUtils.hasKey(messageMax)) {
RedisUtils.incrAtomicValue(messageMax);
} else {
//错误的消息-插入数据库
RedisUtils.setCacheObject(messageMax, 1, Duration.ofHours(Constants.MESSAGE_TIME));
}
}
/**
* 校验消息是否消费过该消息
*
* @param messageId 消息id
* @return
*/
public boolean verificationMessageId(String messageId) {
// 消息是否存在key
String verifyIsExistKey ="messageExistKey" + messageId;
if ((RedisUtils.hasKey(verifyIsExistKey))) {
return true;
}
return false;
}
/**
* 保存消费过消息
*
* @param message 消息
* @return
*/
public void addMessageId(Message message) {
// 存入缓存
RedisUtils.setCacheObject("messageExistKey" + message.getMessageProperties().getMessageId(), message.getMessageProperties().getMessageId(), 1);
}
/**
* 消息队列 失败日志 操作
* 自己存数据库逻辑
*/
public void addRabMsgLog(Message message) {
log.info("====操作日志===");
//将内容记录到数据库
}
}
--------------------------------数据库表



















