目录
一、前言
二、RocketMQ如何集成到SpringBoot项目
1、如果不用SpringBoot项目
1.1、引入依赖
1.2、启动BrokerStartup和NamesrvStartup服务端
2)启动NamesrvStartup
1.3、生产者创建并启动以及发送消息
1.4、消费者创建并启动
2、RocketMQ集成到SpringBoot项目入口
2.1、引入依赖
2.2、自动装配入口RocketMQAutoConfiguration
3、读取application.yml配置(重点)
3.1、生产者配置
所有的生产者配置都在这里了,当然不同的RocketMQ版本可能略有不同
3.2、消费者配置
3.3、配置样例
4、@Import的具体配置内容
4.1、Jackson相关装配
4.2、注册监听器容器以及监听器
5、总结
一、前言
前面的文章我们介绍了RocketMQ已经成为了Apache资金会的一个顶级项目以及SpringBoot如何整合RocketMQ,本篇文章我们将介绍RocketMQ如何集成到SpringBoot项目。如果不用SpringBoot,我们需做哪些工作?RocketMQ集成到SpringBoot项目的入口、哪些子模块集成进来了、依据的集成机制是什么、装配了哪些Bean?发送消息的方式有哪些?RocketMQ的配置内容有哪些?ApplicationContext上下文、如何织入、用来干什么呢?监听器注册、监听器容器注册与启动?
二、RocketMQ如何集成到SpringBoot项目
不管在什么项目使用RocketMQ,首先都要先安装RocketMQ,在《Window系统安装RocketMQ》文章已经详细介绍,感兴趣的可以去看看。然后启动RocketMQ服务端,即在RocketMQ项目里面的bin文件夹下双击mqnamesrv.cmd、mqbroker.cmd。如果是克隆下来的源码项目,则启动BrokerStartup和NamesrvStartup服务端。注意一下可能有问题,下面提供解决方案。
1、如果不用SpringBoot项目
1.1、引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
1.2、启动BrokerStartup和NamesrvStartup服务端
1)在broker项目的pom.xml补充如下依赖,然后启动BrokerStartup
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-store</artifactId>
<version>5.0.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.openmessaging.storage</groupId>
<artifactId>dledger</artifactId>
<version>0.2.7</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.0</version>
<scope>provided</scope>
</dependency>
不加上这些依赖的话,启动就会报错或者启动失败。
注意: 这里报红问题不大,已经启动BrokerStartup成功了。本地环境,端口号9876。
2)启动NamesrvStartup
NamesrvStartup启动成功会在控制台输出这句话
1.3、生产者创建并启动以及发送消息
/** * 此类演示如何使用提供的 DefaultMQProducer 向代理发送消息。 */ public class Producer { /** * The number of produced messages.消息数量 */ public static final int MESSAGE_COUNT = 1000; public static final String PRODUCER_GROUP = "please_rename_unique_group_name"; // RocketMQ服务端主机 public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876"; public static final String TOPIC = "TopicTest"; public static final String TAG = "TagA"; public static void main(String[] args) throws MQClientException, InterruptedException { /* * 使用生产者组名实例化。 */ DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP); /* * 指定名称服务器地址。 * * 或者,您可以通过导出环境变量 NAMESRV _ ADDR 来指定名称服务器地址 * <pre> * {@code * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */ // Uncomment the following line while debugging, namesrvAddr should be set to your local address // producer.setNamesrvAddr(DEFAULT_NAMESRVADDR); //在调试时取消注释以下行,namesrvAddr 应设置为您的本地地址 producer.setNamesrvAddr (DEFAULT _ NAMESRVADDR) /* * Launch the instance.启动实例。 */ producer.start(); for (int i = 0; i < MESSAGE_COUNT; i++) { try { /* * 创建一个消息实例,指定主题、标签和消息体。 */ Message msg = new Message(TOPIC /* Topic */, TAG /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); /* * 调用发送消息将消息传递给其中一个代理。 */ SendResult sendResult = producer.send(msg); /* * 有不同的方式发送消息,下面举例 * */ System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } /* * 一旦生产者实例不再使用就关闭。 * 这里注释掉,不然启动报bug */ // producer.shutdown(); } }
- 有不同的方式发送消息,如果不关心发送结果,可以使用这种方式
producer.sendOneway(msg);
- 如果希望以同步方式获取发送结果,可以使用此 send 方法
SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult);
- 如果希望以异步方式获取发送结果,可以使用此send方法,里面有成功、异常回调方法
当然有时候一些特别场景需特别对待,而不是你发送一条消息就完事了。我们需要做些东西,如打印日志等,不管成功还是失败。所以异步回调是个好东西,仔细品味。producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // do something } @Override public void onException(Throwable e) { // do something } });
控制台如下效果:
生产者发送消息成功。
1.4、消费者创建并启动
/**
* 此示例演示如何使用提供 DefaultMQPushConsumer 订阅和使用消息。
*/
public class Consumer {
public static final String CONSUMER_GROUP = "please_rename_unique_group_name_4";
public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
public static final String TOPIC = "TopicTest";
public static void main(String[] args) throws MQClientException {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
// Uncomment the following line while debugging, namesrvAddr should be set to your local address
// consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
/*
* 指定从哪里开始,以防特定的使用者组是一个全新的使用者组。
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more topic to consume.订阅另一个要消费的主题。
*/
consumer.subscribe(TOPIC, "*");
/*
* 注册回调,以便在从代理获取的消息到达时执行。
*/
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
/*
* 启动消费者实例。
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
控制台效果如下:
消费者消费成功。
可见如果不是SpringBoot项目的话,我们要自己手动去做很多事情。还要自己去封装(谈封装色变哈哈哈)请求模板等,工作量上都有了,太麻烦了。当然这里的例子大家感兴趣的,可以改改然后自己去探究探究。下面我们将介绍SpringBoot相关内容,带我们走入快乐路径:开箱即用。
2、RocketMQ集成到SpringBoot项目入口
2.1、引入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
多了一个rocketmq-spring-boot-starter的jar包,如果你看过我之前写的Spring家族及微服务系列专栏文章,相信你已经知道什么意思了。这也是一个小项目,是阿里巴巴自己提供的,运用SpringBoot的自动装配机制(Spring家族及微服务系列专栏已经详细分析)把RocketMQ客户端集成到里面去的。那么这个项目有什么内容可以帮助我们走入快乐路径,我们只需引入依赖即可开箱即用,也提高了维护性呢?下面我们分析该starter项目:
2.2、自动装配入口RocketMQAutoConfiguration
由于之前已经详细分析了自动装配原理,这里就没有过多分析了。这个配置类内容还是有点多的,我们把它由上到下分析,自动装配也是按照这个顺序来的。
- 配置类上面的注解
@Configuration @EnableConfigurationProperties({RocketMQProperties.class}) @ConditionalOnClass({MQAdmin.class, ObjectMapper.class}) @ConditionalOnProperty( prefix = "rocketmq", value = {"name-server"}, matchIfMissing = true ) @Import({JacksonFallbackConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class}) @AutoConfigureAfter({JacksonAutoConfiguration.class}) public class RocketMQAutoConfiguration { ....此处省略n行代码.... }
@Configuration标明该类是配置类,Spring会识别到;@EnableConfigurationProperties({RocketMQProperties.class})意思是先启动读取application.yml配置维护到RocketMQProperties;@ConditionalOnClass({MQAdmin.class, ObjectMapper.class})意思是在配置前先配置MQAdmin以及ObjectMapper;@ConditionalOnProperty(prefix = "rocketmq",value = {"name-server"}, matchIfMissing = true)意思是配置前该属性需先配置;@Import({XXX})导入需要配置的类,加了@Configuration注解,这些配置类专门配置一些Bean;@AutoConfigureAfter({JacksonAutoConfiguration.class})意思是必须在JacksonAutoConfiguration配置后才配置。注意:配置是有顺序的,当然这些注解有一部分都是SpringBoot自动装配里面扩展Spring的@Conditional注解,为自动装配量身打造的。感兴趣的读者可以仔细研究研究,不过需要谨慎使用。
- 属性及构造方法
@Autowired private Environment environment; public RocketMQAutoConfiguration() { } @PostConstruct public void checkProperties() { String nameServer = (String)this.environment.getProperty("rocketmq.name-server", String.class); log.debug("rocketmq.nameServer = {}", nameServer); if (nameServer == null) { log.warn("The necessary spring property 'rocketmq.name-server' is not defined, all rockertmq beans creation are skipped!"); } }
Environment是Spring里面的一个环境信息组件,这里使用@Autowired从Spring容器注入进来。提供了无参构造方法,在Spring初始化时会通过反射调用实构造方法例化该配置类。@PostConstruct也就是构造方法后置处理,构造方法调用后,Spring就会执行它标注的方法checkProperties(),检查RocketMQ是否在环境里面了,没有打印警告日志。
- 装配生产者
@Bean @ConditionalOnMissingBean({DefaultMQProducer.class}) @ConditionalOnProperty( prefix = "rocketmq", value = {"name-server", "producer.group"} ) public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) { Producer producerConfig = rocketMQProperties.getProducer(); String nameServer = rocketMQProperties.getNameServer(); String groupName = producerConfig.getGroup(); Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); Assert.hasText(groupName, "[rocketmq.producer.group] must not be null"); String accessChannel = rocketMQProperties.getAccessChannel(); String ak = rocketMQProperties.getProducer().getAccessKey(); String sk = rocketMQProperties.getProducer().getSecretKey(); DefaultMQProducer producer; if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) { producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)), rocketMQProperties.getProducer().isEnableMsgTrace(), rocketMQProperties.getProducer().getCustomizedTraceTopic()); producer.setVipChannelEnabled(false); } else { producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(), rocketMQProperties.getProducer().getCustomizedTraceTopic()); } producer.setNamesrvAddr(nameServer); if (!StringUtils.isEmpty(accessChannel)) { producer.setAccessChannel(AccessChannel.valueOf(accessChannel)); } producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout()); producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed()); producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed()); producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold()); producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer()); return producer; }
@Bean标明需注入到Spring容器,@ConditionalOnProperty意思是先配置了属性,属性有值了。方法内部的配置在下面3分析。这个生产者就是默认生产者,将会通过setter注入到RocketMQ模板中。
- 配置发送消息模板
@Bean( destroyMethod = "destroy" ) @ConditionalOnBean({DefaultMQProducer.class}) @ConditionalOnMissingBean( name = {"rocketMQTemplate"} ) public RocketMQTemplate rocketMQTemplate(DefaultMQProducer mqProducer, ObjectMapper rocketMQMessageObjectMapper) { RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); rocketMQTemplate.setProducer(mqProducer); rocketMQTemplate.setObjectMapper(rocketMQMessageObjectMapper); return rocketMQTemplate; }
配置RocketMQ模板,专门用来发送消息的,后面时间允许将写一篇详细的文章介绍下。值得注意的是,该Bean的配置指定了一个销毁方法。
- 配置事务管理器注册机
@Bean @ConditionalOnBean( name = {"rocketMQTemplate"} ) @ConditionalOnMissingBean({TransactionHandlerRegistry.class}) public TransactionHandlerRegistry transactionHandlerRegistry(@Qualifier("rocketMQTemplate") RocketMQTemplate template) { return new TransactionHandlerRegistry(template); }
这个Bean的作用顾名思义与事务有关,具体作用同样后面出文章分析。
- 配置事务注解处理器
@Bean( name = {"org.springframework.rocketmq.spring.starter.internalRocketMQTransAnnotationProcessor"} ) @ConditionalOnBean({TransactionHandlerRegistry.class}) @Role(2) public static RocketMQTransactionAnnotationProcessor transactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) { return new RocketMQTransactionAnnotationProcessor(transactionHandlerRegistry); }
它接收一个参数,也就是上面5配置的Bean。它的beanName另外指定了,也是跟消息事务有关的。
3、读取application.yml配置(重点)
所有的程序猿都必须知道的,所以在这里给你们划一下重点。
3.1、生产者配置
/**
* rocketMQ在application.yml的配置,没有配置使用默认值
* 但是有些是必须配置的
*
* @author CeaM
* 2022/12/04 19:16
**/
@ConfigurationProperties(
prefix = "rocketmq"
)
public class RocketMQProperties {
/**
* 服务器主机
*/
private String nameServer;
private String accessChannel;
/**
* 生产者,公开静态内部类,有着默认的配置
*/
private RocketMQProperties.Producer producer;
public RocketMQProperties() {
}
....此处省略n行setter/getter方法.....
public static class Producer {
/**
* 分组
*/
private String group;
/**
* 发送消息超时时间,默认3秒
*/
private int sendMessageTimeout = 3000;
private int compressMessageBodyThreshold = 4096;
/**
* 同步发送消息失败重试次数
*/
private int retryTimesWhenSendFailed = 2;
/**
* 异步发送消息失败重试次数
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* 重试使用下一个服务器
*/
private boolean retryNextServer = false;
/**
* 消息大小,默认最大允许为4m
*/
private int maxMessageSize = 4194304;
private String accessKey;
private String secretKey;
/**
* 消息轨迹
*/
private boolean enableMsgTrace = true;
private String customizedTraceTopic = "RMQ_SYS_TRACE_TOPIC";
public Producer() {
}
....此处省略n行setter/getter方法.....
}
}
所有的生产者配置都在这里了,当然不同的RocketMQ版本可能略有不同
3.2、消费者配置
/**
* 消息监听器注解,这里的默认值有点意思,即application.yml中rocketmq配置指定值
*
* @author CeaM
* 2022/12/04 19:20
**/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
/**
* 消费组
*
* @return 消费组
*/
String consumerGroup();
/**
* 主题
*
* @return 主题
*/
String topic();
SelectorType selectorType() default SelectorType.TAG;
String selectorExpression() default "*";
/**
* 默认并发消费模式
*
* @return 消费模式
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* 默认集群消息模式
*
* @return 消息模式
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
/**
* 消费最大线程数
*
* @return 最大线程数
*/
int consumeThreadMax() default 64;
/**
* 消费时间默认30秒,应该会调大些吧
*
* @return 消费时长
*/
long consumeTimeout() default 30000L;
String accessKey() default "${rocketmq.consumer.access-key:}";
String secretKey() default "${rocketmq.consumer.secret-key:}";
/**
* 启用消息轨迹
*
* @return boolean
*/
boolean enableMsgTrace() default true;
/**
* 自定义的消息轨迹主题
*
* @return 自定义的消息轨迹主题
*/
String customizedTraceTopic() default "${rocketmq.consumer.customized-trace-topic:}";
String nameServer() default "${rocketmq.name-server:}";
String accessChannel() default "${rocketmq.access-channel:}";
}
消费者配置信息里面同样application.yml没有指定,那么就会使用一些默认值。它的初始化我们下面再分析。
3.3、配置样例
rocketmq:
# RocketMQ服务端主机
name-server: localhost:9876
# 生产者配置
producer:
group: ceam_group
# 消费者配置
consumer:
group: ceam_group
其它的配置上面的属性类已经给出了,感兴趣的读者可以自行去探究探究。注意一下的是,无论是生产者还是消费者,都是在我们的客户端。
4、@Import的具体配置内容
从上面我们已经知道有3个配置类,ExtProducerResetConfiguration属于扩展类这里不分析。
4.1、Jackson相关装配
@Configuration
@ConditionalOnMissingBean({ObjectMapper.class})
class JacksonFallbackConfiguration {
JacksonFallbackConfiguration() {
}
@Bean
public ObjectMapper rocketMQMessageObjectMapper() {
return new ObjectMapper();
}
}
相信大家对Jackson不陌生吧,如果不知道的就得赶紧补一补啦。关键就是ObjectMapper了,而与之功效差不多的是fastjson里面的工具类,不过有bug很多程序猿再闻之恐怕色变吧。
4.2、注册监听器容器以及监听器
- 从Spring容器获取已经配置的监听器
在这里: ApplicationContextAware接口的作用就是织入ApplicationContext容器引用,SmartInitializingSingleton接口的作用是在该配置类实例化后进行一些初始化工作,工作量比较大。从前面的文章SpringBoot整合RocketMQ可知我们的监听器上面加了@Component注解,所以会以组件注入到Spring容器中。所以在afterSingletonsInstantiated()方法中,我们可以通过Spring容器即applicationContext获取已经实例化的监听器,然后遍历监听器map(key是监听器beanName,value是bean)将监听器注册到监听器容器。当然有部分上面已经分析 @Configuration public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton { private static final Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class); /** * Spring容器 */ private ConfigurableApplicationContext applicationContext; /** * 原子类 */ private AtomicLong counter = new AtomicLong(0L); private StandardEnvironment environment; /** * RocketMQ配置信息,application.yml没有指定的,则使用默认的 */ private RocketMQProperties rocketMQProperties; private ObjectMapper objectMapper; public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper, StandardEnvironment environment, RocketMQProperties rocketMQProperties) { this.objectMapper = rocketMQMessageObjectMapper; this.environment = environment; this.rocketMQProperties = rocketMQProperties; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext)applicationContext; } /** * 实现了Spring的SmartInitializingSingleton接口,所以在单例实例化后执行 */ @Override public void afterSingletonsInstantiated() { // 从Spring上下文applicationContext获取已经实例化的所有监听器 Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); // 预检验以防空指针异常 if (Objects.nonNull(beans)) { // 遍历监听器map注册到容器 beans.forEach(this::registerContainer); } }
- 注册监听器容器到Spring容器并启动
/** * 将监听器注册到监听器容器 * * @param beanName 监听器beanName * @param bean 监听器bean */ private void registerContainer(String beanName, Object bean) { // 通过Spring的AopProxyUtils获取监听器Bean的class对象 Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); // 判断是否是监听器 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } else { // 获取监听器注解 RocketMQMessageListener annotation = (RocketMQMessageListener)clazz.getAnnotation(RocketMQMessageListener.class); // 校验 this.validate(annotation); // 容器名称 String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), this.counter.incrementAndGet()); // 强制类型转换为GenericApplicationContext上下文 GenericApplicationContext genericApplicationContext = (GenericApplicationContext)this.applicationContext; // 注册到Spring容器 genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> { return this.createRocketMQListenerContainer(containerBeanName, bean, annotation); }, new BeanDefinitionCustomizer[0]); // 获取实例化的默认RocketMQ监听器容器实例 DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer)genericApplicationContext .getBean(containerBeanName, DefaultRocketMQListenerContainer.class); // 监听器容器不是运行状态,则启动 if (!container.isRunning()) { try { container.start(); } catch (Exception var9) { log.error("Started container failed. {}", container, var9); throw new RuntimeException(var9); } } // 监听器注册到监听器容器成功,打印日志 log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}" , beanName, containerBeanName); } }
通过Spring的AopProxyUtils获取监听器Bean的class对象;判断是否是监听器,不是抛异常;如果是从其class反射获取监听器注解,校验注解是否合法非法抛异常;拼接容器名称;强制类型转换为GenericApplicationContext上下文,将监听器容器注册到Spring容器,其中涉及创建对象下面分析;从Spring容器获取实例化的默认RocketMQ监听器容器实例;监听器容器不是运行状态,则启动;监听器注册到监听器容器成功,打印日志
- 创建监听器容器以及将监听器添加进来
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { // 直接创建监听器容器 DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); // 从监听器注解获取RocketMQ服务器 String nameServer = this.environment.resolvePlaceholders(annotation.nameServer()); // 为空从application.yml获取 nameServer = StringUtils.isEmpty(nameServer) ? this.rocketMQProperties.getNameServer() : nameServer; String accessChannel = this.environment.resolvePlaceholders(annotation.accessChannel()); // 将服务器设置到容器 container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } /*将监听器的信息注册到监听器容器*/ // 将主题设置到容器,如: container.setTopic(this.environment.resolvePlaceholders(annotation.topic())); // 将消费分组设置到容器,如: container.setConsumerGroup(this.environment.resolvePlaceholders(annotation.consumerGroup())); // 将监听器注解RocketMQMessageListener设置到容器 container.setRocketMQMessageListener(annotation); // 将监听器bean设置到容器,强制类型转换为RocketMQListener接口类型 container.setRocketMQListener((RocketMQListener)bean); // 将objectMapper设置到容器 container.setObjectMapper(this.objectMapper); // 将监听器beanName设置到容器 container.setName(name); return container; }
直接创建监听器容器,从监听器注解获取RocketMQ服务器,将服务器设置到监听器容器,将监听器的信息注册到监听器容器(具体看上面代码注释)。
5、总结
如果不使用SpringBoot开发得话还是有一些工作量的,而使用了SpringBoot,你只需加上一个starter依赖以及配置就可以做到开箱即用了。配置信息维护到了application.yml,灵活可维护性高。应用了SpringBoot的自动装配机制集成到我们的项目,即RocketMQ客户端。另外配置的重点如下:
- 当RocketMQProperties的Bean被实例化时,@ConfigurationProperties会将对应前缀的后面的属性与Bean对象的属性匹配。符合条件则进行赋值。重点。其中涉及到rocketmq的配置,包括生产者、消费者的配置。重点
- 配置ObjectMapper类使用Jackson解析JSON,重点
- @Import、ApplicationContextAware、SmartInitializingSingleton接口、 @PostConstruct、@Component以及Spring容器等都是重点
- 从Spring容器获取已经注册的监听器,将监听器注册到监听器容器。监听器容器是一种特殊Bean,通过一种特殊的方式注册到Spring容器,并启动监听器容器。