0%

读BlockingQueue源码--ArrayBlockingQueue(上篇)

BlockingQueue 是JUC包中的一个接口, 从名字可以看出是一个支持阻塞的队列. 在刚学Thread的时候, 是否有被生产者-消费者问题难倒过? 在学Thread的时候主要学习解决 生产者-消费者问题的思路, 如果现在在来解决这个问题, 我们就不用再手撸wait - notify的代码了, BlockingQueue 就可以很轻松的解决这个问题, 下面我们就学习下 并发大神Doug Lea 是怎么解决这个问题的.

0

BlockingQueue

BlockingQueue 是一个接口,下面我列出几个比较常用的子类;
2

  • ArrayBlockingQueue : 基于数组存储结构的阻塞有界队列
  • LinkedBlockingQueue: 基于链表的阻塞队列
  • LinkedBlockingDeque: 基于双链表的阻塞队列
  • DelayedWorkQueue: 支持延时的阻塞队列
  • PriorityBlockingQueue: 支持优先级的阻塞队列

这几个子类后面我准备都分别总结下;

首先我看看 BlockingQueue 定义了那些接口方法.这里我们暂时只考虑 ArrayBlockingQueue ,我们通过ArrayBlockingQueue 来开启认知BlockingQueue.

1

  • offer( e ): 如果添加成功则返回ture否则false;
  • add( e ): 底层调用的是 offer( e ) 方法, 添加失败的时候会抛出提示’Queue full’的异常.
  • put( e ): 支持阻塞的添加方法,如果添加不了,则阻塞.
  • take(): 取出,如果队列为空,则阻塞.

ArrayBlockingQueue

ArrayBlockingQueue 底层是基于数组结构,配合ReentrantLock实现的阻塞队列, 队列大小是固定的, 在构造的时候就已经决定队列的大小, 默认使用的是 非公平锁.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//队列的存储容器
this.items = new Object[capacity];
//锁
lock = new ReentrantLock(fair);
//2个条件链表
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

final Object[] items;
// 记录取出的位置
int takeIndex;
// 记录添加元素的位置
int putIndex;
// 记录队列中元素个数
int count;

//锁
final ReentrantLock lock;
// 非空条件 ConditionObject
private final Condition notEmpty;
// 非满条件 ConditionObject
private final Condition notFull;

其中 ConditionObject 内部有firstWaiter ,lastWaiter 两个指针,指向 CLH 条件链表的头和尾Node.

简单的调用

下面是简单的调用.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

public class ThreadTest {


public static void main(final String[] args) {

ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue(4);

for (int i = 0; i < 2; i++) {
Producter thread = new Producter(String.valueOf(i), blockingQueue);
thread.start();
}


for (int i = 0; i < 2; i++) {
Customer thread = new Customer(String.valueOf(i), blockingQueue);
thread.start();
}


}

}

