死磕 java集合之DelayQueue源码分析
问题
(1)DelayQueue是阻塞队列吗?
(2)DelayQueue的实现方式?
(3)DelayQueue主要用于什么场景?
简介
DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务。
继承体系
从继承体系可以看到,DelayQueue实现了BlockingQueue,所以它是一个阻塞队列。
另外,DelayQueue还组合了一个叫做Delayed的接口,DelayQueue中存储的所有元素必须实现Delayed接口。
那么,Delayed是什么呢?
public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}
 
Delayed是一个继承自Comparable的接口,并且定义了一个getDelay()方法,用于表示还有多少时间到期,到期了应返回小于等于0的数值。
源码分析
主要属性
// 用于控制并发的锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时)
private Thread leader = null;
// 条件,用于表示现在是否有可取的元素
private final Condition available = lock.newCondition();
 
从属性我们可以知道,延时队列主要使用优先级队列来实现,并辅以重入锁和条件来控制并发安全。
因为优先级队列是无界的,所以这里只需要一个条件就可以了。
还记得优先级队列吗?点击链接直达【死磕 java集合之PriorityQueue源码分析】
主要构造方法
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}
 
构造方法比较简单,一个默认构造方法,一个初始化添加集合c中所有元素的构造方法。
入队
因为DelayQueue是阻塞队列,且优先级队列是无界的,所以入队不会阻塞不会超时,因此它的四个入队方法是一样的。
public boolean add(E e) {
    return offer(e);
}
public void put(E e) {
    offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}
 
入队方法比较简单:
(1)加锁;
(2)添加元素到优先级队列中;
(3)如果添加的元素是堆顶元素,就把leader置为空,并唤醒等待在条件available上的线程;
(4)解锁;
出队
因为DelayQueue是阻塞队列,所以它的出队有四个不同的方法,有抛出异常的,有阻塞的,有不阻塞的,有超时的。
我们这里主要分析两个,poll()和take()方法。
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}
 
poll()方法比较简单:
(1)加锁;
(2)检查第一个元素,如果为空或者还没到期,就返回null;
(3)如果第一个元素到期了就调用优先级队列的poll()弹出第一个元素;
(4)解锁。
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 堆顶元素
            E first = q.peek();
            // 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
            if (first == null)
                available.await();
            else {
                // 堆顶元素的到期时间
                long delay = first.getDelay(NANOSECONDS);
                // 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
                if (delay <= 0)
                    return q.poll();
                
                // 如果delay大于0 ,则下面要阻塞了
                
                // 将first置为空方便gc,因为有可能其它元素弹出了这个元素
                // 这里还持有着引用不会被清理
                first = null; // don't retain ref while waiting
                // 如果前面有其它线程在等待,直接进入等待
                if (leader != null)
                    available.await();
                else {
                    // 如果leader为null,把当前线程赋值给它
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 等待delay时间后自动醒过来
                        // 醒过来后把leader置空并重新进入循环判断堆顶元素是否到期
                        // 这里即使醒过来后也不一定能获取到元素
                        // 因为有可能其它线程先一步获取了锁并弹出了堆顶元素
                        // 条件锁的唤醒分成两步,先从Condition的队列里出队
                        // 再入队到AQS的队列中,当其它线程调用LockSupport.unpark(t)的时候才会真正唤醒
                        // 关于AQS我们后面会讲的^^
                        available.awaitNanos(delay);
                    } finally {
                        // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
        if (leader == null && q.peek() != null)
            // signal()只是把等待的线程放到AQS的队列里面,并不是真正的唤醒
            available.signal();
        // 解锁,这才是真正的唤醒
        lock.unlock();
    }
}
 
