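Preface
Over the next few posts we'll talk about Netty. I want to start with FastThreadLocal rather than the topics most readers probably know better: the reactor model, the chain-of-responsibility design, ByteBuf, pooling, and so on. FastThreadLocal may be less well known, but there is a lot to it. The core questions: why is it fast, and which problems of the JDK's ThreadLocal does it solve?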
Versions
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.92.Final</version>
</dependency>
JDK: 1.8.0_181
Terminology
ThreadLocal
Literally "thread-local"; I usually call it a thread variable. It has been part of the JDK since 1.2.
The question
The question is obvious: why is it fast? It takes some nerve to put "Fast" in front of the JDK's ThreadLocal!
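Source code analysis
Since it is called FastThreadLocal, we should first look at roughly how ThreadLocal itself is implemented.
Start with the javadoc. In short: unlike ordinary variables, a thread-local variable accessed through get or set is initialized independently for each thread, and each thread works on its own copy rather than a shared one. The javadoc also notes that ThreadLocal instances are typically private static fields in classes that want to associate state with a thread.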
 * This class provides thread-local variables.  These variables differ from
 * their normal counterparts in that each thread that accesses one (via its
 * {@code get} or {@code set} method) has its own, independently initialized
 * copy of the variable.  {@code ThreadLocal} instances are typically private
 * static fields in classes that wish to associate state with a thread (e.g.,
 * a user ID or Transaction ID).
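The "own, independently initialized copy" wording in the javadoc can be checked with a small JDK-only sketch. The class and method names here (PerThreadCopyDemo, countOnNewThread, currentThreadValue) are made up for illustration.

```java
final class PerThreadCopyDemo {
    // One ThreadLocal instance, declared private static as the javadoc suggests.
    private static final ThreadLocal<Integer> COUNTER = ThreadLocal.withInitial(() -> 0);

    // Increments the counter `times` times on a fresh thread and returns what that thread saw.
    static int countOnNewThread(int times) {
        int[] result = new int[1];
        Thread worker = new Thread(() -> {
            for (int i = 0; i < times; i++) {
                COUNTER.set(COUNTER.get() + 1);
            }
            result[0] = COUNTER.get();
        });
        worker.start();
        try {
            worker.join(); // join makes the worker's write to result[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    // The calling thread has its own copy, untouched by the workers.
    static int currentThreadValue() {
        return COUNTER.get();
    }
}
```

Each worker starts from the initial value and the calling thread never sees the workers' increments, even though all of them go through the same static field.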
Let's use the get method as the entry point to see how it works.
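public T get() {
    Thread t = Thread.currentThread();
    // The storage structure is map-like.
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        // The entry is located by hashing the key, though the algorithm differs from HashMap's.
        // Hashing means collisions must be handled. HashMap chains colliding entries,
        // whereas ThreadLocalMap uses open addressing, presumably because its authors,
        // Josh Bloch and Doug Lea, expected a thread to hold only a few entries.
        // Open addressing is simpler to design and saves space, but it has costs of its own:
        // for example, entries may have to be moved after a removal.
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

// The map lives in the Thread object, which is why these are called thread variables.
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}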
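To make "hashing, but not like HashMap" concrete, here is a minimal sketch of how a ThreadLocal's hash code maps to a table slot. It mirrors the JDK's HASH_INCREMENT constant (0x61c88647) and the `hash & (len-1)` masking seen in ThreadLocalMap; the class HashSpreadDemo and its slots method are illustrative names, not JDK code.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class HashSpreadDemo {
    // Same increment the JDK uses between successively created ThreadLocal instances.
    private static final int HASH_INCREMENT = 0x61c88647;

    // Returns the table slot each of the first `count` ThreadLocals would start probing at,
    // for a table whose length is a power of two.
    static int[] slots(int count, int tableLength) {
        AtomicInteger nextHash = new AtomicInteger();
        int[] result = new int[count];
        for (int n = 0; n < count; n++) {
            int hash = nextHash.getAndAdd(HASH_INCREMENT); // int overflow simply wraps around
            result[n] = hash & (tableLength - 1);
        }
        return result;
    }
}
```

Because the increment is odd, the first 16 hash codes land on 16 different slots of a 16-element table, so collisions (and therefore probing) only start once ThreadLocals outnumber the slots or the table holds entries from an older generation.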
Next, the internal structure of the map.
static class ThreadLocalMap {

    /**
     * The entries in this hash map extend WeakReference, using
     * its main ref field as the key (which is always a
     * ThreadLocal object).  Note that null keys (i.e. entry.get()
     * == null) mean that the key is no longer referenced, so the
     * entry can be expunged from table.  Such entries are referred to
     * as "stale entries" in the code that follows.
     */
    // The entry object, holding the key/value pair.
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }

    /**
     * The initial capacity -- MUST be a power of two.
     */
    private static final int INITIAL_CAPACITY = 16;

    /**
     * The table, resized as necessary.
     * table.length MUST always be a power of two.
     */
    // Entries are stored in an array.
    private Entry[] table;

    // (rest of the class omitted)
}
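To set up what follows, let's look at how ThreadLocal reclaims resources.
First, Entry extends WeakReference, so the ThreadLocal key is only weakly referenced.
Second, set also contains cleanup logic. Here is the map's set method: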
private void set(ThreadLocal<?> key, Object value) {

    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.

    Entry[] tab = table;
    int len = tab.length;
    // Derive the starting index from the hash code.
    int i = key.threadLocalHashCode & (len-1);

    // Probe the table while the slot is occupied; on a first insert the slot is null
    // and the loop body never runs.
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();

        // Same key: replace the value.
        if (k == key) {
            e.value = value;
            return;
        }

        // Null key: this is the cleanup path. It is not normally reached, because
        // ThreadLocal.remove() clears the entry, nulls the table slot, and also
        // shifts later entries. This branch handles the abnormal case where remove()
        // was never called but the ThreadLocal object is gone, mostly because of GC,
        // since the entry's key is a WeakReference.
        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    // No matching key and no stale entry found: insert normally.
    tab[i] = new Entry(key, value);
    int sz = ++size;
    // Scan a few slots after i (the scan length is derived from the current size) for
    // entries that are non-null but whose key (the ThreadLocal) is null; similar in
    // spirit to replaceStaleEntry.
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}
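Attentive readers will notice a gap in this reclamation scheme: cleanup only runs as a side effect of operations on the map. If we never call remove manually and never call set again (a get that misses during probing can also expunge stale entries, but nothing guarantees one will happen), no cleanup is triggered. When this happens a lot, the table fills up with entries whose key is null (collected by GC) but whose value has not been released.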
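A stale entry is easy to reproduce in miniature. The sketch below uses a simplified stand-in for ThreadLocalMap.Entry (the names StaleEntryDemo and makeStaleEntry are made up) and calls clear() on the weak reference to simulate the garbage collector clearing the key, which keeps the demo deterministic.

```java
import java.lang.ref.WeakReference;

final class StaleEntryDemo {
    // Same shape as ThreadLocalMap.Entry: weak key, strongly referenced value.
    static final class Entry extends WeakReference<Object> {
        Object value;

        Entry(Object key, Object value) {
            super(key);
            this.value = value;
        }
    }

    static Entry makeStaleEntry() {
        Entry entry = new Entry(new Object(), new byte[1024]);
        // Simulates GC clearing the weakly referenced key. A real collection would do
        // the same once no strong reference to the key remains.
        entry.clear();
        return entry;
    }
}
```

After the key is cleared, entry.get() returns null but entry.value still holds the 1 KB array. As long as the entry sits in a live thread's table, that value stays reachable until some map operation expunges it.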
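Enter FastThreadLocal
As with ThreadLocal, start with the javadoc. It first claims higher access performance (a bit of self-promotion), and then comes the key point: it uses a constant index into an array, instead of a hash code and a hash table, to look up the variable. To get the full benefit of FastThreadLocal, the thread should be a FastThreadLocalThread; on any other thread, access falls back to a regular ThreadLocal.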
 * A special variant of {@link ThreadLocal} that yields higher access performance when accessed from a
 * {@link FastThreadLocalThread}.
 * <p>
 * Internally, a {@link FastThreadLocal} uses a constant index in an array, instead of using hash code and hash table,
 * to look for a variable.  Although seemingly very subtle, it yields slight performance advantage over using a hash
 * table, and it is useful when accessed frequently.
 * </p><p>
 * To take advantage of this thread-local variable, your thread must be a {@link FastThreadLocalThread} or its subtype.
 * By default, all threads created by {@link DefaultThreadFactory} are {@link FastThreadLocalThread} due to this reason.
 * </p><p>
 * Note that the fast path is only possible on threads that extend {@link FastThreadLocalThread}, because it requires
 * a special field to store the necessary state.  An access by any other kind of thread falls back to a regular
 * {@link ThreadLocal}.
 * </p>
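Start with the constant index. It is assigned in the constructor. Underneath, it is handed out by io.netty.util.internal.UnpaddedInternalThreadLocalMap#nextIndex, an AtomicInteger. Both get and set use this index to operate on io.netty.util.internal.UnpaddedInternalThreadLocalMap#indexedVariables, which is an Object[]. Looking an element up by array index needs no hashing and no probing, so it is indeed efficient.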
public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
}
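The idea of "a globally allocated index into a per-thread array" can be sketched with the JDK alone. This is a simplified model, not Netty's implementation: IndexedSlots, its initial size of 8, and its growth policy are assumptions made for illustration, and one instance stands for the array owned by a single thread.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class IndexedSlots {
    // Sentinel meaning "no value stored in this slot yet".
    static final Object UNSET = new Object();

    // Global counter: every variable gets a unique index once, at construction time.
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();

    // One array per thread in the real design; a single instance models one thread here.
    private Object[] slots = filled(new Object[8], 0);

    static int nextIndex() {
        return NEXT_INDEX.getAndIncrement();
    }

    Object get(int index) {
        // A plain array read: no hash code, no probing, no weak references.
        return index < slots.length ? slots[index] : UNSET;
    }

    // Returns true only if the slot was previously unset, i.e. a new variable was created.
    boolean set(int index, Object value) {
        if (index >= slots.length) {
            int oldLength = slots.length;
            slots = filled(Arrays.copyOf(slots, Math.max(oldLength * 2, index + 1)), oldLength);
        }
        Object old = slots[index];
        slots[index] = value;
        return old == UNSET;
    }

    // Marks every slot from `from` onward as UNSET.
    private static Object[] filled(Object[] array, int from) {
        Arrays.fill(array, from, array.length, UNSET);
        return array;
    }
}
```

The trade-off of this layout is space: an index is never reused, so the array on every thread must be at least as long as the highest index that thread touches.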
So why does it only get fast when paired with FastThreadLocalThread? Take the set method as an example.
/**
 * Set the value for the current thread.
 */
public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        setKnownNotUnset(threadLocalMap, value);
    } else {
        remove();
    }
}

// Method of io.netty.util.internal.InternalThreadLocalMap
public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        // Fast path: FastThreadLocalThread keeps the InternalThreadLocalMap in a field of its own.
        return fastGet((FastThreadLocalThread) thread);
    } else {
        // Fall back to ThreadLocal, via
        // io.netty.util.internal.UnpaddedInternalThreadLocalMap#slowThreadLocalMap.
        // That field is itself a ThreadLocal. In other words, Netty supports threads that are
        // not FastThreadLocalThread by storing the InternalThreadLocalMap it needs for fast
        // mode inside a ThreadLocal, which acts as a relay.
        // In short, without FastThreadLocalThread the extra layer buys nothing.
        return slowGet();
    }
}

/**
 * @return see {@link InternalThreadLocalMap#setIndexedVariable(int, Object)}.
 */
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    // Plain array assignment.
    if (threadLocalMap.setIndexedVariable(index, value)) {
        addToVariablesToRemove(threadLocalMap, this);
    }
}

// This is Netty's cleanup bookkeeping. variablesToRemoveIndex is an index like the constant
// index above, except that it is a single slot shared by all FastThreadLocal instances.
// The slot holds a Set<FastThreadLocal<?>> of the FastThreadLocal objects that need cleaning up.
// A FastThreadLocal is added when set creates its variable on this thread; using a Set
// deduplicates repeated additions of the same FastThreadLocal.
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
        variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }

    variablesToRemove.add(variable);
}
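The fast-path/slow-path split in InternalThreadLocalMap.get() can be modeled without Netty. In this sketch, FastThread, LocalMap, and MapLookupDemo are hypothetical stand-ins for FastThreadLocalThread, InternalThreadLocalMap, and the lookup logic; only the shape of the dispatch is meant to match.

```java
final class MapLookupDemo {
    // Stand-in for the per-thread map object.
    static final class LocalMap {
        final Object[] slots = new Object[8];
    }

    // Stand-in for a thread type that carries its map in a field.
    static class FastThread extends Thread {
        LocalMap map;

        FastThread(Runnable task) {
            super(task);
        }
    }

    // Slow path storage: a regular ThreadLocal holding the map for all other threads.
    private static final ThreadLocal<LocalMap> SLOW = ThreadLocal.withInitial(LocalMap::new);

    static LocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThread) {
            // Fast path: one type check and one field read, no hash lookup.
            FastThread fast = (FastThread) thread;
            if (fast.map == null) {
                fast.map = new LocalMap();
            }
            return fast.map;
        }
        // Slow path: a ThreadLocal hash lookup first, then the same array-based map.
        return SLOW.get();
    }

    // Runs get() on a FastThread and returns {map seen by get(), map stored in the field}.
    static LocalMap[] lookupOnFastThread() {
        LocalMap[] seen = new LocalMap[2];
        FastThread thread = new FastThread(() -> seen[0] = get());
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen[1] = thread.map;
        return seen;
    }
}
```

On an ordinary thread every access still pays for the ThreadLocal lookup before it reaches the array, which is the point the comments above make about the slow path adding a layer without adding speed.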
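The code above already touched on cleanup and introduced the set of FastThreadLocal objects awaiting cleanup. So when does that set actually get cleaned?
Let's look at where it is used.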

removeAll. According to the javadoc, it removes all FastThreadLocal variables bound to the current thread. Now the source:
/**
 * Removes all {@link FastThreadLocal} variables bound to the current thread.  This operation is useful when you
 * are in a container environment, and you don't want to leave the thread local variables in the threads you do not
 * manage.
 */
public static void removeAll() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }

    try {
        // Fetch the set of FastThreadLocals awaiting cleanup.
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            @SuppressWarnings("unchecked")
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            FastThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new FastThreadLocal[0]);
            // Remove each one in turn.
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        InternalThreadLocalMap.remove();
    }
}

/**
 * Sets the value to uninitialized for the specified thread local map;
 * a proceeding call to get() will trigger a call to initialValue().
 * The specified thread local map must be for the current thread.
 */
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }

    // Delete the thread variable.
    Object v = threadLocalMap.removeIndexedVariable(index);
    // Take this FastThreadLocal out of the set awaiting cleanup.
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            // Hook for subclasses.
            onRemoval((V) v);
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}
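The bookkeeping behind removeAll (track on first set, snapshot, then remove one by one) can be reduced to a small JDK-only model. TrackedLocals and Local are illustrative names, one TrackedLocals instance stands for a single thread's state, and null values are not supported in this toy version.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

final class TrackedLocals {
    // Hypothetical stand-in for a FastThreadLocal with an overridable removal hook.
    static class Local<V> {
        int removals; // counts onRemoval calls so the demo can observe them

        protected void onRemoval(V value) {
            removals++;
        }
    }

    private final Map<Local<?>, Object> values = new IdentityHashMap<>();
    // Identity-based set: a variable is tracked by object identity, and only once.
    private final Set<Local<?>> toRemove =
            Collections.newSetFromMap(new IdentityHashMap<Local<?>, Boolean>());

    <V> void set(Local<V> local, V value) {
        // Track the variable only when this set call creates it.
        if (values.put(local, value) == null) {
            toRemove.add(local);
        }
    }

    int tracked() {
        return toRemove.size();
    }

    @SuppressWarnings("unchecked")
    void removeAll() {
        // Iterate over a snapshot, because each removal mutates the tracking set.
        Local<?>[] snapshot = toRemove.toArray(new Local<?>[0]);
        for (Local<?> local : snapshot) {
            Object old = values.remove(local);
            toRemove.remove(local);
            if (old != null) {
                ((Local<Object>) local).onRemoval(old);
            }
        }
    }
}
```

The snapshot array plays the same role as variablesToRemoveArray above: removing entries while iterating the live set directly would throw ConcurrentModificationException.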
addToVariablesToRemove: the add-to-set logic covered earlier.
io.netty.util.concurrent.FastThreadLocal#remove(io.netty.util.internal.InternalThreadLocalMap): used to take the FastThreadLocal out of the set awaiting cleanup, because it has already been cleaned.
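Now compare this cleanup with the JDK's. The obvious difference is that Netty provides removeAll to deal with every thread variable bound to a thread. The intent behind it is that Netty cares whether the variables bound to a thread are cleaned up promptly once the thread is finished, so that they do not leak memory. But it is equally clear that Netty's approach still has to be invoked manually. Why not automate it?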
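Versions of Netty before 4.1.27.Final used an object called ObjectCleaner. The class itself was kept, but the code that used ObjectCleaner to clean up thread variables was commented out and finally deleted in netty-4.1.35.Final. Briefly, the old approach worked like this: the set method registered a cleanup task. It relied on the semantics of java.lang.ref.WeakReference#WeakReference(T, java.lang.ref.ReferenceQueue<? super T>), the parent constructor of AutomaticCleanerReference: once the object T has been collected, the reference is added to the ReferenceQueue. On the first registration, the cleaner started a background thread running CLEANER_TASK, which loops pulling references from that ReferenceQueue; whenever it gets one, it runs the cleanup task associated with that object (passed in through the AutomaticCleanerReference constructor).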
So why did Netty drop this logic? The gist of the upstream issue is that the cleaner thread cannot be stopped or controlled, so whatever that thread references may leak.
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
    Thread current = Thread.currentThread();
    // Return early if the thread is a FastThreadLocalThread that was constructed with a Runnable,
    // or if a cleaner has already been registered.
    if (FastThreadLocalThread.willCleanupFastThreadLocals(current) ||
        threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {
        return;
    }
    // removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime
    // of the thread, and this Object will be discarded if the associated thread is GCed.
    threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE); // set the flag to avoid registering twice

    // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
    // and FastThreadLocal.onRemoval(...) will be called.
    // That is: register an object cleaner for each FastThreadLocal, so that when the thread is
    // destroyed, the thread's variable map is cleaned up.
    ObjectCleaner.register(current, new Runnable() {
        @Override
        public void run() {
            remove(threadLocalMap);

            // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
            // the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
        }
    });
}

// The background task
private static final Runnable CLEANER_TASK = new Runnable() {
    @Override
    public void run() {
        boolean interrupted = false;
        for (;;) {
            // Loop condition: the set of registered AutomaticCleanerReference objects is non-empty.
            // This is why the issue calls it unstoppable: the set exposes no method to clear it,
            // and its elements (AutomaticCleanerReference) are not exposed either.
            // Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
            // See if we can let this thread complete.
            while (!LIVE_SET.isEmpty()) {
                final AutomaticCleanerReference reference;
                try {
                    // Take an already-collected object's reference from the queue.
                    reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);
                } catch (InterruptedException ex) {
                    // Just consume and move on
                    interrupted = true;
                    continue;
                }
                if (reference != null) {
                    try {
                        // Run the cleanup task registered for that object.
                        reference.cleanup();
                    } catch (Throwable ignored) {
                        // ignore exceptions, and don't log in case the logger throws an exception, blocks, or has
                        // other unexpected side effects.
                    }
                    LIVE_SET.remove(reference);
                }
            }
            CLEANER_RUNNING.set(false);

            // Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
            // behavior in multi-threaded environments.
            if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
                // There was nothing added after we set STARTED to false or some other cleanup Thread
                // was started already so its safe to let this Thread complete now.
                break;
            }
        }
        if (interrupted) {
            // As we caught the InterruptedException above we should mark the Thread as interrupted.
            Thread.currentThread().interrupt();
        }
    }
};
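The WeakReference-plus-ReferenceQueue mechanism that the old cleaner relied on can be shown in isolation. This is a minimal JDK-only sketch, not Netty's ObjectCleaner: CleanerSketch, CleanupRef, and drainOnce are made-up names, and the polling loop is reduced to a single step.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

final class CleanerSketch {
    // A weak reference that remembers what to run once its referent is gone.
    static final class CleanupRef extends WeakReference<Object> {
        private final Runnable cleanup;

        CleanupRef(Object referent, ReferenceQueue<Object> queue, Runnable cleanup) {
            super(referent, queue);
            this.cleanup = cleanup;
        }

        void cleanup() {
            cleanup.run();
        }
    }

    // One iteration of a cleaner loop: wait up to timeoutMs (must be > 0) for a reference
    // to show up in the queue and run its cleanup. Returns true if a cleanup was run.
    static boolean drainOnce(ReferenceQueue<Object> queue, long timeoutMs) {
        try {
            CleanupRef reference = (CleanupRef) queue.remove(timeoutMs);
            if (reference == null) {
                return false; // timed out, nothing was enqueued
            }
            reference.cleanup();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a real setup the garbage collector enqueues the reference some time after the referent becomes unreachable, and something must keep the CleanupRef itself strongly reachable until then (Netty's LIVE_SET served that purpose). Calling enqueue() on the reference by hand is a convenient way to exercise drainOnce deterministically.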
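Summary
Starting from the JDK's native ThreadLocal, this post covered why FastThreadLocal is faster and how its cleanup logic was reworked to keep thread variables from leaking memory. Next time we'll continue with Netty. See you then! Missing Jiabao~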



















