This article walks through the source code of ConcurrentHashMap: its constructors, the value-insertion path, the resize operation, and more. ConcurrentHashMap is a thread-safe hash table. Its primary design goal is to minimize how long update operations occupy the table, so that it stays concurrently readable; secondary goals are to keep space consumption the same as or better than HashMap, and to support efficient initial insertion into an empty table by many threads.

Since Java 8, it guarantees thread safety with CAS operations, the synchronized keyword, well-placed spin retries, and the volatile keyword (for visibility and to forbid instruction reordering). The node structure is also optimized: each bin holds either a linked list or a red-black tree. When a list reaches 8 nodes and the array has at least 64 slots (from here on we call each array slot a "bin"), the list is converted into a red-black tree; during a resize, a tree with 6 or fewer nodes is converted back into a list (the removal path also converts trees back to lists, but it does not strictly follow the threshold of 6). This improves space usage and lookup efficiency.

The default capacity is 16 and the load factor is 0.75f. Unlike HashMap, the load factor cannot effectively be set to another value; this will come up again when we read the constructor, so keep it in mind. Now, on to the main content:
Constructors
Let's start with the constructor; pay close attention to the comments:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    private transient volatile int sizeCtl;

    private static final int MAXIMUM_CAPACITY = 1 << 30;

    /**
     * The constructor that all other constructors eventually delegate to.
     * Note that nothing is actually initialized here; table allocation is deferred.
     *
     * @param initialCapacity  the initial capacity hint
     * @param loadFactor       the load factor; note it is not stored in any field
     *                         and only participates in the size calculation below
     * @param concurrencyLevel the expected number of concurrently updating
     *                         threads, used to adjust the size
     */
    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)
            initialCapacity = concurrencyLevel;
        long size = (long) (1.0 + (long) initialCapacity / loadFactor);
        int cap = (size >= (long) MAXIMUM_CAPACITY)
                ? MAXIMUM_CAPACITY
                : tableSizeFor((int) size);
        this.sizeCtl = cap;
    }

    // Rounds up to the next power of two
    private static final int tableSizeFor(int c) {
        int n = -1 >>> Integer.numberOfLeadingZeros(c - 1);
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
}
The load factor loadFactor is only a local input to the size calculation and is never stored; all later logic involving a load factor, such as computing the resize threshold, uses the default 0.75f. The reason is explained at length in the JavaDoc of the source:
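As a quick illustration of the constructor's arithmetic, here is a re-computation of the formula outside the class (CtorSizingDemo is just an illustrative name; the two expressions mirror the constructor and tableSizeFor shown above):

import java.util.concurrent.ConcurrentHashMap;

public class CtorSizingDemo {
    public static void main(String[] args) {
        int initialCapacity = 16;
        float loadFactor = 0.75f;
        // The constructor's formula: size = 1.0 + initialCapacity / loadFactor
        long size = (long) (1.0 + (long) initialCapacity / loadFactor);
        System.out.println(size);                                  // 22
        // tableSizeFor rounds up to the next power of two
        int n = -1 >>> Integer.numberOfLeadingZeros((int) size - 1);
        System.out.println(n + 1);                                 // 32
        // So new ConcurrentHashMap<>(16, 0.75f, 1) starts with sizeCtl = 32,
        // i.e. a 32-bin table, not the 16 one might expect
        ConcurrentHashMap<String, Integer> map =
                new ConcurrentHashMap<>(initialCapacity, loadFactor, 1);
    }
}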
Ideally, under random hash codes, the nodes in the bins follow a Poisson distribution. The probability of a bin holding k elements is:

0: 0.60653066
1: 0.30326533
2: 0.07581633
3: 0.01263606
4: 0.00157952
5: 0.00015795
6: 0.00001316
7: 0.00000094
8: 0.00000006
more: less than 1 in ten million

Under random hashes, the lock contention probability for two threads accessing distinct elements is roughly 1 / (8 * #elements).
With a load factor of 0.75f, collisions between elements stay rare, so under random hashing the probability of multiple threads contending for the same lock is low.
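The figures above are simply the Poisson probability mass function; as the JDK comment notes, the parameter (the expected number of entries per bin at the default threshold) is about 0.5:

P(k) = \frac{\lambda^k e^{-\lambda}}{k!}, \qquad \lambda \approx 0.5

For example, P(0) = e^{-0.5} \approx 0.60653066 and P(8) \approx 6 \times 10^{-8}, matching the table above.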
ConcurrentHashMap#tableSizeFor always rounds the array size to a power of two. This is done to improve performance and simplify the implementation, for three reasons:
- Bit-operation optimization: when the table size is a power of two, the modulo operation (%) can be replaced by a bit mask, which speeds up every table access. Computing an element's slot, index = hash % table.length, simplifies to index = hash & (table.length - 1), and the & operation is generally faster than % (see the sketch after this list).
- Uniform hash distribution: in binary, 2^n - 1 has all of its low bits set to 1, so ANDing the hash against it lets every low bit of the hash participate in the index, which spreads entries evenly across bins and reduces collisions.
- Simplified resize logic: the new table is simply twice the old size (again a power of two), so redistributing elements is easy: each element either stays at index or moves to index + oldCapacity, and that decision comes down to a single bit operation.
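A minimal sketch of both tricks in plain Java, independent of ConcurrentHashMap internals (PowerOfTwoDemo is an illustrative name):

public class PowerOfTwoDemo {
    public static void main(String[] args) {
        int cap = 16;                              // a power of two: 2^4
        int hash = "key".hashCode() & 0x7fffffff;  // non-negative, as spread() guarantees

        // 1. Masking replaces the modulo:
        System.out.println(hash % cap);            // 15
        System.out.println(hash & (cap - 1));      // 15 — same index, cheaper

        // 2. After doubling, the single bit (hash & cap) decides whether the
        //    entry stays at its index or moves to index + cap:
        int oldIndex = hash & (cap - 1);
        int newIndex = hash & ((cap << 1) - 1);
        System.out.println(newIndex);                                      // 31
        System.out.println((hash & cap) == 0 ? oldIndex : oldIndex + cap); // 31
    }
}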
The put method
put is the core method; focus on the CAS + synchronized scheme it uses when inserting a value. The addCount method that updates the element count has a very clever implementation, covered in detail later, and the resize operation gets its own section: its way of coordinating multiple threads through a resize is well worth learning. Once you have these down, nothing in ConcurrentHashMap is harder. We start from put; again, pay attention to the comments:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    static final int TREEIFY_THRESHOLD = 8;
    static final int HASH_BITS = 0x7fffffff;
    private static final int DEFAULT_CAPACITY = 16;

    transient volatile Node<K,V>[] table;
    private transient volatile int sizeCtl;

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // Neither key nor value may be null
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            // Lazy initialization: allocate the table on the first insert
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // Empty bin: install the new node with a lock-free CAS
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                    break;                   // no lock needed for an empty bin
            }
            // MOVED means a resize is in progress: help transfer first
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // putIfAbsent fast path: the head node already holds the key
            else if (onlyIfAbsent
                     && fh == hash
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)
                return fv;
            else {
                V oldVal = null;
                // Lock the first node of the bin
                synchronized (f) {
                    // Re-check: the head may have changed before we got the lock
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {       // ordinary linked list
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {   // red-black tree bin
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    // Convert the list to a tree when it reaches 8 nodes
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // Update the element count and possibly trigger a resize
        addCount(1L, binCount);
        return null;
    }

    // Spread higher bits down and force the sign bit to 0
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // sizeCtl < 0 means another thread is initializing: spin and yield
            if ((sc = sizeCtl) < 0)
                Thread.yield();
            // CAS sizeCtl to -1 to claim the right to initialize
            else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);  // 0.75 * n: the resize threshold
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
}
putTreeVal inserts an element into a red-black tree; if you want to study red-black trees, see the article 深入理解经典红黑树. We won't repeat that logic here.
In this code we can spot some rather "hidden" assignments, for example the assignment to the variable n in the following:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // ...
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // ...
        }
    }
}
The assignment is completed inside the if condition. This style is certainly more compact, but it belongs here (or in infrastructure code); in everyday business code, I think it hurts readability.
When put inserts the first element into a bin, it uses a CAS operation. To save space, there is no separate lock per bin; instead, the first element of the bin serves as the lock, and on a hash collision synchronized locks it. Note what happens after acquiring that element's lock: the code re-checks that the bin head is still this element (a double check), because another thread may have removed it in the meantime. You will see this pattern, re-validating conditions after entering a synchronized block, throughout the rest of the source; it is a reminder that thread-safe code often needs to check again. While a bin's first element is locked, other threads operating on that bin block; if equals is slow this can hurt performance, but in practice that is rare.
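Here is a minimal sketch of the "CAS into an empty slot, otherwise lock the head and re-validate" idiom in isolation. The names (SlotTable, Head, slots) are hypothetical and the bin update is a stand-in; only the pattern mirrors putVal:

import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical illustration of the per-bin locking pattern, not JDK code
class SlotTable<V> {
    static final class Head<V> { volatile V value; Head(V v) { value = v; } }

    final AtomicReferenceArray<Head<V>> slots = new AtomicReferenceArray<>(16);

    void add(int index, V value) {
        for (;;) {
            Head<V> head = slots.get(index);
            if (head == null) {
                // Empty bin: lock-free CAS install; retry the loop on failure
                if (slots.compareAndSet(index, null, new Head<>(value)))
                    return;
            } else {
                synchronized (head) {
                    // Re-validate after acquiring the lock: the head may have
                    // been removed or replaced while we were blocked
                    if (slots.get(index) == head) {
                        head.value = value;   // stand-in for the real bin update
                        return;
                    }
                    // head changed: fall through and retry the whole loop
                }
            }
        }
    }
}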
The addCount method: updating the element count
Next, the addCount method, whose element counting is quite interesting: as long as count updates never conflict, the total number of elements lives in baseCount; but once a CAS on the count fails, a CounterCell[] counterCells array is created to help with counting, and the total becomes baseCount plus the sum of the values in counterCells. The check for whether the table needs to resize also happens here. This method matters a lot, so read it carefully:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    private static final int RESIZE_STAMP_BITS = 16;
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

    transient volatile Node<K,V>[] table;
    private transient volatile Node<K,V>[] nextTable;

    private transient volatile CounterCell[] counterCells;
    private static final long BASECOUNT;
    private transient volatile long baseCount;
    transient volatile int cellsBusy;
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // ...
        addCount(1L, binCount);
        return null;
    }

    private final void addCount(long x, int check) {
        CounterCell[] cs; long b, s;
        // Fast path: try to CAS baseCount directly; fall back to the cells
        // once counterCells exists or the CAS fails (i.e. under contention)
        if ((cs = counterCells) != null ||
            !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell c; long v; int m;
            boolean uncontended = true;
            if (cs == null || (m = cs.length - 1) < 0 ||
                (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
                // No usable cell, or the per-cell CAS also failed
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        // check >= 0 means this was an insertion: consider resizing
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
                if (sc < 0) {
                    // A resize is already running: try to join it, or stop if
                    // it is finishing or the helper count is saturated
                    if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                        (nt = nextTable) == null || transferIndex <= 0)
                        break;
                    if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // First resizer: CAS sizeCtl to the (negative) stamp + 2
                else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

    // Slow path, closely mirroring Striped64/LongAdder
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();    // force probe initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;              // true if last slot was nonempty
        for (;;) {
            CounterCell[] cs; CounterCell c; int n; long v;
            if ((cs = counterCells) != null && (n = cs.length) > 0) {
                if ((c = cs[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {     // try to attach a new cell
                        CounterCell r = new CounterCell(x);
                        if (cellsBusy == 0 &&
                            U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {             // recheck under the lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;         // slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)     // CAS already known to fail
                    wasUncontended = true;    // continue after rehash
                else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))
                    break;
                else if (counterCells != cs || n >= NCPU)
                    collide = false;          // at max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == cs)   // double the cell array
                            counterCells = Arrays.copyOf(cs, n << 1);
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                 // retry with the expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            // counterCells not initialized yet: create it with length 2
            else if (cellsBusy == 0 && counterCells == cs &&
                     U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {
                    if (counterCells == cs) {
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            // Everything else failed: fall back to baseCount
            else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
                break;
        }
    }

    // Total = baseCount plus the value of every cell
    final long sumCount() {
        CounterCell[] cs = counterCells;
        long sum = baseCount;
        if (cs != null) {
            for (CounterCell c : cs)
                if (c != null)
                    sum += c.value;
        }
        return sum;
    }

    // A stamp whose bit 15 is set; shifted left by RESIZE_STAMP_SHIFT it
    // becomes negative, marking "resize in progress" in sizeCtl
    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
}
In fullAddCount, the increment x is recorded or accumulated in some slot of CounterCell[], with spin retries guaranteeing the update eventually succeeds. Once the count is updated, addCount checks whether a resize is needed: if the element count exceeds sizeCtl, a resize starts, and sizeCtl is set to a negative value so that only one thread can call transfer(tab, null); this guarantees the new table nextTable is initialized exactly once and coordinates the multi-threaded resize. With that clear, the next focus is the transfer method.
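The counting scheme is essentially java.util.concurrent.atomic.LongAdder inlined. Here is a stripped-down sketch of the same idea (StripedCounter is a hypothetical name; it drops the lazy cell creation and resizing that the real code has):

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical, simplified version of the baseCount + CounterCell[] scheme
class StripedCounter {
    private final AtomicLong base = new AtomicLong();
    private final AtomicLongArray cells =
            new AtomicLongArray(Runtime.getRuntime().availableProcessors());

    void add(long x) {
        long b = base.get();
        // Fast path: uncontended updates all land on base
        if (!base.compareAndSet(b, b + x)) {
            // Contention: spread this thread onto one of the cells and spin there
            int i = ThreadLocalRandom.current().nextInt(cells.length());
            long v;
            do {
                v = cells.get(i);
            } while (!cells.compareAndSet(i, v, v + x));
        }
    }

    long sum() {
        // Total = base plus every cell; a moving snapshot, like sumCount()
        long s = base.get();
        for (int i = 0; i < cells.length(); i++)
            s += cells.get(i);
        return s;
    }
}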
The transfer method: resizing
transfer allows multiple threads to resize cooperatively, and the coordination trick is elegant. A field transferIndex records the progress of the current resize (it starts at the old table length and counts down; 0 means all of the work has been handed out, i.e. the resize is finishing or finished). Each thread processes a range of bins of stride size, with a minimum of 16; while transferIndex is still positive, any other thread that calls this method is assigned a range of the resize work. Threads therefore work on disjoint ranges without interfering with each other, which speeds up the resize. The source below carries detailed comments; read code and comments together to follow the process:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    private static final int RESIZE_STAMP_BITS = 16;
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    private static final int MIN_TRANSFER_STRIDE = 16;
    static final int MOVED = -1;
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    static final int UNTREEIFY_THRESHOLD = 6;

    private transient volatile Node<K,V>[] nextTable;
    private transient volatile int transferIndex;
    private transient volatile int sizeCtl;

    /**
     * Resizes the table, moving the elements of the old table tab into nextTab.
     *
     * @param tab     the old table
     * @param nextTab the new table
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        // Split the work into per-thread ranges of at least 16 bins
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE;
        if (nextTab == null) {            // only the first resizer gets here
            try {
                Node<K,V>[] nt = (Node<K,V>[]) new Node<?,?>[n << 1]; // double
                nextTab = nt;
            } catch (Throwable ex) {      // e.g. out of memory
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;            // progress counts down from n to 0
        }
        int nextn = nextTab.length;
        // Placeholder installed in finished bins; its hash is MOVED
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;           // true: claim the next bin/range
        boolean finishing = false;        // true: about to commit nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;      // still inside our claimed range
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;               // nothing left to claim
                    advance = false;
                }
                // CAS transferIndex down by one stride to claim the range
                // [nextBound, nextIndex) exclusively
                else if (U.compareAndSetInt(this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;    // process the range high to low
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {          // commit: publish the new table
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);  // 0.75 * new capacity
                    return;
                }
                // This thread is done: decrement the resizer count in sizeCtl
                if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;           // not the last resizer
                    // Last thread out: recheck the whole table, then commit
                    finishing = advance = true;
                    i = n;
                }
            }
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);  // empty bin: mark done
            else if ((fh = f.hash) == MOVED)
                advance = true;           // already processed
            else {
                synchronized (f) {        // lock the bin head, as in putVal
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn; // low list (stays) / high list (moves)
                        if (fh >= 0) {    // linked list
                            // runBit decides whether a node stays at i (0) or
                            // moves to i + n (1); lastRun starts the longest
                            // reusable tail in which runBit is constant
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            // Copy the nodes in front of lastRun into the lists
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);   // mark the old bin as moved
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {  // red-black tree
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            // Split the tree's node list into low and high lists
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // Untreeify any half that shrank to <= 6 nodes
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                 (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                 (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

    // Node with hash MOVED that redirects reads to the new table
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;

        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null);
            this.nextTable = tab;
        }
    }
}
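Before moving on, it may help to see the coordination trick in isolation. Here is a minimal sketch of CAS-based range claiming over a shared countdown index (RangeClaimDemo and its constants are hypothetical; only the transferIndex idea comes from the source above):

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of how transferIndex hands out work ranges
public class RangeClaimDemo {
    static final int TABLE_LENGTH = 64;
    static final int STRIDE = 16;
    static final AtomicInteger transferIndex = new AtomicInteger(TABLE_LENGTH);

    public static void main(String[] args) throws InterruptedException {
        Runnable worker = () -> {
            for (;;) {
                int nextIndex = transferIndex.get();
                if (nextIndex <= 0)
                    return;                        // all ranges handed out
                int nextBound = Math.max(nextIndex - STRIDE, 0);
                // A single CAS claims bins [nextBound, nextIndex) exclusively
                if (transferIndex.compareAndSet(nextIndex, nextBound)) {
                    System.out.println(Thread.currentThread().getName()
                            + " transfers bins " + nextBound + ".." + (nextIndex - 1));
                }
            }
        };
        Thread t1 = new Thread(worker), t2 = new Thread(worker);
        t1.start(); t2.start();
        t1.join(); t2.join();
    }
}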
Having analyzed the code above, helpTransfer is now easy. Other threads use it to assist with the resize; ultimately it just calls transfer:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length) << RESIZE_STAMP_SHIFT;
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                // Stop helping if the resize is finishing or the helper
                // count is already saturated
                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                    transferIndex <= 0)
                    break;
                // Register as one more resizer, then help transfer
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
}
At this point, all of the genuinely complex logic has been covered. What follows is much simpler once the above is understood, so if anything is still unclear, go back and revisit it first.
The treeifyBin method: converting to a tree
treeifyBin converts a linked list into a red-black tree to improve lookup efficiency. The logic is straightforward; just follow the comments:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    static final int MIN_TREEIFY_CAPACITY = 64;

    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n;
        if (tab != null) {
            // Below 64 bins, grow the table instead of treeifying
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
                tryPresize(n << 1);
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {        // lock the bin head
                    if (tabAt(tab, index) == b) {
                        // Rebuild the list as a doubly linked TreeNode list,
                        // then let TreeBin build the red-black tree
                        TreeNode<K,V> hd = null, tl = null;
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
                            if ((p.prev = tl) == null)
                                hd = p;
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));
                    }
                }
            }
        }
    }

    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                // Table not initialized yet: same logic as initTable
                n = (sc > c) ? sc : c;
                if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            Node<K,V>[] nt = (Node<K,V>[]) new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;                    // already big enough
            else if (tab == table) {
                // Start a resize, exactly as in addCount
                int rs = resizeStamp(n);
                if (U.compareAndSetInt(this, SIZECTL, sc,
                                       (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }
}
The get method
get is very simple, and entirely lock-free:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // The first node matches
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // Negative hash: a TreeBin or ForwardingNode; delegate to find
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // Otherwise walk the linked list
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
}
eh < 0 covers two cases: either the node is a red-black tree node, or a resize is in progress (a ForwardingNode). The former needs no further discussion here; what's worth seeing is how a lookup finds its node while the table is being resized. Here is the find implementation of ForwardingNode:
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;

    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null);
        this.nextTable = tab;
    }

    Node<K,V> find(int h, Object k) {
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
}

static class Node<K,V> implements Map.Entry<K,V> {

    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}
As the source shows, when a lookup encounters a forwarding node, the element has already been moved to the new table, so the search continues there.
The remove method
remove is also simple. When a node is deleted, the count must be updated; addCount was covered above, so we won't repeat it. When removing a node from a red-black tree (the removeTreeNode method), untreeify may be called, and its criterion for converting the tree back into a list differs from the size <= 6 check used in transfer: removeTreeNode decides to convert the tree back into a list based on whether the tree has become "too small": root == null || r.right == null || (rl = r.left) == null || rl.left == null. If you're interested, check the source; it isn't reproduced here. Below is the main removal logic; focus on the comments:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    public V remove(Object key) {
        return replaceNode(key, null, null);
    }

    // Removes the node when value == null, otherwise replaces its value;
    // cv, when non-null, is the expected current value
    final V replaceNode(Object key, V value, Object cv) {
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;                    // nothing to remove
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {   // re-check under the lock
                        if (fh >= 0) {          // linked list
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        if (value != null)
                                            e.val = value;      // replace
                                        else if (pred != null)
                                            pred.next = e.next; // unlink
                                        else
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    // removeTreeNode returns true when the
                                    // tree is too small and should untreeify
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (validated) {
                    if (oldVal != null) {
                        // Decrement the count on an actual removal
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }
}
The computeIfAbsent method
We mentioned earlier that ReservationNode serves as a placeholder. Let's take computeIfAbsent as an example of how that placeholder works; most of its logic already appeared above, so focus on what's different:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {

    public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
        if (key == null || mappingFunction == null)
            throw new NullPointerException();
        int h = spread(key.hashCode());
        V val = null;
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
                // Empty bin: park a ReservationNode as a placeholder so other
                // threads block on it until the mapping function finishes
                Node<K,V> r = new ReservationNode<K,V>();
                synchronized (r) {
                    if (casTabAt(tab, i, null, r)) {
                        binCount = 1;
                        Node<K,V> node = null;
                        try {
                            if ((val = mappingFunction.apply(key)) != null)
                                node = new Node<K,V>(h, key, val);
                        } finally {
                            // Swap the placeholder for the real node, or
                            // restore null if the function returned null
                            setTabAt(tab, i, node);
                        }
                    }
                }
                if (binCount != 0)
                    break;
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // Fast path: the head node already holds the key
            else if (fh == h
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)
                return fv;
            else {
                boolean added = false;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == h &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    val = e.val;   // already present
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    if ((val = mappingFunction.apply(key)) != null) {
                                        if (pred.next != null)
                                            throw new IllegalStateException("Recursive update");
                                        added = true;
                                        pred.next = new Node<K,V>(h, key, val);
                                    }
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            binCount = 2;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(h, key, null)) != null)
                                val = p.val;
                            else if ((val = mappingFunction.apply(key)) != null) {
                                added = true;
                                t.putTreeVal(h, key, val);
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (!added)
                        return val;
                    break;
                }
            }
        }
        if (val != null)
            addCount(1L, binCount);
        return val;
    }
}
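One practical consequence worth knowing: with the source shown above (JDK 9+), if the mapping function itself updates the bin that holds this call's placeholder, the ReservationNode is observed and computeIfAbsent fails fast. A small demo (RecursiveUpdateDemo is an illustrative name; the nested call uses the same key on purpose, so it lands on the same bin):

import java.util.concurrent.ConcurrentHashMap;

public class RecursiveUpdateDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        try {
            // The mapping function writes to the bin that currently holds
            // this call's ReservationNode (same key, hence same bin), so the
            // nested put runs into the placeholder and throws
            map.computeIfAbsent("a", k -> {
                map.put("a", 0);
                return 1;
            });
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());   // Recursive update
        }
    }
}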
That covers most of the source of ConcurrentHashMap. To finish, let's briefly discuss a couple of interesting questions.
Why disallowing null keys and values is useful
Unlike HashMap, ConcurrentHashMap allows neither null keys nor null values. Why? Judging from the source, I'd say the main reason is to simplify the concurrent logic and keep it fast: when a bin slot is null, that definitively means "no element here", and no special null handling is needed. It also avoids ambiguity: when get returns null for some key, the mapping simply does not exist; there is no case where the key exists but its value is null.
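A short demonstration of the ambiguity argument, contrasting HashMap (null values allowed) with ConcurrentHashMap (NullAmbiguityDemo is an illustrative name):

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

public class NullAmbiguityDemo {
    public static void main(String[] args) {
        HashMap<String, String> hashMap = new HashMap<>();
        hashMap.put("k", null);
        // Ambiguous: null may mean "absent" or "present with a null value",
        // so HashMap callers must disambiguate with containsKey
        System.out.println(hashMap.get("k"));          // null
        System.out.println(hashMap.containsKey("k"));  // true

        ConcurrentHashMap<String, String> chm = new ConcurrentHashMap<>();
        System.out.println(chm.get("k"));              // null always means absent
        try {
            chm.put("k", null);                        // rejected outright
        } catch (NullPointerException e) {
            System.out.println("null value rejected");
        }
    }
}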
How ConcurrentHashMap arranges its code
For readability, ConcurrentHashMap lays out its code in a deliberate order: first the main static constant declarations and the commonly used internal static methods, then the field declarations, then the main public methods, and finally the shared resize code, the tree definitions, the iterators, and the bulk operations. It's a layout worth borrowing when arranging fields and methods in our own code.
That's all