SpringBoot整合RocketMQ,高手都是这么玩的!

news2025/1/25 4:28:22

今天我们来讨论如何在项目开发中优雅地使用RocketMQ。本文分为三部分,第一部分实现SpringBoot与RocketMQ的整合,第二部分解决在使用RocketMQ过程中可能遇到的一些问题并解决他们,第三部分介绍如何封装RocketMQ以便更好地使用。

1. SpringBoot整合RocketMQ

在SpringBoot中集成RocketMQ,只需要简单四步:

  1. 引入相关依赖

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
  1. 添加RocketMQ的相关配置

rocketmq:
    consumer:
        group: springboot_consumer_group
        # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
        pull-batch-size: 10
    name-server: 10.5.103.6:9876
    producer:
        # 发送同一类消息的设置为同一个group,保证唯一
        group: springboot_producer_group
        # 发送消息超时时间,默认3000
        sendMessageTimeout: 10000
        # 发送消息失败重试次数,默认2
        retryTimesWhenSendFailed: 2
        # 异步消息重试此处,默认2
        retryTimesWhenSendAsyncFailed: 2
        # 消息最大长度,默认1024 * 1024 * 4(默认4M)
        maxMessageSize: 4096
        # 压缩消息阈值,默认4k(1024 * 4)
        compressMessageBodyThreshold: 4096
        # 是否在内部发送失败时重试另一个broker,默认false
        retryNextServer: false
  1. 使用提供的模板工具类RocketMQTemplate发送消息

@RestController
public class NormalProduceController {
  @Setter(onMethod_ = @Autowired)
  private RocketMQTemplate rocketmqTemplate;
  
  @GetMapping("/test")
  public SendResult test() {
    Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();
    SendResult sendResult = rocketmqTemplate.send(topic, msg);
  }
}
  1. 实现RocketMQListener接口消费消息

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name")
public class MyConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理消息的逻辑
        System.out.println("Received message: " + message);
    }
}

以上4步即可实现SpringBoot与RocketMQ的整合,这部分属于基础知识,不做过多说明。

2 使用RocketMQ会遇到的问题

以下是一些在SpringBoot中使用RocketMQ时常遇到的问题,现在为您逐一解决。

2.1 WARN No appenders could be found for logger

启动项目时会在日志中看到如下告警

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Please initialize the logger system properly.

此时我们只需要在启动类中设置环境变量 rocketmq.client.logUseSlf4j 为 true 明确指定RocketMQ的日志框架

@SpringBootApplication
public class RocketDemoApplication {

    public static void main(String[] args) {
        /*
         * 指定使用的日志框架,否则将会告警
         * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
         * RocketMQLog:WARN Please initialize the logger system properly.
         */
        System.setProperty("rocketmq.client.logUseSlf4j", "true");
      
        SpringApplication.run(RocketDemoApplication.class, args);
    }
}

同时还得在配置文件中调整日志级别,不然在控制台会一直看到broker的日志信息

logging:
 level:
   RocketmqClient: ERROR
    io:
     netty: ERROR

2.2 不支持LocalDate 和 LocalDateTime

在使用Java8后经常会使用LocalDate/LocalDateTime这两个时间类型字段,然而RocketMQ原始配置并不支持Java时间类型,当我们发送的实体消息中包含上述两个字段时,消费端在消费时会出现如下所示的错误。

比如生产者的代码如下:

@GetMapping("/test")
public void test(){
  //普通消息无返回值,只负责发送消息⽽不等待服务器回应且没有回调函数触发。
  RocketMessage rocketMessage = RocketMessage.builder().
    id(1111L).
    message("hello,world")
    .localDate(LocalDate.now())
    .localDateTime(LocalDateTime.now())
    .build();
  rocketmqTemplate.convertAndSend(destination,rocketMessage);
}

消费者的代码如下:

@Component
@RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "consumer_topic")
public class RocketMQConsumer implements RocketMQListener<RocketMessage> {
    @Override
    public void onMessage(RocketMessage message) {
        System.out.println("消费消息-" + message);
    }
}

