Spring Boot 整合 RocketMQ 之消息消费手动提交 ACK 实战【案例分享】

news2024/10/29 1:14:23

前言:

上一篇我们分享了 RocketMQ 消息重试的一些基本原理,本篇我们基于 Spring Boot 整合 RocketMQ 来分享一下 RocketMQ 消息基于手动提交的案例,在分享手动进行消息 ACK 中也会分享消息重试的使用。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 整合 RocketMQ 之普通消息

Spring Boot 整合 RocketMQ 之定时/延时消息

Spring Boot 整合 RocketMQ 之顺序消息

Spring Boot 整合 RocketMQ 之事务消息

RocketMQ 之消息重试机制

同步消息手动提交 ACK 案例分享

同步消息 Producer 消息发送代码案例

同步消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下同步消息 Producer 消息发送代码,具体如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName: OneWayMessageProducer
 * @Author: zhangyong
 * @Date: 2024/9/27 17:27
 * @Description: 同步消息发送者
 */
@Slf4j
@Component
public class SyncMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 同步消息发送
     */
    public void sendSyncMessage(String message) {
        rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build());
    }

}

同步消息手动 ACK Consumer 端代码案例分享

RocketMQ 消息手动 ACK 就不能再使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式来实现了,RocketMQListener 的源码如下:

package org.apache.rocketmq.spring.core;

public interface RocketMQListener<T> {
    void onMessage(T var1);
}

我们可以看到 RocketMQListener 中只提供了一个 onMessage 方法,且返回值为 void,不接受返回值,因此没办法进行手动 ACK。

这里我们使用 DefaultMQPushConsumer 来实现消息的手动 ACK,DefaultMQPushConsumer 实现了 MQPushConsumer 接口,具体实现代码如下:

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ClassName: ManualCommitSyncCounsumer
 * @Author: Author
 * @Date: 2024/10/16 16:23
 * @Description:
 */
@Slf4j
@Component
public class ManualCommitSyncCounsumer {


    /**
     * @date 2024/10/16 17:19
     * @description 同步消息消费成功 手动提交
     */
    @PostConstruct
    public void onSyncMessage() throws MQClientException {
        //消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sync-group");
        //设置最大消息重试次数
        //consumer.setMaxReconsumeTimes(2);
        //RocketMQ 地址可以 可以用配置文件
        consumer.setNamesrvAddr("xxx-xxx-rocketmq.xxx.com:19876");
        //订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
        consumer.subscribe("sync-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //存储消息id 和消费次数的关系
            final Map<String, Integer> map = new HashMap<>();
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String dateStr = sdf.format(new Date());
                //消息处理
                MessageExt messageExt = list.get(0);
                String message = new String(messageExt.getBody());
                String msgId = messageExt.getMsgId();
                //获取消息消费次数
                Integer count = map.get(msgId);
                if (count == null) {
                    count = 0;
                }
                //次数+1
                count = count + 1;
                //覆盖map
                map.put(msgId, count);
                if (count > 2) {
                    log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回消费消费成功,消息内容:{}", dateStr, msgId, count, message);
                    //消息消费成功后移除
                    map.remove(msgId);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息内容:{}", dateStr, msgId, count, message);
                //模拟除 0 异常
                //int a = 1 / 0;
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息消费成功,消息内容:{}", dateStr, msgId, count, message);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //return null;
            }
        });
        consumer.start();
    }

}

RocketMQ 同步消费端手动 ACK 结果验证:

正常消费返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS

2024-10-19 10:23:07.575  INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:23:07.575  INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了

没有模拟除 0 异常,正常消费返回,一次就消费成功,结果符合预期。

模拟除 0 异常

2024-10-19 10:19:59.026  INFO 34052 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:19:59,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:09.029  INFO 34052 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:20:09,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:39.032  INFO 34052 --- [MessageThread_4] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:20:39,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

正常消费但是返回 NULL

2024-10-19 10:25:47.338  INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:47.338  INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344  INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344  INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:26:27.347  INFO 27256 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:26:27,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到返回 NULL 和模拟除 0 异常是一样的效果,消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

上面的案例我们没有控制消息消费重试次数,我们可以设置一个消息消费重试次数,代码如下:

//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);

使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式消费消息的时候消费失败也会自动进行重试,因此我们一定要控制好重试次数,可以 @RocketMQMessageListener 注解的 maxReconsumeTimes 来控制重试次数(高版本的 starter 才有该属性)。

