在接触RammitMQ时,好多文章都说在配置中设置属性
# rabbitmq 配置 rabbitmq: host: xxx.xxx.xxx.xxx port: xxxx username: xxx password: xxxxxx ## 生产端配置 # 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除 #确认消息已发送到队列(Queue) publisher-returns: true #确认消息已发送到交换机(Exchange) publisher-confirm-type: correlated listener: #消费者 端配置 retry: enabled: true # 是否支持重试 default-requeue-rejected: false max-attempts: 5 #最大重试次数 initial-interval: 3000 # 重试时间间隔 direct: acknowledge-mode: manual simple: acknowledge-mode: manual
消息接收消息失败时,可以重复调用5次;按照此操作,发现没有重复调用。
----------------------------------正确思路---------------------------------------------------------------------------------
设置完配置文件属性后,在代码中利用redis与channel.basicNack联合使用,将错误记录保存至数据库,方便查找原因;
---------------------------------------代码
package com.charg.listener; import com.charg.common.constant.CacheConstants; import com.charg.common.constant.Constants; import com.charg.common.utils.JsonUtils; import com.charg.common.utils.redis.RedisUtils; import com.charg.constant.RabbitConstants; import com.charg.product.domain.bo.ProductDeviceBo; import com.charg.product.domain.bo.RabMsgLogBo; import com.charg.product.service.IProductDeviceService; import com.charg.product.service.IRabMsgLogService; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.io.IOException; import java.time.Duration; /** * rabbitmq 监听 */ @Slf4j @Component public class RabbitQueueListener { /** * 最大重试次数 */ private static int maxReconsumeCount = 3; @Autowired private StringRedisTemplate redisTemplate; /** * 监听 队列的处理器 * * @param message */ @RabbitListener(queues = "队列名称") @RabbitHandler public void onMessage(Message message, Channel channel) { //唯一标识 String messageId = message.getMessageProperties().getMessageId(); try { //判断messageId在redis中是否存在 if (verificationMessageId(messageId)) { log.error("消息已重复处理,拒绝再次接收..."); // 拒绝消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else {//不存在 则处理消息 // 接收消息 if (StringUtils.isNotBlank(new String(message.getBody()))) { //修改业务逻辑 if (!false) { log.error("消息即将再次返回队列处理...逻辑错误"); // 处理最大回调次数 getMaximumNumber(message, channel); } else { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //加入缓存 addMessageId(message); } } else { log.info("消息为空拒绝接收..."); // 拒绝消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); try { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重复处理,拒绝再次接收----..."); // 拒绝消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { log.error("消息即将再次返回队列处理..."); // 处理最大回调次数 getMaximumNumber(message, channel); } } catch (Exception exception) { exception.printStackTrace(); } } } /** * 记录消息最大次数 * * @param message * @param channel * @throws IOException */ private void getMaximumNumber(Message message, Channel channel) { try { int recounsumeCounts = RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()) == null ? 0 : RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()); if (maxReconsumeCount > recounsumeCounts) { log.info("maxMessageId(message.getMessageProperties().getMessageId())=" + recounsumeCounts); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 记录重试次数 maxMessageId(message.getMessageProperties().getMessageId()); } else { log.info("次数达到三次了呢---------" + RedisUtils.getCacheObject(CacheConstants.MESSAGE_MAX_KEY + message.getMessageProperties().getMessageId())); // 将消息重新放回队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); // 清除缓存 RedisUtils.deleteObject("messageMaxKey" + message.getMessageProperties().getMessageId()); //重试三次后,还是失败 需记录到数据库 addRabMsgLog(message); } } catch (Exception e) { e.printStackTrace(); } } /** * 设置消息的最大重试次数 */ public void maxMessageId(String messageId) { String messageMax ="messageMaxKey"+ messageId; // 存入缓存,用来记录该消息重试了几次 if (RedisUtils.hasKey(messageMax)) { RedisUtils.incrAtomicValue(messageMax); } else { //错误的消息-插入数据库 RedisUtils.setCacheObject(messageMax, 1, Duration.ofHours(Constants.MESSAGE_TIME)); } } /** * 校验消息是否消费过该消息 * * @param messageId 消息id * @return */ public boolean verificationMessageId(String messageId) { // 消息是否存在key String verifyIsExistKey ="messageExistKey" + messageId; if ((RedisUtils.hasKey(verifyIsExistKey))) { return true; } return false; } /** * 保存消费过消息 * * @param message 消息 * @return */ public void addMessageId(Message message) { // 存入缓存 RedisUtils.setCacheObject("messageExistKey" + message.getMessageProperties().getMessageId(), message.getMessageProperties().getMessageId(), 1); } /** * 消息队列 失败日志 操作 * 自己存数据库逻辑 */ public void addRabMsgLog(Message message) { log.info("====操作日志==="); //将内容记录到数据库 } } --------------------------------数据库表