【消息中间件】RocketMQ如何集成到SpringBoot

news2025/1/11 5:37:28

目录

一、前言

二、RocketMQ如何集成到SpringBoot项目

1、如果不用SpringBoot项目

1.1、引入依赖

1.2、启动BrokerStartup和NamesrvStartup服务端

2)启动NamesrvStartup

1.3、生产者创建并启动以及发送消息

1.4、消费者创建并启动

2、RocketMQ集成到SpringBoot项目入口

2.1、引入依赖

2.2、自动装配入口RocketMQAutoConfiguration 

3、读取application.yml配置(重点)

3.1、生产者配置

所有的生产者配置都在这里了,当然不同的RocketMQ版本可能略有不同

3.2、消费者配置

3.3、配置样例

4、@Import的具体配置内容

4.1、Jackson相关装配

4.2、注册监听器容器以及监听器

5、总结


一、前言

    前面的文章我们介绍了RocketMQ已经成为了Apache资金会的一个顶级项目以及SpringBoot如何整合RocketMQ,本篇文章我们将介绍RocketMQ如何集成到SpringBoot项目。如果不用SpringBoot,我们需做哪些工作?RocketMQ集成到SpringBoot项目的入口、哪些子模块集成进来了、依据的集成机制是什么、装配了哪些Bean?发送消息的方式有哪些?RocketMQ的配置内容有哪些?ApplicationContext上下文、如何织入、用来干什么呢?监听器注册、监听器容器注册与启动?

二、RocketMQ如何集成到SpringBoot项目

不管在什么项目使用RocketMQ,首先都要先安装RocketMQ,在《Window系统安装RocketMQ》文章已经详细介绍,感兴趣的可以去看看。然后启动RocketMQ服务端,即在RocketMQ项目里面的bin文件夹下双击mqnamesrv.cmd、mqbroker.cmd。如果是克隆下来的源码项目,则启动BrokerStartup和NamesrvStartup服务端。注意一下可能有问题,下面提供解决方案。

1、如果不用SpringBoot项目

1.1、引入依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
        </dependency>

1.2、启动BrokerStartup和NamesrvStartup服务端

1)在broker项目的pom.xml补充如下依赖,然后启动BrokerStartup

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-store</artifactId>
            <version>5.0.0-SNAPSHOT</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>io.openmessaging.storage</groupId>
            <artifactId>dledger</artifactId>
            <version>0.2.7</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.5.0</version>
            <scope>provided</scope>
        </dependency>

不加上这些依赖的话,启动就会报错或者启动失败

注意: 这里报红问题不大,已经启动BrokerStartup成功了。本地环境,端口号9876。

2)启动NamesrvStartup

 NamesrvStartup启动成功会在控制台输出这句话

1.3、生产者创建并启动以及发送消息

/**
 * 此类演示如何使用提供的 DefaultMQProducer 向代理发送消息。
 */
public class Producer {

    /**
     * The number of produced messages.消息数量
     */
    public static final int MESSAGE_COUNT = 1000;
    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
    // RocketMQ服务端主机
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";
    public static final String TAG = "TagA";

    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * 使用生产者组名实例化。
         */
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);

        /*
         * 指定名称服务器地址。
         *
         * 或者,您可以通过导出环境变量 NAMESRV _ ADDR 来指定名称服务器地址
         * <pre>
         * {@code
         *  producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        //在调试时取消注释以下行,namesrvAddr 应设置为您的本地地址 producer.setNamesrvAddr (DEFAULT _ NAMESRVADDR)
        /*
         * Launch the instance.启动实例。
         */
        producer.start();

        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {

                /*
                 * 创建一个消息实例,指定主题、标签和消息体。
                 */
                Message msg = new Message(TOPIC /* Topic */,
                    TAG /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * 调用发送消息将消息传递给其中一个代理。
                 */
                SendResult sendResult = producer.send(msg);
                /*
                 * 有不同的方式发送消息,下面举例
                 * 
                 */

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * 一旦生产者实例不再使用就关闭。
         * 这里注释掉,不然启动报bug
         */
        // producer.shutdown();
    }
}
  • 有不同的方式发送消息,如果不关心发送结果,可以使用这种方式
    producer.sendOneway(msg);
    
  • 如果希望以同步方式获取发送结果,可以使用此 send 方法
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
  • 如果希望以异步方式获取发送结果,可以使用此send方法,里面有成功、异常回调方法
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
             // do something
        }
    
        @Override
        public void onException(Throwable e) {
            // do something
        }
    });
    
    
    当然有时候一些特别场景需特别对待,而不是你发送一条消息就完事了。我们需要做些东西,如打印日志等,不管成功还是失败。所以异步回调是个好东西,仔细品味。

