Netty源码分析-- FastThreadLocal分析(十)

2019-08-04

      上节讲过了ThreadLocal的源码,这一节我们来看下FastThreadLocal。这个我觉得要比ThreadLocal要简单,因为缺少了对于Entry的清理和整理工作,所以ThreadLocal的效率更高。

      跟ThreadLocal一样,我们也先给一个结构图:

      

      大家看这个图跟ThreadLocal有哪些区别,首先是用一个Object数组来替代了Entry数组,不再是key键值对的形式。 另外Object[0]存储一个Set<FastThreadLocal<?>>集合。

      OK,看完这个,源码就很好理解了,我们还是先看下 InternalThreadLocalMap 构造函数

1     private InternalThreadLocalMap() {
2         super(newIndexedVariableTable());
3     }
4 
5     private static Object[] newIndexedVariableTable() {
6         Object[] array = new Object[32]; //初始化 32 长度的Object数组
7         Arrays.fill(array, UNSET);  // 将每个元素初始化成UNSET 这里的UNSET 可以理解为占位符, 因为null会被认为成有效值
8         return array;
9     }

10    UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
11 this.indexedVariables = indexedVariables; // 将创建的array赋给 Object[] indexedVariables;
12 }

     看完这个,我们来看下数组的索引值是什么设置的,打开  FastThreadLocal, 看下全局变量。

1 private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

    上面这个了解类的加载机制的应该很清楚 variablesToRemoveIndex  初始化的时机。想要了解类的加载过程的 请移步 类的加载机制(一)

 1     static final AtomicInteger nextIndex = new AtomicInteger(); // 原子类
 2 
 3     public static int nextVariableIndex() {
 4         int index = nextIndex.getAndIncrement(); // 先获取值,在增加,所以index = 0 
 5         if (index < 0) {
 6             nextIndex.decrementAndGet();
 7             throw new IllegalStateException("too many thread-local indexed variables");
 8         }
 9         return index;
10     }

       由此得出,variablesToRemoveIndex  = 0 固定值。再看下 FastThreadLocal 的构造方法

1     public FastThreadLocal() {
2         index = InternalThreadLocalMap.nextVariableIndex();
3     }

      大家看到这个,是不是就想到了,也就是说每个 FastThreadLocal 都有一个唯一的 index 值 , 那么跟ThreadLocal相比的话,ThreadLocal要 先获取一个hash值,然后再根据Entry数组的长度进行运算得到一个索引值,所以说这样也是Netty的这个FastThreadLocal效率更高的原因之一。

     为了更好地讲下面的内容,我们再看一个 FastThreadLocalThread 

     继承了Thread,增加了成员变量 InternalThreadLocalMap threadLocalMap.  不再是放在Thread中的成员变量了,看到这个想到了什么? 那么也就是说获取某个线程存储Object数组结构的Map,是从FastThreadLocalThread中获取。

     好了介绍完上面,看set方法

 1     public final void set(V value) {
 2         if (value != InternalThreadLocalMap.UNSET) { // 如果不是UNSET
 3             InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 获取map
 4             if (setKnownNotUnset(threadLocalMap, value)) { // 新增或者更新值
 5                 registerCleaner(threadLocalMap); // 如果返回true, 代表新增, 设置清理位
 6             }
 7         } else { 
 8             remove(); // 如果入参是一个UNSET,那么执行删除逻辑
 9         }
10     }

     看下 InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();

1     public static InternalThreadLocalMap get() {
2         Thread thread = Thread.currentThread();
3         if (thread instanceof FastThreadLocalThread) { // 如果当前线程是一个FastThreadLocalThread
4             return fastGet((FastThreadLocalThread) thread); // 执行fastGet 这个名字是很有意思。。。。
5         } else {
6             return slowGet(); // 要不然就slowGet() Netty叫ThreadLocal  Slow。。。
7         }
8     }
1     private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { // 如果当前的map没有设置,则新创建一个
2         InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();  // 这里就是获取了 FastThreadLocalThread 中的成员变量 threadLocalMap
3         if (threadLocalMap == null) {
4             thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
5         }
6         return threadLocalMap;
7     }

     看下新增或者更新的逻辑

