一、前言
在现代的分布式系统中,消息传递已成为一个非常流行的模式。它使得系统内的不同部分可以松耦合地通信,从而实现更高效、更可靠的应用程序。本博客将介绍SpringBoot如何提供简单易用的消息传递机制,并展示如何自定义消息总线以满足特定需求。
二、依赖引入
// Repositories and dependencies needed by the build script itself; keep this block at the top.
buildscript {
    repositories {
        // Aliyun public mirror, listed ahead of Maven Central
        maven { url 'https://maven.aliyun.com/repository/public' }
        mavenCentral()
    }
    dependencies {
        // Spring Boot Gradle plugin; provides the 'org.springframework.boot' plugin applied below
        classpath('org.springframework.boot:spring-boot-gradle-plugin:2.1.1.RELEASE')
    }
}

apply plugin: 'java'
apply plugin: 'idea'
// Use the Spring Boot framework
apply plugin: 'org.springframework.boot'
// Use Spring Boot's automatic dependency management
apply plugin: 'io.spring.dependency-management'

// Project coordinates
group 'com.littledyf'
version '1.0-SNAPSHOT'

// Repositories used to resolve the project's own dependencies
repositories {
    maven { url 'https://maven.aliyun.com/repository/public' }
    mavenCentral()
}

// Project dependencies
dependencies {
    // JUnit Jupiter API. No explicit version on purpose: like the engine below, it is resolved by
    // Spring Boot's dependency management, which keeps the API and engine versions aligned
    // (a pinned API version next to a managed engine version mixes two JUnit releases).
    testImplementation 'org.junit.jupiter:junit-jupiter-api'
    // JUnit Jupiter engine, needed at test runtime only
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

    // spring-boot-starter-web (required). spring-security-config is excluded; adjust to your needs.
    implementation('org.springframework.boot:spring-boot-starter-web') {
        exclude group: 'org.springframework.security', module: 'spring-security-config'
    }

    // spring-boot-starter-test, required to compile the tests
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    // JUnit 4
    testImplementation group: 'junit', name: 'junit', version: '4.11'

    // Lombok
    annotationProcessor 'org.projectlombok:lombok:1.18.22'     // annotation processor for code under main
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.22' // annotation processor for code under test
    compileOnly group: 'org.projectlombok', name: 'lombok', version: '1.18.22' // compile classpath only
}

test {
    // Run tests on the JUnit Platform (JUnit 5)
    useJUnitPlatform()
}
三、代码
定义注册器实现类:
import org.springframework.context.ApplicationContext;
import org.springframework.core.GenericTypeResolver;
import java.util.HashMap;
import java.util.Map;
/**
 * Registry that maps each query / event type to the provider of the handler bean
 * responsible for it.
 *
 * <p>The mappings are built once, in the constructor, from the {@link QueryHandler} and
 * {@link EventHandler} beans known to the given {@link ApplicationContext}. Each message type
 * maps to exactly one provider: if several handlers declare the same message type, the one
 * registered last replaces the earlier ones.
 */
public class Registry {

    /** Query type -> provider of the handler bean that answers it. */
    private final Map<Class<?>, QueryProvider<?>> queryProviderMap = new HashMap<>();

    /** Event type -> provider of the handler bean that processes it. */
    private final Map<Class<?>, EventProvider<?>> eventProviderMap = new HashMap<>();

    /**
     * Scans the application context for handler beans and registers each of them.
     *
     * @param applicationContext context to look the handler beans up in
     * @throws IllegalStateException if the class or the generic type arguments of a handler
     *         bean cannot be determined
     */
    public Registry(ApplicationContext applicationContext) {
        for (String name : applicationContext.getBeanNamesForType(QueryHandler.class)) {
            registerQuery(applicationContext, name);
        }
        for (String name : applicationContext.getBeanNamesForType(EventHandler.class)) {
            registerEvent(applicationContext, name);
        }
    }

    /** Registers the query handler bean {@code name} under the query type it declares. */
    @SuppressWarnings("unchecked") // the bean was looked up by QueryHandler type
    private void registerQuery(ApplicationContext applicationContext, String name) {
        Class<QueryHandler<?, ?>> handlerClass =
                (Class<QueryHandler<?, ?>>) requireBeanType(applicationContext, name);
        Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(handlerClass, QueryHandler.class);
        if (generics == null) {
            throw new IllegalStateException("Cannot resolve the generic type arguments of QueryHandler bean '"
                    + name + "' (" + handlerClass.getName() + ")");
        }
        // QueryHandler<R, C>: index 0 is the result type, index 1 is the query type.
        queryProviderMap.put(generics[1], new QueryProvider<>(applicationContext, handlerClass));
    }

    /** Registers the event handler bean {@code name} under the event type it declares. */
    @SuppressWarnings("unchecked") // the bean was looked up by EventHandler type
    private void registerEvent(ApplicationContext applicationContext, String name) {
        Class<EventHandler<?>> handlerClass =
                (Class<EventHandler<?>>) requireBeanType(applicationContext, name);
        Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(handlerClass, EventHandler.class);
        if (generics == null) {
            throw new IllegalStateException("Cannot resolve the generic type arguments of EventHandler bean '"
                    + name + "' (" + handlerClass.getName() + ")");
        }
        // EventHandler<E>: index 0 is the event type.
        eventProviderMap.put(generics[0], new EventProvider<>(applicationContext, handlerClass));
    }

    /** Returns the class of bean {@code name}, failing with a clear message when it is unknown. */
    private static Class<?> requireBeanType(ApplicationContext applicationContext, String name) {
        Class<?> beanType = applicationContext.getType(name);
        if (beanType == null) {
            throw new IllegalStateException("Cannot determine the type of handler bean '" + name + "'");
        }
        return beanType;
    }

    /**
     * Returns the handler registered for the given query type.
     *
     * @param queryClass concrete query class; it is matched exactly against the registered types
     * @param <R> result type of the query
     * @param <Q> query type
     * @return the handler bean for {@code queryClass}
     * @throws IllegalArgumentException if no handler is registered for {@code queryClass}
     */
    @SuppressWarnings("unchecked") // handlers are keyed by the query type they declare
    <R, Q extends Query<R>> QueryHandler<R, Q> getQuery(Class<Q> queryClass) {
        QueryProvider<?> provider = queryProviderMap.get(queryClass);
        if (provider == null) {
            throw new IllegalArgumentException("No QueryHandler registered for query type " + queryClass);
        }
        return (QueryHandler<R, Q>) provider.get();
    }

    /**
     * Returns the handler registered for the given event type.
     *
     * @param eventClass concrete event class; it is matched exactly against the registered types
     * @param <E> event type
     * @return the handler bean for {@code eventClass}
     * @throws IllegalArgumentException if no handler is registered for {@code eventClass}
     */
    @SuppressWarnings("unchecked") // handlers are keyed by the event type they declare
    <E extends Event> EventHandler<E> getEvent(Class<? extends Event> eventClass) {
        EventProvider<?> provider = eventProviderMap.get(eventClass);
        if (provider == null) {
            throw new IllegalArgumentException("No EventHandler registered for event type " + eventClass);
        }
        return (EventHandler<E>) provider.get();
    }
}
消息总线接口,定义两个方法,一个执行查询,一个执行事件:
/**
 * Message bus: the entry point for executing queries and dispatching events.
 */
public interface Bus {
/**
 * Executes the given query and returns its result.
 *
 * @param query the query to execute
 * @param <R> result type of the query
 * @param <Q> query type
 * @return the result of the query
 */
<R,Q extends Query<R>> R executeQuery(Q query);
/**
 * Dispatches the given event.
 *
 * @param event the event to dispatch
 * @param <E> event type
 */
<E extends Event> void dispatchEvent(E event);
}
消息总线实现类:
/**
 * {@link Bus} implementation that looks up the handler for each message in a {@link Registry}
 * and delegates to it.
 */
public class SpringBus implements Bus {

    private final Registry registry;

    /**
     * @param registry registry used to find the handler for a query or event
     */
    public SpringBus(Registry registry) {
        this.registry = registry;
    }

    /**
     * Finds the handler registered for the runtime class of {@code query} and returns
     * whatever that handler produces.
     */
    @Override
    public <R, Q extends Query<R>> R executeQuery(Q query) {
        return ((QueryHandler<R, Q>) registry.getQuery(query.getClass())).handle(query);
    }

    /**
     * Finds the handler registered for the runtime class of {@code event} and lets it
     * process the event.
     */
    @Override
    public <E extends Event> void dispatchEvent(E event) {
        final EventHandler<E> handler = (EventHandler<E>) registry.getEvent(event.getClass());
        handler.process(event);
    }
}
Query接口:
/**
 * Marker interface for queries.
 *
 * @param <R> type of the result the query produces
 */
public interface Query<R> {
}
QueryHandler接口:
/**
 * Handler for one kind of query.
 *
 * @param <R> result type of the query
 * @param <C> query type handled by this handler
 */
public interface QueryHandler<R, C extends Query<R>> {
/**
 * Handles the query and returns its result.
 *
 * @param query the query to handle
 * @return the result of the query
 */
R handle(C query);
}
QueryProvider类:
import org.springframework.context.ApplicationContext;
/**
 * Provider of a query handler bean.
 *
 * <p>Keeps the handler class and resolves the bean from the application context on every
 * {@link #get()} call.
 *
 * @param <H> query handler type
 */
public class QueryProvider<H extends QueryHandler<?, ?>> {

    private final ApplicationContext context;
    private final Class<H> handlerType;

    /**
     * @param applicationContext context the handler bean is fetched from
     * @param type class of the handler bean
     */
    QueryProvider(ApplicationContext applicationContext, Class<H> type) {
        context = applicationContext;
        handlerType = type;
    }

    /**
     * @return the handler bean of the configured class, as returned by the application context
     */
    public H get() {
        final H handler = context.getBean(handlerType);
        return handler;
    }
}
Event类似,Event接口:
/**
 * Marker interface for events.
 */
public interface Event {
}
EventHandler接口:
/**
 * Handler for one kind of event.
 *
 * @param <E> event type handled by this handler
 */
public interface EventHandler<E extends Event> {
/**
 * Processes the given event.
 *
 * @param event the event to process
 */
void process(E event);
}
EventProvider类:
import org.springframework.context.ApplicationContext;
/**
 * Provider of an event handler bean.
 *
 * <p>Keeps the handler class and resolves the bean from the application context on every
 * {@link #get()} call.
 *
 * @param <H> event handler type
 */
public class EventProvider<H extends EventHandler<?>> {

    private final ApplicationContext context;
    private final Class<H> handlerType;

    /**
     * @param applicationContext context the handler bean is fetched from
     * @param type class of the handler bean
     */
    EventProvider(ApplicationContext applicationContext, Class<H> type) {
        context = applicationContext;
        handlerType = type;
    }

    /**
     * @return the handler bean of the configured class, as returned by the application context
     */
    public H get() {
        final H handler = context.getBean(handlerType);
        return handler;
    }
}
实体类:
import com.littledyf.cqs.Query;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
/**
 * Sample query object: a {@link Query} whose result is a list of {@link TestVo}.
 */
@Data
public class TestDto implements Serializable, Query<List<TestVo>> {
// Name carried by the query
private String name;
}
import lombok.Data;
/**
 * Sample result object returned for a {@code TestDto} query.
 */
@Data
public class TestVo {
// Name carried by the result
private String nameVo;
}
Query具体实现类:
import com.littledyf.cqs.QueryHandler;
import com.littledyf.cqs.domain.TestDto;
import com.littledyf.cqs.domain.TestVo;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
 * Sample query handler: answers a {@link TestDto} query with a single-element list whose
 * {@link TestVo} carries the name taken from the query.
 */
@Component
@NoArgsConstructor
public class TestQueryHandler implements QueryHandler<List<TestVo>, TestDto> {

    /**
     * @param testDto the query to answer
     * @return a new list holding one {@link TestVo} whose {@code nameVo} equals the query's name
     */
    @Override
    public List<TestVo> handle(TestDto testDto) {
        final TestVo vo = new TestVo();
        vo.setNameVo(testDto.getName());

        final List<TestVo> result = new ArrayList<>();
        result.add(vo);
        return result;
    }
}
Controller层:
import com.littledyf.cqs.Bus;
import com.littledyf.cqs.domain.TestDto;
import com.littledyf.cqs.domain.TestVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.List;
/**
 * REST controller that exposes the sample query under {@code /my-test/cqs} and forwards it
 * to the {@link Bus}.
 */
@Slf4j
@RestController
@RequestMapping("/my-test/cqs")
public class CqsController {

    @Resource
    private Bus bus;

    /**
     * Handles {@code POST /my-test/cqs/query-test}.
     *
     * @param testDto query read from the request body
     * @return the result the bus produces for the query
     */
    @PostMapping("/query-test")
    public List<TestVo> queryTest(@RequestBody TestDto testDto) {
        final List<TestVo> result = bus.executeQuery(testDto);
        return result;
    }
}
SpringBoot启动类,启动类中进行ApplicationContext的注入:
import com.littledyf.cqs.Bus;
import com.littledyf.cqs.Registry;
import com.littledyf.cqs.SpringBus;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
/**
 * Spring Boot entry point. Also declares the {@link Registry} and {@link Bus} beans.
 */
@SpringBootApplication
public class MyTestApplication {

    /**
     * Starts the application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MyTestApplication.class, args);
    }

    /**
     * Handler registry, built from the application context.
     *
     * @param applicationContext context handed to the registry
     * @return the registry bean
     */
    @Bean
    public Registry registry(ApplicationContext applicationContext) {
        final Registry handlerRegistry = new Registry(applicationContext);
        return handlerRegistry;
    }

    /**
     * Message bus backed by the registry.
     *
     * @param registry registry the bus looks handlers up in
     * @return the bus bean
     */
    @Bean
    public Bus commandBus(Registry registry) {
        final Bus bus = new SpringBus(registry);
        return bus;
    }
}
yml文件配置:
# HTTP port of the embedded web server
server:
  port: 8080
# Application name
spring:
  application:
    name: my-test-service
四、测试
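这里只模拟了查询;事件的用法与查询类似,同样需要实现对应的接口(Event 和 EventHandler)。整体流程是:SpringBoot 启动时创建注册器(Registry),注册器找出容器中所有 QueryHandler、EventHandler 类型的 bean,并根据它们的泛型参数建立"消息类型 → 处理器"的对应关系;实际调用时,消息总线根据传入消息的具体类型找到对应的处理器 bean 并执行。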