控制台如下效果:

生产者发送消息成功。 

1.4、消费者创建并启动

/**
 * 此示例演示如何使用提供 DefaultMQPushConsumer 订阅和使用消息。
 */
public class Consumer {

    public static final String CONSUMER_GROUP = "please_rename_unique_group_name_4";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";

    public static void main(String[] args) throws MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */
        // Uncomment the following line while debugging, namesrvAddr should be set to your local address
//        consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);

        /*
         * 指定从哪里开始,以防特定的使用者组是一个全新的使用者组。
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more topic to consume.订阅另一个要消费的主题。
         */
        consumer.subscribe(TOPIC, "*");

        /*
         * 注册回调,以便在从代理获取的消息到达时执行。
         */
        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        /*
         *  启动消费者实例。
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

控制台效果如下:

消费者消费成功。 

可见如果不是SpringBoot项目的话,我们要自己手动去做很多事情。还要自己去封装(谈封装色变哈哈哈)请求模板等,工作量上都有了,太麻烦了。当然这里的例子大家感兴趣的,可以改改然后自己去探究探究。下面我们将介绍SpringBoot相关内容,带我们走入快乐路径:开箱即用。

2、RocketMQ集成到SpringBoot项目入口

2.1、引入依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
        </dependency>

多了一个rocketmq-spring-boot-starter的jar包,如果你看过我之前写的Spring家族及微服务系列专栏文章,相信你已经知道什么意思了。这也是一个小项目,是阿里巴巴自己提供的,运用SpringBoot的自动装配机制(Spring家族及微服务系列专栏已经详细分析)把RocketMQ客户端集成到里面去的。那么这个项目有什么内容可以帮助我们走入快乐路径,我们只需引入依赖即可开箱即用,也提高了维护性呢?下面我们分析该starter项目:

2.2、自动装配入口RocketMQAutoConfiguration 

由于之前已经详细分析了自动装配原理,这里就没有过多分析了。这个配置类内容还是有点多的,我们把它由上到下分析,自动装配也是按照这个顺序来的。

  1. 配置类上面的注解
    @Configuration
    @EnableConfigurationProperties({RocketMQProperties.class})
    @ConditionalOnClass({MQAdmin.class, ObjectMapper.class})
    @ConditionalOnProperty(
            prefix = "rocketmq",
            value = {"name-server"},
            matchIfMissing = true
    )
    @Import({JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class})
    @AutoConfigureAfter({JacksonAutoConfiguration.class})
    public class RocketMQAutoConfiguration {
    
      ....此处省略n行代码....
    }

    @Configuration标明该类是配置类,Spring会识别到;@EnableConfigurationProperties({RocketMQProperties.class})意思是先启动读取application.yml配置维护到RocketMQProperties;@ConditionalOnClass({MQAdmin.class, ObjectMapper.class})意思是在配置前先配置MQAdmin以及ObjectMapper;@ConditionalOnProperty(prefix = "rocketmq",value = {"name-server"}, matchIfMissing = true)意思是配置前该属性需先配置;@Import({XXX})导入需要配置的类,加了@Configuration注解,这些配置类专门配置一些Bean;@AutoConfigureAfter({JacksonAutoConfiguration.class})意思是必须在JacksonAutoConfiguration配置后才配置。注意:配置是有顺序的,当然这些注解有一部分都是SpringBoot自动装配里面扩展Spring的@Conditional注解,为自动装配量身打造的。感兴趣的读者可以仔细研究研究,不过需要谨慎使用。

  2. 属性及构造方法
        @Autowired
        private Environment environment;
    
        public RocketMQAutoConfiguration() {
        }
    
        @PostConstruct
        public void checkProperties() {
            String nameServer = (String)this.environment.getProperty("rocketmq.name-server", String.class);
            log.debug("rocketmq.nameServer = {}", nameServer);
            if (nameServer == null) {
                log.warn("The necessary spring property 'rocketmq.name-server' is not defined, all rockertmq beans creation are skipped!");
            }
    
        }

    Environment是Spring里面的一个环境信息组件,这里使用@Autowired从Spring容器注入进来。提供了无参构造方法,在Spring初始化时会通过反射调用实构造方法例化该配置类。@PostConstruct也就是构造方法后置处理,构造方法调用后,Spring就会执行它标注的方法checkProperties(),检查RocketMQ是否在环境里面了,没有打印警告日志。

  3. 装配生产者
        @Bean
        @ConditionalOnMissingBean({DefaultMQProducer.class})
        @ConditionalOnProperty(
                prefix = "rocketmq",
                value = {"name-server", "producer.group"}
        )
        public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
            Producer producerConfig = rocketMQProperties.getProducer();
            String nameServer = rocketMQProperties.getNameServer();
            String groupName = producerConfig.getGroup();
            Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
            Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
            String accessChannel = rocketMQProperties.getAccessChannel();
            String ak = rocketMQProperties.getProducer().getAccessKey();
            String sk = rocketMQProperties.getProducer().getSecretKey();
            DefaultMQProducer producer;
            if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
                producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)), rocketMQProperties.getProducer().isEnableMsgTrace(), rocketMQProperties.getProducer().getCustomizedTraceTopic());
                producer.setVipChannelEnabled(false);
            } else {
                producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(), rocketMQProperties.getProducer().getCustomizedTraceTopic());
            }
    
            producer.setNamesrvAddr(nameServer);
            if (!StringUtils.isEmpty(accessChannel)) {
                producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
            }
    
            producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
            producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
            producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
            producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
            producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
            producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
            return producer;
        }

    @Bean标明需注入到Spring容器,@ConditionalOnProperty意思是先配置了属性,属性有值了。方法内部的配置在下面3分析。这个生产者就是默认生产者,将会通过setter注入到RocketMQ模板中。

  4. 配置发送消息模板
        @Bean(
                destroyMethod = "destroy"
        )
        @ConditionalOnBean({DefaultMQProducer.class})
        @ConditionalOnMissingBean(
                name = {"rocketMQTemplate"}
        )
        public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) {
            RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
            rocketMQTemplate.setProducer(mqProducer);
            rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper);
            return rocketMQTemplate;
        }

    配置RocketMQ模板,专门用来发送消息的,后面时间允许将写一篇详细的文章介绍下。值得注意的是,该Bean的配置指定了一个销毁方法。

  5. 配置事务管理器注册机
        @Bean
        @ConditionalOnBean(
                name = {"rocketMQTemplate"}
        )
        @ConditionalOnMissingBean({TransactionHandlerRegistry.class})
        public TransactionHandlerRegistry transactionHandlerRegistry(@Qualifier("rocketMQTemplate") RocketMQTemplate template) {
            return new TransactionHandlerRegistry(template);
        }

    这个Bean的作用顾名思义与事务有关,具体作用同样后面出文章分析。

  6. 配置事务注解处理器
        @Bean(
                name = {"org.springframework.rocketmq.spring.starter.internalRocketMQTransAnnotationProcessor"}
        )
        @ConditionalOnBean({TransactionHandlerRegistry.class})
        @Role(2)
        public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) {
            return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry);
        }

    它接收一个参数,也就是上面5配置的Bean。它的beanName另外指定了,也是跟消息事务有关的。

3、读取application.yml配置(重点)

所有的程序猿都必须知道的,所以在这里给你们划一下重点。

3.1、生产者配置

/**
 * rocketMQ在application.yml的配置,没有配置使用默认值
 * 但是有些是必须配置的
 *
 * @author CeaM
 * 2022/12/04 19:16
 **/
@ConfigurationProperties(
        prefix = "rocketmq"
)
public class RocketMQProperties {
    /**
     * 服务器主机
     */
    private String nameServer;
    private String accessChannel;
    /**
     * 生产者,公开静态内部类,有着默认的配置
     */
    private RocketMQProperties.Producer producer;

