首页 文章 精选 留言 我的

精选列表

搜索[java],共10000篇文章
优秀的个人博客,低调大师

Java多线程——FutureTask源码解析

一个很常见的多线程案例是,我们安排主线程作为分配任务和汇总的一方,然后将计算工作切分为多个子任务,安排多个线程去计算,最后所有的计算结果由主线程进行汇总。比如,归并排序,字符频率的统计等等。 我们知道Runnable是不返回计算结果的,如果想利用多线程的话,只能存储到一个实例的内部变量里面进行交互,但存在一个问题,如何判断是否已经计算完成了。用Thread.join是一个方案,但是我们只能依次等待一个线程结束后处理一个线程,如果线程1恰好特别慢,则后续已经完成的线程不能被及时处理。我们希望能够获知线程的执行状态,发现哪个线程处理完就先统计它的计算结果。可以考虑使用Callable和FutureTask来完成。 先说Callable它是一个功能接口,它只有一个方法V call(),计算一个结果,失败的话抛出一个异常。和Runnable不同的是,它不能直接交给Thread来执行,所以需要一个别的类来封装它与Runnable,这个类就是FutureTask。FutureTask是一个类,继承了RunnableFuture,而RunnableFuture是一个多继承接口,它继承了Runnable和 Future,所以FutureTask是可以作为实现了Runnable的实例交给Thread执行。 从内部变量来看,含有一个下层Callable实例,一个状态表示,一个返回结果,以及对运行线程的记录 /** * 任务的运行状态,最初是NEW。运行状态只在set, setException和cancel方法中过度到最终状态。 * 在完成过程中,状态可能发生转移到COMPLETING(在设置结果时)或者INTERRUPTING(仅当中断运行来满足cancel(true)时)。 * 从这些中间状态转移到最终状态使用成本更低有序/懒惰写入,因为值是唯一的且之后不能再修改。 * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** 下层的callable,运行后为null */ private Callable<V> callable; /** get()操作返回的结果或者抛出的异常*/ private Object outcome; // 不是volatile,由reads/writes状态来保护 /** 运行callable的线程,在run()通过CAS修改*/ private volatile Thread runner; /** 等待线程的Treiber堆栈 */ private volatile WaitNode waiters; 构造函数 构造函数总共有两种重载,第一种直接给出Callable实例 public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // 确保callable的可见性 } 第二种,给出Runnable实例和期望的返回结果,如果Runnable实例运行成功则返回的是result public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result);//创建一个callable this.state = NEW; // 确保callable的可见性 } run run方法是Thread运行FutureTask内任务的接口。首先,根据最上方的进入条件可以看出,只有成功竞争到修改runnerOffset成功的线程才能执行后续方法,而搜索整个类文件,可以发现只有在run结束后才会重置为null,所以同一时间只能有一个线程执行run方法成功。然后要检查state和callable的状态,因为run会将它们修改。调用callable.call方法获取返回结果,成功的话设置结果,失败的话设置返回结果为异常。无论是否执行成功,runner会被重置为null。 public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return;//状态必须是NEW且修改执行线程成功,否则直接返回,避免被多个线程同时执行 try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call();//调用callable.call方法获取返回结果 ran = true;//执行成功 } catch (Throwable ex) { result = null; ran = false;//执行失败 setException(ex);//设置返回异常并唤醒等待线程解除阻塞 } if (ran) set(result);//设置结果 } } finally { //runner直到状态设置完成不能为null来避免并发调用run() runner = null; //在将runner设置为null后需要重新读取state避免漏掉中断 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } set方法先修改state为COMPLETING,然后将outcome设置为刚才计算出来的结果,最后设置state为NORMAL,并调用finishCompletion。这个方法移除并通知所有等待的线程解除阻塞,调用done(),并将callable设为null。done方法默认是什么也不做。 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态 finishCompletion();//唤醒等待线程,将callable设为null } } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);//如果线程被park阻塞,解除阻塞 } WaitNode next = q.next; if (next == null) break; q.next = null; // 取消连接帮助gc q = next; } break; } } done();//未重写时什么也不做 callable = null; // to reduce footprint减少覆盖区 } setException跟set逻辑上基本一样,除了设置返回结果是Throwable对象 protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//修改状态 outcome = t;//结果为Throwable UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态 finishCompletion(); } } get get方法如果FutureTask已经执行完成则返回结果,否则会等待并阻止线程调度。等待时长可以输入,单位为纳秒,不输入为不限时等待,限时等待超时仍然没有完成会抛出异常。 public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L);//等待完成 return report(s);//检查时返回结果还是抛出异常 } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//限时等待完成 throw new TimeoutException(); return report(s); } awaitDone这个方法会阻塞当前线程(get方法的调用线程)的调度并增加等待结点,阻塞时长根据输入的时间长度决定。如果执行Callable任务的线程完成了运行或者被中断,则会解除栈中等待结点对应线程的阻塞。然后会根据执行结果决定是否要抛出异常还是返回执行完成的结果。 private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false;//是否完成入栈 for (;;) { if (Thread.interrupted()) {//检查线程是否已经被中断 removeWaiter(q);//移除被中断的等待结点 throw new InterruptedException(); } int s = state; if (s > COMPLETING) {//已经完成 if (q != null) q.thread = null;//移除等待 return s; } else if (s == COMPLETING) // cannot time out yet还没有超时 Thread.yield();//已经在赋值,所以只需让出时间片等待赋值完成 //下方都是还在没有完成call方法的情况 else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);//q加入到栈的最前方 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) {//超时了 removeWaiter(q);//移除超时的等待结点 return state; } LockSupport.parkNanos(this, nanos);//阻塞当前线程nanos纳秒 } else LockSupport.park(this);//阻塞当前线程 } } cancel cancel输入的参数表示如果当前还在运行中是否要中断执行线程,如果输入参数是false则只有线程已经执行完成或者抛出异常或者已经被中断时可以把状态修改为CANCELLED,如果是true则会中断线程并将状态改为INTERRUPTED。所以,cancel在该任务已经结束或者已被取消,或者竞争修改状态失败时都会失败。如果中断成功,会释放所有被阻塞的等待线程。 public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false;//已经完成或者被取消或者竞争取消失败返回false try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } 状态检查 非常简单的两个方法。因为CANCELLED是state中最大的,所以只有cancel方法成功才会是这种状态。而isDone只要不是还在运行或者还没有被执行就是返回true。 public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; } 简单的使用示例 public class CallableTest implements Callable<Integer>{ private int start; public CallableTest(int start) { this.start = start; } @Override public Integer call() throws Exception { Thread.sleep(500); return start + 1; } public static void main(String args[]) throws InterruptedException, ExecutionException{ long start = System.currentTimeMillis(); FutureTask<Integer> task1 = new FutureTask<>(new CallableTest(2)); new Thread(task1).start(); FutureTask<Integer> task2 = new FutureTask<>(new CallableTest(4)); new Thread(task2).start(); System.out.println(task1.get() + task2.get());//8 long end = System.currentTimeMillis(); System.out.println(end - start);//506 } }

