DelayQueue源码分析

DelayQueue是一个阻塞队列,其实现了BlockingQueue接口。

public class DelayQueue extends AbstractQueue
implements BlockingQueue {}

添加到延迟队列中的元素必须实现Delayed接口,该接口有2个方法:

// 获取元素的延误时间
long getDelay(TimeUnit unit);// 元素比较
public int compareTo(T o);
  • 为了将最早过期的元素放置在队列头部,DelayQueue基于PriorityQueue优先队列来维护队列中的元素;
  • DelayQueue基于ReentrantLock的Condition实现了阻塞。

入队方法

offer方法将元素插入到队列中,并返回是否成功插入。

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {// 执行优先队列的offer方法,将元素插入q.offer(e);// 如果优先队列的首元素为该元素if (q.peek() == e) {// 将leader置为null,代表可以获取该元素leader = null;// 通知工作线程来获取该元素available.signal();}// 返回truereturn true;} finally {lock.unlock();}
}

put方法为阻塞队列(BlockingQueue)的方法,其直接调用的offer方法:

public void put(E e) {offer(e);
}

offer(E e, long timeout, TimeUnit unit)也属于阻塞队列(BlockingQueue)的方法,入参需要额外指定超时时间timeout,但仍然直接调用的offer方法,timeout未发挥作用。

public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e);
}

由于底层存储元素的PriorityQueue优先队列相当于是"无界"队列(最多Integer.MAX_VALUE-8个元素),所以添加元素的阻塞与非阻塞实现是一致的。

出队方法

poll()为非阻塞方法,若首元素的延迟时间未到,则直接返回null:

public E poll() {final ReentrantLock lock = this.lock;// 加锁lock.lock();try {// 获取优先队列的首元素E first = q.peek();// 如果首元素为null或者首元素的延迟时间>0,则直接返回null// 如果首元素不为null,且首元素的延迟时间<=0,则执行q.poll(),返回优先队列的首元素return (first == null || first.getDelay(NANOSECONDS) > 0)? null: q.poll();} finally {// 释放锁lock.unlock();}
}

而poll(long timeout, TimeUnit unit)则是阻塞方法,timeout为超时时间,其会在timeout时间内反复尝试获取优先队列的首元素。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();// 若优先队列首元素为nullif (first == null) {// 超时时间nanos<=0,代表已超时,直接返回nullif (nanos <= 0L)return null;// 否则阻塞等待nanos纳秒elsenanos = available.awaitNanos(nanos);// 若预先队列首元素不为null} else {// 获取首元素的延迟时间long delay = first.getDelay(NANOSECONDS);// 如果延迟时间已到,代表任务已到期,则不管timeout是否已经<=0,均直接执行q.poll()if (delay <= 0L)return q.poll();// 任务未到期,但timeout已经<=0,直接返回nullif (nanos <= 0L)return null;first = null; // don't retain ref while waiting// 若nanos=delay且leader为null// 则将当前线程设置为leader,并阻塞等待delay纳秒Thread thisThread = Thread.currentThread();leader = thisThread;try {long timeLeft = available.awaitNanos(delay);// 被唤醒之后,更新nanos为nanos-(delay - timeLeft),继续执行新一轮的for(;;)nanos -= delay - timeLeft;} finally {if (leader == thisThread)leader = null;}}}}} finally {// 如果leader已置为null且优先队列q中首元素不为null,则唤醒等待的线程,并释放锁if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}

take()是阻塞方法,若首元素的延迟时间未到,则持续阻塞:

public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 添加可中断锁lock.lockInterruptibly();try {for (;;) {E first = q.peek();// 若优先队列为空,代表无可消费的元素,则消费线程执行awit()方法阻塞等待if (first == null)available.await();// 若优先队列不为空else {// 获取优先队列首元素的过期时间long delay = first.getDelay(NANOSECONDS);// 若已过期,则直接返回优先队列的首元素if (delay <= 0L)return q.poll();first = null; // don't retain ref while waiting// 若leader不为空,则证明已有线程在拉取首元素,当前线程阻塞if (leader != null)available.await();else {// 若leader不为空,则将leader设置为自身线程,然后await delay// 延迟时间到达之后,会重新进入到for(;;) 此时获取到first的delay<=0,执行q.poll()返回首元素Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {// 如果leader已置为null且优先队列q中首元素不为null,则唤醒等待的线程,并释放锁if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}

可以看到。take()阻塞方法是若队列为空,则持续await阻塞挂起,直到被队列中添加元素的线程signal唤醒;


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部