在JetCache中不仅可以通过在类和接口的函数上使用注解@Cached、@CacheUpdate和@CacheInvalidate等实现缓存加载、更新和删除操作,也支持通过调用API接口的形式来实现缓存的加载、更新和删除操作。
缓存接口
缓存接口的定义如下:
/**
* 缓存接口,支持空值。
*/
public interface Cache<K, V> extends Closeable {
/**
* 从缓存中获取一个条目。
* <p>如果缓存的构造器指定了一个{@link CacheLoader},并且缓存中没有关联,
* 它会尝试加载该条目。</p>
* <p>如果在缓存访问过程中发生错误,方法会返回null而不是抛出异常。</p>
* @param key 要返回其关联值的键
* @return 与指定键关联的值。null可能表示:<ul>
* <li>条目不存在或已过期</li>
* <li>条目的值为null</li>
* <li>在缓存访问过程中发生错误(不抛出异常)</li>
* </ul>
* @throws CacheInvokeException 仅当加载器抛出异常时
* @see CacheLoader
* @see #GET(Object)
*/
default V get(K key) throws CacheInvokeException {
CacheGetResult<V> result = GET(key);
if (result.isSuccess()) {
return result.getValue();
} else {
return null;
}
}
/**
* 从缓存中获取一组条目,将它们作为与请求的键集相关联的值的Map返回。
* <p>如果缓存的构造器指定了一个{@link CacheLoader},并且缓存中没有关联,
* 它会尝试加载条目。</p>
* <p>如果在缓存访问过程中发生错误,方法不会抛出异常。</p>
* @param keys 要返回其关联值的键集合。
* @return 为给定键找到的条目的映射。在缓存中未找到的键不包含在返回的映射中。
* @throws CacheInvokeException 仅当加载器抛出异常时
* @see CacheLoader
* @see #GET_ALL(Set)
*/
default Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
MultiGetResult<K, V> cacheGetResults = GET_ALL(keys);
return cacheGetResults.unwrapValues();
}
/**
* 将指定的值与指定的键在缓存中关联起来。
* <p>如果在缓存访问过程中发生错误,方法不会抛出异常。</p>
* <p>如果实现支持异步操作,此方法的缓存操作为异步。</p>
* @param key 与指定值关联的键
* @param value 要与指定键关联的值
* @see #PUT(Object, Object)
*/
default void put(K key, V value) {
PUT(key, value);
}
/**
* 将指定映射中的所有条目复制到缓存中。
* <p>如果在缓存访问过程中发生错误,方法不会抛出异常。</p>
* <p>如果实现支持异步操作,此方法的缓存操作为异步。</p>
* @param map 要存储在此缓存中的映射。
* @see #PUT_ALL(Map)
*/
default void putAll(Map<? extends K, ? extends V> map) {
PUT_ALL(map);
}
/**
* 将指定映射中的所有条目复制到缓存中。
* <p>如果在访问缓存时发生错误,该方法不会抛出异常。</p>
* @param map 要存储在缓存中的映射。
* @param expireAfterWrite KV关联的TTL(生存时间)
* @param timeUnit expireAfterWrite的时间单位
* @see #PUT_ALL(Map, long, TimeUnit)
*/
default void putAll(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
PUT_ALL(map, expireAfterWrite, timeUnit);
}
/**
* 原子地将指定的键与给定的值关联,如果它还没有与一个值关联的话。
* <p>如果在缓存访问过程中发生错误,方法不会抛出异常。</p>
* <p>{@link MultiLevelCache} 不支持此方法。</p>
* @param key 要与指定的值关联的键
* @param value 要与指定的键关联的值
* @return 如果设置了值,则为true;如果KV关联在缓存中不存在,或在缓存访问过程中发生错误,则为false。
* @see #PUT_IF_ABSENT(Object, Object, long, TimeUnit)
*/
default boolean putIfAbsent(K key, V value) {
CacheResult result = PUT_IF_ABSENT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
return result.getResultCode() == CacheResultCode.SUCCESS;
}
/**
* 如果存在,则从缓存中移除指定键的映射。
* <p>如果在缓存访问过程中发生错误,方法不会抛出异常。</p>
* @param key 要从缓存中移除映射的键
* @return 如果键成功移除,则为true;如果KV关联在缓存中不存在,或在缓存访问过程中发生错误,则为false。
* @see #REMOVE(Object)
*/
default boolean remove(K key) {
return REMOVE(key).isSuccess();
}
/**
* 移除指定键的条目。
* <p>如果在缓存访问过程中发生错误,方法不会抛出异常。</p>
* <p>如果实现支持异步操作,此方法的缓存操作是异步的。</p>
* @param keys 要移除的键
* @see #REMOVE_ALL(Set)
*/
default void removeAll(Set<? extends K> keys) {
REMOVE_ALL(keys);
}
/**
* 如果与给定键关联的有值,则返回该值;否则使用加载器加载值并返回,然后更新缓存。
* @param key 键
* @param loader 值加载器
* @return 与键关联的值
* @see CacheConfig#isCacheNullValue()
*/
default V computeIfAbsent(K key, Function<K, V> loader) {
return computeIfAbsent(key, loader, config().isCacheNullValue());
}
/**
* 如果与给定键关联的有值,则返回该值;否则使用加载器加载值并返回,然后根据参数决定是否更新缓存。
* @param key 键
* @param loader 值加载器
* @param cacheNullWhenLoaderReturnNull 当加载器返回null时,是否将null值放入缓存
* @return 与键关联的值
*/
V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull);
/**
* 如果与给定键关联的有值,则返回该值;否则使用加载器加载值并返回,然后根据参数决定是否更新缓存,并设置过期时间。
* @param key 键
* @param loader 值加载器
* @param cacheNullWhenLoaderReturnNull 当加载器返回null时,是否将null值放入缓存
* @param expireAfterWrite 缓存项的TTL(生存时间)
* @param timeUnit expireAfterWrite的时间单位
* @return 与键关联的值
*/
V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull, long expireAfterWrite, TimeUnit timeUnit);
}
上面的方法是对缓存的增删改查操作,代码逻辑相对比较简单,默认是直接调用Cache接口中的GET、GET_ALL、PUT、PUT_ALL、REMOVE、REMOVE_ALL等方法来实现的。其中computeIfAbsent方法是用于解决缓存穿透的问题,即当缓存中没有对应的数据时,需要调用指定的loader获取相应的数据并写入到缓存中,跟Map中computeIfAbsent方法基本上原理是相似的,即如果缓存中key不存在会调用loader获取缓存的值,并写入到缓存中。
在上面的接口定义中,我们注意到JetCache缺少批量加载缓存的功能,无论是JetCache的注解亦或是API接口都不支持,我们后续会专门增加一个章节用于介绍如何实现缓存的批量加载的功能。当然,JetCache也不支持根据缓存的键或值自定义缓存有效期的能力,我们在后面都会介绍如何进行扩展。
只需要实现GET、GET_ALL、PUT、PUT_ALL、REMOVE、REMOVE_ALL等方法就能够实现缓存的增删改查操作。对应的源码如下:
/**
* 将指定映射中的所有条目复制到缓存中。
* <p>如果实现支持异步操作,调用此方法后缓存访问可能并未完成。
* 可以通过调用结果的getResultCode()/isSuccess()/getMessage()方法进行阻塞等待直到缓存操作完成。
* 调用结果的future()方法将获取用于异步编程的CompletionStage实例。</p>
* @param map 要存储在缓存中的映射。
* @return 操作结果
*/
default CacheResult PUT_ALL(Map<? extends K, ? extends V> map) {
if (map == null) {
return CacheResult.FAIL_ILLEGAL_ARGUMENT;
}
return PUT_ALL(map, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
}
/**
* 将指定映射中的所有条目复制到缓存中。
* <p>如果实现支持异步操作,调用此方法后缓存访问可能并未完成。
* 可以通过调用结果的getResultCode()/isSuccess()/getMessage()方法进行阻塞等待直到缓存操作完成。
* 调用结果的future()方法将获取用于异步编程的CompletionStage实例。</p>
* @param map 要存储在缓存中的映射。
* @param expireAfterWrite KV关联的TTL(生存时间)
* @param timeUnit expireAfterWrite的时间单位
* @return 操作结果
*/
CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit);
/**
* 如果缓存中存在指定键的映射,则从缓存中移除该映射。
* <p>如果实现支持异步操作,调用此方法后缓存访问可能并未完成。
* 可以通过调用结果的getResultCode()/isSuccess()/getMessage()方法进行阻塞等待直到缓存操作完成。
* 调用结果的future()方法将获取用于异步编程的CompletionStage实例。</p>
* @param key 要从缓存中移除映射的键
* @return 操作结果
*/
CacheResult REMOVE(K key);
/**
* 移除指定键的映射。
* <p>如果实现支持异步操作,调用此方法后缓存访问可能并未完成。
* 可以通过调用结果的getResultCode()/isSuccess()/getMessage()方法进行阻塞等待直到缓存操作完成。
* 调用结果的future()方法将获取用于异步编程的CompletionStage实例。</p>
* @param keys 要移除的键集合
* @return 操作结果
*/
CacheResult REMOVE_ALL(Set<? extends K> keys);
/**
* 如果指定键尚未与值关联,则将其与给定值关联。
* <p>如果实现支持异步操作,调用此方法后缓存访问可能并未完成。
* 可以通过调用结果的getResultCode()/isSuccess()/getMessage()方法进行阻塞等待直到缓存操作完成。
* 调用结果的future()方法将获取用于异步编程的CompletionStage实例。</p>
* @param key 与指定值关联的键
* @param value 要与指定键关联的值
* @param expireAfterWrite KV关联的TTL(生存时间)
* @param timeUnit expireAfterWrite的时间单位
* @return 如果指定键尚未与值关联,则返回SUCCESS;如果指定键已与值关联,则返回EXISTS;如果发生错误,则返回FAIL。
*/
CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
我们知道缓存分为进程内缓存和远程缓存,进程内的缓存如LinkedHashMap和caffeine,远程缓存如通过lettuce、redisson和spring-data-redis操作的redis,还有就是现在使用越来越少的MemeryCache。这些只需要实现上面的现GET、GET_ALL、PUT、PUT_ALL、REMOVE、REMOVE_ALL等方法就可以了。
但是细节上不会那么简单,我会在下面继续进行介绍,但是介绍之前还是需要介绍一下锁,因为实现使用缓存的场景肯定会涉及多线程,这样就需要使用锁来避免不同线程同时去对相同的键进行缓存的增删改操作了。在Cache接口中默认实现了锁的方法,源码如下:
/**
* 尝试使用缓存获取指定键的独占锁,此方法不会阻塞。
* 用法示例:
* <pre>
* try(AutoReleaseLock lock = cache.tryLock("MyKey",100, TimeUnit.SECONDS)){
* if(lock != null){
* // 执行某些操作
* }
* }
* </pre>
* <p>{@link MultiLevelCache} 将使用最后一级缓存来支持此操作。</p>
* @param key 锁键
* @param expire 锁的过期时间
* @param timeUnit 锁的过期时间单位
* @return 如果成功获取锁,则返回一个 AutoReleaseLock 实例(实现了 java.lang.AutoCloseable 接口)。
* 如果尝试失败(表示另一个线程/进程/服务器已持有锁),或在访问缓存时发生错误,则返回 null。
* @see #tryLockAndRun(Object, long, TimeUnit, Runnable)
*/
@SuppressWarnings("unchecked")
default AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) {
if (key == null) {
return null;
}
// 生成唯一的UUID作为锁标识
final String uuid = UUID.randomUUID().toString();
// 计算锁的过期时间戳
final long expireTimestamp = System.currentTimeMillis() + timeUnit.toMillis(expire);
// 获取缓存配置
final CacheConfig config = config();
// 定义一个AutoReleaseLock,它包含解锁逻辑
AutoReleaseLock lock = () -> {
int unlockCount = 0;
// 尝试解锁次数
while (unlockCount++ < config.getTryLockUnlockCount()) {
// 如果锁未过期,则尝试解锁
if(System.currentTimeMillis() < expireTimestamp) {
CacheResult unlockResult = REMOVE(key);
// 解锁结果处理
if (unlockResult.getResultCode() == CacheResultCode.FAIL
|| unlockResult.getResultCode() == CacheResultCode.PART_SUCCESS) {
logger.info("[tryLock] [{} of {}] [{}] unlock failed. Key={}, msg = {}",
unlockCount, config.getTryLockUnlockCount(), uuid, key, unlockResult.getMessage());
// 重试解锁
} else if (unlockResult.isSuccess()) {
logger.debug("[tryLock] [{} of {}] [{}] successfully release the lock. Key={}",
unlockCount, config.getTryLockUnlockCount(), uuid, key);
return;
} else {
logger.warn("[tryLock] [{} of {}] [{}] unexpected unlock result: Key={}, result={}",
unlockCount, config.getTryLockUnlockCount(), uuid, key, unlockResult.getResultCode());
return;
}
} else {
// 锁已过期
logger.info("[tryLock] [{} of {}] [{}] lock already expired: Key={}",
unlockCount, config.getTryLockUnlockCount(), uuid, key);
return;
}
}
};
int lockCount = 0;
Cache cache = this;
// 尝试加锁次数
while (lockCount++ < config.getTryLockLockCount()) {
// 尝试添加锁
CacheResult lockResult = cache.PUT_IF_ABSENT(key, uuid, expire, timeUnit);
// 加锁结果处理
if (lockResult.isSuccess()) {
logger.debug("[tryLock] [{} of {}] [{}] successfully get a lock. Key={}",
lockCount, config.getTryLockLockCount(), uuid, key);
return lock;
} else if (lockResult.getResultCode() == CacheResultCode.FAIL || lockResult.getResultCode() == CacheResultCode.PART_SUCCESS) {
// 缓存访问失败时的处理逻辑
logger.info("[tryLock] [{} of {}] [{}] cache access failed during get lock, will inquiry {} times. Key={}, msg={}",
lockCount, config.getTryLockLockCount(), uuid,
config.getTryLockInquiryCount(), key, lockResult.getMessage());
int inquiryCount = 0;
// 尝试查询次数
while (inquiryCount++ < config.getTryLockInquiryCount()) {
// 尝试查询锁状态
CacheGetResult inquiryResult = cache.GET(key);
// 查询结果处理
if (inquiryResult.isSuccess()) {
if (uuid.equals(inquiryResult.getValue())) {
// 成功获得锁
logger.debug("[tryLock] [{} of {}] [{}] successfully get a lock after inquiry. Key={}",
inquiryCount, config.getTryLockInquiryCount(), uuid, key);
return lock;
} else {
// 不是锁的所有者
logger.debug("[tryLock] [{} of {}] [{}] not the owner of the lock, return null. Key={}",
inquiryCount, config.getTryLockInquiryCount(), uuid, key);
return null;
}
} else {
logger.info("[tryLock] [{} of {}] [{}] inquiry failed. Key={}, msg={}",
inquiryCount, config.getTryLockInquiryCount(), uuid, key, inquiryResult.getMessage());
// 重试查询
}
}
} else {
// 其他持有锁
logger.debug("[tryLock] [{} of {}] [{}] others holds the lock, return null. Key={}",
lockCount, config.getTryLockLockCount(), uuid, key);
return null;
}
}
// 所有尝试均未成功获得锁
logger.debug("[tryLock] [{}] return null after {} attempts. Key={}", uuid, config.getTryLockLockCount(), key);
return null;
}
/**
* 尝试以独占方式执行一个操作。
* <p>{@link MultiLevelCache} 将使用最后一级缓存来支持此操作。</p>
* 用法示例:
* <pre>
* cache.tryLock("MyKey",100, TimeUnit.SECONDS),() -> {
* // 执行某些操作
* });
* </pre>
* @param key 锁键
* @param expire 锁的过期时间
* @param timeUnit 锁的过期时间单位
* @param action 需要执行的操作
* @return 如果成功获取锁并执行了操作,则返回 true;否则返回 false。
*/
default boolean tryLockAndRun(K key, long expire, TimeUnit timeUnit, Runnable action){
try (AutoReleaseLock lock = tryLock(key, expire, timeUnit)) {
if (lock != null) {
action.run();
return true;
} else {
return false;
}
}
}
tryLock方法是尝试获取缓存键的锁,其中key对应的是缓存的键,这样可以保证即使在多线程条件下,同一个键在同一时间内只会获取到一个锁。
- 定义释放锁的方法,先判断尝试解锁次数小于配置的最大解锁次数,如果超过就不在尝试解锁。然后判断锁是否到期,如果锁已到期就直接返回。最后调用REMOVE方法删除锁信息,如果删除不成功就会进行重试,否则直接返回。
- 首先调用PUT_IF_ABSENT试图在缓存中添加锁标识(唯一的GUID),以及锁的到期时间,该方法仅在键不存在时才会添加成功,否则会添加失败。PUT_IF_ABSENT添加锁不成功有两种可能,即已经存在该键,或者是缓存中没有该键,但是因为特殊原因(如网络原因导致写入Redis失败)导致写入键值失败。如果缓存中存在该键,则获取锁失败,返回null。否则会继续尝试获取锁信息。
tryLockAndRun方法实现了获取锁,执行指定的操作并释放锁,如果获取锁失败就直接返回false。
AbstractCache
在JetCache中AbstractCache是一个抽象类,继承自Cache,实现了GET、GET_ALL、PUT、PUT_ALL、REMOVE、REMOVE_ALL等方法,源码如下:
/**
* 抽象缓存类,提供了缓存的基本实现,支持键值对的存取操作。该类是线程安全的。
*
* @param <K> 键的类型
* @param <V> 值的类型
*/
public abstract class AbstractCache<K, V> implements Cache<K, V> {
/**
* 通知缓存事件监听器。
*
* @param e 缓存事件。
*/
public void notify(CacheEvent e) {
List<CacheMonitor> monitors = config().getMonitors();
for (CacheMonitor m : monitors) {
m.afterOperation(e);
}
}
/**
* 获取缓存中指定键的值。
*
* @param key 键。
* @return CacheGetResult<V> 获取结果,包含值和操作状态。
*/
@Override
public final CacheGetResult<V> GET(K key) {
long t = System.currentTimeMillis();
CacheGetResult<V> result;
// 对于null键,直接返回错误结果。
if (key == null) {
result = new CacheGetResult<V>(CacheResultCode.FAIL, CacheResult.MSG_ILLEGAL_ARGUMENT, null);
} else {
result = do_GET(key);
}
// 异步触发获取事件的通知。
result.future().thenRun(() -> {
CacheGetEvent event = new CacheGetEvent(this, System.currentTimeMillis() - t, key, result);
notify(event);
});
return result;
}
/**
* 实际获取缓存值的逻辑。
*
* @param key 键。
* @return CacheGetResult<V> 获取结果,包含值和操作状态。
*/
protected abstract CacheGetResult<V> do_GET(K key);
/**
* 批量获取缓存中多个键对应的值。
*
* @param keys 键的集合。
* @return MultiGetResult<K, V> 批量获取结果,包含值的映射和操作状态。
*/
@Override
public final MultiGetResult<K, V> GET_ALL(Set<? extends K> keys) {
long t = System.currentTimeMillis();
MultiGetResult<K, V> result;
// 对于null键集合,直接返回错误结果。
if (keys == null) {
result = new MultiGetResult<>(CacheResultCode.FAIL, CacheResult.MSG_ILLEGAL_ARGUMENT, null);
} else {
result = do_GET_ALL(keys);
}
// 异步触发批量获取事件的通知。
result.future().thenRun(() -> {
CacheGetAllEvent event = new CacheGetAllEvent(this, System.currentTimeMillis() - t, keys, result);
notify(event);
});
return result;
}
/**
* 实际批量获取缓存值的逻辑。
*
* @param keys 键的集合。
* @return MultiGetResult<K, V> 批量获取结果,包含值的映射和操作状态。
*/
protected abstract MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys);
/**
* 将键值对存储到缓存中。
* @param key 键,不能为null。
* @param value 值。
* @param expireAfterWrite 缓存项的过期时间。
* @param timeUnit 过期时间的单位。
* @return 操作结果,包含操作是否成功等信息。
*/
@Override
public final CacheResult PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
long t = System.currentTimeMillis();
CacheResult result;
if (key == null) {
result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
} else {
result = do_PUT(key, value, expireAfterWrite, timeUnit);
}
// 在异步操作完成后触发事件通知
result.future().thenRun(() -> {
CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
notify(event);
});
return result;
}
/**
* 实际执行PUT操作的抽象方法。
* @param key 键。
* @param value 值。
* @param expireAfterWrite 缓存项的过期时间。
* @param timeUnit 过期时间的单位。
* @return 操作结果,包含操作是否成功等信息。
*/
protected abstract CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
/**
* 批量将键值对存储到缓存中。
* @param map 要存储的键值对集合。
* @param expireAfterWrite 缓存项的过期时间。
* @param timeUnit 过期时间的单位。
* @return 操作结果,包含操作是否成功等信息。
*/
@Override
public final CacheResult PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
long t = System.currentTimeMillis();
CacheResult result;
if (map == null) {
result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
} else {
result = do_PUT_ALL(map, expireAfterWrite, timeUnit);
}
// 在异步操作完成后触发事件通知
result.future().thenRun(() -> {
CachePutAllEvent event = new CachePutAllEvent(this, System.currentTimeMillis() - t, map, result);
notify(event);
});
return result;
}
/**
* 实际执行批量PUT操作的抽象方法。
* @param map 要存储的键值对集合。
* @param expireAfterWrite 缓存项的过期时间。
* @param timeUnit 过期时间的单位。
* @return 操作结果,包含操作是否成功等信息。
*/
protected abstract CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit);
/**
* 从缓存中移除指定键的项。
* @param key 要移除的键,不能为null。
* @return 操作结果,包含操作是否成功等信息。
*/
@Override
public final CacheResult REMOVE(K key) {
long t = System.currentTimeMillis();
CacheResult result;
if (key == null) {
result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
} else {
result = do_REMOVE(key);
}
// 在异步操作完成后触发事件通知
result.future().thenRun(() -> {
CacheRemoveEvent event = new CacheRemoveEvent(this, System.currentTimeMillis() - t, key, result);
notify(event);
});
return result;
}
/**
* 实际执行移除操作的抽象方法。
* @param key 要移除的键。
* @return 操作结果,包含操作是否成功等信息。
*/
protected abstract CacheResult do_REMOVE(K key);
/**
* 从缓存中移除指定键集合对应的项。
* @param keys 要移除的键的集合,不能为null。
* @return 操作结果,包含操作是否成功等信息。
*/
@Override
public final CacheResult REMOVE_ALL(Set<? extends K> keys) {
long t = System.currentTimeMillis();
CacheResult result;
if (keys == null) {
result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
} else {
result = do_REMOVE_ALL(keys);
}
// 在异步操作完成后触发事件通知
result.future().thenRun(() -> {
CacheRemoveAllEvent event = new CacheRemoveAllEvent(this, System.currentTimeMillis() - t, keys, result);
notify(event);
});
return result;
}
/**
* 实际执行批量移除操作的抽象方法。
* @param keys 要移除的键的集合。
* @return 操作结果,包含操作是否成功等信息。
*/
protected abstract CacheResult do_REMOVE_ALL(Set<? extends K> keys);
/**
* 如果指定的键在缓存中不存在,则将其添加。
* @param key 键,不能为null。
* @param value 值。
* @param expireAfterWrite 缓存项的过期时间。
* @param timeUnit 过期时间的单位。
* @return 操作结果,包含操作是否成功等信息。
*/
@Override
public final CacheResult PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
long t = System.currentTimeMillis();
CacheResult result;
if (key == null) {
result = CacheResult.FAIL_ILLEGAL_ARGUMENT;
} else {
result = do_PUT_IF_ABSENT(key, value, expireAfterWrite, timeUnit);
}
// 在异步操作完成后触发事件通知
result.future().thenRun(() -> {
CachePutEvent event = new CachePutEvent(this, System.currentTimeMillis() - t, key, value, result);
notify(event);
});
return result;
}
/**
* 实际执行PUT_IF_ABSENT操作的抽象方法。
* @param key 键。
* @param value 值。
* @param expireAfterWrite 缓存项的过期时间。
* @param timeUnit 过期时间的单位。
* @return 操作结果,包含操作是否成功等信息。
*/
protected abstract CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit);
在AbstractCache类中,GET、GET_ALL、PUT、PUT_ALL、REMOVE、REMOVE_ALL等方法主要是先判断键是否有效,然后调用do_GET、do_GET_ALL、do_PUT、do_PUT_ALL、do_REMOVE、do_REMOVE_ALL等方法对缓存进行增删改查,最后使用观察者模式发布缓存的操作事件。
上面的代码逻辑还是相对比较简单的就不做太多的讲解,接下来,我们着重介绍AbstractCache类中缓存加载的代码逻辑,源码如下:
// 使用ConcurrentHashMap来存储加载器锁,以支持并发控制。
private volatile ConcurrentHashMap<Object, LoaderLock> loaderMap;
// 标记缓存是否已关闭。
protected volatile boolean closed;
// 用于初始化loaderMap的互斥锁,确保线程安全。
private static final ReentrantLock reentrantLock = new ReentrantLock();
/**
* 初始化或获取loaderMap。
*
* @return ConcurrentHashMap<Object, LoaderLock> 返回loaderMap实例。
*/
ConcurrentHashMap<Object, LoaderLock> initOrGetLoaderMap() {
if (loaderMap == null) {
reentrantLock.lock();
try {
if (loaderMap == null) {
loaderMap = new ConcurrentHashMap<>();
}
}finally {
reentrantLock.unlock();
}
}
return loaderMap;
}
/**
* 如果给定的键在缓存中不存在,则使用给定的加载器函数来计算值,并将结果放入缓存。
*
* @param key 键,用于在缓存中查找值。
* @param loader 加载器函数,用于在缓存中找不到键对应的值时计算值。
* @param cacheNullWhenLoaderReturnNull 当加载器返回null时,是否将null缓存起来。
* @return 缓存中键对应的值,或者是通过加载器计算出的值。
*/
@Override
public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull) {
return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
0, null, this);
}
/**
* 如果给定的键在缓存中不存在,并且希望设置过期时间,则使用给定的加载器函数来计算值,并将结果放入缓存。
*
* @param key 键,用于在缓存中查找值。
* @param loader 加载器函数,用于在缓存中找不到键对应的值时计算值。
* @param cacheNullWhenLoaderReturnNull 当加载器返回null时,是否将null缓存起来。
* @param expireAfterWrite 缓存条目的写入后过期时间。
* @param timeUnit 缓存条目的过期时间单位。
* @return 缓存中键对应的值,或者是通过加载器计算出的值。
*/
@Override
public final V computeIfAbsent(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
long expireAfterWrite, TimeUnit timeUnit) {
return computeIfAbsentImpl(key, loader, cacheNullWhenLoaderReturnNull,
expireAfterWrite, timeUnit, this);
}
/**
* 判断是否需要更新缓存。
*
* @param loadedValue 加载器计算出的值。
* @param cacheNullWhenLoaderReturnNull 当加载器返回null时,是否将null缓存起来。
* @param loader 加载器函数。
* @return 如果需要更新缓存,则返回true;否则返回false。
*/
private static <K, V> boolean needUpdate(V loadedValue, boolean cacheNullWhenLoaderReturnNull, Function<K, V> loader) {
if (loadedValue == null && !cacheNullWhenLoaderReturnNull) {
return false;
}
if (loader instanceof CacheLoader && ((CacheLoader<K, V>) loader).vetoCacheUpdate()) {
return false;
}
return true;
}
/**
* 实际执行computeIfAbsent逻辑的方法。
*
* @param key 键,用于在缓存中查找值。
* @param loader 加载器函数,用于在缓存中找不到键对应的值时计算值。
* @param cacheNullWhenLoaderReturnNull 当加载器返回null时,是否将null缓存起来。
* @param expireAfterWrite 缓存条目的写入后过期时间。
* @param timeUnit 缓存条目的过期时间单位。
* @param cache 缓存实例。
* @return 缓存中键对应的值,或者是通过加载器计算出的值。
*/
static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
CacheGetResult<V> r;
if (cache instanceof RefreshCache) {
RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
r = refreshCache.GET(key);
refreshCache.addOrUpdateRefreshTask(key, newLoader);
} else {
r = cache.GET(key);
}
if (r.isSuccess()) {
return r.getValue();
} else {
Consumer<V> cacheUpdater = (loadedValue) -> {
if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
if (timeUnit != null) {
cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
} else {
cache.PUT(key, loadedValue).waitForResult();
}
}
};
V loadedValue;
if (cache.config().isCachePenetrationProtect()) {
loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
} else {
loadedValue = newLoader.apply(key);
cacheUpdater.accept(loadedValue);
}
return loadedValue;
}
}
/**
* 使用同步方式从缓存中加载值。
*
* @param config 缓存配置。
* @param abstractCache 缓存的抽象实现。
* @param key 键。
* @param newLoader 加载器函数。
* @param cacheUpdater 缓存更新的消费者接口。
* @return 加载的值。
*/
static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
Object lockKey = buildLoaderLockKey(abstractCache, key);
while (true) {
boolean create[] = new boolean[1];
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
if (create[0] || ll.loaderThread == Thread.currentThread()) {
try {
CacheGetResult<V> getResult = abstractCache.GET(key);
if (getResult.isSuccess()) {
ll.success = true;
ll.value = getResult.getValue();
return getResult.getValue();
} else {
V loadedValue = newLoader.apply(key);
ll.success = true;
ll.value = loadedValue;
cacheUpdater.accept(loadedValue);
return loadedValue;
}
} finally {
if (create[0]) {
ll.signal.countDown();
loaderMap.remove(lockKey);
}
}
} else {
try {
Duration timeout = config.getPenetrationProtectTimeout();
if (timeout == null) {
ll.signal.await();
} else {
boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
if(!ok) {
logger.info("loader wait timeout:" + timeout);
return newLoader.apply(key);
}
}
} catch (InterruptedException e) {
logger.warn("loader wait interrupted");
return newLoader.apply(key);
}
if (ll.success) {
return (V) ll.value;
} else {
continue;
}
}
}
}
/**
* 构建加载锁的键。
*
* @param c 缓存实例。
* @param key 键。
* @return 用于加载锁的键。
*/
private static Object buildLoaderLockKey(Cache c, Object key) {
if (c instanceof AbstractEmbeddedCache) {
return ((AbstractEmbeddedCache) c).buildKey(key);
} else if (c instanceof AbstractExternalCache) {
byte bytes[] = ((AbstractExternalCache) c).buildKey(key);
return ByteBuffer.wrap(bytes);
} else if (c instanceof MultiLevelCache) {
c = ((MultiLevelCache) c).caches()[0];
return buildLoaderLockKey(c, key);
} else if(c instanceof ProxyCache) {
c = ((ProxyCache) c).getTargetCache();
return buildLoaderLockKey(c, key);
} else {
throw new CacheException("impossible");
}
}
这段Java代码定义了一个名为computeIfAbsentImpl的静态方法,该方法用于在给定的缓存中根据键获取值,如果缓存中不存在该键对应的值,则使用提供的加载器函数加载值,并将加载的值放入缓存中。方法有多个参数,包括键、加载器函数、是否缓存null值、过期时间、时间单位和缓存对象。
该方法首先通过CacheUtil.getAbstractCache(cache)获取缓存的抽象类对象,然后通过CacheUtil.createProxyLoader()创建一个代理加载器。接下来,根据缓存类型分别调用RefreshCache.GET()或cache.GET()方法获取缓存值。如果获取成功,则返回该值;否则,根据是否需要更新缓存以及是否设置了过期时间,调用cache.PUT()方法更新缓存。
如果缓存配置了穿透保护,则调用synchronizedLoad()方法进行加锁加载。该方法使用ConcurrentHashMap维护一个加载器映射表,通过构建锁键来实现细粒度锁。在加锁过程中,如果当前线程是加载器线程,则直接进行加载并更新缓存;否则,等待加载完成或超时后返回加载的值。
总之,这段代码实现了根据键加载或获取缓存值的功能,并提供了缓存穿透保护的机制。
内存缓存
进程内的缓存框架支持LinkedHashMap和caffeine,在实际开发过程中一般推荐使用caffeine,好处网上一大堆,这里不做赘述。
AbstractEmbeddedCache
抽象类AbstractEmbeddedCache继承自AbstractCache,作为内存缓存的父类,其实现了常见内存缓存类的共通实现,由于是在内存缓存,不需要对缓存的数据进行序列化处理,相对处理速度自然相对会更快,序列化缓存键的操作已经在AbstractCache中实现,AbstractEmbeddedCache中就不会重复造轮子了。AbstractEmbeddedCache源码如下:
/**
* 抽象嵌入式缓存类,扩展自AbstractCache,提供缓存的内部映射和配置管理。
* 需要由具体缓存实现类继承并实现createAreaCache方法。
*
* @param <K> 键的类型
* @param <V> 值的类型
*/
public abstract class AbstractEmbeddedCache<K, V> extends AbstractCache<K, V> {
protected EmbeddedCacheConfig<K, V> config; // 缓存配置
protected InnerMap innerMap; // 内部映射,用于缓存数据
/**
* 创建区域缓存,由子类具体实现。
*
* @return InnerMap 实例,表示特定缓存区域。
*/
protected abstract InnerMap createAreaCache();
private final ReentrantLock lock = new ReentrantLock(); // 用于并发控制的锁
/**
* 构造函数,初始化嵌入式缓存。
*
* @param config 缓存配置,包含缓存的各种设置项。
*/
public AbstractEmbeddedCache(EmbeddedCacheConfig<K, V> config) {
this.config = config;
innerMap = createAreaCache();
}
/**
* 获取缓存配置。
*
* @return CacheConfig 缓存配置。
*/
@Override
public CacheConfig<K, V> config() {
return config;
}
/**
* 基于提供的键构建缓存键,如果配置了键转换器,则会应用转换。
*
* @param key 原始键。
* @return 转换后的键,用于缓存存储。
*/
public Object buildKey(K key) {
Object newKey = key;
Function<K, Object> keyConvertor = config.getKeyConvertor();
if (keyConvertor != null) {
newKey = keyConvertor.apply(key);
}
return newKey;
}
/**
* 从缓存中获取指定键的值。
*
* @param key 键。
* @return CacheGetResult 包含获取结果的对象,可能为不存在、已过期或其他状态。
*/
@Override
protected CacheGetResult<V> do_GET(K key) {
Object newKey = buildKey(key);
CacheValueHolder<V> holder = (CacheValueHolder<V>) innerMap.getValue(newKey);
return parseHolderResult(holder);
}
/**
* 解析缓存值持有者结果,确定缓存状态(存在、过期等)。
*
* @param holder 缓存值持有者。
* @return CacheGetResult 包含获取结果的对象。
*/
protected CacheGetResult<V> parseHolderResult(CacheValueHolder<V> holder) {
long now = System.currentTimeMillis();
if (holder == null) {
return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
} else if (now >= holder.getExpireTime()) {
return CacheGetResult.EXPIRED_WITHOUT_MSG;
} else {
lock.lock();
try{
long accessTime = holder.getAccessTime();
if (config.isExpireAfterAccess()) {
long expireAfterAccess = config.getExpireAfterAccessInMillis();
if (now >= accessTime + expireAfterAccess) {
return CacheGetResult.EXPIRED_WITHOUT_MSG;
}
}
holder.setAccessTime(now);
}finally {
lock.unlock();
}
return new CacheGetResult(CacheResultCode.SUCCESS, null, holder);
}
}
/**
* 批量获取缓存值。
*
* @param keys 键的集合。
* @return MultiGetResult 包含批量获取结果的对象。
*/
@Override
protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
// 将原始键转换为缓存键,并批量获取值
ArrayList<K> keyList = new ArrayList<K>(keys.size());
ArrayList<Object> newKeyList = new ArrayList<Object>(keys.size());
keys.stream().forEach((k) -> {
Object newKey = buildKey(k);
keyList.add(k);
newKeyList.add(newKey);
});
Map<Object, CacheValueHolder<V>> innerResultMap = innerMap.getAllValues(newKeyList);
Map<K, CacheGetResult<V>> resultMap = new HashMap<>();
for (int i = 0; i < keyList.size(); i++) {
K key = keyList.get(i);
Object newKey = newKeyList.get(i);
CacheValueHolder<V> holder = innerResultMap.get(newKey);
resultMap.put(key, parseHolderResult(holder));
}
MultiGetResult<K, V> result = new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap);
return result;
}
/**
* 向缓存中放入一个键值对,设置过期时间。
*
* @param key 键。
* @param value 值。
* @param expireAfterWrite 缓存项的写入过期时间。
* @param timeUnit 时间单位。
* @return CacheResult 描述操作结果的对象。
*/
@Override
protected CacheResult do_PUT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
CacheValueHolder<V> cacheObject = new CacheValueHolder(value ,timeUnit.toMillis(expireAfterWrite));
innerMap.putValue(buildKey(key), cacheObject);
return CacheResult.SUCCESS_WITHOUT_MSG;
}
/**
* 批量放入缓存项。
*
* @param map 键值对的映射。
* @param expireAfterWrite 缓存项的写入过期时间。
* @param timeUnit 时间单位。
* @return CacheResult 描述操作结果的对象。
*/
@Override
protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
HashMap newKeyMap = new HashMap();
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> cacheObject = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
newKeyMap.put(buildKey(en.getKey()), cacheObject);
}
innerMap.putAllValues(newKeyMap);
final HashMap resultMap = new HashMap();
map.keySet().forEach((k) -> resultMap.put(k, CacheResultCode.SUCCESS));
return CacheResult.SUCCESS_WITHOUT_MSG;
}
/**
* 从缓存中移除指定键的项。
*
* @param key 键。
* @return CacheResult 描述操作结果的对象。
*/
@Override
protected CacheResult do_REMOVE(K key) {
innerMap.removeValue(buildKey(key));
return CacheResult.SUCCESS_WITHOUT_MSG;
}
/**
* 批量移除缓存项。
*
* @param keys 键的集合。
* @return CacheResult 描述操作结果的对象。
*/
@Override
protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
Set newKeys = keys.stream().map((key) -> buildKey(key)).collect(Collectors.toSet());
innerMap.removeAllValues(newKeys);
return CacheResult.SUCCESS_WITHOUT_MSG;
}
// 内部方法,用于彻底清除指定键的缓存项
public void __removeAll(Set<? extends K> keys) {
innerMap.removeAllValues(keys);
}
/**
* 如果指定键的缓存项不存在,则放入该键值对,并设置过期时间。
*
* @param key 键。
* @param value 值。
* @param expireAfterWrite 缓存项的写入过期时间。
* @param timeUnit 时间单位。
* @return CacheResult 描述操作结果的对象。
*/
@Override
protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
CacheValueHolder<V> cacheObject = new CacheValueHolder(value, timeUnit.toMillis(expireAfterWrite));
if (innerMap.putIfAbsentValue(buildKey(key), cacheObject)) {
return CacheResult.SUCCESS_WITHOUT_MSG;
} else {
return CacheResult.EXISTS_WITHOUT_MSG;
}
}
}
在上面的函数中,抽象函数createAreaCache()创建内部映射实例innerMap用于缓存数据,do_GET、do_GET_ALL、do_PUT、do_PUT_ALL、do_REMOVE、do_REMOVE_ALL等方法都是基于innerMap实例来实现缓存数据的增删改查的。缓存数据时,会构建CacheValueHolder实例,用于保存缓存的数值、缓存创建时间和到期时间。之所以要缓存数据的过期时间,是便于框架在缓存数据过期后可以及时的将缓存数据从内存中清除出去。
InnerMap是JetCache专为内存缓存定义的通用缓存处理接口,源码如下:
/**
* InnerMap 接口定义了一套操作映射数据的方法,包括获取值、存储值、移除值等操作。
*/
public interface InnerMap {
/**
* 通过键获取对应的值。
*
* @param key 用于获取值的键。
* @return 键对应的值,如果不存在则返回 null。
*/
Object getValue(Object key);
/**
* 通过键的集合获取所有对应的值。
*
* @param keys 键的集合。
* @return 一个映射,包含给定键集合中每个键对应的值。
*/
Map getAllValues(Collection keys);
/**
* 为指定的键存储一个值。
*
* @param key 要存储值的键。
* @param value 键对应的值。
*/
void putValue(Object key, Object value);
/**
* 从映射中批量添加键值对。
*
* @param map 包含要添加的键值对的映射。
*/
void putAllValues(Map map);
/**
* 移除指定键对应的值。
*
* @param key 要移除的键。
* @return 如果成功移除,返回 true;如果键不存在,返回 false。
*/
boolean removeValue(Object key);
/**
* 如果指定键不存在,则添加该键对应的值。
*
* @param key 要添加值的键。
* @param value 键对应的值。
* @return 如果成功添加,返回 true;如果键已存在,返回 false。
*/
boolean putIfAbsentValue(Object key, Object value);
/**
* 移除指定键集合中所有键对应的值。
*
* @param keys 要移除的键的集合。
*/
void removeAllValues(Collection keys);
}
LinkedHashMapCache和CaffeineCache类继承自AbstractEmbeddedCache,主要是实现抽象函数createAreaCache()创建内部映射InnerMap的实例。
LinkedHashMapCache
LinkedHashMapCache是一个基于 LinkedHashMap 的缓存实现类,继承自 AbstractEmbeddedCache。
它通过 LRUMap (最近最少使用)算法来管理缓存项,支持缓存过期清理。LinkedHashMapCache的源码如下:
/**
* LinkedHashMapCache是一个基于 LinkedHashMap 的缓存实现类,继承自 AbstractEmbeddedCache。
* 它通过 LRUMap (最近最少使用)算法来管理缓存项,支持缓存过期清理。
*
* @param <K> 缓存键的类型
* @param <V> 缓存值的类型
*/
public class LinkedHashMapCache<K, V> extends AbstractEmbeddedCache<K, V> {
private static Logger logger = LoggerFactory.getLogger(LinkedHashMapCache.class);
/**
* 构造函数,初始化缓存。
*
* @param config 缓存配置,包含缓存大小等设置。
*/
public LinkedHashMapCache(EmbeddedCacheConfig<K, V> config) {
super(config);
addToCleaner(); // 将当前缓存实例添加到缓存清理器中,以便定期清理。
}
/**
* 将当前缓存实例添加到缓存清理器。
*/
protected void addToCleaner() {
Cleaner.add(this);
}
/**
* 创建一个新的缓存区域,实际是一个 LRUMap 实例。
*
* @return InnerMap 实例,用于存储缓存项。
*/
@Override
protected InnerMap createAreaCache() {
return new LRUMap(config.getLimit());
}
/**
* 将当前缓存实例转换为指定类型的对象。
*
* @param clazz 需要转换成的目标类类型。
* @return 转换后的对象实例,如果无法转换则抛出异常。
* @throws IllegalArgumentException 如果指定的类不是 LinkedHashMap 类型,则抛出此异常。
*/
@Override
public <T> T unwrap(Class<T> clazz) {
if (clazz.equals(LinkedHashMap.class)) {
return (T) innerMap;
}
throw new IllegalArgumentException(clazz.getName());
}
/**
* 清理已经过期的缓存项。
*/
public void cleanExpiredEntry() {
((LRUMap) innerMap).cleanExpiredEntry();
}
/**
* LRUMap 是一个基于 LinkedHashMap 实现的缓存映射,支持 LRU 算法和缓存过期机制。
*/
final class LRUMap extends LinkedHashMap implements InnerMap {
private final int max; // 缓存最大容量
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 读写锁,用于并发控制
/**
* 构造函数,初始化 LRUMap。
*
* @param max 缓存的最大容量。
*/
public LRUMap(int max) {
super((int) (max * 1.4f), 0.75f, true); // 调整 LinkedHashMap 的构造参数,以适应 LRU 策略
this.max = max;
}
/**
* 当缓存项数量超过最大容量时,移除最老的缓存项。
*
* @param eldest 被考虑移除的最老缓存项。
* @return 返回 true 如果最老缓存项被移除,否则返回 false。
*/
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > max;
}
/**
* 获取指定键的缓存值,支持并发控制。
*
* @param key 缓存项的键。
* @return 返回对应键的缓存值,如果不存在则返回 null。
*/
@Override
public Object getValue(Object key) {
Lock lock = readWriteLock.readLock();
lock.lock();
try{
return get(key);
}finally {
lock.unlock();
}
}
/**
* 获取多个键对应的缓存值,支持并发控制。
*
* @param keys 缓存项的键集合。
* @return 返回一个映射,包含所有指定键的缓存值,如果键不存在则对应值为 null。
*/
@Override
public Map getAllValues(Collection keys) {
Lock lock = readWriteLock.readLock();
lock.lock();
Map values = new HashMap();
try{
for (Object key : keys) {
Object v = get(key);
if (v != null) {
values.put(key, v);
}
}
}finally {
lock.unlock();
}
return values;
}
/**
* 添加或更新一个缓存项,支持并发控制。
*
* @param key 缓存项的键。
* @param value 缓存项的值。
*/
@Override
public void putValue(Object key, Object value) {
Lock lock = readWriteLock.writeLock();
lock.lock();
try{
put(key, value);
}finally {
lock.unlock();
}
}
/**
* 批量添加或更新多个缓存项,支持并发控制。
*
* @param map 包含要添加或更新的缓存项的键值对集合。
*/
@Override
public void putAllValues(Map map) {
Lock lock = readWriteLock.writeLock();
lock.lock();
try{
Set<Map.Entry> set = map.entrySet();
for (Map.Entry en : set) {
put(en.getKey(), en.getValue());
}
}finally {
lock.unlock();
}
}
/**
* 移除指定键的缓存项,支持并发控制。
*
* @param key 缓存项的键。
* @return 如果缓存项被成功移除,返回 true,否则返回 false。
*/
@Override
public boolean removeValue(Object key) {
Lock lock = readWriteLock.writeLock();
lock.lock();
try{
return remove(key) != null;
}finally {
lock.unlock();
}
}
/**
* 移除多个键对应的缓存项,支持并发控制。
*
* @param keys 缓存项的键集合。
*/
@Override
public void removeAllValues(Collection keys) {
Lock lock = readWriteLock.writeLock();
lock.lock();
try{
for (Object k : keys) {
remove(k);
}
}finally {
lock.unlock();
}
}
/**
* 如果指定键的缓存项不存在或已过期,则添加新的缓存项,支持并发控制。
*
* @param key 缓存项的键。
* @param value 缓存项的值。
* @return 如果新的缓存项被成功添加,则返回 true,否则返回 false。
*/
@Override
@SuppressWarnings("unchecked")
public boolean putIfAbsentValue(Object key, Object value) {
Lock lock = readWriteLock.writeLock();
lock.lock();
try{
CacheValueHolder h = (CacheValueHolder) get(key);
if (h == null || parseHolderResult(h).getResultCode() == CacheResultCode.EXPIRED) {
put(key, value);
return true;
} else {
return false;
}
}finally {
lock.unlock();
}
}
}
}
LinkedHashMapCache创建的InnerMap实例类型是LRUMap,LRUMap继承自LinkedHashMap类并实现了InnerMap接口,由于LinkedHashMap可以维护插入顺序或访问顺序两种有序性,其中访问顺序指put和get操作已存在的Entry时,会将Entry移动到双向链表的表尾。
LRUMap重写了父类LinkedHashMap的removeEldestEntry函数,当缓存项数量超过最大容量时,移除最老的缓存项。可以看一下父类LinkedHashMap的afterNodeInsertion函数,该函数在removeEldestEntry返回true时,会删除链表的表头。源码如下:
void afterNodeInsertion(boolean evict) { // possibly remove eldest
LinkedHashMap.Entry<K,V> first;
if (evict && (first = head) != null && removeEldestEntry(first)) {
K key = first.key;
removeNode(hash(key), key, null, false, true);
}
}
由于LinkedHashMap的方法put和get操作已存在的Entry时,会将Entry移动到双向链表的表尾。则最近最少使用的缓存项自然而然的就被移动到双向链表的表头了。从字面理解,afterNodeInsertion是指LinkedHashMap中节点插入的后置函数,在HashMap的源码中,put、merge、compute和computeIfAbsent函数的最后会调用afterNodeInsertion函数。
现在看到的是,当向缓存LinkedHashMap中写入缓存数据,且缓存项大于LinkedHashMap的最大容量时才会删除最近最少使用的缓存数据。对于已经到期的缓存数据是在什么时候删除的呢?我们注意到addToCleaner函数会将当前的LinkedHashMapCache添加到缓存清理器中,缓存清理器Cleaner的功能就是定时的将已经过期的缓存数据从内存中清除掉。Cleaner源码如下:
class Cleaner {
static LinkedList<WeakReference<LinkedHashMapCache>> linkedHashMapCaches = new LinkedList<>();
private static final ReentrantLock reentrantLock = new ReentrantLock();
static {
ScheduledExecutorService executorService = JetCacheExecutor.defaultExecutor();
executorService.scheduleWithFixedDelay(() -> run(), 60, 60, TimeUnit.SECONDS);
}
static void add(LinkedHashMapCache cache) {
reentrantLock.lock();
try{
linkedHashMapCaches.add(new WeakReference<>(cache));
}finally {
reentrantLock.unlock();
}
}
static void run() {
reentrantLock.lock();
try{
Iterator<WeakReference<LinkedHashMapCache>> it = linkedHashMapCaches.iterator();
while (it.hasNext()) {
WeakReference<LinkedHashMapCache> ref = it.next();
LinkedHashMapCache c = ref.get();
if (c == null) {
it.remove();
} else {
c.cleanExpiredEntry();
}
}
}finally {
reentrantLock.unlock();
}
}
}
Cleaner会每隔60秒钟,调用所有LinkedHashMapCache实例的cleanExpiredEntry函数来清空已经到期的缓存数据。函数cleanExpiredEntry的源码如下:
public void cleanExpiredEntry() {
((LRUMap) innerMap).cleanExpiredEntry();
}
上面的函数会调用LRUMap类的cleanExpiredEntry函数来清空已经到期的缓存数据,cleanExpiredEntry源码如下:
/**
* 清理已经过期的缓存项。
*/
void cleanExpiredEntry() {
Lock lock = readWriteLock.writeLock();
lock.lock();
try{
// 遍历缓存项,移除所有过期的缓存。
for (Iterator it = entrySet().iterator(); it.hasNext();) {
Map.Entry en = (Map.Entry) it.next();
Object value = en.getValue();
if (value != null && value instanceof CacheValueHolder) {
CacheValueHolder h = (CacheValueHolder) value;
if (System.currentTimeMillis() >= h.getExpireTime()) {
it.remove();
}
} else {
// 如果缓存项为空或不是 CacheValueHolder 类型,则记录错误。
if (value == null) {
logger.error("key " + en.getKey() + " is null");
} else {
logger.error("value of key " + en.getKey() + " is not a CacheValueHolder. type=" + value.getClass());
}
}
}
}finally {
lock.unlock();
}
}
cleanExpiredEntry函数会轮询当前LinkedHashMap实例中的所有缓存数据,然后将已经过期的数据移除。
关于LinkedHashMapCache的设计还存在问题,在发现缓存数据已到期时,并没有及时的将其移除。由于间隔时间是60秒,在短时间内当有大量的缓存数据进行读写时,可能会存在部分读频繁的数据已经过期,而部分未到期,且读写不频繁的数据就会被移到双向列表的表头,一旦缓存数据量超过最大容量就会把未到期的数据给清了。
CaffeineCache
基于Caffeine实现的缓存类,扩展自AbstractEmbeddedCache,提供了缓存管理的功能。源码如下:
/**
* 基于Caffeine实现的缓存类,扩展自AbstractEmbeddedCache,提供了缓存管理的功能。
*/
public class CaffeineCache<K, V> extends AbstractEmbeddedCache<K, V> {
// Caffeine缓存实例
private com.github.benmanes.caffeine.cache.Cache cache;
/**
* 构造函数,初始化Caffeine缓存。
*
* @param config 缓存配置信息,包含缓存大小、过期时间等配置。
*/
public CaffeineCache(EmbeddedCacheConfig<K, V> config) {
super(config);
}
/**
* 将当前缓存实例转换为指定类型的缓存接口。
*
* @param clazz 需要转换的目标缓存接口的Class对象。
* @return 转换后的缓存实例,如果无法转换则抛出IllegalArgumentException。
* @throws IllegalArgumentException 当指定的类不是com.github.benmanes.caffeine.cache.Cache时抛出。
*/
@Override
public <T> T unwrap(Class<T> clazz) {
if (clazz.equals(com.github.benmanes.caffeine.cache.Cache.class)) {
return (T) cache;
}
throw new IllegalArgumentException(clazz.getName());
}
/**
* 创建缓存区域,配置缓存的大小、过期策略等。
*
* @return 内部映射接口,提供了对缓存操作的方法。
*/
@Override
@SuppressWarnings("unchecked")
protected InnerMap createAreaCache() {
// 基于配置初始化Caffeine缓存构建器
Caffeine<Object, Object> builder = Caffeine.newBuilder();
builder.maximumSize(config.getLimit()); // 设置缓存最大大小
// 配置缓存过期策略
final boolean isExpireAfterAccess = config.isExpireAfterAccess(); // 是否按访问时间过期
final long expireAfterAccess = config.getExpireAfterAccessInMillis(); // 访问过期时间
builder.expireAfter(new Expiry<Object, CacheValueHolder>() {
// 计算过期时间的方法
private long getRestTimeInNanos(CacheValueHolder value) {
long now = System.currentTimeMillis();
long ttl = value.getExpireTime() - now;
if(isExpireAfterAccess){
ttl = Math.min(ttl, expireAfterAccess);
}
return TimeUnit.MILLISECONDS.toNanos(ttl);
}
@Override
public long expireAfterCreate(Object key, CacheValueHolder value, long currentTime) {
return getRestTimeInNanos(value);
}
@Override
public long expireAfterUpdate(Object key, CacheValueHolder value,
long currentTime, long currentDuration) {
return currentDuration;
}
@Override
public long expireAfterRead(Object key, CacheValueHolder value,
long currentTime, long currentDuration) {
return getRestTimeInNanos(value);
}
});
// 构建并初始化缓存
cache = builder.build();
// 返回一个内部映射,提供了基本的缓存操作接口
return new InnerMap() {
@Override
public Object getValue(Object key) {
return cache.getIfPresent(key);
}
@Override
public Map getAllValues(Collection keys) {
return cache.getAllPresent(keys);
}
@Override
public void putValue(Object key, Object value) {
cache.put(key, value);
}
@Override
public void putAllValues(Map map) {
cache.putAll(map);
}
@Override
public boolean removeValue(Object key) {
return cache.asMap().remove(key) != null;
}
@Override
public void removeAllValues(Collection keys) {
cache.invalidateAll(keys);
}
@Override
public boolean putIfAbsentValue(Object key, Object value) {
return cache.asMap().putIfAbsent(key, value) == null;
}
};
}
}
CaffeineCache的代码相对比较简单,在JetCache中内存缓存推荐使用Caffeine,性能相对LinkedHashMap更快,官方的Caffeine性能测试结果如下:
上面的结果仅仅是Caffeine官方提供的参考,详细可以到Caffeine官方上查看。
远程缓存
由于MemoryCache逐渐被Redis所取代,且Redis相对更加通用,所以JetCache目前支持的远程缓存是Redis。鉴于信创的需求,已经有多家国产公司开发出可以替代Redis的内存缓存系统,如阿里巴巴的Tair,东方通的TongRDS等,不确定JetCache后续会不会支持其他内存缓存系统。
JetCache通过引入lettuce、redisson和spring-data-redis中间件来实现Redis缓存的操作,如果要支持阿里巴巴的Tair,东方通的TongRDS等相对也比较简单。
AbstractExternalCache
抽象外部缓存类,扩展自AbstractCache,提供了缓存配置、键的转换和构建、以及配置检查等功能。源码如下:
/**
* 抽象外部缓存类,扩展自AbstractCache,提供了缓存配置、键的转换和构建、以及配置检查等功能。
* @param <K> 键的类型
* @param <V> 值的类型
*/
public abstract class AbstractExternalCache<K, V> extends AbstractCache<K, V> {
private ExternalCacheConfig<K, V> config; // 缓存配置对象
/**
* 构造函数,初始化外部缓存配置。
* @param config 外部缓存的配置,不可为null。
*/
public AbstractExternalCache(ExternalCacheConfig<K, V> config) {
this.config = config;
checkConfig(); // 检查配置的合法性
}
/**
* 配置检查方法,确保必要的配置项已经设置。
*/
protected void checkConfig() {
// 检查值编码器、解码器和键前缀是否已设置,未设置则抛出异常
if (config.getValueEncoder() == null) {
throw new CacheConfigException("no value encoder");
}
if (config.getValueDecoder() == null) {
throw new CacheConfigException("no value decoder");
}
if (config.getKeyPrefix() == null) {
throw new CacheConfigException("keyPrefix is required");
}
}
/**
* 构建缓存键。
* @param key 用户提供的键,可能需要转换。
* @return 经过转换和组合前缀后的缓存键字节数组。
* @throws CacheException 如果构建过程发生异常。
*/
public byte[] buildKey(K key) {
try {
Object newKey = key; // 初始化为原始键
if (config.getKeyConvertor() != null) {
// 根据版本适配不同的键转换逻辑
if (config.getKeyConvertor() instanceof KeyConvertor) {
if (!isPreservedKey(key)) {
newKey = config.getKeyConvertor().apply(key);
}
} else {
// 旧版本的键转换处理
if (key instanceof byte[]) {
newKey = key;
} else if (key instanceof String) {
newKey = key;
} else {
newKey = config.getKeyConvertor().apply(key);
}
}
}
// 构建最终的键
return ExternalKeyUtil.buildKeyAfterConvert(newKey, config.getKeyPrefix());
} catch (IOException e) {
throw new CacheException(e);
}
}
/**
* 判断键是否被保留不进行转换。
* @param key 原始键。
* @return 如果键是保留键(如锁键或时间戳键),则返回true,否则返回false。
*/
private boolean isPreservedKey(Object key) {
if (key instanceof byte[]) {
byte[] keyBytes = (byte[]) key;
// 判断键是否以特定后缀结尾,决定是否转换
return endWith(keyBytes, RefreshCache.LOCK_KEY_SUFFIX)
|| endWith(keyBytes, RefreshCache.TIMESTAMP_KEY_SUFFIX);
}
return false;
}
/**
* 判断字节数组是否以指定的后缀结尾。
* @param key 待检查的字节数组。
* @param suffix 指定的后缀字节数组。
* @return 如果key以suffix结尾,返回true,否则返回false。
*/
private boolean endWith(byte[] key, byte[] suffix) {
int len = suffix.length;
if (key.length < len) {
return false;
}
int startPos = key.length - len;
// 比较key的后缀与suffix是否相同
for (int i = 0; i < len; i++) {
if (key[startPos + i] != suffix[i]) {
return false;
}
}
return true;
}
}
RedissonCache
/**
* 基于Redisson客户端实现的缓存类,继承自AbstractExternalCache,提供了缓存的基本操作。
*
* @param <K> 键的类型
* @param <V> 值的类型
*/
public class RedissonCache<K, V> extends AbstractExternalCache<K, V> {
private final RedissonClient client; // Redisson客户端实例
private final RedissonCacheConfig<K, V> config; // 缓存配置
private final Function<Object, byte[]> valueEncoder; // 值编码器
private final Function<byte[], Object> valueDecoder; // 值解码器
/**
* 构造函数,初始化Redisson缓存。
*
* @param config 缓存配置,包含Redisson客户端、值编码器和值解码器等配置项。
*/
public RedissonCache(final RedissonCacheConfig<K, V> config) {
super(config);
this.config = config;
this.client = config.getRedissonClient();
this.valueEncoder = config.getValueEncoder();
this.valueDecoder = config.getValueDecoder();
}
/**
* 获取缓存键的字符串形式。
*
* @param key 缓存的键。
* @return 键的字符串形式。
*/
protected String getCacheKey(final K key) {
final byte[] newKey = buildKey(key);
return new String(newKey, StandardCharsets.UTF_8);
}
/**
* 获取缓存配置。
*
* @return 缓存配置。
*/
@Override
public CacheConfig<K, V> config() {
return this.config;
}
/**
* 不支持unwrap操作。
*
* @param clazz 需要转换成的类类型。
* @throws UnsupportedOperationException 永远抛出此异常,表示不支持unwrap操作。
*/
@Override
public <T> T unwrap(final Class<T> clazz) {
throw new UnsupportedOperationException("RedissonCache does not support unwrap");
}
/**
* 获取Redisson使用的Codec。
*
* @return 返回ByteArrayCodec实例。
*/
private Codec getCodec() {
return ByteArrayCodec.INSTANCE;
}
/**
* 编码缓存值。
*
* @param holder 缓存值持有者。
* @return 编码后的值。
*/
private byte[] encoder(final CacheValueHolder<V> holder) {
if (Objects.nonNull(holder)) {
return valueEncoder.apply(holder);
}
return null;
}
/**
* 解码缓存值。
*
* @param key 缓存的键。
* @param data 缓存的数据。
* @param counter 解码尝试的次数。
* @return 解码后的缓存值持有者。
*/
@SuppressWarnings({"unchecked"})
private CacheValueHolder<V> decoder(final K key, final byte[] data, final int counter) {
CacheValueHolder<V> holder = null;
if (Objects.nonNull(data) && data.length > 0) {
try {
holder = (CacheValueHolder<V>) valueDecoder.apply(data);
} catch (CacheEncodeException e) {
holder = compatibleOldVal(key, data, counter + 1);
if(Objects.isNull(holder)){
logError("decoder", key, e);
}
} catch (Throwable e) {
logError("decoder", key, e);
}
}
return holder;
}
/**
* 简化版的解码缓存值,不传入尝试次数。
*
* @param key 缓存的键。
* @param data 缓存的数据。
* @return 解码后的缓存值持有者。
*/
private CacheValueHolder<V> decoder(final K key, final byte[] data) {
return decoder(key, data, 0);
}
/**
* 兼容旧版本值的解码。
*
* @param key 缓存的键。
* @param data 缓存的数据。
* @param counter 解码尝试的次数。
* @return 兼容旧版本后的缓存值持有者。
*/
private CacheValueHolder<V> compatibleOldVal(final K key, final byte[] data, final int counter) {
if (Objects.nonNull(key) && Objects.nonNull(data) && data.length > 0 && counter <= 1) {
try {
final Codec codec = this.client.getConfig().getCodec();
if (Objects.nonNull(codec)) {
final Class<?> cls = ByteArrayCodec.class;
if (codec.getClass() != cls) {
final ByteBuf in = ByteBufAllocator.DEFAULT.buffer().writeBytes(data);
final byte[] out = (byte[]) codec.getValueDecoder().decode(in, null);
return decoder(key, out, counter);
}
}
} catch (Throwable e) {
logError("compatibleOldVal", key, e);
}
}
return null;
}
/**
* 获取缓存项。
*
* @param key 缓存的键。
* @return 缓存获取结果,包含缓存状态和值(如果存在)。
*/
@Override
@SuppressWarnings({"unchecked"})
protected CacheGetResult<V> do_GET(final K key) {
try {
final RBucket<byte[]> rb = this.client.getBucket(getCacheKey(key), getCodec());
final CacheValueHolder<V> holder = decoder(key, rb.get());
if (Objects.nonNull(holder)) {
final long now = System.currentTimeMillis(), expire = holder.getExpireTime();
if (expire > 0 && now >= expire) {
return CacheGetResult.EXPIRED_WITHOUT_MSG;
}
return new CacheGetResult<>(CacheResultCode.SUCCESS, null, holder);
}
return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
} catch (Throwable e) {
logError("GET", key, e);
return new CacheGetResult<>(e);
}
}
/**
* 批量获取缓存项。
*
* @param keys 缓存的键集合。
* @return 缓存批量获取结果,包含每个键的获取状态和值(如果存在)。
*/
@Override
@SuppressWarnings({"unchecked"})
protected MultiGetResult<K, V> do_GET_ALL(final Set<? extends K> keys) {
try {
final Map<K, CacheGetResult<V>> retMap = new HashMap<>(1 << 4);
if (Objects.nonNull(keys) && !keys.isEmpty()) {
final Map<K, String> keyMap = new HashMap<>(keys.size());
keys.stream().filter(Objects::nonNull).forEach(k -> {
final String key = getCacheKey(k);
if (Objects.nonNull(key)) {
keyMap.put(k, key);
}
});
if (!keyMap.isEmpty()) {
final Map<String, byte[]> kvMap = this.client.getBuckets(getCodec()).get(keyMap.values().toArray(new String[0]));
final long now = System.currentTimeMillis();
for (K k : keys) {
final String key = keyMap.get(k);
if (Objects.nonNull(key) && Objects.nonNull(kvMap)) {
final CacheValueHolder<V> holder = decoder(k, kvMap.get(key));
if (Objects.nonNull(holder)) {
final long expire = holder.getExpireTime();
final CacheGetResult<V> ret = (expire > 0 && now >= expire) ? CacheGetResult.EXPIRED_WITHOUT_MSG :
new CacheGetResult<>(CacheResultCode.SUCCESS, null, holder);
retMap.put(k, ret);
continue;
}
}
retMap.put(k, CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
}
}
}
return new MultiGetResult<>(CacheResultCode.SUCCESS, null, retMap);
} catch (Throwable e) {
logError("GET_ALL", "keys(" + (Objects.nonNull(keys) ? keys.size() : 0) + ")", e);
return new MultiGetResult<>(e);
}
}
/**
* 添加缓存项。
*
* @param key 缓存的键。
* @param value 缓存的值。
* @param expireAfterWrite 缓存项的过期时间。
* @param timeUnit 过期时间的单位。
* @return 缓存操作结果。
*/
@Override
protected CacheResult do_PUT(final K key, final V value, final long expireAfterWrite, final TimeUnit timeUnit) {
try {
final CacheValueHolder<V> holder = new CacheValueHolder<>(value, timeUnit.toMillis(expireAfterWrite));
this.client.getBucket(getCacheKey(key), getCodec()).set(encoder(holder), expireAfterWrite, timeUnit);
return CacheGetResult.SUCCESS_WITHOUT_MSG;
} catch (Throwable e) {
logError("PUT", key, e);
return new CacheResult(e);
}
}
/**
* 批量添加缓存项。
*
* @param map 键值对映射。
* @param expireAfterWrite 缓存项的过期时间。
* @param timeUnit 过期时间的单位。
* @return 缓存操作结果。
*/
@Override
protected CacheResult do_PUT_ALL(final Map<? extends K, ? extends V> map, final long expireAfterWrite, final TimeUnit timeUnit) {
try {
if (Objects.nonNull(map) && !map.isEmpty()) {
final long expire = timeUnit.toMillis(expireAfterWrite);
final RBatch batch = this.client.createBatch();
map.forEach((k, v) -> {
final CacheValueHolder<V> holder = new CacheValueHolder<>(v, expire);
batch.getBucket(getCacheKey(k), getCodec()).setAsync(encoder(holder), expireAfterWrite, timeUnit);
});
batch.execute();
}
return CacheResult.SUCCESS_WITHOUT_MSG;
} catch (Throwable e) {
logError("PUT_ALL", "map(" + map.size() + ")", e);
return new CacheResult(e);
}
}
/**
* 删除缓存项。
*
* @param key 缓存的键。
* @return 缓存操作结果。
*/
@Override
protected CacheResult do_REMOVE(final K key) {
try {
final boolean ret = this.client.getBucket(getCacheKey(key), getCodec()).delete();
return ret ? CacheResult.SUCCESS_WITHOUT_MSG : CacheResult.FAIL_WITHOUT_MSG;
} catch (Throwable e) {
logError("REMOVE", key, e);
return new CacheResult(e);
}
}
/**
* 批量删除缓存项。
*
* @param keys 缓存的键集合。
* @return 缓存操作结果。
*/
@Override
protected CacheResult do_REMOVE_ALL(final Set<? extends K> keys) {
try {
if (Objects.nonNull(keys) && !keys.isEmpty()) {
final RBatch batch = this.client.createBatch();
keys.forEach(key -> batch.getBucket(getCacheKey(key), getCodec()).deleteAsync());
batch.execute();
}
return CacheResult.SUCCESS_WITHOUT_MSG;
} catch (Throwable e) {
logError("REMOVE_ALL", "keys(" + keys.size() + ")", e);
return new CacheResult(e);
}
}
/**
* 在缓存中添加一个键值对,如果该键不存在。此操作等同于 PUT 操作,但仅当指定的键不存在时才执行。
* 如果键已存在,则不进行任何操作,并返回表示操作失败的 CacheResult。
*
* @param key 缓存中的键,不可为 null。
* @param value 缓存中的值,不可为 null。
* @param expireAfterWrite 缓存项的过期时间。
* @param timeUnit 缓存项的过期时间单位。
* @return 返回一个 CacheResult 对象,表示操作的结果。成功且键不存在时返回 SUCCESS_WITHOUT_MSG,键已存在时返回 EXISTS_WITHOUT_MSG,操作失败时返回包含错误信息的 CacheResult。
*/
@Override
protected CacheResult do_PUT_IF_ABSENT(final K key, final V value, final long expireAfterWrite, final TimeUnit timeUnit) {
try {
// 将过期时间转换为毫秒,并创建 CacheValueHolder 对象
final Duration expire = Duration.ofMillis(timeUnit.toMillis(expireAfterWrite));
final CacheValueHolder<V> holder = new CacheValueHolder<>(value, expire.toMillis());
// 尝试在缓存中设置键值对,如果键不存在,则设置成功
final boolean success = this.client.getBucket(getCacheKey(key), getCodec()).setIfAbsent(encoder(holder), expire);
// 根据操作结果返回相应的 CacheResult
return success ? CacheResult.SUCCESS_WITHOUT_MSG : CacheResult.EXISTS_WITHOUT_MSG;
} catch (Throwable e) {
// 记录操作失败的日志
logError("PUT_IF_ABSENT", key, e);
// 返回包含错误信息的 CacheResult
return new CacheResult(e);
}
}
多级缓存
多级缓存是指内存缓存+远程缓存,操作时先操作内存缓存,然后再操作远程缓存。例如查询缓存数据时,先从内存缓存中获取数据,如果不存在才会从远程缓存中获取数据。增删改缓存数据时,会同时修改内存缓存和远程缓存的数据。
MultiLevelCache
MultiLevelCache类的属性private Cache[] caches的第一个Cache实例是内存缓存实例,第二个Cache实例是远程缓存实例。在缓存的增删改查操作时,会分别对caches中的Cache实例进行增删改查处理。源码如下:
/**
* 多级缓存类,继承自AbstractCache,支持多级缓存策略。
*
* @param <K> 键的类型
* @param <V> 值的类型
*/
public class MultiLevelCache<K, V> extends AbstractCache<K, V> {
private Cache[] caches; // 缓存数组
private MultiLevelCacheConfig<K, V> config; // 多级缓存配置
/**
* 已弃用的构造函数,使用可变参数初始化多级缓存。
*
* @param caches 缓存实例数组
* @throws CacheConfigException 配置异常
*/
@SuppressWarnings("unchecked")
@Deprecated
public MultiLevelCache(Cache... caches) throws CacheConfigException {
this.caches = caches;
checkCaches();
CacheConfig lastConfig = caches[caches.length - 1].config();
config = new MultiLevelCacheConfig<>();
config.setCaches(Arrays.asList(caches));
config.setExpireAfterWriteInMillis(lastConfig.getExpireAfterWriteInMillis());
config.setCacheNullValue(lastConfig.isCacheNullValue());
}
/**
* 使用多级缓存配置构造函数。
*
* @param cacheConfig 多级缓存配置
* @throws CacheConfigException 配置异常
*/
@SuppressWarnings("unchecked")
public MultiLevelCache(MultiLevelCacheConfig<K, V> cacheConfig) throws CacheConfigException {
this.config = cacheConfig;
this.caches = cacheConfig.getCaches().toArray(new Cache[]{});
checkCaches();
}
/**
* 检查缓存配置是否合法。
*/
private void checkCaches() {
if (caches == null || caches.length == 0) {
throw new IllegalArgumentException();
}
for (Cache c : caches) {
if (c.config().getLoader() != null) {
throw new CacheConfigException("Loader on sub cache is not allowed, set the loader into MultiLevelCache.");
}
}
}
/**
* 获取缓存数组。
*
* @return 缓存数组
*/
public Cache[] caches() {
return caches;
}
/**
* 获取缓存配置。
*
* @return 多级缓存配置
*/
@Override
public MultiLevelCacheConfig<K, V> config() {
return config;
}
/**
* 存储键值对到缓存中。
*
* @param key 键
* @param value 值
* @return 缓存操作结果
*/
@Override
public CacheResult PUT(K key, V value) {
if (config.isUseExpireOfSubCache()) {
return PUT(key, value, 0, null);
} else {
return PUT(key, value, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
}
}
/**
* 批量存储键值对到缓存中。
*
* @param map 键值对映射
* @return 缓存操作结果
*/
@Override
public CacheResult PUT_ALL(Map<? extends K, ? extends V> map) {
if (config.isUseExpireOfSubCache()) {
return PUT_ALL(map, 0, null);
} else {
return PUT_ALL(map, config().getExpireAfterWriteInMillis(), TimeUnit.MILLISECONDS);
}
}
/**
* 从缓存中获取值。
*
* @param key 键
* @return 值获取结果
*/
@Override
protected CacheGetResult<V> do_GET(K key) {
for (int i = 0; i < caches.length; i++) {
Cache cache = caches[i];
CacheGetResult result = cache.GET(key);
if (result.isSuccess()) {
CacheValueHolder<V> holder = unwrapHolder(result.getHolder());
checkResultAndFillUpperCache(key, i, holder);
return new CacheGetResult(CacheResultCode.SUCCESS, null, holder);
}
}
return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
}
/**
* 解包缓存值持有者。
*
* @param h 缓存值持有者
* @return 解包后的缓存值持有者
*/
private CacheValueHolder<V> unwrapHolder(CacheValueHolder<V> h) {
Objects.requireNonNull(h);
if (h.getValue() instanceof CacheValueHolder) {
return (CacheValueHolder<V>) h.getValue();
} else {
return h;
}
}
/**
* 检查获取结果,并填充上级缓存。
*
* @param key 键
* @param i 缓存索引
* @param h 缓存值持有者
*/
private void checkResultAndFillUpperCache(K key, int i, CacheValueHolder<V> h) {
Objects.requireNonNull(h);
long currentExpire = h.getExpireTime();
long now = System.currentTimeMillis();
if (now <= currentExpire) {
if(config.isUseExpireOfSubCache()){
PUT_caches(i, key, h.getValue(), 0, null);
} else {
long restTtl = currentExpire - now;
if (restTtl > 0) {
PUT_caches(i, key, h.getValue(), restTtl, TimeUnit.MILLISECONDS);
}
}
}
}
/**
* 批量获取缓存值。
*
* @param keys 键集合
* @return 缓存批量获取结果
*/
@Override
protected MultiGetResult<K, V> do_GET_ALL(Set<? extends K> keys) {
HashMap<K, CacheGetResult<V>> resultMap = new HashMap<>();
Set<K> restKeys = new HashSet<>(keys);
for (int i = 0; i < caches.length; i++) {
if (restKeys.size() == 0) {
break;
}
Cache<K, CacheValueHolder<V>> c = caches[i];
MultiGetResult<K, CacheValueHolder<V>> allResult = c.GET_ALL(restKeys);
if (allResult.isSuccess() && allResult.getValues() != null) {
for (Map.Entry<K, CacheGetResult<CacheValueHolder<V>>> en : allResult.getValues().entrySet()) {
K key = en.getKey();
CacheGetResult result = en.getValue();
if (result.isSuccess()) {
CacheValueHolder<V> holder = unwrapHolder(result.getHolder());
checkResultAndFillUpperCache(key, i, holder);
resultMap.put(key, new CacheGetResult(CacheResultCode.SUCCESS, null, holder));
restKeys.remove(key);
}
}
}
}
for (K k : restKeys) {
resultMap.put(k, CacheGetResult.NOT_EXISTS_WITHOUT_MSG);
}
return new MultiGetResult<>(CacheResultCode.SUCCESS, null, resultMap);
}
/**
* 在指定缓存中存储键值对。
*
* @param key 键
* @param value 值
* @param expire 过期时间
* @param timeUnit 时间单位
* @return 缓存操作结果
*/
@Override
protected CacheResult do_PUT(K key, V value, long expire, TimeUnit timeUnit) {
return PUT_caches(caches.length, key, value, expire, timeUnit);
}
/**
* 在所有缓存中批量存储键值对。
*
* @param map 键值对映射
* @param expire 过期时间
* @param timeUnit 时间单位
* @return 缓存操作结果
*/
@Override
protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expire, TimeUnit timeUnit) {
CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
for (Cache c : caches) {
CacheResult r;
if(timeUnit == null) {
r = c.PUT_ALL(map);
} else {
r = c.PUT_ALL(map, expire, timeUnit);
}
future = combine(future, r);
}
return new CacheResult(future);
}
/**
* 在指定缓存层级存储键值对。
*
* @param lastIndex 最后一个缓存索引
* @param key 键
* @param value 值
* @param expire 过期时间
* @param timeUnit 时间单位
* @return 缓存操作结果
*/
private CacheResult PUT_caches(int lastIndex, K key, V value, long expire, TimeUnit timeUnit) {
CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
for (int i = 0; i < lastIndex; i++) {
Cache cache = caches[i];
CacheResult r;
if (timeUnit == null) {
r = cache.PUT(key, value);
} else {
r = cache.PUT(key, value, expire, timeUnit);
}
future = combine(future, r);
}
return new CacheResult(future);
}
/**
* 合并多个CompletableFuture结果。
*
* @param future 当前CompletableFuture
* @param result 新的缓存结果
* @return 合并后的CompletableFuture
*/
private CompletableFuture<ResultData> combine(CompletableFuture<ResultData> future, CacheResult result) {
return future.thenCombine(result.future(), (d1, d2) -> {
if (d1 == null) {
return d2;
}
if (d1.getResultCode() != d2.getResultCode()) {
return new ResultData(CacheResultCode.PART_SUCCESS, null, null);
}
return d1;
});
}
/**
* 从缓存中移除指定键的条目。
*
* @param key 键
* @return 缓存操作结果
*/
@Override
protected CacheResult do_REMOVE(K key) {
CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
for (Cache cache : caches) {
CacheResult r = cache.REMOVE(key);
future = combine(future, r);
}
return new CacheResult(future);
}
/**
* 从缓存中移除指定键集合的条目。
*
* @param keys 键集合
* @return 缓存操作结果
*/
@Override
protected CacheResult do_REMOVE_ALL(Set<? extends K> keys) {
CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
for (Cache cache : caches) {
CacheResult r = cache.REMOVE_ALL(keys);
future = combine(future, r);
}
return new CacheResult(future);
}