深入理解 Netty FastThreadLocal
作者:vivo 互联网服务器团队- Jiang Zhu
本文以线上诡异问题为切入点,通过对比JDK ThreadLocal和Netty FastThreadLocal实现逻辑以及优缺点,并深入解读源码,由浅入深理解Netty FastThreadLocal。
一、前言
最近在学习Netty相关的知识,在看到Netty FastThreadLocal章节中,回想起一起线上诡异问题。
问题描述:外销业务获取用户信息判断是否支持https场景下,获取的用户信息有时候竟然是错乱的。
问题分析:使用ThreadLocal保存用户信息时,未能及时进行remove()操作,而Tomcat工作线程是基于线程池的,会出现线程重用情况,所以获取的用户信息可能是之前线程遗留下来的。
问题修复:ThreadLocal使用完之后及时remove()、ThreadLocal使用之前也进行remove()双重保险操作。
接下来,我们继续深入了解下JDK ThreadLocal和Netty FastThreadLocal吧。
二、JDK ThreadLocal介绍
ThreadLocal是JDK提供的一个方便对象在本线程内不同方法中传递、获取的类。用它定义的变量,仅在本线程中可见,不受其他线程的影响,与其他线程相互隔离。
那具体是如何实现的呢?如图1所示,每个线程都会有个ThreadLocalMap实例变量,其采用懒加载的方式进行创建,当线程第一次访问此变量时才会去创建。
ThreadLocalMap使用线性探测法存储ThreadLocal对象及其维护的数据,具体操作逻辑如下:
假设有一个新的ThreadLocal对象,通过hash计算它应存储的位置下标为x。
此时发现下标x对应位置已经存储了其他的ThreadLocal对象,则它会往后寻找,步长为1,下标变更为x+1。
接下来发现下标x+1对应位置也已经存储了其他的ThreadLocal对象,同理则它会继续往后寻找,下标变更为x+2。
直到寻找到下标为x+3时发现是空闲的,然后将该ThreadLocal对象及其维护的数据构建一个entry对象存储在x+3位置。
在ThreadLocalMap中数据很多的情况下,很容易出现hash冲突,解决冲突需要不断的向下遍历,该操作的时间复杂度为O(n),效率较低。
图1
从下面的代码中可以看出:
Entry 的 key 是弱引用,value 是强引用。在 JVM 垃圾回收时,只要发现弱引用的对象,不管内存是否充足,都会被回收。
但是当 ThreadLocal 不再使用被 GC 回收后,ThreadLocalMap 中可能出现 Entry 的 key 为 NULL,那么 Entry 的 value 一直会强引用数据而得不到释放,只能等待线程销毁,从而造成内存泄漏。
static class ThreadLocalMap {// 弱引用,在资源紧张的时候可以回收部分不再引用的ThreadLocal变量static class Entry extends WeakReference<ThreadLocal<?>> {// 当前ThreadLocal对象所维护的数据Object value;Entry(ThreadLocal<?> k, Object v) {super(k);value = v;}}// 省略其他代码}
综上所述,既然JDK提供的ThreadLocal可能存在效率较低和内存泄漏的问题,为啥不做相应的优化和改造呢?
1.从ThreadLocal类注释看,它是JDK1.2版本引入的,早期可能不太关注程序的性能。
2.大部分多线程场景下,线程中的ThreadLocal变量较少,因此出现hash冲突的概率相对较小,及时偶尔出现了hash冲突,对程序的性能影响也相对较小。
3.对于内存泄漏问题,ThreadLocal本身已经做了一定的保护措施。作为使用者,在线程中某个ThreadLocal对象不再使用或出现异常时,立即调用 remove() 方法删除 Entry 对象,养成良好的编码习惯。
三、Netty FastThreadLocal介绍
FastThreadLocal是Netty中对JDK提供的ThreadLocal优化改造版本,从名称上来看,它应该比ThreadLocal更快了,以应对Netty处理并发量大、数据吞吐量大的场景。
那具体是如何实现的呢?如图2所示,每个线程都会有个InternalThreadLocalMap实例变量。
每个FastThreadLocal实例创建时,都会采用AtomicInteger保证顺序递增生成一个不重复的下标index,它是该FastThreadLocal对象维护的数据应该存储的位置。
读写数据的时候通过FastThreadLocal的下标 index 直接定位到该FastThreadLocal的位置,时间复杂度为 O(1),效率较高。
如果该下标index递增到特别大,InternalThreadLocalMap维护的数组也会特别大,所以FastThreadLocal是通过空间换时间来提升读写性能的。
图2
四、Netty FastThreadLocal源码分析
4.1 构造方法
public class FastThreadLocal<V> {// FastThreadLocal中的index是记录了该它维护的数据应该存储的位置// InternalThreadLocalMap数组中的下标, 它是在构造函数中确定的private final int index;public InternalThreadLocal() {index = InternalThreadLocalMap.nextVariableIndex();}// 省略其他代码}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {// 自增索引, ⽤于计算下次存储到Object数组中的位置private static final AtomicInteger nextIndex = new AtomicInteger();private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;public static int nextVariableIndex() {int index = nextIndex.getAndIncrement();if (index >= ARRAY_LIST_CAPACITY_MAX_SIZE || index < 0) {nextIndex.set(ARRAY_LIST_CAPACITY_MAX_SIZE);throw new IllegalStateException("too many thread-local indexed variables");}return index;}// 省略其他代码}
上面这两段代码在Netty FastThreadLocal介绍中已经讲解过,这边就不再重复介绍了。
4.2 get 方法
public class FastThreadLocal<V> {// FastThreadLocal中的index是记录了该它维护的数据应该存储的位置private final int index;public final V get() {// 获取当前线程的InternalThreadLocalMapInternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();// 根据当前线程的index从InternalThreadLocalMap中获取其绑定的数据Object v = threadLocalMap.indexedVariable(index);// 如果获取当前线程绑定的数据不为缺省值UNSET,则直接返回;否则进行初始化if (v != InternalThreadLocalMap.UNSET) {return (V) v;}return initialize(threadLocalMap);}// 省略其他代码}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;// 未赋值的Object变量(缺省值),当⼀个与线程绑定的值被删除之后,会被设置为UNSETpublic static final Object UNSET = new Object();// 存储绑定到当前线程的数据的数组private Object[] indexedVariables;// slowThreadLocalMap为JDK ThreadLocal存储InternalThreadLocalMapprivate static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =new ThreadLocal<InternalThreadLocalMap>();// 从绑定到当前线程的数据的数组中取出index位置的元素public Object indexedVariable(int index) {Object[] lookup = indexedVariables;return index < lookup.length? lookup[index] : UNSET;}public static InternalThreadLocalMap get() {Thread thread = Thread.currentThread();// 判断当前线程是否是FastThreadLocalThread类型if (thread instanceof FastThreadLocalThread) {return fastGet((FastThreadLocalThread) thread);} else {return slowGet();}}private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {// 直接获取当前线程的InternalThreadLocalMapInternalThreadLocalMap threadLocalMap = thread.threadLocalMap();// 如果当前线程的InternalThreadLocalMap还未创建,则创建并赋值if (threadLocalMap == null) {thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());}return threadLocalMap;}private static InternalThreadLocalMap slowGet() {// 使用JDK ThreadLocal获取InternalThreadLocalMapInternalThreadLocalMap ret = slowThreadLocalMap.get();if (ret == null) {ret = new InternalThreadLocalMap();slowThreadLocalMap.set(ret);}return ret;}private InternalThreadLocalMap() {indexedVariables = newIndexedVariableTable();}// 初始化一个32位长度的Object数组,并将其元素全部设置为缺省值UNSETprivate static Object[] newIndexedVariableTable() {Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];Arrays.fill(array, UNSET);return array;}// 省略其他代码}
源码中 get() 方法主要分为下面3个步骤处理:
通过InternalThreadLocalMap.get()方法获取当前线程的InternalThreadLocalMap。根据当前线程的index 从InternalThreadLocalMap中获取其绑定的数据。如果不是缺省值UNSET,直接返回;如果是缺省值,则执行initialize方法进行初始化。
下面我们继续分析一下
InternalThreadLocalMap.get()方法的实现逻辑。
首先判断当前线程是否是FastThreadLocalThread类型,如果是FastThreadLocalThread类型则直接使用fastGet方法获取InternalThreadLocalMap,如果不是FastThreadLocalThread类型则使用slowGet方法获取InternalThreadLocalMap兜底处理。兜底处理中的slowGet方法会退化成JDK原生的ThreadLocal获取InternalThreadLocalMap。获取InternalThreadLocalMap时,如果为null,则会直接创建一个InternalThreadLocalMap返回。其创建过过程中初始化一个32位长度的Object数组,并将其元素全部设置为缺省值UNSET。
4.3 set 方法
public class FastThreadLocal<V> {// FastThreadLocal初始化时variablesToRemoveIndex被赋值为0private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();public final void set(V value) {// 判断value值是否是未赋值的Object变量(缺省值)if (value != InternalThreadLocalMap.UNSET) {// 获取当前线程对应的InternalThreadLocalMapInternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();// 将InternalThreadLocalMap中数据替换为新的value// 并将FastThreadLocal对象保存到待清理的Set中setKnownNotUnset(threadLocalMap, value);} else {remove();}}private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {// 将InternalThreadLocalMap中数据替换为新的valueif (threadLocalMap.setIndexedVariable(index, value)) {// 并将当前的FastThreadLocal对象保存到待清理的Set中addToVariablesToRemove(threadLocalMap, this);}}private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {// 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);Set<FastThreadLocal<?>> variablesToRemove;if (v == InternalThreadLocalMap.UNSET || v == null) {// 下标index为0的数据为空,则创建FastThreadLocal对象Set集合variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());// 将InternalThreadLocalMap中下标为0的数据,设置成FastThreadLocal对象Set集合threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);} else {variablesToRemove = (Set<FastThreadLocal<?>>) v;}// 将FastThreadLocal对象保存到待清理的Set中variablesToRemove.add(variable);}// 省略其他代码}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {// 未赋值的Object变量(缺省值),当⼀个与线程绑定的值被删除之后,会被设置为UNSETpublic static final Object UNSET = new Object();// 存储绑定到当前线程的数据的数组private Object[] indexedVariables;// 绑定到当前线程的数据的数组能再次采用x2扩容的最大量private static final int ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD = 1 << 30;private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;// 将InternalThreadLocalMap中数据替换为新的valuepublic boolean setIndexedVariable(int index, Object value) {Object[] lookup = indexedVariables;if (index < lookup.length) {Object oldValue = lookup[index];// 直接将数组 index 位置设置为 value,时间复杂度为 O(1)lookup[index] = value;return oldValue == UNSET;} else { // 绑定到当前线程的数据的数组需要扩容,则扩容数组并数组设置新valueexpandIndexedVariableTableAndSet(index, value);return true;}}private void expandIndexedVariableTableAndSet(int index, Object value) {Object[] oldArray = indexedVariables;final int oldCapacity = oldArray.length;int newCapacity;// 判断可进行x2方式进行扩容if (index < ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD) {newCapacity = index;// 位操作,提升扩容效率newCapacity |= newCapacity >>> 1;newCapacity |= newCapacity >>> 2;newCapacity |= newCapacity >>> 4;newCapacity |= newCapacity >>> 8;newCapacity |= newCapacity >>> 16;newCapacity ++;} else { // 不支持x2方式扩容,则设置绑定到当前线程的数据的数组容量为最大值newCapacity = ARRAY_LIST_CAPACITY_MAX_SIZE;}// 按扩容后的大小创建新数组,并将老数组数据copy到新数组Object[] newArray = Arrays.copyOf(oldArray, newCapacity);// 新数组扩容后的部分赋UNSET缺省值Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);// 新数组的index位置替换成新的valuenewArray[index] = value;// 绑定到当前线程的数据的数组用新数组替换indexedVariables = newArray;}// 省略其他代码}
源码中 set() 方法主要分为下面3个步骤处理:
判断value是否是缺省值UNSET,如果value不等于缺省值,则会通过InternalThreadLocalMap.get()方法获取当前线程的InternalThreadLocalMap,具体实现3.2小节中get()方法已做讲解。通过FastThreadLocal中的setKnownNotUnset()方法将InternalThreadLocalMap中数据替换为新的value,并将当前的FastThreadLocal对象保存到待清理的Set中。如果等于缺省值UNSET或null(else的逻辑),会调用remove()方法,remove()具体见后面的代码分析。
接下来我们看下
InternalThreadLocalMap.setIndexedVariable方法的实现逻辑。
判断index是否超出存储绑定到当前线程的数据的数组indexedVariables的长度,如果没有超出,则获取index位置的数据,并将该数组index位置数据设置新value。
如果超出了,绑定到当前线程的数据的数组需要扩容,则扩容该数组并将它index位置的数据设置新value。
扩容数组以index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省值UNSET,最终把新数组赋值给 indexedVariables。
下面我们再继续看下
FastThreadLocal.addToVariablesToRemove方法的实现逻辑。
1.取下标index为0的数据(用于存储待清理的FastThreadLocal对象Set集合中),如果该数据是缺省值UNSET或null,则会创建FastThreadLocal对象Set集合,并将该Set集合填充到下标index为0的数组位置。
2.如果该数据不是缺省值UNSET,说明Set集合已金被填充,直接强转获取该Set集合。
3.最后将FastThreadLocal对象保存到待清理的Set集合中。
4.4 remove、removeAll方法
public class FastThreadLocal<V> {// FastThreadLocal初始化时variablesToRemoveIndex被赋值为0private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();public final void remove() {// 获取当前线程的InternalThreadLocalMap// 删除当前的FastThreadLocal对象及其维护的数据remove(InternalThreadLocalMap.getIfSet());}public final void remove(InternalThreadLocalMap threadLocalMap) {if (threadLocalMap == null) {return;}// 根据当前线程的index,并将该数组下标index位置对应的值设置为缺省值UNSETObject v = threadLocalMap.removeIndexedVariable(index);// 存储待清理的FastThreadLocal对象Set集合中删除当前FastThreadLocal对象removeFromVariablesToRemove(threadLocalMap, this);if (v != InternalThreadLocalMap.UNSET) {try {// 空方法,用户可以继承实现onRemoval((V) v);} catch (Exception e) {PlatformDependent.throwException(e);}}}public static void removeAll() {InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();if (threadLocalMap == null) {return;}try {// 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);if (v != null && v != InternalThreadLocalMap.UNSET) {("unchecked")Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;// 遍历所有的FastThreadLocal对象并删除它们以及它们维护的数据FastThreadLocal<?>[] variablesToRemoveArray =variablesToRemove.toArray(new FastThreadLocal[0]);for (FastThreadLocal<?> tlv: variablesToRemoveArray) {tlv.remove(threadLocalMap);}}} finally {// 删除InternalThreadLocalMap中threadLocalMap和slowThreadLocalMap数据InternalThreadLocalMap.remove();}}private static void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {// 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);if (v == InternalThreadLocalMap.UNSET || v == null) {return;}("unchecked")// 存储待清理的FastThreadLocal对象Set集合中删除该FastThreadLocal对象Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;variablesToRemove.remove(variable);}// 省略其他代码}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {// 根据当前线程获取InternalThreadLocalMappublic static InternalThreadLocalMap getIfSet() {Thread thread = Thread.currentThread();if (thread instanceof FastThreadLocalThread) {return ((FastThreadLocalThread) thread).threadLocalMap();}return slowThreadLocalMap.get();}// 数组下标index位置对应的值设置为缺省值UNSETpublic Object removeIndexedVariable(int index) {Object[] lookup = indexedVariables;if (index < lookup.length) {Object v = lookup[index];lookup[index] = UNSET;return v;} else {return UNSET;}}// 删除threadLocalMap和slowThreadLocalMap数据public static void remove() {Thread thread = Thread.currentThread();if (thread instanceof FastThreadLocalThread) {((FastThreadLocalThread) thread).setThreadLocalMap(null);} else {slowThreadLocalMap.remove();}}// 省略其他代码}
源码中 remove() 方法主要分为下面2个步骤处理:
通过InternalThreadLocalMap.getIfSet()获取当前线程的InternalThreadLocalMap。具体和3.2小节get()方法里面获取当前线程的InternalThreadLocalMap相似,这里就不再重复介绍了。删除当前的FastThreadLocal对象及其维护的数据。
源码中 removeAll() 方法主要分为下面3个步骤处理:
通过InternalThreadLocalMap.getIfSet()获取当前线程的InternalThreadLocalMap。取下标index为0的数据(用于存储待清理的FastThreadLocal对象Set集合),然后遍历所有的FastThreadLocal对象并删除它们以及它们维护的数据。最后会将InternalThreadLocalMap本身从线程中移除。
五、总结
那么使用ThreadLocal时最佳实践又如何呢?
每次使用完ThreadLocal实例,在线程运行结束之前的finally代码块中主动调用它的remove()方法,清除Entry中的数据,避免操作不当导致的内存泄漏。
使⽤Netty的FastThreadLocal一定比JDK原生的ThreadLocal更快吗?
不⼀定。当线程是FastThreadLocalThread,则添加、获取FastThreadLocal所维护数据的时间复杂度是 O(1),⽽使⽤ThreadLocal可能存在哈希冲突,相对来说使⽤FastThreadLocal更⾼效。但如果是普通线程则可能更慢。
使⽤FastThreadLocal有哪些优点?
正如文章开头介绍JDK原生ThreadLocal存在的缺点,FastThreadLocal全部优化了,它更⾼效、而且如果使⽤的是FastThreadLocal,它会在任务执⾏完成后主动调⽤removeAll⽅法清除数据,避免潜在的内存泄露。
END
猜你喜欢
本文分享自微信公众号 - vivo互联网技术(vivoVMIC)。
 如有侵权,请联系 support@oschina.cn 删除。
 本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。
关注公众号
					低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 
							
								
								    上一篇
								    
								
								【直播预告】头哥对谈厉启鹏:开源技术的应用和商业价值分析
OSCHINA 直播——【开源漫谈】第四期来咯!10 月 22 日周日晚 8:00,头哥对谈厉启鹏,聊一聊开源技术的应用和商业价值分析。 王晔倞(头哥),「头哥侃码」主理人,专注分享技术、创业与产品创新等主题内容。 厉启鹏(寈峰),现为 vanus.ai CEO,曾就职于阿里云,Apache RocketMQ PMC ,长期专注于 AI 基础设施软件及中间件。 本次直播,将回答四个问题: 1、数字化产品如何做商业化? 2、做项目有前途吗? 3、技术怎么能变现? 4、如何经营一个优秀的开源项目? 直播主题:头哥对谈寈峰:开源技术的应用和商业价值分析 直播时间:10 月 22 日 20:00 直播平台:“OSC 开源社区” 视频号、“头哥侃码”视频号 主办方:开源中国、头哥侃码 微信扫码预约直播,欢迎加入 OSC 直播交流群,一起唠嗑~ 直播福利 互动抽奖:在直播评论区提问,被直播嘉宾回复的用户可获 OSC T 恤 1 件,名额不限。 福袋抽奖:直播中将有多轮抽奖,参与就有机会获得 OSC T 恤、笔记本、马克杯 、前沿技术书籍等。 我们直播间见吧~ 特别感谢本次直播合作社区: Vanus...
 - 
							
								
								    下一篇
								    
								
								MySQL字段的字符类型该如何选择?千万数据下varchar和char性能竟然相差30%?
MySQL字段的字符类型该如何选择?千万数据下varchar和char性能竟然相差30%? 前言 上篇文章MySQL字段的时间类型该如何选择?千万数据下性能提升10%~30%🚀我们讨论过时间类型的选择 本篇文章来讨论MySQL中字符类型的选择并来深入实践char与varchar类型的最佳使用场景 字符类型 我们最经常使用的字符串类型应该是char与varchar,它们作为本篇文章的主角,对于它们的描述我们放在后文详细介绍 文本字符串 当需要存储长文本时,可以使用文本类型 先来看看存储文本字符串的类型,从小到大依次为TINYTEXT、TEXT、MEDIUMTEXT、LONGTEXT 它们分别用于存储不同大小的文本,读取文本时(由于文本可能较大),因此是从磁盘中读取的 文本类型的查询会慢,但是可以存放的内容多 类型 范围(单位字符) TINYTEXT 0到2^8-1(255B) TEXT 0到2^16-1(64KB) MEDIUMTEXT 0到2^24-1(16MB) LONGTEXT 0到2^32-1 (4GB) 字节串 当存储二进制数据流时,可以选择二进制类型 它们从小到大依次是:T...
 
相关文章
文章评论
共有0条评论来说两句吧...

			

				
				
				
				
				
				
				
微信收款码
支付宝收款码