首页 文章 精选 留言 我的

精选列表

搜索[面试],共4912篇文章
优秀的个人博客,低调大师

谷歌面试题:如何从无序链表中移除重复项?

一位小伙伴来问一道谷歌的笔试题,关于单链表操作的,问到底有多少种解决方案,今天我们就来聊聊。 题目的大致意思是: 假设存在一个无序单链表,将重复结点去除后,并保原顺序。去重前:1→3→1→5→5→7去重后:1→3→5→7 顺序删除 通过双重循环直接在链表上执行删除操作。外层循环用一个指针从第一个结点开始遍历整个链表,然后内层循环用另外一个指针遍历其余结点,将与外层循环遍历到的指针所指结点的数据域相同的结点删除,如下图所示。 假设外层循环从outerCur开始遍历,当内层循环指针innerCur遍历到上图实线所示的位置(outerCur.data==innerCur.data)时,此时需要把innerCur指向的结点删除。 具体步骤如下: 用tmp记录待删除的结点的地址。 为了能够在删除tmp结点后继续遍历链表中其余的结点,使innerCur指针指向它的后继结点:innerCur=innerCur.next。 从链表中删除tmp结点。 实现代码如下: 运行结果: 算法性能分析 由于这种方法采用双重循环对链表进行遍历,因此,时间复杂度为O(N^2)。其中,N为链表的长度。在遍历链表的过程中,使用了常量个额外的指针变量来保存当前遍历的结点、前驱结点和被删除的结点,因此,空间复杂度为O(1)。 递归法 主要思路为:对于结点cur,首先递归地删除以cur.next为首的子链表中重复的结点,接着从以cur.next为首的子链表中找出与cur有着相同数据域的结点并删除。 实现代码如下: 算法性能分析 这种方法与方法一类似,从本质上而言,由于这种方法需要对链表进行双重遍历,因此,时间复杂度为O(N^2)。其中,N为链表的长度。由于递归法会增加许多额外的函数调用,因此,从理论上讲,该方法效率比前面的方法低。 空间换时间 通常情况下,为了降低时间复杂度,往往在条件允许的情况下,通过使用辅助空间实现。 具体而言,主要思路如下。 建立一个HashSet,HashSet中的内容为已经遍历过的结点内容,并将其初始化为空。 从头开始遍历链表中的所以结点,存在以下两种可能性: 如果结点内容已经在HashSet中,则删除此结点,继续向后遍历。 如果结点内容不在HashSet中,则保留此结点,将此结点内容添加到HashSet中,继续向后遍历。 「引申:如何从有序链表中移除重复项?」 如链表:1,3、5、5、7、7、8、9 去重后:1,3、5、7、8、9 分析与解答 上述介绍的方法也适用于链表有序的情况,但是由于以上方法没有充分利用到链表有序这个条件,因此,算法的性能肯定不是最优的。本题中,由于链表具有有序性,因此,不需要对链表进行两次遍历。所以,有如下思路:用cur 指向链表第一个结点,此时需要分为以下两种情况讨论。 如果cur.data==cur.next.data,那么删除cur.next结点。 如果cur.data!=cur.next.data,那么cur=cur.next,继续遍历其余结点。 总结 对于无序单链表中,想要删除其中重复的结点(多个重复结点保留一个)。删除办法有按照顺序删除、使用递归方式删除以及可以使用空间换时间(HashSet中元素的唯一性)。 点赞越多,bug越少~

优秀的个人博客,低调大师

我是这样给阿里面试官吹 ConcurrentHashMap的

