源码分析之ThreadLocal
博客专区 > Mr_Qi 的博客 > 博客详情
源码分析之ThreadLocal
Mr_Qi 发表于2个月前
源码分析之ThreadLocal
  • 发表于 2个月前
  • 阅读 2467
  • 收藏 139
  • 点赞 5
  • 评论 7
腾讯云 技术升级10大核心产品年终让利>>>   

前言

Java项目中通常为了并发数据准确性经常使用Lock或者synchronized来作为并发的手段。 也就是说作为共享资源必然需要通过同步等手段来实现。那么转换一下思路, 我们确实在每个地方都需要用到共享资源么? 如果我们所有的变量都是私有的 那自然不需要同步就是thread-safe的 ThreadLocal就是这样应运而生。人如其名就是线程私有对象。

实现

ThreadLocal

/* <p>Each thread holds an implicit reference to its copy of a thread-local * variable as long as the thread is alive and the <tt>ThreadLocal</tt> * instance is accessible; after a thread goes away, all of its copies of * thread-local instances are subject to garbage collection (unless other * references to these copies exist). * * @author Josh Bloch and Doug Lea * @since 1.2 */ public class ThreadLocal<T> { /** * ThreadLocals rely on per-thread linear-probe hash maps attached * to each thread (Thread.threadLocals and * inheritableThreadLocals). The ThreadLocal objects act as keys, * searched via threadLocalHashCode. This is a custom hash code * (useful only within ThreadLocalMaps) that eliminates collisions * in the common case where consecutively constructed ThreadLocals * are used by the same threads, while remaining well-behaved in * less common cases. */ private final int threadLocalHashCode = nextHashCode(); /** * The next hash code to be given out. Updated atomically. Starts at * zero. */ private static AtomicInteger nextHashCode = new AtomicInteger(); /** * The difference between successively generated hash codes - turns * implicit sequential thread-local IDs into near-optimally spread * multiplicative hash values for power-of-two-sized tables. */ private static final int HASH_INCREMENT = 0x61c88647; /** * Returns the next hash code. */ private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } /** * Returns the current thread's "initial value" for this * thread-local variable. This method will be invoked the first * time a thread accesses the variable with the {@link #get} * method, unless the thread previously invoked the {@link #set} * method, in which case the <tt>initialValue</tt> method will not * be invoked for the thread. Normally, this method is invoked at * most once per thread, but it may be invoked again in case of * subsequent invocations of {@link #remove} followed by {@link #get}. * * <p>This implementation simply returns <tt>null</tt>; if the * programmer desires thread-local variables to have an initial * value other than <tt>null</tt>, <tt>ThreadLocal</tt> must be * subclassed, and this method overridden. Typically, an * anonymous inner class will be used. * * @return the initial value for this thread-local */ protected T initialValue() { return null; } /** * Creates a thread local variable. */ public ThreadLocal() { } /** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */ public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) return (T)e.value; } return setInitialValue(); } /** * Variant of set() to establish initialValue. Used instead * of set() in case user has overridden the set() method. * * @return the initial value */ private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; } /** * Sets the current thread's copy of this thread-local variable * to the specified value. Most subclasses will have no need to * override this method, relying solely on the {@link #initialValue} * method to set the values of thread-locals. * * @param value the value to be stored in the current thread's copy of * this thread-local. */ public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } /** * Removes the current thread's value for this thread-local * variable. If this thread-local variable is subsequently * {@linkplain #get read} by the current thread, its value will be * reinitialized by invoking its {@link #initialValue} method, * unless its value is {@linkplain #set set} by the current thread * in the interim. This may result in multiple invocations of the * <tt>initialValue</tt> method in the current thread. * * @since 1.5 */ public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); } /** * Get the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @return the map */ ThreadLocalMap getMap(Thread t) { return t.threadLocals; } /** * Create the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the map * @param map the map to store. */ void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } } 从上述代码可以看到ThreadLocal是存放在当前线程中 Thread t = Thread.currentThread(); 通过上述代码获取到当前线程。而线程存在一个字段threadLocals这个 这个存放了一个ThreadLocalMap(名字是map但是没有实现map的接口 实际确实也是干的map的事情) 其实可以粗略的认为每个线程存在一个HashMap key是ThreadLocal变量 value为对应泛型的值 那么获取对应数据只要使用get即可(也提供了初始化变量功能) 放入数据直接调用set即可

ThreadLocalMap

