并发编程

并发编程原理和应用
帅旋
关注
充电
IT宅站长,技术博主,共享单车手,全网id:arthinking。

图解BlockingQueue阻塞队列

发布于 2020-03-27 | 更新于 2024-02-22

本文重点介绍各种阻塞队列的实现、对比和使用场景。阅读完本文,你将对各种阻塞队列的实现原理都有一定的了解,以及了解他们的使用场景。

1、阻塞队列 BlockingQueue

1.1、BlockingQueue的基本原理

我们先来解释一下阻塞队列:

image-20200322114947811

如上图

  • 生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
  • 消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

我们查阅BlockingQueue总结了以下阻塞队列的方法:

如果队列满了(插入)或者空(移除)了 抛异常 阻塞 超时 立刻返回
插入 add(E e) put(E e) offer(E e, long timeout, TimeUnit unit) offer(E e)
移除 remove(Object o) take() poll(long timeout, TimeUnit unit)

注意:

根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。

以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

作为一个五年级的小学生,到这里就开始听不懂了,暗中嘚瑟,还好学的没那么快。

下图展示了主要的BlockingQueue的实现类:

image-20200323212959261

其实我们在前面的文章:ReentrantLock介绍与使用#2.4、条件变量 章节已经使用ReentrantLock的Condition实现了一个阻塞队列,底层是使用LinkedList进行存储的。

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

2、ArrayBlockingQueue

相信经过上面各个同步类的分析,大家已经对AQS比较熟悉了,下面我将不再画具体的内部结构图了。

ArrayBlockingQueue使用的数据结构是数组

Object[capacity]

容量大小有构造函数的capacity参数决定。

2.1、put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void put(E e) throws InterruptedException {
checkNotNull(e);
// 获取ReentrantLock锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列满了,则进入条件队列进行等待
while (count == items.length)
notFull.await();
// 队列不满,或者被取数线程唤醒了,那么会继续执行
// 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程
enqueue(e);
} finally {
// 释放ReentrantLock锁
lock.unlock();
}
}
  1. 只有获取到了ReentrantLock锁之后,才可以操作队列;
  2. 队列满了会阻塞进入条件队列等待;
  3. 队列不满则添加数据,并且唤醒等待时间最长的取数线程。

2.2、take方法

如果队列空了,则进入条件队列进行等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E take() throws InterruptedException {
// 获取ReentrantLock锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 如果队列空了,则进入条件队列进行等待
while (count == 0)
notEmpty.await();
// 队列不空,或者被存数线程唤醒了,那么会继续执行
// 这里会从阻塞队列取一个数据,然后唤醒等待时间最长的存数线程
return dequeue();
} finally {
// 释放ReentrantLock锁
lock.unlock();
}
}
  1. 只有获取到了ReentrantLock锁之后,才可以操作队列;
  2. 队列空了会阻塞进入条件队列等待;
  3. 队列不满则取数据,并且唤醒等待时间最长的存数线程。

**注意:ArrayList中的数据取数和存数都是依次遍历一个一个取或者存,直到队尾之后,从头开始继续。**代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

如下图:

image-20200322153507868

这里put和take使用了同一个ReentrantLock,不能并发执行。

有没有办法能够做到让put和take能够并发执行呢?接下来我们就来看看LinkedBlockingQueue。

3、LinkedBlockingQueue

LinkedBlockingQueue的put方法和take方法分别使用了不同的ReentrantLock,put和take可以并发执行,但是不能并发执行put或者take操作。

LinkedBlockingQueue底层使用的数据结构是单向链表

transient Node head;

private transient Node last;

容量大小可以由构造函数的capacity设定,默认为:Integer.MAX_VALUE

3.1、put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 使用AtomicInteger保证原子性
final AtomicInteger count = this.count;
// 获取put锁
putLock.lockInterruptibly();
try {
// 如果队列满了,则进入put条件队列等待
while (count.get() == capacity) {
notFull.await();
}
// 队列不满,或者被取数线程唤醒了,那么会继续执行
// 这里会往阻塞队列末尾添加一个数据
enqueue(node);
c = count.getAndIncrement();
// 如果队列不满,则唤醒等待时间最长的put线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放put锁
putLock.unlock();
}
// 如果队列为空,再次获取put锁,然后唤醒等待时间最长的put线程
if (c == 0)
signalNotEmpty();
}

