文章目录
- Java-Chassis
- @EnableServiceComb初始化SCB
- SPIServiceUtils自定义SPI加载器
- 职责链管理器FilterChainsManager/ConsumerHandlerManager
- @RpcSchema
- 注册服务如何保活?
- @RpcReference
- PropertySourcesPlaceholderConfigurer
- ThreadPoolExecutorEx/LinkedBlockingQueueEx
- ConcurrentHashMapEx
- AopProxyUtils/BeanUtils
- ReflectionUtils
- StringValueResolver
Java-Chassis
ServiceComb中是实现RPC的框架类似Dubbo,又称Java底座。
@EnableServiceComb初始化SCB
通过@ImportResource
的方式导入ServiceComb的核心Bean定义文件"classpath*:META-INF/spring/scb-core-bean.xml";和"classpath*:META-INF/spring/*.bean.xml";
,注入如下Bean定义。
-
ConfigurationSpringInitializer
-
CseApplicationListener,ContextRefreshedEvent事件监听器
-
初始化相关的连接
if (this.applicationContext == applicationContext) { // same object. avoid initialize many times. return; } this.applicationContext = applicationContext; BeanUtils.setContext(applicationContext); HttpClients.load(); // Http相关的 RegistrationManager.INSTANCE.init(); // 服务注册 管理初始化,通过SPI获取。 DiscoveryManager.INSTANCE.init(); // 服务发现 管理初始化,通过SPI获取。
-
ContextRefreshedEvent Bean实例化完事件触达,初始化ServiceComb引擎SCBEngine
@Override public void onApplicationEvent(ApplicationEvent event) { if (initEventClass.isInstance(event)) { if (applicationContext instanceof AbstractApplicationContext) { ((AbstractApplicationContext) applicationContext).registerShutdownHook(); } SCBEngine scbEngine = SCBEngine.getInstance(); // 获取单例 //SCBEngine init first, hence we do not need worry that when other beans need use the //producer microserviceMeta, the SCBEngine is not inited. // String serviceName = RegistryUtils.getMicroservice().getServiceName(); // SCBEngine.getInstance().setProducerMicroserviceMeta(new MicroserviceMeta(serviceName).setConsumer(false)); // SCBEngine.getInstance().setProducerProviderManager(applicationContext.getBean(ProducerProviderManager.class)); // SCBEngine.getInstance().setConsumerProviderManager(applicationContext.getBean(ConsumerProviderManager.class)); // SCBEngine.getInstance().setTransportManager(applicationContext.getBean(TransportManager.class)); scbEngine.setApplicationContext(applicationContext); scbEngine.setPriorityPropertyManager(applicationContext.getBean(PriorityPropertyManager.class)); scbEngine.setFilterChainsManager(applicationContext.getBean(FilterChainsManager.class)); // 指责链管理类 scbEngine.getConsumerProviderManager().getConsumerProviderList() .addAll(applicationContext.getBeansOfType(ConsumerProvider.class).values()); // 获取支持的Consumer形式,如Rest/Pojo scbEngine.getProducerProviderManager().getProducerProviderList() .addAll(applicationContext.getBeansOfType(ProducerProvider.class).values()); // // 获取Provider注册的元信息,支持从Rest/Pojo中获取 scbEngine.addBootListeners(applicationContext.getBeansOfType(BootListener.class).values()); // 获取全量SCB初始化监听器BootListener,用于监听SCB在生命周期内的各种事件。 scbEngine.run(); } else if (event instanceof ContextClosedEvent) { if (SCBEngine.getInstance() != null) { SCBEngine.getInstance().destroy(); } } }
-
-
org.apache.servicecomb.core.executor.GroupExecutor
SPIServiceUtils.getOrLoadSortedService(Registration.class)
SPIServiceUtils自定义SPI加载器
ServiceComb大量使用SPI动态扩展对应的实现,SPI定义的使能及优先级,从全部jar包中
META-INF/services的路径文件中加载扩展类,文件名为接口类的全路径里面内容为接口实现。
-
SPIOrder,加载SPI的顺序,排序从小到大,order数值越小,优先级越高,调用越靠前。
-
SPIEnabled,当前SPI是否使能。
-
SPIServiceUtils,代理到JDK自带的ServiceLoader动态加载SPI类,并使用ReflectionUtils用于获取SPIOrder排序对应的实现类。
// 加载SPI全部类 public static <T> List<T> loadSortedService(Class<T> serviceType) { List<Entry<Integer, T>> serviceEntries = new ArrayList<>(); ServiceLoader<T> serviceLoader = ServiceLoader.load(serviceType); // 代理到JDK serviceLoader.forEach(service -> { int serviceOrder = 0; Method getOrder = ReflectionUtils.findMethod(service.getClass(), "getOrder"); // 默认为0,如果获取方法 if (getOrder != null) { serviceOrder = (int) ReflectionUtils.invokeMethod(getOrder, service); // 触发方法 } Entry<Integer, T> entry = new SimpleEntry<>(serviceOrder, service); serviceEntries.add(entry); }); List<T> services = serviceEntries.stream() .sorted(Comparator.comparingInt(Entry::getKey)) .map(Entry::getValue) .collect(Collectors.toList()); // 根据 getOrder 排序,并返回全部的serviceType。 LOGGER.info("Found SPI service {}, count={}.", serviceType.getName(), services.size()); for (int idx = 0; idx < services.size(); idx++) { T service = services.get(idx); LOGGER.info(" {}. {}.", idx, service.getClass().getName()); } // 输出加载的日志及其顺序,方便定位调试功能 return services; } // Map<Class<?>, List<Object>> cache = new ConcurrentHashMap<>(); // loadSortedService 并非线程安全,因此添加类锁 public static <T> List<T> getOrLoadSortedService(Class<T> serviceType) { List<Object> services = cache.get(serviceType); if (services == null) { synchronized (LOCK) { services = cache.get(serviceType); if (services == null) { services = (List<Object>) loadSortedService(serviceType); cache.put(serviceType, services); } } }
SPI加载类的全量日志:
职责链管理器FilterChainsManager/ConsumerHandlerManager
不论在服务的提供方或者在服务的调用方,在触发请求或者响应请求的时候,都会通过职责链的方式顺序调用各种Handler,框架动态从handler.xml文件中读取Handler算子,支持在yaml文件中通过配置文件动态装配,非常灵活。
-
实现了Handler仓库HandlerConfigUtils。PaaSResourceUtils继承Spring ResourceUtils,读取资源文件
private static Config loadConfig() throws Exception { Config config = new Config(); // 1、读取配置文件 classpath* 代表在全量jar包中寻找 // 2、classpath*:config/cse.handler.xml classpath*:config/cse.*.handler.xml // 3、PathMatchingResourcePatternResolver 加载全部的Resource资源路径 // 4、排序资源的路径(xxxx.handler.cse,根据名字xxx进行排序) List<Resource> resList = PaaSResourceUtils.getSortedResources("classpath*:config/cse.handler.xml", ".handler.xml"); for (Resource res : resList) { // 5. 通过XmlMapper读取handler.xml文件映射到Config类中 Config tmpConfig = XmlLoaderUtils.load(res, Config.class); config.mergeFrom(tmpConfig); }
-
Handler算子的编排,使用ConsumerHandlerManager和ProducerHandlerManager读取yaml中的配置文件,通过名字对Handler进行编排,Handler并未实现Order接口,调用顺序交给编排方。
-
Consumer/Provider支持自定义ConsumerHandlerManager/ProducerHandlerManager类生成职责链列表,最后的Consumer Handler-TransportClientHandler.INSTANCE, 最后的Provider Handler–ProducerOperationHandler.INSTANCE
以下动态创建职责链配置:
// 职责链Key,配置在yaml文件中 protected List<Handler> create(String microserviceName) { // 是否定义服务对应的handler编排servicecomb.handler.chain.Provider.service.calculator // 没有则使用默认的 servicecomb.handler.chain.Provider.default String handlerChainKey = "servicecomb.handler.chain." + getName() + ".service." + microserviceName; String chainDef = DynamicPropertyFactory.getInstance() .getStringProperty(handlerChainKey, defaultChainDef) .get(); LOGGER.info("get handler chain for [{}]: [{}]", handlerChainKey, chainDef); return createHandlerChain(chainDef); }
-
BootListener
PojoInvocation保存List<Handler>
职责链,并实现
@RpcSchema
@RpcSchema(schemaId = "hello")
public class HelloImpl implements Hello {
@Override
public String sayHi(String name) {
return "Hello " + name;
}
@Override
public String sayHello(Person person) {
return "Hello person " + person.getName();
}
}
-
注解会被PojoProducers这个BeanPostProcessor处理,用于往注册中心注册服务。
protected void processProvider(String beanName, Object bean) { // 1、aop后,新的实例的父类可能是原class,也可能只是个proxy,父类不是原class,所以,需要先取出原class,再取标注 // 调用 AopProxyUtils.ultimateTargetClass Class<?> beanCls = BeanUtils.getImplClassFromBean(bean); if (beanCls == null) { return; } RpcSchema rpcSchema = beanCls.getAnnotation(RpcSchema.class); if (rpcSchema == null) { return; } // 2、获取schemaId,如果没有传递则获取接口名 String schemaId = rpcSchema.schemaId(); if (StringUtils.isEmpty(schemaId)) { Class<?>[] intfs = beanCls.getInterfaces(); if (intfs.length == 1) { schemaId = intfs[0].getName(); } else { throw new Error("Must be schemaId or implements only one interface"); } } // 3、注册producer元信息 PojoProducerMeta pojoProducerMeta = new PojoProducerMeta(); pojoProducerMeta.setSchemaId(schemaId); pojoProducerMeta.setSchemaInterface(rpcSchema.schemaInterface()); pojoProducerMeta.setInstance(bean); registerPojoProducer(pojoProducerMeta); }
-
在SCB启动的时候,scbEngine.getProducerProviderManager会获取PojoProducerProvider -> 读取PojoProducers中注册的全部元信息。
public List<ProducerMeta> init() { // for some test cases, there is no spring context if (BeanUtils.getContext() == null) { return Collections.emptyList(); } PojoProducers pojoProducers = BeanUtils.getContext().getBean(PojoProducers.class); for (ProducerMeta producerMeta : pojoProducers.getProducerMetas()) { PojoProducerMeta pojoProducerMeta = (PojoProducerMeta) producerMeta; initPojoProducerMeta(pojoProducerMeta); } return pojoProducers.getProducerMetas(); }
-
SCB.Run的时候,会通过获取到的元信息,将其注册到注册中心上对外提供服务。
注册服务如何保活?
@RpcReference
@RpcReference(microserviceName = "hello", schemaId = "hello")
private static Hello hello;
Pojo形式的Rpc消费方的注解,将自动从注册中心拉取对应的服务名,动态代理Compute并注入代理类。
-
注解会被RpcReferenceProcessor这个BeanPostProcessor处理,动态代理增强Hello。十分注意StringValueResolver支持动态解析占位符。
public class RpcReferenceProcessor implements BeanPostProcessor, EmbeddedValueResolverAware { private StringValueResolver resolver; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { // 扫描所有field,处理扩展的field标注 ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() { public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException { processConsumerField(bean, field); } }); return bean; } protected void processConsumerField(Object bean, Field field) { RpcReference reference = field.getAnnotation(RpcReference.class); if (reference == null) { return; } handleReferenceField(bean, field, reference); } private void handleReferenceField(Object obj, Field field, RpcReference reference) { String microserviceName = reference.microserviceName(); microserviceName = resolver.resolveStringValue(microserviceName); PojoReferenceMeta pojoReference = new PojoReferenceMeta(); pojoReference.setMicroserviceName(microserviceName); pojoReference.setSchemaId(reference.schemaId()); pojoReference.setConsumerIntf(field.getType()); // proxy = Invoker.createProxy(microserviceName, schemaId, consumerIntf); 动态代理 pojoReference.afterPropertiesSet(); ReflectionUtils.makeAccessible(field); ReflectionUtils.setField(field, obj, pojoReference.getProxy()); } }
PropertySourcesPlaceholderConfigurer
BeanFactoryPostProcessor,用于解析Bean对应的Property。
- @Value注解,会被
AutowiredAnnotationBeanPostProcessor
解析,并替换占位符注入合适的值。 - XML配置如果含有属性注入配置,则
PropertySourcesPlaceholderConfigurer#postProcessBeanFactory
会被替换占位符注入合适的值。
ThreadPoolExecutorEx/LinkedBlockingQueueEx
ThreadPoolExecutorEx
背景:扩展的ThreadPool,在coreSize到达的时候,并发继续增加,会优先创建Thread直到达到maxSize,才会放入BlockQueue,Dubbo和Tomcat都扩展了类似的功能,当BlockQueue的size为无限大的时候,避免Thread永远无法到达maxSize。
实现:默认ThreadPool按照如下逻辑执行Task。
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 1、workTask小于coreSize,则直接创建新的Thread执行command。
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 2、已达到coreSize,则放入BlockQueue调用offer方法。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 3、BlockQueue放满,则继续添加worker,直到达到最大。
reject(command);
}
如上要点在第二步,只需要修改workQueue的offer逻辑,就可以优先创建Thead,可使用扩展的LinkedBlockingQueueEx。
在offer的时候,获取TheadPool的size,当小于最大的时候,直接创建新的Thead,执行Task。
public class LinkedBlockingQueueEx extends LinkedBlockingQueue<Runnable> {
private transient volatile ThreadPoolExecutorEx owner = null;
public void setOwner(ThreadPoolExecutorEx owner) {
this.owner = owner;
}
@Override
public boolean offer(Runnable runnable) {
// task can come before owner available
if (owner == null) {
return super.offer(runnable);
}
// can not create more thread, just queue the task
if (owner.getPoolSize() == owner.getMaximumPoolSize()) {
return super.offer(runnable);
}
// no need to create more thread, just queue the task
if (owner.getNotFinished() <= owner.getPoolSize()) {
return super.offer(runnable);
}
// all threads are busy, and can create new thread, not queue the task
if (owner.getPoolSize() < owner.getMaximumPoolSize()) {
return false;
}
return super.offer(runnable);
}
/*
* when task is rejected (thread pool if full), force the item onto queue.
*/
public boolean force(Runnable runnable) {
if (owner == null || owner.isShutdown()) {
throw new RejectedExecutionException("queue is not running.");
}
return super.offer(runnable);
}
}
相应的扩展的TheadPool也override部分方法,用于当前提交的任务总数,完成的人数总数,拒绝的任务总数。
public class ThreadPoolExecutorEx extends ThreadPoolExecutor {
private AtomicInteger submittedCount = new AtomicInteger();
private AtomicInteger finishedCount = new AtomicInteger();
private AtomicInteger rejectedCount = new AtomicInteger();
public ThreadPoolExecutorEx(int coreThreads, int maxThreads, int maxIdleInSecond, TimeUnit timeUnit,
BlockingQueue<Runnable> queue, ThreadFactory threadFactory) {
super(coreThreads, maxThreads, maxIdleInSecond, timeUnit, queue, threadFactory);
if (queue instanceof LinkedBlockingQueueEx) {
((LinkedBlockingQueueEx) queue).setOwner(this);
}
setRejectedExecutionHandler(this::rejectedExecution);
}
@Override
public void execute(Runnable command) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException e) {
if (getQueue() instanceof LinkedBlockingQueueEx) {
final LinkedBlockingQueueEx queue = (LinkedBlockingQueueEx) getQueue();
if (!queue.force(command)) {
throw new RejectedExecutionException("thread pool queue is full");
}
} else {
throw e;
}
}
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
rejectedCount.incrementAndGet();
finishedCount.incrementAndGet();
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
finishedCount.incrementAndGet();
}
public int getNotFinished() {
return submittedCount.get() - finishedCount.get();
}
public int getRejectedCount() {
return rejectedCount.get();
}
}
ConcurrentHashMapEx
默认的ConcurrentHashMap当Key都在同一个桶的时候,调用computeIfAbsent的时候,都会对当前的桶加锁(分段锁),当在高并发读多余写的场景,这里的加锁会影响单个桶的性能,因此可以优先通过CAS获取下元素,然后再调用默认方法,扩展性能。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dyFvn88b-1670145579656)(images/image-20221204161851865.png)]
// ConcurrentHashMap.computeIfAbsent always do "synchronized" operation
// so we wrap it to improve performance
@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
V value = get(key); // CAS GET
if (value != null) {
return value;
}
return super.computeIfAbsent(key, mappingFunction);
}