BlockingLock 阻塞队列

BlockingLock 阻塞队列

薛定谔的汪

阻塞队列

BlockingQueue

BlockingQueue 是一个接口,继承自 Queue,Queue继承自 Collection 接口,

BlockingQueue插入和移除的4中处理方式:

方法/处理方式 抛出异常 返回boolean 一直阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time, unit)
检查 element(e) peek(e) 不可用 不可用

(表格摘自《Java 并发编程的艺术》)

抛出异常:

当队列满时,添加元素抛出IllegalStateException(“Queue full”)异常;

当队列为空时,从队列里获取元素会抛出NoSuchElementException 异常。

返回 boolean

当队列插入元素时,会返回元素是否插入成功,成功返回 true,反之返回 false;

当队列移除元素时,则是从队列中取出一个元素并返回,没有元素返回 null。

注意:由于移除元素成功返回被移除的元素,失败返回null,可以判断 BlockQueue 是不允许插入null 值的,否则就无法判断返回的null 值到底是队列中的元素null还是移除失败返回的null

一直阻塞

当添加元素时,如果队列满,则线程会阻塞直至成功添加元素,如果线程被中断抛出InterruptedException异常;

当移除元素时,如果队列为空,则线程会阻塞直至队列不为空,成功获取元素,如果线程被中断抛出InterruptedException异常。

超时退出

当添加元素时,如果队列满,则线程会阻塞一段时间,如果这段时间内添加成功则返回,超出这段时间后则自动返回;

当移除元素时,如果队为空,则线程会阻塞一段时间,如果这段时间内成功移除元素则返回,超出这段时间后也自动返回。

注意:如果是无界阻塞队列(容量是 Integer.MAX_VALUE),队列基本不可能出现满的情况,所以使用 put 或offer 永远不会阻塞,且 offer 永远返回true

BlockQueue 的实现类:

jdk8中它有8个实现类,分别有各自的特点和功能,能力精力时间有限,不能一一分析,就拿 ArrayBlockingQueue 来分析下阻塞队列的实现原理吧,其实原理都是相通的,比如生产者是如何知道当前队列是满了呢?消费者是如何知道当前队列有元素了呢?

ArrayBlockingQueue 实现原理

属性

1
2
3
4
5
6
7
8
9
10
11
/** 队列存放元素的数组 */
final Object[] items;

/** 下一个要移除元素的索引 */
int takeIndex;

/** 下一个要被添加元素的索引 */
int putIndex;

/** 队列中元素的数量 */
int count;

JDK 是使用等待通知模式来实现的,查看 ArrayBlockingLock 是使用了 Contition来实现:

1
2
3
4
5
/** 移除元素时,队列是否为空 */
private final Condition notEmpty;

/** 添加元素时,队列是否满 */
private final Condition notFull;

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

ArrayBlockingQueue默认使用非公平ReentrantLock锁 ,ArrayBlockingQueue的插入、移除元素都基于ReentrantLock锁来实现的。

常用方法

add(E e)

add 方法在插入元素时,如果队列已满,插入失败抛出IllegalStateException("Queue full")异常

1
2
3
public boolean add(E e) {
return super.add(e);
}

add方法调用了父类AbstractQueue的add(),查看其源码:

1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}

发现 add 方法是先调用 offer(e),offer 也是插入元素。

offer(E e)

offer 在插入元素时,插入成功返回true,插入失败返回false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean offer(E e) {
//元素不能为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果当前阻塞队列满,插入失败,返回false
if (count == items.length)
return false;
else {
//插入元素
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

put(E e)

支持阻塞地插入:当队列满时,队列会阻塞队列插入元素,当队列有空余位置时才插入元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//可被中断地获取锁
lock.lockInterruptibly();
try {
//只要队列满,一直自旋等待队列不满
while (count == items.length)
notFull.await();
//队列不满时才插入元素
enqueue(e);
} finally {
lock.unlock();
}
}

offer(e,time,unit)

与put(E e) 方法类似,多了超时等待的功能。

take()

支持阻塞地移除:当队列为空时,获取元素的线程会阻塞,知道队列中有元素,获得元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列为空,则阻塞
while (count == 0)
notEmpty.await();
//移除元素
return dequeue();
} finally {
lock.unlock();
}
}

发现put(E e)和 take()方法阻塞的实现都是用Condition的await()方法实现,查看AQS内部类ConditionObject源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

重点是LockSupport.park(this);方法:

1
2
3
4
5
6
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

发现调用UNSAFE.park(false, 0L);方法阻塞线程,该方法是一个native 方法。

总结

  1. 阻塞队列(BlockingQueue) 是一个 FIFO(先入先出)的队列,除了支持阻塞地插入、移除元素,还支持其他方式。

支持阻塞地插入:当队列满时,队列会阻塞队列插入元素,当队列有空余位置时才插入元素。

支持阻塞地移除:当队列为空时,获取元素的线程会阻塞,知道队列中有元素,获得元素。

阻塞队列的这两个特点非常重要,是学习了解它的基础。

  1. 阻塞原理的实现是使用了线程的等待通知机制,执行代码:UNSAFE.park(false, 0L);
  • Title: BlockingLock 阻塞队列
  • Author: 薛定谔的汪
  • Created at : 2018-08-16 18:01:54
  • Updated at : 2023-11-17 19:37:37
  • Link: https://www.zhengyk.cn/2018/08/16/java/BlockingQueue/
  • License: This work is licensed under CC BY-NC-SA 4.0.