3.2、take方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 获取take锁
takeLock.lockInterruptibly();
try {
// 如果队列空了,则进入take条件队列等待
while (count.get() == 0) {
notEmpty.await();
}
// 获取到第一个节点,非哑节点
x = dequeue();
// 阻塞队列数量减1
c = count.getAndDecrement();
// 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
if (c > 1)
notEmpty.signal();
} finally {
// 释放take锁
takeLock.unlock();
}
// 如果队列满了,再次获取take锁,然后唤醒等待时间最长的take线程
if (c == capacity)
signalNotFull();
return x;
}

take和put操作如下图所示:

image-20200322154519699

  1. 队列第一个节点为哑节点,占位用的;
  2. put操作一直往链表后面追加节点;
  3. take操作从链表头取节点;

ArrayBlockingQueue与LinkedBlockingQueue对比

队列 是否阻塞 是否有界 线程安全 适用场景
ArrayBlockingQueue 一把ReentrantLock锁 生产消费模型,平衡处理速度
LinkedBlockingQueue 可配置 两把ReentrantLock锁 生产消费模型,平衡处理速度

ArrayBlockingQueue

  • 数据结构:数组,存储空间预先分配,无需动态申请空间,使用过程中内存开销较小;

LinkedBlockingQueue

  • 数据结构:单项链表,存储空间动态申请,会增加JVM垃圾回收负担;
  • 两把锁,并发性能较好;
  • 可设置为无界,吞吐量比较大,但是不稳定,入队速度太快有可能导致内存溢出。

4、LinkedBlockingDeque

与LinkedBlockingQueue类似,只不过底层的数据结构是双向链表,并且增加了可以从队列两端插入和移除元素的方法,支持FIFOFILO。相关方法定义:

  • putFirst(E e)
  • putLast(E e)
  • E getFirst()
  • E getLast()
  • E takeFirst()
  • E takeLast()

LinkedBlockingQueue与LinkedBlockingDeque对比

LinkedBlockingQueue:

  • FIFO;
  • 读写分开两个ReentrantLock;

LinkedBlockingDeque:

  • FIFO & FILO;
  • 全局一把ReentrantLock;

5、PriorityBlockingQueue

是一个无界队列

存储结构:

private transient Object[] queue;

内部会构造为一颗平衡的二叉小顶堆,根据构造函数中传入的Comparator进行排序或者没有传的情况下使用自然的排序方法,数组的第一个元素为最小的元素。

全局一把ReentrantLock锁。

5.1、put方法

**无界队列,一定可以添加成功,无需阻塞。容量不够则扩容,put完会重新构建小顶堆。**关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 尝试获取锁
lock.lock();
int n, cap;
Object[] array;
// 如果数组空间不够,尝试扩容:通常会扩大约50%
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
// 往小顶堆插入元素
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
// 元素个数+1
size = n + 1;
// 唤醒等待最久的取数线程
notEmpty.signal();
} finally {
// 释放锁
lock.unlock();
}
return true;
}

跟前面的各种阻塞队列实现思路基本一致,这里比较有意思的是数组的扩容和往小顶堆插入元素的处理逻辑,由于篇幅所限,这里不展开讲了,感兴趣的朋友可以前去了解下。

5.2、take方法

队列为空的时候进入条件等待,take完元素之后,立刻重新构建小顶堆。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取锁
lock.lockInterruptibly();
E result;
try {
// 尝试获取最小元素,即小顶堆第一个元素,然后重新排序,如果不存在表示队列暂无元素,进行阻塞等待。
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
// 释放锁
lock.unlock();
}
return result;
}

这里比较有趣的是dequeue()方法,涉及到取最小元素,然后重新排序,由于篇幅所限,这里不展开讲了,感兴趣的朋友可以前去了解下。

6、SynchronousQueue

通过使用SynchronousQueue,我们可以在线程之间安全的传递变量,A线程把需要传递的变量放入SynchronousQueue,B线程读取。该队列特点如下:

  • 容量永远为0;
  • put操作阻塞,直到另一个线程取走了队列中的元素;
  • take操作阻塞,直到另一个线程put一个元素到队列中;
  • 任何线程只能取得其他线程put进去的元素。

