使用入门
- 使用 `@RefreshScope` + `@Value`,实现动态刷新
/**
 * Demo controller: the class is refresh-scoped and reads one property through {@code @Value}.
 */
@RefreshScope
@RestController
public class TestController {

    /** Value of the {@code cls.name} property. */
    @Value("${cls.name}")
    private String clsName;
}
- 使用 `@ConfigurationProperties`,通过 `@Autowired` 注入使用
/**
 * Property holder bound to configuration keys under the {@code redis} prefix.
 */
@Component
@ConfigurationProperties(prefix = "redis")
@Data
public class RedisProperties {

    /** Bound from {@code redis.user-name} / {@code redis.userName}. */
    private String userName;

    /** Bound from {@code redis.password}. */
    private String password;

    /** Bound from {@code redis.url}. */
    private String url;
}
源码解析
CacheData
`CacheData#checkListenerMd5`:当发现当前配置的 MD5 与监听器上次回调时记录的 MD5 不一致时,会通知该监听器。核心点在于会执行 `listener.receiveConfigInfo(contentTmp);`。
/**
 * Notifies every registered listener whose last recorded MD5 differs from the current MD5.
 */
void checkListenerMd5() {
    for (ManagerListenerWrap listenerWrap : listeners) {
        if (md5.equals(listenerWrap.lastCallMd5)) {
            // This listener was already called with the current content.
            continue;
        }
        safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, listenerWrap);
    }
}
/**
 * Delivers the given config content to a single listener and records the delivery on its wrapper.
 *
 * <p>The notification is packaged as a {@link Runnable}. It is handed to the listener's executor
 * when {@code listener.getExecutor()} is non-null, otherwise it runs inline on the calling thread.
 * Failures are logged and never propagated to the caller.
 *
 * @param dataId           data id of the config being delivered
 * @param group            group of the config being delivered
 * @param content          config content before the filter chain is applied
 * @param type             config type passed to the change parser
 * @param md5              MD5 stored as {@code lastCallMd5} after a successful callback
 * @param encryptedDataKey encrypted data key placed on the {@code ConfigResponse}
 * @param listenerWrap     wrapper holding the listener and its last delivered state
 */
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
        final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;
    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // Before invoking the callback, switch the thread's context class loader to the
                // listener's (webapp) class loader, so that SPI lookups inside the callback do not
                // fail or resolve against the wrong application (only an issue when several
                // applications are deployed together).
                Thread.currentThread().setContextClassLoader(appClassLoader);

                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                cr.setEncryptedDataKey(encryptedDataKey);
                configFilterChainManager.doFilter(null, cr);
                // The listener receives the content as left on the response by the filter chain.
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);

                // compare lastContent and content
                if (listener instanceof AbstractConfigChangeListener) {
                    Map data = ConfigChangeHandler.getInstance()
                            .parseChangeData(listenerWrap.lastContent, content, type);
                    ConfigChangeEvent event = new ConfigChangeEvent(data);
                    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                    listenerWrap.lastContent = content;
                }

                // Only reached when the callbacks above did not throw.
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                        listener);
            } catch (NacosException ex) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                        name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
            } catch (Throwable t) {
                // Logging only t.getCause() prints "null" for exceptions without a cause and drops
                // the stack trace. The throwable is appended as a trailing argument so that an
                // SLF4J-style logger (assumed for LOGGER) also records the exception itself.
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                        group, md5, listener, t.getCause(), t);
            } finally {
                // Always restore the caller's context class loader.
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };

    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        // Same reasoning as above: keep the original message and add the throwable itself.
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                group, md5, listener, t.getCause(), t);
    }
    final long finishNotify = System.currentTimeMillis();
    // When an executor is used, this covers only the hand-off to the executor, not the callback.
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
            name, (finishNotify - startNotify), dataId, group, md5, listener);
}
NacosContextRefresher
`NacosContextRefresher#onApplicationEvent`:系统启动的时候,会注册监听器。该监听器重写了 `innerReceive` 方法。
/**
 * Registers the Nacos listeners the first time an {@code ApplicationReadyEvent} is received.
 *
 * @param event the ready event (its payload is not inspected)
 */
public void onApplicationEvent(ApplicationReadyEvent event) {
    // many Spring context: only the caller that flips the flag performs the registration
    boolean firstReadyEvent = this.ready.compareAndSet(false, true);
    if (!firstReadyEvent) {
        return;
    }
    this.registerNacosListenersForApplications();
}
/**
 * Registers one Nacos listener per refreshable property source, provided refresh is enabled.
 */
