首页 文章 精选 留言 我的

精选列表

搜索[java],共10000篇文章
优秀的个人博客,低调大师

Java匿名内部类与回调函数

之所以将匿名内部类和回调函数两个知识点一起写,是因为最近学习zookeeper的时候正好遇到这么一个例子。详细内容请参考:https://www.w3cschool.cn/zookeeper/zookeeper_api.html 以下是与ZooKeeper集合连接的完整代码。 public class ZooKeeperConnection { // declare zookeeper instance to access ZooKeeper ensemble private ZooKeeper zoo; final CountDownLatch connectedSignal = new CountDownLatch(1); // Method to connect zookeeper ensemble. public ZooKeeper connect(String host) throws IOException,InterruptedException { zoo = new ZooKeeper(host,5000,new Watcher() { public void process(WatchedEvent we) { if (we.getState() == KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); connectedSignal.await(); return zoo; } // Method to disconnect from zookeeper server public void close() throws InterruptedException { zoo.close(); } } 匿名内部类的创建格式如下: new 父类构造器(参数列表)|实现接口() { //匿名内部类的类体部分 } 在上面的代码中,connect方法中在实例化ZooKeeper对象时用到了匿名内部类: zoo = new ZooKeeper(host,5000,new Watcher() { public void process(WatchedEvent we) { if (we.getState() == KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); 这个内部类没有自己的名字,而是用到了Watcher接口,而通常情况下接口是不能用new的,但是在匿名内部类中可以这样。匿名内部类的类体是一个名为process的方法,这个方法就是用来实现Watcher接口中定义的process抽象方法的。 在这个匿名内部类中恰好又运用了回调函数(又叫回调方法)。 回调是一种常见的程序设计模式。在这种模式中,可以指出某个特定事件发生时应该采取的动作。 ZooKeeper类通过其构造函数提供connect功能。构造函数的签名如下 : ZooKeeper(String connectionString, int sessionTimeout, Watcher watcher) 在上面的类ZooKeeperConnection中,connect 方法创建一个ZooKeeper对象,连接到ZooKeeper集合,然后返回对象。 在此处使用CountDownLatch,就是为了形成一个回调函数。一开始将CountDownLatch对象connectedSignal值设为CountDownLatch(1);如果匿名内部类中的if语句不为真,这意味着下面的主线程会在一直处于等待状态,停留在connectedSignal.await();处。这个就恰好符合了回调函数的意义: 在某个特定事件发生时应该采取的动作。 如果客户端与Zookeeper没有成功建立连接(也就是if语句不为真),就不返回ZooKeeper对象zoo(在 connectedSignal.await()停留)。而一旦成功建立连接 (也就是if语句为真,执行connectedSignal.countDown()),就返回 ZooKeeper对象zoo ( connectedSignal.await()放行) 将匿名内部类改为普通类 在上述代码中可以将匿名内部类拆出来,作为一个单独类:XyzWatcher public class XyzWatcher implements Watcher { @Override public void process(WatchedEvent watchedEvent) { final CountDownLatch connectedSignal = new CountDownLatch(1); if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { connectedSignal.countDown(); } try { connectedSignal.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }原来的类ZooKeeperConnection改为如下: public class ZooKeeperConnection { // declare zookeeper instance to access ZooKeeper ensemble private ZooKeeper zoo; //public final CountDownLatch connectedSignal = new CountDownLatch(1); // Method to connect zookeeper ensemble. XyzWatcher xyz = new XyzWatcher(); public ZooKeeper connect(String host) throws IOException,InterruptedException { zoo = new ZooKeeper(host,5000,xyz); return zoo; } // Method to disconnect from zookeeper server public void close() throws InterruptedException { zoo.close(); } }

优秀的个人博客,低调大师

Java并发编程笔记之PriorityBlockingQueue源码分析

