没有被其他对象引用,会被回收,但是ThreadLocal对应的value却不会回收,容易造成内存泄漏,这也间接导致了内存溢出以及数据假丢失;
那么问题来了,有没有更高效的ThreadLocal有;
今天我们就来分析一波FastThreadLocalThread
一、FastThreadLocalThread源码分析
Netty为了在某些场景下提高性能,改进了jdk ThreadLocal,Netty实现的FastThreadLocal 优化了Java 原生 ThreadLocal 的访问速度,存储速度。避免了检测弱引用带来的 value 回收难问题,和数组位置冲突带来的线性查找问题,解决这些问题并不是没有代价;
Netty实现的 FastThreadLocal 底层也是通过数组存储 value 对象,与Java原生ThreadLocal使用自身作为Entry的key不同,FastThreadLocal通过保存数组的全局唯一下标,实现了对value的快速访问。同时FastThreadLocal 也实现了清理对象的方法;
1、FastThreadLocalThread
在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本;
- io.netty.util.concurrent.DefaultThreadFactory
- @Override
- public Thread newThread(Runnable r) {
- Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
- // 一般daemon为false,意思是不设置为守护线程
- if (t.isDaemon() != daemon) {
- t.setDaemon(daemon);
- }
- // 优先级 默认为5
- if (t.getPriority() != priority) {
- t.setPriority(priority);
- }
- return t;
- }
- protected Thread newThread(Runnable r, String name) {
- return new FastThreadLocalThread(threadGroup, r, name);
- }
FastThreadLocalThread 继承自Thread类,有如下成员变量:
- io.netty.util.concurrent.FastThreadLocalThread
- // 任务执行完,是否清除FastThreadLocal的标记
- private final boolean cleanupFastThreadLocals;
- // 类似于Thread类中ThreadLocalMap,为了实现FastThreadLocal
- private InternalThreadLocalMap threadLocalMap;
2、 InternalThreadLocalMap
FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 对象实例。在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:
- private InternalThreadLocalMap() {
- //为了简便,InternalThreadLocalMap父类
- //UnpaddedInternalThreadLocalMap不展开介绍
- super(newIndexedVariableTable());
- }
- //默认的数组大小为32,且使用UNSET对象填充数组
- //如果下标处数据为UNSET,则表示没有数据
- private static Object[] newIndexedVariableTable() {
- Object[] array = new Object[32];
- Arrays.fill(array, UNSET);
- return array;
- }
为了避免写时候影响同一cpu缓冲行的其他数据并发访问,其使用了缓存行填充技术 (cpu 缓冲行填充),在类定义中声明了如下long字段进行填充;
- //InternalThreadLocalMap
- // Cache line padding (must be public)
- // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
- public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
FTL使用的数组下标是InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:
- static final AtomicInteger nextIndex = new AtomicInteger();
- public static int nextVariableIndex() {
- //Netty中所有FTL数组下标都是通过递增这个静态变量实现的
- //采用静态变量生成所有FTL元素在数组中的下标会造成一个问题,
- //会造成InternalThreadLocalMap中数组不必要的自动扩容
- int index = nextIndex.getAndIncrement();
- if (index < 0) {
- nextIndex.decrementAndGet();
- throw new IllegalStateException("too many thread-local indexed variables");
- }
- return index;
- }
InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex() 使用静态域 nextIndex 递增维护所有FTL的下标,会造成后面实例化的 FTL 下标过大,如果FTL下标大于其对应 FastThreadLocalThread.threadLocalMap 数组的长度,会进行数组的自动扩容,如下:
- private void expandIndexedVariableTableAndSet(int index, Object value) {
- Object[] oldArray = indexedVariables;
- final int oldCapacity = oldArray.length;
- //下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数,
- //这段代码在早期的 jdk HashMap 中见过
- int newCapacity = index;
- newCapacity |= newCapacity >>> 1;
- newCapacity |= newCapacity >>> 2;
- newCapacity |= newCapacity >>> 4;
- newCapacity |= newCapacity >>> 8;
- newCapacity |= newCapacity >>> 16;
- newCapacity ++;
- Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
- Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
- newArray[index] = value;
- indexedVariables = newArray;
- }
3、FastThreadLocal
构造函数:
有两个重要的下标域,FTL不仅在FastThreadLocalThread.threadLocalMap中保存了用户实际使用的value(在数组中的下标为index),还在数组中保存为了实现清理记录的相关数据,也即下标variablesToRemoveIndex,一般情况 variablesToRemoveIndex = 0;因为variablesToRemoveIndex 是静态变量,所以全局唯一;
- //如果在该FTL中放入了数据,也就实际调用了其set或get函数,会在
- //该FastThreadLocalThread.threadLocalMap数组的
- // variablesToRemoveIndex下标处放置一个IdentityHashMap,
- //并将该FTL放入IdentityHashMap中,在后续清理时会取出
- //variablesToRemoveIndex下标处的IdentityHashMap进行清理
- private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
- //在threadLocalMap数组中存放实际数据的下标
- private final int index;
- public FastThreadLocal() {
- index = InternalThreadLocalMap.nextVariableIndex();
- }
用户可扩展的函数:
- //初始化 value 函数
- protected V initialValue() throws Exception {
- return null;
- }
- //让使用者在该FTL被移除时可以有机会做些操作。
- protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
FastThreadLocalThread
cleanupFastThreadLocals 字段在 4.1 的最新版本中已经没有在用到了
-
- // This will be set to true if we have a chance to wrap the Runnable.
- //这个字段则用于标识该线程在结束时是否会主动清理FTL
- private final boolean cleanupFastThreadLocals;
- //次对象将在 第一次 FastThreadLocal.get 和 FastThreadLocal.set 时候创建
- private InternalThreadLocalMap threadLocalMap;
- public FastThreadLocalThread(Runnable target) {
- super(FastThreadLocalRunnable.wrap(target));
- cleanupFastThreadLocals = true;
- }
4、 set 方法
- public final void set(V value) {
- //判断设置的 value 值是否是缺省值
- if (value != InternalThreadLocalMap.UNSET) {
- //获取当前线程的 InternalThreadLocalMap , 如果当前线程为FastThreadLocalThread,那么直接通过threadLocalMap引用获取
- //否则通过 jdk 原生的 threadLocal 获取
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
- //FastThreadLocal 对应的 index 下标的 value 替换成新的 value
- setKnownNotUnset(threadLocalMap, value);
- } else {
- //如果放置的对象为UNSET,则表示清理,会对该FTL进行清理,类似毒丸对象
- remove();
- }
- }
这里扩容方会调用 InternalThreadLocalMap.expandIndexedVariableTableAndSet
- private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
- //在数组下标index处放置实际对象,如果index大于数组length,会进行数组扩容.
- if (threadLocalMap.setIndexedVariable(index, value)) {
- //放置成功之后,将该FTL加入到 variablesToRemoveIndex 下标的
- //IdentityHashMap,等待后续清理
- addToVariablesToRemove(threadLocalMap, this);
- }
- }
-
- @SuppressWarnings("unchecked")
- private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal> variable) {
- //获取 variablesToRemoveIndex下标处的 IdentityHashMap
- Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
- Set
> variablesToRemove; - //如果是第一次获取,则 variablesToRemoveIndex下标处的值为 UNSET
- if (v == InternalThreadLocalMap.UNSET || v == null) {
- //新建一个新的 IdentityHashMap 并
- variablesToRemove = Collections.newSetFromMap(new IdentityHashMap
, Boolean>()); - //放入到下标variablesToRemoveIndex处
- threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
- } else {
- variablesToRemove = (Set
>) v; - }
- //将该FTL放入该IdentityHashMap中
- variablesToRemove.add(variable);
- }
下面看InternalThreadLocalMap.get()实现:
- public static InternalThreadLocalMap get() {
- Thread thread = Thread.currentThread();
- //首先看当前 thread 是否为FastThreadLocalThread实例
- //如果是的话,可以快速通过引用,获取到其 threadLocalMap
- if (thread instanceof FastThreadLocalThread) {
- return fastGet((FastThreadLocalThread) thread);
- } else {
- //如果不是,则 jdk 原生慢速获取到其 threadLocalMap
- return slowGet();
- }
- }
5、 get 方法
get方法极为简单,实现如下:
- ===========================FastThreadLocal==========================
- public final V get() {
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
- Object v = threadLocalMap.indexedVariable(index);
- if (v != InternalThreadLocalMap.UNSET) {
- return (V) v;
- }
- return initialize(threadLocalMap);
- }
首先获取当前线程的map,然后根据 FastThreadLocal的index 获取value,然后返回,如果是空对象,则通过 initialize 返回,initialize 方法会将返回值设置到 map 的槽位中,并放进 Set 中;
- initialize
- ============================FastThreadLocal==========================
- private V initialize(InternalThreadLocalMap threadLocalMap) {
- V v = null;
- try {
- //1、获取初始值
- v = initialValue();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- // 2、设置value到InternalThreadLocalMap中
- threadLocalMap.setIndexedVariables(index, v);
- // 3、添加当前的FastThreadLocal到InternalThreadLocalMap的Set
>中 - addToVariablesToRemove(threadLocalMap, this);
- return v;
- }
- //初始化参数:由子类复写
- protected V initialValue() throws Exception {
- return null;
- }
- 获取 ThreadLocalMap
- 直接通过索引取出对象
- 如果为空那么调用初始化方法初始化
6、ftl的资源回收机制
netty中ftl的两种回收机制回收机制:
自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理;
手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除;
- FastThreadLocalRunnable
- final class FastThreadLocalRunnable implements Runnable {
- private final Runnable runnable;
- @Override
- public void run() {
- try {
- runnable.run();
- } finally {
- FastThreadLocal.removeAll();
- }
- }
- static Runnable wrap(Runnable runnable) {
- return runnable instanceof FastThreadLocalRunnable
- ? runnable : new FastThreadLocalRunnable(runnable);
- }
- }
如果将线程执行的任务包装成 FastThreadLocalRunnable,那么在任务执行完后自动删除ftl的资源。
- ===============================FastThreadLocal===========================
- public static void removeAll() {
- // 获取到map
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
- if (threadLocalMap == null) {
- return;
- }
- try {
- // 获取到Set
集合 - Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
- if (v != null && v != InternalThreadLocalMap.UNSET) {
- @SuppressWarnings("unchecked")
- Set
> variablesToRemove = ( Set>) v; - // 将Set转换为数组
- FastThreadLocal>[] variablesToRemoveArray =
- variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
- // 遍历数组,删除每一个FastThreadLocal对应的value
- for (FastThreadLocal> tlv: variablesToRemoveArray) {
- tlv.remove(threadLocalMap);
- }
- }
- } finally {
- // 删除当前线程的InternalThreadLocalMap
- InternalThreadLocalMap.remove();
- }
- }
- public static void remove() {
- Thread thread = Thread.currentThread();
- if (thread instanceof FastThreadLocalThread) {
- // 将FastThreadLocalThread 内部的map置位null
- ((FastThreadLocalThread) thread).setThreadLocalMap(null);
- } else {
- // 将 ThreadLocal内部ThreadLocalMap 中的value置位null
- slowThreadLocalMap.remove();
- }
- }
remove方法:
- ===============================FastThreadLocal==========================
- private void remove() {
- remove(InternalThreadLocalMap.getIfSet());
- }
- private void remove(InternalThreadLocalMap threadLocalMap) {
- if (threadLocalMap == null) {
- return;
- }
- // 从 InternalThreadLocalMap 中删除当前的FastThreadLocal对应的value并设UNSET
- Object v = threadLocalMap.removeIndexedVariable(index);
- // 从 InternalThreadLocalMap 中的Set
>中删除当前的FastThreadLocal对象 - removeFromVariablesToRemove(threadLocalMap, this);
- // 如果删除的是有效值,则进行onRemove方法的回调
- if (v != InternalThreadLocalMap.UNSET) {
- try {
- // 回调子类复写的onRemoved方法,默认为空实现
- onRemoved((V) v);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
总结
只有不断的学习,不断的找到自己的缺点才可以进步;
一起加油;