private void registerNacosListenersForApplications() {
    if (!isRefreshEnabled()) {
        return;
    }
    for (NacosPropertySource source : NacosPropertySourceRepository.getAll()) {
        if (source.isRefreshable()) {
            String dataId = source.getDataId();
            registerNacosListener(source.getGroup(), dataId);
        }
    }
}
/**
 * Creates (or reuses) the shared listener for a data id / group pair and registers it with the
 * config service. A registration failure is logged as a warning and not rethrown.
 *
 * @param groupKey config group
 * @param dataKey  config data id
 */
private void registerNacosListener(final String groupKey, final String dataKey) {
    final String mapKey = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    Listener sharedListener = listenerMap.computeIfAbsent(mapKey, ignoredKey -> new AbstractSharedListener() {
        @Override
        public void innerReceive(String dataId, String group, String configInfo) {
            refreshCountIncrement();
            nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
            // todo feature: support single refresh for listening
            RefreshEvent refreshEvent = new RefreshEvent(this, null, "Refresh Nacos config");
            applicationContext.publishEvent(refreshEvent);
            if (log.isDebugEnabled()) {
                String message = String.format(
                        "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                        group, dataId, configInfo);
                log.debug(message);
            }
        }
    });
    try {
        configService.addListener(dataKey, groupKey, sharedListener);
    }
    catch (NacosException e) {
        String message = String.format(
                "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
                groupKey);
        log.warn(message, e);
    }
}
AbstractSharedListener
执行 `receiveConfigInfo` 方法时,会调用 `innerReceive` 方法。
public abstract class AbstractSharedListener implements Listener {
private volatile String dataId;
private volatile String group;
public AbstractSharedListener() {
}
public final void fillContext(String dataId, String group) {
this.dataId = dataId;
this.group = group;
}
public final void receiveConfigInfo(String configInfo) {
this.innerReceive(this.dataId, this.group, configInfo);
}
public Executor getExecutor() {
return null;
}
public abstract void innerReceive(String var1, String var2, String var3);
}
RefreshEventListener
`RefreshEventListener` 监听到 `RefreshEvent` 后,会执行 `this.refresh.refresh();`。
/**
 * Dispatches ready events and refresh events to the matching {@code handle} overload;
 * every other event type is ignored.
 */
public void onApplicationEvent(ApplicationEvent event) {
    if (event instanceof ApplicationReadyEvent) {
        this.handle((ApplicationReadyEvent) event);
        return;
    }
    if (event instanceof RefreshEvent) {
        this.handle((RefreshEvent) event);
    }
}
/**
 * Triggers a refresh for the given event, but only once the {@code ready} flag is set.
 *
 * @param event the refresh event whose description is logged
 */
public void handle(RefreshEvent event) {
    if (!this.ready.get()) {
        return;
    }
    log.debug("Event received " + event.getEventDesc());
    Set<String> changedKeys = this.refresh.refresh();
    log.info("Refresh keys changed: " + changedKeys);
}
ContextRefresher
`ContextRefresher#refresh`:其中 `refreshEnvironment()` 会发布 `EnvironmentChangeEvent` 事件。`ContextRefresher#addConfigFilesToEnvironment` 启动了一个 Spring Application 程序去加载了一次配置,将变化后的配置重新加载进来,然后执行 `this.scope.refreshAll();`。
/**
 * Refreshes the environment, then refreshes every bean in the refresh scope.
 *
 * @return the keys returned by {@link #refreshEnvironment()}
 */
public synchronized Set<String> refresh() {
    final Set<String> changedKeys = refreshEnvironment();
    this.scope.refreshAll();
    return changedKeys;
}
/**
 * Reloads the configuration into the environment and publishes an
 * {@code EnvironmentChangeEvent} carrying the keys that differ before and after the reload.
 *
 * @return the changed keys
 */
public synchronized Set<String> refreshEnvironment() {
    // Snapshot taken before the configuration is reloaded.
    Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
    addConfigFilesToEnvironment();
    Map<String, Object> after = extract(this.context.getEnvironment().getPropertySources());
    Set<String> keys = changes(before, after).keySet();
    EnvironmentChangeEvent changeEvent = new EnvironmentChangeEvent(this.context, keys);
    this.context.publishEvent(changeEvent);
    return keys;
}
/**
 * Re-reads configuration by running a throw-away Spring application on a copy of the current
 * environment, then merges the resulting property sources back into the live environment.
 *
 * The temporary context (and each ConfigurableApplicationContext parent) is closed in the
 * finally block, so the returned context has already been closed by the time callers see it.
 *
 * @return the temporary context that was run, or null if it was never created
 */
ConfigurableApplicationContext addConfigFilesToEnvironment() {
ConfigurableApplicationContext capture = null;
try {
// Work on a copy so the live environment is only touched by the merge loop below.
StandardEnvironment environment = copyEnvironment(
this.context.getEnvironment());
// Banner off and no web environment for the temporary application.
SpringApplicationBuilder builder = new SpringApplicationBuilder(Empty.class)
.bannerMode(Mode.OFF).web(WebApplicationType.NONE)
.environment(environment);
// Just the listeners that affect the environment (e.g. excluding logging
// listener because it has side effects)
builder.application()
.setListeners(Arrays.asList(new BootstrapApplicationListener(),
new ConfigFileApplicationListener()));
capture = builder.run();
// Drop the refresh-args property source from the copy before merging.
if (environment.getPropertySources().contains(REFRESH_ARGS_PROPERTY_SOURCE)) {
environment.getPropertySources().remove(REFRESH_ARGS_PROPERTY_SOURCE);
}
MutablePropertySources target = this.context.getEnvironment()
.getPropertySources();
// Name of the most recent source known to exist in target; used as the insertion anchor.
String targetName = null;
for (PropertySource<?> source : environment.getPropertySources()) {
String name = source.getName();
if (target.contains(name)) {
targetName = name;
}
// Sources listed in standardSources are never replaced or added.
if (!this.standardSources.contains(name)) {
if (target.contains(name)) {
// Same name already present: swap in the freshly loaded source.
target.replace(name, source);
}
else {
if (targetName != null) {
target.addAfter(targetName, source);
// update targetName to preserve ordering
targetName = name;
}
else {
// targetName was null so we are at the start of the list
target.addFirst(source);
targetName = name;
}
}
}
}
}
finally {
// Close the temporary context and walk up its parent chain, closing each
// ConfigurableApplicationContext; exceptions from close() are ignored.
ConfigurableApplicationContext closeable = capture;
while (closeable != null) {
try {
closeable.close();
}
catch (Exception e) {
// Ignore;
}
if (closeable.getParent() instanceof ConfigurableApplicationContext) {
closeable = (ConfigurableApplicationContext) closeable.getParent();
}
else {
break;
}
}
}
return capture;
}
`RefreshScope#refreshAll`:先调用 `super.destroy()`,然后发布 `RefreshScopeRefreshedEvent` 事件。
/**
 * Destroys everything held by the parent scope, then announces the refresh by publishing a
 * {@code RefreshScopeRefreshedEvent}.
 */
@ManagedOperation(description = "Dispose of the current instance of all beans "
        + "in this scope and force a refresh on next method execution.")
public void refreshAll() {
    super.destroy();
    RefreshScopeRefreshedEvent refreshedEvent = new RefreshScopeRefreshedEvent();
    this.context.publishEvent(refreshedEvent);
}
`GenericScope#destroy`:清理 `GenericScope` 缓存。下面展示的是按名称清理单个缓存条目的 `destroy(String name)`。
/**
 * Removes the named entry from the scope cache and destroys it under its write lock.
 *
 * @param name the cache key to remove
 * @return {@code true} if an entry was cached under {@code name} and has been destroyed,
 *         {@code false} if nothing was cached
 */
protected boolean destroy(String name) {
    GenericScope.BeanLifecycleWrapper removed = this.cache.remove(name);
    if (removed == null) {
        return false;
    }
    Lock writeLock = ((ReadWriteLock) this.locks.get(removed.getName())).writeLock();
    writeLock.lock();
    try {
        removed.destroy();
    } finally {
        writeLock.unlock();
    }
    // Forget any error previously recorded for this name.
    this.errors.remove(name);
    return true;
}
获取Bean对象
/**
 * Meta-annotation for {@code @Scope("refresh")}; applicable to types and methods and retained
 * at runtime.
 */
@Documented
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Scope("refresh")
public @interface RefreshScope {

    /**
     * Proxy mode for the scoped bean; defaults to {@link ScopedProxyMode#TARGET_CLASS}.
     *
     * @return proxy mode
     * @see Scope#proxyMode()
     */
    ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;
}
`AbstractBeanFactory#doGetBean`:获取 Bean。如果该 Bean 既不是 singleton 也不是 prototype(例如类上带有 `@RefreshScope` 注解),会根据 scope 名称获取对应的 `scope`,执行 `scope.get` 来获取 Bean 对象。
/**
 * Abridged excerpt of the bean lookup: returns the bean for the given name, creating it
 * according to its scope when no shared singleton instance is available.
 *
 * NOTE(review): the "// ..." markers stand for code omitted from this excerpt; the bean
 * definition variable mbd is declared in the omitted part.
 *
 * @param name the bean name as requested by the caller
 * @param requiredType the type the result must match, or null to skip the type check
 * @param args explicit creation arguments, or null
 * @param typeCheckOnly not used in the visible part of this excerpt
 * @return the bean instance, converted to requiredType when necessary
 * @throws BeansException if the bean cannot be created or is not of the required type
 */
protected <T> T doGetBean(
String name, @Nullable Class<T> requiredType, @Nullable Object[] args, boolean typeCheckOnly)
throws BeansException {
String beanName = transformedBeanName(name);
Object bean;
// Eagerly check singleton cache for manually registered singletons.
Object sharedInstance = getSingleton(beanName);
if (sharedInstance != null && args == null) {
//...
}
else {
// ...
try {
// ...
// Create bean instance.
if (mbd.isSingleton()) {
sharedInstance = getSingleton(beanName, () -> {
try {
return createBean(beanName, mbd, args);
}
catch (BeansException ex) {
// Explicitly remove instance from singleton cache: It might have been put there
// eagerly by the creation process, to allow for circular reference resolution.
// Also remove any beans that received a temporary reference to the bean.
destroySingleton(beanName);
throw ex;
}
});
bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
}
else if (mbd.isPrototype()) {
// It's a prototype -> create a new instance.
Object prototypeInstance = null;
try {
beforePrototypeCreation(beanName);
prototypeInstance = createBean(beanName, mbd, args);
}
finally {
afterPrototypeCreation(beanName);
}
bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
}
else {
// Neither singleton nor prototype: delegate to the Scope registered under the
// definition's scope name (this is the branch a "refresh"-scoped bean takes).
String scopeName = mbd.getScope();
if (!StringUtils.hasLength(scopeName)) {
throw new IllegalStateException("No scope name defined for bean ´" + beanName + "'");
}
Scope scope = this.scopes.get(scopeName);
if (scope == null) {
throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
}
try {
// The scope decides whether to invoke this factory or return an instance it already holds.
Object scopedInstance = scope.get(beanName, () -> {
beforePrototypeCreation(beanName);
try {
return createBean(beanName, mbd, args);
}
finally {
afterPrototypeCreation(beanName);
}
});
bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
}
catch (IllegalStateException ex) {
throw new BeanCreationException(beanName,
"Scope '" + scopeName + "' is not active for the current thread; consider " +
"defining a scoped proxy for this bean if you intend to refer to it from a singleton",
ex);
}
}
}
catch (BeansException ex) {
cleanupAfterBeanCreationFailure(beanName);
throw ex;
}
}
// Check if required type matches the type of the actual bean instance.
if (requiredType != null && !requiredType.isInstance(bean)) {
try {
T convertedBean = getTypeConverter().convertIfNecessary(bean, requiredType);
if (convertedBean == null) {
throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
}
return convertedBean;
}
catch (TypeMismatchException ex) {
if (logger.isTraceEnabled()) {
logger.trace("Failed to convert bean '" + name + "' to required type '" +
ClassUtils.getQualifiedName(requiredType) + "'", ex);
}
throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
}
}
return (T) bean;
}
GenericScope#get
,获取缓存中的对象
/**
 * Returns the bean held by the wrapper that the scope cache hands back for {@code name}.
 * A {@code RuntimeException} thrown while obtaining the bean is recorded in {@code errors}
 * under the same name and then rethrown.
 *
 * @param name          the bean name used as cache key
 * @param objectFactory factory wrapped in the candidate cache entry
 * @return the bean instance
 */
public Object get(String name, ObjectFactory<?> objectFactory) {
    GenericScope.BeanLifecycleWrapper candidate = new GenericScope.BeanLifecycleWrapper(name, objectFactory);
    // Use whatever wrapper the cache returns for this name, not necessarily the candidate.
    GenericScope.BeanLifecycleWrapper wrapper = this.cache.put(name, candidate);
    this.locks.putIfAbsent(name, new ReentrantReadWriteLock());
    try {
        return wrapper.getBean();
    } catch (RuntimeException failure) {
        this.errors.put(name, failure);
        throw failure;
    }
}
StandardScopeCache#put
,如果旧值不存在,存放成功,返回当前value值;如果旧值存在,返回旧值。
/**
 * Stores {@code value} under {@code name} only if nothing is cached there yet.
 *
 * @return the previously cached value if one exists, otherwise the given {@code value}
 */
public Object put(String name, Object value) {
    Object existing = this.cache.putIfAbsent(name, value);
    if (existing == null) {
        return value;
    }
    return existing;
}
`GenericScope.BeanLifecycleWrapper#getBean`:如果 Bean 尚未创建,则调用 `objectFactory.getObject()`,也就是创建 Bean。
/**
 * Returns the wrapped bean, obtaining it from the object factory on first use.
 * Creation is guarded by a lock on {@code this.name} and re-checked inside the lock.
 */
public Object getBean() {
    if (this.bean != null) {
        return this.bean;
    }
    synchronized (this.name) {
        // Re-check: another thread may have created the bean while this one waited.
        if (this.bean == null) {
            this.bean = this.objectFactory.getObject();
        }
    }
    return this.bean;
}
ConfigurationPropertiesRebinderAutoConfiguration
`ConfigurationPropertiesRebinderAutoConfiguration` 注入了 `ConfigurationPropertiesBeans` 和 `ConfigurationPropertiesRebinder`。
/**
 * Auto-configuration that contributes the {@code ConfigurationPropertiesBeans} registry and the
 * {@code ConfigurationPropertiesRebinder}, each only when the current context has none.
 *
 * <p>NOTE(review): the class declares {@code SmartInitializingSingleton}, but this excerpt does
 * not show the corresponding callback method.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(ConfigurationPropertiesBindingPostProcessor.class)
public class ConfigurationPropertiesRebinderAutoConfiguration
        implements ApplicationContextAware, SmartInitializingSingleton {

    /** Context supplied through {@link #setApplicationContext(ApplicationContext)}. */
    private ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.context = applicationContext;
    }

    /**
     * @return a new, empty {@code ConfigurationPropertiesBeans} registry
     */
    @Bean
    @ConditionalOnMissingBean(search = SearchStrategy.CURRENT)
    public static ConfigurationPropertiesBeans configurationPropertiesBeans() {
        return new ConfigurationPropertiesBeans();
    }

    /**
     * @param beans the registry the rebinder works on
     * @return a rebinder backed by {@code beans}
     */
    @Bean
    @ConditionalOnMissingBean(search = SearchStrategy.CURRENT)
    public ConfigurationPropertiesRebinder configurationPropertiesRebinder(
            ConfigurationPropertiesBeans beans) {
        return new ConfigurationPropertiesRebinder(beans);
    }
}
`ConfigurationPropertiesBeans` 实现了 `BeanPostProcessor`,启动的时候会执行 `postProcessBeforeInitialization`:跳过 refresh 作用域的 Bean,寻找带有 `@ConfigurationProperties` 注解的 Bean 对象,封装成 `ConfigurationPropertiesBean`,存放到 `ConfigurationPropertiesBeans` 的 `beans` 属性中。
/**
 * Records beans that resolve to a {@code ConfigurationPropertiesBean}, skipping refresh-scoped
 * bean names. The bean itself is always returned unchanged.
 */
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
        throws BeansException {
    if (!isRefreshScoped(beanName)) {
        ConfigurationPropertiesBean candidate = ConfigurationPropertiesBean
                .get(this.applicationContext, bean, beanName);
        if (candidate != null) {
            this.beans.put(beanName, candidate);
        }
    }
    return bean;
}
/**
 * Looks up the bean's factory method and delegates to {@code create}.
 *
 * @return the descriptor built by {@code create}, or {@code null} when {@code create} returns
 *         {@code null}
 */