/** * ThreadLocalMap is a customized hash map suitable only for * maintaining thread local values. No operations are exported * outside of the ThreadLocal class. The class is package private to * allow declaration of fields in class Thread. To help deal with * very large and long-lived usages, the hash table entries use * WeakReferences for keys. However, since reference queues are not * used, stale entries are guaranteed to be removed only when * the table starts running out of space. */ static class ThreadLocalMap { /** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ static class Entry extends WeakReference<ThreadLocal> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal k, Object v) { super(k); value = v; } } /** * The initial capacity -- MUST be a power of two. */ private static final int INITIAL_CAPACITY = 16; /** * The table, resized as necessary. * table.length MUST always be a power of two. */ private Entry[] table; /** * The number of entries in the table. */ private int size = 0; /** * The next size value at which to resize. */ private int threshold; // Default to 0 /** * Set the resize threshold to maintain at worst a 2/3 load factor. */ private void setThreshold(int len) { threshold = len * 2 / 3; } /** * Increment i modulo len. */ private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } /** * Decrement i modulo len. */ private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); } /** * Construct a new map initially containing (firstKey, firstValue). * ThreadLocalMaps are constructed lazily, so we only create * one when we have at least one entry to put in it. */ ThreadLocalMap(ThreadLocal firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); } /** * Construct a new map including all Inheritable ThreadLocals * from given parent map. Called only by createInheritedMap. * * @param parentMap the map associated with parent thread. */ private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { ThreadLocal key = e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } } } 看一下ThreadLocalMap 有一个很关键的静态类Entry private static class Entry<K,V> extends WeakReference<Object> implements Map.Entry<K,V> { V value; int hash; Entry<K,V> next; /** * Creates new entry. */ Entry(Object key, V value, ReferenceQueue<Object> queue, int hash, Entry<K,V> next) { super(key, queue); this.value = value; this.hash = hash; this.next = next; } @SuppressWarnings("unchecked") public K getKey() { return (K) WeakHashMap.unmaskNull(get()); } public V getValue() { return value; } public V setValue(V newValue) { V oldValue = value; value = newValue; return oldValue; } public boolean equals(Object o) { if (!(o instanceof Map.Entry)) return false; Map.Entry<?,?> e = (Map.Entry<?,?>)o; K k1 = getKey(); Object k2 = e.getKey(); if (k1 == k2 || (k1 != null && k1.equals(k2))) { V v1 = getValue(); Object v2 = e.getValue(); if (v1 == v2 || (v1 != null && v1.equals(v2))) return true; } return false; } public int hashCode() { K k = getKey(); V v = getValue(); return ((k==null ? 0 : k.hashCode()) ^ (v==null ? 0 : v.hashCode())); } public String toString() { return getKey() + "=" + getValue(); } } WeakReference可能大部分开发并没有注意过(Android开发者可能经常会使用)
 Java从1.2版本开始引入了4种引用,这4种引用的级别由高到低依次为:    强引用  >  软引用  >  弱引用  >  虚引用 ⑴强引用(StrongReference)
    强引用是使用最普遍的引用。如果一个对象具有强引用,那垃圾回收器绝不会回收它。当内存空间不足,Java虚拟机宁愿抛出OutOfMemoryError错误,使程序异常终止,也不会靠随意回收具有强引用的对象来解决内存不足的问题。
⑵软引用(SoftReference)     如果一个对象只具有软引用,则内存空间足够,垃圾回收器就不会回收它;如果内存空间不足了,就会回收这些对象的内存。只要垃圾回收器没有回收它,该对象就可以被程序使用。软引用可用来实现内存敏感的高速缓存。     软引用可以和一个引用队列(ReferenceQueue)联合使用,如果软引用所引用的对象被垃圾回收器回收,Java虚拟机就会把这个软引用加入到与之关联的引用队列中。 ⑶弱引用(WeakReference)     弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程,因此不一定会很快发现那些只具有弱引用的对象。     弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java虚拟机就会把这个弱引用加入到与之关联的引用队列中。 ⑷虚引用(PhantomReference)     “虚引用”顾名思义,就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收。     虚引用主要用来跟踪对象被垃圾回收器回收的活动。虚引用与软引用和弱引用的一个区别在于:虚引用必须和引用队列 (ReferenceQueue)联合使用。当垃圾回收器准备回收一个对象时,如果发现它还有虚引用,就会在回收对象的内存之前,把这个虚引用加入到与之 关联的引用队列中。
ThreadLocalMap中使用Entry来作为迭代元素,其中WeakReference修饰了ThreadLocal,那么当某个ThreadLocal变量没有强引用那么当GC扫描到就会回收该Entry,那么就无内存泄漏之虞了。 当然我们线程中一定会保留ThreadLocal对象的强引用,因此这边也不会回收。因此一个合理的使用每次线程池归还一定要调用remove(方便释放同时也不会对下一次borrow线程造成影响) /** * Set the value associated with key. * * @param key the thread local object * @param value the value to be set */ private void set(ThreadLocal key, Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal k = e.get(); if (k == key) { e.value = value; return; } if (k == null) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } /** * Re-pack and/or re-size the table. First scan the entire * table removing stale entries. If this doesn't sufficiently * shrink the size of the table, double the table size. */ private void rehash() { expungeStaleEntries(); // Use lower threshold for doubling to avoid hysteresis if (size >= threshold - threshold / 4) resize(); } /** * Double the capacity of the table. */ private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal k = e.get(); if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } setThreshold(newLen); size = count; table = newTab; } 这边数据的策略比较常见,当发生hash冲突直接将对应数据放入下一个不为空节点即可。当然涉及到HashMap(类似)免不了的就是扩容和rehash等操作 当然数据在删除的时候可能要将后面不为空的节点重新 计算需要挪动到指定的位置 /** * Remove the entry for key. */ private void remove(ThreadLocal key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } } } /** * Expunge a stale entry by rehashing any possibly colliding entries * lying between staleSlot and the next null slot. This also expunges * any other stale entries encountered before the trailing null. See * Knuth, Section 6.4 * * @param staleSlot index of slot known to have null key * @return the index of the next null slot after staleSlot * (all between staleSlot and this slot will have been checked * for expunging). */ private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null; tab[staleSlot] = null; size--; // Rehash until we encounter null Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal k = e.get(); if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; } 当然为了线程互不干扰我们可以需要调用ThreadLocal.remove方法移除数据 /** * Removes the current thread's value for this thread-local * variable. If this thread-local variable is subsequently * {@linkplain #get read} by the current thread, its value will be * reinitialized by invoking its {@link #initialValue} method, * unless its value is {@linkplain #set set} by the current thread * in the interim. This may result in multiple invocations of the * <tt>initialValue</tt> method in the current thread. * * @since 1.5 */ public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) m.remove(this); }

InheritableThreadLocal

对于ThreadLocal各位可能会有些问题,比如将对应的数据封装到了线程中,但是后面调用比如异步任务之后就会发现对应的ThreadLocal变量取不出数据了。 这个场景很正常  比如在线程池起个线程发送消息等等 那么我们可以使用InheritableThreadLocal来实现,顾名思义,从这个线程中起的子线程将会可以继承对应变量。 /** * This class extends <tt>ThreadLocal</tt> to provide inheritance of values * from parent thread to child thread: when a child thread is created, the * child receives initial values for all inheritable thread-local variables * for which the parent has values. Normally the child's values will be * identical to the parent's; however, the child's value can be made an * arbitrary function of the parent's by overriding the <tt>childValue</tt> * method in this class. * * <p>Inheritable thread-local variables are used in preference to * ordinary thread-local variables when the per-thread-attribute being * maintained in the variable (e.g., User ID, Transaction ID) must be * automatically transmitted to any child threads that are created. * * @author Josh Bloch and Doug Lea * @see ThreadLocal * @since 1.2 */ public class InheritableThreadLocal<T> extends ThreadLocal<T> { /** * Computes the child's initial value for this inheritable thread-local * variable as a function of the parent's value at the time the child * thread is created. This method is called from within the parent * thread before the child is started. * <p> * This method merely returns its input argument, and should be overridden * if a different behavior is desired. * * @param parentValue the parent thread's value * @return the child thread's initial value */ protected T childValue(T parentValue) { return parentValue; } /** * Get the map associated with a ThreadLocal. * * @param t the current thread */ ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * * @param t the current thread * @param firstValue value for the initial entry of the table. * @param map the map to store. */ void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } } women看一下Thread的初始化 /** * Initializes a Thread. * * @param g the Thread group * @param target the object whose run() method gets called * @param name the name of the new Thread * @param stackSize the desired stack size for the new thread, or * zero to indicate that this parameter is to be ignored. */ private void init(ThreadGroup g, Runnable target, String name, long stackSize) { if (name == null) { throw new NullPointerException("name cannot be null"); } Thread parent = currentThread(); SecurityManager security = System.getSecurityManager(); if (g == null) { /* Determine if it's an applet or not */ /* If there is a security manager, ask the security manager what to do. */ if (security != null) { g = security.getThreadGroup(); } /* If the security doesn't have a strong opinion of the matter use the parent thread group. */ if (g == null) { g = parent.getThreadGroup(); } } /* checkAccess regardless of whether or not threadgroup is explicitly passed in. */ g.checkAccess(); /* * Do we have the required permissions? */ if (security != null) { if (isCCLOverridden(getClass())) { security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); } } g.addUnstarted(); this.group = g; this.daemon = parent.isDaemon(); this.priority = parent.getPriority(); this.name = name.toCharArray(); if (security == null || isCCLOverridden(parent.getClass())) this.contextClassLoader = parent.getContextClassLoader(); else this.contextClassLoader = parent.contextClassLoader; this.inheritedAccessControlContext = AccessController.getContext(); this.target = target; setPriority(priority); if (parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); /* Stash the specified stack size in case the VM cares */ this.stackSize = stackSize; /* Set thread ID */ tid = nextThreadID(); } 很明显可以看到当parent的inheritableThreadLocals变量不为空将会传递到子线程中。

