Java - ThreadLocal数据存储和传递方式的演变之路
- 前言
- 一. InheritableThreadLocal - 父子线程数据传递
- 1.1 父子线程知识预热和 InheritableThreadLocal 实现原理
- 1.2 InheritableThreadLocal 的诟病
- 二. TransmittableThreadLocal (TTL) 横空出世
- 2.1 跨线程变量传递测试案例
- 2.2 TTL的基本原理
- 2.2.1 静态属性 holder 和 threadLocalHolder
- 2.2.2 静态内部类 Transmitter
- 2.3 总结
前言
我在 Java - ThreadLocal原理 这篇文章里面主要介绍了ThreadLocal
的使用、基本原理。不过有一点需要在这里做个特殊提醒:
ThreadLocal
仅仅用于单线程内的上下文数据传递。多线程情况下数据隔离。
但是现实往往并没有那么简单,我们开发过程中,往往都有多线程的场景。有时候更需要一些变量在多线程中传递。那ThreadLocal
显然并不满足这样的场景,那么我们来看看它的一个 “演变之路”。
一. InheritableThreadLocal - 父子线程数据传递
我们先用ThreadLocal
来演示一个小Demo
,我们先准备一个实体类User
:
@Data
@ToString
public class User {
private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();
private String name;
private Integer age;
public static void setAge(Integer age) {
THREAD_LOCAL.set(age);
System.out.println("当前线程: " + Thread.currentThread().getName() + ", 赋值: " + age);
}
public static Integer getAge() {
Integer res = Optional.ofNullable(THREAD_LOCAL.get()).orElse(0);
System.out.println("当前线程: " + Thread.currentThread().getName() + ", 获取到的值: " + res);
return res;
}
}
然后编写测试用例:
public static void main(String[] args) throws InterruptedException {
User.setAge(100);
// 异步获取
Thread t = new Thread(() -> User.getAge());
t.start();
t.join();
// 同步获取
User.getAge();
System.out.println("**************Finish****************");
}
执行结果如下:
我们得出以下结论:
- 我们在主线程中往
ThreadLocal
塞的值,只有主线程才能看到,子线程看不到。 - 父子线程之间,使用
ThreadLocal
无法进行数据传递。线程隔离。
1.1 父子线程知识预热和 InheritableThreadLocal 实现原理
在上面的案例中,我们在主线程中new
了一个子线程。结合结果图文来看,也就是Thread-0
是main
的一个子线程。为啥呢?我们来看下一下线程的初始化动作:
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
↓↓↓↓↓↓↓↓ 最终都会走到下面的代码 ↓↓↓↓↓↓↓↓
// 这里有个参数:inheritThreadLocals,代表是否继承线程的本地变量们(默认是true)
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// 当前线程作为父线程
Thread parent = currentThread();
// 如果需要继承变量,并且父线程中的变量非空,即拷贝一份变量(浅拷贝)
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
// 赋值一个线程ID(自增)
tid = nextThreadID();
}
- 创建线程的时候,当前线程总是会作为父线程。
- 若父线程绑定了变量并且允许继承,那么就会把变量拷贝到子线程里面。(浅拷贝,若需要深拷贝则需要重写
childValue()
函数)
我们再来看下InheritableThreadLocal
的源码:
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
// 重写childValue函数,返回父线程绑定的变量引用。也是浅拷贝
protected T childValue(T parentValue) {
return parentValue;
}
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
// 如果父线程当中绑定的变量不为null,就可以在子线程中创建一份拷贝
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
针对这一点,我们如果将上述案例Demo中的这行代码修改为:
private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();
↓↓↓↓↓↓↓↓
private static final ThreadLocal<Integer> THREAD_LOCAL = new InheritableThreadLocal<>();
再次运行程序可得:
我们Debug下走下流程:
- 主线程赋值,调用到了
ThreadLocal
的set
函数。由于我们使用的是InheritableThreadLocal
,重写了getMap
,返回的是主线程的inheritableThreadLocals
引用(默认为null
)。
- 发现为
null
,就会调用createMap
函数,同样,InheritableThreadLocal
重写了它:
因此这一步执行完毕之后,主线程的inheritableThreadLocals
属性是有值的。 - 然后创建子线程的时候(调用
new Thread
构造函数):
发现主线程已经绑定了相关变量,因此会将该变量传递给子线程,同样赋值于inheritableThreadLocals
变量上。 - 后续以此类推,我们只关心
inheritableThreadLocals
变量上是否有值。从而实现父子线程的变量传递。
1.2 InheritableThreadLocal 的诟病
InheritableThreadLocal
虽然能解决父子之间的变量传递问题。但是大家从源码的角度来看,传递变量的关键步骤,它实现于线程的创建过程。那么如果说我有个单例线程池,复用同一个线程,会有什么问题?
private static final ExecutorService THREAD_POOL = Executors.newSingleThreadExecutor();
public static void main(String[] args) throws InterruptedException {
User.setAge(100);
THREAD_POOL.execute(() -> User.getAge());
TimeUnit.SECONDS.sleep(2);
User.setAge(200);
THREAD_POOL.execute(() -> User.getAge());
TimeUnit.SECONDS.sleep(2);
// 同步获取
User.getAge();
System.out.println("**************Finish****************");
}
执行结果如下:
如果我们
结果虽意料之内但是又是给人惊喜:
- 虽然父子线程间的变量传递成功了,但是当值发生变更的时候。子线程拿到的值依旧是老的。
- 结合上面的源码来看,也就是说通过
InheritableThreadLocal
拿到的变量,永远是第一次创建子线程时,父线程中存储的变量值。后续不再更改。 - 因为我们本案例中使用的是单例线程池,线程对象只会
new
一次。
总结就是:
InheritableThreadLocal
中拷贝的数据始终来自于第一个提交任务的父线程,这样非常容易造成线程本地变量混乱。 由于是浅拷贝,一旦传递链路上变量值被改变,那么整个链路线程上的所有变量都会随之更改。- 还有个很重要的点就是,
InheritableThreadLocal
只支持父子线程间的数据传递,而不支持跨线程之间的数据传递!
二. TransmittableThreadLocal (TTL) 横空出世
TransmittableThreadLocal
简称TTL
,是阿里写的一个专门用于解决InheritableThreadLocal
诟病的一个功能类。可以在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal
值的传递功能,解决异步执行时上下文传递的问题。
针对上述Demo
(单例线程池),我们做出如下更改:添加Pom
依赖:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.11.4</version>
</dependency>
InheritableThreadLocal
替换成TTL
:
private static final ThreadLocal<Integer> THREAD_LOCAL = new InheritableThreadLocal<>();
↓↓↓↓↓↓↓↓
private static final ThreadLocal<Integer> THREAD_LOCAL = new TransmittableThreadLocal<>();
以及线程池做一下装饰:
private static final ExecutorService THREAD_POOL = Executors.newSingleThreadExecutor();
↓↓↓↓↓↓↓↓
private static final ExecutorService THREAD_POOL = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());
执行结果如下:
从这里可以发现,TTL
解决了什么问题?
JDK
自带的的InheritableThreadLocal
虽然能在父子之间进行变量传递,但是这个变量只有在创建子线程的时候才会被创建并传递。并不会感应其更新。
我们再看另外一个案例
public static void main(String[] args) throws InterruptedException {
User.setAge(100);
User.getAge();
THREAD_POOL.execute(() -> {
User.getAge();
User.setAge(200);
User.getAge();
});
TimeUnit.SECONDS.sleep(2);
THREAD_POOL.execute(() -> {
User.getAge();
User.setAge(300);
User.getAge();
});
TimeUnit.SECONDS.sleep(2);
User.setAge(666);
THREAD_POOL.execute(() -> {
User.getAge();
User.setAge(400);
User.getAge();
});
TimeUnit.SECONDS.sleep(2);
// 同步获取
User.getAge();
System.out.println("**************Finish****************");
}
运行结果如下:
从这个结果可以得出以下结论:
- 子线程拿到的
ThreadLocal
变量总是最新的。(依据的是主线程的变量)主线程在修改变量值为666
之后,子线程pool-1-thread-3
获取到的值是最新的666
。 - 子线程之间的变量赋值操作,互相不影响,子线程哪怕更改了
ThreadLocal
变量,在执行结束之后,会恢复备份,即原值。 所以上图中pool-1-thread-2
线程,第一次拿到的值依旧是100
。
其实上面这个案例还并不是很明显,因为我们的主线程始终只有一个。我们来看下一个彻彻底底的跨线程案例。来个高并发看看。
2.1 跨线程变量传递测试案例
需求背景:
- 我们准备一个变量,这里我们准备用
AtomicInteger
。方便修改值。 - 准备一个线程池
A
:用于做业务处理。 - 准备一个线程池
B
:用于模拟HTTP
请求层面的高并发。这样每个HTTP
请求都有属于自己的主线程,也就是有个主变量。我们要验证的就是HTTP
层面的并发对变量的影响。 - 业务处理要做的事情:将
AtomicInteger
类型的index
变量,累加至10。
我们先来看一下InheritableThreadLocal
版本的:
public class TestTTL {
// 定义一个线程池执行ttl,这里必须要用TTL线程池封装
private static ExecutorService TTL_EXECUTOR = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
// 定义另外一个线程池循环执行,模拟业务场景下多Http请求调用的情况
private static ExecutorService HTTP_EXECUTOR = Executors.newFixedThreadPool(5);
private static AtomicInteger INDEX = new AtomicInteger(0);
// TTL的ThreadLocal
private static ThreadLocal THREAD_LOCAL = new TransmittableThreadLocal<>(); //这里采用TTL的实现
public static void main(String[] args) {
while (true) {
/**
* 模拟HTTP请求的高并发
*/
HTTP_EXECUTOR.execute(() -> {
// 累加到10我们就停止
if (INDEX.get() < 10) {
THREAD_LOCAL.set(INDEX.getAndAdd(1));
TTL_EXECUTOR.execute(() -> {
System.out.println(String.format("子线程名称-%s, 变量值=%s", Thread.currentThread().getName(), THREAD_LOCAL.get()));
});
}
});
}
}
}
运行结果如下:
很直观的是,我们打印了10条记录,这一块是由AtomicInteger
控制的,但是我们存入到InheritableThreadLocal
中的变量,却没有累加到10,说明啥,无法跨线程传递变量。别急,我们再看下TTL
版本的:
public class TestTTL {
// 定义一个线程池执行ttl,这里必须要用TTL线程池封装
private static ExecutorService TTL_EXECUTOR = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(5));
// 定义另外一个线程池循环执行,模拟业务场景下多Http请求调用的情况
private static ExecutorService HTTP_EXECUTOR = Executors.newFixedThreadPool(5);
private static AtomicInteger INDEX = new AtomicInteger(0);
// TTL的ThreadLocal
private static ThreadLocal THREAD_LOCAL = new TransmittableThreadLocal<>(); //这里采用TTL的实现
public static void main(String[] args) {
while (true) {
/**
* 模拟HTTP请求的高并发
*/
HTTP_EXECUTOR.execute(() -> {
// 累加到10我们就停止
if (INDEX.get() < 10) {
THREAD_LOCAL.set(INDEX.getAndAdd(1));
TTL_EXECUTOR.execute(() -> {
System.out.println(String.format("子线程名称-%s, 变量值=%s", Thread.currentThread().getName(), THREAD_LOCAL.get()));
});
}
});
}
}
}
结果如下:
这个结果和上面的结果一对比,就可以发现,使用TTL
进行封装,在跨线程的变量传递下,它是生效的。
2.2 TTL的基本原理
首先,TTL
是在InheritableThreadLocal
的基础上开发的,也就是说TTL
继承于InheritableThreadLocal
类。我们来看下官网给出的一个执行时序图:
在讲这个流程之前,我们先来看下TTL中几个比较重要的成员类和属性。
2.2.1 静态属性 holder 和 threadLocalHolder
TTL
中有个静态属性holder
,它的特点和作用有以下几点:
- 存储所有使用了
TTL
的引用类。用于复制值的时候,可以通过这个holder
去遍历到所有的TTL
。 key
就是当前TTL
。value
则固定是null
。虽然使用了WeakHashMap
作为存储,但是它的使用方式被用来当做Set
集合。
我们来看下这个静态字段:
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
我们再来看下,TTL
在get/se
t的时候做了啥,先看set
函数:
@Override
public final void set(T value) {
// disableIgnoreNullValueSemantics 控制序列化TTL对象的时候,是否忽略空值
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
super.set(value);
addThisToHolder();
}
}
我们先说下disableIgnoreNullValueSemantics
这个参数在啥场景下需要用到:当我们在线程之间传递变量,在赋值的时候,如果变量值为null
,那么在新的线程中,这个变量将不会存在,因此有些时候我们需要将这个属性值设置为true
,保证变量值在传递过程中不会丢失,哪怕其值为null
。
根据代码来看,也就是默认情况下,如果变量传递过程中值为null
,就会将它剔除。反之,如果正常的存储流程是怎样的呢?
super.set(value);
addThisToHolder();
先调用了父类的set
函数,在InheritableThreadLocal
的基础上,在调用了addThisToHolder
函数。就是将当前TTL
引用存储到这个静态变量holder
中罢了。
@SuppressWarnings("unchecked")
private void addThisToHolder() {
if (!holder.get().containsKey(this)) {
holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
}
}
那总结下就是:
TTL
有一个静态属性holder
,用来存储所有的TTL
引用。- 在
set
赋值变量的时候,会触发将当前TTL
存储进去。
我们再来看下另外一个字段threadLocalHolder
:
private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>();
它的作用是对于在项目中使用了ThreadLocal
,但是却无法替换为TTL
,那么这个时候就可以使用Transmitter
提供的注册方法,将项目中的threadLocal
注册到threadLocalHolder
中。在生成快照的时候,也会对这部分变量进行处理。案例代码:
// 注册
Transmitter.registerThreadLocalWithShadowCopier(threadLocal);
// 注销
Transmitter.unregisterThreadLocal(threadLocal);
2.2.2 静态内部类 Transmitter
TTL
中有一个静态核心内部类Transmitter
,它主要作用于线程切换的时候,其功能如下:
ThreadLocal
变量的快照保存:capture
。- 重放:
replay
。 - 快照恢复:
restore
。
我们先看下快照的创建(主线程执行):
public static class Transmitter {
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
// 循环遍历当前holder记录过的所有TTL引用,将TTL取出来并保存到Map里面
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
// 循环遍历注册过的ThreadLocal。(普通的)
private static WeakHashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
final WeakHashMap<ThreadLocal<Object>, Object> threadLocal2Value = new WeakHashMap<ThreadLocal<Object>, Object>();
for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
final TtlCopier<Object> copier = entry.getValue();
threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
}
return threadLocal2Value;
}
}
再来看下快照的重放(子线程执行):
public static Object replay(@NonNull Object captured) {
final Snapshot capturedSnapshot = (Snapshot) captured;
return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
跟快照创建很像,我们这里分析ttl2Value
(TTL
引用),就不再分析threadLocal2Value
(原生ThreadLocal
的注册),我们看下replayTtlValues
函数:
// 入参 captured 是从其他线程(TTL)中捕获到的变量
private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) {
// 创建一个备份Map
WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
// 遍历当前所有的TTL引用
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// 将当前子线程的数据进行备份
backup.put(threadLocal, threadLocal.get());
// 如果快照中不存在当前TTL实例则要删除,因为有些TTL引用可能是在调用capture生成快照之后才创建的。
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 将快照值赋值到当前线程中
setTtlValuesTo(captured);
// 执行execute之前的一些逻辑
doExecuteCallback(true);
return backup;
}
最后来看下快照的恢复操作:
public static void restore(@NonNull Object backup) {
final Snapshot backupSnapshot = (Snapshot) backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
同样,我们关注ttlValue
:
private static void restoreTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> backup) {
doExecuteCallback(false);
// 遍历所有的TTL
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// 如果备份中不存在当前TTL实例则要删除,因为有些TTL引用可能是在调用capture生成快照之后才创建的。
if (!backup.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 重新将备份的值设置一下
setTtlValuesTo(backup);
}
一般我们会将自定义的线程池用TtlExecutors
进行封装,这样里面的任务就会被TtlRunnable
封装。那么我们看下TtlRunnable
类:
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
private final AtomicReference<Object> capturedRef;
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;
// 创建任务时调用的构造函数
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
// 调用capture函数,即创建一个快照。
this.capturedRef = new AtomicReference<Object>(capture());
// 将原生的业务代码Jobs进行保存。
this.runnable = runnable;
//
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
/**
* wrap method {@link Runnable#run()}.
*/
@Override
public void run() {
// 子线程开始执行,先取得快找数据。
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
// 将快照中的数据设置到当前线程中(因为这些数据来自于其他线程,当前线程还没有赋值呢),同时创建一个备份数据backup
Object backup = replay(captured);
try {
// 执行真正的业务逻辑
runnable.run();
} finally {
// 最后恢复备份快照
restore(backup);
}
}
}
2.3 总结
总结下TTL
的原理大概如下:
- 首先,我们使用的线程池,需要通过
TtlExecutors
进行封装,本质上和单独的任务被TtlRunnable
封装是一致的。只不过前者更方便了。 TtlRunnable
封装过的任务,也就是存储了当前TTL
快照的一个引用。在创建的时候就会捕捉当前父线程中的TTL
以及一些注册过的ThreadLocal
(就是用于兼容老版本的ThreadLocal,让他们有跨线程传递的特性)TTL
中有一个静态属性holder
,存储了所有的TTL
引用。存储的时机在我们set
的时候发生。- 子线程开始执行的时候,先调用
Transmitter.replay
,获取到其他线程里面存储的变量(也就是上一步产生的快照capturedRef
这里体现到了跨线程的变量传递),然后将值赋值给当前线程。同时产生一个备份backup
并返回。 - 子线程执行完毕,执行
Transmitter.restore
,根据backup
备份将数据恢复。
FAQ:
为什么TTL
中需要有一个holder
静态变量用来存储所有的TTL
引用呢?回答:
holder
首先是一个InheritableThreadLocal
,里面放的是一个WeakHashMap
。用来收集线程中所有的TTL
。反之,如果我们不用holder
,是否可以拿到其他线程中存储的变量呢?Thread
中的ThreadLocal.ThreadLocalMap inheritableThreadLocals
成员变量,它的值虽然可以被子线程继承。但是我们在业务代码中无法对这个变量直接访问。因此还是需要通过自己定义的holder
进行存储和访问。
为什么子线程执行完毕之后,要恢复快照?难道不是下一次线程复用的时候,对应线程存储的TTL
变量吗?回答:
- 当线程池满了,并且采取的拒绝策略是
CallerRunsPolicy
。那么此时原本要执行子业务逻辑的操作可能会在主线程中执行。也就是两个操作在同一个线程中执行。 - 倘若没有
restore
这个操作,倘若中途对TTL
的内容进行更改,就会导致最终上下文数据被污染。
下面是我从笑傲君这截取的图,这个是正常的变量拷贝图:子线程可以对TTL
中的内容做任意修改(拷贝了一份)
以下则是同线程执行情况下:这种情况下若发生TTL值变更,就会发生上下文污染。
TTL
存在线程安全问题吗?回答:
- 存在,因为默认是引用类型拷贝,如果子线程修改了数据,主线程可以感知的到。