我是这样给阿里面试官吹 ConcurrentHashMap的
因为上篇文章HashMap已经讲解的很详细了,因此此篇文章会简单介绍思路,再学习并发HashMap就简单很多了,上一篇文章中我们最终知道HashMap是线程不安全的,因此在老版本JDK中提供了HashTable来实现多线程级别的,改变之处重要有以下几点。 ❝ HashTable的 put, get, remove等方法是通过 synchronized来修饰保证其线程安全性的。 HashTable是 不允许key跟value为null的。 问题是 synchronized是个关键字级别的重量锁,在get数据的时候任何写入操作都不允许。相对来说性能不好。因此目前主要用的 ConcurrentHashMap来保证线程安全性。 ❞ ConcurrentHashMap主要分为JDK<=7跟JDK>=8的两个版本,ConcurrentHashMap的空间利用率更低一般只有10%~20%,接下来分别介绍。 JDK7 先宏观说下JDK7中的大致组成,ConcurrentHashMap由Segment数组结构和HashEntry数组组成。Segment是一种可重入锁,是一种数组和链表的结构,一个Segment中包含一个HashEntry数组,每个HashEntry又是一个链表结构。正是通过Segment分段锁,ConcurrentHashMap实现了高效率的并发。缺点是并发程度是有segment数组来决定的,并发度一旦初始化无法扩容。先绘制个ConcurrentHashMap的形象直观图。要想理解currentHashMap,可以简单的理解为将数据「分表分库」。ConcurrentHashMap是由 Segment 数组 结构和HashEntry 数组 结构组成。 ❝ Segment 是一种可重入锁 ReentrantLock的子类 ,在 ConcurrentHashMap 里扮演锁的角色, HashEntry则用于存储键值对数据。 ConcurrentHashMap 里包含一个 Segment 数组来实现锁分离, Segment的结构和 HashMap 类似,一个 Segment里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment守护者一个 HashEntry 数组里的元素,当对 HashEntry数组的数据进行修改时,必须首先获得它对应的 Segment 锁。 ❞ 我们先看下segment类: staticfinalclassSegment<K,V>extendsReentrantLockimplementsSerializable{transientvolatileHashEntry<K,V>[]table;//包含一个HashMap可以理解为} 可以理解为我们的每个segment都是实现了Lock功能的HashMap。如果我们同时有多个segment形成了segment数组那我们就可以实现并发咯。 我们看下 currentHashMap的构造函数,先总结几点。 每一个segment里面包含的table(HashEntry数组)初始化大小也一定是2的次幂 这里设置了若干个用于位计算的参数。 initialCapacity:初始容量大小 ,默认16。 loadFactor: 扩容因子,默认0.75,当一个Segment存储的元素数量大于initialCapacity* loadFactor时,该Segment会进行一次扩容。 concurrencyLevel:并发度,默认16。并发度可以理解为程序运行时能够 「同时更新」ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的 分段锁个数,即Segment[]的数组长度。如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。 segment的数组大小最终一定是2的次幂 构造函数详解: //initialCapacity是我们保存所以KV数据的初始值//loadFactor这个就是HashMap的负载因子//我们segment数组的初始化大小@SuppressWarnings("unchecked")publicConcurrentHashMap(intinitialCapacity,floatloadFactor,intconcurrencyLevel){if(!(loadFactor>0)||initialCapacity<0||concurrencyLevel<=0)thrownewIllegalArgumentException();if(concurrencyLevel>MAX_SEGMENTS)//最大允许segment的个数,不能超过1<24concurrencyLevel=MAX_SEGMENTS;intsshift=0;//类似扰动函数intssize=1;while(ssize<concurrencyLevel){++sshift;ssize<<=1;//确保segment一定是2次幂}this.segmentShift=32-sshift;//有点类似与扰动函数,跟下面的参数配合使用实现当前元素落到那个segment上面。this.segmentMask=ssize-1;//为了取模专用if(initialCapacity>MAXIMUM_CAPACITY)//不能大于1<30initialCapacity=MAXIMUM_CAPACITY;intc=initialCapacity/ssize;//总的数组大小被segment分散后需要多少个tableif(c*ssize<initialCapacity)++c;//确保向上取值intcap=MIN_SEGMENT_TABLE_CAPACITY;//每个table初始化大小为2while(cap<c)//单独的一个segment[i]对应的table 容量大小。cap<<=1;//将table的容量初始化为2的次幂Segment<K,V>s0=newSegment<K,V>(loadFactor,(int)(cap*loadFactor),(HashEntry<K,V>[])newHashEntry[cap]);//负载因子,阈值,每个segment的初始化大小。跟hashmap 初始值类似。//并且segment的初始化是懒加载模式,刚开始只有一个s0,其余的在需要的时候才会增加。Segment<K,V>[]ss=(Segment<K,V>[])newSegment[ssize];UNSAFE.putOrderedObject(ss,SBASE,s0);//orderedwriteofsegments[0]this.segments=ss;} hash 不管是我们的get操作还是put操作要需要通过hash来对数据进行定位。 //整体思想就是通过多次不同方式的位运算来努力将数据均匀的分不到目标table中,都是些扰动函数privateinthash(Objectk){inth=hashSeed;if((0!=h)&&(kinstanceofString)){returnsun.misc.Hashing.stringHash32((String)k);}h^=k.hashCode();//single-wordWang/Jenkinshash.h+=(h<<15)^0xffffcd7d;h^=(h>>>10);h+=(h<<3);h^=(h>>>6);h+=(h<<2)+(h<<14);returnh^(h>>>16);} get 相对来说比较简单,无非就是通过 hash找到对应的 segment,继续通过 hash找到对应的 table,然后就是遍历这个链表看是否可以找到,并且要注意 get的时候是没有加锁的。 publicVget(Objectkey){Segment<K,V>s;HashEntry<K,V>[]tab;inth=hash(key);//JDK7中标准的hash值获取算法longu=(((h>>>segmentShift)&segmentMask)<<SSHIFT)+SBASE;//hash值如何映射到对应的segment上if((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab=s.table)!=null){//无非就是获得hash值对应的segment是否存在,for(HashEntry<K,V>e=(HashEntry<K,V>)UNSAFE.getObjectVolatile(tab,((long)(((tab.length-1)&h))<<TSHIFT)+TBASE);e!=null;e=e.next){//看下这个hash值对应的是segment(HashEntry)中的具体位置。然后遍历查询该链表Kk;if((k=e.key)key||(e.hashh&&key.equals(k)))returne.value;}}returnnull;} put 相同的思路,先找到 hash值对应的 segment位置,然后看该 segment位置是否初始化了(因为segment是懒加载模式)。选择性初始化,最终执行put操作。 @SuppressWarnings("unchecked")publicVput(Kkey,Vvalue){Segment<K,V>s;if(valuenull)thrownewNullPointerException();inthash=hash(key);//还是获得最终hash值intj=(hash>>>segmentShift)&segmentMask;//hash值位操作对应的segment数组位置if((s=(Segment<K,V>)UNSAFE.getObject(segments,(j<<SSHIFT)+SBASE))null)s=ensureSegment(j);//初始化时候因为只有第一个segment,如果落在了其余的segment中则需要现初始化。returns.put(key,hash,value,false);//直接在数据中执行put操作。} 其中put操作基本思路跟HashMap几乎一样,只是在开始跟结束进行了加锁的操作tryLock and unlock,然后JDK7中都是先扩容再添加数据的,并且获得不到锁也会进行自旋的tryLock或者lock阻塞排队进行等待(同时获得锁前提前new出新数据)。 finalVput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){//在往该segment写入前,需要先获取该segment的独占锁,获取失败尝试获取自旋锁HashEntry<K,V>node=tryLock()?null:scanAndLockForPut(key,hash,value);VoldValue;try{//segment内部的数组HashEntry<K,V>[]tab=table;//利用hash值,求应该放置的数组下标intindex=(tab.length-1)&hash;//first是数组该位置处的链表的表头HashEntry<K,V>first=entryAt(tab,index);for(HashEntry<K,V>e=first;;){if(e!=null){Kk;if((k=e.key)key||(e.hashhash&&key.equals(k))){oldValue=e.value;if(!onlyIfAbsent){//覆盖旧值e.value=value;++modCount;}break;}//继续顺着链表走e=e.next;}else{// node 是不是 null,这个要看获取锁的过程。没获得锁的线程帮我们创建好了节点,直接头插法//如果不为 null,那就直接将它设置为链表表头;如果是 null,初始化并设置为链表表头。if(node!=null)node.setNext(first);elsenode=newHashEntry<K,V>(hash,key,value,first);intc=count+1;//如果超过了该segment的阈值,这个segment需要扩容if(c>threshold&&tab.length<MAXIMUM_CAPACITY)rehash(node);//扩容else//没有达到阈值,将node放到数组tab的index位置,//将新的结点设置成原链表的表头setEntryAt(tab,index,node);++modCount;count=c;oldValue=null;break;}}}finally{//解锁unlock();}returnoldValue;} 如果加锁失败了调用scanAndLockForPut,完成查找或新建节点的工作。当获取到锁后直接将该节点加入链表即可,「提升」了put操作的性能,这里涉及到自旋。大致过程: ❝ 在我获取不到锁的时候我进行tryLock,准备好new的数据,同时还有一定的次数限制,还要考虑别的已经获得线程的节点修改该头节点。 ❞ privateHashEntry<K,V>scanAndLockForPut(Kkey,inthash,Vvalue){HashEntry<K,V>first=entryForHash(this,hash);HashEntry<K,V>e=first;HashEntry<K,V>node=null;intretries=-1;//negativewhilelocatingnode//循环获取锁while(!tryLock()){HashEntry<K,V>f;//torecheckfirstbelowif(retries<0){if(enull){if(nodenull)//speculativelycreatenode//进到这里说明数组该位置的链表是空的,没有任何元素//当然,进到这里的另一个原因是tryLock()失败,所以该槽存在并发,不一定是该位置node=newHashEntry<K,V>(hash,key,value,null);retries=0;}elseif(key.equals(e.key))retries=0;else//顺着链表往下走e=e.next;}//重试次数如果超过MAX_SCAN_RETRIES(单核1次多核64次),那么不抢了,进入到阻塞队列等待锁//lock()是阻塞方法,直到获取锁后返回elseif(++retries>MAX_SCAN_RETRIES){lock();break;}elseif((retries&1)0&&//进入这里,说明有新的元素进到了链表,并且成为了新的表头//这边的策略是,重新执行scanAndLockForPut方法(f=entryForHash(this,hash))!=first){e=first=f;//re-traverseifentrychangedretries=-1;}}returnnode;} Size 这个size方法比较有趣,他是先无锁的统计下所有的数据量看下前后两次是否数据一样,如果一样则返回数据,如果不一样则要把全部的segment进行加锁,统计,解锁。并且size方法只是返回一个统计性的数字,因此size谨慎使用哦。 publicintsize(){//Tryafewtimestogetaccuratecount.Onfailuredueto//continuousasyncchangesintable,resorttolocking.finalSegment<K,V>[]segments=this.segments;intsize;booleanoverflow;//trueifsizeoverflows32bitslongsum;//sumofmodCountslonglast=0L;//previoussumintretries=-1;//firstiterationisn'tretrytry{for(;;){if(retries++RETRIES_BEFORE_LOCK){//超过2次则全部加锁for(intj=0;j<segments.length;++j)ensureSegment(j).lock();//直接对全部segment加锁消耗性太大}sum=0L;size=0;overflow=false;for(intj=0;j<segments.length;++j){Segment<K,V>seg=segmentAt(segments,j);if(seg!=null){sum+=seg.modCount;//统计的是modCount,涉及到增删该都会加1intc=seg.count;if(c<0||(size+=c)<0)overflow=true;}}if(sumlast)//每一个前后的修改次数一样则认为一样,但凡有一个不一样则直接break。break;last=sum;}}finally{if(retries>RETRIES_BEFORE_LOCK){for(intj=0;j<segments.length;++j)segmentAt(segments,j).unlock();}}returnoverflow?Integer.MAX_VALUE:size;} rehash segment 数组初始化后就不可变了,也就是说 「并发性不可变」,不过 segment里的 table可以扩容为2倍,该方法没有考虑并发,因为执行该方法之前已经获取了锁。其中JDK7中的 rehash思路跟JDK8 中扩容后处理链表的思路一样,个人不过感觉没有8写的精髓好看。 //方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。privatevoidrehash(HashEntry<K,V>node){HashEntry<K,V>[]oldTable=table;intoldCapacity=oldTable.length;//2倍intnewCapacity=oldCapacity<<1;threshold=(int)(newCapacity*loadFactor);//创建新数组HashEntry<K,V>[]newTable=(HashEntry<K,V>[])newHashEntry[newCapacity];//新的掩码,如从16扩容到32,那么sizeMask为31,对应二进制‘000...00011111’intsizeMask=newCapacity-1;//遍历原数组,将原数组位置i处的链表拆分到新数组位置i和i+oldCap两个位置for(inti=0;i<oldCapacity;i++){//e是链表的第一个元素HashEntry<K,V>e=oldTable[i];if(e!=null){HashEntry<K,V>next=e.next;//计算应该放置在新数组中的位置,//假设原数组长度为16,e在oldTable[3]处,那么idx只可能是3或者是3+16=19intidx=e.hash&sizeMask;//新位置if(nextnull)//该位置处只有一个元素newTable[idx]=e;else{//Reuseconsecutivesequenceatsameslot//e是链表表头HashEntry<K,V>lastRun=e;//idx是当前链表的头结点e的新位置intlastIdx=idx;//for循环找到一个lastRun结点,这个结点之后的所有元素是将要放到一起的for(HashEntry<K,V>last=next;last!=null;last=last.next){intk=last.hash&sizeMask;if(k!=lastIdx){lastIdx=k;lastRun=last;}}//将lastRun及其之后的所有结点组成的这个链表放到lastIdx这个位置newTable[lastIdx]=lastRun;//下面的操作是处理lastRun之前的结点,//这些结点可能分配在另一个链表中,也可能分配到上面的那个链表中for(HashEntry<K,V>p=e;p!=lastRun;p=p.next){Vv=p.value;inth=p.hash;intk=h&sizeMask;HashEntry<K,V>n=newTable[k];newTable[k]=newHashEntry<K,V>(h,p.key,v,n);}}}}//将新来的node放到新数组中刚刚的两个链表之一的头部intnodeIndex=node.hash&sizeMask;//addthenewnodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex]=node;table=newTable;} CAS操作 在JDK7里在 ConcurrentHashMap中通过原子操作 sun.misc.Unsafe查找元素、替换元素和设置元素。通过这样的硬件级别获得数据可以保证及时是多线程我也每次获得的数据是最新的。这些原子操作起着非常关键的作用,你可以在所有 ConcurrentHashMap的基本功能中看到,随机距离如下: finalvoidsetNext(HashEntry<K,V>n){UNSAFE.putOrderedObject(this,nextOffset,n);}staticfinal<K,V>HashEntry<K,V>entryAt(HashEntry<K,V>[]tab,inti){return(tabnull)?null:(HashEntry<K,V>)UNSAFE.getObjectVolatile(tab,((long)i<<TSHIFT)+TBASE);}staticfinal<K,V>voidsetEntryAt(HashEntry<K,V>[]tab,inti,HashEntry<K,V>e){UNSAFE.putOrderedObject(tab,((long)i<<TSHIFT)+TBASE,e);} 常见问题 ConcurrentHashMap实现原理是怎么样的或者ConcurrentHashMap如何在保证高并发下线程安全的同时实现了性能提升? ❝ ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的HashTable,只要多个修改操作发生在不同的段上,它们就可以并发进行。 ❞ 在高并发下的情况下如何保证取得的元素是最新的? ❝ 用于存储键值对数据的HashEntry,在设计上它的成员变量value跟next都是volatile类型的,这样就保证别的线程对value值的修改,get方法可以马上看到。 ❞ ConcurrentHashMap的弱一致性体现在迭代器,clear和get方法,原因在于没有加锁。 比如迭代器在遍历数据的时候是一个Segment一个Segment去遍历的,如果在遍历完一个Segment时正好有一个线程在刚遍历完的Segment上插入数据,就会体现出不一致性。clear也是一样。 get方法和containsKey方法都是遍历对应索引位上所有节点,都是不加锁来判断的,如果是修改性质的因为可见性的存在可以直接获得最新值,不过如果是新添加值则无法保持一致性。 JDK8 JDK8相比与JDK7主要区别如下: ❝ 取消了segment数组,直接用table保存数据,锁的粒度更小,减少并发冲突的概率。采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率,并发控制使用Synchronized和CAS来操作。 存储数据时采用了数组+ 链表+红黑树的形式。 ❞ CurrentHashMap重要参数: ❝ private static final int MAXIMUM_CAPACITY = 1 << 30; // 数组的最大值 private static final int DEFAULT_CAPACITY = 16; // 默认数组长度 static final int TREEIFY_THRESHOLD = 8; // 链表转红黑树的一个条件 static final int UNTREEIFY_THRESHOLD = 6; // 红黑树转链表的一个条件 static final int MIN_TREEIFY_CAPACITY = 64; // 链表转红黑树的另一个条件 static final int MOVED = -1; // 表示正在扩容转移 static final int TREEBIN = -2; // 表示已经转换成树 static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // 获得hash值的辅助参数 transient volatile Node<K,V>[] table;// 默认没初始化的数组,用来保存元素 private transient volatile Node<K,V>[] nextTable; // 转移的时候用的数组 static final int NCPU = Runtime.getRuntime().availableProcessors();// 获取可用的CPU个数 private transient volatile Node<K,V>[] nextTable; // 连接表,用于哈希表扩容,扩容完成后会被重置为 null private transient volatile long baseCount;保存着整个哈希表中存储的所有的结点的个数总和,有点类似于 HashMap 的 size 属性。private transient volatile int sizeCtl; 负数:表示进行初始化或者扩容,-1:表示正在初始化,-N:表示有 N-1 个线程正在进行扩容 正数:0 表示还没有被初始化,> 0的数:初始化或者是下一次进行扩容的阈值,有点类似HashMap中的threshold,不过功能「更强大」。 ❞ 若干重要类 构成每个元素的基本类 Node staticclassNode<K,V>implementsMap.Entry<K,V>{finalinthash;//key的hash值finalKkey;//keyvolatileVval;//valuevolatileNode<K,V>next;//表示链表中的下一个节点} TreeNode继承于Node,用来存储红黑树节点 staticfinalclassTreeNode<K,V>extendsNode<K,V>{TreeNode<K,V>parent;//红黑树的父亲节点TreeNode<K,V>left;//左节点TreeNode<K,V>right;//右节点TreeNode<K,V>prev;//前节点booleanred;//是否为红点} ForwardingNode 在 Node 的子类 ForwardingNode 的构造方法中,可以看到此变量的hash = 「-1」 ,类中还存储 nextTable的引用。该初始化方法只在 transfer方法被调用,如果一个类被设置成此种情况并且hash = -1 则说明该节点不需要resize了。 staticfinalclassForwardingNode<K,V>extendsNode<K,V>{finalNode<K,V>[]nextTable;ForwardingNode(Node<K,V>[]tab){//注意这里super(MOVED,null,null,null);this.nextTable=tab;}//.....} TreeBin TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制. staticfinalclassTreeBin<K,V>extendsNode<K,V>{TreeNode<K,V>root;volatileTreeNode<K,V>first;volatileThreadwaiter;volatileintlockState;//valuesforlockStatestaticfinalintWRITER=1;//setwhileholdingwritelockstaticfinalintWAITER=2;//setwhenwaitingforwritelockstaticfinalintREADER=4;//incrementvalueforsettingreadlock} 构造函数 整体的构造情况基本跟HashMap类似,并且为了跟原来的JDK7中的兼容性还可以传入并发度。不过JDK8中并发度已经有table的具体长度来控制了。 ❝ ConcurrentHashMap():创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 ConcurrentHashMap(int):创建一个带有指定初始容量 tableSizeFor、默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 ConcurrentHashMap(Map<? extends K, ? extends V> m):构造一个与给定映射具有相同映射关系的新映射 ConcurrentHashMap(int initialCapacity, float loadFactor):创建一个带有指定初始容量、加载因子和默认 concurrencyLevel (1) 的新的空映射 ConcurrentHashMap(int, float, int):创建一个带有指定初始容量、加载因子和并发级别的新的空映射。 ❞ put 假设table已经初始化完成,put操作采用 CAS + synchronized 实现并发插入或更新操作,具体实现如下。 ❝ 做一些边界处理,然后获得hash值。 没初始化就初始化,初始化后看下对应的桶是否为空,为空就原子性的尝试插入。 如果当前节点正在扩容还要去帮忙扩容,骚操作。 用 syn来加锁当前节点,然后操作几乎跟就跟hashmap一样了。 ❞ // Node 节点的 hash值在HashMap中存储的就是hash值,在currenthashmap中可能有多种情况哦!finalVputVal(Kkey,Vvalue,booleanonlyIfAbsent){if(keynull||valuenull)thrownewNullPointerException();//边界处理inthash=spread(key.hashCode());//最终hash值计算intbinCount=0;for(Node<K,V>[]tab=table;;){//循环表Node<K,V>f;intn,i,fh;if(tabnull||(n=tab.length)0)tab=initTable();//初始化表如果为空,懒汉式elseif((f=tabAt(tab,i=(n-1)&hash))null){//如果对应桶位置为空if(casTabAt(tab,i,null,newNode<K,V>(hash,key,value,null)))//CAS原子性的尝试插入break;}elseif((fh=f.hash)MOVED)//如果当前节点正在扩容。还要帮着去扩容。tab=helpTransfer(tab,f);else{VoldVal=null;synchronized(f){//桶存在数据加锁操作进行处理if(tabAt(tab,i)f){if(fh>=0){//如果存储的是链表存储的是节点的hash值binCount=1;for(Node<K,V>e=f;;++binCount){Kek;//遍历链表去查找,如果找到key一样则选择性if(e.hashhash&&((ek=e.key)key||(ek!=null&&key.equals(ek)))){oldVal=e.val;if(!onlyIfAbsent)e.val=value;break;}Node<K,V>pred=e;if((e=e.next)null){//找到尾部插入pred.next=newNode<K,V>(hash,key,value,null);break;}}}elseif(finstanceofTreeBin){//如果桶节点类型为TreeBinNode<K,V>p;binCount=2;if((p=((TreeBin<K,V>)f).putTreeVal(hash,key,value))!=null){//尝试红黑树插入,同时也要防止节点本来就有,选择性覆盖oldVal=p.val;if(!onlyIfAbsent)p.val=value;}}}}if(binCount!=0){//如果链表数量if(binCount>=TREEIFY_THRESHOLD)treeifyBin(tab,i);//链表转红黑树哦!if(oldVal!=null)returnoldVal;break;}}}addCount(1L,binCount);//统计大小并且检查是否要扩容。returnnull;} 涉及到重要函数initTable、tabAt、casTabAt、helpTransfer、putTreeVal、treeifyBin、addCount函数。 initTable 「只允许一个线程」对表进行初始化,如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度Thread.yield。这样,保证了表同时只会被一个线程初始化,对于table的大小,会根据sizeCtl的值进行设置,如果没有设置szieCtl的值,那么默认生成的table大小为16,否则,会根据sizeCtl的大小设置table大小。 //容器初始化操作privatefinalNode<K,V>[]initTable(){Node<K,V>[]tab;intsc;while((tab=table)null||tab.length0){if((sc=sizeCtl)<0)//如果正在初始化-1,-N 正在扩容。Thread.yield();//进行线程让步等待//让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。//它可能会获取到,也有可能被其他线程获取到。elseif(U.compareAndSwapInt(this,SIZECTL,sc,-1)){//比较sizeCtl的值与sc是否相等,相等则用-1 替换,这表明我这个线程在进行初始化了!try{if((tab=table)null||tab.length0){intn=(sc>0)?sc:DEFAULT_CAPACITY;//默认为16@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n];table=tab=nt;sc=n-(n>>>2);//sc=0.75n}}finally{sizeCtl=sc;//设置sizeCtl类似threshold}break;}}returntab;} unsafe 在ConcurrentHashMap中使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制。 //用来返回节点数组的指定位置的节点的原子操作@SuppressWarnings("unchecked")staticfinal<K,V>Node<K,V>tabAt(Node<K,V>[]tab,inti){return(Node<K,V>)U.getObjectVolatile(tab,((long)i<<ASHIFT)+ABASE);}//cas原子操作,在指定位置设定值staticfinal<K,V>booleancasTabAt(Node<K,V>[]tab,inti,Node<K,V>c,Node<K,V>v){returnU.compareAndSwapObject(tab,((long)i<<ASHIFT)+ABASE,c,v);}//原子操作,在指定位置设定值staticfinal<K,V>voidsetTabAt(Node<K,V>[]tab,inti,Node<K,V>v){U.putObjectVolatile(tab,((long)i<<ASHIFT)+ABASE,v);}//比较table数组下标为i的结点是否为c,若为c,则用v交换操作。否则,不进行交换操作。staticfinal<K,V>booleancasTabAt(Node<K,V>[]tab,inti,Node<K,V>c,Node<K,V>v){returnU.compareAndSwapObject(tab,((long)i<<ASHIFT)+ABASE,c,v);} 可以看到获得table[i]数据是通过Unsafe对象通过反射获取的,取数据直接table[index]不可以么,为什么要这么复杂?在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的「副本」,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,「保证了每次拿到数据都是最新的」。 helpTransfer //可能有多个线程在同时帮忙运行helpTransferfinalNode<K,V>[]helpTransfer(Node<K,V>[]tab,Node<K,V>f){Node<K,V>[]nextTab;intsc;if(tab!=null&&(finstanceofForwardingNode)&&(nextTab=((ForwardingNode<K,V>)f).nextTable)!=null){//table不是空且node节点是转移类型,并且转移类型的nextTable不是空说明还在扩容ingintrs=resizeStamp(tab.length);//根据 length 得到一个前16位的标识符,数组容量大小。//确定新table指向没有变,老table数据也没变,并且此时sizeCtl小于0还在扩容ingwhile(nextTabnextTable&&tabletab&&(sc=sizeCtl)<0){if((sc>>>RESIZE_STAMP_SHIFT)!=rs||scrs+1||scrs+MAX_RESIZERS||transferIndex<=0)//1.sizeCtl无符号右移16位获得高16位如果不等rs标识符变了// 2. 如果扩容结束了这里可以看 trePresize 函数第一次扩容操作://默认第一个线程设置sc=rs左移16位+2,当第一个线程结束扩容了,//就会将 sc 减一。这个时候,sc 就等于 rs + 1。//3.如果达到了最大帮助线程个数65535个//4.如果转移下标调整ing扩容已经结束了break;if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1)){//如果以上都不是,将sizeCtl+1,增加一个线程来扩容transfer(tab,nextTab);//进行转移break;//结束循环}}returnnextTab;}returntable;} Integer.numberOfLeadingZeros(n) ❝ 该方法的作用是「返回无符号整型i的最高非零位前面的0的个数」,包括符号位在内;如果i为负数,这个方法将会返回0,符号位为1. 比如说,10的二进制表示为 0000 0000 0000 0000 0000 0000 0000 1010 java的整型长度为32位。那么这个方法返回的就是28 ❞ resizeStamp 主要用来获得标识符,可以简单理解是对当前系统容量大小的一种监控。 staticfinalintresizeStamp(intn){returnInteger.numberOfLeadingZeros(n)|(1<<(RESIZE_STAMP_BITS-1));//RESIZE_STAMP_BITS=16} addCount 主要就2件事:一是更新 baseCount,二是判断是否需要扩容。 privatefinalvoidaddCount(longx,intcheck){CounterCell[]as;longb,s;//首先如果没有并发此时countCells is null, 此时尝试CAS设置数据值。if((as=counterCells)!=null||!U.compareAndSwapLong(this,BASECOUNT,b=baseCount,s=b+x)){//如果counterCells不为空以为此时有并发的设置或者CAS设置baseCount失败了CounterCella;longv;intm;booleanuncontended=true;if(asnull||(m=as.length-1)<0||(a=as[ThreadLocalRandom.getProbe()&m])null||!(uncontended=U.compareAndSwapLong(a,CELLVALUE,v=a.value,v+x))){//1.如果没出现并发此时计数盒子为null//2.随机取出一个数组位置发现为空//3.出现并发后修改这个cellvalue失败了//执行funAddCountfullAddCount(x,uncontended);//死循环操作return;}if(check<=1)return;s=sumCount();//吧counterCells数组中的每一个数据进行累加给baseCount。}//如果需要扩容if(check>=0){Node<K,V>[]tab,nt;intn,sc;while(s>=(long)(sc=sizeCtl)&&(tab=table)!=null&&(n=tab.length)<MAXIMUM_CAPACITY){intrs=resizeStamp(n);//获得高位标识符if(sc<0){//是否需要帮忙去扩容if((sc>>>RESIZE_STAMP_SHIFT)!=rs||scrs+1||scrs+MAX_RESIZERS||(nt=nextTable)null||transferIndex<=0)break;if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1))transfer(tab,nt);}//第一次扩容elseif(U.compareAndSwapInt(this,SIZECTL,sc,(rs<<RESIZE_STAMP_SHIFT)+2))transfer(tab,null);s=sumCount();}}} baseCount添加 ConcurrentHashMap提供了 baseCount、 counterCells 两个辅助变量和一个 CounterCell辅助内部类。sumCount() 就是迭代 counterCells来统计 sum 的过程。put 操作时,肯定会影响 size(),在 put() 方法最后会调用 addCount()方法。整体的思维方法跟LongAdder类似,用的思维就是借鉴的 ConcurrentHashMap。每一个 Cell都用Contended修饰来避免伪共享。 ❝ JDK1.7 和 JDK1.8 对 size 的计算是不一样的。1.7 中是先不加锁计算三次,如果三次结果不一样在加锁。 JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。 JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。 ❞ 关于扩容 在 addCount第一次扩容时候会有骚操作 sc=rs << RESIZE_STAMP_SHIFT) + 2)其中 rs = resizeStamp(n)。这里需要核心说一点, 如果不是第一次扩容则直接将低16位的数字 +1 即可。 putTreeVal 这个操作几乎跟HashMap的操作完全一样,核心思想就是一定要决定向左还是向右然后最终尝试放置新数据,然后balance。不同点就是有锁的考虑。 treeifyBin 这里的基本思路跟HashMap几乎一样,不同点就是先变成TreeNode,然后是「单向链表」串联。 privatefinalvoidtreeifyBin(Node<K,V>[]tab,intindex){Node<K,V>b;intn,sc;if(tab!=null){//如果整个table的数量小于64,就扩容至原来的一倍,不转红黑树了//因为这个阈值扩容可以减少hash冲突,不必要去转红黑树if((n=tab.length)<MIN_TREEIFY_CAPACITY)tryPresize(n<<1);elseif((b=tabAt(tab,index))!=null&&b.hash>=0){synchronized(b){//锁定当前桶if(tabAt(tab,index)b){TreeNode<K,V>hd=null,tl=null;for(Node<K,V>e=b;e!=null;e=e.next){//遍历这个链表然后将每个节点封装成TreeNode,最终单链表串联起来,//最终调用setTabAt放置红黑树TreeNode<K,V>p=newTreeNode<K,V>(e.hash,e.key,e.val,null,null);if((p.prev=tl)null)hd=p;elsetl.next=p;tl=p;}//通过TreeBin对象对TreeNode转换成红黑树setTabAt(tab,index,newTreeBin<K,V>(hd));}}}}} TreeBin 主要功能就是链表变化为红黑树,这个红黑树用TreeBin来包装。并且要注意 转成红黑树以后以前链表的结构信息还是有的,最终信息如下: TreeBin.first = 链表中第一个节点。 TreeBin.root = 红黑树中的root节点。 TreeBin(TreeNode<K,V>b){super(TREEBIN,null,null,null);//创建空节点hash=-2this.first=b;TreeNode<K,V>r=null;//root节点for(TreeNode<K,V>x=b,next;x!=null;x=next){next=(TreeNode<K,V>)x.next;x.left=x.right=null;if(rnull){x.parent=null;x.red=false;r=x;//root节点设置为x}else{Kk=x.key;inth=x.hash;Class<?>kc=null;for(TreeNode<K,V>p=r;;){//x代表的是转换为树之前的顺序遍历到链表的位置的节点,r代表的是根节点intdir,ph;Kpk=p.key;if((ph=p.hash)>h)dir=-1;elseif(ph<h)dir=1;elseif((kcnull&&(kc=comparableClassFor(k))null)||(dir=compareComparables(kc,k,pk))0)dir=tieBreakOrder(k,pk);//当key不可以比较,或者相等的时候采取的一种排序措施TreeNode<K,V>xp=p;//放一定是放在叶子节点上,如果还没找到叶子节点则进行循环往下找。//找到了目前叶子节点才会进入再放置数据if((p=(dir<=0)?p.left:p.right)null){x.parent=xp;if(dir<=0)xp.left=x;elsexp.right=x;r=balanceInsertion(r,x);//每次插入一个元素的时候都调用balanceInsertion来保持红黑树的平衡break;}}}}this.root=r;assertcheckInvariants(root);} tryPresize 当数组长度小于64的时候,扩张数组长度一倍,调用此函数。扩容后容量大小的核对,可能涉及到初始化容器大小。并且扩容的时候又跟2的次幂联系上了!,其中初始化时候传入map会调用putAll方法直接put一个map的话,在「putAll」方法中没有调用initTable方法去初始化table,而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断。 PS:默认第一个线程设置 sc = rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1,这个时候说明扩容完毕了。 /***扩容表为指可以容纳指定个数的大小(总是2的N次方)*假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12*计算出来c的值为64,则要扩容到sizeCtl≥c*第一次扩容之后数组长:32 sizeCtl:24*第三次扩容之后数组长:128 sizeCtl:96 退出*/privatefinalvoidtryPresize(intsize){intc=(size>=(MAXIMUM_CAPACITY>>>1))?MAXIMUM_CAPACITY:tableSizeFor(size+(size>>>1)+1);//合理范围intsc;while((sc=sizeCtl)>=0){Node<K,V>[]tab=table;intn;if(tabnull||(n=tab.length)0){//初始化传入map,今天putAll会直接调用这个。n=(sc>c)?sc:c;if(U.compareAndSwapInt(this,SIZECTL,sc,-1)){//初始化tab的时候,把sizeCtl设为-1try{if(tabletab){@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n];table=nt;sc=n-(n>>>2);//sc=sizeCtl=0.75n}}finally{sizeCtl=sc;}}}//初始化时候如果数组容量<=sizeCtl或容量已经最大化了则退出elseif(c<=sc||n>=MAXIMUM_CAPACITY){break;//退出扩张}elseif(tabtable){intrs=resizeStamp(n);if(sc<0){//sc=siztCtl如果正在扩容Table的话,则帮助扩容Node<K,V>[]nt;if((sc>>>RESIZE_STAMP_SHIFT)!=rs||scrs+1||scrs+MAX_RESIZERS||(nt=nextTable)null||transferIndex<=0)break;//各种条件判断是否需要加入扩容工作。//帮助转移数据的线程数+1if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1))transfer(tab,nt);}//没有在初始化或扩容,则开始扩容//此处切记第一次扩容直接+2elseif(U.compareAndSwapInt(this,SIZECTL,sc,(rs<<RESIZE_STAMP_SHIFT)+2)){transfer(tab,null);}}}} transfer 这里代码量比较大主要分文三部分,并且感觉思路很精髓,尤其「是其他线程帮着去扩容的骚操作」。 主要是 单个线程能处理的最少桶结点个数的计算和一些属性的初始化操作。 每个线程进来会先领取自己的任务区间 [bound,i],然后开始 --i 来遍历自己的任务区间,对每个桶进行处理。如果遇到桶的头结点是空的,那么使用 ForwardingNode标识旧table中该桶已经被处理完成了。如果遇到已经处理完成的桶,直接跳过进行下一个桶的处理。如果是正常的桶,对桶首节点加锁,正常的迁移即可(跟HashMap第三部分一样思路),迁移结束后依然会将原表的该位置标识位已经处理。 该函数中的finish= true 则说明整张表的迁移操作已经「全部」完成了,我们只需要重置 table的引用并将 nextTable 赋为空即可。否则,CAS 式的将 sizeCtl减一,表示当前线程已经完成了任务,退出扩容操作。如果退出成功,那么需要进一步判断当前线程是否就是最后一个在执行扩容的。 f((sc-2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT)return; 第一次扩容时在addCount中有写到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示当前只有一个线程正在工作,「相对应的」,如果 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT,说明当前线程就是最后一个还在扩容的线程,那么会将 finishing 标识为 true,并在下一次循环中退出扩容方法。 几乎跟 HashMap大致思路类似的遍历链表/红黑树然后扩容操作。 privatefinalvoidtransfer(Node<K,V>[]tab,Node<K,V>[]nextTab){intn=tab.length,stride;if((stride=(NCPU>1)?(n>>>3)/NCPU:n)<MIN_TRANSFER_STRIDE)//MIN_TRANSFER_STRIDE用来控制不要占用太多CPUstride=MIN_TRANSFER_STRIDE;//subdividerange//MIN_TRANSFER_STRIDE=16每个CPU处理最小长度个数if(nextTabnull){//新表格为空则直接新建二倍,别的辅助线程来帮忙扩容则不会进入此if条件try{@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n<<1];nextTab=nt;}catch(Throwableex){//trytocopewithOOMEsizeCtl=Integer.MAX_VALUE;return;}nextTable=nextTab;transferIndex=n;//transferIndex指向最后一个桶,方便从后向前遍历}intnextn=nextTab.length;//新表长度ForwardingNode<K,V>fwd=newForwardingNode<K,V>(nextTab);//创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点booleanadvance=true;//是否继续向前查找的标志位booleanfinishing=false;//toensuresweep(清扫)beforecommittingnextTab,在完成之前重新在扫描一遍数组,看看有没完成的没//第一部分// i 指向当前桶, bound 指向当前线程需要处理的桶结点的区间下限【bound,i】这样来跟线程划分任务。for(inti=0,bound=0;;){Node<K,V>f;intfh;//这个while循环的目的就是通过--i遍历当前线程所分配到的桶结点//一个桶一个桶的处理while(advance){//每一次成功处理操作都会将advance设置为true,然里来处理区间的上一个数据intnextIndex,nextBound;if(--i>=bound||finishing){//通过此处进行任务区间的遍历advance=false;}elseif((nextIndex=transferIndex)<=0){i=-1;//任务分配完了advance=false;}//更新transferIndex//为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)elseif(U.compareAndSwapInt(this,TRANSFERINDEX,nextIndex,nextBound=(nextIndex>stride?nextIndex-stride:0))){//nextIndex本来等于末尾数字,bound=nextBound;i=nextIndex-1;advance=false;}}//当前线程所有任务完成if(i<0||i>=n||i+n>=nextn){intsc;if(finishing){//已经完成转移则直接赋值操作nextTable=null;table=nextTab;sizeCtl=(n<<1)-(n>>>1);//设置sizeCtl为扩容后的0.75return;}if(U.compareAndSwapInt(this,SIZECTL,sc=sizeCtl,sc-1)){// sizeCtl-1 表示当前线程任务完成。if((sc-2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT){//判断当前线程完成的线程是不是最后一个在扩容的,思路精髓return;}finishing=advance=true;//如果是则相应的设置参数i=n;}}elseif((f=tabAt(tab,i))null)//数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])advance=casTabAt(tab,i,null,fwd);//如果老节点数据是空的则直接进行CAS设置为fwdelseif((fh=f.hash)MOVED)//已经是个fwd了,因为是多线程操作可能别人已经给你弄好了,advance=true;//alreadyprocessedelse{synchronized(f){//加锁操作if(tabAt(tab,i)f){Node<K,V>ln,hn;if(fh>=0){//该节点的hash值大于等于0,说明是一个Node节点//关于链表的操作整体跟HashMap类似不过感觉好像更扰一些。intrunBit=fh&n;//fh=f.hashfirsthash的意思,看第一个点放老位置还是新位置Node<K,V>lastRun=f;for(Node<K,V>p=f.next;p!=null;p=p.next){intb=p.hash&n;//n的值为扩张前的数组的长度if(b!=runBit){runBit=b;lastRun=p;//最后导致发生变化的节点}}if(runBit0){//看最后一个变化点是新还是旧旧ln=lastRun;hn=null;}else{hn=lastRun;//看最后一个变化点是新还是旧旧ln=null;}/**构造两个链表,顺序大部分和原来是反的,不过顺序也有差异*分别放到原来的位置和新增加的长度的相同位置(i/n+i)*/for(Node<K,V>p=f;p!=lastRun;p=p.next){intph=p.hash;Kpk=p.key;Vpv=p.val;if((ph&n)0)/**假设runBit的值为0,*则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点*并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/ln=newNode<K,V>(ph,pk,pv,ln);else/**假设runBit的值不为0,*则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点*并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/hn=newNode<K,V>(ph,pk,pv,hn);}setTabAt(nextTab,i,ln);setTabAt(nextTab,i+n,hn);setTabAt(tab,i,fwd);advance=true;}elseif(finstanceofTreeBin){//该节点hash值是个负数否则的话是一个树节点TreeBin<K,V>t=(TreeBin<K,V>)f;TreeNode<K,V>lo=null,loTail=null;//旧头尾TreeNode<K,V>hi=null,hiTail=null;//新头围intlc=0,hc=0;for(Node<K,V>e=t.first;e!=null;e=e.next){inth=e.hash;TreeNode<K,V>p=newTreeNode<K,V>(h,e.key,e.val,null,null);if((h&n)0){if((p.prev=loTail)null)lo=p;elseloTail.next=p;//旧头尾设置loTail=p;++lc;}else{//新头围设置if((p.prev=hiTail)null)hi=p;elsehiTail.next=p;hiTail=p;++hc;}}//ln如果老位置数字<=6则要对老位置链表进行红黑树降级到链表,否则就看是否还需要对老位置数据进行新建红黑树ln=(lc<=UNTREEIFY_THRESHOLD)?untreeify(lo):(hc!=0)?newTreeBin<K,V>(lo):t;hn=(hc<=UNTREEIFY_THRESHOLD)?untreeify(hi):(lc!=0)?newTreeBin<K,V>(hi):t;setTabAt(nextTab,i,ln);setTabAt(nextTab,i+n,hn);setTabAt(tab,i,fwd);//老表中i位置节点设置下advance=true;}}}}}} get 这个就很简单了,获得hash值,然后判断存在与否,遍历链表即可,注意get没有任何锁操作! publicVget(Objectkey){Node<K,V>[]tab;Node<K,V>e,p;intn,eh;Kek;//计算key的hash值inth=spread(key.hashCode());if((tab=table)!=null&&(n=tab.length)>0&&(e=tabAt(tab,(n-1)&h))!=null){//表不为空并且表的长度大于0并且key所在的桶不为空if((eh=e.hash)h){//表中的元素的hash值与key的hash值相等if((ek=e.key)key||(ek!=null&&key.equals(ek)))//键相等//返回值returne.val;}elseif(eh<0)//是个TreeBinhash=-2//在红黑树中查找,因为红黑树中也保存这一个链表顺序return(p=e.find(h,key))!=null?p.val:null;while((e=e.next)!=null){//对于结点hash值大于0的情况链表if(e.hashh&&((ek=e.key)key||(ek!=null&&key.equals(ek))))returne.val;}}returnnull;} clear 关于清空也相对简单 ,无非就是遍历桶数组,然后通过CAS来置空。 publicvoidclear(){longdelta=0L;inti=0;Node<K,V>[]tab=table;while(tab!=null&&i<tab.length){intfh;Node<K,V>f=tabAt(tab,i);if(fnull)++i;//这个桶是空的直接跳过elseif((fh=f.hash)MOVED){//这个桶的数据还在扩容中,要去扩容同时等待。tab=helpTransfer(tab,f);i=0;//restart}else{synchronized(f){//真正的删除if(tabAt(tab,i)f){Node<K,V>p=(fh>=0?f:(finstanceofTreeBin)?((TreeBin<K,V>)f).first:null);//循环到链表/者红黑树的尾部while(p!=null){--delta;//记录删除了多少个p=p.next;}//利用CAS无锁置nullsetTabAt(tab,i++,null);}}}}if(delta!=0L)addCount(delta,-1);//调整count} end ConcurrentHashMap是如果来做到「并发安全」,又是如何做到「高效」的并发的呢? 首先是读操作,读源码发现get方法中根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的。 写操作 . 数据扩容函数是 transfer,该方法的只有 addCount, helpTransfer和 tryPresize这三个方法来调用。 addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。 helpTransfer是在当一个线程要对table中元素进行操作的时候,如果检测到节点的·hash·= MOVED 的时候,就会调用 helpTransfer方法,在 helpTransfer中再调用 transfer方法来帮助完成数组的扩容 ❝ tryPresize是在 treeIfybin和 putAll方法中调用, treeIfybin主要是在 put添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树。注意 putAll在初始化传入一个大map的时候会调用。· ❞ 总结扩容情况发生: ❝ 在往map中添加元素的时候,在某一个节点的数目已经超过了8个,同时数组的长度又小于64的时候,才会触发数组的扩容。 当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容 ❞ 3. 扩容时候是否可以进行读写。 ❝ 对于读操作,因为是没有加锁的所以可以的. 对于写操作,JDK8中已经将锁的范围细腻到table[i]l了,当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为fwd节点),那就可以进行设置操作。如果该节点已经被处理了,则当前线程也会加入到扩容的操作中去。 ❞ 多个线程又是如何同步处理的 在 ConcurrentHashMap中,同步处理主要是通过 Synchronized和 unsafe的硬件级别原子性 这两种方式来完成的。 ❝ 在取得sizeCtl跟某个位置的Node的时候,使用的都是 unsafe的方法,来达到并发安全的目的 当需要在某个位置设置节点的时候,则会通过 Synchronized的同步机制来锁定该位置的节点。 在数组扩容的时候,则通过处理的 步长和 fwd节点来达到并发安全的目的,通过设置hash值为MOVED=-1。 当把某个位置的节点复制到扩张后的table的时候,也通过 Synchronized的同步机制来保证线程安全 ❞ 套路 ❝ 谈谈你理解的 HashMap,讲讲其中的 get put 过程。 1.8 做了什么优化? 是线程安全的嘛? 不安全会导致哪些问题? 如何解决?有没有线程安全的并发容器? ConcurrentHashMap 是如何实现的?1.7、1.8 实现有何不同,为什么这么做。 1.8中ConcurrentHashMap的sizeCtl作用,大致说下协助扩容跟标志位。 HashMap 为什么不用跳表替换红黑树呢? ❞ 参考 HashMap讲解 本文分享自微信公众号 - sowhat1412(sowhat9094)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。