与其他阻塞队列不同的是,SynchronousQueue不依赖与AQS实现,而是直接使用CAS操作实现的,这导致代码中有大量的判断是否数据被并发改写了,并做相应的处理。

我们不推荐使用的无界线程池Executors.newCachedThreadPool()底层就是用到了SynchronousQueue来实现的。

SynchronousQueue具有公平模式和非公平模式的区别,两者的实现不太一样,接下来就介绍一下。

6.1、公平模式

公平模式下,底层的数据结构是一个单向链表,对应实现类为:TransferQueue。

底层数据结构与LinkedBlockingQueue类似,只不过阻塞的条件不同

  • LinkedBlockingQueue在队列满的时候put线程会阻塞,在队列空的时候,take线程会阻塞;
  • SynchronousQueue put进去的元素没有被take的时候,put线程阻塞,take线程获取不到元素的时候,take线程阻塞;

如下图,刚开始有三个线程执行了put操作,都阻塞等待了:

image-20200323232118531

然后有一个新的线程4执行了take操作,这里是FIFO队列,匹配上了线程1,于是线程1取了线程1节点的数据,然后同时唤醒了线程1,头节点向前推进:

image-20200323232234349

这种FIFO的模式真是公平的体现。

大致执行流程就是这样子,比使用了AQS的简单,取而代之的是使用CAS,通过大量的检验节点是否变更和处理,以达到更高put和take的性能,不过代码就自然会变得很复杂了,感兴趣的朋友可以前往查看源码,这里就不做详细的解读了。

6.2、非公平模式

非公平模式,底层的数据结构是一个栈。代码也是比较复杂的,这里我直接用图来描述下其执行原理。

如下图,线程1、线程2、线程3依次执行put操作,入栈情况如下图,结果三个线程都阻塞了:

image-20200324000849491

这个时候线程4执行take操作,会入栈,与栈顶的栈帧进行匹配:

image-20200324001604302

匹配成功之后,唤醒匹配上的线程3,然后从栈中移除线程3和线程4

image-20200324001533658

可以发现非公平模式下是LIFO的队列。

7、DelayQueue

延迟队列,提供给了在指定时间内才能获取队列元素的功能。

底层是通过PriorityQueue实现的:

  • put元素,触发Delayed接口的compareTo方法重新排序PriorityQueue小顶堆,让最小的元素(最快到期)排在最前面;一定会put成功,容量不够则扩容;
  • take元素,判断第一个元素是否到期,到期了则把原始poll出来(同时会重新构造小顶堆),否则会执行awaitNanos(delay),等待头节点元素到期之后,再重新获取元素。

image-20200326231540134

8、各种阻塞锁对比

阻塞锁 数据结构 是否有界 线程安全 适用场景
ArrayBlockingQueue 数组 有界 一把ReentrantLock锁控制put和take。 生产消费模型,平衡处理速度
LinkedBlockingQueue 单向链表 可配置 两把ReentrantLock锁,put和take可以并发执行。 生产消费模型,平衡处理速度
LinkedBlockingDeque 双向链表 可配置 一把ReentrantLock锁控制put和take。 生产消费模型,平衡处理速度
优先级队列 PriorityBlockingQueue 二叉小顶堆 无界,会自动扩容 一把ReentrantLock锁控制put和take,队列为空的时候take进入条件等待; 短信队列中的验证码短信优先发送
同步队列 SynchronousQueue 单向链表或者栈 容量为1 CAS,put和take都会阻塞,直到配对成功为止; 线程之间传递数据
延迟队列 DelayQueue PriorityQueue,二叉小顶堆 无界,会自动扩容 一把ReentrantLock锁控制put和take,一次只能一个线程take,其他线程进入条件等待; 关闭超时空连接,任务超时处理…

9、使用案例

9.1、案例一:生产者消费者

下面是一个使用案例,使用到了阻塞队列和CountDownLatch。这个程序模拟:

  • 一个生产者线程,往阻塞队列中依次存入20条消息;
  • 一个消费者线程,一直尝试从阻塞线程中取出消息进行消费。