1     private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
2         if (threadLocalMap.setIndexedVariable(index, value)) { // index 是当前FastThreadLocal的索引 ,如果是 新增,那么这个方法返回true ,如果是修改则返回 false
3             addToVariablesToRemove(threadLocalMap, this); // 新增的话,需要将当前的这个FastThreadLocal添加到Set<FashThreadLocal>集合中
4             return true; // 新增返回trye
5         }
6         return false; // 更新返回false
7     }
 1     public boolean setIndexedVariable(int index, Object value) {
 2         Object[] lookup = indexedVariables; // 当前的Object[]对象
 3         if (index < lookup.length) { // 检查当前的索引是否超过了Object数组的长度
 4             Object oldValue = lookup[index]; // 获取当前的值
 5             lookup[index] = value; // 将新值设置进去
 6             return oldValue == UNSET; // 判断之前的值是不是占位符,如果是则代表新增,不是代表更新
 7         } else {
 8             expandIndexedVariableTableAndSet(index, value); // 如果超过了,则需要进行扩容
 9             return true;
10         }
11     }
 1     private void expandIndexedVariableTableAndSet(int index, Object value) { // 扩容逻辑
 2         Object[] oldArray = indexedVariables; // 当前的Object数组
 3         final int oldCapacity = oldArray.length; // 当前的长度
 4         int newCapacity = index;  // index是这个FastThreadLocal对应的索引值
 5         newCapacity |= newCapacity >>>  1;  
 6         newCapacity |= newCapacity >>>  2;
 7         newCapacity |= newCapacity >>>  4;
 8         newCapacity |= newCapacity >>>  8;
 9         newCapacity |= newCapacity >>> 16;
10         newCapacity ++;       // 这段其实也很有意思,其实是为了计算新的数组的容量,会变成下一个档位的2的次方大小,比如 1->2 2->4 3->4 4->8 5->8 6->8 7->8 8->16 18->32, 如果不理解,
// 可以自己写一个main方法试一下,后面我们在讲Netty内存模型的时候大家也会看到这么一段。
11 12 Object[] newArray = Arrays.copyOf(oldArray, newCapacity); // 将老的数据往新的数组进行拷贝 13 Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); // 将新的多出来的部分,设置占位符 14 newArray[index] = value; // 将index对应的值设置完 15 indexedVariables = newArray; // 将新的数组提升成Map中的indexedVariables 16 }

      接下来看下set中的这句 addToVariablesToRemove(threadLocalMap, this);  根据上面的结构,新增了value,那么就需要修改set集合

 1     private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
 2         Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); // 根据上面的分析 variablesToRemoveIndex = 0 获取第0个位置的Object 然后看是不是null或者占位符
 3         Set<FastThreadLocal<?>> variablesToRemove;
 4         if (v == InternalThreadLocalMap.UNSET || v == null) { // 如果是占位符
 5             variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
 6             threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); // 创建一个set集合放到第0的位置上
 7         } else {
 8             variablesToRemove = (Set<FastThreadLocal<?>>) v; // 已经有了,则直接取过来
 9         }
10 
11         variablesToRemove.add(variable); // 新增新的FastThreadLocal到set集合
12     }

      接下来看get方法

 1     public final V get() {
 2         InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 获取map
 3         Object v = threadLocalMap.indexedVariable(index); // 根据当前的Thread的索引,获取Object
 4         if (v != InternalThreadLocalMap.UNSET) { // 不是占位符直接返回
 5             return (V) v;
 6         }
 7 
 8         V value = initialize(threadLocalMap); // 如果是,则调用初始化方法
 9         registerCleaner(threadLocalMap);
10         return value;
11     }
 1     private V initialize(InternalThreadLocalMap threadLocalMap) {
 2         V v = null;
 3         try {
 4             v = initialValue(); // 空方法,供子类调用
 5         } catch (Exception e) {
 6             PlatformDependent.throwException(e);
 7         }
 8 
 9         threadLocalMap.setIndexedVariable(index, v); // 设置初始化的值
10         addToVariablesToRemove(threadLocalMap, this);  // 添加set集合
11         return v;
12     }

       接下来看remove方法

 1     public final void remove(InternalThreadLocalMap threadLocalMap) {
 2         if (threadLocalMap == null) {
 3             return;
 4         }
 5 
 6         Object v = threadLocalMap.removeIndexedVariable(index); // 根据当前的Thread的索引,获取Object
 7         removeFromVariablesToRemove(threadLocalMap, this); // 移除set集合中的元素,这里就不展开说了,上面懂了就很简单了
 8 
 9         if (v != InternalThreadLocalMap.UNSET) {
10             try {
11                 onRemoval((V) v); // 空方法,供子类调用
12             } catch (Exception e) {
13                 PlatformDependent.throwException(e);
14             }
15         }
16     }

 好了,这就算是讲完了,当然FastThreadLocal 不会整理数据和清除过期数据,是怎么防止内存泄露的呢?

 看下 FastThreadLocalRunnable

 1 public class FastThreadLocalRunnable implements Runnable {
 2     private Runnable runnable;
 3 
 4     public FastThreadLocalRunnable(Runnable runnable) {
 5         this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
 6     }
 7 
 8     public static Runnable wrap(Runnable runnable) {
 9         return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
10     }
11 
12     @Override
13     public void run() {
14         try {
15             // 运行任务
16             this.runnable.run();
17         } finally {
18             /**
19              * 线程池中的线程由于会被复用,所以线程池中的每一条线程在执行task结束后,要清理掉其InternalThreadLocalMap和其内的FastThreadLocal信息,
20              * 否则,当这条线程在下一次被复用的时候,其ThreadLocalMap信息还存储着上一次被使用时的信息;
21              * 另外,假设这条线程不再被使用,但是这个线程有可能不会被销毁(与线程池的类型和配置相关),那么其上的ThreadLocal将发生资源泄露。
22              */
23             FastThreadLocal.removeAll();
24         }
25     }
26 }

        使用该类将一个普通的Runnable对象进行wrap装饰,之后在调用FastThreadLocalRunnable.run()的时候,实际上会调用真实对象(即普通的Runnable对象)的run(),执行完成之后,会进行对当前线程的全量回收操作(删除当前线程上的InternalThreadLocalMap中的每一个value以及threadLocalMap本身),这样就可以有效的在线程池中复用当前线程而不必关心ftl的错乱和泄漏问题。