class Producter extends Thread {
ArrayBlockingQueue<Integer> blockingDeque;
String name;
int i = 0;

public Producter(String name, ArrayBlockingQueue<Integer> blockingDeque) {
this.blockingDeque = blockingDeque;
this.name = name;
}

@Override
public void run() {
try {
while (true) {
int a = i++;
blockingDeque.put(a);
Thread.sleep(500);
System.out.println(name + " 生产者 生产了 : " + a);
}

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Customer extends Thread {
ArrayBlockingQueue<Integer> blockingDeque;
String name;

public Customer(String name, ArrayBlockingQueue<Integer> blockingDeque) {
this.blockingDeque = blockingDeque;
this.name = name;
}

@Override
public void run() {
try {
while (true) {
Integer take = blockingDeque.take();
Thread.sleep(2000);
System.out.println(name + " 消费者 消费了 : " + take);
}

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

输出的日志:

从输出的日志可以看出,队列中存储了4个生产者生产的数字,消费者每次消费2个,生产者就立马补充.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1 生产者 生产了   :  0
0 生产者 生产了 : 0
1 生产者 生产了 : 1
0 生产者 生产了 : 1
1 生产者 生产了 : 2
0 生产者 生产了 : 2
0 消费者 消费了 : 0
1 消费者 消费了 : 0
1 生产者 生产了 : 3
0 生产者 生产了 : 3
0 消费者 消费了 : 1
1 消费者 消费了 : 1
1 生产者 生产了 : 4
0 生产者 生产了 : 4
0 消费者 消费了 : 2
1 消费者 消费了 : 2
1 生产者 生产了 : 5
0 生产者 生产了 : 5

生产者-消费者模型

这里以餐饮店为例,来了解下一般生产者-消费者模型.

餐饮店有3个大厨, 3个服务生, 后堂和用餐区有一个存放已经烹饪好的食物的传菜口,大概就像下图展示的这个样子:

l1

当用户来店吃饭, 省略了服务员催菜的过程,直接大厨直接很高效的生产了3个菜,在服务员还没反应过来的时候就陆续把菜品房进了传菜区, 然后按铃 ,通知服务员来取. 如下图所示:

l2

服务员取走食物之后,同时通知大厨,”菜我取走了, 你们可以张罗下一个菜了”.然后大厨开始准备下一个菜.

l3

上面简单的生产者-消费者模型,还有很多的情况没有考虑,比如服务员们和大厨们缺乏沟通:

  • 1 .大厨们盲目的积极,一直轮番进入传菜通道,即使通道中都已经积累满了菜品,也不给服务员进入的机会?
    1. 服务员盲目的积极,一直轮番进入传菜通道,即使通道中没有菜可取,也丝毫不给大厨们进入的机会?

上面的两种情况都会导致餐饮店不能正常经营不下去.BlockingQueue(ArrayBlockingQueue)就已经完整的考虑了这种情况,ArrayBlockingQueue 中 有两个 Condition 条件对象 notFull,notEmpty,其通过这两个 Condition 条件对象 实现 大厨们和服务员 的沟通问题,避免上面的类似问题发生.

图解ArrayBlockingQueue

我们之间从put(e)方法开始入手,模拟生产者从put到消费者take的完整过程.看看ArrayBlockingQueue 是用什么策略来解决生产者-消费者问题的.

开始 ArrayBlockingQueue 已满

我们这里假设 ArrayBlockingQueue 的数组容量是6, 一开始就已经线程将其填充慢,此时,又来了3个线程 A B C想要往队列中添加元素.

如下图所示:

4

正常情况下,可能是同时存在 生产者, 消费者线程同时来,但是我们这里先简单考虑.

线程间 ReentrantLock 之争

A B C 3个线程都尝试获取 ArrayBlockingQueue 中 ReentrantLock 锁,但是只有线程A 竞争到了锁, B C 两个线程竞争锁失败,进入CLH队列等待并阻塞. 如下图所示:

5

下面我们来对应看下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 线程A 获取到锁, 线程 BC 获取锁失败 阻塞
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

public void lockInterruptibly() throws InterruptedException {
// 默认 调用非公平锁
sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg)
throws InterruptedException {
//优先判断线程是否中断
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁, 如果获取锁成功, 直接返回, 如果获取锁失败
if (!tryAcquire(arg))
// 尝试获取锁, 如果还是获取不到锁 就进入CLH 队列阻塞自己
doAcquireInterruptibly(arg);
}


3个线程执行到lock.lockInterruptibly();时 ,

线程A 获取到锁, 线程 BC 获取锁失败 阻塞在改位置.

如果不是熟悉 ReentrantLock ,可以去看下上一篇文章. 这里就不在对ReentrantLock 在做展开;

生产者的谦让

此时,只有线程A 竞争获取到了锁,但是很不幸,我们的队列已经满了. 所以执行了条件队列方法notFull.await();

1
2
3
4
5
6
7
8
9
10
11
12
13
14

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//线程A 发现 队列已满
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

生产者的谦让

生产者的谦让 –构建等待队列

await 方法中, 首先 在notFull Condition 构建了一个条件等待队列,也就是图中的 Wait --CLH队列,firstWaiterlastWaiter 分别表示条件等待队列的头和尾.

这里之所以将条件等待队列叫 Wait --CLH队列, 是因为其是基于AQS中的CLH链表来实现的.模式差不多,只是条件队列中的元素的waitStatusNode.CONDITION( -2 ).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//  Condition 方法     
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

// Condition 方法
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

生产者的谦让 – 释放锁,唤醒ReentrantLock CLH队列线程,阻塞自己

  • 完全释放锁
  • 唤醒ReentrantLock CLH队列中的线程
  • 阻塞自己

线程A 执行fullyRelease 完全释放锁, 即使是重入的锁也将全部释放. 修改ReentrantLock中的state,exclusiveOwnerThread.

如果ReentrantLock CLH队列中有线程在等待锁, 在唤醒该线程.

最后判断sOnSyncQueue(Node node) ,线程A 在条件等待队列中, 其 waitStatus 是 Node.CONDITION 所以满足条件,线程A 阻塞自己LockSupport.park(this).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

// Condition 方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//添加到条件队列
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

//完全释放锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// 线程A 在条件等待队列中, 其 waitStatus 是 Node.CONDITION
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}

线程 A 释放线程,唤醒前面竞争锁失败C , 线程C 命运和线程A一样,同样的事情也将发生在线程B身上.

线程C B 从被阻塞的位置 lock.lockInterruptibly() 开始执行.

  • 如果线程B 在获取到锁后, ReentrantLock CLH队列 中没有等待的线程,那么线程B,直接将自己添加到条件队列, 释放锁..等待机缘.故事到此结束 END.

  • 如果线程B 在获取到锁后,ReentrantLock CLH队列 中出现了一个消费者线程C0, 那故事就发生了转机.

生产者的机缘

如果故事想继续下去, 阻塞在条件等待队列(Wait –CLH队列)中的线程需要一个机缘— 消费者的出现.

你们期待已久的消费者, 好,今天他来了!

生产者的机缘 – 消费者出现

此时,来了一个消费者线程C0, 它可能是直接获取锁,也可能是线程B 唤醒来的,反正它会拿到ReentrantLock 锁权限.

1
2
3
4
5
6
7
8
9
10
11
12
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//消费者线程获取到了锁 , 其他线程竞争锁失败 阻塞在此
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

7

在消费者线程C0后面,可能还有其他的线程,在等待获取ReentrantLock 锁.

8

生产者的机缘 – 消费者消费

消费者线程C0 ,执行的是take()方法,从ArrayBlockingQueue队列中消费掉一个元素, 所以数组将空余一个位置. 与此同时,消费者线程C0将通过调用notFull.signal();通知前面在等待的条件队列.

9

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

生产者的机缘 – 消费者反馈

前面生产者线程A B C 都进入了 notFull的条件队列. 在消费者线程C0 从队列取走元素之后.

  • 通知在 notFull的条件队列的线程,通知其可以去尝试获取ReentrantLock锁,并依次清空notFull的条件队列.
  • 释放锁

如果notFull的条件队列中有在等待的线程,通过do ... while ... 循环可以看出, 其依次通知在notFull的条件队列中有在等待的线程.

  • 1.修改waitStatus状态,从 Node.CONDITION 改为0. (默认添加到ReentrantLock中的Node waitStatus 为0)
  • 2.如果线程取消 ws > 0 , 唤醒该线程,被唤醒的线程 将从condition.await()里面LockSupport.park(this)位置重新开始执行,后续acquireQueued(node, savedState) 补充获取锁权限,如果获取不到锁权限,将阻塞在 acquireQueued内,如果获取锁成功,则直接入队元素,最终完成任务,释放锁.
  • 3.compareAndSetWaitStatus(p, ws, Node.SIGNAL) 失败 ,唤醒该线程,被唤醒的线程 将从condition.await()里面LockSupport.park(this)位置重新开始执行,后续acquireQueued(node, savedState) 补充获取锁权限,如果获取不到锁权限,将阻塞在acquireQueued内 ,如果获取锁成功,则直接入队元素,最终完成任务,释放锁.
  • 4.陆续将条件队列中的每个节点的前一个节点的ws 设置为 Node.SIGNAL, 通过调用compareAndSetWaitStatus(p, ws, Node.SIGNAL). 最终的结果是全部迁移的节点前一个元素的waitStatus 都是-1, 只有最后一个节点 依旧是 0.

注意: 阻塞在条件队列中的位置!!!!!!!! 阻塞在 condition.await() 方法中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//返回 原尾节点
Node p = enq(node);
// 默认条件在reentrantLock 尾部的元素 ws = 0.
int ws = p.waitStatus;
//
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

10

这里的第 4 点, 比较有讲究: 循环调用transferForSignal ,将条件队列中的线程,迁移到reentrantLock CLH 队列中, 并依次设置其前一个节点,也就是添加前的tail节点,将其waitStatus设置为Node.SIGNAL. 此时还是被当前线程持有, 在take()执行完成返回之后,执行其finaly ,执行释放锁的操作.

在执行释放reentrantLock锁的时候, 最终会释放unparkSuccessor(h)在排队等锁的第一个线程.

进入reentrantLock CLH 队列的 线程A 就可以重新尝试获取锁了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//
unparkSuccessor(h);
return true;
}
return false;
}

至此,我们就简单理解完了. 生产者太积极,导致队列阻塞问题.

这样我们就不用担心大厨们阻塞传菜通道了.

下篇,我们从消费者太积极导致队列阻塞,在来解读一遍ArrayBlockingQueue.