消费者开始消费时会出现类型转换异常错误Cannot construct instance of java.time.LocalDate,错误详情如下:

图片

image-20230322163904100

原因:RocketMQ内置使用的转换器是RocketMQMessageConverter,转换Json时使用的是MappingJackson2MessageConverter,但是这个转换器不支持时间类型。

解决办法:需要自定义消息转换器,将MappingJackson2MessageConverter进行替换,并添加支持时间模块

@Configuration
public class RocketMQEnhanceConfig {

    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if(messageConverter instanceof MappingJackson2MessageConverter){
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }
}

2.3 RockeMQ环境隔离

在使用RocketMQ时,通常会在代码中直接指定消息主题(topic),而且开发环境和测试环境可能共用一个RocketMQ环境。如果没有进行处理,在开发环境发送的消息就可能被测试环境的消费者消费,测试环境发送的消息也可能被开发环境的消费者消费,从而导致数据混乱的问题。

为了解决这个问题,我们可以根据不同的环境实现自动隔离。通过简单配置一个选项,如dev、test、prod等不同环境,所有的消息都会被自动隔离。例如,当发送的消息主题为consumer_topic时,可以自动在topic后面加上环境后缀,如consumer_topic_dev

那么,我们该如何实现呢?

可以编写一个配置类实现BeanPostProcessor,并重写postProcessBeforeInitialization方法,在监听器实例初始化前修改对应的topic。

BeanPostProcessor是Spring框架中的一个接口,它的作用是在Spring容器实例化、配置完bean之后,在bean初始化前后进行一些额外的处理工作。

具体来说,BeanPostProcessor接口定义了两个方法:

  • postProcessBeforeInitialization(Object bean, String beanName): 在bean初始化之前进行处理,可以对bean做一些修改等操作。

  • postProcessAfterInitialization(Object bean, String beanName): 在bean初始化之后进行处理,可以进行一些清理或者其他操作。

BeanPostProcessor可以在应用程序中对Bean的创建和初始化过程进行拦截和修改,对Bean的生命周期进行干预和操作。它可以对所有的Bean类实例进行增强处理,使得开发人员可以在Bean初始化前后自定义一些操作,从而实现自己的业务需求。比如,可以通过BeanPostProcessor来实现注入某些必要的属性值、加入某一个对象等等。

实现方案如下:

  1. 在配置文件中增加相关配置

rocketmq:
 enhance:
   # 启动隔离,用于激活配置类EnvironmentIsolationConfig
   # 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果
   enabledIsolation: true
   # 隔离环境名称,拼接到topic后,topic_dev,默认空字符串
   environment: dev
  1. 新增配置类,在实例化消息监听者之前把topic修改掉

@Configuration
public class EnvironmentIsolationConfig implements BeanPostProcessor {
   @Value("${rocketmq.enhance.enabledIsolation:true}")
    private boolean enabledIsolation;
    @Value("${rocketmq.enhance.environment:''}")
    private String environmentName;
  