public static ConfigurationPropertiesBean get(ApplicationContext applicationContext, Object bean, String beanName) {
    // Resolve the factory method first, then the runtime type, as separate steps.
    final Method factoryMethod = findFactoryMethod(applicationContext, beanName);
    final Class<?> beanType = bean.getClass();
    return create(beanName, bean, beanType, factoryMethod);
}
/**
 * Builds a {@code ConfigurationPropertiesBean} when a {@code ConfigurationProperties}
 * annotation is found for the given instance, type or factory method.
 *
 * @param name     bean name
 * @param instance existing bean instance, may be {@code null}
 * @param type     bean type, used as bind type when there is no factory method
 * @param factory  factory method, may be {@code null}
 * @return the descriptor, or {@code null} when no {@code ConfigurationProperties} annotation is found
 */
private static ConfigurationPropertiesBean create(String name, Object instance, Class<?> type, Method factory) {
    ConfigurationProperties annotation = findAnnotation(instance, type, factory, ConfigurationProperties.class);
    if (annotation == null) {
        return null;
    }

    // Include @Validated alongside @ConfigurationProperties when it is present.
    Validated validated = findAnnotation(instance, type, factory, Validated.class);
    Annotation[] annotations;
    if (validated == null) {
        annotations = new Annotation[] { annotation };
    }
    else {
        annotations = new Annotation[] { annotation, validated };
    }

    // Bind against the factory method's return type when there is one, else the bean type.
    ResolvableType bindType;
    if (factory == null) {
        bindType = ResolvableType.forClass(type);
    }
    else {
        bindType = ResolvableType.forMethodReturnType(factory);
    }

    Bindable<Object> bindTarget = Bindable.of(bindType).withAnnotations(annotations);
    if (instance != null) {
        bindTarget = bindTarget.withExistingValue(instance);
    }
    return new ConfigurationPropertiesBean(name, instance, annotation, bindTarget);
}
`ConfigurationPropertiesRebinder` 实现了 `ApplicationListener<EnvironmentChangeEvent>`,当监听到 `EnvironmentChangeEvent` 时,会先销毁 Bean 对象(`destroyBean`),再对同一个 Bean 实例重新初始化(`initializeBean`)。
/**
 * Rebinds all registered beans when the event's source is this listener's application context
 * or equals the event's own key set; other events are ignored.
 */