    public RocketMQProperties() {
    }

    ....此处省略n行setter/getter方法.....

    public static class Producer {
        /**
         * 分组
         */
        private String group;
        /**
         * 发送消息超时时间,默认3秒
         */
        private int sendMessageTimeout = 3000;
        private int compressMessageBodyThreshold = 4096;
        /**
         * 同步发送消息失败重试次数
         */
        private int retryTimesWhenSendFailed = 2;
        /**
         * 异步发送消息失败重试次数
         */
        private int retryTimesWhenSendAsyncFailed = 2;
        /**
         * 重试使用下一个服务器
         */
        private boolean retryNextServer = false;
        /**
         * 消息大小,默认最大允许为4m
         */
        private int maxMessageSize = 4194304;
        private String accessKey;
        private String secretKey;
        /**
         * 消息轨迹
         */
        private boolean enableMsgTrace = true;
        private String customizedTraceTopic = "RMQ_SYS_TRACE_TOPIC";

        public Producer() {
        }

        ....此处省略n行setter/getter方法.....
    }
}

所有的生产者配置都在这里了,当然不同的RocketMQ版本可能略有不同

3.2、消费者配置

/**
 * 消息监听器注解,这里的默认值有点意思,即application.yml中rocketmq配置指定值
 *
 * @author CeaM
 * 2022/12/04 19:20
 **/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /**
     * 消费组
     *
     * @return 消费组
     */
    String consumerGroup();