    /**
     * 在装载Bean之前实现参数修改
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof DefaultRocketMQListenerContainer){

            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
       //拼接Topic
            if(enabledIsolation && StringUtils.hasText(environmentName)){
                container.setTopic(String.join("_", container.getTopic(),environmentName));
            }
            return container;
        }
        return bean;
    }
}
  1. 启动项目可以看到日志中消息监听的队列已经被修改了

2023-03-23 17:04:59.726 [main] INFO  o.a.r.s.support.DefaultRocketMQListenerContainer:290 - running container: DefaultRocketMQListenerContainer{consumerGroup='springboot_consumer_group', nameServer='10.5.103.6:9876', topic='consumer_topic_dev', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}

3. RocketMQ二次封装

在解释为什么要二次封装之前先来看看RocketMQ官方文档中推荐的最佳实践

  1. 消息发送成功或者失败要打印消息日志,用于业务排查问题。

  2. 如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。

  3. RocketMQ 无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。

上面三个步骤基本每次发送消息或者消费消息都要实现,属于重复动作。

接下来讨论的是在RocketMQ中发送消息时选择何种消息类型最为合适。

在RocketMQ中有四种可选格式:

  1. 发送Json对象

  2. 发送转Json后的String对象

  3. 根据业务封装对应实体类

  4. 直接使用原生MessageExt接收。

对于如何选择消息类型,需要考虑到消费者在不查看消息发送者的情况下,如何获取消息的含义。因此,在这种情况下,使用第三种方式即根据业务封装对应实体类的方式最为合适,也是大多数开发者在发送消息时的常用方式。

有了上面两点结论以后我们来看看为什么要对RocketMQ二次封装。

3.1 为什么要二次封装

按照上述最佳实践,一个完整的消息传递链路从生产到消费应包括 准备消息、发送消息、记录消息日志、处理发送失败、记录接收消息日志、处理业务逻辑、异常处理和异常重试 等步骤。

虽然使用原生RocketMQ可以完成这些动作,但每个生产者和消费者都需要编写大量重复的代码来完成相同的任务,这就是需要进行二次封装的原因。我们希望通过二次封装,生产者只需准备好消息实体并调用封装后的工具类发送,而消费者只需处理核心业务逻辑,其他公共逻辑会得到统一处理。 

在二次封装中,关键是找出框架在日常使用中所涵盖的许多操作,以及区分哪些操作是可变的,哪些是不变的。以上述例子为例,实际上只有生产者的消息准备和消费者的业务处理是可变的操作,需要根据需求进行处理,而其他步骤可以固定下来形成一个模板。

当然,本文提到的二次封装不是指对源代码进行封装,而是针对工具的原始使用方式进行的封装。可以将其与Mybatis和Mybatis-plus区分开来。这两者都能完成任务,只不过Mybatis-plus更为简单便捷。

3.2 实现二次封装

实现二次封装需要创建一个自定义的starter,这样其他项目只需要依赖此starter即可使用封装功能。同时,在自定义starter中还需要解决文章第二部分中提到的一些问题。

代码结构如下所示:

图片

image-20230403160031944

3.2.1 消息实体类的封装
/**
 * 消息实体,所有消息都需要继承此类
 * 公众号:JAVA日知录
 */
@Data
public abstract class BaseMessage {
    /**
     * 业务键,用于RocketMQ控制台查看消费情况
     */
    protected String key;
    /**
     * 发送消息来源,用于排查问题
     */
    protected String source = "";

    /**
     * 发送时间
     */
    protected LocalDateTime sendTime = LocalDateTime.now();

    /**
     * 重试次数,用于判断重试次数,超过重试次数发送异常警告
     */
    protected Integer retryTimes = 0;
}

后面所有发送的消息实体都需要继承此实体类。

3.2.2 消息发送工具类的封装
@Slf4j
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RocketMQEnhanceTemplate {
    private final RocketMQTemplate template;

    @Resource
    private RocketEnhanceProperties rocketEnhanceProperties;

    public RocketMQTemplate getTemplate() {
        return template;
    }

    /**
     * 根据系统上下文自动构建隔离后的topic
     * 构建目的地
     */
    public String buildDestination(String topic, String tag) {
        topic = reBuildTopic(topic);
        return topic + ":" + tag;
    }

    /**
     * 根据环境重新隔离topic
     * @param topic 原始topic
     */
    private String reBuildTopic(String topic) {
        if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
            return topic +"_" + rocketEnhanceProperties.getEnvironment();
        }
        return topic;
    }

    /**
     * 发送同步消息
     */
    public <T extends BaseMessage> SendResult send(String topic, String tag, T message) {
        // 注意分隔符
        return send(buildDestination(topic,tag), message);
    }


    public <T extends BaseMessage> SendResult send(String destination, T message) {
        // 设置业务键,此处根据公共的参数进行处理
        // 更多的其它基础业务处理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
        log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }

    /**
     * 发送延迟消息
     */
    public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(buildDestination(topic,tag), message, delayLevel);
    }

    public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }
}

这里封装了一个消息发送类,实现了日志记录以及自动重建topic的功能(即生产者实现环境隔离),后面项目中只需要注入RocketMQEnhanceTemplate来实现消息的发送。

