ThreadLocal使用案例
在并发编程中有时候需要让线程互相协作,而协作可以使用共享数据的方式来实现。针对共享数据的操作就需要锁机制来控制并发行为。锁虽好,但是毕竟会在一定程度上让线程之间互相阻塞。前辈们认为在线程需要互相协作的前提下,使用锁是最稳妥的方式。但是如果没有这个前提呢?两个线程没有关系,那当然不用做任何事情。但是如果在这个前提下,两个线程类需要使用同一个field上的数据来干自己的事儿,但是本质上不需要协作怎么办?这就变成被迫要进行共享,被迫进行加锁操作了。
ThreadLocal的出现就是为了解决这个问题,ThreadLocal可以做到每个线程都携带各自的信息,实例的值在各个线程互相不影响。这里我们写个demo看看ThreadLocal是怎么用的:
public class ThreadLocalTest {
//共享的 ThreadLocal类,里面包裹着线程访问的值
private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
public void setThreadLocal(String value) {
threadLocal.set(value);
}
// 打印当前的ThreadLocal包裹的数据
public void getThreadLocal() {
System.out.println(threadLocal.get());
}
public static void main(String[] args) {
ThreadLocalTest test = new ThreadLocalTest();
new Thread(new Runnable() {
@Override
public void run() {
test.setThreadLocal("1");
test.getThreadLocal();
}
},"t1").start();
new Thread(new Runnable() {
@Override
public void run() {
test.setThreadLocal("2");
test.getThreadLocal();
}
},"t2").start();
}
}
对于两个线程而言,threadLocal是他们的共享数据,两个线程需要依赖threadLocal进行工作,但是两个线程之间不需要针对这个共享数据进行同步。对于线程t1而言,threadLocal里面的值会跟随这个线程的生命周期一直存在,而不会影响其他的所有线程。相当于对于t1线程而言,threadLocal内的数据是t1私有的。
ThreadLocal的工作原理
每个线程实例内部都维护了一个 ThreadLocalMap,它是一个 Map(key,value)数据格式,key 是一个弱引用,也就是 ThreadLocal 本身,而 value 存的是线程变量的值。也就是说 ThreadLocal 本身并不存储线程的变量值,它只是一个工具,用来维护线程内部的 Map,帮助存和取变量。
// thread
public class Thread{
...
ThreadLocal.ThreadLocalMap threadLocals = null;
...
}
// ThreadLocal.ThreadLocalMap
class ThreadLocal {
static class ThreadLocalMap {
private Entry[] table;
static class Entry extends WeakReference<ThreadLocal<?>> { ... }
}
}
ThreadLocal的存储模型如下:
栈上的线程实例引用指向堆上的线程实例数据单元,线程实例内部引用ThreadLocalMap,ThreadLocalMap提供了一种用键值对方式存储每一个线程的变量副本的方法,key为当前ThreadLocal对象,value则是对应线程的变量副本。
其中虚线是弱引用的意思。
ThreadLocal源码解析
ThreadLocal定义了四个方法:
- get():返回此线程局部变量的当前线程副本中的值。
- initialValue():返回此线程局部变量的当前线程的“初始值”。
- remove():移除此线程局部变量当前线程的值。
- set(T value):将此线程局部变量的当前线程副本中的值设置为指定值。
除了这四个方法,ThreadLocal内部还有一个静态内部类ThreadLocalMap,该内部类才是实现线程隔离机制的关键,get()、set()、remove()都是基于该内部类操作。
set方法
set方法可以设置当前线程的线程局部变量的值。
public void set(T value) {
// 得到当前的线程
Thread t = Thread.currentThread();
// 根据当前的线程,得到ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null)
// 如果得到的ThreadLocalMap不为空,那就生成一个数据对,存入ThreadLocalMap
map.set(this, value);
else
// 如果得到的ThreadLocalMap为空,那就初始化,并生成一个数据对,存入ThreadLocalMap
createMap(t, value);
}
//直接从线程内部取到threadLocals
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
get方法
get方法可以返回当前线程所对应的线程变量
public T get() {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的成员变量 threadLocal
ThreadLocalMap map = getMap(t);
if (map != null) {
// 从当前线程的ThreadLocalMap获取相对应的Entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
// 获取目标值
T result = (T)e.value;
return result;
}
}
//初始化一个entry, 默认返回null,除非initialValue方法被子类覆盖
return setInitialValue();
}
//直接从线程内部取到threadLocals
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
// 该方法定义为protected级别且返回为null,很明显是要子类实现它的,所以我们在使用ThreadLocal的时候一般都应该覆盖该方法。
// 该方法不能显示调用,只有在第一次调用get()或者set()方法时才会被执行,并且仅执行1次。
private T setInitialValue() {
//默认返回null
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
//可被子类覆盖,初始化赋值为用户指定值
protected T initialValue() {
return null;
}
remove方法
将当前线程局部变量的值删除。
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
个线程结束后,它所对应的局部变量就会被垃圾回收。在线程的生命周期很短的时候自然不需要主动删除数据。但是如果这个线程的生命周期相当长,为了避免长时间无法垃圾回收,需要手动删除,减少内存的占用。
ThreadLocalMap源码解析
ThreadLocal提供出来的常用核心方法是比较少的,你会发现,大部分逻辑是围绕着ThreadLocalMap进行操作,所以深扒一下ThreadLocalMap还是蛮有必要的。ThreadLocalMap 和HashMap的功能类似,但是实现上却有很大的不同:
- HashMap 的数据结构是数组+链表,但是ThreadLocalMap的数据结构仅仅是数组
- HashMap 是通过链地址法解决hash 冲突的问题,而ThreadLocalMap 是通过开放地址法来解决hash 冲突的问题
- HashMap 里面的Entry 内部类的引用都是强引用,ThreadLocalMap里面的Entry 内部类中的key 是弱引用,value 是强引用
这里 ThreadLocalMap 采用开放地址法原因需要解释一下,ThreadLocal 往往存放的数据量不会特别大(而且key 是弱引用又会被垃圾回收,及时让数据量更小),这个时候开放地址法简单的结构会显得更省空间,同时数组的查询效率也是非常高,除此之外 ThreadLocal 中有一个属性 HASH_INCREMENT = 0x61c88647 ,0x61c88647 是一个神奇的数字,让哈希码能均匀的分布在2的N次方的数组里,因此冲突概率也低,开放地址策略基本可以满足要求。
ThreadLocalMap.Entry
我们先把Entry的源码拿出来:
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
key是ThreadLocal的实例,同时ThreadLocal被包裹为弱引用,其value值就是ThreadLocal对应的数据。
ThreadLocalMap的构造器如下:
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
//table是一个数组,INITIAL_CAPACITY = 16
table = new Entry[INITIAL_CAPACITY];
// 先获得到ThreadLocal实例对应的hash值,这里的位运算等价于取模操作,计算出需要存放的位置
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 创建一个entry,并将它放入计算出来的位置上
table[i] = new Entry(firstKey, firstValue);
// size表达当前的table内的entry数量
size = 1;
// 设置扩容阈值
setThreshold(INITIAL_CAPACITY);
}
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
这里的firstKey.threadLocalHashCode
是比较有趣的,我们把它相关量的代码拿出来:
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
nextHashCode表示分配下一个ThreadLocal实例的threadLocalHashCode的值,整体逻辑还是比较清晰的,使用原子类实现并发地递增hash值,增加的值是0x61c88647
,实际上这个值是斐波那契散列乘数,它的优点是通过它散列出来的结果分布会比较均匀,可以很大程度上避免hash冲突
set方法
private void set(ThreadLocal<?> key, Object value) {
ThreadLocal.ThreadLocalMap.Entry[] tab = table;
int len = tab.length;
// 根据 ThreadLocal 的散列值,查找对应元素在数组中的位置
int i = key.threadLocalHashCode & (len-1);
// 采用"线性探测法",寻找合适位置
for (ThreadLocal.ThreadLocalMap.Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// key 存在,直接覆盖
if (k == key) {
e.value = value;
return;
}
// key == null,但是存在值(因为此处的e != null),说明之前的ThreadLocal对象已经被回收了
if (k == null) {
//用新元素替换陈旧的元素,这个方法进行了不少的垃圾清理动作,防止内存泄漏
replaceStaleEntry(key, value, i);
return;
}
}
// ThreadLocal对应的key实例不存在也没有陈旧元素,new 一个
tab[i] = new ThreadLocal.ThreadLocalMap.Entry(key, value);
int sz = ++size;
// cleanSomeSlots 清楚陈旧的Entry(key == null)
// 如果没有清理陈旧的 Entry 并且数组中的元素大于了阈值,则进行 rehash
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
set()操作除了存储元素外,还有一个很重要的作用,就是replaceStaleEntry()和cleanSomeSlots(),这两个方法可以清除掉key == null 的实例,防止内存泄漏。
先说一下replaceStaleEntry方法,这个方法并非简单地使用新entry替换过期entry,而是从过期entry所在的slot(staleSlot)向前、向后查找过期entry,并通过slotToExpunge 标记过期entry最早的index,最后使用cleanSomeSlots() 方法从slotToExpunge开始清理过期entry
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
//表示开始探测式清理过期数据的开始下标,默认从当前的staleSlot开始
int slotToExpunge = staleSlot;
//从staleSlot的前一个位置开始,向前查找过期entry并更新slotToExpunge,直到遇到空slot
for (int i = prevIndex(staleSlot, len);(e = tab[i]) != null;i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// 从staleSlot的后一个位置开始,向后查找,直到遇到空slot
for (int i = nextIndex(staleSlot, len);(e = tab[i]) != null;i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 由于开放定址法,可能相同的key存放于预期的位置(staleSlot)之后
// 如果遇到相同的key,则更新value,并交换索引staleSlot与索引i的entry
// 交换的原因:让有效的entry占据预期的位置(staleSlot),避免重复key的情况
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// slotToExpunge == staleSlot,说明索引staleSlot处前一个entry为null
// 未找到过期entry,更新slotToExpunge为i
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 从slotToExpunge开始,清理一些过期entry
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 向后查找,未找到过期entry,更新slotToExpunge为当前index
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// 直到遇到空slot也未发现相同的key,则在staleSlot的位置新建一个entry
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// 存在过期entry,需要进行清理
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
/**
* 获取环形数组的前一个索引
*/
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
接下来再来看看 expungeStaleEntry方法,这个方法目的是清除当前过期entry到下一个空slot之间所有过期entry,并将有效entry通过hash & len-1重新计算索引位置,可能会遇到slot被占用的情况(开放地址法移位导致),需要向后遍历,找到空的slot放置,返回空slot的index
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 清理过期的entry
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// 对后续entry进行rehash,直到遇到空slot
Entry e;
int i;
for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) { // 过期entry,继续清理
e.value = null;
tab[i] = null;
size--;
} else { // 有效entry,rehash到合适的位置(补齐空slot)
int h = k.threadLocalHashCode & (len - 1);
// 期望的index与当前index不相等,说明是开放地址法移位导致的,需要将其放到最近的有效entry之后
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i; // 返回空slot的index
最后再看看cleanSomeSlots方法,方法逻辑是通过循环扫描,尽可能多的清理ThreadLocalMap中的过期entry
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) { // 遇到过期entry,需要重置n
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0); //无符号右移动一位,可以简单理解为除以2
return removed;
}
get方法
// ThreadLocal
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
//
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
由于采用了开放定址法,所以当前key的散列值和元素在数组的索引并不是完全对应的,首先取一个探测数(key的散列值),如果所对应的key就是我们所要找的元素,则返回,否则调用getEntryAfterMiss()。
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
这里有一个重要的地方,当key == null时,调用了expungeStaleEntry()方法,该方法用于处理key == null,有利于GC回收,能够有效地避免内存泄漏。
ThreadLocal内存泄露问题
内存泄漏指的是,当某一个对象不再有用的时候,占用的内存却不能被回收,这就叫作内存泄漏。
因为通常情况下,如果一个对象不再有用,那么我们的垃圾回收器 GC,就应该把这部分内存给清理掉。这样的话,就可以让这部分内存后续重新分配到其他的地方去使用;否则,如果对象没有用,但一直不能被回收,这样的垃圾对象如果积累的越来越多,则会导致我们可用的内存越来越少,最后发生内存不够用的 OOM 错误。我们研究内存泄漏的前提是,这个线程的生命周期十分长。在这个前提下,参考前面描述的ThreadLocal的存储模型:
我们可能会在业务代码中执行了 ThreadLocal instance = null 操作,想清理掉这个 ThreadLocal 实例,对照图的样子,会变成
你会发现,ThreadLocal还是一个被引用的状态。GC 在垃圾回收的时候会进行可达性分析,它会发现这个 ThreadLocal 对象依然是可达的,所以对于这个 ThreadLocal 对象不会进行垃圾回收,这样的话就造成了内存泄漏的情况。
JDK 开发者考虑到了这一点,所以 ThreadLocalMap 中的 Entry 继承了 WeakReference 弱引用。弱引用的特点是,如果这个对象只被弱引用关联,而没有任何强引用关联,那么这个对象就可以被回收。因此,这个弱引用的机制就避免了 ThreadLocal 的内存泄露问题。
但是于此同时ThreadLocal作为key被回收的之后,对应的value数据还是被强引用联系者,无法被GC回收,这样一来时间一久,就会发生内存泄露。那么JDK的解决方式是在下一次 ThreadLocalMap 调用 set、get、remove 的时候,主动扫描出key是null的entry,然后删除对应的Value。
但是还有一种情况,假设 ThreadLocal 已经不被使用了,那么实际上 set、remove、rehash 方法也不会被调用,与此同时,如果这个线程又一直存活、不终止的话,那么刚才的那个调用链就一直存在,也就导致了 value 的内存泄漏。所以最稳的方式是由开发者主动调用ThreadLocal 的 remove 方法进行主动删除。
expungeStaleEntry() 方法是帮助垃圾回收的,根据源码,我们可以发现 get 和set 方法都可能触发清理方法expungeStaleEntry(),所以正常情况下是不会有内存溢出的 但是如果我们没有调用get 和set 的时候就会可能面临着内存溢出,养成好习惯不再使用的时候调用remove(),加快垃圾回收,避免内存溢出
退一步说,就算我们没有调用get 和set 和remove 方法,线程结束的时候,也就没有强引用再指向ThreadLocal 中的ThreadLocalMap了,这样ThreadLocalMap 和里面的元素也会被回收掉,但是有一种危险是,如果线程是线程池的, 在线程执行完代码的时候并没有结束,只是归还给线程池,这个时候ThreadLocalMap 和里面的元素是不会回收掉的
总结
ThreadLocal存储在线程对象中,在设计上可以让两个线程互不影响地操作变量。其底层使用map来存储,key是弱引用会在GC的时候被回收。map使用开放地址的策略解决冲突问题,当使用量是底层数组长度一半的时候引发扩容,变为原先容量的两倍。