spi机制
- spi机制
- a.spi介绍
- b.缓存spi到本地
- c.加载spi并将实例缓存
- d.统一spi加载的配置
spi机制
a.spi介绍
SPI(Service Provider Interface),是JDK内置的一种服务提供发现机制,可以用来启用框架扩展和替换组件,主要是被框架的开发人员使用,比如java.sql.Driver接口,其他不同厂商可以针对同一接口做出不同的实现,MySQL和PostgreSQL都有不同的实现提供给用户,而Java的SPI机制可以为某个接口寻找服务实现。Java中SPI机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。
b.缓存spi到本地
在core模块下的resources
包下,创建META-INF.dcyrpc-services
- 复制Compressor接口的全限定名作为文件名
- 文件内:复制Compressor接口的实现类的全限定名
- 复制LoadBalancer接口的全限定名作为文件名
- 文件内:复制LoadBalancer接口的实现类的全限定名
在core模块下的config.Configuration
配置类的构造器中,写入spi代码:spi发现机制相关配置项
public Configuration() {
// 1.成员变量的默认配置项
// 2.spi发现机制相关配置项
SpiResolver spiResolver = new SpiResolver();
spiResolver.loadFromSpi(this);
// 3.读取xml上的配置信息
XmlResolver xmlResolver = new XmlResolver();
xmlResolver.loadFromXml(this);
// 4.编程配置项,dcyRpcBootstrap提供
}
在core模块的con.dcyrpc.config
包下,创建SpiResolver
类:spi自动发现机制相关配置项
/**
* spi发现机制相关配置项
*/
public class SpiResolver {
/**
* 通过spi方式加载配置项
* @param configuration
*/
public void loadFromSpi(Configuration configuration) {
LoadBalancer loadBalancer = SpiHandler.get(LoadBalancer.class);
if (loadBalancer != null) {
configuration.setLoadBalancer(loadBalancer);
}
Compressor compressor = SpiHandler.get(Compressor.class);
if (compressor != null) {
configuration.setCompressor(compressor);
}
Serializer serializer = SpiHandler.get(Serializer.class);
if (serializer != null) {
configuration.setSerializer(serializer);
}
}
}
在core模块的con.dcyrpc
包下,创建SpiHandler
类:缓存spi内容
-
设置Map:将spi的配置信息(原始内容)缓存到map中
-
设置静态代码块:加载当前类后将spi信息进行保存,逼迫运行时频繁执行IO
@Slf4j
public class SpiHandler {
// 定义basePath
private static final String BASE_PATH = "META-INF/dcyrpc-services";
// 定义缓存:保存spi相关的原始内容
private static final Map<String, List<String>> SPI_CONTENT = new ConcurrentHashMap<>(8);
// 加载当前类后将spi信息进行保存,逼迫运行时频繁执行IO
static {
// 加载当前工程和jar包中classPath中的资源
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
URL fileUrl = classLoader.getResource(BASE_PATH);
if (fileUrl != null) {
File file = new File(fileUrl.getPath());
File[] listFiles = file.listFiles();
if (listFiles != null && listFiles.length > 0) {
for (File listFile : listFiles) {
String key = listFile.getName();
List<String> value = getImplNames(listFile);
SPI_CONTENT.put(key, value);
}
}
}
}
private static List<String> getImplNames(File file) {
try (
FileReader fileReader = new FileReader(file);
BufferedReader bufferedReader = new BufferedReader(fileReader)
){
List<String> implNameList = new ArrayList<>();
while (true) {
String line = bufferedReader.readLine();
if (line == null || line.equals("")) {
break;
} else {
implNameList.add(line);
}
}
return implNameList;
} catch (IOException e) {
log.error("读取spi文件时发生异常", e);
}
return null;
}
public static <T> T get(Class<T> clazz) {
}
}
c.加载spi并将实例缓存
在core模块的con.dcyrpc
包下的SpiHandler
类:完善get()
方法:获取一个实现
- 设置Map:每一个接口对应的实现的实例
- 1.先查找缓存 2.未命中则构建缓存 3.通过clazz获取与之匹配的实现名称(反射) 4.实例化所有的实现 5.放入缓存
创建getList()
方法,与get()
方法类似:获取所有和当前服务相关的实例
// 略...
// 缓存:每一个接口对应的实现的实例
private static final Map<Class<?>, List<Object>> SPI_IMPLEMENT = new ConcurrentHashMap<>(32);
// 略...
/**
* 获取一个实现
* @param clazz
* @return 实现类的实例集合
* @param <T>
*/
public static <T> T get(Class<T> clazz) {
// 1.先查找缓存
List<Object> implList = SPI_IMPLEMENT.get(clazz);
if (implList != null && implList.size() > 0) {
return (T) implList.get(0);
}
// 2.构建缓存
buildCache(clazz);
// 3.再次尝试获取第一个
List<Object> result = SPI_IMPLEMENT.get(clazz);
if (result == null || result.size() == 0) {
return null;
}
return (T) result.get(0);
}
/**
* 获取所有和当前服务相关的实例
* @param clazz
* @return
* @param <T>
*/
public static <T> List<T> getList(Class<T> clazz) {
// 1.先查找缓存
List<T> implList = (List<T>) SPI_IMPLEMENT.get(clazz);
if (implList != null && implList.size() > 0) {
return implList;
}
// 2.构建缓存
buildCache(clazz);
return (List<T>) SPI_IMPLEMENT.get(clazz);
}
/**
* 构建clazz相关的缓存缓存
* @param clazz
* @return
*/
private static void buildCache(Class<?> clazz) {
// 1.通过clazz获取与之匹配的实现名称
String name = clazz.getName();
List<String> implNameList = SPI_CONTENT.get(name);
if (implNameList == null && implNameList.size() == 0) {
return;
}
// 2.实例化所有的实现
List<Object> impls = new ArrayList<>();
for (String implName : implNameList) {
try {
Class<?> aClass = Class.forName(implName);
Object instance = aClass.getConstructor().newInstance();
impls.add(instance);
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException |
InvocationTargetException e) {
log.error("实例化【{}】的实现时发生异常", implName, e);
}
}
// 3.放入缓存
SPI_IMPLEMENT.put(clazz, impls);
}
d.统一spi加载的配置
当完成加载spi后会发现,导入的配置无法得到工厂的统一管理。所以需要修改代码,让工厂对配置进行统一的管理
删除compress
包下的CompressWrapper
删除Serialize
包下的SerializerWrapper
在core模块下的com.dcyrpc.compress.impl
包下的CompressorFactory
类进行修改
@Slf4j
public class SerializerFactory {
private final static Map<String, ObjectWrapper<Serializer>> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);
private final static Map<Byte, ObjectWrapper<Serializer>> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);
static {
ObjectWrapper<Serializer> jdk = new ObjectWrapper<>((byte) 1, "jdk", new JdkSerializer());
ObjectWrapper<Serializer> json = new ObjectWrapper<>((byte) 2, "json", new JsonSerializer());
ObjectWrapper<Serializer> hessian = new ObjectWrapper<>((byte) 3, "hessian", new HessianSerializer());
SERIALIZER_CACHE.put("jdk", jdk);
SERIALIZER_CACHE.put("json", json);
SERIALIZER_CACHE.put("hessian", hessian);
SERIALIZER_CACHE_CODE.put((byte) 1, jdk);
SERIALIZER_CACHE_CODE.put((byte) 2, json);
SERIALIZER_CACHE_CODE.put((byte) 3, hessian);
}
/**
* 使用工厂方法获取一个SerializerWrapper
* @param serializeType 序列化的类型
* @return
*/
public static ObjectWrapper<Serializer> getSerializer(String serializeType) {
ObjectWrapper<Serializer> serializerObjectWrapper = SERIALIZER_CACHE.get(serializeType);
if (serializerObjectWrapper == null) {
log.error("未找到配置的序列化方式【{}】,将采用默认的jdk压缩", serializeType);
return SERIALIZER_CACHE.get("jdk");
}
return serializerObjectWrapper;
}
public static ObjectWrapper<Serializer> getSerializer(byte serializeCode) {
ObjectWrapper<Serializer> serializerObjectWrapper = SERIALIZER_CACHE_CODE.get(serializeCode);
if (serializerObjectWrapper == null) {
log.error("未找到配置的序列化方式【{}】,将采用默认的jdk压缩", serializeCode);
return SERIALIZER_CACHE_CODE.get((byte)1);
}
return serializerObjectWrapper;
}
/**
* 添加序列化策略
* @param serializerWrapper
*/
public static void addCompressor(ObjectWrapper<Serializer> serializerWrapper) {
SERIALIZER_CACHE.put(serializerWrapper.getType(), serializerWrapper);
SERIALIZER_CACHE_CODE.put(serializerWrapper.getCode(), serializerWrapper);
}
}
在core模块下的com.dcyrpc.serializer.impl
包下的SerializerFactory
类进行修改
@Slf4j
public class CompressorFactory {
private final static Map<String, ObjectWrapper<Compressor>> COMPRESSOR_CACHE = new ConcurrentHashMap<>(8);
private final static Map<Byte, ObjectWrapper<Compressor>> COMPRESSOR_CODE_CACHE = new ConcurrentHashMap<>(8);
static {
ObjectWrapper<Compressor> gzip = new ObjectWrapper<>((byte) 1, "gzip", new GzipCompressor());
COMPRESSOR_CACHE.put("gzip", gzip);
COMPRESSOR_CODE_CACHE.put((byte) 1, gzip);
}
/**
* 使用工厂方法获取一个CompressWrapper
* @param compressorType 压缩的类型
* @return
*/
public static ObjectWrapper<Compressor> getCompressor(String compressorType) {
ObjectWrapper<Compressor> compressorObjectWrapper = COMPRESSOR_CACHE.get(compressorType);
if (compressorObjectWrapper == null) {
log.error("未找到配置的压缩【{}】,将采用默认的gzip压缩", compressorObjectWrapper);
return COMPRESSOR_CACHE.get("gzip");
}
return compressorObjectWrapper;
}
public static ObjectWrapper<Compressor> getCompressor(byte compressorCode) {
ObjectWrapper<Compressor> compressorObjectWrapper = COMPRESSOR_CODE_CACHE.get(compressorCode);
if (compressorObjectWrapper == null) {
log.error("未找到配置的压缩【{}】,将采用默认的gzip压缩", compressorCode);
return COMPRESSOR_CACHE.get("gzip");
}
return compressorObjectWrapper;
}
/**
* 添加压缩策略
* @param compressor
*/
public static void addCompressor(ObjectWrapper<Compressor> compressor) {
COMPRESSOR_CACHE.put(compressor.getType(), compressor);
COMPRESSOR_CODE_CACHE.put(compressor.getCode(), compressor);
}
}
在core模块下的resources
包下,修改META-INF.dcyrpc-services
包类的文件:修改文件内容,code-type-name
- 如:1-Jdk-com.dcyrpc.serialize.impl.JdkSerializer
- 如:1-ConsistentHash-com.dcyrpc.loadbalancer.impl.ConsistentHashBalancer
- 如:1-gzip-com.dcyrpc.compress.impl.GzipCompressor
在core模块下的com.dcyrpc.config
包下的,创建ObjectWrapper
类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ObjectWrapper<T> {
private Byte code;
private String type;
private T impl;
}
修改config.SpiResolver
类
- 将原有的
SPI_IMPLEMENT
的map的value修改成 List<ObjectWrapper<?>>- 会有大量的报错在本类
- 修改
get()
- 修改
getList()
- 修改
buildCache()
private static final Map<Class<?>, List<ObjectWrapper<?>>> SPI_IMPLEMENT = new ConcurrentHashMap<>(32);
public synchronized static <T> ObjectWrapper<T> get(Class<T> clazz) {
// 1.先查找缓存
List<ObjectWrapper<?>> implList = SPI_IMPLEMENT.get(clazz);
if (implList != null && implList.size() > 0) {
return (ObjectWrapper<T>) implList.get(0);
}
// 2.构建缓存
buildCache(clazz);
// 3.再次尝试获取第一个
List<ObjectWrapper<?>> result = SPI_IMPLEMENT.get(clazz);
if (result == null || result.size() == 0) {
return null;
}
return (ObjectWrapper<T>) result.get(0);
}
public synchronized static <T> List<ObjectWrapper<T>> getList(Class<T> clazz) {
// 1.先查找缓存
List<ObjectWrapper<?>> implList = SPI_IMPLEMENT.get(clazz);
if (implList != null && implList.size() > 0) {
return implList.stream().map(wrapper -> (ObjectWrapper<T>) wrapper).collect(Collectors.toList());
}
// 2.构建缓存
buildCache(clazz);
implList = SPI_IMPLEMENT.get(clazz);
if (implList != null && implList.size() > 0) {
return implList.stream().map(wrapper -> (ObjectWrapper<T>) wrapper).collect(Collectors.toList());
}
return new ArrayList<>();
}
private static void buildCache(Class<?> clazz) {
// 1.通过clazz获取与之匹配的实现名称
String name = clazz.getName();
List<String> implNameList = SPI_CONTENT.get(name);
if (implNameList == null && implNameList.size() == 0) {
return;
}
// 2.实例化所有的实现
List<ObjectWrapper<?>> impls = new ArrayList<>();
for (String implName : implNameList) {
try {
// 进行分割
String[] codeAndTypeAndImpl = implName.split("-");
if (codeAndTypeAndImpl.length != 3) {
throw new SpiException("配置的spi文件不合法");
}
Byte code = Byte.valueOf(codeAndTypeAndImpl[0]);
String type = codeAndTypeAndImpl[1];
String implementName = codeAndTypeAndImpl[2];
Class<?> aClass = Class.forName(implementName);
Object instance = aClass.getConstructor().newInstance();
ObjectWrapper<?> objectWrapper = new ObjectWrapper<>(code, type, instance);
impls.add(objectWrapper);
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException |
InvocationTargetException e) {
log.error("实例化【{}】的实现时发生异常", implName, e);
}
}
// 3.放入缓存
SPI_IMPLEMENT.put(clazz, impls);
}
在common模块下的exceptions
包中,创建SpiException
类
public class SpiException extends RuntimeException{
public SpiException() {
super();
}
public SpiException(String message) {
super(message);
}
public SpiException(String message, Throwable cause) {
super(message, cause);
}
}
修改config.SpiResolver
类的loadFromSpi()
方法
public void loadFromSpi(Configuration configuration) {
List<ObjectWrapper<LoadBalancer>> loadBalancerWrappers = SpiHandler.getList(LoadBalancer.class);
if (loadBalancerWrappers != null && loadBalancerWrappers.size() > 0) {
configuration.setLoadBalancer(loadBalancerWrappers.get(0).getImpl());
}
List<ObjectWrapper<Compressor>> compressorWrappers = SpiHandler.getList(Compressor.class);
if (compressorWrappers != null) {
compressorWrappers.forEach(CompressorFactory::addCompressor);
}
List<ObjectWrapper<Serializer>> serializerWrappers = SpiHandler.getList(Serializer.class);
if (serializerWrappers != null) {
serializerWrappers.forEach(SerializerFactory::addCompressor);
}
}
修改core模块下的resources
文件夹的 dcyrpc.xml
文件:修改部分内容
- provider-demo模块下的
resources
文件夹的dcyrpc.xml
文件 也是修改一样的内容
<!--压缩方式(二选一)-->
<compressType type="gzip"/>
<compressor code="1" type="gzip" class="com.dcyrpc.compress.impl.GzipCompressor"/>
<!--序列化配置(二选一)-->
<serializeType type="hessian"/>
<serializr code="3" type="Hessian" class="com.dcyrpc.serialize.impl.HessianSerializer" />
修改config.XmlResolver
类:修改部分内容
ObjectWrapper<Compressor> compressorObjectWrapper = resolveCompressCompressor(xPath, doc);
CompressorFactory.addCompressor(compressorObjectWrapper);
ObjectWrapper<Serializer> serializerObjectWrapper = resolveSerializer(xPath, doc);
SerializerFactory.addCompressor(serializerObjectWrapper);
private ObjectWrapper<Compressor> resolveCompressCompressor(XPath xPath, Document doc) {
String expression = "/configuration/compressor";
Compressor compressor = parseObject(xPath, doc, expression, null);
Byte code = Byte.valueOf(Objects.requireNonNull(parseString(xPath, doc, expression, "code")));
String type = parseString(xPath, doc, expression, "type");
return new ObjectWrapper<>(code, type, compressor);
}
private ObjectWrapper<Serializer> resolveSerializer(XPath xPath, Document doc) {
String expression = "/configuration/serializr";
Serializer serializer = parseObject(xPath, doc, expression, null);
Byte code = Byte.valueOf(Objects.requireNonNull(parseString(xPath, doc, expression, "code")));
String type = parseString(xPath, doc, expression, "type");
return new ObjectWrapper<>(code, type, serializer);
}
修改config.Configuration
类:删除以下代码
private ProtocolConfig protocolConfig = new ProtocolConfig("jdk");
private Serializer serializer = new JdkSerializer();
private Compressor compressor = new GzipCompressor();
修改DcyRpcBootstrap
类:删除以下代码
public DcyRpcBootstrap protocol(ProtocolConfig protocolConfig) {
configuration.setProtocolConfig(protocolConfig);
log.info("当前工程使用了:{}协议进行序列化", protocolConfig.toString());
return this;
}
修改provider模块下的启动类:替换代码
- 将
.protocol(new ProtocolConfig("jdk"))
替换为以下代码
.serialize("jdk")