因为上篇文章HashMap已经讲解的很详细了,因此此篇文章会简单介绍思路,再学习并发HashMap就简单很多了,上一篇文章中我们最终知道HashMap是线程不安全的,因此在老版本JDK中提供了HashTable来实现多线程级别的,改变之处重要有以下几点。 ❝ HashTable的 put, get, remove等方法是通过 synchronized来修饰保证其线程安全性的。 HashTable是 不允许key跟value为null的。 问题是 synchronized是个关键字级别的重量锁,在get数据的时候任何写入操作都不允许。相对来说性能不好。因此目前主要用的 ConcurrentHashMap来保证线程安全性。 ❞ ConcurrentHashMap主要分为JDK<=7跟JDK>=8的两个版本,ConcurrentHashMap的空间利用率更低一般只有10%~20%,接下来分别介绍。 JDK7 先宏观说下JDK7中的大致组成,ConcurrentHashMap由Segment数组结构和HashEntry数组组成。Segment是一种可重入锁,是一种数组和链表的结构,一个Segment中包含一个HashEntry数组,每个HashEntry又是一个链表结构。正是通过Segment分段锁,ConcurrentHashMap实现了高效率的并发。缺点是并发程度是有segment数组来决定的,并发度一旦初始化无法扩容。先绘制个ConcurrentHashMap的形象直观图。要想理解currentHashMap,可以简单的理解为将数据「分表分库」。ConcurrentHashMap是由 Segment 数组 结构和HashEntry 数组 结构组成。 ❝ Segment 是一种可重入锁 ReentrantLock的子类 ,在 ConcurrentHashMap 里扮演锁的角色, HashEntry则用于存储键值对数据。 ConcurrentHashMap 里包含一个 Segment 数组来实现锁分离, Segment的结构和 HashMap 类似,一个 Segment里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment守护者一个 HashEntry 数组里的元素,当对 HashEntry数组的数据进行修改时,必须首先获得它对应的 Segment 锁。 ❞ 我们先看下segment类: staticfinalclassSegment<K,V>extendsReentrantLockimplementsSerializable{transientvolatileHashEntry<K,V>[]table;//包含一个HashMap可以理解为} 可以理解为我们的每个segment都是实现了Lock功能的HashMap。如果我们同时有多个segment形成了segment数组那我们就可以实现并发咯。 我们看下 currentHashMap的构造函数,先总结几点。 每一个segment里面包含的table(HashEntry数组)初始化大小也一定是2的次幂 这里设置了若干个用于位计算的参数。 initialCapacity:初始容量大小 ,默认16。 loadFactor: 扩容因子,默认0.75,当一个Segment存储的元素数量大于initialCapacity* loadFactor时,该Segment会进行一次扩容。 concurrencyLevel:并发度,默认16。并发度可以理解为程序运行时能够 「同时更新」ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的 分段锁个数,即Segment[]的数组长度。如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。 segment的数组大小最终一定是2的次幂 构造函数详解: //initialCapacity是我们保存所以KV数据的初始值//loadFactor这个就是HashMap的负载因子//我们segment数组的初始化大小@SuppressWarnings("unchecked")publicConcurrentHashMap(intinitialCapacity,floatloadFactor,intconcurrencyLevel){if(!(loadFactor>0)||initialCapacity<0||concurrencyLevel<=0)thrownewIllegalArgumentException();if(concurrencyLevel>MAX_SEGMENTS)//最大允许segment的个数,不能超过1<24concurrencyLevel=MAX_SEGMENTS;intsshift=0;//类似扰动函数intssize=1;while(ssize<concurrencyLevel){++sshift;ssize<<=1;//确保segment一定是2次幂}this.segmentShift=32-sshift;//有点类似与扰动函数,跟下面的参数配合使用实现当前元素落到那个segment上面。this.segmentMask=ssize-1;//为了取模专用if(initialCapacity>MAXIMUM_CAPACITY)//不能大于1<30initialCapacity=MAXIMUM_CAPACITY;intc=initialCapacity/ssize;//总的数组大小被segment分散后需要多少个tableif(c*ssize<initialCapacity)++c;//确保向上取值intcap=MIN_SEGMENT_TABLE_CAPACITY;//每个table初始化大小为2while(cap<c)//单独的一个segment[i]对应的table 容量大小。cap<<=1;//将table的容量初始化为2的次幂Segment<K,V>s0=newSegment<K,V>(loadFactor,(int)(cap*loadFactor),(HashEntry<K,V>[])newHashEntry[cap]);//负载因子,阈值,每个segment的初始化大小。跟hashmap 初始值类似。//并且segment的初始化是懒加载模式,刚开始只有一个s0,其余的在需要的时候才会增加。Segment<K,V>[]ss=(Segment<K,V>[])newSegment[ssize];UNSAFE.putOrderedObject(ss,SBASE,s0);//orderedwriteofsegments[0]this.segments=ss;} hash 不管是我们的get操作还是put操作要需要通过hash来对数据进行定位。 //整体思想就是通过多次不同方式的位运算来努力将数据均匀的分不到目标table中,都是些扰动函数privateinthash(Objectk){inth=hashSeed;if((0!=h)&&(kinstanceofString)){returnsun.misc.Hashing.stringHash32((String)k);}h^=k.hashCode();//single-wordWang/Jenkinshash.h+=(h<<15)^0xffffcd7d;h^=(h>>>10);h+=(h<<3);h^=(h>>>6);h+=(h<<2)+(h<<14);returnh^(h>>>16);} get 相对来说比较简单,无非就是通过 hash找到对应的 segment,继续通过 hash找到对应的 table,然后就是遍历这个链表看是否可以找到,并且要注意 get的时候是没有加锁的。 publicVget(Objectkey){Segment<K,V>s;HashEntry<K,V>[]tab;inth=hash(key);//JDK7中标准的hash值获取算法longu=(((h>>>segmentShift)&segmentMask)<<SSHIFT)+SBASE;//hash值如何映射到对应的segment上if((s=(Segment<K,V>)UNSAFE.getObjectVolatile(segments,u))!=null&&(tab=s.table)!=null){//无非就是获得hash值对应的segment是否存在,for(HashEntry<K,V>e=(HashEntry<K,V>)UNSAFE.getObjectVolatile(tab,((long)(((tab.length-1)&h))<<TSHIFT)+TBASE);e!=null;e=e.next){//看下这个hash值对应的是segment(HashEntry)中的具体位置。然后遍历查询该链表Kk;if((k=e.key)key||(e.hashh&&key.equals(k)))returne.value;}}returnnull;} put 相同的思路,先找到 hash值对应的 segment位置,然后看该 segment位置是否初始化了(因为segment是懒加载模式)。选择性初始化,最终执行put操作。 @SuppressWarnings("unchecked")publicVput(Kkey,Vvalue){Segment<K,V>s;if(valuenull)thrownewNullPointerException();inthash=hash(key);//还是获得最终hash值intj=(hash>>>segmentShift)&segmentMask;//hash值位操作对应的segment数组位置if((s=(Segment<K,V>)UNSAFE.getObject(segments,(j<<SSHIFT)+SBASE))null)s=ensureSegment(j);//初始化时候因为只有第一个segment,如果落在了其余的segment中则需要现初始化。returns.put(key,hash,value,false);//直接在数据中执行put操作。} 其中put操作基本思路跟HashMap几乎一样,只是在开始跟结束进行了加锁的操作tryLock and unlock,然后JDK7中都是先扩容再添加数据的,并且获得不到锁也会进行自旋的tryLock或者lock阻塞排队进行等待(同时获得锁前提前new出新数据)。 finalVput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){//在往该segment写入前,需要先获取该segment的独占锁,获取失败尝试获取自旋锁HashEntry<K,V>node=tryLock()?null:scanAndLockForPut(key,hash,value);VoldValue;try{//segment内部的数组HashEntry<K,V>[]tab=table;//利用hash值,求应该放置的数组下标intindex=(tab.length-1)&hash;//first是数组该位置处的链表的表头HashEntry<K,V>first=entryAt(tab,index);for(HashEntry<K,V>e=first;;){if(e!=null){Kk;if((k=e.key)key||(e.hashhash&&key.equals(k))){oldValue=e.value;if(!onlyIfAbsent){//覆盖旧值e.value=value;++modCount;}break;}//继续顺着链表走e=e.next;}else{// node 是不是 null,这个要看获取锁的过程。没获得锁的线程帮我们创建好了节点,直接头插法//如果不为 null,那就直接将它设置为链表表头;如果是 null,初始化并设置为链表表头。if(node!=null)node.setNext(first);elsenode=newHashEntry<K,V>(hash,key,value,first);intc=count+1;//如果超过了该segment的阈值,这个segment需要扩容if(c>threshold&&tab.length<MAXIMUM_CAPACITY)rehash(node);//扩容else//没有达到阈值,将node放到数组tab的index位置,//将新的结点设置成原链表的表头setEntryAt(tab,index,node);++modCount;count=c;oldValue=null;break;}}}finally{//解锁unlock();}returnoldValue;} 如果加锁失败了调用scanAndLockForPut,完成查找或新建节点的工作。当获取到锁后直接将该节点加入链表即可,「提升」了put操作的性能,这里涉及到自旋。大致过程: ❝ 在我获取不到锁的时候我进行tryLock,准备好new的数据,同时还有一定的次数限制,还要考虑别的已经获得线程的节点修改该头节点。 ❞ privateHashEntry<K,V>scanAndLockForPut(Kkey,inthash,Vvalue){HashEntry<K,V>first=entryForHash(this,hash);HashEntry<K,V>e=first;HashEntry<K,V>node=null;intretries=-1;//negativewhilelocatingnode//循环获取锁while(!tryLock()){HashEntry<K,V>f;//torecheckfirstbelowif(retries<0){if(enull){if(nodenull)//speculativelycreatenode//进到这里说明数组该位置的链表是空的,没有任何元素//当然,进到这里的另一个原因是tryLock()失败,所以该槽存在并发,不一定是该位置node=newHashEntry<K,V>(hash,key,value,null);retries=0;}elseif(key.equals(e.key))retries=0;else//顺着链表往下走e=e.next;}//重试次数如果超过MAX_SCAN_RETRIES(单核1次多核64次),那么不抢了,进入到阻塞队列等待锁//lock()是阻塞方法,直到获取锁后返回elseif(++retries>MAX_SCAN_RETRIES){lock();break;}elseif((retries&1)0&&//进入这里,说明有新的元素进到了链表,并且成为了新的表头//这边的策略是,重新执行scanAndLockForPut方法(f=entryForHash(this,hash))!=first){e=first=f;//re-traverseifentrychangedretries=-1;}}returnnode;} Size 这个size方法比较有趣,他是先无锁的统计下所有的数据量看下前后两次是否数据一样,如果一样则返回数据,如果不一样则要把全部的segment进行加锁,统计,解锁。并且size方法只是返回一个统计性的数字,因此size谨慎使用哦。 publicintsize(){//Tryafewtimestogetaccuratecount.Onfailuredueto//continuousasyncchangesintable,resorttolocking.finalSegment<K,V>[]segments=this.segments;intsize;booleanoverflow;//trueifsizeoverflows32bitslongsum;//sumofmodCountslonglast=0L;//previoussumintretries=-1;//firstiterationisn'tretrytry{for(;;){if(retries++RETRIES_BEFORE_LOCK){//超过2次则全部加锁for(intj=0;j<segments.length;++j)ensureSegment(j).lock();//直接对全部segment加锁消耗性太大}sum=0L;size=0;overflow=false;for(intj=0;j<segments.length;++j){Segment<K,V>seg=segmentAt(segments,j);if(seg!=null){sum+=seg.modCount;//统计的是modCount,涉及到增删该都会加1intc=seg.count;if(c<0||(size+=c)<0)overflow=true;}}if(sumlast)//每一个前后的修改次数一样则认为一样,但凡有一个不一样则直接break。break;last=sum;}}finally{if(retries>RETRIES_BEFORE_LOCK){for(intj=0;j<segments.length;++j)segmentAt(segments,j).unlock();}}returnoverflow?Integer.MAX_VALUE:size;} rehash segment 数组初始化后就不可变了,也就是说 「并发性不可变」,不过 segment里的 table可以扩容为2倍,该方法没有考虑并发,因为执行该方法之前已经获取了锁。其中JDK7中的 rehash思路跟JDK8 中扩容后处理链表的思路一样,个人不过感觉没有8写的精髓好看。 //方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。privatevoidrehash(HashEntry<K,V>node){HashEntry<K,V>[]oldTable=table;intoldCapacity=oldTable.length;//2倍intnewCapacity=oldCapacity<<1;threshold=(int)(newCapacity*loadFactor);//创建新数组HashEntry<K,V>[]newTable=(HashEntry<K,V>[])newHashEntry[newCapacity];//新的掩码,如从16扩容到32,那么sizeMask为31,对应二进制‘000...00011111’intsizeMask=newCapacity-1;//遍历原数组,将原数组位置i处的链表拆分到新数组位置i和i+oldCap两个位置for(inti=0;i<oldCapacity;i++){//e是链表的第一个元素HashEntry<K,V>e=oldTable[i];if(e!=null){HashEntry<K,V>next=e.next;//计算应该放置在新数组中的位置,//假设原数组长度为16,e在oldTable[3]处,那么idx只可能是3或者是3+16=19intidx=e.hash&sizeMask;//新位置if(nextnull)//该位置处只有一个元素newTable[idx]=e;else{//Reuseconsecutivesequenceatsameslot//e是链表表头HashEntry<K,V>lastRun=e;//idx是当前链表的头结点e的新位置intlastIdx=idx;//for循环找到一个lastRun结点,这个结点之后的所有元素是将要放到一起的for(HashEntry<K,V>last=next;last!=null;last=last.next){intk=last.hash&sizeMask;if(k!=lastIdx){lastIdx=k;lastRun=last;}}//将lastRun及其之后的所有结点组成的这个链表放到lastIdx这个位置newTable[lastIdx]=lastRun;//下面的操作是处理lastRun之前的结点,//这些结点可能分配在另一个链表中,也可能分配到上面的那个链表中for(HashEntry<K,V>p=e;p!=lastRun;p=p.next){Vv=p.value;inth=p.hash;intk=h&sizeMask;HashEntry<K,V>n=newTable[k];newTable[k]=newHashEntry<K,V>(h,p.key,v,n);}}}}//将新来的node放到新数组中刚刚的两个链表之一的头部intnodeIndex=node.hash&sizeMask;//addthenewnodenode.setNext(newTable[nodeIndex]);newTable[nodeIndex]=node;table=newTable;} CAS操作 在JDK7里在 ConcurrentHashMap中通过原子操作 sun.misc.Unsafe查找元素、替换元素和设置元素。通过这样的硬件级别获得数据可以保证及时是多线程我也每次获得的数据是最新的。这些原子操作起着非常关键的作用,你可以在所有 ConcurrentHashMap的基本功能中看到,随机距离如下: finalvoidsetNext(HashEntry<K,V>n){UNSAFE.putOrderedObject(this,nextOffset,n);}staticfinal<K,V>HashEntry<K,V>entryAt(HashEntry<K,V>[]tab,inti){return(tabnull)?null:(HashEntry<K,V>)UNSAFE.getObjectVolatile(tab,((long)i<<TSHIFT)+TBASE);}staticfinal<K,V>voidsetEntryAt(HashEntry<K,V>[]tab,inti,HashEntry<K,V>e){UNSAFE.putOrderedObject(tab,((long)i<<TSHIFT)+TBASE,e);} 常见问题 ConcurrentHashMap实现原理是怎么样的或者ConcurrentHashMap如何在保证高并发下线程安全的同时实现了性能提升? ❝ ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的HashTable,只要多个修改操作发生在不同的段上,它们就可以并发进行。 ❞ 在高并发下的情况下如何保证取得的元素是最新的? ❝ 用于存储键值对数据的HashEntry,在设计上它的成员变量value跟next都是volatile类型的,这样就保证别的线程对value值的修改,get方法可以马上看到。 ❞ ConcurrentHashMap的弱一致性体现在迭代器,clear和get方法,原因在于没有加锁。 比如迭代器在遍历数据的时候是一个Segment一个Segment去遍历的,如果在遍历完一个Segment时正好有一个线程在刚遍历完的Segment上插入数据,就会体现出不一致性。clear也是一样。 get方法和containsKey方法都是遍历对应索引位上所有节点,都是不加锁来判断的,如果是修改性质的因为可见性的存在可以直接获得最新值,不过如果是新添加值则无法保持一致性。 JDK8 JDK8相比与JDK7主要区别如下: ❝ 取消了segment数组,直接用table保存数据,锁的粒度更小,减少并发冲突的概率。采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率,并发控制使用Synchronized和CAS来操作。 存储数据时采用了数组+ 链表+红黑树的形式。 ❞ CurrentHashMap重要参数: ❝ private static final int MAXIMUM_CAPACITY = 1 << 30; // 数组的最大值 private static final int DEFAULT_CAPACITY = 16; // 默认数组长度 static final int TREEIFY_THRESHOLD = 8; // 链表转红黑树的一个条件 static final int UNTREEIFY_THRESHOLD = 6; // 红黑树转链表的一个条件 static final int MIN_TREEIFY_CAPACITY = 64; // 链表转红黑树的另一个条件 static final int MOVED = -1; // 表示正在扩容转移 static final int TREEBIN = -2; // 表示已经转换成树 static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // 获得hash值的辅助参数 transient volatile Node<K,V>[] table;// 默认没初始化的数组,用来保存元素 private transient volatile Node<K,V>[] nextTable; // 转移的时候用的数组 static final int NCPU = Runtime.getRuntime().availableProcessors();// 获取可用的CPU个数 private transient volatile Node<K,V>[] nextTable; // 连接表,用于哈希表扩容,扩容完成后会被重置为 null private transient volatile long baseCount;保存着整个哈希表中存储的所有的结点的个数总和,有点类似于 HashMap 的 size 属性。private transient volatile int sizeCtl; 负数:表示进行初始化或者扩容,-1:表示正在初始化,-N:表示有 N-1 个线程正在进行扩容 正数:0 表示还没有被初始化,> 0的数:初始化或者是下一次进行扩容的阈值,有点类似HashMap中的threshold,不过功能「更强大」。 ❞ 若干重要类 构成每个元素的基本类 Node staticclassNode<K,V>implementsMap.Entry<K,V>{finalinthash;//key的hash值finalKkey;//keyvolatileVval;//valuevolatileNode<K,V>next;//表示链表中的下一个节点} TreeNode继承于Node,用来存储红黑树节点 staticfinalclassTreeNode<K,V>extendsNode<K,V>{TreeNode<K,V>parent;//红黑树的父亲节点TreeNode<K,V>left;//左节点TreeNode<K,V>right;//右节点TreeNode<K,V>prev;//前节点booleanred;//是否为红点} ForwardingNode 在 Node 的子类 ForwardingNode 的构造方法中,可以看到此变量的hash = 「-1」 ,类中还存储 nextTable的引用。该初始化方法只在 transfer方法被调用,如果一个类被设置成此种情况并且hash = -1 则说明该节点不需要resize了。 staticfinalclassForwardingNode<K,V>extendsNode<K,V>{finalNode<K,V>[]nextTable;ForwardingNode(Node<K,V>[]tab){//注意这里super(MOVED,null,null,null);this.nextTable=tab;}//.....} TreeBin TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制. staticfinalclassTreeBin<K,V>extendsNode<K,V>{TreeNode<K,V>root;volatileTreeNode<K,V>first;volatileThreadwaiter;volatileintlockState;//valuesforlockStatestaticfinalintWRITER=1;//setwhileholdingwritelockstaticfinalintWAITER=2;//setwhenwaitingforwritelockstaticfinalintREADER=4;//incrementvalueforsettingreadlock} 构造函数 整体的构造情况基本跟HashMap类似,并且为了跟原来的JDK7中的兼容性还可以传入并发度。不过JDK8中并发度已经有table的具体长度来控制了。 ❝ ConcurrentHashMap():创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 ConcurrentHashMap(int):创建一个带有指定初始容量 tableSizeFor、默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 ConcurrentHashMap(Map<? extends K, ? extends V> m):构造一个与给定映射具有相同映射关系的新映射 ConcurrentHashMap(int initialCapacity, float loadFactor):创建一个带有指定初始容量、加载因子和默认 concurrencyLevel (1) 的新的空映射 ConcurrentHashMap(int, float, int):创建一个带有指定初始容量、加载因子和并发级别的新的空映射。 ❞ put 假设table已经初始化完成,put操作采用 CAS + synchronized 实现并发插入或更新操作,具体实现如下。 ❝ 做一些边界处理,然后获得hash值。 没初始化就初始化,初始化后看下对应的桶是否为空,为空就原子性的尝试插入。 如果当前节点正在扩容还要去帮忙扩容,骚操作。 用 syn来加锁当前节点,然后操作几乎跟就跟hashmap一样了。 ❞ // Node 节点的 hash值在HashMap中存储的就是hash值,在currenthashmap中可能有多种情况哦!finalVputVal(Kkey,Vvalue,booleanonlyIfAbsent){if(keynull||valuenull)thrownewNullPointerException();//边界处理inthash=spread(key.hashCode());//最终hash值计算intbinCount=0;for(Node<K,V>[]tab=table;;){//循环表Node<K,V>f;intn,i,fh;if(tabnull||(n=tab.length)0)tab=initTable();//初始化表如果为空,懒汉式elseif((f=tabAt(tab,i=(n-1)&hash))null){//如果对应桶位置为空if(casTabAt(tab,i,null,newNode<K,V>(hash,key,value,null)))//CAS原子性的尝试插入break;}elseif((fh=f.hash)MOVED)//如果当前节点正在扩容。还要帮着去扩容。tab=helpTransfer(tab,f);else{VoldVal=null;synchronized(f){//桶存在数据加锁操作进行处理if(tabAt(tab,i)f){if(fh>=0){//如果存储的是链表存储的是节点的hash值binCount=1;for(Node<K,V>e=f;;++binCount){Kek;//遍历链表去查找,如果找到key一样则选择性if(e.hashhash&&((ek=e.key)key||(ek!=null&&key.equals(ek)))){oldVal=e.val;if(!onlyIfAbsent)e.val=value;break;}Node<K,V>pred=e;if((e=e.next)null){//找到尾部插入pred.next=newNode<K,V>(hash,key,value,null);break;}}}elseif(finstanceofTreeBin){//如果桶节点类型为TreeBinNode<K,V>p;binCount=2;if((p=((TreeBin<K,V>)f).putTreeVal(hash,key,value))!=null){//尝试红黑树插入,同时也要防止节点本来就有,选择性覆盖oldVal=p.val;if(!onlyIfAbsent)p.val=value;}}}}if(binCount!=0){//如果链表数量if(binCount>=TREEIFY_THRESHOLD)treeifyBin(tab,i);//链表转红黑树哦!if(oldVal!=null)returnoldVal;break;}}}addCount(1L,binCount);//统计大小并且检查是否要扩容。returnnull;} 涉及到重要函数initTable、tabAt、casTabAt、helpTransfer、putTreeVal、treeifyBin、addCount函数。 initTable 「只允许一个线程」对表进行初始化,如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度Thread.yield。这样,保证了表同时只会被一个线程初始化,对于table的大小,会根据sizeCtl的值进行设置,如果没有设置szieCtl的值,那么默认生成的table大小为16,否则,会根据sizeCtl的大小设置table大小。 //容器初始化操作privatefinalNode<K,V>[]initTable(){Node<K,V>[]tab;intsc;while((tab=table)null||tab.length0){if((sc=sizeCtl)<0)//如果正在初始化-1,-N 正在扩容。Thread.yield();//进行线程让步等待//让掉当前线程 CPU 的时间片,使正在运行中的线程重新变成就绪状态,并重新竞争 CPU 的调度权。//它可能会获取到,也有可能被其他线程获取到。elseif(U.compareAndSwapInt(this,SIZECTL,sc,-1)){//比较sizeCtl的值与sc是否相等,相等则用-1 替换,这表明我这个线程在进行初始化了!try{if((tab=table)null||tab.length0){intn=(sc>0)?sc:DEFAULT_CAPACITY;//默认为16@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n];table=tab=nt;sc=n-(n>>>2);//sc=0.75n}}finally{sizeCtl=sc;//设置sizeCtl类似threshold}break;}}returntab;} unsafe 在ConcurrentHashMap中使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制。 //用来返回节点数组的指定位置的节点的原子操作@SuppressWarnings("unchecked")staticfinal<K,V>Node<K,V>tabAt(Node<K,V>[]tab,inti){return(Node<K,V>)U.getObjectVolatile(tab,((long)i<<ASHIFT)+ABASE);}//cas原子操作,在指定位置设定值staticfinal<K,V>booleancasTabAt(Node<K,V>[]tab,inti,Node<K,V>c,Node<K,V>v){returnU.compareAndSwapObject(tab,((long)i<<ASHIFT)+ABASE,c,v);}//原子操作,在指定位置设定值staticfinal<K,V>voidsetTabAt(Node<K,V>[]tab,inti,Node<K,V>v){U.putObjectVolatile(tab,((long)i<<ASHIFT)+ABASE,v);}//比较table数组下标为i的结点是否为c,若为c,则用v交换操作。否则,不进行交换操作。staticfinal<K,V>booleancasTabAt(Node<K,V>[]tab,inti,Node<K,V>c,Node<K,V>v){returnU.compareAndSwapObject(tab,((long)i<<ASHIFT)+ABASE,c,v);} 可以看到获得table[i]数据是通过Unsafe对象通过反射获取的,取数据直接table[index]不可以么,为什么要这么复杂?在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的「副本」,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,「保证了每次拿到数据都是最新的」。 helpTransfer //可能有多个线程在同时帮忙运行helpTransferfinalNode<K,V>[]helpTransfer(Node<K,V>[]tab,Node<K,V>f){Node<K,V>[]nextTab;intsc;if(tab!=null&&(finstanceofForwardingNode)&&(nextTab=((ForwardingNode<K,V>)f).nextTable)!=null){//table不是空且node节点是转移类型,并且转移类型的nextTable不是空说明还在扩容ingintrs=resizeStamp(tab.length);//根据 length 得到一个前16位的标识符,数组容量大小。//确定新table指向没有变,老table数据也没变,并且此时sizeCtl小于0还在扩容ingwhile(nextTabnextTable&&tabletab&&(sc=sizeCtl)<0){if((sc>>>RESIZE_STAMP_SHIFT)!=rs||scrs+1||scrs+MAX_RESIZERS||transferIndex<=0)//1.sizeCtl无符号右移16位获得高16位如果不等rs标识符变了// 2. 如果扩容结束了这里可以看 trePresize 函数第一次扩容操作://默认第一个线程设置sc=rs左移16位+2,当第一个线程结束扩容了,//就会将 sc 减一。这个时候,sc 就等于 rs + 1。//3.如果达到了最大帮助线程个数65535个//4.如果转移下标调整ing扩容已经结束了break;if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1)){//如果以上都不是,将sizeCtl+1,增加一个线程来扩容transfer(tab,nextTab);//进行转移break;//结束循环}}returnnextTab;}returntable;} Integer.numberOfLeadingZeros(n) ❝ 该方法的作用是「返回无符号整型i的最高非零位前面的0的个数」,包括符号位在内;如果i为负数,这个方法将会返回0,符号位为1. 比如说,10的二进制表示为 0000 0000 0000 0000 0000 0000 0000 1010 java的整型长度为32位。那么这个方法返回的就是28 ❞ resizeStamp 主要用来获得标识符,可以简单理解是对当前系统容量大小的一种监控。 staticfinalintresizeStamp(intn){returnInteger.numberOfLeadingZeros(n)|(1<<(RESIZE_STAMP_BITS-1));//RESIZE_STAMP_BITS=16} addCount 主要就2件事:一是更新 baseCount,二是判断是否需要扩容。 privatefinalvoidaddCount(longx,intcheck){CounterCell[]as;longb,s;//首先如果没有并发此时countCells is null, 此时尝试CAS设置数据值。if((as=counterCells)!=null||!U.compareAndSwapLong(this,BASECOUNT,b=baseCount,s=b+x)){//如果counterCells不为空以为此时有并发的设置或者CAS设置baseCount失败了CounterCella;longv;intm;booleanuncontended=true;if(asnull||(m=as.length-1)<0||(a=as[ThreadLocalRandom.getProbe()&m])null||!(uncontended=U.compareAndSwapLong(a,CELLVALUE,v=a.value,v+x))){//1.如果没出现并发此时计数盒子为null//2.随机取出一个数组位置发现为空//3.出现并发后修改这个cellvalue失败了//执行funAddCountfullAddCount(x,uncontended);//死循环操作return;}if(check<=1)return;s=sumCount();//吧counterCells数组中的每一个数据进行累加给baseCount。}//如果需要扩容if(check>=0){Node<K,V>[]tab,nt;intn,sc;while(s>=(long)(sc=sizeCtl)&&(tab=table)!=null&&(n=tab.length)<MAXIMUM_CAPACITY){intrs=resizeStamp(n);//获得高位标识符if(sc<0){//是否需要帮忙去扩容if((sc>>>RESIZE_STAMP_SHIFT)!=rs||scrs+1||scrs+MAX_RESIZERS||(nt=nextTable)null||transferIndex<=0)break;if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1))transfer(tab,nt);}//第一次扩容elseif(U.compareAndSwapInt(this,SIZECTL,sc,(rs<<RESIZE_STAMP_SHIFT)+2))transfer(tab,null);s=sumCount();}}} baseCount添加 ConcurrentHashMap提供了 baseCount、 counterCells 两个辅助变量和一个 CounterCell辅助内部类。sumCount() 就是迭代 counterCells来统计 sum 的过程。put 操作时,肯定会影响 size(),在 put() 方法最后会调用 addCount()方法。整体的思维方法跟LongAdder类似,用的思维就是借鉴的 ConcurrentHashMap。每一个 Cell都用Contended修饰来避免伪共享。 ❝ JDK1.7 和 JDK1.8 对 size 的计算是不一样的。1.7 中是先不加锁计算三次,如果三次结果不一样在加锁。 JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。 JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。 ❞ 关于扩容 在 addCount第一次扩容时候会有骚操作 sc=rs << RESIZE_STAMP_SHIFT) + 2)其中 rs = resizeStamp(n)。这里需要核心说一点, 如果不是第一次扩容则直接将低16位的数字 +1 即可。 putTreeVal 这个操作几乎跟HashMap的操作完全一样,核心思想就是一定要决定向左还是向右然后最终尝试放置新数据,然后balance。不同点就是有锁的考虑。 treeifyBin 这里的基本思路跟HashMap几乎一样,不同点就是先变成TreeNode,然后是「单向链表」串联。 privatefinalvoidtreeifyBin(Node<K,V>[]tab,intindex){Node<K,V>b;intn,sc;if(tab!=null){//如果整个table的数量小于64,就扩容至原来的一倍,不转红黑树了//因为这个阈值扩容可以减少hash冲突,不必要去转红黑树if((n=tab.length)<MIN_TREEIFY_CAPACITY)tryPresize(n<<1);elseif((b=tabAt(tab,index))!=null&&b.hash>=0){synchronized(b){//锁定当前桶if(tabAt(tab,index)b){TreeNode<K,V>hd=null,tl=null;for(Node<K,V>e=b;e!=null;e=e.next){//遍历这个链表然后将每个节点封装成TreeNode,最终单链表串联起来,//最终调用setTabAt放置红黑树TreeNode<K,V>p=newTreeNode<K,V>(e.hash,e.key,e.val,null,null);if((p.prev=tl)null)hd=p;elsetl.next=p;tl=p;}//通过TreeBin对象对TreeNode转换成红黑树setTabAt(tab,index,newTreeBin<K,V>(hd));}}}}} TreeBin 主要功能就是链表变化为红黑树,这个红黑树用TreeBin来包装。并且要注意 转成红黑树以后以前链表的结构信息还是有的,最终信息如下: TreeBin.first = 链表中第一个节点。 TreeBin.root = 红黑树中的root节点。 TreeBin(TreeNode<K,V>b){super(TREEBIN,null,null,null);//创建空节点hash=-2this.first=b;TreeNode<K,V>r=null;//root节点for(TreeNode<K,V>x=b,next;x!=null;x=next){next=(TreeNode<K,V>)x.next;x.left=x.right=null;if(rnull){x.parent=null;x.red=false;r=x;//root节点设置为x}else{Kk=x.key;inth=x.hash;Class<?>kc=null;for(TreeNode<K,V>p=r;;){//x代表的是转换为树之前的顺序遍历到链表的位置的节点,r代表的是根节点intdir,ph;Kpk=p.key;if((ph=p.hash)>h)dir=-1;elseif(ph<h)dir=1;elseif((kcnull&&(kc=comparableClassFor(k))null)||(dir=compareComparables(kc,k,pk))0)dir=tieBreakOrder(k,pk);//当key不可以比较,或者相等的时候采取的一种排序措施TreeNode<K,V>xp=p;//放一定是放在叶子节点上,如果还没找到叶子节点则进行循环往下找。//找到了目前叶子节点才会进入再放置数据if((p=(dir<=0)?p.left:p.right)null){x.parent=xp;if(dir<=0)xp.left=x;elsexp.right=x;r=balanceInsertion(r,x);//每次插入一个元素的时候都调用balanceInsertion来保持红黑树的平衡break;}}}}this.root=r;assertcheckInvariants(root);} tryPresize 当数组长度小于64的时候,扩张数组长度一倍,调用此函数。扩容后容量大小的核对,可能涉及到初始化容器大小。并且扩容的时候又跟2的次幂联系上了!,其中初始化时候传入map会调用putAll方法直接put一个map的话,在「putAll」方法中没有调用initTable方法去初始化table,而是直接调用了tryPresize方法,所以这里需要做一个是不是需要初始化table的判断。 PS:默认第一个线程设置 sc = rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1,这个时候说明扩容完毕了。 /***扩容表为指可以容纳指定个数的大小(总是2的N次方)*假设原来的数组长度为16,则在调用tryPresize的时候,size参数的值为16<<1(32),此时sizeCtl的值为12*计算出来c的值为64,则要扩容到sizeCtl≥c*第一次扩容之后数组长:32 sizeCtl:24*第三次扩容之后数组长:128 sizeCtl:96 退出*/privatefinalvoidtryPresize(intsize){intc=(size>=(MAXIMUM_CAPACITY>>>1))?MAXIMUM_CAPACITY:tableSizeFor(size+(size>>>1)+1);//合理范围intsc;while((sc=sizeCtl)>=0){Node<K,V>[]tab=table;intn;if(tabnull||(n=tab.length)0){//初始化传入map,今天putAll会直接调用这个。n=(sc>c)?sc:c;if(U.compareAndSwapInt(this,SIZECTL,sc,-1)){//初始化tab的时候,把sizeCtl设为-1try{if(tabletab){@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n];table=nt;sc=n-(n>>>2);//sc=sizeCtl=0.75n}}finally{sizeCtl=sc;}}}//初始化时候如果数组容量<=sizeCtl或容量已经最大化了则退出elseif(c<=sc||n>=MAXIMUM_CAPACITY){break;//退出扩张}elseif(tabtable){intrs=resizeStamp(n);if(sc<0){//sc=siztCtl如果正在扩容Table的话,则帮助扩容Node<K,V>[]nt;if((sc>>>RESIZE_STAMP_SHIFT)!=rs||scrs+1||scrs+MAX_RESIZERS||(nt=nextTable)null||transferIndex<=0)break;//各种条件判断是否需要加入扩容工作。//帮助转移数据的线程数+1if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1))transfer(tab,nt);}//没有在初始化或扩容,则开始扩容//此处切记第一次扩容直接+2elseif(U.compareAndSwapInt(this,SIZECTL,sc,(rs<<RESIZE_STAMP_SHIFT)+2)){transfer(tab,null);}}}} transfer 这里代码量比较大主要分文三部分,并且感觉思路很精髓,尤其「是其他线程帮着去扩容的骚操作」。 主要是 单个线程能处理的最少桶结点个数的计算和一些属性的初始化操作。 每个线程进来会先领取自己的任务区间 [bound,i],然后开始 --i 来遍历自己的任务区间,对每个桶进行处理。如果遇到桶的头结点是空的,那么使用 ForwardingNode标识旧table中该桶已经被处理完成了。如果遇到已经处理完成的桶,直接跳过进行下一个桶的处理。如果是正常的桶,对桶首节点加锁,正常的迁移即可(跟HashMap第三部分一样思路),迁移结束后依然会将原表的该位置标识位已经处理。 该函数中的finish= true 则说明整张表的迁移操作已经「全部」完成了,我们只需要重置 table的引用并将 nextTable 赋为空即可。否则,CAS 式的将 sizeCtl减一,表示当前线程已经完成了任务,退出扩容操作。如果退出成功,那么需要进一步判断当前线程是否就是最后一个在执行扩容的。 f((sc-2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT)return; 第一次扩容时在addCount中有写到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示当前只有一个线程正在工作,「相对应的」,如果 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT,说明当前线程就是最后一个还在扩容的线程,那么会将 finishing 标识为 true,并在下一次循环中退出扩容方法。 几乎跟 HashMap大致思路类似的遍历链表/红黑树然后扩容操作。 privatefinalvoidtransfer(Node<K,V>[]tab,Node<K,V>[]nextTab){intn=tab.length,stride;if((stride=(NCPU>1)?(n>>>3)/NCPU:n)<MIN_TRANSFER_STRIDE)//MIN_TRANSFER_STRIDE用来控制不要占用太多CPUstride=MIN_TRANSFER_STRIDE;//subdividerange//MIN_TRANSFER_STRIDE=16每个CPU处理最小长度个数if(nextTabnull){//新表格为空则直接新建二倍,别的辅助线程来帮忙扩容则不会进入此if条件try{@SuppressWarnings("unchecked")Node<K,V>[]nt=(Node<K,V>[])newNode<?,?>[n<<1];nextTab=nt;}catch(Throwableex){//trytocopewithOOMEsizeCtl=Integer.MAX_VALUE;return;}nextTable=nextTab;transferIndex=n;//transferIndex指向最后一个桶,方便从后向前遍历}intnextn=nextTab.length;//新表长度ForwardingNode<K,V>fwd=newForwardingNode<K,V>(nextTab);//创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点booleanadvance=true;//是否继续向前查找的标志位booleanfinishing=false;//toensuresweep(清扫)beforecommittingnextTab,在完成之前重新在扫描一遍数组,看看有没完成的没//第一部分// i 指向当前桶, bound 指向当前线程需要处理的桶结点的区间下限【bound,i】这样来跟线程划分任务。for(inti=0,bound=0;;){Node<K,V>f;intfh;//这个while循环的目的就是通过--i遍历当前线程所分配到的桶结点//一个桶一个桶的处理while(advance){//每一次成功处理操作都会将advance设置为true,然里来处理区间的上一个数据intnextIndex,nextBound;if(--i>=bound||finishing){//通过此处进行任务区间的遍历advance=false;}elseif((nextIndex=transferIndex)<=0){i=-1;//任务分配完了advance=false;}//更新transferIndex//为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex)elseif(U.compareAndSwapInt(this,TRANSFERINDEX,nextIndex,nextBound=(nextIndex>stride?nextIndex-stride:0))){//nextIndex本来等于末尾数字,bound=nextBound;i=nextIndex-1;advance=false;}}//当前线程所有任务完成if(i<0||i>=n||i+n>=nextn){intsc;if(finishing){//已经完成转移则直接赋值操作nextTable=null;table=nextTab;sizeCtl=(n<<1)-(n>>>1);//设置sizeCtl为扩容后的0.75return;}if(U.compareAndSwapInt(this,SIZECTL,sc=sizeCtl,sc-1)){// sizeCtl-1 表示当前线程任务完成。if((sc-2)!=resizeStamp(n)<<RESIZE_STAMP_SHIFT){//判断当前线程完成的线程是不是最后一个在扩容的,思路精髓return;}finishing=advance=true;//如果是则相应的设置参数i=n;}}elseif((f=tabAt(tab,i))null)//数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1])advance=casTabAt(tab,i,null,fwd);//如果老节点数据是空的则直接进行CAS设置为fwdelseif((fh=f.hash)MOVED)//已经是个fwd了,因为是多线程操作可能别人已经给你弄好了,advance=true;//alreadyprocessedelse{synchronized(f){//加锁操作if(tabAt(tab,i)f){Node<K,V>ln,hn;if(fh>=0){//该节点的hash值大于等于0,说明是一个Node节点//关于链表的操作整体跟HashMap类似不过感觉好像更扰一些。intrunBit=fh&n;//fh=f.hashfirsthash的意思,看第一个点放老位置还是新位置Node<K,V>lastRun=f;for(Node<K,V>p=f.next;p!=null;p=p.next){intb=p.hash&n;//n的值为扩张前的数组的长度if(b!=runBit){runBit=b;lastRun=p;//最后导致发生变化的节点}}if(runBit0){//看最后一个变化点是新还是旧旧ln=lastRun;hn=null;}else{hn=lastRun;//看最后一个变化点是新还是旧旧ln=null;}/**构造两个链表,顺序大部分和原来是反的,不过顺序也有差异*分别放到原来的位置和新增加的长度的相同位置(i/n+i)*/for(Node<K,V>p=f;p!=lastRun;p=p.next){intph=p.hash;Kpk=p.key;Vpv=p.val;if((ph&n)0)/**假设runBit的值为0,*则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点)设置到旧的table的第一个hash计算后为0的节点下一个节点*并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/ln=newNode<K,V>(ph,pk,pv,ln);else/**假设runBit的值不为0,*则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点)设置到旧的table的第一个hash计算后不为0的节点下一个节点*并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点*/hn=newNode<K,V>(ph,pk,pv,hn);}setTabAt(nextTab,i,ln);setTabAt(nextTab,i+n,hn);setTabAt(tab,i,fwd);advance=true;}elseif(finstanceofTreeBin){//该节点hash值是个负数否则的话是一个树节点TreeBin<K,V>t=(TreeBin<K,V>)f;TreeNode<K,V>lo=null,loTail=null;//旧头尾TreeNode<K,V>hi=null,hiTail=null;//新头围intlc=0,hc=0;for(Node<K,V>e=t.first;e!=null;e=e.next){inth=e.hash;TreeNode<K,V>p=newTreeNode<K,V>(h,e.key,e.val,null,null);if((h&n)0){if((p.prev=loTail)null)lo=p;elseloTail.next=p;//旧头尾设置loTail=p;++lc;}else{//新头围设置if((p.prev=hiTail)null)hi=p;elsehiTail.next=p;hiTail=p;++hc;}}//ln如果老位置数字<=6则要对老位置链表进行红黑树降级到链表,否则就看是否还需要对老位置数据进行新建红黑树ln=(lc<=UNTREEIFY_THRESHOLD)?untreeify(lo):(hc!=0)?newTreeBin<K,V>(lo):t;hn=(hc<=UNTREEIFY_THRESHOLD)?untreeify(hi):(lc!=0)?newTreeBin<K,V>(hi):t;setTabAt(nextTab,i,ln);setTabAt(nextTab,i+n,hn);setTabAt(tab,i,fwd);//老表中i位置节点设置下advance=true;}}}}}} get 这个就很简单了,获得hash值,然后判断存在与否,遍历链表即可,注意get没有任何锁操作! publicVget(Objectkey){Node<K,V>[]tab;Node<K,V>e,p;intn,eh;Kek;//计算key的hash值inth=spread(key.hashCode());if((tab=table)!=null&&(n=tab.length)>0&&(e=tabAt(tab,(n-1)&h))!=null){//表不为空并且表的长度大于0并且key所在的桶不为空if((eh=e.hash)h){//表中的元素的hash值与key的hash值相等if((ek=e.key)key||(ek!=null&&key.equals(ek)))//键相等//返回值returne.val;}elseif(eh<0)//是个TreeBinhash=-2//在红黑树中查找,因为红黑树中也保存这一个链表顺序return(p=e.find(h,key))!=null?p.val:null;while((e=e.next)!=null){//对于结点hash值大于0的情况链表if(e.hashh&&((ek=e.key)key||(ek!=null&&key.equals(ek))))returne.val;}}returnnull;} clear 关于清空也相对简单 ,无非就是遍历桶数组,然后通过CAS来置空。 publicvoidclear(){longdelta=0L;inti=0;Node<K,V>[]tab=table;while(tab!=null&&i<tab.length){intfh;Node<K,V>f=tabAt(tab,i);if(fnull)++i;//这个桶是空的直接跳过elseif((fh=f.hash)MOVED){//这个桶的数据还在扩容中,要去扩容同时等待。tab=helpTransfer(tab,f);i=0;//restart}else{synchronized(f){//真正的删除if(tabAt(tab,i)f){Node<K,V>p=(fh>=0?f:(finstanceofTreeBin)?((TreeBin<K,V>)f).first:null);//循环到链表/者红黑树的尾部while(p!=null){--delta;//记录删除了多少个p=p.next;}//利用CAS无锁置nullsetTabAt(tab,i++,null);}}}}if(delta!=0L)addCount(delta,-1);//调整count} end ConcurrentHashMap是如果来做到「并发安全」,又是如何做到「高效」的并发的呢? 首先是读操作,读源码发现get方法中根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的。 写操作 . 数据扩容函数是 transfer,该方法的只有 addCount, helpTransfer和 tryPresize这三个方法来调用。 addCount是在当对数组进行操作,使得数组中存储的元素个数发生了变化的时候会调用的方法。 helpTransfer是在当一个线程要对table中元素进行操作的时候,如果检测到节点的·hash·= MOVED 的时候,就会调用 helpTransfer方法,在 helpTransfer中再调用 transfer方法来帮助完成数组的扩容 ❝ tryPresize是在 treeIfybin和 putAll方法中调用, treeIfybin主要是在 put添加元素完之后,判断该数组节点相关元素是不是已经超过8个的时候,如果超过则会调用这个方法来扩容数组或者把链表转为树。注意 putAll在初始化传入一个大map的时候会调用。· ❞ 总结扩容情况发生: ❝ 在往map中添加元素的时候,在某一个节点的数目已经超过了8个,同时数组的长度又小于64的时候,才会触发数组的扩容。 当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容 ❞ 3. 扩容时候是否可以进行读写。 ❝ 对于读操作,因为是没有加锁的所以可以的. 对于写操作,JDK8中已经将锁的范围细腻到table[i]l了,当在进行数组扩容的时候,如果当前节点还没有被处理(也就是说还没有设置为fwd节点),那就可以进行设置操作。如果该节点已经被处理了,则当前线程也会加入到扩容的操作中去。 ❞ 多个线程又是如何同步处理的 在 ConcurrentHashMap中,同步处理主要是通过 Synchronized和 unsafe的硬件级别原子性 这两种方式来完成的。 ❝ 在取得sizeCtl跟某个位置的Node的时候,使用的都是 unsafe的方法,来达到并发安全的目的 当需要在某个位置设置节点的时候,则会通过 Synchronized的同步机制来锁定该位置的节点。 在数组扩容的时候,则通过处理的 步长和 fwd节点来达到并发安全的目的,通过设置hash值为MOVED=-1。 当把某个位置的节点复制到扩张后的table的时候,也通过 Synchronized的同步机制来保证线程安全 ❞ 套路 ❝ 谈谈你理解的 HashMap,讲讲其中的 get put 过程。 1.8 做了什么优化? 是线程安全的嘛? 不安全会导致哪些问题? 如何解决?有没有线程安全的并发容器? ConcurrentHashMap 是如何实现的?1.7、1.8 实现有何不同,为什么这么做。 1.8中ConcurrentHashMap的sizeCtl作用,大致说下协助扩容跟标志位。 HashMap 为什么不用跳表替换红黑树呢? ❞ 参考 HashMap讲解 本文分享自微信公众号 - sowhat1412(sowhat9094)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

优秀的个人博客,低调大师

面试官:如何写出让 CPU 跑得更快的代码?

前言 代码都是由 CPU 跑起来的,我们代码写的好与坏就决定了 CPU 的执行效率,特别是在编写计算密集型的程序,更要注重 CPU 的执行效率,否则将会大大影响系统性能。 CPU 内部嵌入了 CPU Cache(高速缓存),它的存储容量很小,但是离 CPU 核心很近,所以缓存的读写速度是极快的,那么如果 CPU 运算时,直接从 CPU Cache 读取数据,而不是从内存的话,运算速度就会很快。 但是,大多数人不知道 CPU Cache 的运行机制,以至于不知道如何才能够写出能够配合 CPU Cache 工作机制的代码,一旦你掌握了它,你写代码的时候,就有新的优化思路了。 那么,接下来我们就来看看,CPU Cache 到底是什么样的,是如何工作的呢,又该写出让 CPU 执行更快的代码呢? 正文 CPU Cache 有多快? 你可能会好奇为什么有了内存,还需要 CPU Cache?根据摩尔定律,CPU 的访问速度每 18 个月就会翻倍,相当于每年增长 60% 左右,内存的速度当然也会不断增长,但是增长的速度远小于 CPU,平均每年只增长 7% 左右。于是,CPU 与内存的访问性能的差距不断拉大。 到现在,一次内存访问所需时间是 200~300 多个时钟周期,这意味着 CPU 和内存的访问速度已经相差 200~300 多倍了。 为了弥补 CPU 与内存两者之间的性能差异,就在 CPU 内部引入了 CPU Cache,也称高速缓存。 CPU Cache 通常分为大小不等的三级缓存,分别是 L1 Cache、L2 Cache 和 L3 Cache。 由于 CPU Cache 所使用的材料是 SRAM,价格比内存使用的 DRAM 高出很多,在当今每生产 1 MB 大小的 CPU Cache 需要 7 美金的成本,而内存只需要 0.015 美金的成本,成本方面相差了 466 倍,所以 CPU Cache 不像内存那样动辄以 GB 计算,它的大小是以 KB 或 MB 来计算的。 在 Linux 系统中,我们可以使用下图的方式来查看各级 CPU Cache 的大小,比如我这手上这台服务器,离 CPU 核心最近的 L1 Cache 是 32KB,其次是 L2 Cache 是 256KB,最大的 L3 Cache 则是 3MB。 其中,L1 Cache 通常会分为「数据缓存」和「指令缓存」,这意味着数据和指令在 L1 Cache 这一层是分开缓存的,上图中的 index0 也就是数据缓存,而 index1 则是指令缓存,它两的大小通常是一样的。 另外,你也会注意到,L3 Cache 比 L1 Cache 和 L2 Cache 大很多,这是因为 L1 Cache 和 L2 Cache 都是每个 CPU 核心独有的,而 L3 Cache 是多个 CPU 核心共享的。 程序执行时,会先将内存中的数据加载到共享的 L3 Cache 中,再加载到每个核心独有的 L2 Cache,最后进入到最快的 L1 Cache,之后才会被 CPU 读取。它们之间的层级关系,如下图: 越靠近 CPU 核心的缓存其访问速度越快,CPU 访问 L1 Cache 只需要 2~4 个时钟周期,访问 L2 Cache 大约 10~20 个时钟周期,访问 L3 Cache 大约 20~60 个时钟周期,而访问内存速度大概在 200~300 个 时钟周期之间。如下表格: 所以,CPU 从 L1 Cache 读取数据的速度,相比从内存读取的速度,会快 100 多倍。 CPU Cache 的数据结构和读取过程是什么样的? CPU Cache 的数据是从内存中读取过来的,它是以一小块一小块读取数据的,而不是按照单个数组元素来读取数据的,在 CPU Cache 中的,这样一小块一小块的数据,称为 Cache Line(缓存块)。 你可以在你的 Linux 系统,用下面这种方式来查看 CPU 的 Cache Line,你可以看我服务器的 L1 Cache Line 大小是 64 字节,也就意味着 L1 Cache 一次载入数据的大小是 64 字节。 比如,有一个 int array[100] 的数组,当载入 array[0] 时,由于这个数组元素的大小在内存只占 4 字节,不足 64 字节,CPU 就会顺序加载数组元素到 array[15],意味着 array[0]~array[15] 数组元素都会被缓存在 CPU Cache 中了,因此当下次访问这些数组元素时,会直接从 CPU Cache 读取,而不用再从内存中读取,大大提高了 CPU 读取数据的性能。 事实上,CPU 读取数据的时候,无论数据是否存放到 Cache 中,CPU 都是先访问 Cache,只有当 Cache 中找不到数据时,才会去访问内存,并把内存中的数据读入到 Cache 中,CPU 再从 CPU Cache 读取数据。 这样的访问机制,跟我们使用「内存作为硬盘的缓存」的逻辑是一样的,如果内存有缓存的数据,则直接返回,否则要访问龟速一般的硬盘。 那 CPU 怎么知道要访问的内存数据,是否在 Cache 里?如果在的话,如何找到 Cache 对应的数据呢?我们从最简单、基础的直接映射 Cache(Direct Mapped Cache) 说起,来看看整个 CPU Cache 的数据结构和访问逻辑。 前面,我们提到 CPU 访问内存数据时,是一小块一小块数据读取的,具体这一小块数据的大小,取决于 coherency_line_size 的值,一般 64 字节。在内存中,这一块的数据我们称为内存块(Bock),读取的时候我们要拿到数据所在内存块的地址。 对于直接映射 Cache 采用的策略,就是把内存块的地址始终「映射」在一个 CPU Line(缓存块) 的地址,至于映射关系实现方式,则是使用「取模运算」,取模运算的结果就是内存块地址对应的 CPU Line(缓存块) 的地址。 举个例子,内存共被划分为 32 个内存块,CPU Cache 共有 8 个 CPU Line,假设 CPU 想要访问第 15 号内存块,如果 15 号内存块中的数据已经缓存在 CPU Line 中的话,则是一定映射在 7 号 CPU Line 中,因为 15 % 8 的值是 7。 机智的你肯定发现了,使用取模方式映射的话,就会出现多个内存块对应同一个 CPU Line,比如上面的例子,除了 15 号内存块是映射在 7 号 CPU Line 中,还有 7 号、23 号、31 号内存块都是映射到 7 号 CPU Line 中。 因此,为了区别不同的内存块,在对应的 CPU Line 中我们还会存储一个组标记(Tag)。这个组标记会记录当前 CPU Line 中存储的数据对应的内存块,我们可以用这个组标记来区分不同的内存块。 除了组标记信息外,CPU Line 还有两个信息: 一个是,从内存加载过来的实际存放数据(Data)。 另一个是,有效位(Valid bit),它是用来标记对应的 CPU Line 中的数据是否是有效的,如果有效位是 0,无论 CPU Line 中是否有数据,CPU 都会直接访问内存,重新加载数据。 CPU 在从 CPU Cache 读取数据的时候,并不是读取 CPU Line 中的整个数据块,而是读取 CPU 所需要的一个数据片段,这样的数据统称为一个字(Word)。那怎么在对应的 CPU Line 中数据块中找到所需的字呢?答案是,需要一个偏移量(Offset)。 因此,一个内存的访问地址,包括组标记、CPU Line 索引、偏移量这三种信息,于是 CPU 就能通过这些信息,在 CPU Cache 中找到缓存的数据。而对于 CPU Cache 里的数据结构,则是由索引 + 有效位 + 组标记 + 数据块组成。 如果内存中的数据已经在 CPU Cahe 中了,那 CPU 访问一个内存地址的时候,会经历这 4 个步骤: 根据内存地址中索引信息,计算在 CPU Cahe 中的索引,也就是找出对应的 CPU Line 的地址; 找到对应 CPU Line 后,判断 CPU Line 中的有效位,确认 CPU Line 中数据是否是有效的,如果是无效的,CPU 就会直接访问内存,并重新加载数据,如果数据有效,则往下执行; 对比内存地址中组标记和 CPU Line 中的组标记,确认 CPU Line 中的数据是我们要访问的内存数据,如果不是的话,CPU 就会直接访问内存,并重新加载数据,如果是的话,则往下执行; 根据内存地址中偏移量信息,从 CPU Line 的数据块中,读取对应的字。 到这里,相信你对直接映射 Cache 有了一定认识,但其实除了直接映射 Cache 之外,还有其他通过内存地址找到 CPU Cache 中的数据的策略,比如全相连 Cache (Fully Associative Cache)、组相连 Cache (Set Associative Cache)等,这几种策策略的数据结构都比较相似,我们理解流直接映射 Cache 的工作方式,其他的策略如果你有兴趣去看,相信很快就能理解的了。 如何写出让 CPU 跑得更快的代码? 我们知道 CPU 访问内存的速度,比访问 CPU Cache 的速度慢了 100 多倍,所以如果 CPU 所要操作的数据在 CPU Cache 中的话,这样将会带来很大的性能提升。访问的数据在 CPU Cache 中的话,意味着缓存命中,缓存命中率越高的话,代码的性能就会越好,CPU 也就跑的越快。 于是,「如何写出让 CPU 跑得更快的代码?」这个问题,可以改成「如何写出 CPU 缓存命中率高的代码?」。 在前面我也提到, L1 Cache 通常分为「数据缓存」和「指令缓存」,这是因为 CPU 会别处理数据和指令,比如 1+1=2 这个运算,+ 就是指令,会被放在「指令缓存」中,而输入数字 1 则会被放在「数据缓存」里。 因此,我们要分开来看「数据缓存」和「指令缓存」的缓存命中率。 如何提升数据缓存的命中率? 假设要遍历二维数组,有以下两种形式,虽然代码执行结果是一样,但你觉得哪种形式效率最高呢?为什么高呢? 经过测试,形式一 array[i][j] 执行时间比形式二 array[j][i] 快好几倍。 之所以有这么大的差距,是因为二维数组 array 所占用的内存是连续的,比如长度 N 的指是 2 的话,那么内存中的数组元素的布局顺序是这样的: 形式一用 array[i][j] 访问数组元素的顺序,正是和内存中数组元素存放的顺序一致。当 CPU 访问 array[0][0] 时,由于该数据不在 Cache 中,于是会「顺序」把跟随其后的 3 个元素从内存中加载到 CPU Cache,这样当 CPU 访问后面的 3 个数组元素时,就能在 CPU Cache 中成功地找到数据,这意味着缓存命中率很高,缓存命中的数据不需要访问内存,这便大大提高了代码的性能。 而如果用形式二的 array[j][i] 来访问,则访问的顺序就是: 你可以看到,访问的方式跳跃式的,而不是顺序的,那么如果 N 的数值很大,那么操作 array[j][i] 时,是没办法把 array[j+1][i] 也读入到 CPU Cache 中的,既然 array[j+1][i] 没有读取到 CPU Cache,那么就需要从内存读取该数据元素了。很明显,这种不连续性、跳跃式访问数据元素的方式,可能不能充分利用到了 CPU Cache 的特性,从而代码的性能不高。 那访问 array[0][0] 元素时,CPU 具体会一次从内存中加载多少元素到 CPU Cache 呢?这个问题,在前面我们也提到过,这跟 CPU Cache Line 有关,它表示 CPU Cache 一次性能加载数据的大小,可以在 Linux 里通过 coherency_line_size 配置查看 它的大小,通常是 64 个字节。 也就是说,当 CPU 访问内存数据时,如果数据不在 CPU Cache 中,则会一次性会连续加载 64 字节大小的数据到 CPU Cache,那么当访问 array[0][0] 时,由于该元素不足 64 字节,于是就会往后顺序读取 array[0][0]~array[0][15] 到 CPU Cache 中。顺序访问的 array[i][j] 因为利用了这一特点,所以就会比跳跃式访问的 array[j][i] 要快。 因此,遇到这种遍历数组的情况时,按照内存布局顺序访问,将可以有效的利用 CPU Cache 带来的好处,这样我们代码的性能就会得到很大的提升, 如何提升指令缓存的命中率? 提升数据的缓存命中率的方式,是按照内存布局顺序访问,那针对指令的缓存该如何提升呢? 我们以一个例子来看看,有一个元素为 0 到 100 之间随机数字组成的一维数组: 接下来,对这个数组做两个操作: 第一个操作,循环遍历数组,把小于 50 的数组元素置为 0; 第二个操作,将数组排序; 那么问题来了,你觉得先遍历再排序速度快,还是先排序再遍历速度快呢? 在回答这个问题之前,我们先了解 CPU 的分支预测器。对于 if 条件语句,意味着此时至少可以选择跳转到两段不同的指令执行,也就是 if 还是 else 中的指令。那么,如果分支预测可以预测到接下来要执行 if 里的指令,还是 else 指令的话,就可以「提前」把这些指令放在指令缓存中,这样 CPU 可以直接从 Cache 读取到指令,于是执行速度就会很快。 当数组中的元素是随机的,分支预测就无法有效工作,而当数组元素都是顺序的,分支预测器会动态地根据历史命中数据对未来进行预测,这样命中率就会很高。 因此,先排序再遍历速度会更快,这是因为排序之后,数字是从小到大的,那么前几次循环命中 if < 50 的次数会比较多,于是分支预测就会缓存 if 里的 array[i] = 0 指令到 Cache 中,后续 CPU 执行该指令就只需要从 Cache 读取就好了。 如果你肯定代码中的 if 中的表达式判断为 true 的概率比较高,我们可以使用显示分支预测工具,比如在 C/C++ 语言中编译器提供了 likely 和 unlikely 这两种宏,如果 if 条件为 ture 的概率大,则可以用 likely 宏把 if 里的表达式包裹起来,反之用 unlikely 宏。 实际上,CPU 自身的动态分支预测已经是比较准的了,所以只有当非常确信 CPU 预测的不准,且能够知道实际的概率情况时,才建议使用这两种宏。 如果提升多核 CPU 的缓存命中率? 在单核 CPU,虽然只能执行一个进程,但是操作系统给每个进程分配了一个时间片,时间片用完了,就调度下一个进程,于是各个进程就按时间片交替地占用 CPU,从宏观上看起来各个进程同时在执行。 而现代 CPU 都是多核心的,进程可能在不同 CPU 核心来回切换执行,这对 CPU Cache 不是有利的,虽然 L3 Cache 是多核心之间共享的,但是 L1 和 L2 Cache 都是每个核心独有的,如果一个进程在不同核心来回切换,各个核心的缓存命中率就会受到影响,相反如果进程都在同一个核心上执行,那么其数据的 L1 和 L2 Cache 的缓存命中率可以得到有效提高,缓存命中率高就意味着 CPU 可以减少访问 内存的频率。 当有多个同时执行「计算密集型」的线程,为了防止因为切换到不同的核心,而导致缓存命中率下降的问题,我们可以把线程绑定在某一个 CPU 核心上,这样性能可以得到非常可观的提升。 在 Linux 上提供了 sched_setaffinity 方法,来实现将线程绑定到某个 CPU 核心这一功能。 总结 由于随着计算机技术的发展,CPU 与 内存的访问速度相差越来越多,如今差距已经高达好几百倍了,所以 CPU 内部嵌入了 CPU Cache 组件,作为内存与 CPU 之间的缓存层,CPU Cache 由于离 CPU 核心很近,所以访问速度也是非常快的,但由于所需材料成本比较高,它不像内存动辄几个 GB 大小,而是仅有几十 KB 到 MB 大小。 当 CPU 访问数据的时候,先是访问 CPU Cache,如果缓存命中的话,则直接返回数据,就不用每次都从内存读取速度了。因此,缓存命中率越高,代码的性能越好。 但需要注意的是,当 CPU 访问数据时,如果 CPU Cache 没有缓存该数据,则会从内存读取数据,但是并不是只读一个数据,而是一次性读取一块一块的数据存放到 CPU Cache 中,之后才会被 CPU 读取。 内存地址映射到 CPU Cache 地址里的策略有很多种,其中比较简单是直接映射 Cache,它巧妙的把内存地址拆分成「索引 + 组标记 + 偏移量」的方式,使得我们可以将很大的内存地址,映射到很小的 CPU Cache 地址里。 要想写出让 CPU 跑得更快的代码,就需要写出缓存命中率高的代码,CPU L1 Cache 分为数据缓存和指令缓存,因而需要分别提高它们的缓存命中率: 对于数据缓存,我们在遍历数据的时候,应该按照内存布局的顺序操作,这是因为 CPU Cache 是根据 CPU Cache Line 批量操作数据的,所以顺序地操作连续内存数据时,性能能得到有效的提升; 对于指令缓存,有规律的条件分支语句能够让 CPU 的分支预测器发挥作用,进一步提高执行的效率; 另外,对于多核 CPU 系统,线程可能在不同 CPU 核心来回切换,这样各个核心的缓存命中率就会受到影响,于是要想提高进程的缓存命中率,可以考虑把线程绑定 CPU 到某一个 CPU 核心。 絮叨 分享个喜事,小林平日里忙着输出文章,今天收到一份特别的快递,是 CSDN 寄来的奖状。 骄傲的说,你们关注的是 CSDN 首届技术原创第一名的博主,以后简历又可以吹牛逼了 没有啦,其实主要还是谢谢你们不离不弃的支持。 哈喽,我是小林,就爱图解计算机基础,如果觉得文章对你有帮助,欢迎分享给你的朋友,也给小林点个「在看」,这对小林非常重要,谢谢你们,给各位小姐姐小哥哥们抱拳了,我们下次见! 推荐阅读 这个星期不知不觉输出了 3 篇文章了,前面的 2 篇还没看过的同学,赶紧去看看呀! 天啦噜!知道硬盘很慢,但没想到比 CPU Cache 慢 10000000 倍 CPU 执行程序的秘密,藏在了这 15 张图里 本文分享自微信公众号 - 小林coding(CodingLin)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

优秀的个人博客,低调大师

Python | 面试官:你来说说并发场景锁怎么用?

点击上方蓝字,关注并星标,和我一起学技术。 今天是Python专题的第24篇文章,我们一起来聊聊多线程场景当中不可或缺的另外一个部分——锁。 如果你学过操作系统,那么对于锁应该不陌生。锁的含义是线程锁,可以用来指定某一个逻辑或者是资源同一时刻只能有一个线程访问。这个很好理解,就好像是有一个房间被一把锁锁住了,只有拿到钥匙的人才能进入。每一个人从房间门口拿到钥匙进入房间,出房间的时候会把钥匙再放回到门口。这样下一个到门口的人就可以拿到钥匙了。这里的房间就是某一个资源或者是一段逻辑,而拿取钥匙的人其实指的是一个线程。 加锁的原因 我们明白了锁的原理,不禁有了一个问题,我们为什么需要锁呢,它在哪些场景当中会用到呢? 其实它的使用场景非常广,我们举一个非常简单的例子,就是淘宝买东西。我们都知道商家的库存都是有限的,卖掉一个少一个。假如说当前某个商品库存只剩下一个,但当下却有两个人同时购买。两个人同时购买也就是有两个请求同时发起购买请求,如果我们不加锁的话,两个线程同时查询到商品的库存是1,大于0,进行购买逻辑之后,同时减一。由于两个线程同时执行,所以最后商品的库存会变成-1。 显然商品的库存不应该是一个负数,所以我们需要避免这种情况发生。通过加锁可以完美解决这个问题。我们规定一次只能有一个线程发起购买的请求,那么这样当一个线程将库存减到0的时候,第二个请求就无法修改了,就保证了数据的准确性。 代码实现 那么在Python当中,我们怎么样来实现这个锁呢? 其实很简单,threading库当中已经为我们提供了线程的工具,我们直接拿过来用就可以了。我们通过使用threading当中的Lock对象, 可以很轻易的实现方法加锁的功能。 importthreadingclassPurchaseRequest:'''初始化库存与锁'''def__init__(self,initial_value=0):self._value=initial_valueself._lock=threading.Lock()defincr(self,delta=1):'''加库存'''self._lock.acquire()self._value+=deltaself._lock.release()defdecr(self,delta=1):'''减库存'''self._lock.acquire()self._value-=deltaself._lock.release() 我们从代码当中就可以很轻易的看出Lock这个对象的使用方法,我们在进入加锁区(资源抢占区)之前,我们需要先使用lock.acquire()方法获取锁。Lock对象可以保证同一时刻只能有一个线程获取锁,只有获取了锁之后才会继续往下执行。当我们执行完成之后,我们需要把锁“放回门口”,所以需要再调用一下release方法,表示锁的释放。 这里有一个小问题是很多程序员在编程的时候总是会忘记release,导致不必要的bug,而且这种分布式场景当中的bug很难通过测试发现。因为测试的时候往往很难测试并发场景,code review的时候也很容易忽略,因此一旦泄露了还是挺难发现的。 为了解决这个问题,Lock还提供了一种改进的用法,就是使用with语句。with语句我们之前在使用文件的时候用到过,使用with可以替我们完成try catch以及资源回收等工作,我们只管用就完事了。这里也是一样,使用with之后我们就可以不用管锁的申请和释放了,直接写代码就行,所以上面的代码可以改写成这样: importthreadingclassPurchaseRequest:'''初始化库存与锁'''def__init__(self,initial_value=0):self._value=initial_valueself._lock=threading.Lock()defincr(self,delta=1):'''加库存'''withself._lock:self._value+=deltadefdecr(self,delta=1):'''减库存'''withself._lock:self._value-=delta 这样看起来是不是清爽很多? 可重入锁 上面介绍的只是最简单的锁,我们经常使用的往往是可重入锁。 什么叫可重入锁呢?简单解释一下,就是在一个线程已经持有了锁的情况下,它可以再次进入被加锁的区域。但是既然线程还持有锁没有释放,那么它不应该还是在加锁区域吗,怎么会有需要再次进入被加锁区域的情况呢?其实是有的,道理也很简单,就是递归。 我们把上面的例子稍微改一点点,就完全不一样了。 importthreadingclassPurchaseRequest:'''初始化库存与锁'''def__init__(self,initial_value=0):self._value=initial_valueself._lock=threading.Lock()defincr(self,delta=1):'''加库存'''withself._lock:self._value+=deltadefdecr(self,delta=1):'''减库存'''withself._lock:self.incr(-delta) 我们关注一下上面的decr方法,我们用incr来代替了原本的逻辑实现了decr。但是有一个问题是decr也是一个加锁的方法,需要前一个锁释放了才能进入。但它已经持有了锁了,那么这种情况下就会发生死锁。 我们只需要把Lock换成可重入锁就可以解决这个问题,只需要修改一行代码。 importthreadingclassPurchaseRequest:'''初始化库存与锁我们使用RLock代替了Lock,也可重入锁代替了普通锁'''def__init__(self,initial_value=0):self._value=initial_valueself._lock=threading.RLock()defincr(self,delta=1):'''加库存'''withself._lock:self._value+=deltadefdecr(self,delta=1):'''减库存'''withself._lock:self.incr(-delta) 总结 今天我们的文章介绍了Python当中锁的使用方法,以及可重入锁的概念。在并发场景下开发和调试都是一个比较困难的工作,稍微不小心就会踩到各种各样的坑,死锁只是其中一种比较常见并且比较容易解决的问题,除此之外还有很多其他各种各样的问题。 针对死锁的问题,Python还提供了其他的解决方案,我们放到下一篇文章当中再和大家分享。 今天的文章到这里就结束了,如果喜欢本文的话,请来一波素质三连,给我一点支持吧(关注、在看、点赞)。 本文分享自微信公众号 - TechFlow(techflow2019)。如有侵权,请联系 support@oschina.cn 删除。本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

优秀的个人博客,低调大师

面试常考:Java中synchronized和volatile有什么区别?

云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 之前的文章中我们介绍过两个在Java并发编程中比较重要的两个关键字:synchronized和volatile 简单回顾一下相关内容: 1、Java语言为了解决并发编程中存在的原子性、可见性和有序性问题,提供了一系列和并发处理相关的关键字,比如synchronized、volatile、final、concurren包等。 2、synchronized通过加锁的方式,使得其在需要原子性、可见性和有序性这三种特性的时候都可以作为其中一种解决方案,看起来是“万能”的。的确,大部分并发控制操作都能使用synchronized来完成。 3、volatile通过在volatile变量的操作前后插入内存屏障的方式,保证了变量在并发场景下的可见性和有序性。 4、volatile关键字是无法保证原子性的,而synchronized通过monitorenter和monitorexit两个指令,可以保证被synchronized修饰的代码在同一时间只能被一个线程访问,即可保证不会出现CPU时间片在多个线程间切换,即可保证原子性。 那么,我们知道,synchronized和volatile两个关键字是Java并发编程中经常用到的两个关键字,而且,通过前面的回顾,我们知道synchronized可以保证并发编程中不会出现原子性、可见性和有序性问题,而volatile只能保证可见性和有序性,那么,既生synchronized、何生volatile? 接下来,本文就来论述一下,为什么Java中已经有了synchronized关键字,还要提供volatile关键字。 synchronized的问题 我们都知道synchronized其实是一种加锁机制,那么既然是锁,天然就具备以下几个缺点: 1、有性能损耗 虽然在JDK 1.6中对synchronized做了很多优化,如如适应性自旋、锁消除、锁粗化、轻量级锁和偏向锁等,但是他毕竟还是一种锁。 以上这几种优化,都是尽量想办法避免对Monitor进行加锁,但是,并不是所有情况都可以优化的,况且就算是经过优化,优化的过程也是有一定的耗时的。 所以,无论是使用同步方法还是同步代码块,在同步操作之前还是要进行加锁,同步操作之后需要进行解锁,这个加锁、解锁的过程是要有性能损耗的。 关于二者的性能对比,由于虚拟机对锁实行的许多消除和优化,使得我们很难量化这两者之间的性能差距,但是我们可以确定的一个基本原则是:volatile变量的读操作的性能小号普通变量几乎无差别,但是写操作由于需要插入内存屏障所以会慢一些,即便如此,volatile在大多数场景下也比锁的开销要低。 2、产生阻塞 我们在深入理解多线程(一)——Synchronized的实现原理中介绍过关于synchronize的实现原理,无论是同步方法还是同步代码块,无论是ACC_SYNCHRONIZED还是monitorenter、monitorexit都是基于Monitor实现的。 基于Monitor对象,当多个线程同时访问一段同步代码时,首先会进入Entry Set,当有一个线程获取到对象的锁之后,才能进行The Owner区域,其他线程还会继续在Entry Set等待。并且当某个线程调用了wait方法后,会释放锁并进入Wait Set等待。 所以,synchronize实现的锁本质上是一种阻塞锁,也就是说多个线程要排队访问同一个共享对象。 而volatile是Java虚拟机提供的一种轻量级同步机制,他是基于内存屏障实现的。 说到底,他并不是锁,所以他不会有synchronized带来的阻塞和性能损耗的问题。 volatile的附加功能 除了前面我们提到的volatile比synchronized性能好以外,volatile其实还有一个很好的附加功能,那就是禁止指令重排。 我们先来举一个例子,看一下如果只使用synchronized而不使用volatile会发生什么问题,就拿我们比较熟悉的单例模式来看。 我们通过双重校验锁的方式实现一个单例,这里不使用volatile关键字: 以上代码,我们通过使用synchronized对Singleton.class进行加锁,可以保证同一时间只有一个线程可以执行到同步代码块中的内容,也就是说singleton = new Singleton()这个操作只会执行一次,这就是实现了一个单例。 但是,当我们在代码中使用上述单例对象的时候有可能发生空指针异常。这是一个比较诡异的情况。 我们假设Thread1 和 Thread2两个线程同时请求Singleton.getSingleton方法的时候: Step1 ,Thread1执行到第8行,开始进行对象的初始化。 Step2 ,Thread2执行到第5行,判断singleton == null。 Step3 ,Thread2经过判断发现singleton != null,所以执行第12行,返回singleton。 Step4 ,Thread2拿到singleton对象之后,开始执行后续的操作,比如调用singleton.call()。 以上过程,看上去并没有什么问题,但是,其实,在Step4,Thread2在调用singleton.call()的时候,是有可能抛出空指针异常的。 之所有会有NPE抛出,是因为在Step3,Thread2拿到的singleton对象并不是一个完整的对象。 什么叫做不完整对象,这个怎么理解呢? 我们这里来先来看一下,singleton = new Singleton();这行代码到底做了什么事情,大致过程如下: 1、虚拟机遇到new指令,到常量池定位到这个类的符号引用。 2、检查符号引用代表的类是否被加载、解析、初始化过。 3、虚拟机为对象分配内存。 4、虚拟机将分配到的内存空间都初始化为零值。 5、虚拟机对对象进行必要的设置。 6、执行方法,成员变量进行初始化。 7、将对象的引用指向这个内存区域。 我们把这个过程简化一下,简化成3个步骤: a、JVM为对象分配一块内存M b、在内存M上为对象进行初始化 c、将内存M的地址复制给singleton变量 如下图: 因为将内存的地址赋值给singleton变量是最后一步,所以Thread1在这一步骤执行之前,Thread2在对singleton==null进行判断一直都是true的,那么他会一直阻塞,直到Thread1将这一步骤执行完。 但是,问题就出在以上过程并不是一个原子操作,并且编译器可能会进行重排序,如果以上步骤被重排成: a、JVM为对象分配一块内存Mc、将内存的地址复制给singleton变量 b、在内存M上为对象进行初始化 如下图: 这样的话,Thread1会先执行内存分配,在执行变量赋值,最后执行对象的初始化,那么,也就是说,在Thread1还没有为对象进行初始化的时候,Thread2进来判断singleton==null就可能提前得到一个false,则会返回一个不完整的sigleton对象,因为他还未完成初始化操作。 这种情况一旦发生,我们拿到了一个不完整的singleton对象,当尝试使用这个对象的时候就极有可能发生NPE异常。 那么,怎么解决这个问题呢?因为指令重排导致了这个问题,那就避免指令重排就行了。所以,volatile就派上用场了,因为volatile可以避免指令重排。只要将代码改成以下代码,就可以解决这个问题: 对singleton使用volatile约束,保证他的初始化过程不会被指令重排。这样就可以保Thread2 要不然就是拿不到对象,要不然就是拿到一个完整的对象。 synchronized的有序性保证呢? 看到这里可能有朋友会问了,说到底上面问题是发生了指令重排,其实还是个有序性的问题,不是说synchronized是可以保证有序性的么,这里为什么就不行了呢? 首先,可以明确的一点是:synchronized是无法禁止指令重排和处理器优化的。那么他是如何保证的有序性呢?这就要再把有序性的概念扩展一下了。 Java程序中天然的有序性可以总结为一句话:如果在本线程内观察,所有操作都是天然有序的。如果在一个线程中观察另一个线程,所有操作都是无序的。 以上这句话也是《深入理解Java虚拟机》中的原句,但是怎么理解呢?周志明并没有详细的解释。这里我简单扩展一下,这其实和as-if-serial语义有关。 as-if-serial语义的意思指:不管怎么重排序,单线程程序的执行结果都不能被改变。编译器和处理器无论如何优化,都必须遵守as-if-serial语义。 这里不对as-if-serial语义详细展开了,简单说就是,as-if-serial语义保证了单线程中,不管指令怎么重排,最终的执行结果是不能被改变的。 那么,我们回到刚刚那个双重校验锁的例子,站在单线程的角度,也就是只看Thread1的话,因为编译器会遵守as-if-serial语义,所以这种优化不会有任何问题,对于这个线程的执行结果也不会有任何影响。 但是,Thread1内部的指令重排却对Thread2产生了影响。 那么,我们可以说,synchronized保证的有序性是多个线程之间的有序性,即被加锁的内容要按照顺序被多个线程执行。但是其内部的同步代码还是会发生重排序,只不过由于编译器和处理器都遵循as-if-serial语义,所以我们可以认为这些重排序在单线程内部可忽略。 总结 本文从两方面论述了volatile的重要性以及不可替代性: 一方面是因为synchronized是一种锁机制,存在阻塞问题和性能问题,而volatile并不是锁,所以不存在阻塞和性能问题。 另外一方面,因为volatile借助了内存屏障来帮助其解决可见性和有序性问题,而内存屏障的使用还为其带来了一个禁止指令重排的附件功能,所以在有些场景中是可以避免发生指令重排的问题的。 所以,在日后需要做并发控制的时候,如果不涉及到原子性的问题,可以优先考虑使用volatile关键字。 【云栖号在线课堂】每天都有产品技术专家分享!课程地址:https://yqh.aliyun.com/live 立即加入社群,与专家面对面,及时了解课程最新动态!【云栖号在线课堂 社群】https://c.tb.cn/F3.Z8gvnK 原文发布时间:2020-05-28本文作者:码农突围本文来自:“掘金”,了解相关信息可以关注“掘金”

优秀的个人博客,低调大师

每日一博 | 那些年,面试官问你的消息队列

MQ理论介绍 一、为什么需要消息队列(MQ) 主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。大量的请求到达访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection错误,引发雪崩效应。我们使用消息队列,通过异步处理请求,从而缓解系统的压力。核心:异步处理、流量削峰、应用解耦 二、应用场景 异步处理,流量削峰,应用解耦,消息通讯四个场景 2.1、异步处理 场景1:用户注册后,需要发送注册邮件和注册短信。 串行方式:将注册信息写入 数据库 成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间 假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。 因为CPU在单位时间内处理的请求数是一定的,假设CPU在1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。 并行方式处理的请求量是10次(1000/100) 小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题? 引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下: 按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回, 因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。 比串行提高了3倍,比并行提高了2倍。 场景2:订单处理,前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。 2.2、流量削峰 流量削峰也是消息队列中的常用场景,一般在 秒杀或团抢活动 中使用广泛 应用场景:秒杀活动,一般会因为流量过大,导致流量报增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。 可以控制活动的人数 可以缓解短时间内高流量压垮应用 让消息不直接到达服务器,而是先让 「消息队列」 保存这些数据,然后让下面的服务器每一次都取各自能处理的请求数再去处理,这样当请求数超过服务器最大负载时,就不至于把服务器搞挂了。 秒杀业务根据消息队列中的请求信息,再做后续处理 2.3、应用解耦 场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用系统库存接口。 传统模式的缺点: 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败 解决订单系统与库存系统耦合,如何解决? 引入消息队列后的方案: 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 库存系统:订阅下单的消息,采用pub/sub(发布/订阅)的方式,获取下单信息,库存系统根据下单信息,进行库存操作 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入信息队列就不再关心其他后续操作了。实现订单系统与库存系统的应用解耦。 2.4、日志处理 日志处理是将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下 日志采集客户端,负载日志数据采集,定时写入Kafka队列 Kafka消息队列,负载日志数据的接收,储存和转发 日志处理应用:订阅并消费Kafka队列中的日志数据 (1) Kafka:接收用户日志的消息队列 (2) Logstash:做日志解析,统一成JSON 输出给Elasticsearch (3)Kibana:基于Elasticsearch 的数据可视化组件,超强的数据可视化能力是众多公司选择Elkstack的重要原因 2.5、消息通讯 消息通讯是指,消息队列一般都内置了高效的通信机制就,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。 点对点通讯: 客户端A和客户端B使用同一队列,进行消息通讯。 聊天室通讯: 客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。以上实际是消息队列的两种消息模式,点对点或发布订阅模式。 3、衡量指标 我们从服务性能、数据存储、集群结构三个方面去对比,选择适合自己项目的消息中间件 特性 ActiveMQ RabbitMQ RocketMQ Kafka 单机吞吐量 万级 万级 十万级 十万级 topic 数量对吞吐量的影响 - - topic 可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 topic 从几十个到几百个的时候,吞吐量会大幅度下降 时效性 毫秒级 微妙级 毫秒级 毫秒级 可用性 高 高 非常高,分布式架构 非常高,分布式架构 消息可靠性 有较低的概率丢失数据 - 经过参数优化配置,可以做到 0 丢失 经过参数优化配置,消息可以做到 0 丢失 功能支持 完善 并发能力很强,性能极其好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 优劣势总结 非常成熟,功能强大;偶尔会有较低概率丢失消息;社区不活跃了 性能极其好,延时很低;功能完善;提供管理界面;社区比较活跃;吞吐量较低;使用 erlang 开发源码阅读不方便; 接口简单易用;吞吐量高;分布式扩展方便;社区还算活跃;经过双 11 的考验 MQ 功能比较少;吞吐量高;分布式架构;可能存在消息重复消费问题;主要适用大数据实时计算以及日志收集; 4、AvctiveMQ和RabbitMQ 4.1、ActiveMQ 特点: 早期主流的消息中间件,包括ZeroMQ,但是这几年使用的很少了,主要在长期维护的项目中使用。API丰富,本身很成熟但是在高并发、大数据 环境下的性能不够出色,主要试用于中小型项目,有较低的概率丢失数据,最主要是的,官方现在维护的频率一直在降低,好几个月才发布一个版本。 架构: 1、Master-Slave模式,通过Zookeeper实现节点切换和故障转移 2、NetWork模式,相当于多套Master-Slave模式,通过NetWork网关进行集成 4.2、RabbitMQ RabbitMQ 是使用 Erlang 语言开发的开源消息队列系统。基于AMQP协议来实现的。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布与订阅)、可靠性、安全。 AMQP协议:更多的用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次场景。 好处在于可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Nacos

Nacos

Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service 的首字母简称,一个易于构建 AI Agent 应用的动态服务发现、配置管理和AI智能体管理平台。Nacos 致力于帮助您发现、配置和管理微服务及AI智能体应用。Nacos 提供了一组简单易用的特性集,帮助您快速实现动态服务发现、服务配置、服务元数据、流量管理。Nacos 帮助您更敏捷和容易地构建、交付和管理微服务平台。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。