BlockingQueue 是JUC包中的一个接口, 从名字可以看出是一个支持阻塞的队列. 在刚学Thread
的时候, 是否有被生产者-消费者问题难倒过? 在学Thread的时候主要学习解决 生产者-消费者问题
的思路, 如果现在在来解决这个问题, 我们就不用再手撸wait - notify
的代码了, BlockingQueue 就可以很轻松的解决这个问题, 下面我们就学习下 并发大神Doug Lea 是怎么解决这个问题的.
BlockingQueue BlockingQueue 是一个接口,下面我列出几个比较常用的子类;
ArrayBlockingQueue : 基于数组存储结构的阻塞有界队列
LinkedBlockingQueue: 基于链表的阻塞队列
LinkedBlockingDeque: 基于双链表的阻塞队列
DelayedWorkQueue: 支持延时的阻塞队列
PriorityBlockingQueue: 支持优先级的阻塞队列
这几个子类后面我准备都分别总结下;
首先我看看 BlockingQueue
定义了那些接口方法.这里我们暂时只考虑 ArrayBlockingQueue
,我们通过ArrayBlockingQueue 来开启认知BlockingQueue.
offer( e ): 如果添加成功则返回ture
否则false
;
add( e ): 底层调用的是 offer( e )
方法, 添加失败的时候会抛出提示’Queue full’的异常.
put( e ): 支持阻塞的添加方法,如果添加不了,则阻塞.
take(): 取出,如果队列为空,则阻塞.
ArrayBlockingQueue ArrayBlockingQueue 底层是基于数组结构,配合ReentrantLock实现的阻塞队列, 队列大小是固定的, 在构造的时候就已经决定队列的大小, 默认使用的是 非公平锁
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public ArrayBlockingQueue (int capacity) { this (capacity, false ); } public ArrayBlockingQueue (int capacity, boolean fair) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final Object[] items;int takeIndex;int putIndex;int count;final ReentrantLock lock;private final Condition notEmpty;private final Condition notFull;
其中 ConditionObject 内部有firstWaiter
,lastWaiter
两个指针,指向 CLH
条件链表的头和尾Node.
简单的调用 下面是简单的调用.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public class ThreadTest { public static void main (final String[] args) { ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue(4 ); for (int i = 0 ; i < 2 ; i++) { Producter thread = new Producter(String.valueOf(i), blockingQueue); thread.start(); } for (int i = 0 ; i < 2 ; i++) { Customer thread = new Customer(String.valueOf(i), blockingQueue); thread.start(); } } } class Producter extends Thread { ArrayBlockingQueue<Integer> blockingDeque; String name; int i = 0 ; public Producter (String name, ArrayBlockingQueue<Integer> blockingDeque) { this .blockingDeque = blockingDeque; this .name = name; } @Override public void run () { try { while (true ) { int a = i++; blockingDeque.put(a); Thread.sleep(500 ); System.out.println(name + " 生产者 生产了 : " + a); } } catch (InterruptedException e) { e.printStackTrace(); } } } class Customer extends Thread { ArrayBlockingQueue<Integer> blockingDeque; String name; public Customer (String name, ArrayBlockingQueue<Integer> blockingDeque) { this .blockingDeque = blockingDeque; this .name = name; } @Override public void run () { try { while (true ) { Integer take = blockingDeque.take(); Thread.sleep(2000 ); System.out.println(name + " 消费者 消费了 : " + take); } } catch (InterruptedException e) { e.printStackTrace(); } } }
输出的日志:
从输出的日志可以看出,队列中存储了4个生产者生产的数字,消费者每次消费2个,生产者就立马补充.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 1 生产者 生产了 : 0 0 生产者 生产了 : 0 1 生产者 生产了 : 1 0 生产者 生产了 : 1 1 生产者 生产了 : 2 0 生产者 生产了 : 2 0 消费者 消费了 : 0 1 消费者 消费了 : 0 1 生产者 生产了 : 3 0 生产者 生产了 : 3 0 消费者 消费了 : 1 1 消费者 消费了 : 1 1 生产者 生产了 : 4 0 生产者 生产了 : 4 0 消费者 消费了 : 2 1 消费者 消费了 : 2 1 生产者 生产了 : 5 0 生产者 生产了 : 5
生产者-消费者模型 这里以餐饮店为例,来了解下一般生产者-消费者模型.
餐饮店有3个大厨, 3个服务生, 后堂和用餐区有一个存放已经烹饪好的食物的传菜口,大概就像下图展示的这个样子:
当用户来店吃饭, 省略了服务员催菜的过程,直接大厨直接很高效的生产了3个菜,在服务员还没反应过来的时候就陆续把菜品房进了传菜区, 然后按铃 ,通知服务员来取. 如下图所示:
服务员取走食物之后,同时通知大厨,”菜我取走了, 你们可以张罗下一个菜了”.然后大厨开始准备下一个菜.
上面简单的生产者-消费者模型,还有很多的情况没有考虑,比如服务员们和大厨们缺乏沟通:
1 .大厨们盲目的积极,一直轮番进入传菜通道,即使通道中都已经积累满了菜品,也不给服务员进入的机会?
服务员盲目的积极,一直轮番进入传菜通道,即使通道中没有菜可取,也丝毫不给大厨们进入的机会?
上面的两种情况都会导致餐饮店不能正常经营不下去.BlockingQueue(ArrayBlockingQueue)就已经完整的考虑了这种情况,ArrayBlockingQueue 中 有两个 Condition
条件对象 notFull,notEmpty,其通过这两个 Condition
条件对象 实现 大厨们和服务员 的沟通问题,避免上面的类似问题发生.
图解ArrayBlockingQueue 我们之间从put(e)
方法开始入手,模拟生产者从put
到消费者take
的完整过程.看看ArrayBlockingQueue 是用什么策略来解决生产者-消费者
问题的.
开始 ArrayBlockingQueue 已满 我们这里假设 ArrayBlockingQueue 的数组容量是6
, 一开始就已经线程将其填充慢,此时,又来了3个线程 A B C
想要往队列中添加元素.
如下图所示:
正常情况下,可能是同时存在 生产者, 消费者线程同时来,但是我们这里先简单考虑.
线程间 ReentrantLock 之争 A B C
3个线程都尝试获取 ArrayBlockingQueue 中 ReentrantLock 锁,但是只有线程A
竞争到了锁, B C
两个线程竞争锁失败,进入CLH
队列等待并阻塞. 如下图所示:
下面我们来对应看下源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
3个线程执行到lock.lockInterruptibly();
时 ,
线程A 获取到锁, 线程 BC 获取锁失败 阻塞在改位置.
如果不是熟悉 ReentrantLock ,可以去看下上一篇文章. 这里就不在对ReentrantLock 在做展开;
生产者的谦让 此时,只有线程A
竞争获取到了锁,但是很不幸,我们的队列已经满了. 所以执行了条件队列方法notFull.await();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
生产者的谦让 –构建等待队列 在await
方法中, 首先 在notFull
Condition 构建了一个条件等待队列,也就是图中的 Wait --CLH队列
,firstWaiter
和 lastWaiter
分别表示条件等待队列的头和尾.
这里之所以将条件等待队列叫 Wait --CLH队列
, 是因为其是基于AQS中的CLH链表来实现的.模式差不多,只是条件队列中的元素的waitStatus
是Node.CONDITION( -2 )
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
生产者的谦让 – 释放锁,唤醒ReentrantLock CLH队列线程,阻塞自己
完全释放锁
唤醒ReentrantLock CLH队列中的线程
阻塞自己
线程A
执行fullyRelease
完全释放锁, 即使是重入的锁也将全部释放. 修改ReentrantLock中的state,exclusiveOwnerThread.
如果ReentrantLock CLH队列中有线程在等待锁, 在唤醒该线程.
最后判断sOnSyncQueue(Node node)
,线程A
在条件等待队列中, 其 waitStatus 是 Node.CONDITION 所以满足条件,线程A
阻塞自己LockSupport.park(this)
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public final void await () throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); }
线程 A
释放线程,唤醒前面竞争锁失败C
, 线程C
命运和线程A
一样,同样的事情也将发生在线程B
身上.
线程C B
从被阻塞的位置 lock.lockInterruptibly()
开始执行.
如果线程B
在获取到锁后, ReentrantLock CLH队列 中没有等待的线程,那么线程B
,直接将自己添加到条件队列, 释放锁..等待机缘.故事到此结束 END
.
如果线程B
在获取到锁后,ReentrantLock CLH队列 中出现了一个消费者线程C0
, 那故事就发生了转机.
生产者的机缘 如果故事想继续下去, 阻塞在条件等待队列(Wait –CLH队列)中的线程需要一个机缘— 消费者的出现.
你们期待已久的消费者, 好,今天他来了!
生产者的机缘 – 消费者出现 此时,来了一个消费者线程C0
, 它可能是直接获取锁,也可能是线程B
唤醒来的,反正它会拿到ReentrantLock 锁权限.
1 2 3 4 5 6 7 8 9 10 11 12 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
在消费者线程C0
后面,可能还有其他的线程,在等待获取ReentrantLock 锁.
生产者的机缘 – 消费者消费 消费者线程C0
,执行的是take()
方法,从ArrayBlockingQueue队列中消费掉一个元素, 所以数组将空余一个位置. 与此同时,消费者线程C0
将通过调用notFull.signal();
通知前面在等待的条件队列.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private E dequeue () { final Object[] items = this .items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
生产者的机缘 – 消费者反馈 前面生产者线程A B C
都进入了 notFull
的条件队列. 在消费者线程C0
从队列取走元素之后.
通知在 notFull
的条件队列的线程,通知其可以去尝试获取ReentrantLock锁,并依次清空notFull
的条件队列.
释放锁
如果notFull
的条件队列中有在等待的线程,通过do ... while ...
循环可以看出, 其依次通知在notFull
的条件队列中有在等待的线程.
1.修改waitStatus状态,从 Node.CONDITION 改为0. (默认添加到ReentrantLock中的Node waitStatus 为0)
2.如果线程取消 ws > 0 , 唤醒该线程,被唤醒的线程 将从condition.await()里面LockSupport.park(this)
位置重新开始执行,后续acquireQueued(node, savedState)
补充获取锁权限,如果获取不到锁权限,将阻塞在 acquireQueued内
,如果获取锁成功,则直接入队元素,最终完成任务,释放锁.
3.compareAndSetWaitStatus(p, ws, Node.SIGNAL) 失败 ,唤醒该线程,被唤醒的线程 将从condition.await()里面LockSupport.park(this)
位置重新开始执行,后续acquireQueued(node, savedState)
补充获取锁权限,如果获取不到锁权限,将阻塞在acquireQueued内
,如果获取锁成功,则直接入队元素,最终完成任务,释放锁.
4.陆续将条件队列中的每个节点的前一个节点的ws 设置为 Node.SIGNAL, 通过调用compareAndSetWaitStatus(p, ws, Node.SIGNAL)
. 最终的结果是全部迁移的节点前一个元素的waitStatus 都是-1
, 只有最后一个节点 依旧是 0
.
注意 : 阻塞在条件队列中的位置!!!!!!!! 阻塞在 condition.await()
方法中.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) lastWaiter = null ; first.nextWaiter = null ; } while (!transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true ; }
这里的第 4 点, 比较有讲究: 循环调用transferForSignal
,将条件队列中的线程,迁移到reentrantLock CLH 队列中, 并依次设置其前一个节点
,也就是添加前的tail节点
,将其waitStatus设置为Node.SIGNAL. 此时锁
还是被当前线程持有, 在take()
执行完成返回之后,执行其finaly
,执行释放锁的操作.
在执行释放reentrantLock锁的时候, 最终会释放unparkSuccessor(h)
在排队等锁的第一个线程.
进入reentrantLock CLH 队列的 线程A
就可以重新尝试获取锁了.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
至此,我们就简单理解完了. 生产者太积极,导致队列阻塞问题.
这样我们就不用担心大厨们阻塞传菜通道了.
下篇,我们从消费者太积极导致队列阻塞,在来解读一遍ArrayBlockingQueue.