Guava LocalCache源码分析:LocalCache的get、put、expand
- 前言
- 一、get
- 二、put
- 三、expand
前言
上篇文章,详细描写了Guava LocalCache怎样如ConcurrentHashMap对缓存数据进行了分段存储。本章主要针对LocalCache重要的几个接口进行说明。
一、get
@CanIgnoreReturnValue
@Override
@CheckForNull
public V get(@CheckForNull Object key) {
if (key == null) {
return null;
}
int hash = hash(key);
return segmentFor(hash).get(key, hash);
}
@CanIgnoreReturnValue
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
int hash = hash(checkNotNull(key));
return segmentFor(hash).get(key, hash, loader);
}
如上代码,LocalCache的get方法首先根据key计算出hash值,并根据hash值找到对应的segment,再调用segment的get方法获取最终结果。
以传入loader参数的get方法为例,看一下segment如何获取值的。
@CanIgnoreReturnValue
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) {
//不要调用getLiveEntry,这将忽略正在加载的值
//根据key和hash获取值
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
//获取当前时间
long now = map.ticker.read();
//判断该值是否过期
V value = getLiveValue(e, now);
//如果未过期
if (value != null) {
//记录读取时间
recordRead(e, now);
//累计命中+1,这里Guava似乎认为当多条线程在更新统计数据时,
//而不是细粒度同步控制的情况下,LongAdder比AtomicLong更好用。
statsCounter.recordHits(1);
//检查是否需要刷新,如果设置了刷新时长且过了刷新时长,则刷新,否则返回该值
return scheduleRefresh(e, key, hash, value, now, loader);
}
//值已经过期
ValueReference<K, V> valueReference = e.getValueReference();
//如果增在加载
if (valueReference.isLoading()) {
//等待并返回加载后的值
return waitForLoadingValue(e, key, valueReference);
}
}
}
//segment中为空或者未获取到值
//加锁,尝试从加载中的值中获取,若获取不到则调用load方法。
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
//累计到一定读取次数后,清理超时缓存
postReadCleanup();
}
}
相关调用逻辑如图所示:
二、put
public V put(K key, V value) {
checkNotNull(key);
checkNotNull(value);
int hash = hash(key);
return segmentFor(hash).put(key, hash, value, false);
}
同样,LocalCache的put方法也是首先根据key计算出hash值,并根据hash值找到对应的segment,再调用segment的put方法。
V put(K key, int hash, V value, boolean onlyIfAbsent) {
//直接加锁
lock();
try {
long now = map.ticker.read();
//清理过期缓存
preWriteCleanup(now);
//数量+1
int newCount = this.count + 1;
if (newCount > this.threshold) {
//扩容
expand();
//因为扩容后的segment内的缓存数量可能会变化,所以重新计算
newCount = this.count + 1;
}
//找到要插入的位置,并获取该位置的头节点
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
//寻找该key是否存在
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
//判断key是否相等
&& map.keyEquivalence.equivalent(key, entryKey)) {
//意味着map中存在该key
//获取对应的值
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
//如果值是空的
if (entryValue == null) {
++modCount;
//判断该值是否在等待删除中
if (valueReference.isActive()) {
//将旧值移放入移除通知队列中,主要是Guava Cache有移除回调机制,故不能直接移除,队列方便用于回调通知。
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
//加入缓存
setValue(e, key, value, now);
//因为一删一增,所以,数量不变
newCount = this.count; // count remains unchanged
} else {
//加入缓存
setValue(e, key, value, now);
//这里不知道为啥又算了一遍,可能是再setValue中存在某些机制导致count发生了变化?
newCount = this.count + 1;
}
this.count = newCount; // write-volatile
//移除旧值
evictEntries(e);
return null;
} else if (onlyIfAbsent) {
//onlyIfAbsent为true,如果存在于map中,仅更新访问时间
recordLockedRead(e, now);
return entryValue;
} else {
//删除现有缓存,计数保持不变
++modCount;
enqueueNotification(
key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
setValue(e, key, value, now);
evictEntries(e);
return entryValue;
}
}
}
//map中不存在,则插入
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
setValue(newEntry, key, value, now);
table.set(index, newEntry);
newCount = this.count + 1;
this.count = newCount; // write-volatile
evictEntries(newEntry);
return null;
} finally {
//解锁
unlock();
//清除过期缓存
postWriteCleanup();
}
}
可见,整个put过程是用了锁保证执行的线程安全。
三、expand
LocalCache的扩容也是对段进行的扩容,当段的大小超过阈值时,便会出发扩容(详细见上面的put函数),段的阈值为段大小的3/4,而每次扩容段的大小会变为原来的2倍。
代码如下:
@GuardedBy("this")
void expand() {
AtomicReferenceArray<ReferenceEntry<K, V>> oldTable = table;
int oldCapacity = oldTable.length();
//如果容量超过最大容量,直接返回
if (oldCapacity >= MAXIMUM_CAPACITY) {
return;
}
int newCount = count;
//新段的大小是旧段的两倍
AtomicReferenceArray<ReferenceEntry<K, V>> newTable = newEntryArray(oldCapacity << 1);
//阈值为新段大小的3/4
threshold = newTable.length() * 3 / 4;
//因为段扩容,所以要重新计算哈希映射
int newMask = newTable.length() - 1;
for (int oldIndex = 0; oldIndex < oldCapacity; ++oldIndex) {
//我们需要保证对旧map的任何现有读取都可以继续进行。所以我们还不能清空旧段。
ReferenceEntry<K, V> head = oldTable.get(oldIndex);
if (head != null) {
ReferenceEntry<K, V> next = head.getNext();
int headIndex = head.getHash() & newMask;
// Single node on list
if (next == null) {
//对于单个的节点的,直接插入新段中
newTable.set(headIndex, head);
} else {
//重复使用列表末尾具有相同目标索引的连续节点序列。tail指向可重用序列中的第一个节点。
ReferenceEntry<K, V> tail = head;
int tailIndex = headIndex;
for (ReferenceEntry<K, V> e = next; e != null; e = e.getNext()) {
int newIndex = e.getHash() & newMask;
if (newIndex != tailIndex) {
tailIndex = newIndex;
tail = e;
}
}
//将可重用序列中的第一个节点放入新映射位置
newTable.set(tailIndex, tail);
//将头尾之间的节点重新映射到新段中
for (ReferenceEntry<K, V> e = head; e != tail; e = e.getNext()) {
int newIndex = e.getHash() & newMask;
ReferenceEntry<K, V> newNext = newTable.get(newIndex);
//拷贝当前节点作为新的头节点,并将映射位置的头节点链接到当前节点e的next。
ReferenceEntry<K, V> newFirst = copyEntry(e, newNext);
if (newFirst != null) {
//如果拷贝的新头节点不为null,则set到新段中
newTable.set(newIndex, newFirst);
} else {
//否则,删除e
removeCollectedEntry(e);
newCount--;
}
}
}
}
}
table = newTable;
this.count = newCount;
}
其中,对段节点的重映射逻辑如图所示:
这里需要留意的是Cache在重映射时,是将后续节点作为头节点插入到冲突位中,即首插入。故,新表映射的链路顺序与旧表会有比较大的区别。