    /**
     * 主题
     *
     * @return 主题
     */
    String topic();

    SelectorType selectorType() default SelectorType.TAG;

    String selectorExpression() default "*";

    /**
     * 默认并发消费模式
     *
     * @return 消费模式
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * 默认集群消息模式
     *
     * @return 消息模式
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * 消费最大线程数
     *
     * @return 最大线程数
     */
    int consumeThreadMax() default 64;

    /**
     * 消费时间默认30秒,应该会调大些吧
     *
     * @return 消费时长
     */
    long consumeTimeout() default 30000L;

    String accessKey() default "${rocketmq.consumer.access-key:}";

    String secretKey() default "${rocketmq.consumer.secret-key:}";

    /**
     * 启用消息轨迹
     *
     * @return boolean
     */
    boolean enableMsgTrace() default true;

    /**
     *  自定义的消息轨迹主题
     *
     * @return  自定义的消息轨迹主题
     */
    String customizedTraceTopic() default "${rocketmq.consumer.customized-trace-topic:}";

    String nameServer() default "${rocketmq.name-server:}";

    String accessChannel() default "${rocketmq.access-channel:}";
}

消费者配置信息里面同样application.yml没有指定,那么就会使用一些默认值。它的初始化我们下面再分析。

3.3、配置样例

rocketmq:
  # RocketMQ服务端主机
  name-server: localhost:9876
  # 生产者配置
  producer:
    group: ceam_group
  # 消费者配置
  consumer:
    group: ceam_group

其它的配置上面的属性类已经给出了,感兴趣的读者可以自行去探究探究。注意一下的是,无论是生产者还是消费者,都是在我们的客户端。

4、@Import的具体配置内容

从上面我们已经知道有3个配置类,ExtProducerResetConfiguration属于扩展类这里不分析。

4.1、Jackson相关装配

@Configuration
@ConditionalOnMissingBean({ObjectMapper.class})
class JacksonFallbackConfiguration {
    JacksonFallbackConfiguration() {
    }

    @Bean
    public ObjectMapper rocketMQMessageObjectMapper() {
        return new ObjectMapper();
    }
}

相信大家对Jackson不陌生吧,如果不知道的就得赶紧补一补啦。关键就是ObjectMapper了,而与之功效差不多的是fastjson里面的工具类,不过有bug很多程序猿再闻之恐怕色变吧。