3.2.3 消费者的封装
@Slf4j
public abstract class EnhanceMessageHandler<T extends BaseMessage> {
    /**
     * 默认重试次数
     */
    private static final int MAX_RETRY_TIMES = 3;

    /**
     * 延时等级
     */
    private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND;


    @Resource
    private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;

    /**
     * 消息处理
     *
     * @param message 待处理消息
     * @throws Exception 消费异常
     */
    protected abstract void handleMessage(T message) throws Exception;

    /**
     * 超过重试次数消息,需要启用isRetry
     *
     * @param message 待处理消息
     */
    protected abstract void handleMaxRetriesExceeded(T message);


    /**
     * 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    protected boolean filter(T message) {
        return false;
    }

    /**
     * 是否异常时重复发送
     *
     * @return true: 消息重试,false:不重试
     */
    protected abstract boolean isRetry();

    /**
     * 消费异常时是否抛出异常
     * 返回true,则由rocketmq机制自动重试
     * false:消费异常(如果没有开启重试则消息会被自动ack)
     */
    protected abstract boolean throwException();

    /**
     * 最大重试次数
     *
     * @return 最大重试次数,默认5次
     */
    protected int getMaxRetryTimes() {
        return MAX_RETRY_TIMES;
    }

    /**
     * isRetry开启时,重新入队延迟时间
     * @return -1:立即入队重试
     */
    protected int getDelayLevel() {
        return DELAY_LEVEL;
    }

    /**
     * 使用模板模式构建消息消费框架,可自由扩展或删减
     */
    public void dispatchMessage(T message) {
        // 基础日志记录被父类处理了
        log.info("消费者收到消息[{}]", JSONObject.toJSON(message));

        if (filter(message)) {
            log.info("消息id{}不满足消费条件,已过滤。",message.getKey());
            return;
        }
        // 超过最大重试次数时调用子类方法处理
        if (message.getRetryTimes() > getMaxRetryTimes()) {
            handleMaxRetriesExceeded(message);
            return;
        }
        try {
            long now = System.currentTimeMillis();
            handleMessage(message);
            long costTime = System.currentTimeMillis() - now;
            log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime);
        } catch (Exception e) {
            log.error("消息{}消费异常", message.getKey(),e);
            // 是捕获异常还是抛出,由子类决定
            if (throwException()) {
                //抛出异常,由DefaultMessageListenerConcurrently类处理
                throw new RuntimeException(e);
            }
            //此时如果不开启重试机制,则默认ACK了
            if (isRetry()) {
                handleRetry(message);
            }
        }
    }

    protected void handleRetry(T message) {
        // 获取子类RocketMQMessageListener注解拿到topic和tag
        RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
        if (annotation == null) {
            return;
        }
        //重新构建消息体
        String messageSource = message.getSource();
        if(!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)){
            message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource);
        }
        message.setRetryTimes(message.getRetryTimes() + 1);

        SendResult sendResult;

        try {
            // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
            sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel());
        } catch (Exception ex) {
            // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
            //由生产者直接发送
            throw new RuntimeException(ex);
        }
        // 发送失败的处理就是不进行ACK,由RocketMQ重试
        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
            throw new RuntimeException("重试消息发送失败");
        }

    }
}

使用模版设计模式定义了消息消费的骨架,实现了日志打印,异常处理,异常重试等公共逻辑,消息过滤(查重)、业务处理则交由子类实现。

3.2.4 基础配置类
@Configuration
@EnableConfigurationProperties(RocketEnhanceProperties.class)
public class RocketMQEnhanceAutoConfiguration {

