项目地址
项目地址 https://toscode.gitee.com/nepxion/Aquarius
项目介绍
Nepxion Aquarius是一款基于Redis + Zookeeper的分布式应用组件集合,包含分布式锁,缓存,ID生成器,限速限流器。它采用Nepxion Matrix AOP框架进行切面架构,提供注解调用方式,也提供API调用方式
分布式缓存
Maven依赖
引入依赖
分布式缓存
<dependency>
<groupId>com.nepxion</groupId>
<artifactId>aquarius-cache-starter</artifactId>
<version>${aquarius.version}</version>
</dependency>
源码解析
aquarius-cache-starter
项目下spring.factories
配置,Nepxion
自己实现了自动配置。
com.nepxion.aquarius.cache.annotation.EnableCache=\
com.nepxion.aquarius.cache.configuration.CacheConfiguration
EnableCache
注入了CacheImportSelector
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(CacheImportSelector.class)
public @interface EnableCache {
}
CacheImportSelector
,当isEnabled
方法返回true,会注入spring.factories
键是EnableCache
对应的类,即CacheConfiguration
。
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class CacheImportSelector extends AbstractImportSelector<EnableCache> {
@Override
protected boolean isEnabled() {
return new RelaxedPropertyResolver(getEnvironment()).getProperty(CacheConstant.CACHE_ENABLED, Boolean.class, Boolean.TRUE);
}
}
CacheConfiguration
注入了CacheAopConfiguration
和RedisCacheConfiguration
。
@Configuration
@Import({ AquariusConfiguration.class, CacheAopConfiguration.class, RedisCacheConfiguration.class })
public class CacheConfiguration {
}
RedisCacheConfiguration
会根据条件生成CacheDelegate
和RedisHandler
@Configuration
@Import({ RedisConfiguration.class })
public class RedisCacheConfiguration {
@Bean
@Conditional(RedisCacheCondition.class)
public CacheDelegate redisCacheDelegate() {
return new RedisCacheDelegateImpl();
}
@Bean
@Conditional(RedisCacheCondition.class)
@ConditionalOnMissingBean
public RedisHandler redisHandler() {
return new RedisHandlerImpl();
}
}
RedisCacheCondition
public class RedisCacheCondition extends AquariusCondition {
public RedisCacheCondition() {
super(CacheConstant.CACHE_TYPE, CacheConstant.CACHE_TYPE_REDIS);
}
}
AquariusCondition
会根据设置的键值对匹配环境变量,匹配条件为真。
public class AquariusCondition implements Condition {
private String key;
private String value;
public AquariusCondition(String key, String value) {
this.key = key;
this.value = value;
}
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
String beanName = context.getEnvironment().getProperty(key);
return StringUtils.equals(beanName, value);
}
}
CacheAopConfiguration
会注入CacheAutoScanProxy
和CacheInterceptor
。
@Configuration
public class CacheAopConfiguration {
@Value("${" + CacheConstant.CACHE_SCAN_PACKAGES + ":}")
private String scanPackages;
@Bean
public CacheAutoScanProxy cacheAutoScanProxy() {
return new CacheAutoScanProxy(scanPackages);
}
@Bean
public CacheInterceptor cacheInterceptor() {
return new CacheInterceptor();
}
}
CacheAutoScanProxy
根据方法匹配,拦截 Cacheable.class, CachePut.class, CacheEvict.class
注解标注的方法,用cacheInterceptor
来获取拦截器。
public class CacheAutoScanProxy extends DefaultAutoScanProxy {
private static final long serialVersionUID = 5099476398968133135L;
private String[] commonInterceptorNames;
@SuppressWarnings("rawtypes")
private Class[] methodAnnotations;
public CacheAutoScanProxy(String scanPackages) {
super(scanPackages, ProxyMode.BY_METHOD_ANNOTATION_ONLY, ScanMode.FOR_METHOD_ANNOTATION_ONLY);
}
@Override
protected String[] getCommonInterceptorNames() {
if (commonInterceptorNames == null) {
commonInterceptorNames = new String[] { "cacheInterceptor" };
}
return commonInterceptorNames;
}
@SuppressWarnings("unchecked")
@Override
protected Class<? extends Annotation>[] getMethodAnnotations() {
if (methodAnnotations == null) {
methodAnnotations = new Class[] { Cacheable.class, CachePut.class, CacheEvict.class };
}
return methodAnnotations;
}
}
CacheInterceptor
获取对应的缓存注解,执行对应的方法。如果局部变量没配置,取全局变量值。
public class CacheInterceptor extends AbstractInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(CacheInterceptor.class);
@Autowired
private CacheDelegate cacheDelegate;
@Value("${" + AquariusConstant.PREFIX + "}")
private String prefix;
@Value("${" + AquariusConstant.FREQUENT_LOG_PRINT + ":false}")
private Boolean frequentLogPrint;
@Value("${" + CacheConstant.CACHE_EXPIRE + ":-1}")
private long expiration;
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Cacheable cacheableAnnotation = getCacheableAnnotation(invocation);
if (cacheableAnnotation != null) {
String name = cacheableAnnotation.name();
String[] keys = cacheableAnnotation.key();
long expire = cacheableAnnotation.expire();
// 如果局部变量没配置,取全局变量值
if (expire == -1234567890L) {
expire = expiration;
}
return invokeCacheable(invocation, name, keys, expire);
}
CachePut cachePutAnnotation = getCachePutAnnotation(invocation);
if (cachePutAnnotation != null) {
String name = cachePutAnnotation.name();
String[] keys = cachePutAnnotation.key();
long expire = cachePutAnnotation.expire();
// 如果局部变量没配置,取全局变量值
if (expire == -1234567890L) {
expire = expiration;
}
return invokeCachePut(invocation, name, keys, expire);
}
CacheEvict cacheEvictAnnotation = getCacheEvictAnnotation(invocation);
if (cacheEvictAnnotation != null) {
String name = cacheEvictAnnotation.name();
String[] keys = cacheEvictAnnotation.key();
boolean allEntries = cacheEvictAnnotation.allEntries();
boolean beforeInvocation = cacheEvictAnnotation.beforeInvocation();
return invokeCacheEvict(invocation, name, keys, allEntries, beforeInvocation);
}
return invocation.proceed();
}
}
分布式锁
Maven依赖
引入依赖
分布式锁
<dependency>
<groupId>com.nepxion</groupId>
<artifactId>aquarius-lock-starter</artifactId>
<version>${aquarius.version}</version>
</dependency>
aquarius-lock-starter
包下面spring.factories
引入LockConfiguration
。
com.nepxion.aquarius.lock.annotation.EnableLock=\
com.nepxion.aquarius.lock.configuration.LockConfiguration
LockConfiguration
会注入LockAopConfiguration
,RedisLockConfiguration
,LocalLockConfiguration
@Configuration
@Import({ AquariusConfiguration.class, LockAopConfiguration.class, RedisLockConfiguration.class, ZookeeperLockConfiguration.class, LocalLockConfiguration.class })
public class LockConfiguration {
}
LocalLockConfiguration
根据条件注入了LockDelegate
和LockExecutor
@Configuration
public class LocalLockConfiguration {
@Bean
@Conditional(LocalLockCondition.class)
public LockDelegate localLockDelegate() {
return new LocalLockDelegateImpl();
}
@Bean
@Conditional(LocalLockCondition.class)
public LockExecutor<Lock> localLockExecutor() {
return new LocalLockExecutorImpl();
}
}
本地锁的实现是由ReentrantLock
实现,Redis锁是由Redisson
实现的。
分布式限流
分布式限速限流
<dependency>
<groupId>com.nepxion</groupId>
<artifactId>aquarius-limit-starter</artifactId>
<version>${aquarius.version}</version>
</dependency>
源码解析
aquarius-limit-starter
包下面spring.factories
引入LimitConfiguration
@Configuration
@Import({ AquariusConfiguration.class, LimitAopConfiguration.class, RedisLimitConfiguration.class, LocalLimitConfiguration.class })
public class LimitConfiguration {
}
LimitAopConfiguration
定义了拦截@Limit
注解,使用LimitInterceptor
拦截
@Configuration
public class LimitAopConfiguration {
@Value("${" + LimitConstant.LIMIT_SCAN_PACKAGES + ":}")
private String scanPackages;
@Bean
public LimitAutoScanProxy limitAutoScanProxy() {
return new LimitAutoScanProxy(scanPackages);
}
@Bean
public LimitInterceptor limitInterceptor() {
return new LimitInterceptor();
}
}
LimitInterceptor
获取注解信息,使用LimitDelegate
进行处理。
public class LimitInterceptor extends AbstractInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(LimitInterceptor.class);
@Autowired
private LimitDelegate limitDelegate;
@Value("${" + AquariusConstant.PREFIX + "}")
private String prefix;
@Value("${" + AquariusConstant.FREQUENT_LOG_PRINT + ":false}")
private Boolean frequentLogPrint;
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Limit limitAnnotation = getLimitAnnotation(invocation);
if (limitAnnotation != null) {
String name = limitAnnotation.name();
String key = limitAnnotation.key();
int limitPeriod = limitAnnotation.limitPeriod();
int limitCount = limitAnnotation.limitCount();
return invoke(invocation, limitAnnotation, name, key, limitPeriod, limitCount);
}
return invocation.proceed();
}
private Object invoke(MethodInvocation invocation, Annotation annotation, String name, String key, int limitPeriod, int limitCount) throws Throwable {
if (StringUtils.isEmpty(name)) {
throw new AquariusException("Annotation [Limit]'s name is null or empty");
}
if (StringUtils.isEmpty(key)) {
throw new AquariusException("Annotation [Limit]'s key is null or empty");
}
String spelKey = null;
try {
spelKey = getSpelKey(invocation, key);
} catch (Exception e) {
spelKey = key;
}
String compositeKey = KeyUtil.getCompositeKey(prefix, name, spelKey);
String proxyType = getProxyType(invocation);
String proxiedClassName = getProxiedClassName(invocation);
String methodName = getMethodName(invocation);
if (frequentLogPrint) {
LOG.info("Intercepted for annotation - Limit [key={}, limitPeriod={}, limitCount={}, proxyType={}, proxiedClass={}, method={}]", compositeKey, limitPeriod, limitCount, proxyType, proxiedClassName, methodName);
}
return limitDelegate.invoke(invocation, compositeKey, limitPeriod, limitCount);
}
}
LocalLimitConfiguration
注入了LimitDelegate
和LimitExecutor
,LimitDelegate
就是调用LimitExecutor#tryAccess
@Configuration
public class LocalLimitConfiguration {
@Bean
@Conditional(LocalLimitCondition.class)
public LimitDelegate localLimitDelegate() {
return new LocalLimitDelegateImpl();
}
@Bean
@Conditional(LocalLimitCondition.class)
public LimitExecutor localLimitExecutor() {
return new GuavaLocalLimitExecutorImpl();
// return new JdkLimitExecutorImpl();
}
}
GuavaLocalLimitExecutorImpl#tryAccess(java.lang.String, int, int)
,根据key获取RateLimiterEntity
,从而获取到封装的RateLimiter
,进行缓存。封装主要是为了修改rate的时候,不需要重新生成RateLimiter
public boolean tryAccess(String compositeKey, int limitPeriod, int limitCount) throws Exception {
if (StringUtils.isEmpty(compositeKey)) {
throw new AquariusException("Composite key is null or empty");
}
if (limitPeriod != 1) {
throw new AquariusException("Limit period must be 1 second for Guava rate limiter");
}
RateLimiterEntity rateLimiterEntity = getRateLimiterEntity(compositeKey, limitCount);
RateLimiter rateLimiter = rateLimiterEntity.getRateLimiter();
return rateLimiter.tryAcquire();
}
private RateLimiterEntity getRateLimiterEntity(String compositeKey, double rate) {
RateLimiterEntity rateLimiterEntity = rateLimiterEntityMap.get(compositeKey);
if (rateLimiterEntity == null) {
RateLimiter newRateLimiter = RateLimiter.create(rate);
RateLimiterEntity newRateLimiterEntity = new RateLimiterEntity();
newRateLimiterEntity.setRateLimiter(newRateLimiter);
newRateLimiterEntity.setRate(rate);
rateLimiterEntity = rateLimiterEntityMap.putIfAbsent(compositeKey, newRateLimiterEntity);
if (rateLimiterEntity == null) {
rateLimiterEntity = newRateLimiterEntity;
}
} else {
if (rateLimiterEntity.getRate() != rate) {
rateLimiterEntity.getRateLimiter().setRate(rate);
rateLimiterEntity.setRate(rate);
}
}
return rateLimiterEntity;
}
RedisLimitExecutorImpl#tryAccess(java.lang.String, int, int)
,Redis实现主要是通过执行lua脚本。
@Override
public boolean tryAccess(String compositeKey, int limitPeriod, int limitCount) throws Exception {
if (StringUtils.isEmpty(compositeKey)) {
throw new AquariusException("Composite key is null or empty");
}
List<String> keys = new ArrayList<String>();
keys.add(compositeKey);
RedisTemplate<String, Object> redisTemplate = redisHandler.getRedisTemplate();
Number count = redisTemplate.execute(redisScript, keys, limitCount, limitPeriod);
if (count == null) {
return false;
}
if (frequentLogPrint) {
LOG.info("Access try count is {} for key={}", count, compositeKey);
}
return count.intValue() <= limitCount;
}
分布式全局唯一ID
<dependency>
<groupId>com.nepxion</groupId>
<artifactId>aquarius-id-generator-starter</artifactId>
<version>${aquarius.version}</version>
</dependency>
aquarius-id-generator-starter
项目中的spring.factories
会根据注解注入LocalIdGeneratorConfig
,EnableRedisIdGenerator
或者EnableZookeeperIdGenerator
。
com.nepxion.aquarius.idgenerator.annotation.EnableLocalIdGenerator=\
com.nepxion.aquarius.idgenerator.configuration.LocalIdGeneratorConfig
com.nepxion.aquarius.idgenerator.annotation.EnableRedisIdGenerator=\
com.nepxion.aquarius.idgenerator.configuration.RedisIdGeneratorConfig
com.nepxion.aquarius.idgenerator.annotation.EnableZookeeperIdGenerator=\
com.nepxion.aquarius.idgenerator.configuration.ZookeeperIdGeneratorConfig
LocalIdGeneratorConfig
注入LocalIdGeneratorConfiguration
@Configuration
@Import({ AquariusConfiguration.class, LocalIdGeneratorConfiguration.class })
public class LocalIdGeneratorConfig {
}
LocalIdGeneratorConfiguration
注入LocalIdGenerator
@Configuration
public class LocalIdGeneratorConfiguration {
@Bean
public LocalIdGenerator localIdGenerator() {
return new LocalIdGeneratorImpl();
}
}
LocalIdGeneratorImpl#nextUniqueId(long, long, long)
,获取ID生成器,执行nextId
方法
public String nextUniqueId(long startTimestamp, long dataCenterId, long machineId) throws Exception {
String nextUniqueId = getIdGenerator(startTimestamp, dataCenterId, machineId).nextId();
if (frequentLogPrint) {
LOG.info("Next unique id is {} for startTimestamp={}, dataCenterId={}, machineId={}", nextUniqueId, startTimestamp, dataCenterId, machineId);
}
return nextUniqueId;
}
private SnowflakeIdGenerator getIdGenerator(long startTimestamp, long dataCenterId, long machineId) {
String key = dataCenterId + "-" + machineId;
SnowflakeIdGenerator idGenerator = idGeneratorMap.get(key);
if (idGenerator == null) {
SnowflakeIdGenerator newIdGnerator = new SnowflakeIdGenerator(startTimestamp, dataCenterId, machineId);
idGenerator = idGeneratorMap.putIfAbsent(key, newIdGnerator);
if (idGenerator == null) {
idGenerator = newIdGnerator;
}
}
return idGenerator;
}
【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读 【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读 【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读
【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读
【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读
【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读【开源项目】Nepxion Aquarius实现分布式锁、缓存、ID生成、限速的源码解读