4.2、注册监听器容器以及监听器

  1. 从Spring容器获取已经配置的监听器
    ​
    @Configuration
    public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
        private static final Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);
        /**
         * Spring容器
         */
        private ConfigurableApplicationContext applicationContext;
        /**
         * 原子类
         */
        private AtomicLong counter = new AtomicLong(0L);
        private StandardEnvironment environment;
        /**
         * RocketMQ配置信息,application.yml没有指定的,则使用默认的
         */
        private RocketMQProperties rocketMQProperties;
        private ObjectMapper objectMapper;
    
        public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
                                              StandardEnvironment environment,
                                              RocketMQProperties rocketMQProperties) {
            this.objectMapper = rocketMQMessageObjectMapper;
            this.environment = environment;
            this.rocketMQProperties = rocketMQProperties;
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (ConfigurableApplicationContext)applicationContext;
        }
    
        /**
         * 实现了Spring的SmartInitializingSingleton接口,所以在单例实例化后执行
         */
        @Override
        public void afterSingletonsInstantiated() {
            // 从Spring上下文applicationContext获取已经实例化的所有监听器
            Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
            // 预检验以防空指针异常
            if (Objects.nonNull(beans)) {
                // 遍历监听器map注册到容器
                beans.forEach(this::registerContainer);
            }
    
        }
    
    ​
    在这里: ApplicationContextAware接口的作用就是织入ApplicationContext容器引用,SmartInitializingSingleton接口的作用是在该配置类实例化后进行一些初始化工作,工作量比较大。从前面的文章SpringBoot整合RocketMQ可知我们的监听器上面加了@Component注解,所以会以组件注入到Spring容器中。所以在afterSingletonsInstantiated()方法中,我们可以通过Spring容器即applicationContext获取已经实例化的监听器,然后遍历监听器map(key是监听器beanName,value是bean)将监听器注册到监听器容器。当然有部分上面已经分析
  2. 注册监听器容器到Spring容器并启动
        /**
         * 将监听器注册到监听器容器
         *
         * @param beanName 监听器beanName
         * @param bean 监听器bean
         */
        private void registerContainer(String beanName, Object bean) {
            // 通过Spring的AopProxyUtils获取监听器Bean的class对象
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
            // 判断是否是监听器
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
            } else {
                // 获取监听器注解
                RocketMQMessageListener annotation = (RocketMQMessageListener)clazz.getAnnotation(RocketMQMessageListener.class);
                // 校验
                this.validate(annotation);
                // 容器名称
                String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
                        this.counter.incrementAndGet());
                // 强制类型转换为GenericApplicationContext上下文
                GenericApplicationContext genericApplicationContext = (GenericApplicationContext)this.applicationContext;
                // 注册到Spring容器
                genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> {
                    return this.createRocketMQListenerContainer(containerBeanName, bean, annotation);
                }, new BeanDefinitionCustomizer[0]);
                // 获取实例化的默认RocketMQ监听器容器实例
                DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer)genericApplicationContext
                        .getBean(containerBeanName, DefaultRocketMQListenerContainer.class);
                // 监听器容器不是运行状态,则启动
                if (!container.isRunning()) {
                    try {
                        container.start();
                    } catch (Exception var9) {
                        log.error("Started container failed. {}", container, var9);
                        throw new RuntimeException(var9);
                    }
                }
    
                // 监听器注册到监听器容器成功,打印日志
                log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}"
                        , beanName, containerBeanName);
            }
        }

    通过Spring的AopProxyUtils获取监听器Bean的class对象;判断是否是监听器,不是抛异常;如果是从其class反射获取监听器注解,校验注解是否合法非法抛异常;拼接容器名称;强制类型转换为GenericApplicationContext上下文,将监听器容器注册到Spring容器,其中涉及创建对象下面分析;从Spring容器获取实例化的默认RocketMQ监听器容器实例;监听器容器不是运行状态,则启动;监听器注册到监听器容器成功,打印日志

  3. 创建监听器容器以及将监听器添加进来
        private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name,
                                                                                 Object bean,
                                                                                 RocketMQMessageListener annotation) {
            // 直接创建监听器容器
            DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
            // 从监听器注解获取RocketMQ服务器
            String nameServer = this.environment.resolvePlaceholders(annotation.nameServer());
            // 为空从application.yml获取
            nameServer = StringUtils.isEmpty(nameServer) ? this.rocketMQProperties.getNameServer() : nameServer;
            String accessChannel = this.environment.resolvePlaceholders(annotation.accessChannel());
            // 将服务器设置到容器
            container.setNameServer(nameServer);
            if (!StringUtils.isEmpty(accessChannel)) {
                container.setAccessChannel(AccessChannel.valueOf(accessChannel));
            }
    
    
            /*将监听器的信息注册到监听器容器*/
    
            // 将主题设置到容器,如:
            container.setTopic(this.environment.resolvePlaceholders(annotation.topic()));
            // 将消费分组设置到容器,如:
            container.setConsumerGroup(this.environment.resolvePlaceholders(annotation.consumerGroup()));
            // 将监听器注解RocketMQMessageListener设置到容器
            container.setRocketMQMessageListener(annotation);
            // 将监听器bean设置到容器,强制类型转换为RocketMQListener接口类型
            container.setRocketMQListener((RocketMQListener)bean);
            // 将objectMapper设置到容器
            container.setObjectMapper(this.objectMapper);
            // 将监听器beanName设置到容器
            container.setName(name);
            return container;
        }

    直接创建监听器容器,从监听器注解获取RocketMQ服务器,将服务器设置到监听器容器,将监听器的信息注册到监听器容器(具体看上面代码注释)。