JDK 中无界优先级队列PriorityBlockingQueue 内部使用堆算法保证每次出队都是优先级最高的元素,元素入队时候是如何建堆的,元素出队后如何调整堆的平衡的? PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最好或者最低的元素,内部是平衡二叉树堆的实现。 首先看一下PriorityBlockingQueue类图结构,如下: 可以看到PriorityBlockingQueue内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLock 是个自旋锁,用CAS操作来保证只有一个线程可以扩容队列, 状态为0 或者1,其中0标示当前没有在进行扩容,1标示当前正在扩容。 我们首先看看PriorityBlockingQueue的构造函数,源码如下: private static final int DEFAULT_INITIAL_CAPACITY = 11; public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } 如上构造函数,默认队列容量为11,默认比较器为null,也就是使用元素的compareTo方法进行比较来确定元素的优先级,这意味着队列元素必须实现Comparable接口。 接下来我们主要看PriorityBlockingQueue的几个操作的源码,如下: 1.offer 操作,offer操作的作用是在队列插入一个元素,由于是无界队列,所以一直返回true,源码如下: public boolean offer(E e) { if (e == null) throw new NullPointerException(); //获取独占锁 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; //如果当前元素个数>=队列容量,则扩容(1) while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //默认比较器为null (2) if (cmp == null) siftUpComparable(n, e, array); else //自定义比较器 (3) siftUpUsingComparator(n, e, array, cmp); //队列元素增加1,并且激活notEmpty的条件队列里面的一个阻塞线程(9) size = n + 1; notEmpty.signal();//激活调用take()方法被阻塞的线程 } finally { //释放独占锁 lock.unlock(); } return true; } 可以看到上面代码,offer操作主流程比较简单,接下来主要关注PriorityBlockingQueue是如何进行扩容的和内部如何建立堆的,首先看扩容源码如下: private void tryGrow(Object[] array, int oldCap) { lock.unlock(); //释放获取的锁 Object[] newArray = null; //cas成功则扩容(4) if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { //oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // 如果一开始容量很小,则扩容宽度变大 (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // 可能溢出 int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } //第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu, //尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5) if (newArray == null) // 如果两外一个线程正在分配,则让出 Thread.yield(); lock.lock();//(6) if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } } tryGrow 目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用 cas 控制只有一个线程可以扩容成功呢? 其实这里不先释放锁也是可以的,也就是在整个扩容期间一直持有锁,但是扩容是需要花时间的,如果扩容的时候还占用锁,那么其他线程在这个时候是不能进行出队和入队操作的, 这大大降低了并发性。所以为了提高性能,使用CAS控制只有一个线程可以进行扩容,并且在扩容前释放了锁,让其他线程可以进行入队和出队操作。 spinlock锁使用CAS控制只有一个线程可以进行扩容,CAS失败的线程会调用Thread.yield() 让出 cpu,目的是为了让扩容线程扩容后优先调用lock.lock 重新获取锁, 但是这得不到一定的保证。有可能yield的线程在扩容线程扩容完成前已经退出,并执行了代码(6)获取到了锁。如果当前数组扩容还没完毕,当前线程会再次调用tryGrow方法, 然后释放锁,这又给扩容线程获取锁提供了机会,如果这时候扩容线程还没扩容完毕,则当前线程释放锁后又调用yield方法让出CPU。可知当扩容线程进行扩容期间, 其他线程是原地自旋通过代码(1)检查当前扩容是否完毕,等扩容完毕后才退出代码(1)的循环。 当扩容线程扩容完毕后会重置自旋锁变量allocationSpinLock 为 0,这里并没有使用UNSAFE方法的CAS进行设置是因为同时只可能有一个线程获取了该锁,并且 allocationSpinLock 被修饰为了 volatile。 当扩容线程扩容完毕后会执行代码 (6) 获取锁,获取锁后复制当前 queue 里面的元素到新数组。 接下来我们看看建堆算法,源码如下: private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; //队列元素个数>0则判断插入位置,否者直接入队(7) while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key;(8) } 接下来用图来解释上面的算法过程,假设队列初始化容量为2,创建的优先级队列的泛型参数为Integer。 首先调用队列offer(2)方法,希望插入元素2到队列,插入前队列状态如下图所示: 首先执行代码(1),从上图变量值可以知道判断值为false,所以紧接着执行代码(2),由于 k=n=size=0 所以代码(7)判断结果为 false,所以会执行代码(8)直接把元素 2 入队,最后执行代码(9)设置 size 的值加 1,这时候队列的状态如下图: 然后调用队列的 offer(4) 时候,首先执行代码(1),从上图变量值可知判断为 false,所以执行代码(2),由于 k=1, 所以进入 while 循环,由于 parent=0;e=2;key=4; 默认元素比较器是使用元素的 compareTo 方法, 可知 key>e 所以执行 break 退出 siftUpComparable 中的循环; 然后把元素存到数组下标为 1 的地方,最后执行代码(9)设置 size 的值加 1,这时候队列状态为: 然后调用队列的offer(6) 时候,首先执行代码(1),从上图变量值知道这时候判断值为true,所以嗲用tryGrow进行数组扩容,由于2 < 64 所以newCap=2 + (2+2)=6; 然后创建新数组并拷贝, 然后调用siftUpComparable 方法,由于 k=2>0 进入 while 循环,由于 parent=0;e=2;key=6;key>e 所以 break 后退出 while 循环; 并把元素 6 放入数组下标为 2 的地方,最后设置 size 的值加 1,现在队列状态: 然后调用队列的 offer(1) 时候,首先执行代码(1),从上图变量值知道这次判断值为 false,所以执行代码(2),由于k=3, 所以进入 while 循环,由于parent=0;e=4;key=1; key<e,所以把元素 4 复制到数组下标为 3 的地方, 然后 k=0 退出 while 循环;然后把元素 1 存放到下标为 0 地方,现在状态: 此时此刻的二叉树堆的树形图如下: 可知堆的根元素是 1,也就是这是一个最小堆,那么当调用这个优先级队列的 poll 方法时候,会一次返回堆里面值最小的元素。 2.poll操作,poll 操作作用是获取队列内部堆树的根节点元素,如果队列为空,则返回 null。源码如下: public E poll() { final ReentrantLock lock = this.lock; lock.lock();//获取独占锁 try { return dequeue(); } finally { lock.unlock();//释放独占锁 } } 如上代码可以知道在进行出队操作过程中要先加锁,这意味着,当前线程进行出队操作的时候,其他线程不能再进行入队和出队操作,但是从前面介绍offer函数的时候,知道这时候可以有其他线程进行扩容, 接下来,我们要看一下出队操作的dequeue方法的源码如下: private E dequeue() { //队列为空,则返回null int n = size - 1; if (n < 0) return null; else { //获取队头元素(1) Object[] array = queue; E result = (E) array[0]; //获取队尾元素,并值null(2) E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null)//(3) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n;//(4) return result; } } 如上代码,如果队列为空则直接返回 null,否者执行代码(1)获取数组第一个元素作为返回值存放到变量 Result,这里要注意一下数组里面第一个元素是优先级最小或者最大的元素,出队操作就是返回这个元素。 然后代码(2)获取队列尾部元素存放到变量X,并且置空尾部节点,然后执行代码(3)插入变量X 到数组下标为 0 的位置后,重新调整堆为最大或者最小堆,然后返回。 这里重要的是看如何去掉堆的根节点后,使用剩下的节点重新调整为一个最大或者最小堆。 接下来我们看看siftDownComparable 的源码,如下: private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; int half = n >>> 1; // loop while a non-leaf while (k < half) { int child = (k << 1) + 1; // 假设左边子树最小 Object c = array[child];(5) int right = child + 1;(6) if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7) c = array[child = right]; if (key.compareTo((T) c) <= 0)(8) break; array[k] = c; k = child; } array[k] = key;(9) } } 下面我们结合图来模拟上面调整堆的算法过程,接着上节队列的状态继续讲解,上节队列元素序列为 1,2,6,4: 第一次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =4;n=3;result=1;x=4; 这时候队列状态图如下: 然后执行代码(3),调整堆后队列状态图,如下: 第二次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =3;n=2;result=2;x=6; 这时候队列状态图,如下: 然后执行代码(3)调整堆后队列状态图,如下: 第三次调用队列的 poll() 方法时候,首先执行代码(1)(2),这时候变量 size =2;n=1;result=4;x=6; 这时候队列状态图,如下: 然后执行代码(3)调整堆后队列状态图,如下: 第四次直接返回元素 6. 接下来重点说说 siftDownComparable 这个调整堆的算法: 首先说下堆调整的思路,由于队列数组第 0 个元素为树根,出队时候要被移除,这时候数组就不在是最小堆了,所以需要调整堆, 具体是要从被移除的树根的左右子树中找一个最小的值来当树根,左右子树又会看自己作为树根节点的树的左右子树里面哪个是最小值,这是一个递归的过程,直到树叶节点结束递归, 如果不明白,下面结合图形来说明,假如当前队列内容如下: 对应的二叉堆树如下: 这时候如果调用了 poll(); 那么 result=2;x=11;队列末尾的元素设置为 null 后,剩下的元素调整堆的步骤如下图: 如上图(1)树根的 leftChildVal = 4;rightChildVal = 6; 4<6; 所以 c=4; 然后 11>4 也就是 key>c;所以使用元素 4 覆盖树根节点的值,现在堆对应的树如图(2)。 然后树根的左子树树根的左右孩子节点中 leftChildVal = 8;rightChildVal = 10; 8<10; 所以 c=8; 然后发现 11>8 也就是 key>c;所以元素 8 作为树根左子树的根节点,现在树的形状如图(3), 这时候判断 k<half 为 false 就会退出循环,然后把 x=11 设置到数组下标为 3 的地方,这时候堆树如图(4),至此调整堆完毕,siftDownComparable 返回 result=2,poll 方法也返回了。 3.put操作,put 操作内部调用的 offer, 由于是无界队列,所以不需要阻塞,源码如下: public void put(E e) { offer(e); // never need to block } 4.take 操作,take 操作作用是获取队列内部堆树的根节点元素,如果队列为空则阻塞,源码如下: public E take() throws InterruptedException { //获取锁,可被中断 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { //如果队列为空,则阻塞,把当前线程放入notEmpty的条件队列 while ( (result = dequeue()) == null) notEmpty.await();//阻塞当前线程 } finally { lock.unlock();//释放锁 } return result; } 如上代码,首先通过 lock.lockInterruptibly() 获取独占锁,这个方式获取的锁是对中断进行响应的。然后调用 dequeue 方法返回堆树根节点元素,如果队列为空,则返回 false, 然后当前线程调用 notEmpty.await() 阻塞挂起当前线程,直到有线程调用了 offer()方法(offer 方法内在添加元素成功后调用了 notEmpty.signal 方法会激活一个阻塞在 notEmpty 的条件队列里面的一个线程)。 另外这里使用 while 而不是 if 是为了避免虚假唤醒。 5.size操作,获取队列元个数,如下代码,在返回 size 前加了锁,保证在调用 size() 方法时候不会有其它线程进行入队和出队操作,另外由于 size 变量没有被修饰为 volatie,这里加锁也保证了多线程下 size 变量的内存可见性。源码如下: public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return size; } finally { lock.unlock(); } } 总结:PriorityBlockingQueue 队列内部使用二叉树堆维护元素优先级,内部使用数组作为元素存储的数据结构,这个数组是可以扩容的,当前元素个数 >= 最大容量的时候会通过算法扩容, 出队的时候始终保证出队的元素是堆树的根节点,而不是在队列里面停留时间最长的元素,默认元素优先级比较规则是使用元素的compareTo方法来做,用户可以自定义优先级的比较优先级。

优秀的个人博客,低调大师

Java并发编程笔记之ArrayBlockingQueue源码分析

一.JDK 中基于数组的阻塞队列 ArrayBlockingQueue 原理剖析,ArrayBlockingQueue 内部如何基于一把独占锁以及对应的两个条件变量实现出入队操作的线程安全? 首先我们先大概的浏览一下ArrayBlockingQueue 的内部构造,如下类图: 如类图所示,可以看到ArrayBlockingQueue 内部有个数组items 用来存放队列元素,putIndex变量标示入队元素的下标,takeIndex是出队的下标,count是用来统计队列元素个数, 从定义可以知道,这些属性并没有使用valatile修饰,这是因为访问这些变量的使用都是在锁块内被用。而加锁了,就足以保证了锁块内变量的内存可见性。 另外还有个独占锁lock 用来保证出队入队操作的原子性,这保证了同时只有一个线程可以进行入队出队操作,另外notEmpty,notFull条件变量用来进行出队入队的同步。 由于ArrayBlockingQueue 是有界队列,所以构造函数必须传入队列大小的参数。 接下来我们进入ArrayBlockingQueue的源码看,如下: public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } 如上面代码所示,默认情况下使用的是ReentrantLock提供的非公平独占锁进行入队出队操作的加锁。 接下来主要看看ArrayBlockingQueue的主要的几个操作的源码,如下: 1.offer 操作,向队列尾部插入一个元素,如果队列有空闲容量,则插入成功后返回true,如果队列已满则丢弃当前元素,然后返回false, 如果e元素为null,则抛出NullPointerException 异常,另外该方法是不阻塞的。源码如下: public boolean offer(E e) { //(1)e为null,则抛出NullPointerException异常 checkNotNull(e); //(2)获取独占锁 final ReentrantLock lock = this.lock; lock.lock(); try { //(3)如果队列满则返回false if (count == items.length) return false; else { //(4)否者插入元素 enqueue(e); return true; } } finally { lock.unlock(); } } 代码(2)获取独占锁,当前线程获取到该锁后,其他入队和出队操作的线程都会被阻塞挂起后放入lock锁的AQS阻塞队列。 代码(3)如果队列满则直接返回false,否则调用enqueue方法后返回true,enqueue的源码如下: private void enqueue(E x) { //(6)元素入队 final Object[] items = this.items; items[putIndex] = x; //(7)计算下一个元素应该存放的下标 if (++putIndex == items.length) putIndex = 0; count++; //(8) notEmpty.signal(); } 可以看到上面代码首先把当前元素放入items数组,然后计算下一个元素应该存放的下标,然后递增元素个数计数器,最后激活 notEmpty 的条件队列中因为调用 poll 或者 take 操作而被阻塞的的一个线程。 这里由于在操作共享变量,比如count前加了锁,所以不存在内存不可见问题,加过锁后获取的共享变量都是从主存获取的,而不是在CPU缓存获取寄存器里面的值。 代码(5)释放锁,释放锁后会把修改的共享变量值,比如Count的值刷新回主内存中,这样其他线程通过加锁再次读取这些共享变量后就可以看到最新的值。 2.put操作,向队列尾部插入一个元素,如果队列有空闲则插入后直接返回true,如果队列已满则阻塞当前线程直到队列有空闲插入成功后返回true, 如果在阻塞的时候被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException 异常而返回,另外如果 e 元素为 null 则抛出 NullPointerException 异常。源码如下: public void put(E e) throws InterruptedException { //(1) checkNotNull(e); final ReentrantLock lock = this.lock; //(2)获取锁(可被中断) lock.lockInterruptibly(); try { //(3)如果队列满,则把当前线程放入notFull管理的条件队列 while (count == items.length) notFull.await(); //(4)插入元素 enqueue(e); } finally { //(5) lock.unlock(); } } 代码(2)在获取锁的过程中当前线程被其它线程中断了,则当前线程会抛出 InterruptedException 异常而退出。 代码(3)判断如果当前队列满了,则把当前线程阻塞挂起后放入到 notFull 的条件队列,注意这里是使用了 while 而不是 if。为什么需要while呢? 这是因为考虑到当前线程被虚假唤醒的问题,也就是其它线程没有调用 notFull 的 singal 方法时候,notFull.await() 在某种情况下会自动返回。 如果使用if语句简单判断一下,那么虚假唤醒后会执行代码(4),元素入队,并且递增计数器,而这时候队列已经是满了的,导致队列元素个数大于了队列设置的容量,导致程序出错。 而使用使用 while 循环假如 notFull.await() 被虚假唤醒了,那么循环在检查一下当前队列是否是满的,如果是则再次进行等待。 代码(4)如果队列不满则插入当前元素。 3.poll操作,从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。源码如下: public E poll() { //(1)获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { //(2)当前队列为空则返回null,否者调用dequeue()获取 return (count == 0) ? null : dequeue(); } finally { //(3)释放锁 lock.unlock(); } } 代码(1)获取独占锁 代码(2)如果队列为空则返回 null,否者调用 dequeue() 方法,dequeue 源码如下: private E dequeue() { final Object[] items = this.items; //(4)获取元素值 @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //(5)数组中值值为null; items[takeIndex] = null; //(6)队头指针计算,队列元素个数减一 if (++takeIndex == items.length) takeIndex = 0; count--; //(7)发送信号激活notFull条件队列里面的一个线程 notFull.signal(); return x; } 如上代码,可以看到首先获取当前队头元素保存到局部变量,然后重置队头元素为null,并重新设置队头下标,元素计数器递减,最后发送信号激活notFull 的条件队列里面一个因为调用 put 或者 offer 而被阻塞的线程。 4.take 操作,获取当前队列头部元素,并从队列里面移除,如果队列为空则阻塞调用线程。如果队列为空则阻塞当前线程知道队列不为空,然后返回元素, 如果如果在阻塞的时候被其它线程设置了中断标志,则被阻塞线程会抛出 InterruptedException 异常而返回。源码如下: public E take() throws InterruptedException { //(1)获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //(2)队列为空,则等待,直到队列有元素 while (count == 0) notEmpty.await(); //(3)获取队头元素 return dequeue(); } finally { //(4) 释放锁 lock.unlock(); } } 可以看到take操作的代码也比较简单,与poll相比,只是步骤(2)如果队列为空,则把当前线程挂起后放入到notEmpty的条件队列,等其他线程调用notEmpty.signal() 方法后在返回, 需要注意的是这里也是使用 while 循环进行检测并等待而不是使用 if。之所以这样做,道理都是一样。这里就不在解释了。 5.peek 操作获取队列头部元素但是不从队列里面移除,如果队列为空则返回 null,该方法是不阻塞的。源码如下: public E peek() { //(1)获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { //(2) return itemAt(takeIndex); } finally { //(3) lock.unlock(); } } @SuppressWarnings("unchecked") final E itemAt(int i) { return (E) items[i]; } peek的实现更加简单,首先获取独占锁,然后从数组items 中获取当前队头下标的值并返回,在返回之前释放了获取的锁。 6.size 操作,获取当前队列元素个数。源码如下: public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } } size 操作是简单的,获取锁后直接返回 count,并在返回前释放锁。也许你会有疑问这里有没有修改Count的值,只是简单的获取下,为何要加锁呢? 答案很简单,如果count声明为volatile,这里就不需要加锁了,因为因为 volatile 类型变量保证了内存的可见性,而 ArrayBlockingQueue 的设计中 count 并没有声明为 volatile, 这是因为count的操作都是在获取锁后进行的,而获取锁的语义之一就是获取锁后访问的变量都是从主内存获取的,这就保证了变量的内存可见性。 最后用一张图来加深对ArrayBlockingQueue的理解,如下图: 总结:ArrayBlockingQueue 通过使用全局独占锁实现同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似在方法上添加 synchronized 的意味。ArrayBlockingQueue 的 size 操作的结果是精确的,因为计算前加了全局锁。 二、Logback 框架中异步日志打印中 ArrayBlockingQueue 的使用 在高并发并且响应时间要求比较小的系统中同步打日志已经满足不了需求了,这是因为打日志本身是需要同步写磁盘的,会造成 响应时间 增加,如下图同步日志打印模型为: 异步模型是业务线程把要打印的日志任务写入一个队列后直接返回,然后使用一个线程专门负责从队列中获取日志任务写入磁盘,其模型具体如下图: 如图可知其实 logback 的异步日志模型是一个多生产者单消费者模型,通过使用队列把同步日志打印转换为了异步,业务线程调用异步 appender 只需要把日志任务放入日志队列,日志线程则负责使用同步的 appender 进行具体的日志打印到磁盘。 接下来看看异步日志打印具体实现,要把同步日志打印改为异步需要修改 logback 的 xml 配置文件如下: <appender name="PROJECT" class="ch.qos.logback.core.FileAppender"> <file>project.log</file> <encoding>UTF-8</encoding> <append>true</append> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- daily rollover --> <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern> <!-- keep 7 days' worth of history --> <maxHistory>7</maxHistory> </rollingPolicy> <layout class="ch.qos.logback.classic.PatternLayout"> <pattern> <![CDATA[%n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method} %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer}, ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n %-5level %logger{35} - %m%n]]> </pattern> </layout> </appender> <appender name="asyncProject" class="ch.qos.logback.classic.AsyncAppender"> <discardingThreshold>0</discardingThreshold> <queueSize>1024</queueSize> <neverBlock>true</neverBlock> <appender-ref ref="PROJECT" /> </appender> <logger name="PROJECT_LOGGER" additivity="false"> <level value="WARN" /> <appender-ref ref="asyncProject" /> </logger> 可知 AsyncAppender 是实现异步日志的关键,下节主要讲这个的内部实现。 异步原理实现 首先从 AsyncAppender 的类图结构来从全局了解下 AsyncAppender 中组件构成,类图如下所示: 从上面的类图我们可以知道如下两点: 1.如上图可知 AsyncAppender 继承自 AsyncAppenderBase,其中后者具体实现了异步日志模型的主要功能,前者只是重写了其中的一些方法。另外从上类图可知 logback 中的异步日志队列是一个阻塞队列, 后面会知道其实是一个有界阻塞队列 ArrayBlockingQueue, 其中 queueSize 是有界队列的元素个数默认为 256。 2.worker 是个线程,也就是异步日志打印模型中的单消费者线程,aai 是一个 appender 的装饰器里面存放同步日志的 appender, 其中 appenderCount 记录 aai 里面附加的同步 appender 的个数;neverBlock 是当日志队列满了的时候是否阻塞打日志的线程的一个开关;discardingThreshold 是一个阈值,当日志队列里面空闲个数小于该值时候新来的某些级别的日志会被直接丢弃,下面会具体讲到。 首先我们来看下何时创建的日志队列以及何时启动的消费线程,这需要看下 AsyncAppenderBase 的 start 方法,该方法是在解析完毕配置 AsyncAppenderBase 的 xml 的节点元素后被调用,源码如下: public void start() { ... //(1)日志队列为有界阻塞队列 blockingQueue = new ArrayBlockingQueue<E>(queueSize); //(2)如果没设置discardingThreshold则设置为队列大小的1/5 if (discardingThreshold == UNDEFINED) discardingThreshold = queueSize / 5; //(3)设置消费线程为守护线程,并设置日志名称 worker.setDaemon(true); worker.setName("AsyncAppender-Worker-" + worker.getName()); //(4)设置启动消费线程 super.start(); worker.start(); } 从上代码可知如下两点: 1. logback 使用的队列是有界队列 ArrayBlockingQueue,之所以使用有界队列是考虑到内存溢出问题,在高并发下写日志的 qps 会很高如果设置为无界队列队列本身会占用很大内存,很可能会造成 内存溢出。 2.这里消费日志队列的 worker 线程被设置为了守护线程,意味着当主线程运行结束并且当前没有用户线程时候该 worker 线程会随着 JVM 的退出而终止,而不管日志队列里面是否还有日志任务未被处理。另外这里设置了线程的名称是个很好的习惯,因为这在查找问题的时候很有帮助,根据线程名字就可以定位到是哪个线程。 既然是有界队列那么肯定需要考虑如果队列满了,该如何处置,是丢弃老的日志任务,还是阻塞日志打印线程直到队列有空余元素那? 要回答这个问题,我们需要看看具体进行日志打印的AsyncAppenderBase 的 append 方法,源码如下: protected void append(E eventObject) { //(5)调用AsyncAppender重写的isDiscardable if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) { return; } ... //(6)放入日志任务到队列 put(eventObject); } private boolean isQueueBelowDiscardingThreshold() { return (blockingQueue.remainingCapacity() < discardingThreshold); } 其中 (5) 调用了 AsyncAppender 重写的 isDiscardable,源码如下: //(7) protected boolean isDiscardable(ILoggingEvent event) { Level level = event.getLevel(); return level.toInt() <= Level.INFO_INT; } 结合 代码(5)和代码(7) 可知如果当前日志的级别小于 INFO_INT 级别并且当前队列的剩余容量小于 discardingThreshold 时候会直接丢弃这些日志任务。 接下来看具体步骤 (6) 的 put 方法,源码如下: private void put(E eventObject) { //(8) if (neverBlock) { blockingQueue.offer(eventObject); } else { try {//(9) blockingQueue.put(eventObject); } catch (InterruptedException e) { // Interruption of current thread when in doAppend method should not be consumed // by AsyncAppender Thread.currentThread().interrupt(); } } } 可知如果 neverBlock 设置为了 false(默认为 false)则会调用阻塞队列的 put 方法,而 put 是阻塞的,也就是说如果当前队列满了,如果在企图调用 put 方法向队列放入一个元素则调用线程会被阻塞直到队列有空余空间。这里有必要提下其中第 (9) 步当日志队列满了的时候 put 方法会调用 await() 方法阻塞当前线程,如果其它线程中断了该线程,那么该线程会抛出 InterruptedException 异常,那么当前的日志任务就会被丢弃了。如果 neverBlock 设置为了 true 则会调用阻塞队列的 offer 方法,而该方法是非阻塞的,如果当前队列满了,则会直接返回,也就是丢弃当前日志任务。 最后看下 addAppender 方法内是什么的,源码如下: public void addAppender(Appender<E> newAppender) { if (appenderCount == 0) { appenderCount++; ... aai.addAppender(newAppender); } else { addWarn("One and only one appender may be attached to AsyncAppender."); addWarn("Ignoring additional appender named [" + newAppender.getName() + "]"); } } 如上代码可知一个异步 appender 只能绑定一个同步 appender, 这个 appender 会被放到 AppenderAttachableImpl 的 appenderList 列表里面。 到这里我们已经分析完了日志生产线程放入日志任务到日志队列的实现,下面一起来看下消费线程是如何从队列里面消费日志任务并写入磁盘的,由于消费线程是一个线程,那就从 worker 的 run 方法看起,源码如下所示: class Worker extends Thread { public void run() { AsyncAppenderBase<E> parent = AsyncAppenderBase.this; AppenderAttachableImpl<E> aai = parent.aai; //(10)一直循环直到该线程被中断 while (parent.isStarted()) { try {//(11)从阻塞队列获取元素 E e = parent.blockingQueue.take(); aai.appendLoopOnAppenders(e); } catch (InterruptedException ie) { break; } } //(12)到这里说明该线程被中断,则吧队列里面的剩余日志任务 //刷新到磁盘 for (E e : parent.blockingQueue) { aai.appendLoopOnAppenders(e); parent.blockingQueue.remove(e); } ... .. } } 其中(11)从日志队列使用 take 方法获取一个日志任务,如果当前队列为空则当前线程会阻塞到 take 方法直到队列不为空才返回,获取到日志任务后会调用 AppenderAttachableImpl 的 aai.appendLoopOnAppenders 方法,该方法会循环调用通过 addAppender 注入的同步日志 appener 具体实现日志打印到磁盘的任务。

优秀的个人博客,低调大师

Java并发编程笔记之LinkedBlockingQueue源码探究

LinkedBlockingQueue的实现是使用独占锁实现的阻塞队列。首先看一下LinkedBlockingQueue的类图结构,如下图所示: 如类图所示:LinkedBlockingQueue是使用单向链表实现,有两个Node分别来存放首尾节点,并且里面有个初始值为0 的原子变量count,它用来记录队列元素个数。 另外里面有两个ReentrantLock的实例,分别用来控制元素入队和出队的原子性,其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待, putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。另外notEmpty 和 notFull 是信号量,内部分别有一个条件队列用来存放进队和出队的时候被阻塞的线程, 说白了,这其实就是一个生产者 - 消费者模型。 我们首先看一下独占锁的源码,如下所示: /** 执行take, poll等操作时候需要获取该锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 当队列为空时候执行出队操作(比如take)的线程会被放入这个条件队列进行等待 */ private final Condition notEmpty = takeLock.newCondition(); /** 执行put, offer等操作时候需要获取该锁*/ private final ReentrantLock putLock = new ReentrantLock(); /**当队列满时候执行进队操作(比如put)的线程会被放入这个条件队列进行等待 */ private final Condition notFull = putLock.newCondition(); /** 当前队列元素个数 */ private final AtomicInteger count = new AtomicInteger(0); 接着我们要进入LinkedBlockingQueue 无参构造函数,源码如下: public static final int MAX_VALUE = 0x7fffffff; public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //初始化首尾节点,指向哨兵节点 last = head = new Node<E>(null); } 从源码中可以看到,默认队列的容量为0x7fffffff; 用户也可以自己指定容量,所以一定程度上 LinkedBlockingQueue 可以说是有界阻塞队列。 接下来我们主要看LinkedBlockingQueue 的几个主要方法的源码,如下: 1.offer操作,向队列尾部插入一个元素,如果队列有空闲容量则插入成功后返回true,如果队列已满则丢弃当前元素然后返回false,如果 e元素为null,则抛出空指针异常(NullPointerException),还有一点就是,该方法是非阻塞的。源码如下: public boolean offer(E e) { //(1)空元素抛空指针异常 if (e == null) throw new NullPointerException(); //(2) 如果当前队列满了则丢弃将要放入的元素,然后返回false final AtomicInteger count = this.count; if (count.get() == capacity) return false; //(3) 构造新节点,获取putLock独占锁 int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { //(4)如果队列不满则进队列,并递增元素计数 if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); //(5) if (c + 1 < capacity) notFull.signal(); } } finally { //(6)释放锁 putLock.unlock(); } //(7) if (c == 0) signalNotEmpty(); //(8) return c >= 0; } private void enqueue(Node<E> node) { last = last.next = node; } 代码(2)判断的是如果当前队列已满则丢弃当前元素并返回false。 代码(3)获取到putLock锁,当前线程获取到该锁后,则其他调用put 和 offer 的线程将会被阻塞(阻塞的线程被放到 putLock 锁的 AQS 阻塞队列)。 代码(4)这里又重新判断了一下当前队列是否满了,这是因为在执行代码(2)和获取到putLock锁期间,有可能其他线程通过put 或者 offer方法想队列里面添加了新的元素。重新判断队列确实不满则新元素入队,并递增计数器。 代码(5)判断的是如果新元素入队后还有空闲空间,则唤醒notFull的条件队列里面因为调用了notFull 的 await 操作(比如执行put方法而队列满了的时候)而被阻塞的一个线程,因为队列现在有空闲,所以这里可以提前唤醒一个入队线程。 代码(6)则释放获取的putLock锁,这里要注意锁的释放一定要在finally里面做,因为即使try块抛出异常了,finally也是会被执行到的。另外释放锁后其他因为调用put和offer而被阻塞的线程将会有一个获取到改锁。 代码(7)c == 0说明在执行代码(6)释放锁的时候队列里面至少有一个元素,队列里面有元素则执行signalNotEmpty,signalNotEmpty的源码如下: private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } 通过上面代码可以看到其作用是激活notEmpty 的条件队列中因为调用notEmpty的await方法(比如调用 take 方法并且队列为空的时候)而被阻塞的一个线程,这里也说明了调用条件变量的方法前,要首先获取对应的锁。 offer的总结:offer方法中通过使用putLock锁保证了在队尾新增元素的原子性和队列元素个数的比较和递增操作的原子性。 2.put操作,向队列尾部插入一个元素,如果队列有空闲则插入后直接返回true,如果队列已经满则阻塞当前线程知道队列有空闲插入成功后返回true,如果在阻塞的时候被其他线程设置了中断标志, 则被阻塞线程会抛出InterruptedException 异常而返回,另外如果 e 元素为 null 则抛出 NullPointerException 异常。源码如下: public void put(E e) throws InterruptedException { //(1)空元素抛空指针异常 if (e == null) throw new NullPointerException(); //(2) 构建新节点,并获取独占锁putLock int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //(3)如果队列满则等待 while (count.get() == capacity) { notFull.await(); } //(4)进队列并递增计数 enqueue(node); c = count.getAndIncrement(); //(5) if (c + 1 < capacity) notFull.signal(); } finally { //(6) putLock.unlock(); } //(7) if (c == 0) signalNotEmpty(); } 代码(2)中使用 putLock.lockInterruptibly() 获取独占锁,相比 offer 方法中这个获取独占锁方法意味着可以被中断,具体说是当前线程在获取锁的过程中,如果被其它线程设置了中断标志则当前线程会抛出 InterruptedException 异常, 所以put操作在获取 锁过程中是可被中断的。 代码(3)如果当前队列已经满,则notFull 的 await() 把当前线程放入 notFull 的条件队列,当前线程被阻塞挂起并释放获取到的 putLock 锁,由于putLock锁被释放了,所以现在其他线程就有机会获取到putLock锁了。 代码(3)判断队列是否为空为何使用 while 循环而不是 if 语句呢? 这是因为考虑到当前线程被虚假唤醒的问题,也就是其它线程没有调用 notFull 的 singal 方法时候,notFull.await() 在某种情况下会自动返回。 如果使用if语句简单判断一下,那么虚假唤醒后会执行代码(4),元素入队,并且递增计数器,而这时候队列已经是满了的,导致队列元素个数大于了队列设置的容量,导致程序出错。 而使用使用 while 循环假如 notFull.await() 被虚假唤醒了,那么循环在检查一下当前队列是否是满的,如果是则再次进行等待。 3.poll操作,从队列头部获取并移除一个元素,如果队列为空则返回 null,该方法是不阻塞的。源码如下: public E poll() { //(1)队列为空则返回null final AtomicInteger count = this.count; if (count.get() == 0) return null; //(2)获取独占锁 E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { //(3)队列不空则出队并递减计数 if (count.get() > 0) {//3.1 x = dequeue();//3.2 c = count.getAndDecrement();//3.3 //(4) if (c > 1) notEmpty.signal(); } } finally { //(5) takeLock.unlock(); } //(6) if (c == capacity) signalNotFull(); //(7)返回 return x; } private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } 代码(1) 如果当前队列为空,则直接返回 null。 代码(2)获取独占锁 takeLock,当前线程获取该锁后,其它线程在调用 poll 或者 take 方法会被阻塞挂起。 代码 (3) 如果当前队列不为空则进行出队操作,然后递减计数器。 代码(4)如果 c>1 则说明当前线程移除掉队列里面的一个元素后队列不为空(c 是删除元素前队列元素个数),那么这时候就可以激活因为调用 poll 或者 take 方法而被阻塞到notEmpty 的条件队列里面的一个线程。 代码(5)释放锁,一定要在finally里面释放锁。 代码(6)说明当前线程移除队头元素前当前队列是满的,移除队头元素后队列当前至少有一个空闲位置,那么这时候就可以调用signalNotFull激活因为调用put 或者 offer 而被阻塞放到 notFull 的条件队列里的一个线程,signalNotFull源码如下: private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } } poll 代码逻辑比较简单,值得注意的是获取元素时候只操作了队列的头节点。 4.peek 操作,获取队列头部元素但是不从队列里面移除,如果队列为空则返回 null,该方法是不阻塞的。源码如下: public E peek() { //(1) if (count.get() == 0) return null; //(2) final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; //(3) if (first == null) return null; else //(4) return first.item; } finally { //(5) takeLock.unlock(); } } 可以看到代码(3)这里还是需要判断下 first 是否为 null 的,不能直接执行代码(4)。 正常情况下执行到代码(2)说明队列不为空,但是代码(1)和(2)不是原子性操作,也就是在执行代码(1)判断队列不为空后, 在代码(2)获取到锁前,有可能其他线程执行了poll 或者 take 操作导致队列变为了空,然后当前线程获取锁后,直接执行 first.item 会抛出空指针异常。 5.take 操作,获取当前队列头部元素并从队列里面移除,如果队列为空则阻塞调用线程。如果队列为空则阻塞当前线程知道队列不为空,然后返回元素,如果在阻塞的时候被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException 异常而返回。源码如下: public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; //(1)获取锁 final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //(2)当前队列为空则阻塞挂起 while (count.get() == 0) { notEmpty.await(); } //(3)出队并递减计数 x = dequeue(); c = count.getAndDecrement(); //(4) if (c > 1) notEmpty.signal(); } finally { //(5) takeLock.unlock(); } //(6) if (c == capacity) signalNotFull(); //(7) return x; } 代码(1)当前线程获取到独占锁,其他调用take 或者 poll的线程将会被阻塞挂起。 代码(2)如果队列为空则阻塞挂起当前线程,并把当前线程放入notEmpty 的条件队列。 代码(3)进行出队操作并递减计数。 代码(4)如果 c > 1 说明当前队列不为空,则唤醒notEmpty 的条件队列的条件队列里面的一个因为调用 take 或者 poll 而被阻塞的线程。 代码(5)释放锁。 代码(6)如果 c == capacity 则说明当前队列至少有一个空闲位置,则激活条件变量 notFull 的条件队列里面的一个因为调用 put 或者 offer 而被阻塞的线程。 6.remove操作,删除队列里面指定元素,有则删除返回 true,没有则返回 false,源码如下: public boolean remove(Object o) { if (o == null) return false; //(1)双重加锁 fullyLock(); try { //(2)遍历队列找则删除返回true for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //(3) if (o.equals(p.item)) { unlink(p, trail); return true; } } //(4)找不到返回false return false; } finally { //(5)解锁 fullyUnlock(); } } 代码(1)通过fullyLock获取双重锁,当前线程获取后,其他线程进行入队或者出队的操作就会被阻塞挂起。双重锁方法fullyLock的源码如下: void fullyLock() { putLock.lock(); takeLock.lock(); } 代码(2)遍历队列寻找要删除的元素,找不到则直接返回false,找到则执行unlink操作,unlink的源码如下: void unlink(Node<E> p, Node<E> trail) { p.item = null; trail.next = p.next; if (last == p) last = trail; 如果当前队列满,删除后,也不忘记唤醒等待的线程 if (count.getAndDecrement() == capacity) notFull.signal(); } 可以看到删除元素后,如果发现当前队列有空闲空间,则唤醒 notFull 的条件队列中一个因为调 用 put 或者 offer 方法而被阻塞的线程。 代码(5)调用 fullyUnlock 方法使用与加锁顺序相反的顺序释放双重锁,源码如下: void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } 7.size操作,获取当前队列元素个数。源码如下: public int size() { return count.get(); } 总结:由于在操作出队入队的时候操作Count的时候加了锁,因此相比ConcurrentLinkedQueue 的size方法比较准确。 最后用一张图来加深LinkedBlockingQueue的理解,如下图: 因此我们要思考一个问题:为何 ConcurrentLinkedQueue 中需要遍历链表来获取 size 而不适用一个原子变量呢? 这是因为使用原子变量保存队列元素个数需要保证入队出队操作和操作原子变量是原子操作,而ConcurrentLinkedQueue 是使用 CAS 无锁算法的,所以无法做到这个。

优秀的个人博客,低调大师

Java并发编程笔记之ConcurrentLinkedQueue源码探究

JDK 中基于链表的非阻塞无界队列 ConcurrentLinkedQueue 原理剖析,ConcurrentLinkedQueue 内部是如何使用 CAS 非阻塞算法来保证多线程下入队出队操作的线程安全? ConcurrentLinkedQueue是线程安全的无界非阻塞队列,其底层数据结构是使用单向链表实现,入队和出队操作是使用我们经常提到的CAS来保证线程安全的。 我们首先看一下ConcurrentLinkedQueue的类图结构先,好有一个内部逻辑有一个大概的印象,如下图所示: 可以清楚的看到ConcurrentLinkedQueue内部的队列是使用单向链表方式实现,类中两个volatile 类型的Node 节点分别用来存放队列的首位节点。 首先我们先来看一下ConcurrentLinkedQueue的构造函数,如下: public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); } 通过无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。 Node节点内部则维护一个volatile 修饰的变量item 用来存放节点的值,next用来存放链表的下一个节点,从而链接为一个单向无界链表,这就是单向无界的根本原因。如下图: 接下来看ConcurrentLinkedQueue 主要关注入队,出队,获取队列元素的方法的源码,如下所示: 1.首先看入队方法offer,offer 操作是在队列末尾添加一个元素,如果传递的参数是 null 则抛出 NPE 异常,否者由于 ConcurrentLinkedQueue 是无界队列该方法一直会返回 true。另外由于使用 CAS 无阻塞算法,该方法不会阻塞调用线程,其源码如下: public boolean offer(E e) { //(1)e为null则抛出空指针异常 checkNotNull(e); //(2)构造Node节点 final Node<E> newNode = new Node<E>(e); //(3)从尾节点进行插入 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //(4)如果q==null说明p是尾节点,则执行插入 if (q == null) { //(5)使用CAS设置p节点的next节点 if (p.casNext(null, newNode)) { //(6)cas成功,则说明新增节点已经被放入链表,然后设置当前尾节点 if (p != t) casTail(t, newNode); // Failure is OK. return true; } } else if (p == q)//(7) //多线程操作时候,由于poll操作移除元素后有可能会把head变为自引用,然后head的next变为新head,所以这里需要 //重新找新的head,因为新的head后面的节点才是正常的节点。 p = (t != (t = tail)) ? t : head; else //(8) 寻找尾节点 p = (p != t && t != (t = tail)) ? t : q; } } 类图结构时候谈到构造队列时候参构造函数创建了一个 item 为 null 的哨兵节点,并且 head 和 tail 都是指向这个节点,下面通过图形结合来讲解下 offer 操作的代码实现。 1.首先看一下,当一个线程调用offer(item)时候情况:首先代码(1)对传参判断空检查,如果为null 则抛出空指针异常,然后代码(2)则使用item作为构造函数参数创建一个新的节点, 代码(3)从队列尾部节点开始循环,目的是从队列尾部添加元素。如下图: 上图是执行代码(4)时候队列的情况,这时候节点 p , t ,head ,tail 同时指向了item为null的哨兵节点,由于哨兵节点的next节点为null,所以这里q指向也是null。 代码(4)发现q==null 则执行代码(5),通过CAS原子操作判断p 节点的next节点是否为null,如果为null 则使用节点newNode替换p 的next节点, 然后执行代码(6),由于 p == t ,所以没有设置尾部节点,然后退出offer方法,这时候队列的状态图如下: 上面讲解的是一个线程调用offer方法的情况下,如果多个线程同时调用,就会存在多个线程同时执行到代码(5),假设线程A调用offer(item1), 线程B调用offer(item2),线程 A 和线程B同时到 p.casNext(null,newNode)。而CAS的比较并设置操作是原子性的,假设线程A先执行了比较设置操作, 则发现当前P的next节点确实是null ,则会原子性更新next节点为newNode,这时候线程B 也会判断p 的next节点是否为null,结果发现不是null,(因为线程 A 已经设置了 p 的 next 为 newNode)则会跳到代码(3), 然后执行到代码(4)的时候的队列分布图如下: 根据这个状态图可知线程B会执行代码(8),然后q 赋值给了p,这个时候状态图为: 然后线程B再次跳转到代码(3)执行,当执行到代码(4)时候队列状态图为: 由于这时候q == null ,所以线程B 会执行步骤(5),通过CAS操作判断 当前p的next 节点是否为null ,不是则再次循环后尝试,是则使用newNode替换,假设CAS成功了,那么执行步骤(6), 由于 p != t 所以设置tail节点为newNode ,然后退出offer方法。这时候队列的状态图为: 到现在为止,offer代码在执行路径现在就差步骤(7)还没有执行过,其实这个要在执行poll操作才会出现的,这里先看一下执行poll操作后可能会存在的一种情况,如下图所示: 下面分析下当队列处于这种状态调用offer添加元素代码执行到代码(4)的时候的队列状态图,如下: 由于q节点不为空并且p==q 所以执行代码(7),因为 t == tail所以p 被赋值为head ,然后进入循环,循环后执行到代码(4)的时候的队列状态图,如下: 由于 q ==null,所以执行代码(5),进行CAS操作,如果当前没有其他线程执行offer操作,则CAS操作会成功,p的next节点被设置为新增节点,然后执行代码(6), 由于p != t 所以设置新节点为队列尾节点,现在队列状态图,如下: 在这里的自引用的节点会被垃圾回收掉,可见offer操作里面关键步骤是代码(5)通过原子CAS操作来进行控制同时只有一个线程可以追加元素到队列末尾,进行cas竞争失败的线程, 则会通过循环一次次尝试进行cas操作,知道cas成功才会返回,也就是通过使用无限循环里面不断进行CAS尝试方式来替代阻塞算法挂起调用线程,相比阻塞算法,这是使用CPU资源换取阻塞带来的开销。 2.poll操作,poll 操作是在队列头部获取并且移除一个元素,如果队列为空则返回 null,我们首先看改方法的源码,如下: public E poll() { //(1) goto标记 restartFromHead: //(2)无限循环 for (;;) { for (Node<E> h = head, p = h, q;;) { //(3)保存当前节点值 E item = p.item; //(4)当前节点有值则cas变为null if (item != null && p.casItem(item, null)) { //(5)cas成功标志当前节点以及从链表中移除 if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } //(6)当前队列为空则返回null else if ((q = p.next) == null) { updateHead(h, p); return null; } //(7)自引用了,则重新找新的队列头节点 else if (p == q) continue restartFromHead; else//(8) p = q; } } } final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); } poll操作是从队头获取元素,所以代码(2)内层循环是从head节点开始迭代,代码(3)获取当前队头的节点,当队列一开始为空的时候队列状态为: 由于head 节点指向的item 为null 的哨兵节点,所以会执行到代码(6),假设这个过程没有线程调用offer,则此时q等于null ,如下图: 所以执行updateHead方法,由于h 等于 p所以没有设置头节点,poll方法直接返回null。 假设执行到代码(6)的时候已经有其他线程调用了offer 方法成功添加了一个元素到队列,这时候q执行的是新增元素的节点,这时候队列状态图为: 所以代码(6)判断结果为false,然后会转向代码(7)执行,而此时p不等于q,所以转向代码(8)执行,执行结果是p指向了节点q,此时的队列状态如下: 然后程序转向代码(3)执行,p现在指向的元素值不为null,则执行p.casItem(item, null)通过 CAS 操作尝试设置 p 的 item 值为 null, 如果此时没有其他线程进行poll操作,CAS成功则执行代码(5),由于此时 p != h ,所以设置头节点为p,poll然后返回被从队列移除的节点值item。此时队列状态为: 这个状态就是前面提到offer操作的时候,offer代码的执行路径(7)执行的前提状态。 假如现在一个线程调用了poll操作,则在执行代码(4)的时候的队列状态为: 可以看到这时候执行代码(6)返回null。 现在poll的代码还有个分支(7)还没有被执行过,那么什么时候会执行呢?假设线程A执行poll操作的时候,当前的队列状态,如下: 那么执行p.casItem(item, null)通过 CAS 操作尝试设置 p 的 item 值为 null。 假设 CAS 设置成功则标示该节点从队列中移除了,此时队列状态为: 然后由于p != h,所以会执行updateHead 方法,假如线程A执行updateHead前,另外一个线程B开始poll操作,这时候线程B的p指向head节点, 但是还没有执行到代码(6),这时候队列状态为: 然后线程A执行updateHead 操作,执行完毕后线程 A 退出,这时候队列状态为: 然后线程B继续执行代码(6)q=p.next由于该节点是自引用节点所以p==q,所以会执行代码(7)跳到外层循环restartFromHead,重新获取当前队列队头 head, 现在状态为: 总结:poll元素移除一个 元素的时候,只是简单的使用CAS操作把当前节点的item值设置为null,然后通过重新设置头节点让该元素从队列里面摘除, 被摘除的节点就成了孤立节点,这个节点会被在GC的时候会被回收掉。另外,执行分支中如果发现头节点被修改了要跳到外层循环重新获取新的头节点。 3.peek操作,peek 操作是获取队列头部一个元素(只不获取不移除),如果队列为空则返回 null,其源码如下: public E peek() { //(1) restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { //(2) E item = p.item; //(3) if (item != null || (q = p.next) == null) { updateHead(h, p); return item; } //(4) else if (p == q) continue restartFromHead; else //(5) p = q; } } } 代码结构与poll操作类似,不同于代码(3)的使用只是少了castItem 操作,其实这很正常,因为peek只是获取队列头元素值,并不清空其值, 根据前面我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次peek的时候,代码(3)中会发现item==null, 然后会执行 q = p.next, 这时候 q 节点指向的才是队列里面第一个真正的元素或者如果队列为 null 则 q 指向 null。 当队列为空的时候,队列状态图,如下: 这时候执行updateHead 由于 h 节点等于 p 节点所以不进行任何操作,然后 peek 操作会返回 null。 当队列中至少有一个元素的时候(假如只有一个),这时候队列状态为: 这时候执行代码(5)这时候 p 指向了 q 节点,然后执行代码(3)这时候队列状态为: 执行代码(3)发现 item 不为 null,则执行 updateHead 方法,由于 h!=p, 所以设置头结点,设置后队列状态为: 可以看到其实就是剔除了哨兵节点。 总结:peek操作代码与poll操作类似,只是前者只获取队列头元素,但是并不从队列里面删除,而后者获取后需要从队列里面删除,另外,在第一次调用peek操作的时候, 会删除哨兵节点,并让队列的head节点指向队列里面第一个元素或者null。 4.size方法,获取当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁所以从调用 size 函数到返回结果期间有可能增删元素,导致统计的元素个数不精确。源码如下: public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // 最大返回Integer.MAX_VALUE if (++count == Integer.MAX_VALUE) break; return count; } //获取第一个队列元素(哨兵元素不算),没有则为null Node<E> first() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { boolean hasItem = (p.item != null); if (hasItem || (q = p.next) == null) { updateHead(h, p); return hasItem ? p : null; } else if (p == q) continue restartFromHead; else p = q; } } } //获取当前节点的next元素,如果是自引入节点则返回真正头节点 final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; } 5.remove方法,如果队列里面存在该元素则删除给元素,如果存在多个则删除第一个,并返回 true,否者返回 false。源码如下: public boolean remove(Object o) { //查找元素为空,直接返回false if (o == null) return false; Node<E> pred = null; for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.item; //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其它元素是否有匹配的。 if (item != null && o.equals(item) && p.casItem(item, null)) { //获取next元素 Node<E> next = succ(p); //如果有前驱节点,并且next不为空则链接前驱节点到next, if (pred != null && next != null) pred.casNext(p, next); return true; } pred = p; } return false; } ConcurrentLinkedQueue 底层使用单向链表数据结构来保存队列元素,每个元素被包装为了一个 Node 节点,队列是靠头尾节点来维护的,创建队列时候头尾节点指向一个 item 为 null 的哨兵节点, 第一次 peek 或者 first 时候会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以获取 size 的时候有可能进行了 offer,poll 或者 remove 操作,导致获取的元素个数不精确,所以在并发情况下 size 函数不是很有用。

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

腾讯云软件源

腾讯云软件源

为解决软件依赖安装时官方源访问速度慢的问题,腾讯云为一些软件搭建了缓存服务。您可以通过使用腾讯云软件源站来提升依赖包的安装速度。为了方便用户自由搭建服务架构,目前腾讯云软件源站支持公网访问和内网访问。

Rocky Linux

Rocky Linux

Rocky Linux(中文名:洛基)是由Gregory Kurtzer于2020年12月发起的企业级Linux发行版,作为CentOS稳定版停止维护后与RHEL(Red Hat Enterprise Linux)完全兼容的开源替代方案,由社区拥有并管理,支持x86_64、aarch64等架构。其通过重新编译RHEL源代码提供长期稳定性,采用模块化包装和SELinux安全架构,默认包含GNOME桌面环境及XFS文件系统,支持十年生命周期更新。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。

用户登录
用户注册