jdk11源码--LinkedBlockingQueue源码分析
概述
上一篇介绍了jdk11源码--ArrayBlockingQueue源码分析,接下来看一下LinkedBlockingQueue的实现。
这两个阻塞队列最大的区别就是底层元素存储实现不同,ArrayBlockingQueue是基于==数组==,而LinkedBlockingQueue是基于==单向链表==。
LinkedBlockingQueue类图如下:
LinkedBlockingQueue也是==FIFO==先进先出队列,其实现是 ==双锁队列two lock queue
== 算法的变体,它内部维护了一个takeLock和一个putLock,也可以理解为读写锁的一种实现方式。
==思想:锁分离,提高性能。==
下面结合源码具体分析。
构造方法
网上有的说LinkedBlockingQueue是无界队列,其实不太准确,具体看下面的源码。
下面两个构造方法,一个是有参数的,设置容量大小。这是有界队列。
一个是无参数的构造方法,其内部实现是设置容量大小为Integer.MAX_VALUE的队列,这就是网络上说的无界队列,其实还是有界的,只不过比较大,是Integer.MAX_VALUE。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//初始化一个头结点 }
链表节点数据结构
上面说了,LinkedBlockingQueue是单向链表,那么我们看一下他的数据结构,只有一个next指针,典型的单向链表结构:
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
关键属性
/** 队列容量 */ private final int capacity; /** 当前队列的元素数量 */ private final AtomicInteger count = new AtomicInteger(); /** * 链表的头结点 * 注意:head.item 永远是 null */ transient Node<E> head; /** * 链表的尾结点 * 注意: last.next 永远是 null */ private transient Node<E> last; /** 读锁: take, poll方法使用 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 读锁的condition */ private final Condition notEmpty = takeLock.newCondition(); /** 写锁: put, offer 方法使用 */ private final ReentrantLock putLock = new ReentrantLock(); /** 写锁的condition */ private final Condition notFull = putLock.newCondition();
count :由于LinkedBlockingQueue使用了两个锁,为了支持线程安全,所以使用原子性的AtomicInteger 来统计队列元素的个数。注意这里count的计数不包含head节点。
put
添加元素至队尾
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly();//加写锁,支持中断 try { //如果队列满了,则当前线程阻塞:添加到condition队列中。 //这里count是线程安全的 while (count.get() == capacity) { notFull.await(); } enqueue(node);//队列不满,写入队尾 c = count.getAndIncrement(); if (c + 1 < capacity)//元素入队后,还有剩余空间,就唤醒notFull这个condition队列的第一个线程 notFull.signal(); } finally { putLock.unlock(); } if (c == 0)//注意这里c是添加元素前的队列长度(getAndIncrement方法先返回值,再将其加1), signalNotEmpty();//原队列长度为0,空的,当前put方法添加了一个元素入队,立马唤醒等待在notEmpty condition的 线程来取数据 } private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal();//notEmpty是读锁takeLock 的condition队列 } finally { takeLock.unlock(); } } //添加元素至队尾 //enqueue是在写锁内部的,是线程安全的 private void enqueue(Node<E> node) { last = last.next = node; }
这里来看一下if (c + 1 < capacity) notFull.signal();
这行代码,为什么这么写。
原因肯定离不开多线程以及condition的原理。c + 1 < capacity
是来判断是否还有剩余空间,当有剩余空间的时候,就唤醒notFull这个condition队列中等待的第一个节点。首先因为是多线程的,所以可能会有多个线程阻塞在notfull上。可能有人会问,前面不是加了putLock的锁吗,这里只能有一个线程进入啊?提出这个疑问的同学请看博主文章jdk11源码--ReentrantLock之Condition源码分析。这里简单说一下:一个线程执行了notFull.await()
阻塞后,该线程添加到condition队列中,释放锁,其他线程就可以继续获取锁执行了。
当然,这里每次put,空间不满时,都去唤醒一下,确实有肯能会有部分(甚至是大部分)signal操作,这是使用双锁策略及condition的代价,不过这是值得的。
last = last.next = node;
再讲一下这句。其实分解一下等价于:
last.next = node; last = node;
take
从对头取元素
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly();//加锁,支持中断 try { while (count.get() == 0) {//如果队列为空,则当前线程阻塞在notEmpty condition上 notEmpty.await(); } x = dequeue();//取走队首元素 c = count.getAndDecrement();//获取旧的count数量,然后对其减1 if (c > 1) notEmpty.signal();//如果队列中还有值,那么唤醒阻塞在notEmpty condition的线程。 } finally { takeLock.unlock(); } if (c == capacity)//之前队列满的 signalNotFull();//之前队列满的,那么这里取走了一个,队列有空位子了,马上唤醒notFull condition的一个线程,让其竞争锁put数据 return x; } //返回head节点后面的第一个节点的数据 //并且将head后移一位,老head准备gc回收 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // 辅助GC (为什么不在下面将head设置为null?感觉这里next引用自己,对外也没有引用了,是可以被回收的,效果差不多) head = first; E x = first.item; first.item = null; return x; } private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
总结
两把锁,各配一个condition:
- takeLock 和 notEmpty 搭配:先获取 takeLock 锁,才能到链表中获取链表第一个节点的数据。如果此时队列为空,则当前线程加入notEmpty 这个condition队列阻塞。
- putLock 需要和 notFull 搭配:先获取 putLock 锁,才能到链表中插入(put)一个元素到链尾。如果此时队列满,则当前线程加入notFull 这个condition队列阻塞。
唤醒notFull这个condition队列上的线程有两种情况:
- put、offer每次入队后,还有剩余空间
- take、poll前队列是满的,那么take走一个后,就唤醒
唤醒notEmpty这个condition队列上的线程有两种情况:
- take、poll每次取走元素后,队列中不为空
- put、offer前队列是空的,那么put进来一个元素后,就唤醒
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
jdk11源码--ArrayBlockingQueue源码分析
概述 上一篇文章jdk11源码--ReentrantLock之Condition源码分析中分析了ReentrantLock和Condition的源码,那么接下来看一下Condition在JDK中的具体应用。 ArrayBlockingQueue底层就是使用Condition来实现的。 BlockingQueue BlockingQueue 阻塞队列,该类是一个接口,平时我们熟知的ArrayBlockingQueue,LinkedBlockingQueue等都是该接口的实现。BlockingQueue 之所以说是阻塞的,是因为他可以在队列为空的时候,获取元素的线程会阻塞,直到有新的元素添加进来。当队列满时,添加元素的线程会阻塞,直到有线程从队列中取走了元素。这也是注明的==生产者消费者==问题。当有面试官问你==生产者消费者==问题时,直接
- 下一篇
jdk11源码--SynchronousQueue源码分析
概述 SynchronousQueue是一个同步阻塞队列,每一个 put操作都必须等待一个take操作。每一个take操作也必须等待一个put操作。SynchronousQueue是没有容量的,无法存储元素节点信息,不能通过peek方法获取元素,peek方法会直接返回null。由于没有元素,所以不能被迭代,它的iterator方法会返回一个空的迭代器Collections.emptyIterator();。 SynchronousQueue比较适合线程通信、传递信息、状态切换等应用场景,一个线程必须等待另一个线程传递某些信息给他才可以继续执行。 SynchronousQueue这个队列不常用,但是线程池中有用到该队列,所以也分析一下。Executors.newCachedThreadPool()方法中使用到了SynchronousQue
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS8编译安装MySQL8.0.19
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS7设置SWAP分区,小内存服务器的救世主
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- CentOS8安装MyCat,轻松搞定数据库的读写分离、垂直分库、水平分库
- CentOS7,CentOS8安装Elasticsearch6.8.6
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- 2048小游戏-低调大师作品