5、总结

    如果不使用SpringBoot开发得话还是有一些工作量的,而使用了SpringBoot,你只需加上一个starter依赖以及配置就可以做到开箱即用了。配置信息维护到了application.yml,灵活可维护性高。应用了SpringBoot的自动装配机制集成到我们的项目,即RocketMQ客户端。另外配置的重点如下:

  • 当RocketMQProperties的Bean被实例化时,@ConfigurationProperties会将对应前缀的后面的属性与Bean对象的属性匹配。符合条件则进行赋值。重点。其中涉及到rocketmq的配置,包括生产者、消费者的配置。重点
  • 配置ObjectMapper类使用Jackson解析JSON,重点
  • @Import、ApplicationContextAware、SmartInitializingSingleton接口、 @PostConstruct、@Component以及Spring容器等都是重点
  • 从Spring容器获取已经注册的监听器,将监听器注册到监听器容器。监听器容器是一种特殊Bean,通过一种特殊的方式注册到Spring容器,并启动监听器容器。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/76617.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

谷歌seo外链建设指南

今天我分享的这些绝对是最接地气、最基础、最实用的&#xff0c;你用来做英文外链用这些技巧也会事半功倍。 发外链之前&#xff0c;首先是搜索&#xff0c;搜索可以发外链的地方。 通常只有两处地方可以让我们又简单、又轻松的发布优质的免费外链。 这两处地方就是博客和论…

客快物流大数据项目(九十五):ClickHouse的CollapsingMergeTree入了解

文章目录 ClickHouse的CollapsingMergeTree入了解 一、创建CollapsingMergeTree引擎表的语法 二、创建CollapsingMergeTree引擎的表

关于Integer