不管使用哪种方式进行消费,对于达到重试次数还是消费失败的消息一般有两种处理方式,分別是:

  • 直接返回消息消费成功,使用本地库记录消息进行重新推送或者人工介入处理。
  • 不进行处理,让消息进入死信队列,然后去监听死信队列进行处理。

顺序消息 Producer 消息发送代码案例

顺序消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下顺序消息 Producer 消息发送代码,具体如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


/**
 * @ClassName: OneWayMessageProducer
 * @Author: Author
 * @Date: 2024/9/27 17:27
 * @Description: 顺序消息发送者
 */
@Slf4j
@Component
public class OrderlyMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @date 2024/10/11 15:45
     * @description 同步顺序消息
     */
    public void syncSendOrderly() {
        //hashKey 用来计算决定消息发送到哪个队列  一般是订单 ID 等信息  这里我们模拟订单 ID 发送
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 创建").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 支付").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 确认收货").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 创建").build(), "888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 支付").build(), "888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 确认收货").build(), "888888");
    }

}

顺序消息 Consumer 消息消费代码案例

同步消息手动 ACK 我们是注册了一个 MessageListenerConcurrently 消息监听器,顺序消息的手动 ACK 我们需要注册一个 MessageListenerOrderly 的消息监听器,具体代码如下:

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ClassName: ManualCommitOrderlyMessageConsumer
 * @Author: Author
 * @Date: 2024/10/10 17:35
 * @Description: 顺序消息消费
 */
@Slf4j
@Component
public class ManualCommitOrderlyMessageConsumer {

    /**
     * @date 2024/10/16 17:19
     * @description 顺序消息消费成功 手动提交
     */
    @PostConstruct
    public void onOrderlyMessage() throws MQClientException {
        //消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-group");
        //设置最大消息重试次数
        //consumer.setMaxReconsumeTimes(2);
        //RocketMQ 地址可以 可以用配置文件
        consumer.setNamesrvAddr("dev-ztn-rocketmq.eminxing.com:19876");
        //订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
        consumer.subscribe("orderly-topic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //存储消息id 和消费次数的关系
            final Map<String, Integer> map = new HashMap<>();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String dateStr = sdf.format(new Date());
                //消息处理
                MessageExt messageExt = list.get(0);
                String message = new String(messageExt.getBody());
                String msgId = messageExt.getMsgId();
                //获取消息消费次数
                Integer count = map.get(msgId);
                if (count == null) {
                    count = 0;
                }
                //次数+1
                count = count + 1;
                //覆盖map
                map.put(msgId, count);
                if (count > 2) {
                    log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回顺序消费消费成功,消息内容:{}", dateStr, msgId, count, message);
                    //消息消费成功后移除
                    map.remove(msgId);
                    return ConsumeOrderlyStatus.SUCCESS;
                }
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息内容:{}", dateStr, msgId, count, message);
                //模拟除 0 异常
                //int a = 1 / 0;
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息消费成功,消息内容:{}", dateStr, msgId, count, message);
                return ConsumeOrderlyStatus.SUCCESS;
                //return null;
            }
        });
        consumer.start();
    }

}

RocketMQ 顺序消费端手动 ACK 结果验证:

正常消费返回 ConsumeOrderlyStatus.SUCCESS

2024-10-19 13:46:44.293  INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.294  INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.298  INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.298  INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.300  INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.300  INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.302  INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:46:44.303  INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,结果符合预期。

模拟除 0 异常

2024-10-19 13:50:45.587  INFO 27604 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:45,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:46.588  INFO 27604 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:46,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592  INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592  INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:48.593  INFO 27604 --- [orderly-group_7] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:48,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595  INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595  INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:50.610  INFO 27604 --- [orderly-group_9] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:50,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611  INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611  INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:52.617  INFO 27604 --- [rderly-group_11] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:52,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618  INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618  INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:54.620  INFO 27604 --- [rderly-group_13] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:54,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627  INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627  INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:56.629  INFO 27604 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:56,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:57.631  INFO 27604 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:57,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,结果符合预期。

这里需要注意一点的时候上一条消息没有消费成功时,后面一条消息永远不会被消费,这个从我们的日志中也能够体现出来,因此我们在使用顺序消息的时候一定要注意消息的重试次数,消息重试次数我们可以通过自己的业务来判断消费几次,也可以使用 RocketMQ 来实现,代码如下:

//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);

模拟返回 NULL