应用

通常会定义ThreadLocal变量为static final 比如我们会在定义 private static final ThreadLocal<List<String>> SQL_LIST_TL = new ThreadLocal<>(); private static final ThreadLocal<String> IP_TL = new ThreadLocal<>(); private static final ThreadLocal<String> USER_TL = new ThreadLocal<>(); private static final ThreadLocal<String> ORG_TL = new ThreadLocal<>(); private static final ThreadLocal<String> MAIN_ORG_TL = new ThreadLocal<>(); private static final ThreadLocal<Set<String>> IDS_OWN_ORG_TL = new ThreadLocal<>(); private static final ThreadLocal<Set<String>> PERMISSION_IDS_OWN_ORG_TL = new ThreadLocal<>(); private static final ThreadLocal<String> DATASOURCE_ROUTING_TL = new ThreadLocal<>(); private static final ThreadLocal<String> ACTION_TL = new ThreadLocal<>(); private static final ThreadLocal<ActionType> TYPE_TL = new ThreadLocal<>(); private static final ThreadLocal<String> CUSTOMER_FOR_SUPPLIER_TL = new ThreadLocal<>(); private static final ThreadLocal<Channel> CHANNEL_TL = new ThreadLocal<>(); private static final ThreadLocal<Boolean> SECURITY_ENABLE_TL = new ThreadLocal<>(); private static final ThreadLocal<Boolean> COUNT_ENABLE_TL = new ThreadLocal<>(); private static final List<ThreadLocal> THREAD_LOCAL_LIST = new ArrayList<>(); private static final ThreadLocal<LoginDomain> LOGIN_DOMAIN_TL=new ThreadLocal<>(); static { THREAD_LOCAL_LIST.add(SQL_LIST_TL); THREAD_LOCAL_LIST.add(IP_TL); THREAD_LOCAL_LIST.add(USER_TL); THREAD_LOCAL_LIST.add(ORG_TL); THREAD_LOCAL_LIST.add(IDS_OWN_ORG_TL); THREAD_LOCAL_LIST.add(PERMISSION_IDS_OWN_ORG_TL); THREAD_LOCAL_LIST.add(DATASOURCE_ROUTING_TL); THREAD_LOCAL_LIST.add(ACTION_TL); THREAD_LOCAL_LIST.add(TYPE_TL); THREAD_LOCAL_LIST.add(CUSTOMER_FOR_SUPPLIER_TL); THREAD_LOCAL_LIST.add(CHANNEL_TL); THREAD_LOCAL_LIST.add(SECURITY_ENABLE_TL); THREAD_LOCAL_LIST.add(COUNT_ENABLE_TL); THREAD_LOCAL_LIST.add(MAIN_ORG_TL); THREAD_LOCAL_LIST.add(LOGIN_DOMAIN_TL); } public static void clearThreadLocal() { for (ThreadLocal tl : THREAD_LOCAL_LIST) { if (tl != null) { tl.remove(); } } } clearThreadLocal会由统一释放切面的地方进行调用。这样完成threadLocal的清理
共有 人打赏支持
粉丝 191
博文 199
码字总数 207127
评论 (7)
星光似霰
:+1:
cukes
我就看到一个women
Mr_Qi

引用来自“cukes”的评论

我就看到一个women
眼神不错哈哈,笔误了
gm100861

引用来自“Mr_Qi”的评论

引用来自“cukes”的评论

我就看到一个women
眼神不错哈哈,笔误了
women不是女人的意思吗?
Mr_Qi

引用来自“gm100861”的评论

引用来自“Mr_Qi”的评论

引用来自“cukes”的评论

我就看到一个women
眼神不错哈哈,笔误了
women不是女人的意思吗?
回复@gm100861 : 拼音 我们……women
tanghc
我觉得理解ThreadLocal关键搞清楚两个问题,1:value在ThreadLocal中的存储方式,2:ThreadLocal与Thread之间的关联。
Mr_Qi

引用来自“tanghc”的评论

我觉得理解ThreadLocal关键搞清楚两个问题,1:value在ThreadLocal中的存储方式,2:ThreadLocal与Thread之间的关联。
回复@tanghc : 对的~ThreadLocalMap
×
Mr_Qi
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: