【源码分析】【netty】FastThreadLocal 为什么快?

news2024/9/29 11:27:30

写在前面

接下来几篇文章,我们来聊一聊 netty 相关的。这里作者想先从 FastThreadLocal 开始说,而不是可能大家更熟悉的 reactor 啊,责任链设计啊,ByteBuf 啊,池化啊等等。不过虽然说 FastThreadLocal 熟知程度不如其他的,但是其实还是很有内容的。比如最核心的为啥快呢?它解决了 jdk 的 ThreadLocal 什么问题?


版本约定

				<dependency>            <groupId>io.netty</groupId>            <artifactId>netty-all</artifactId>            <version>4.1.92.Final</version>        </dependency>

复制代码

JDK:1.8.0_181


名词约定

ThreadLocal

直译就是本地线程,作者一般喜欢叫线程变量。从 1.2 开始便在 jdk 中了。


带着疑问

这个疑问应该显而易见拉,为什么快啊?这么嚣张在 jdk 的 ThreadLocal 前面加上 Fast!


源码分析

既然说是 FastThreadLocal,那我们肯定要先看一下 ThreadLocal 是大概怎么实现的

  • 先来看一下 javadoc。大意就是与一般我们使用的 get,set 的变量不同,本地线程的变量是单独初始化,并且共享的是副本。并且推荐本地线程的变量声明推荐私有静态,用于希望让变量声明周期与线程关联上

 * This class provides thread-local variables.  These variables differ from * their normal counterparts in that each thread that accesses one (via its * {@code get} or {@code set} method) has its own, independently initialized * copy of the variable.  {@code ThreadLocal} instances are typically private * static fields in classes that wish to associate state with a thread (e.g., * a user ID or Transaction ID).

