0%

读BlockingQueue源码--ArrayBlockingQueue(下篇)

上篇我们图解生产者阻塞队列的场景,这篇,我们从消费者阻塞队列的场景,再来图解一次ArrayBlockingQueue.

消费者阻塞

开始图解之前,我们先说下我的规则, 继续沿用上篇中的设定, ArrayBlockingQueue队列的大小 6;

消费者来了

上一篇中,消费者线程C 迟迟不来, 这回他来了,而且来了很多.

这里我们假设有3个消费者线程A B C,如下图所示:

场景开始的情况

积极的消费者

消费者线程A B C都想从ArrayBlockingQueue 中取走元素.他们都执行take( )操作. 但是只有 线程A 竞争到了锁. 其他线程都在阻塞在 lock.lockInterruptibly();内部, 并进入了 ReentrantLock CLH 队列.

如下图所示:

积极的消费者

得上点源码了,嘿嘿.

1
2
3
4
5
6
7
8
9
10
11
12
13
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 只有线程 A 获取到了锁
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

消费者发现队列是空的

消费者线程A获取到了锁,想出队元素, 发现队列居然是空的.

触发设定条件:

  • 1.线程加入等待队列.
  • 2.释放当前锁的状态值.
  • 3.阻塞自己

消费者想A执行take操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//线程加入等待队列
Node node = addConditionWaiter();
//释放当前锁的状态值
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//阻塞自己
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

上面的这一系列操作,和上一篇的生产者 await() 调用的是同一个方法,只是区分了不同实例调用而已.

消费者相同的境遇

自从 线程A 将锁的状态值释放为0,线程B C ,就可以尝试获取锁.

不管B C 哪个线程获取到了锁,最终他们面临的问题和线程A 一样.

消费者BC通用的命运

消费者最终囧状

下面是,线程A B C的最终状况, A B C 都进入条件等待队列,都被阻塞住. 锁的状态值为0, 等待机缘.

消费者线程A B C,统统阻塞在notEmpty.await() 内部.

积极消费者的最终情况

消费者–机缘已来

在消费者线程A B C 被困在 条件队列中苦苦等待的时候, 来了一个生产者线程 A

救星的出现

生产者线程A , 向队列中添加了一个元素.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//入队元素
enqueue(e);
} finally {
lock.unlock();
}
}

private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//发出信号,通知阻塞在条件队列中等待的消费者
notEmpty.signal();
}

救星通知大伙

生产者线程A , 不仅带来了元素, 还给消费者们发送出信号. 通知消费者线程 A B C 进入 reentrantLock - CLH 队列等待获取锁. 这里的操作设上一篇的一样.这里就不在赘述.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

生产者–释放锁

生产者执行完上面一系列操作之后, 结束了自己的任务, 释放了锁.

救星走了

此时, ArrayBlockingQueue 队列中有了一个元素. 以及3个等待获取锁的 消费者线程A B C.

在生产者释放锁的时候, 消费者线程A将获取到锁.


 public void unlock() {
        sync.release(1);
    }

  public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

消费者–取走元素

消费者线程A,获取到了锁权限, 所以它能取走元素. 并释放锁.
A取走元素

故事再次开始

消费者线程A,取走了元素,留下了一个空的 ArrayBlockingQueue 队列. 以及消费者线程B C.消费者线程B C将再度开始前面发送的故事,并等待新的机缘.

故事好像又开始了

至此. 消费者太积极,导致的问题也已经图解完毕. 现在店家就不用担心,餐饮店开不下去了.