    /**
     * 注入增强的RocketMQEnhanceTemplate
     */
    @Bean
    public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){
        return new RocketMQEnhanceTemplate(rocketMQTemplate);
    }

    /**
     * 解决RocketMQ Jackson不支持Java时间类型配置
     * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration}
     */
    @Bean
    @Primary
    public RocketMQMessageConverter enhanceRocketMQMessageConverter(){
        RocketMQMessageConverter converter = new RocketMQMessageConverter();
        CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();
        List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();
        for (MessageConverter messageConverter : messageConverterList) {
            if(messageConverter instanceof MappingJackson2MessageConverter){
                MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;
                ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();
                objectMapper.registerModules(new JavaTimeModule());
            }
        }
        return converter;
    }


    /**
     * 环境隔离配置
     */
    @Bean
    @ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true")
    public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){
        return new EnvironmentIsolationConfig(rocketEnhanceProperties);
    }

}
public class EnvironmentIsolationConfig implements BeanPostProcessor {
    private RocketEnhanceProperties rocketEnhanceProperties;

    public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) {
        this.rocketEnhanceProperties = rocketEnhanceProperties;
    }


    /**
     * 在装载Bean之前实现参数修改
     */
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof DefaultRocketMQListenerContainer){

            DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;

            if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){
                container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment()));
            }
            return container;
        }
        return bean;
    }
}
@ConfigurationProperties(prefix = "rocketmq.enhance")
@Data
public class RocketEnhanceProperties {

    private boolean enabledIsolation;

    private String environment;
}

3.3 封装后的使用

3.3.1 引入依赖
 <dependency>
   <groupId>com.jianzh5</groupId>
   <artifactId>cloud-rocket-starter</artifactId>
</dependency>
3.3.2 自定义配置
rocketmq:
 ...
 enhance:
  # 启动隔离,用于激活配置类EnvironmentIsolationConfig
   # 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果
   enabledIsolation: true
    # 隔离环境名称,拼接到topic后,topic_dev,默认空字符串
    environment: dev
3.3.3 发送消息
@RestController
@RequestMapping("enhance")
@Slf4j
public class EnhanceProduceController {

    //注入增强后的模板,可以自动实现环境隔离,日志记录
    @Setter(onMethod_ = @Autowired)
    private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;

    private static final String topic = "rocket_enhance";
    private static final String tag = "member";

    /**
     * 发送实体消息
     */
    @GetMapping("/member")
    public SendResult member() {
        String key = UUID.randomUUID().toString();
        MemberMessage message = new MemberMessage();
        // 设置业务key
        message.setKey(key);
        // 设置消息来源,便于查询
        message.setSource("MEMBER");
        // 业务消息内容
        message.setUserName("Java日知录");
        message.setBirthday(LocalDate.now());

        return rocketMQEnhanceTemplate.send(topic, tag, message);
    }
}

注意这里使用的是封装后的模板工具类,一旦在配置文件中启动环境隔离,则生产者的消息也自动发送到隔离后的topic中。

3.3.4 消费者
@Slf4j
@Component
@RocketMQMessageListener(
        consumerGroup = "enhance_consumer_group",
        topic = "rocket_enhance",
        selectorExpression = "*",
        consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够
)
public class EnhanceMemberMessageListener extends EnhanceMessageHandler<MemberMessage> implements RocketMQListener<MemberMessage> {

    @Override
    protected void handleMessage(MemberMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        System.out.println("业务消息处理:"+message.getUserName());
    }

    @Override
    protected void handleMaxRetriesExceeded(MemberMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
        log.error("消息消费失败,请执行后续处理");
    }


    /**
     * 是否执行重试机制
     */
    @Override
    protected boolean isRetry() {
        return true;
    }

    @Override
    protected boolean throwException() {
        // 是否抛出异常,false搭配retry自行处理异常
        return false;
    }
  
    @Override
    protected boolean filter() {
        // 消息过滤
        return false;
    }

    /**
     * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型
     */
    @Override
    public void onMessage(MemberMessage memberMessage) {
        super.dispatchMessage(memberMessage);
    }
}

为了方便消费者对RocketMQ中的消息进行处理,我们可以使用EnhanceMessageHandler来进行消息的处理和逻辑的处理。

消费者实现了RocketMQListener的同时,可以继承EnhanceMessageHandler来进行公共逻辑的处理,而核心业务逻辑需要自己实现handleMessage方法。 如果需要对消息进行过滤或者去重的处理,则可以重写父类的filter方法进行实现。这样可以更加方便地对消息进行处理,减轻开发者的工作量。