复制代码

  • 我们从 get 方法切入去看一看原理

    public T get() {        Thread t = Thread.currentThread();      //可以看出存储结构类似一个map        ThreadLocalMap map = getMap(t);        if (map != null) {          //也是通过hash获取key,不过具体算法与HashMap有些差异,          //既然是hash,那么就要处理哈希冲突,HasjMap我们都知道是通过链式去处理的,          //而ThreadLocal是通过开放地址法的,因为作者Josh Bloch and Doug Lea认为线程变量中并不会存放太多entry          //所以使用开放地址法,一来设计更加简单,二来节约空间。不过开放地址也有自己的缺点比如删除之后需要移动entry            ThreadLocalMap.Entry e = map.getEntry(this);            if (e != null) {                @SuppressWarnings("unchecked")                T result = (T)e.value;                return result;            }        }        return setInitialValue();    }//map结构存储在Thread对象中,也就是为什么也叫做线程变量    ThreadLocalMap getMap(Thread t) {        return t.threadLocals;    }

复制代码

  • 再看看 map 具体的内部构造

static class ThreadLocalMap {
        /**         * The entries in this hash map extend WeakReference, using         * its main ref field as the key (which is always a         * ThreadLocal object).  Note that null keys (i.e. entry.get()         * == null) mean that the key is no longer referenced, so the         * entry can be expunged from table.  Such entries are referred to         * as "stale entries" in the code that follows.         */  //entry的对象,存储kv结构        static class Entry extends WeakReference<ThreadLocal<?>> {            /** The value associated with this ThreadLocal. */            Object value;
            Entry(ThreadLocal<?> k, Object v) {                super(k);                value = v;            }        }
        /**         * The initial capacity -- MUST be a power of two.         */        private static final int INITIAL_CAPACITY = 16;
        /**         * The table, resized as necessary.         * table.length MUST always be a power of two.         */  //使用数组存储entry        private Entry[] table;

复制代码

  • 为了下文做铺垫,我们来看看 ThreadLocal 是怎么做资源回收的。

  • 首先 Entry 继承了 WeakReference

  • 其次 set 的时候也有清理的逻辑,来看一下 map 的 set 方法

private void set(ThreadLocal<?> key, Object value) {
            // We don't use a fast path as with get() because it is at            // least as common to use set() to create new entries as            // it is to replace existing ones, in which case, a fast            // path would fail more often than not.
            Entry[] tab = table;            int len = tab.length;  //计算index的哈希值            int i = key.threadLocalHashCode & (len-1);//遍历table,条件是Entry对象非空,也就是说,第一次插入的话,一定都是null            for (Entry e = tab[i];                 e != null;                 e = tab[i = nextIndex(i, len)]) {                ThreadLocal<?> k = e.get();//key相等则替换                if (k == key) {                    e.value = value;                    return;                }//key是空,这里就是清理的逻辑,一般来说不会走到这里,因为Threadlocal在remove的时候,//不仅会设置entry的为空,也会设置table对应的元素为空,还会做entry的移动。//这里应该就是为了处理没有调用remove,但是ThreadLocal对象空了的异常情况,大部分情况是gc导致的,//因为entry的key是WeakReference                if (k == null) {                    replaceStaleEntry(key, value, i);                    return;                }            }//没有找到替换值,或者key空的情况,正常插入            tab[i] = new Entry(key, value);            int sz = ++size;  //清理table,从i开始,长度就是table的大小,处理的就是entry非空,key(ThreadLocal)为空的情况,与  //replaceStaleEntry类似            if (!cleanSomeSlots(i, sz) && sz >= threshold)                rehash();        }

复制代码

  • 上面说到的资源回收,细心的读者会发现,当我们没有手动调用 remove,也没有调用 set 的话,那么就不会触发清理的操作,如果有大量这种情况,那么 table 中就会有大量 entry 的可以是空(gc 了),value 还没有被清理的情况。

FastThreadLocal 来啦

  • 跟 ThreadLocal 一样,我们先来看看 javadoc。首先是说提供了更高的查询性能(一波自吹),然后就是关键拉,用了一个常量 index 取代了原来的哈希值去检索变量。为了最大化的发挥 ThreadLocal 的优势,建议线程使用 FastThreadLocalThread,因为可以避免走回 ThreadLocal 的逻辑。

 * A special variant of {@link ThreadLocal} that yields higher access performance when accessed from a * {@link FastThreadLocalThread}. * <p> * Internally, a {@link FastThreadLocal} uses a constant index in an array, instead of using hash code and hash table, * to look for a variable.  Although seemingly very subtle, it yields slight performance advantage over using a hash * table, and it is useful when accessed frequently. * </p><p> * To take advantage of this thread-local variable, your thread must be a {@link FastThreadLocalThread} or its subtype. * By default, all threads created by {@link DefaultThreadFactory} are {@link FastThreadLocalThread} due to this reason. * </p><p> * Note that the fast path is only possible on threads that extend {@link FastThreadLocalThread}, because it requires * a special field to store the necessary state.  An access by any other kind of thread falls back to a regular * {@link ThreadLocal}. * </p>

复制代码

  • 先看看这个常量 index。构造中就初始化了。底层通过 io.netty.util.internal.UnpaddedInternalThreadLocalMap#nextIndex,一个 AtomicInteger 去分配。通过 get,set 都是使用这个 index 去操作 io.netty.util.internal.UnpaddedInternalThreadLocalMap#indexedVariables。是一个 Object[]的结构,使用数组检索元素,效率确实高

    public FastThreadLocal() {        index = InternalThreadLocalMap.nextVariableIndex();    }

复制代码

  • 那么再来看看为啥说要配合使用 FastThreadLocalThread,才能快起来?set 方法为例

    /**     * Set the value for the current thread.     */    public final void set(V value) {        if (value != InternalThreadLocalMap.UNSET) {            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();            setKnownNotUnset(threadLocalMap, value);        } else {            remove();        }    }//io.netty.util.internal.InternalThreadLocalMap的方法    public static InternalThreadLocalMap get() {        Thread thread = Thread.currentThread();        if (thread instanceof FastThreadLocalThread) {          //快速获取,因为FastThreadLocalThread内部就有InternalThreadLocalMap的成员变量            return fastGet((FastThreadLocalThread) thread);        } else {          //走回ThreadLocal,通过io.netty.util.internal.UnpaddedInternalThreadLocalMap#slowThreadLocalMap去获取          //这个变量就是一个ThreadLocal,也就是说netty兼容非FastThreadLocalThread的处理方式就是          //把自己fast模式下需要使用的InternalThreadLocalMap变量,使用ThreadLocal作为存储媒介,相当于做了一下中转          //其实总结一下就是如果不使用FastThreadLocalThread,那么完全多此一举            return slowGet();        }    }    /**     * @return see {@link InternalThreadLocalMap#setIndexedVariable(int, Object)}.     */    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {      //直接数组赋值        if (threadLocalMap.setIndexedVariable(index, value)) {            addToVariablesToRemove(threadLocalMap, this);        }    }//这里就是netty的清理逻辑拉,variablesToRemoveIndex这个index跟之前说的常量index类似,他是在实例初始化的时候初始化的//对象是一个Set<FastThreadLocal<?>>。用来存放需要被清理的FastThreadLocal的对象,每次set都会加入这个set//用set方便去重,因为一个FastThreadLocal多次set就会加入多次    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);        Set<FastThreadLocal<?>> variablesToRemove;        if (v == InternalThreadLocalMap.UNSET || v == null) {            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);        } else {            variablesToRemove = (Set<FastThreadLocal<?>>) v;        }
        variablesToRemove.add(variable);    }

复制代码

上面已经说到了清理部分的逻辑,提到了待清理的 FastThreadLocal 集合,那么这个集合什么时候被清理的呢?

  • 来看可以看 usage

  • removeAll。先看看 javadoc,清理当前线程变量中的所有 FastThreadLocal。再来看看源码。

    /**     * Removes all {@link FastThreadLocal} variables bound to the current thread.  This operation is useful when you     * are in a container environment, and you don't want to leave the thread local variables in the threads you do not     * manage.     */    public static void removeAll() {        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();        if (threadLocalMap == null) {            return;        }
        try {          //获取待清理的FastThreadLocal set            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);            if (v != null && v != InternalThreadLocalMap.UNSET) {                @SuppressWarnings("unchecked")                Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;                FastThreadLocal<?>[] variablesToRemoveArray =                        variablesToRemove.toArray(new FastThreadLocal[0]);              //遍历remove                for (FastThreadLocal<?> tlv: variablesToRemoveArray) {                    tlv.remove(threadLocalMap);                }            }        } finally {            InternalThreadLocalMap.remove();        }    }

    /**     * Sets the value to uninitialized for the specified thread local map;     * a proceeding call to get() will trigger a call to initialValue().     * The specified thread local map must be for the current thread.     */    @SuppressWarnings("unchecked")    public final void remove(InternalThreadLocalMap threadLocalMap) {        if (threadLocalMap == null) {            return;        }//删除线程变量        Object v = threadLocalMap.removeIndexedVariable(index);      //把自己从待清理的FastThreadLocal set中移除        removeFromVariablesToRemove(threadLocalMap, this);
        if (v != InternalThreadLocalMap.UNSET) {            try {              //子类实现                onRemoval((V) v);            } catch (Exception e) {                PlatformDependent.throwException(e);            }        }    }

复制代码

  • 之前提过的添加到 set 的逻辑

  • io.netty.util.concurrent.FastThreadLocal#remove(io.netty.util.internal.InternalThreadLocalMap)中使用,用于把自己从待清理的 FastThreadLocal set 中移除,因为已经清理过了

  • 关于清理,这里我们对比一下跟 jdk 原生的区别,很明显,netty 提供了 removeAll 去处理线程绑定的所有线程变量。背后的语义,就是 netty 关注线程对象销毁之后,绑定的线程变量有没有被即使清理,而不会去造成内存溢出。但是这里也可也可以看出,netty 的方式也需要手动维护,那为什么不使用自动化的方式呢?

  • netty 在 4.1.27.Final 之前的版本使用了一个 ObjectCleaner 的对象。这个对象依旧被保留了,但是原先使用 ObjectCleaner 去清理线程变量的逻辑被注释了,并最终在 netty-4.1.35.Final 中被删除。简单提一下之前的思路,在 set 方法中会注册一个 Cleaner 线程。原理就是利用 AutomaticCleanerReference 的父类构造 java.lang.ref.WeakReference#WeakReference(T, java.lang.ref.ReferenceQueue<? super T>)提供的语义,在 T 对象被销毁之后,会加入 ReferenceQueue。Cleaner 在第一次注册清理线程之后,会启动一个后台线程 CLEANER_TASK 去自旋从这个 ReferenceQueue 中获取对象,如果获取到了就会调用对象对应的清理线程(AutomaticCleanerReference 构造中传入)去执行清理逻辑

  • 那么为什么 netty 现在不用这个逻辑了呢?官网 issue 的大意就是 cleaner 线程无法被停止和控制,所以可能导致线程引用的变量泄漏

private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {    Thread current = Thread.currentThread();    if (FastThreadLocalThread.willCleanupFastThreadLocals(current) ||	//线程是FastThreadLocalThread类型并且构造这个线程时传入了runnable        threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {	//已经注册过了        return;    }    // removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime    // of the thread, and this Object will be discarded if the associated thread is GCed.    threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);	//设置value,避免重复注册
    // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released    // and FastThreadLocal.onRemoval(...) will be called.//即为 每个FastThreadLocal注册对象清理器,即线程销毁的时候,把线程的变量map清理掉    ObjectCleaner.register(current, new Runnable() {        @Override        public void run() {            remove(threadLocalMap);
            // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once            // the Thread is collected by GC. In this case the ThreadLocal will be gone away already.        }    });}//后台线程private static final Runnable CLEANER_TASK = new Runnable() {        @Override        public void run() {            boolean interrupted = false;            for (;;) {              //自旋条件,就是注册的AutomaticCleanerReference集合非空,              //为什么官网说不可停止,就是因为这个set不对外暴露可以清理的方法,同时集合元素AutomaticCleanerReference也不对外暴露                // Keep on processing as long as the LIVE_SET is not empty and once it becomes empty                // See if we can let this thread complete.                while (!LIVE_SET.isEmpty()) {                    final AutomaticCleanerReference reference;                    try {                      //销毁队列中获取已经被销毁的对象                        reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);                    } catch (InterruptedException ex) {                        // Just consume and move on                        interrupted = true;                        continue;                    }                    if (reference != null) {                        try {                          //启动对象的清理线程                            reference.cleanup();                        } catch (Throwable ignored) {                            // ignore exceptions, and don't log in case the logger throws an exception, blocks, or has                            // other unexpected side effects.                        }                        LIVE_SET.remove(reference);                    }                }                CLEANER_RUNNING.set(false);
                // Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct                // behavior in multi-threaded environments.                if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {                    // There was nothing added after we set STARTED to false or some other cleanup Thread                    // was started already so its safe to let this Thread complete now.                    break;                }            }            if (interrupted) {                // As we caught the InterruptedException above we should mark the Thread as interrupted.                Thread.currentThread().interrupt();            }        }    };

复制代码


总结

本文从 jdk 原生 ThreadLocal 切入,介绍了为什么 FastThreadLocal 更快,FastThreadLocal 的清理逻辑做了什么优化,去避免线程变量的内存溢出。下一篇我们继续聊聊 netty 拉,再会!想念家宝~

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/570739.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

剑指offer 2--数组中重复的元素

数组中重复的数字_牛客题霸_牛客网 (nowcoder.com) 【排序法】思路和代码&#xff1a; 对数组进行排序。遍历排序后的数组&#xff0c;如果当前元素与下一个元素相等&#xff0c;则找到了重复数字&#xff0c;返回该数字。如果遍历完数组都没有找到重复数字&#xff0c;则返回-…

ChatGPT应用组队学习来了!

Datawhale学习 联合主办&#xff1a;Datawhale、百度文心 Datawhale联合百度文心&#xff0c;五月为大家带来AIGC应用专题&#xff1a;大模型从入门到应用&#xff0c;学习大纲如下&#xff08;文末整理了这次学习的所有资料&#xff09;&#xff1a; 参与学习 ▶ 活动时间&am…

量子力学专题:线性谐振子

任何体系在平衡位置附近的小振动&#xff0c;例如 分子振动、晶格振动、原子核表面振动以及辐射场的振动等往往都可以分解成 若干彼此独立的一维简谐振动简谐振动往往还作为复杂运动的初步近似 见理论力学专题&#xff08;小振动&#xff09; 双原子分子&#xff0c;两原子间的…

kubernetes02

pod pod生命周期 pod的状态 1.挂起pending:API server创建了pod资源对象已存入etcd中&#xff0c;但它尚未被调度完成&#xff0c;或者仍处于从仓库下载镜像的过程中 2.运行中running:pod已经被调度到某节点&#xff0c;并且所有容器都已经被kubelet创建完成 3.成功complet:…

物业设备管理系统

物业服务质量难以保证&#xff0c;工单处理慢&#xff0c;巡检记录不规范&#xff1b;物业设备设施管理混乱&#xff0c;维修保养成本高&#xff0c;风险隐患多&#xff1b;物业数据分散&#xff0c;难以统计分析&#xff0c;无法提供决策支持&#xff1b;每天需要检查和保养的…

Hadoop学习---8、Hadoop数据压缩

1、Hadoop数据压缩 1.1 概述 1、压缩的好处和坏处 &#xff08;1&#xff09;优点&#xff1a;减少磁盘IO、减少磁盘储存空间 &#xff08;2&#xff09;缺点&#xff1a;增加CPU开销 2、压缩原则 &#xff08;1&#xff09;运算密集型的Job&#xff0c;少用压缩 &#xff08…

亚马逊云科技赋能中国出海企业创新及开拓海外业务

向全球价值链上游奋进 中国企业增强国际竞争力的关键&#xff0c;是努力朝全球价值链上游奋进&#xff0c;发力技术出海。中国的出海新机遇&#xff0c;背后曾是疫情在全球按下数字互联和数字化升级的快进键&#xff0c;跨境电商、在线社交、移动支付、数字服务等数字经济迎来…

【技巧】如何保护Word文档不被改动?

工作上&#xff0c;很多小伙伴需要将Word文档发给对方看&#xff0c;但又不想在传看时&#xff0c;被对方改动上面的内容。这种情况&#xff0c;我们可以通过以下两种方法&#xff0c;让Word文档不能改动。 首先&#xff0c;我们可以把Word文档设置限制编辑&#xff0c;被限制后…

FPGA采集CameraLink相机Full模式解码输出,附带工程源码和技术支持

目录 1、前言2、CameraLink协议基础3、目前我已有的CameraLink收发工程4、设计方案输入CameraLink相机视频缓存视频输出软件配置 5、vivado工程详解6、上板调试验证7、福利&#xff1a;工程代码的获取 1、前言 FPGA实现CameraLink视频编解码目前有两种方案&#xff1a; 一是使…

美团面试:接口被恶意狂刷,怎么办?

如果Java接口被恶意狂刷&#xff0c;我们一般可以采取以下措施&#xff1a; 用TimeStamp &#xff08;兵不厌诈&#xff09; 比如给客户端提供一个timestamp参数&#xff0c;值是13位的毫秒级时间戳&#xff0c;可以在第12位或者13位做一个校验位&#xff0c;通过一定的算法给…

Docker实战2-发布后端Java项目

有了上篇Docker实战1-发布前端Vue项目的经验&#xff0c;发布后端就轻车熟路了。 1 准备文件 java打包 运行maven的package,生成jar文件&#xff0c;target/dsm-service-1.0-SNAPSHOT.jar DockerFile # Docker image for springboot file run FROM openjdk:11.0.11-jdk-sli…

【JavaSE】Java基础语法(十二):ArrayList

文章目录 1. ArrayList的构造方法和添加方法2. ArrayList类常用方法3. ArrayList存储学生对象并遍历 集合和数组的区别 : 共同点&#xff1a;都是存储数据的容器不同点&#xff1a;数组的容量是固定的&#xff0c;集合的容量是可变的 1. ArrayList的构造方法和添加方法 ArrayL…

2023亚马逊云科技游戏开发者大会从技术角度探索游戏的广阔边界

自上世纪五十年代诞生以来&#xff0c;电子游戏产业蓬勃发展&#xff0c;这与人类想象力的解放有着无比紧密地联系。伴随着全球游戏市场竞争的加剧&#xff0c;“游戏人”面临着很多全新的挑战。因此&#xff0c;2023亚马逊云科技游戏开发者大会不仅带来了最新的游戏行业举措&a…

基于多智能体深度强化学习的体系任务分配方法

源自&#xff1a;指挥与控制学报 作者&#xff1a;林萌龙, 陈涛, 任棒棒, 张萌萌, 陈洪辉 摘 要 1 背景 1.1 集中式决策VS分布式决策 图1集中式决策示意图 1.2 多智能体强化学习 2 问题描述 2.1 场景描述 图2分布式决策场景下的体系任务分配 2.2 状态空间、动作…

PyTorch-DataLoader

DataLoader&#xff1a;从Dataset中取数据&#xff0c;怎么取&#xff0c;每次取多少可以由DataLoader中的参数进行设定&#xff0c;并将数据加载到神经网络中。 dataloader.py import torchvision from torch.utils.data import DataLoader from torch.utils.tensorboard im…

Python框架比较:Django、Flask和Pyramid三者的优缺点和应用场景

第一章&#xff1a;引言 在当今快节奏的软件开发行业中&#xff0c;选择合适的开发框架对于开发人员来说至关重要。Python作为一种流行的编程语言&#xff0c;拥有众多强大的框架&#xff0c;其中包括Django、Flask和Pyramid。本文将比较这三个Python框架的优缺点和应用场景&a…

企业级低代码开发,迈向企业数字化时代

当下&#xff0c;随着科技的快速发展&#xff0c;软件开发的成本不断降低&#xff0c;越来越多的人可以参与到软件开发的过程中。但是在这个过程中&#xff0c;我们也发现了一个问题&#xff0c;就是软件开发的成本越来越高。传统的开发模式需要投入大量人力物力&#xff0c;而…

旅游信息推荐系统

文章目录 旅游信息推荐系统一、系统演示二、项目介绍三、系统运行界面图四、系统部分功能截图五、部分代码展示六、底部获取源码 旅游信息推荐系统 一、系统演示 旅游信息推荐系统 二、项目介绍 数据库版本&#xff1a; mysql8.0 数据库可视化工具&#xff1a; navicat 服务器…

新技术越来越多,作为程序员,我们应该怎么规划职业生涯? | 社区征文

随着科技的不断进步&#xff0c;新技术不断涌现&#xff0c;对程序员的要求也在不断提高。作为一名程序员&#xff0c;要想在这个竞争激烈的行业中立足&#xff0c;就需要制定一份明确的职业规划&#xff0c;不断学习和掌握新技术&#xff0c;提升自己的职业能力和竞争力。 确定…

自古以来,反射也是兵家必争之地

成文耗时1小时&#xff0c;阅读5min&#xff0c;有用指数5颗星。 这几天收到一个战术性需求&#xff0c;将一大坨字段序列化为特定格式的字符串。 大概是下表&#xff1a; 序号字段名描述是否必填0logVersion日志版本是1productName产品是2serviceName服务是.........25extend3…