public void onApplicationEvent(EnvironmentChangeEvent event) {
    if (!this.applicationContext.equals(event.getSource())
            && !event.getKeys().equals(event.getSource())) {
        return;
    }
    this.rebind();
}
/**
 * Clears previously recorded errors and rebinds every registered bean name in turn.
 */
@ManagedOperation
public void rebind() {
    this.errors.clear();
    for (String beanName : this.beans.getBeanNames()) {
        // The per-bean result is intentionally not inspected here.
        rebind(beanName);
    }
}
/**
 * Rebinds a single registered bean by destroying it and initializing the same instance again.
 *
 * @param name the bean name
 * @return {@code true} if the bean was destroyed and re-initialized; {@code false} if the name
 *         is not registered, no application context is set, the bean resolves to {@code null},
 *         or its class is listed as never refreshable
 * @throws IllegalStateException wrapping any checked exception raised during the rebind
 */
@ManagedOperation
public boolean rebind(String name) {
    if (!this.beans.getBeanNames().contains(name)) {
        return false;
    }
    if (this.applicationContext == null) {
        return false;
    }
    try {
        Object target = this.applicationContext.getBean(name);
        if (AopUtils.isAopProxy(target)) {
            // Operate on the proxied target rather than the proxy.
            target = ProxyUtils.getTargetObject(target);
        }
        if (target == null) {
            return false;
        }
        // TODO: determine a more general approach to fix this.
        // see https://github.com/spring-cloud/spring-cloud-commons/issues/571
        if (getNeverRefreshable().contains(target.getClass().getName())) {
            return false; // ignore
        }
        this.applicationContext.getAutowireCapableBeanFactory()
                .destroyBean(target);
        this.applicationContext.getAutowireCapableBeanFactory()
                .initializeBean(target, name);
        return true;
    }
    catch (RuntimeException e) {
        this.errors.put(name, e);
        throw e;
    }
    catch (Exception e) {
        this.errors.put(name, e);
        throw new IllegalStateException("Cannot rebind to " + name, e);
    }
}
总结一下
- 所有被 `@RefreshScope` 注解修饰的 Bean 都保存在 `GenericScope` 的缓存里,`RefreshScope#refreshAll` 会将缓存全部清空,当 Spring 获取这些 Bean 的时候,会重新生成并保存到缓存中。
- 有 `@ConfigurationProperties` 注解的 Bean 在监听到 `EnvironmentChangeEvent` 时,会进行销毁和重新初始化的操作。
- 如果想要所有的 `@Value` 都可以热更新,可以看一下 dynamic-config-starter 这个项目的实现。