优秀的个人博客,低调大师

Java多线程——ThreadLocal源码解析

ThreadLocal 这个类提供线程局部变量。这些变量在每一个线程中的正常副本都不相同,每一个线程访问一个副本(通过其 get或 set法),副本有自己独立的变量初始化复制。ThreadLocal实例通常是类中私有的静态字段希望关联状态和线程(例如,一个用户ID或交易ID)。 例如,下面的类为每个线程生成唯一的标识符。一个线程的ID在第一次调用ThreadId.get()时分配并且在后续调用中保持不变。 public class ThreadId { // Atomic integer containing the next thread ID to be assigned private static final AtomicInteger nextId = new AtomicInteger(0); // Thread local variable containing each thread's ID private static final ThreadLocal<Integer> threadId = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return nextId.getAndIncrement(); } }; // Returns the current thread's unique ID, assigning it if necessary public static int get() { return threadId.get(); } } 每一个线程在活着的时候就持有一个暗示对它线程本地变量拷贝的引用并且ThreadLocal实例可以访问;线程死去后它的线程本地实例拷贝受垃圾回收管辖(除非存在其他对这些副本引用)。 ——以上是对ThreadLocal注释的翻译 ThreadLocal是作为key值存储在ThreadLocalMap里面的,而ThreadLocalMap是一个典型的hash表,它的实例存储在了Thread.threadLocals,并且由于并非所有线程实例都需要用到threadLocals,它是懒汉式的初始化,在第一次插入时才会初始化创建ThreadLocalMap实例。输入的value是一个泛型对象,它可以是Integer、Double等基本装箱类型,也可以是自定义的bean类,可以认为Thread实例t拥有一个ThreadLocalMap实例threadLocals,它是一个hash表,它以ThreadLocal的实例作为key值,以具体内容泛型变量value作为value值,每个线程在存活期间有自己的ThreadLocalMap实例,所以各线程间互不干涉。 public class ThreadLocalTest { ThreadLocal<Long> longLocal = new ThreadLocal<Long>(); ThreadLocal<String> stringLocal = new ThreadLocal<String>(); public void set() { longLocal.set(Thread.currentThread().getId()); stringLocal.set(Thread.currentThread().getName()); } public long getLong() { return longLocal.get(); } public String getString() { return stringLocal.get(); } public static void main(String[] args) throws InterruptedException { final ThreadLocalTest test = new ThreadLocalTest(); test.set(); System.out.println(test.getLong()); System.out.println(test.getString()); Thread thread1 = new Thread() { public void run() { test.set(); System.out.println(test.getLong()); System.out.println(test.getString()); }; }; thread1.start(); thread1.join(); System.out.println(test.getLong()); System.out.println(test.getString()); /*1 main 11 Thread-0 1 main*/ } } 构造函数 因为ThreadLocal的作用是提供实例作为key值,它需要提供的内部变量就是hashcode,而hashcode是由别的方法生成,所以构造函数除了初始化实例外什么也不做 public ThreadLocal() { } set set方法设置当前线程的线程本地变量副本为指定的值。大部分子类不需要重写这个方法,只依靠initialValue来设置线程本地变量值。先获取当前线程中的ThreadLocalMap实例,然后检查是否为null。还没有创建的话需要先新创建一个,将这个ThreadLoca和value值放入map中。 public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } /** * 获取关联ThreadLocal的map。在InheritableThreadLocal中重写。 */ ThreadLocalMap getMap(Thread t) { return t.threadLocals; } /** * 创建一个关联ThreadLocal的map。在InheritableThreadLocal中重写。 */ void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } get get返回这个线程本地变量在当前线程中的副本值。如果这个变量在当前线程中没有值,第一次通过调用initialValue初始化这个值并返回。 public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue();//map不存在或map中没有这个对象时调用setInitialValue } setInitialValue方法是set方法的变体,用于创建初始化值。如果用户已经重写了set方法,可以用作set方法的替代。 private T setInitialValue() { T value = initialValue();//未重写直接返回null Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; } initialValue返回此线程局部变量的“初始值”。该方法将在一个线程第一次用get方法访问变量时被调用,除非线程之前调用了 set方法,在这种情况下, initialValue方法不会被这个线程调用。通常情况下,这种方法是每个线程调用一次,但它可能在后续调用get随后调用remove而再次调用。这种实现简单的返回null;如果程序员渴望线程局部变量有一个初始值而不是null,ThreadLocal必须有子类,并重写这个方法。通常情况下,将使用一个匿名内部类。简单来说,如果未set这个ThreadLocal的value值而直接调用get会导致在map中添加一个初始化的value值,如果没有重写ThreadLocal中的这个方法,那么初始值是null。 protected T initialValue() { return null; } remove remove方法移除当前线程的线程本地变量值。如果这个线程本地变量随后被当前线程用get方法读取,它的值会被initialValue重新初始化,除非这之间当前线程调用了set方法。这可能会导致当前线程中多次调用initialValue方法。 public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null)//map未初始化时什么都不做 m.remove(this); } ThreadLocalMap ThreadLocalMap是ThreadLocal的内部类,是一个典型的hash表,只适合存储线程本地变量。没有操作暴露到ThreadLocal类外部。这个类是包私有的,允许在Thread中声明字段。为了帮助解决非常大并且长期存活的使用,hash表条目对key使用WeakReference。然而,因为没有使用引用队列,旧的条目只在表用完空间时才保证移除。 Entry ThreadLocalMap的条目也是自己实现的。这个hash表的条目扩展了WeakReference,使用它的只要引用字段作为k(总是ThreadLocal对象)。注意null的key值(比如entry.get() == null)意味着key不再被引用,因此条目可以从表删除。这样的条目作为“旧条目”在下方代码中被引用。ThreadLocal是弱引用,而value是强引用,如果创建ThreadLocal的线程一直持续运行,那么这个Entry对象中的value就有可能一直得不到回收,发生内存泄露。 static class Entry extends WeakReference<ThreadLocal<?>> { /** 关联这个ThreadLocal的值 */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k);//将ThreadLocal作为key用于构造WeakReference value = v; } } 内部变量和构造函数 table是存储条目的数组,初始大小一定是16 /** * 初始容量,必需是2的指数次 */ private static final int INITIAL_CAPACITY = 16; /** * 表,有必要的话需要resize。table.length必须总是2的指数次 */ private Entry[] table; /** * 表中条目的数量 */ private int size = 0; /** * 到达后要进行resize的大小 */ private int threshold; // Default to 0 构造函数前面在createMap(Thread, T)中已经提到过,创建一个新的map初始化包含firstKey, firstValue)。ThreadLocalMap是懒汉式构建,因此我们只有当有至少一个条目要放入时再创建一个。负载因子至少是2/3。 ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); } /** * 设置resize的门槛保证最差有2/3的负载因子 */ private void setThreshold(int len) { threshold = len * 2 / 3; } 还有一个版本是创建一个新的map包含所有来自父map的可继承的ThreadLocal。只会由createInheritedMap调用。因为较少使用就不说了。 getEntry getEntry获取和key关联的条目。这个方法本身只处理快速通道:已有key直接命中,否则会转移到getEntryAfterMiss。这种设计是为了最大化直接命中的效率,部分通过使这个方法容易非线性读取来实现。 private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1);//根据hash值计算出直接命中的位置 Entry e = table[i]; if (e != null && e.get() == key) return e;//命中 else return getEntryAfterMiss(key, i, e);//未命中 } 如果直接目标未命中,需要getEntryAfterMiss来进行线性探测。由于散列的hash值碰撞情况很小,所以比起直接用循环查找的方法效率更高。在线性探测过程中,移动就是最简单的移动到尾部循环至头部,如果发现ThreadLocal为null说明已经被移除,需要删除这个结点的引用使得可以进行回收。 private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i);//不再有引用了,需要删除 else i = nextIndex(i, len);//向后移动一位 e = tab[i]; } return null; } private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } hash 刚才提到了hash和表中对象的关系,那么要研究下ThreadLocalMap的hash值是怎么设计的。首先,要明确的一点是ThreadLocalMap使用的是开放地址法线性探测解决hash碰撞的问题,而hash值是在ThreadLocal中的。我们可以看到ThreadLocal中跟hash值计算有关的部分,用一个AtomicInteger来计算,是static也就是说有一个静态的值每新建一个ThreadLocal实例就会增加,但增加的间隔并不是1,而是0x61c88647,这个值得选取是为了减少碰撞的发生,具体原理和斐波那契散列法以及黄金分割有关。 所以,对于同样的ThreadLocal变量,各线程的ThreadLocalMap中它们都处于同一个数组位置。因此,还是建议每个线程只存一个变量,这样的话所有的线程存放到map中的Key都是相同的ThreadLocal,如果一个线程要保存多个变量,就需要创建多个ThreadLocal,多个ThreadLocal放入Map中时会极大的增加hash冲突的可能。如果必须使用多个变量,0x61c88647可以尽可能减少冲突的发生。因为表的大小永远是2的指数次,所以和len-1进行位与操作等价为直接取模。 /** * ThreadLocal依赖每个线程线性探测hash表关联到每个线程(Thread.threadLocals和inheritableThreadLocals)。 * ThreadLocal对象作为key,通过threadLocalHashCode来查找。 * 这是一个典型的hash值(只在ThreadLocalMaps内有用)消除当同样的线程连续创建ThreadLocal实例常见状况下的冲突,同时不太常见的状况也是良性的。 */ private final int threadLocalHashCode = nextHashCode(); /** * 下一个要给出的hash值,自动更新,从0开始。 */ private static AtomicInteger nextHashCode = new AtomicInteger(); /** * 连续产生的hash值之间的差值-使得连续的线程本地ID变得隐式连续,接近最佳地扩展到大小是2的指数次表的乘法倍数的hash值上。 */ private static final int HASH_INCREMENT = 0x61c88647;//‭0110 0001 1100 1000 1000 0110 0100 0111‬ /** * 返回下一个hash值 */ private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } set set没有像get那样使用快速路径是因为使用set创建一个新的条目至少和替换一个已有的是一样频率,这样快速路径会经常失败。set会从hash对应的位置开始线性查找指定的key值,如果找到entry存在但key为null的位置说明已经被remove,使用replaceStaleEntry替换过时的值。如果找到了这个key值则替换已有值。如果找到了entry为null的位置说明还未被使用过,直接新建一个entry放到这个位置上。 private void set(ThreadLocal<?> key, Object value) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); if (k == key) { e.value = value;//替换已有值 return; } if (k == null) { replaceStaleEntry(key, value, i);//替换过时的值 return; } } tab[i] = new Entry(key, value);//hash值对应的位置没有插入过元素 int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } replaceStaleEntry用一个有指定key的条目替换在set操作中遇到的过时条目。value参数传递的值存储在条目中,无论有这个key的条目是否已经存在。作为一个副作用,这个方法擦除了一趟中所有过时的条目(一趟指两个entry为null位置间的序列)。新的条目放在staleSlot位置上,而从这个位置开始向前和向后倒两个entry为null的位置之间所有key为null的条目都会被清除。 private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table; int len = tab.length; Entry e; //返回检查当前趟中之前的过期条目。我们一次清除整个趟避免因为垃圾回收释放串中的引用而频繁增加的rehash int slotToExpunge = staleSlot; for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))//从staleSlot开始向前直到entry为null的位置为止最靠前的引用为null的条目 if (e.get() == null) slotToExpunge = i; //寻找key或者趟里后面null位置两者中先发生的 for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {//循环从staleSlot向后倒entry为null的位置 ThreadLocal<?> k = e.get(); //如果找到key,我们需要交换它和过期条目来保持hash表顺序。新的过期位置或者在它之前任何其他过期位置, //可以被发送给expungeStaleEntry来移除或者rehash趟中的其他所有条目 if (k == key) {//找到了set操作中要插入的key e.value = value; tab[i] = tab[staleSlot];//将key相同的结点与staleSlot位置的结点交换 tab[staleSlot] = e; // 如果有的话开始擦除先前的过期结点 if (slotToExpunge == staleSlot) slotToExpunge = i; cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);//第一个参数是从slotToExpunge到下一个null的位置,第二个参数是table长度 return; } //向前查找没有找到过期条目,在查找key时第一个发现的过期条目还是趟中的第一个 if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } // 如果没有找到key,将新的条目放在过期的位置 tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); // 如果这一趟中还有其他过期条目,擦除它们 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); } expungeStaleEntry通过rehash任何在staleSlot与下一个null位置之间可能冲突条目来擦除过期条目。这也擦除了在随后的null之前遇到的所有其他过期条目。返回staleSlot之后下一个null的位置(在staleSlot和这个位置之间的都被检查是否要擦除)。 private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // 擦除在staleSlot的条目 tab[staleSlot].value = null; tab[staleSlot] = null; size--; // 直到遇到null之前rehash Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == null) {//key为null说明已经被remove,删除其他数据 e.value = null; tab[i] = null; size--; } else {//key存在,移动到接近hash直接定位的地方 int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; } cleanSomeSlots检查log2(n)个位置,除非找到了一个过期条目,额外检查log2(table.length)-1个位置。插入时调用这个参数是元素个数,replaceStaleEntry调用时这个参数是table的大小。 private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); Entry e = tab[i]; if (e != null && e.get() == null) {//找到过期的条目需要清除 n = len; removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0);//n=n/2 return removed; } set在新增一个条目到没有使用过的位置时,导致size增加,同时会触发rehash。会清空所有过期的条目,并根据size大小判断是否需要扩大数组,如果要扩大数组则数组大小*2。 private void rehash() { expungeStaleEntries(); // Use lower threshold for doubling to avoid hysteresis if (size >= threshold - threshold / 4) resize();//size达到了resize的大小,扩大数组 } /** * 两倍扩大表 */ private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (int j = 0; j < oldLen; ++j) { Entry e = oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } setThreshold(newLen); size = count; table = newTab; } /** * 清除表中稳定所有过期条目 */ private void expungeStaleEntries() { Entry[] tab = table; int len = tab.length; for (int j = 0; j < len; j++) { Entry e = tab[j]; if (e != null && e.get() == null) expungeStaleEntry(j); } } } remove 移除key对应的条目,直接通过线性探测法找到对应的key值,条目调用clear()方法后key会变为null但条目仍然存在,通过expungeStaleEntry(int)将table中这个位置置为null,并且gc可以回收这个条目。当线程终止时,它的ThreadLocal对象引用会变为null,那么在后续使用中table中不再有引用的条目会被垃圾回收,但是如果线程长时间存活但ThreadLocal对象不再被使用,需要显示的调用remove方法,避免内存泄漏。 private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {//线性查找hash值相等的条目 if (e.get() == key) { e.clear();//清除引用 expungeStaleEntry(i);//删掉过期条目 return; } } }

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

用户登录
用户注册