以上,就是今天的主要内容,希望对你有所帮助!(本文涉及的相关代码已整合至Dailymart项目并适配DDD架构模型)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1301467.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序、uniapp选择器,包含一级,二级级联,三级级联

效果预览&#xff1a; 已知问题: 不能与页面下拉一起使用 滑动选择后,scroll-view指定scrollTop时,scrollview滚动会有500ms左右的延迟(官方help),现在加了个loaing 参数说明: show(类型:Boolean,默认 false):控制组件显示隐藏 list(类型:Array):选择器绑定的数据 type(类型…

我的网站服务器被入侵了该怎么办?

最近有用户咨询到德迅云安全&#xff0c;说自己再用的网站服务器遇到了入侵情况&#xff0c;询问该怎么处理入侵问题&#xff0c;有什么安全方案可以解决服务器被入侵的问题。下面&#xff0c;我们就来简单讲下服务器遇到入侵了&#xff0c;该从哪方面入手处理&#xff0c;在预…

pandas 使用方法(1)

目录 1. excel 表格处理 (1) 读取excel 表格 (2) 抽取excel表部分列数据 (3) 保存数据到excel表格 (4) 保存到 excel 表中的不同sheet 2. 判断二维数组中的某个数值是否为空 3. 删除二维数组中的空行 4. 在列表中添加某列属性 本文是将使用pandas过程中遇到的问题进行了…

SpringDataJPA基础

简介 Spring Data为数据访问层提供了熟悉且一致的Spring编程模版&#xff0c;对于每种持久性存储&#xff0c;业务代码通常需要提供不同存储库提供对不同CURD持久化操作。Spring Data为这些持久性存储以及特定实现提供了通用的接口和模版。其目的是统一简化对不同类型持久性存储…

基于Java SSM框架实现个性化影片推荐系统项目【项目源码+论文说明】

基于java的SSM框架实现个性化影片推荐系统演示 摘要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;个性化影片推荐系统当然也不能排除在外。个性化影片推荐系统是以实际运用…

【Citespace】从Citespace开始的引文可视化分析

CiteSpace 译“引文空间”&#xff0c;是一款着眼于分析科学分析中蕴含的潜在知识&#xff0c;是在科学计量学、数据可视化背景下逐渐发展起来的引文可视化分析软件。由于是通过可视化的手段来呈现科学知识的结构、规律和分布情况&#xff0c;因此也将通过此类方法分析得到的可…

经典目标检测YOLO系列(一)引言_目标检测架构

经典目标检测YOLO系列(一)引言_目标检测架构 一个常见的目标检测网络&#xff0c;其本身往往可以分为一下三大块&#xff1a; Backbone network&#xff0c;即主干网络&#xff0c;是目标检测网络最为核心的部分&#xff0c;backbone选择的好坏&#xff0c;对检测性能影响是十…

Jol-分析Java对象的内存布局

Jol-分析Java对象的内存布局 Open JDK提供的JOL(Java Object Layout)工具为我们方便分析、了解一个Java对象在内存当中的具体布局情况。本文实验环境为64位HotSpot虚拟机。 Java对象的内存布局 Java的实例对象、数组对象在内存中的组成包括&#xff1a;对象头、实例数据和内存…

一键优化工具,十分不错的win7、win10系统优化的工具,可以帮助用户轻松快速优化系统,供大家学习研究参考~

主要功能 01、禁用索引服务 02、禁止window发送错误报告 03、禁用"最近使用的项目” 04、关闭Windows Defender 05、关闭防火墙 06、检查更新而不自动下载更新 07、启动电源计划“高性能” 08、调整电源选项 09、禁用休眠(删除休眠文件) 10、开启快速启动 11、…

【lesson3】数据库表的操作

文章目录 创建修改修改表名增加表类型修改表的某一类型的类型修改表某一类型的类型名 删除删除表的某一列删除表 查看查看表信息查看表内容 创建 建表指令&#xff1a; 查看是否建表成功&#xff1a; 查看表的具体信息&#xff1a; 修改 修改表名 法一&#xff1a;修改…