2024-10-19 14:00:51.484  INFO 22728 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:51,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:52.485  INFO 22728 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:52,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492  INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492  INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:54.492  INFO 22728 --- [rderly-group_17] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:54,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494  INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494  INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:56.495  INFO 22728 --- [rderly-group_19] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:56,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497  INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497  INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:58.510  INFO 22728 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:58,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520  INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520  INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:00.521  INFO 22728 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:00,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523  INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523  INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:02.525  INFO 22728 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:02,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:03.527  INFO 22728 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:03,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,和模拟除 0异常 一样的结果,结果符合预期。

总结:RocketMQ 消息重试次数可以通过 @RocketMQMessageListener 注解的 maxReconsumeTimes 属性来设置,也可以通过编码来设置,不管采用何种方式来设置,我们都要在业务编码中做好处理,过多重视造成的性能问题,以及没有合理处理消费失败的消息造成的消息丢失问题,对于顺序消息我们更要慎重对待,顺序消息回到导致消息阻塞。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2225835.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JAVA基础:集合 (习题笔记)

写完一定记得 CtrlAltL 让代码格式标准 1.使用List和Map存放多个图书信息&#xff0c;遍历并输出。其中商品属性&#xff1a;编号&#xff0c;名称&#xff0c;单价&#xff0c;出版社&#xff1b;使用商品编号作为Map中的key。 Books类 package set.saturdayPlan;public class…

经纬恒润AUTOSAR成功适配芯钛科技Alioth TTA8车规级芯片

在汽车电子领域&#xff0c;功能安全扮演着守护者的角色&#xff0c;它确保了车辆在复杂多变的情况下保持稳定可靠的运行。随着汽车电子的复杂性增加&#xff0c;市场对产品功能安全的要求也日益提高。基于此背景&#xff0c;经纬恒润AUTOSAR基础软件产品INTEWORK-EAS-CP成功适…

STL-常用容器-list

1list基本概念 **功能&#xff1a;**将数据进行链式存储 链表&#xff08;list&#xff09;是一种物理存储单元上非连续的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接实现的 链表的组成&#xff1a;链表由一系列结点组成 结点的组成&#xff1a;一个是存储…

如何将 PDF 转换成JPG图片?这里有4个详细指南

通常情况下&#xff0c;图片文件比 PDF 文件加载速度更快&#xff0c;所以如果将PDF转换成图片的格式&#xff0c;或更容易分享以及浏览。所以&#xff0c;今天就教大家4个方法&#xff0c;帮助大家快速的进行PDF和JPG图片之间的转换。 1、PDF转换大师 直通车&#xff1a;www.…

深度学习超参数调优指南

文章目录 深度学习超参数调优指南一、超参数相关基础知识1. 神经网络中包含哪些超参数2. 超参数的重要性顺序3. 部分超参数如何影响模型性能4. 部分超参数合适的范围 二、超参数调整技巧1. 如何选择激活函数2. 如何调整 Batch Size3. 如何调整学习率 三、自动调参方法1. 网格搜…

【JIT/极态云】技术文档--函数设计

一、简介 函数是计算机编程中非常重要的概念。它是一段代码&#xff0c;可以在程序中多次调用&#xff0c;用于完成特定的任务。 函数通常接受输入参数&#xff0c;执行特定的操作&#xff0c;并返回一个结果。这个结果可以被程序中的其他代码使用。 二、新建函数 在函数列表…

Ubuntu下Mysql修改默认存储路径

首先声明&#xff0c;亲身经验&#xff0c;自己实践&#xff0c;网上百度了好几个帖子&#xff0c;全是坑&#xff0c;都TMD的不行&#xff0c;修改各种配置文件&#xff0c;就是服务起不来&#xff0c;有以下几种配置文件需要修改 第一个文件/etc/mysql/my.cnf 这个文件是存…

【论文阅读】FUNNELRAG:一个从粗到精的逐级检索范式

论文地址&#xff1a;https://arxiv.org/abs/2410.10293 github&#xff1a; 研究背景 现有的检索范式存在两个主要问题&#xff1a;一是平铺检索(flat retrieval)对单个检索器造成巨大负担&#xff1b;二是恒定粒度(constant granularity)限制了检索性能的上限。研究难点在于…

map 和 set 的使用

文章目录 一.序列式容器和关联式容器二. set 系列的使用1. set 和 multiset 参考文档2. set 类介绍3. set 的构造和迭代器4. set 的增删查5. insert 和迭代器遍历使用样例6. find 和 erase 使用样例7. multiset 和 set 的差异 三. map 系列的使用1. map 和 multimap参考文档2. …