您可以使用上一节中 4.2.1~4.2.4中的任何一个阻塞队列替换掉代码中的阻塞队列,尝试看看效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class BlockingQueueTest {

public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
// 这里可以替换为你想试用的阻塞队列
BlockingQueue<String> queue = new PriorityBlockingQueue<>();
executor.submit(new Producer(queue));
executor.submit(new Cunsumer(queue));
executor.shutdown();
}
}

/**
* 生产者线程,往阻塞队列中依次存入20条消息
*/
class Producer implements Runnable{

private BlockingQueue<String> queue;

Producer(BlockingQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
for (int i = 0; i < 20; i++){
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
String threadName = Thread.currentThread().getName();
String msg = threadName + " : " + (i + 1);
queue.put(msg);
System.out.println("producer: " + msg);
}
System.out.println("producer finished...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

/**
* 消费者线程,一直尝试从阻塞线程中取出消息进行消费
*/
class Cunsumer implements Runnable{

private BlockingQueue<String> queue;

Cunsumer(BlockingQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (true){
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(5000));
String msg = queue.take();
System.out.println("consumer " + Thread.currentThread().getName() + ", msg : " + msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}

}

9.2、案例二:优先级阻塞队列

下面是一个PriorityBlockingQueue的使用例子,可以发现每次put一个元素之后会自动排序,小顶堆第一个元素总是最小的那个,每次take出来的元素也是最小的,take出来之后也会再次自动排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class PriorityBlockingQueueTest {

public static void main(String[] args) throws InterruptedException {

PriorityBlockingQueue<UserInfo> queue = new PriorityBlockingQueue<>();
queue.put(new UserInfo(10, "User1"));
System.out.println("priorityBlockingQueue: " + queue);

queue.put(new UserInfo(5, "User2"));
System.out.println("priorityBlockingQueue: " + queue);

queue.put(new UserInfo(2,"User3"));
System.out.println("priorityBlockingQueue: " + queue);

queue.put(new UserInfo(4,"User4"));
System.out.println("priorityBlockingQueue: " + queue);

System.out.println("take data: " + queue.take());
System.out.println("priorityBlockingQueue: " + queue);

System.out.println("take data: " + queue.take());
System.out.println("priorityBlockingQueue: " + queue);
}

}

@Data
@NoArgsConstructor
@AllArgsConstructor
class UserInfo implements Comparable<UserInfo>{

private int id;

private String name;

@Override
public String toString() {
return this.id + "";
}
@Override
public int compareTo(UserInfo person) {
return this.id > person.getId() ? 1 : ( this.id < person.getId() ? -1 :0);
}
}

输出结果:

1
2
3
4
5
6
7
8
priorityBlockingQueue: [10]
priorityBlockingQueue: [5, 10]
priorityBlockingQueue: [2, 10, 5]
priorityBlockingQueue: [2, 4, 5, 10]
take data: 2
priorityBlockingQueue: [4, 10, 5]
take data: 4
priorityBlockingQueue: [5, 10]

9.3、案例三:SynchronousQueue的使用

下面使用SynchronousQueue的使用案例。大家可以替换成公平模式或者非公平模式,来查看程序输出结果,看看是FIFO还是LIFO:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
ExecutorService executor = Executors.newFixedThreadPool(10);
// 构造函数参数设置公平模式或者非公平模式
SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);

Runnable producer = () -> {
Integer producedElement = ThreadLocalRandom
.current()
.nextInt();
try {
System.out.println(Thread.currentThread().getName() + " put " + producedElement);
queue.put(producedElement);
System.out.println(Thread.currentThread().getName() + " put finished");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};

Runnable consumer = () -> {
try {
TimeUnit.SECONDS.sleep(1);
Integer consumedElement = queue.take();
System.out.println(Thread.currentThread().getName() + " take " + consumedElement);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
};

Thread.sleep(1000);
executor.execute(producer);
Thread.sleep(1000);
executor.execute(producer);
Thread.sleep(1000);
executor.execute(producer);
Thread.sleep(1000);
executor.execute(consumer);

executor.shutdown();
System.out.println(queue.size());

References

A Guide to Java SynchronousQueue

本文作者: 帅旋

本文链接: https://www.itzhai.com/columns/cpj/blocking-queue.html

版权声明: 版权归作者所有,未经许可不得转载,侵权必究!联系作者请加公众号。

×
IT宅

关注公众号及时获取网站内容更新。