如何创造一个属于自己的springboot stater
- 什么是stater
- stater是怎么实现注入进来的
- 如何进行约定
- 基于上述理论的demo
- 实现功能
- 代码目录
- 核心实现
- spring.factories
- SpringMessageSubscribe(扫描所有@Subscribe注解生成消息订阅)
- 基于Redis的消息订阅
- 基于redis的消息发布
- 针对多个相同topic的订阅者进行消息多播
什么是stater
stater是一种特殊的spring boot 工程,它实现了一些共同性的功能,使你可以依赖过来直接使用,又在配置上做出了一些默认的约定,是你不需要进行复杂的配置。
stater是怎么实现注入进来的
有spring boot基础知识的同学都知道,当boot工程启动时默认是扫描本包及其子孙包下的class,看其是否被注解,是否要纳入spring 容器进行管理,那么stater的包名肯定是和你的业务包名不一致的那么他是怎么实现上述配置的呢,也就是spring boot 约定大于配置的原理是什么呢?
秘密是在于spring.factories文件。spring容器启动时会读取META-INF/spring.factories文件内容,将其中的类进行实例化并放入容器进行管理。下面是一个例子
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.soft863.stream.core.eventbus.EventBus,\
com.soft863.stream.core.eventbus.subscribe.SpringMessageSubscribe,\
com.soft863.stream.core.eventbus.config.RedisConfig,\
com.soft863.stream.core.eventbus.subscribe.redis.RedisMessageBroker,\
com.soft863.stream.core.eventbus.publish.redis.RedisMessagePublisher
按照例子中所写,及时stater中的这些类并不在启动工程的application下面,也是可以扫面到进行实例化管理的。
如何进行约定
在一些共通性的功能中经常会出现不一样的分支处理,比如数据库可能有mysql,oracle等区别,比如我想实现一个MQ消息通信那么也可能有rabbitmq rocketmq这些不通的实现,那么我怎么能根据用户的配置进行不同的实例化,或者说我使用一个默认的功能实现呢,那么就需要用一个很重要的注解@ConditionalOnProperty
举一个例子,我想做一个消息总线,其中通信组件我有几个选型比如redis的stream,rabbitmq rocketmq。基于轻量化的考虑,我想让其默认为redis实现,那么就可以这样写
@ConditionalOnProperty(prefix = "stream.broker", name = "type", havingValue = "redis", matchIfMissing = true)
这一块代码的意思如下
- prefix 指的是yaml文件中配置前缀
- name 则指的是配置的名称
- havingValue 指name这个配置的值是否包含该值
- matchIfMissing 如果没有name这个配置,这个注解是否生效
综合下来就是查看配置文件中是否有stream.broker.type这个参数,如果有看他是否是redis如果是redis则实例化这个bean,如果没有也同样实例化(当然了这个类必须被
@Component所注解),只有存在这个参数且不是redis的时候才不实例化。这样就可以实现默认redis的功能了。
具有同样功能的注解还有以下,可以根据名称大致了解其中的含义
基于上述理论的demo
实现功能
默认基于redis stream进行发布订阅模式的事件处理消息总线。使用过程中只需在对应方法上加上@Subscribe注解即可订阅消息,不需要自己写其他的处理代码
代码目录
核心实现
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.soft863.stream.core.eventbus.EventBus,\
com.soft863.stream.core.eventbus.subscribe.SpringMessageSubscribe,\
com.soft863.stream.core.eventbus.config.RedisConfig,\
com.soft863.stream.core.eventbus.subscribe.redis.RedisMessageBroker,\
com.soft863.stream.core.eventbus.publish.redis.RedisMessagePublisher
SpringMessageSubscribe(扫描所有@Subscribe注解生成消息订阅)
package com.soft863.stream.core.eventbus.subscribe;
import com.soft863.stream.core.eventbus.EventBus;
import com.soft863.stream.core.annotation.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.stereotype.Component;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
@Component
@Slf4j
public class SpringMessageSubscribe implements BeanPostProcessor {
private final EventBus eventBus;
private final MessageAdapterCrater messageAdapterCrater = new DefaultMessageAdapterCrater();
public SpringMessageSubscribe(EventBus eventBus) {
this.eventBus = eventBus;
}
/**
* 为注解添加监听处理
*
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> type = ClassUtils.getUserClass(bean);
ReflectionUtils.doWithMethods(type, method -> {
// 取得所有订阅方法
AnnotationAttributes subscribes = AnnotatedElementUtils.getMergedAnnotationAttributes(method, Subscribe.class);
if (CollectionUtils.isEmpty(subscribes)) {
return;
}
// 生成订阅Adapter
MessageAdapter sub = messageAdapterCrater.createMessageAdapter(type, method, subscribes.getString("topic"));
// 注册订阅Adapter
eventBus.addAdapter(sub, subscribes.getString("topic"));
});
return bean;
}
}
基于Redis的消息订阅
package com.soft863.stream.core.eventbus.subscribe.redis;
import com.soft863.stream.core.eventbus.EventBus;
import com.soft863.stream.core.eventbus.subscribe.EventMessageBroker;
import com.soft863.stream.core.eventbus.message.AdapterMessage;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Collections;
/**
* 消息总线-集群间消息redis实现
*
* 实现规则
* 根据不同的MQ机制实现消息的分组消费,将原始消息转化为AdapterMessage,然后使用继承于EventMessageBroker进行消息多播
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "stream.broker", name = "type", havingValue = "redis", matchIfMissing = true)
public class RedisMessageBroker extends EventMessageBroker implements StreamListener<String, MapRecord<String, String, String>> {
@Value("${stream.topic:/stream}")
String topic;
@Value("${stream.consumer.id}")
String consumerId;
@Value("${stream.timeout:10}")
Integer timeout;
@Value("${stream.consumer.group}")
String groupName = "stream.redis";
private final RedisTemplate<String, String> streamRedisTemplate;
private final ApplicationContext applicationContext;
private final EventBus eventBus;
public RedisMessageBroker(RedisTemplate<String, String> streamRedisTemplate, ApplicationContext applicationContext, EventBus eventBus) {
this.streamRedisTemplate = streamRedisTemplate;
this.applicationContext = applicationContext;
this.eventBus = eventBus;
}
@SneakyThrows
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("消息内容-->{}", message.getValue());
StreamOperations<String, String, String> streamOperations = streamRedisTemplate.opsForStream();
// 服务内消息多播
AdapterMessage adapterMessage = new AdapterMessage();
adapterMessage.setTopic(message.getValue().get("topic"));
adapterMessage.setPayload(message.getValue().get("payload"));
try {
this.multicastEvent(adapterMessage);
} catch (Exception e) {
log.info("消息多播失败:" + e.getLocalizedMessage());
}
//消息应答
streamOperations.acknowledge(topic, groupName, message.getId());
}
@Bean
public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> emailListenerContainerOptions() {
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
//block读取超时时间
.pollTimeout(Duration.ofSeconds(timeout))
//count 数量(一次只获取一条消息)
.batchSize(1)
//序列化规则
.serializer(stringRedisSerializer)
.build();
}
/**
* 开启监听器接收消息
*/
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> emailListenerContainer(RedisConnectionFactory factory,
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions) {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory,
streamMessageListenerContainerOptions);
//如果 流不存在 创建 stream 流
if (!streamRedisTemplate.hasKey(topic)) {
streamRedisTemplate.opsForStream().add(topic, Collections.singletonMap("", ""));
log.info("初始化集群间通信Topic{} success", topic);
}
//创建消费者组
try {
streamRedisTemplate.opsForStream().createGroup(topic, groupName);
} catch (Exception e) {
log.info("消费者组 {} 已存在", groupName);
}
//注册消费者 消费者名称,从哪条消息开始消费,消费者类
// > 表示没消费过的消息
// $ 表示最新的消息
listenerContainer.receive(
Consumer.from(groupName, consumerId),
StreamOffset.create(topic, ReadOffset.lastConsumed()),
this
);
listenerContainer.start();
return listenerContainer;
}
@Override
public void multicastEvent(AdapterMessage message) throws IllegalAccessException, InstantiationException, InvocationTargetException, IOException {
super.doMulticastEvent(message, eventBus.getSubscribesPool(), applicationContext);
}
}
基于redis的消息发布
package com.soft863.stream.core.eventbus.publish.redis;
import com.soft863.stream.core.eventbus.message.AdapterMessage;
import com.soft863.stream.core.eventbus.publish.MessagePublisher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* 消息总线发布Redis实现
*/
@Component
@ConditionalOnProperty(prefix = "stream.broker", name = "type", havingValue = "redis", matchIfMissing = true)
public class RedisMessagePublisher implements MessagePublisher {
@Value("${stream.topic:/stream}")
String topic;
private final RedisConnectionFactory connectionFactory;
public RedisMessagePublisher(RedisConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Override
public Boolean publish(AdapterMessage message) {
Map value = new HashMap();
value.put("topic".getBytes(), message.getTopic().getBytes());
value.put("payload".getBytes(), message.getPayload().getBytes());
ByteRecord byteRecord = StreamRecords.rawBytes(value).withStreamKey(topic.getBytes());
// 刚追加记录的记录ID
RecordId recordId = connectionFactory.getConnection().xAdd(byteRecord);
return true;
}
}
针对多个相同topic的订阅者进行消息多播
package com.soft863.stream.core.eventbus.subscribe;
import com.alibaba.fastjson.JSON;
import com.soft863.stream.core.eventbus.message.AdapterMessage;
import com.soft863.stream.core.util.TopicMatcher;
import com.soft863.stream.core.util.TopicUtil;
import org.springframework.context.ApplicationContext;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Map;
public abstract class EventMessageBroker {
/**
* 消息多播
*
* 将集群间消息在自己服务内部广播,是的所有订阅消息都可以收到
*
* @param message
* @return
*/
public abstract void multicastEvent(AdapterMessage message) throws IllegalAccessException, InstantiationException, InvocationTargetException, IOException;
public void doMulticastEvent(AdapterMessage message, Map<TopicMatcher, Map<String, MessageAdapter>> subscribesPool, ApplicationContext applicationContext) throws InvocationTargetException, IllegalAccessException, InstantiationException, IOException {
// 查找相匹配的Adapter
List<MessageAdapter> adapterList = TopicUtil.getAllMatchedAdapter(message.getTopic(), subscribesPool);
for (MessageAdapter adapter : adapterList) {
if (adapter != null && adapter.isActive()) {
// 手动订阅消息
if (adapter.getCustomer() != null) {
adapter.getCustomer().consume(message);
} else {
// 取得对象
Object instance = applicationContext.getBean(adapter.getClazz());
if (instance != null) {
// 将消息转化为所需要的类型
if (adapter.getMethod().getParameterCount() > 0) {
Class<?> param = adapter.getMethod().getParameterTypes()[0];
adapter.getMethod().invoke(instance, JSON.parseObject(message.getPayload(), param));
} else {
adapter.getMethod().invoke(instance);
}
}
}
}
}
}
}