yolov5目标检测

一、安装 1.源码下载 git clone git://github.com/ultralytics/yolov5.git cd yolov5 2.环境配置 conda create -n yolov5 python3.8 conda activate yolov5 nvcc -V查看cuda版本 pytorch官网下载对应版本&#xff0c;例如当cuda版本为11.6 pip install torch1.13.1cu…

阿里云服务器租用价格分享,阿里云服务器热门配置最新活动价格汇总

在我们购买阿里云服务器的时候&#xff0c;1核2G、2核2G、2核4G、2核8G、4核8G、8核16G、8核32G等配置属于用户购买最多的热门配置&#xff0c;1核2G、2核2G、2核4G这些配置低一点的云服务器基本上能够满足绝大部分个人建站和普通企业用户建站需求&#xff0c;而4核8G、8核16G、…

Vue之模板语法

模板语法有两大类&#xff1a; 1.插值语法 2.指令语法 让我为大家介绍一下吧&#xff01; 一、插值语法 功能:用于解析标签体内容。 写法: {{xxx}}&#xff0c;xxx是js表达式&#xff0c;且可以直接读取到data中的所有属性。 举个例子&#xff1a; <!DOCTYPE html> &l…

六级高频词汇3

目录 单词 参考链接 单词 400. nonsense n. 胡说&#xff0c;冒失的行动 401. nuclear a. 核子的&#xff0c;核能的 402. nucleus n. 核 403. retail n. /v. /ad. 零售 404. retain vt. 保留&#xff0c;保持 405. restrict vt. 限制&#xff0c;约束 406. sponsor n. …

LinuxBasicsForHackers笔记 -- BASH 脚本

你的第一个脚本&#xff1a;“你好&#xff0c;黑客崛起&#xff01;” 首先&#xff0c;您需要告诉操作系统您要为脚本使用哪个解释器。 为此&#xff0c;请输入 shebang&#xff0c;它是井号和感叹号的组合&#xff0c;如下所示&#xff1a;#! 然后&#xff0c;在 shebang …

“我“的测试之路,从初级测试到测试开发,往后前景...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、测试工程师的现…

C++ //习题2.5 请写出下列表达式的值。

C程序设计 &#xff08;第三版&#xff09; 谭浩强 习题2.5 习题2.5 请写出下列表达式的值。 (1) 3.5 * 3 2 * 7 - ‘a’ (2) 26 / 3 34 % 3 2.5 (3) 45 / 2 (int)3.14159 / 2 (4) a b (c a 6) 设a的初值为3 (5) a 3 * 5, a b 3 * 2 (6) (int)(a 6.5) % 2 …

DevEco Studio 3.1IDE环境配置(HarmonyOS 3.1)

DevEco Studio 3.1IDE环境配置&#xff08;HarmonyOS 3.1&#xff09; 一、安装环境 操作系统: Windows 10 专业版 IDE:DevEco Studio 3.1 SDK:HarmonyOS 3.1 二、环境安装 IDE下载地址&#xff1a;HUAWEI DevEco Studio和SDK下载和升级 | HarmonyOS开发者 IDE的安装就是…

家具制造ERP软件包含哪些功能?家具制造业ERP系统哪个好

不同的家具有不同的用料、品质、制造工时、营销渠道等&#xff0c;而有些家具制造企业采用传统的管理方式在处理物料BOM、生产实际成本核算、库存盘点、供应商选择、班组计件核对、生产领用以及物料追溯等方面存在不少提升空间。 与此同时也有很多的皮具制造企业借助ERP软件优…

探索HarmonyOS_开发软件安装

随着华为推出HarmonyOS NEXT 宣布将要全面启用鸿蒙原声应用&#xff0c;不在兼容安卓应用&#xff0c; 现在开始探索鸿蒙原生应用的开发。 HarmonyOS应用开发官网 - 华为HarmonyOS打造全场景新服务 鸿蒙官网 开发软件肯定要从这里下载 第一个为微软系统(windows)&#xff0c;第…