take()方法稍微要复杂一些:
(1)加锁;
(2)判断堆顶元素是否为空,为空的话直接阻塞等待;
(3)判断堆顶元素是否到期,到期了直接调用优先级队列的poll()弹出元素;
(4)没到期,再判断前面是否有其它线程在等待,有则直接等待;
(5)前面没有其它线程在等待,则把自己当作第一个线程等待delay时间后唤醒,再尝试获取元素;
(6)获取到元素之后再唤醒下一个等待的线程;
(7)解锁;
使用方法
说了那么多,是不是还是不知道怎么用呢?那怎么能行,请看下面的案例:
public class DelayQueueTest {
    public static void main(String[] args) {
        DelayQueue<Message> queue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        // 启动一个线程从队列中取元素
        new Thread(()->{
            while (true) {
                try {
                    // 将依次打印1000,2000,5000,7000,8000
                    System.out.println(queue.take().deadline - now);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        // 添加5个元素到队列中
        queue.add(new Message(now + 5000));
        queue.add(new Message(now + 8000));
        queue.add(new Message(now + 2000));
        queue.add(new Message(now + 1000));
        queue.add(new Message(now + 7000));
    }
}
class Message implements Delayed {
    long deadline;
    public Message(long deadline) {
        this.deadline = deadline;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        return deadline - System.currentTimeMillis();
    }
    @Override
    public int compareTo(Delayed o) {
        return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
    @Override
    public String toString() {
        return String.valueOf(deadline);
    }
}
 
是不是很简单,越早到期的元素越先出队。
总结
(1)DelayQueue是阻塞队列;
(2)DelayQueue内部存储结构使用优先级队列;
(3)DelayQueue使用重入锁和条件来控制并发安全;
(4)DelayQueue常用于定时任务;
彩蛋
java中的线程池实现定时任务是直接用的DelayQueue吗?
当然不是,ScheduledThreadPoolExecutor中使用的是它自己定义的内部类DelayedWorkQueue,其实里面的实现逻辑基本都是一样的,只不过DelayedWorkQueue里面没有使用现成的PriorityQueue,而是使用数组又实现了一遍优先级队列,本质上没有什么区别。
欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。
关注公众号
					低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 
							
								
								    上一篇
								    
								
								docker CE on Linux示例浅析(二)数据存储与持久化
概述 github项目地址:https://github.com/superwujc 尊重原创,欢迎转载,注明出处:https://my.oschina.net/superwjc/blog/3043665 历史系列: docker CE on Linux示例浅析(一)安装与基本运行 docker对容器采用分层式(layer)管理,运行中的容器包括若干位于底部的镜像层与一个位于顶部的容器层。镜像层在构建完成时确定,属性为只读,内容始终保持不变;容器层在容器启动时添加,属性为读写,内容随运行而变化。默认情况下,容器中发生的所有文件变更仅影响当前容器自身的可写层,且可写层中的数据会随容器的移除而永久丢失。 docker提供了以下3种方式,将宿主机上的文件/目录挂载到容器,而容器则将需要保留的数据写入到挂载点而非可写层,以实现容器数据的持久化与共享功能: 数据卷(data volume) 绑定挂载(bind mount) Linux特有的临时文件系统(tmpfs) 这3种挂载类型都在容器启动时指定,对应的命令为docker run。在早期的docker版本中,数据卷与绑定挂载由-v/--...
 - 
							
								
								    下一篇
								    
								
								基于Spring MVC框架的Http流程分析
一、问题提出 我们可以方便的利用Spring MVC进行业务开发,请求的大部分工作都被框架和容器封装,使得我们只需要做很少量的工作。但是整个http请求流程是怎么样的?Spring MVC框架在其中起到什么作用?它是怎么和Web容器进行交互的?Controller中的一个方法怎么被暴露出来提供http请求服务的?本着这些想法,我们对整个http请求过程进行讨索。全文以spring-mvc-demo为例 二、整体处理流程概述 整个过程包括三部分:应用启动、请求路由与处理、请求返回。 应用启动:web容器初始化(context建立等)、应用初始化(初始化handlerMap)。 请求路由与处理:请求路由(根据url找到Context、根据context找到dispatcherServlet、根据url找到handler、根据url找到handler的方法)、method反射调用获取ModelAndView。 请求返回:逻辑视图到物理视图的转换、物理视图的渲染、视图返回。 具体流程如下: 系统启动: 1、web容器自己去将contextPath、docBase设置到一个context里面,这...
 
相关文章
文章评论
共有0条评论来说两句吧...

			

				
				
				
				
				
				
				
微信收款码
支付宝收款码