/*** 基本数据类型 包装类型* ----------------------------------------------* byte java.lang.Byte (父类是java.lang.Number)* short java.lang.Short (父类是java.lang.Number)* int …

基于微信小程序的springboot客运汽车票购票系统源码和论文

在客运公司工作 7 年之余&#xff0c;对客运管理的难度深有感触。特别是在春运期 间购票难依旧是长途汽车订票的一大难题。长途汽车和火车的订票管理虽然有 差异&#xff0c;但大体上是相同的。长途汽车在售票的过程中需要对旅客的起始地、目 的地、车次、订票和退票进行管理。…

论文精读:《DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries》

DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries 文章目录DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries论文精读摘要&#xff08;Abstract&#xff09;1. 介绍&#xff08;Introduction&#xff09;2. 相关工作&#x…

优维科技EasyOps®一体化运维平台入选“大信创产品目录”

以云计算、大数据为代表的新一代信息技术正在带来新的架构模式和应用模式&#xff0c;IT行业整个技术体系正面临一次大的换代升级机遇。在新技术更新换代的同时&#xff0c;中国IT产业也正面临前所未有的供应链安全问题&#xff0c;自主可控的信息技术应用创新&#xff08;信创…

Oracle项目管理之Primavera Unifier三种管理员模式

目录 一、系统管理员 二、公司管理员 三、项目管理员 Oracle Primavera Unifier 中有不同类型或级别的管理员&#xff0c;它们包括&#xff1a; 站点管理员&#xff08;也称为系统管理员&#xff09;公司管理员项目管理员或外壳管理员&#xff08;项目/外壳管理员&#xff0…

Servlet(三):基于Servlet实现程序、Cookie和Session、实现用户登录、上传文件

目录表白墙Cookie和Session实现用户登录上传文件表白墙 【服务器版的表白墙】 在之前通过前端代码实现的表白墙有一个问题&#xff0c;当我们关闭页面后&#xff0c;表白的数据也就丢失了&#xff0c;下面我们要做的是做一个服务器版的表白墙&#xff0c;这样即使关闭页面&…

RocketMq01_概述及背景、主题、标签、队列、生产者、消费者、注册中心、工作流程

文章目录①. RocketMQ - 概述、背景②. 消息、主题、标签、队列、唯一标识③. 生产者、消费者、NameServer、Broker④. RocketMq - 工作流程⑤. Topic的创建模式、读写队列①. RocketMQ - 概述、背景 ①. RocketMQ是⼀款阿⾥巴巴开源的消息中间件。2016年11⽉28⽇,阿⾥巴巴向Ap…

Metal每日分享,波动滤镜/涂鸦滤镜效果

本案例的目的是理解如何用Metal实现图像波动效果滤镜&#xff0c;还可类似涂鸦效果&#xff0c;主要就是对纹理坐标进行正余弦偏移处理&#xff1b; Demo HarbethDemo地址 实操代码 // 波动效果 let filter C7Fluctuate.init(extent: 50, amplitude: 0.003, fluctuate: 2.5…

GPB外链是什么?

GPB外链的意思是&#xff1a;Guangsuan Private Backlinks 全称&#xff1a;光算科技私人链接 拥有高质量&#xff0c;高权重&#xff0c;100%包收录的特点&#xff0c;且dofollow 因其效果明显&#xff0c;因其效果明显受到市场上广大的外贸SEO从业者喜欢。 它可以帮助网站…

图神经网络GNN

前言 图与图的表示 图是由一些点和一些线构成的&#xff0c;能表示一些实体之间的关系&#xff0c;图中的点就是实体&#xff0c;线就是实体间的关系。如下图&#xff0c;v就是顶点&#xff0c;e是边&#xff0c;u是整张图。attrinbutes是信息的意思&#xff0c;每个点、每条…

SQLite Expert 5.X 通用注册版-你的SQL好帮手

SQLITE 专家&#xff1a;发现 SQLITE 的力量 SQLite Expert 是一个强大的工具&#xff0c;旨在简化 SQLite3 数据库的开发。它是一个功能丰富的SQLite管理和开发工具&#xff0c;旨在满足所有用户的需求&#xff0c;从编写简单的 SQL 查询到开发复杂的数据库。 图形界面支持所…

神经网络入门(二)

卷积神经网络 文章目录卷积神经网络1. 从全连接到卷积2. 卷积层2.1 一维卷积2.2 二维卷积3. 填充与步幅4. 感受野5. 多输入多输出通道6. 池化层&#xff08;汇聚层&#xff09;7. 全连接层8. 卷积网络的整体结构9. 利用pytorch构建一个CNN网络卷积神经网络&#xff08;CNN&…

Metal每日分享,图像处理色彩丢失和模糊效果

本案例的目的是理解如何用Metal实现图像包装效果滤镜&#xff0c;用于图像处理色彩丢失和模糊效果&#xff1b; Demo HarbethDemo地址 实操代码 // 色彩丢失和模糊效果 let filter C7ColorPacking.init(horizontalTexel: 2.5, verticalTexel: 5)// 方案1: let dest BoxxIO…

[附源码]计算机毕业设计架构的博客平台设计Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

时不我待(第十八课)项目环境搭建

后台管理的项目搭建过程&#xff08;第一课&#xff09; 123第一部分 认识项目的搭配环境开发 4567第二部分 项目的创建需要的环境依赖如下 Element - The worlds most popular Vue UI framework ElementUl组件库Sass世界上最成熟、稳定和强大的CSS扩展语言 | Sass中文网 …

【数据结构】- 数组

数组基础1.1 什么是数组1.2 数组特点无法动态修改容量内存中顺序存储2. 基本操作2.1 结构2.2 添加元素 - add(E element)、add(int index, E element)代码实现2.3 删除元素 - remove(int index)、清空数组 - clear()代码实现2.4 扩容 - ensureCapacity(int capacity)3. 代码基础…

[附源码]Python计算机毕业设计SSM基于移动端的药方收集系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

在Mac系统下搭建Selenium环境并驱动Chrome浏览器

本文带领那些使用Mac的童鞋们实现Selenium驱动Chrome浏览器&#xff0c;虽然会有坑&#xff0c;但是我们可以凭借敏捷的身手躲过。下面就开始吧&#xff1a; 安装selenium 打开终端 ->pip安装&#xff08;安装命令&#xff1a;pip3 install selenium&#xff09; 安装浏览…