11张思维导图带你快速学习java

简介 Java是一种跨平台的编程语言&#xff0c;广泛应用于开发各种类型的应用程序。从零开始学习Java可能会感到困惑&#xff0c;因为Java拥有广泛的功能和概念。为了更好地学习和理解Java&#xff0c;可以使用思维导图来整理和归纳Java的主要概念和特点。思维导图可以帮助学习…

iOS 18.2开发者预览版 Beta 1版本发布,欧盟允许卸载应用商店

苹果今天为开发人员推送了iOS 18.2开发者预览版 Beta 1版本 更新&#xff08;内部版本号&#xff1a;22C5109p&#xff09;&#xff0c;本次更新距离上次发布 Beta / RC 间隔 2 天。该版本仅适用于支持Apple Intelligence的设备&#xff0c;包括iPhone 15 Pro系列和iPhone 16系…

算法通关--单调栈

单调栈 单调栈是在栈的先进后出的规则基础上&#xff0c;要求从栈底到栈顶的元素满足单调的关系。如果是大压小&#xff0c;那么从栈顶观察是递减栈&#xff0c;如果是小压大&#xff0c;那么从栈顶观察使递减栈 经典用法&#xff1a; 判断一个数组每个位置都求&#xff1a;…

性价比高的宠物空气净化器选购指南,双十一有哪几款值得购买?

养猫家庭注意养猫家庭注意&#xff0c;换毛季它又来啦&#xff01;不管你家猫是多么瘦小&#xff0c;这个时候都会变成一年两次限定的蒲公英小猫。这都是因为它在疯狂的掉毛&#xff0c;没来得及清理的毛发就留在身上&#xff0c;不断堆积&#xff0c;家里也到处都是它掉落的猫…

医学影像学基础:理解CT、MRI、X射线和超声等医学影像设备的基本工作原理和成像技术

目录 医学影像学基础 1. X射线成像 2. 计算机断层扫描&#xff08;CT&#xff09; 3. 磁共振成像&#xff08;MRI&#xff09; 4. 超声成像 综合对比 1、成像原理对比 2、安全性对比 3、应用领域对比 4、设备特点对比 总结 医学影像学基础 在医学影像学中&#xff0…

TCP simultaneous open测试

源代码 /*************************************************************************> File Name: common.h> Author: hsz> Brief:> Created Time: 2024年10月23日 星期三 09时47分51秒**********************************************************************…

windows录屏软件工具推荐!!

如今&#xff0c;科技的进步&#xff0c;互联网的普及&#xff0c;使我们的生活越来越便利&#xff0c;录屏工具的出现&#xff0c;大大提高我们的工作效率。如果你经常需要录制屏幕上的内容&#xff0c;比如制作教学视频、游戏实况记录、演示文稿等等&#xff0c;那这几款软件…

“令牌化”革命:数据货币化如何重塑企业竞争格局

在科技日新月异的今天&#xff0c;英伟达CEO黄仁勋在Gartner IT研讨会/XPO大会上的主题演讲无疑为企业创业者们提供了一场思想的盛宴。作为科技行业的领军企业&#xff0c;英伟达不仅在图形处理器&#xff08;GPU&#xff09;领域取得了巨大成功&#xff0c;更在人工智能&#…

前端新人手册:入职第一天的环境配置秘籍

在前端开发的世界里&#xff0c;一个高效、稳定的开发环境是高效工作的基石。它不仅能够提升你的工作效率&#xff0c;还能帮助你更快地适应团队的工作节奏。本文将详细介绍前端开发需要具备的环境及工具。 开发环境 Node.js 通常我们的前端项目都是依赖Node.js环境的&#…

JavaScript入门中-流程控制语句

本文转载自&#xff1a;https://fangcaicoding.cn/article/52 大家好&#xff01;我是方才&#xff0c;目前是8人后端研发团队的负责人&#xff0c;拥有6年后端经验&3年团队管理经验&#xff0c;截止目前面试过近200位候选人&#xff0c;主导过单表上10亿、累计上100亿数据…

C++ 日志管理 spdlog 使用笔记

文章目录 Part.I IntroductionChap.I 预备知识Chap.II 常用语句 Part.II 使用Chap.I 简单使用Chap.II 自定义日志格式 Part.III 问题&解决方案Chap.I 如果文件存在则删除 Reference Part.I Introduction spdlog 是一个开源的 C 日志管理工具&#xff0c;Git 上面的地址为 …