0%

ReentrantLock源码解析

早在JDK 1.6 之前,JDK 提供的 synchronized 关键字同步是重量轻锁性能比较低, Doug Lea 以及其团队就自己用Java 开发了一套应对并发问题的类,这些类就是我们平时说的JUC(java utill concurrent),这些工具类都放在java.util.concurrent包下,比ConcurrentHashMap,AtomicXxxx等等,其中关键部分就是Lock,这里我们来认识下其中关键的Lock子类ReentrantLock;

timg

柯南 和 新一 是否可以理解为线程, 同时争夺一个身体的控制权呢? 好像是有点牵强,但是这不是重点.哈哈

背景

早在JDK 1.6 之前, JDK 提供了一套基于JVM内置实现的同步方案,也就是 synchronized ,通过内部对象 Monitor 的来实现同步,但是基于进入和退出 Monitor 对象实现的方法与代码块同步,它是一个重量轻锁,其性能比较低。 在这个背景下,Doug Lea 通过便使用Java语音开发了一套性能更高的同步锁。

特点

ReentrantLock 是基于AQS(AbstractQueuedSynchronizer)实现了两种锁,FairSync(公平锁),以及NonfairSync(非公平锁);

  • 1:可重入性
  • 2:公平性
  • 3:可中断性

可重入性

当线程持有锁,可以再对需要同步的代码块重复加锁,从类的命名上也可以看的出来.

公平性

直接拿FairSync,NonfairSync来说明,当一个线程A持有锁,其他线程(BCD)想获取锁失败之后,都会加入到一个CLH 队列(双链表)中去排队,等待持有锁的线程释放锁,正当持有锁的线程A释放了锁时,同时来了一个新线程(E),这个线程E,不是通过队列排队等待出队,而是直接和BCD线程抢锁,这就是非公平的锁。

当A线程释放锁时:

FairSync:新线程E,不可以直接参与获取锁,而是先去队列尾排队,等待AQS通知。

NonfairSync:新线程E,可以直接参与获取锁,与在队列中的BCD竞争锁。

可中断性

允许将在等待锁的线程作废掉。

ReentrantLock 简单使用

下面是简单的测试代码,a 是一个共享的变量,在不加锁的情况下, 打印的结果 <= 10 ; 我们通过 ReentrantLock 锁 ,可以保证每次稳定的输出结果10。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Test {

static int a = 1;

public static void main(String[] args) throws InterruptedException {
final ReentrantLock reentrantLock = new ReentrantLock(true);
Runnable runnable = new Runnable() {
public void run() {
try {
reentrantLock.lock();
Thread.sleep(10);
a++;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}

}
};

for (int i = 1; i < 10; i++) {
Thread thread = new Thread(runnable);
thread.start();
}

Thread.sleep(1000);
System.out.println(a);
}
}

ReentrantLock & AQS 结构

ReentrantLock 实现了 Lock 接口,内部提供了2个实现了AQS抽象方法的默认子类:FairSync,NonfairSync;

ReentrantLock的构造方法,默认创建的是一个NonfairSync的锁;

AQS 内部有 一个以3人名字首字母命名(CLH Craig, Landin, and Hagersten)的Node对象引用, 这个对象内部维护了东西有如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

tatic final class Node {

static final Node SHARED = new Node();

static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;

static final int SIGNAL = -1;

static final int CONDITION = -2;

static final int PROPAGATE = -3;

// 存储等待状态
volatile int waitStatus;

volatile Node prev;

volatile Node next;
// 用于存储排队线程引用
volatile Thread thread;

Node nextWaiter;
}


这里我们先只看 FairSync ,NonfairSync 有关的参数;

waitStatus:可以有如下这些情况,FairSync ,NonfairSync只需要关注,EXCLUSIVE,SIGNAL 两种状态即可。

  • 1:EXCLUSIVE 独占
  • 2:CANCELLED 取消
  • 3:SIGNAL 信号量
  • 4:CONDITION 条件
  • 5:PROPAGATE 传播

从 prev,next 可以看出,这个CLH 链表是一个双向链表, 并且这CLH链表从存储了 thread 的信息;

下面我们来看看AQS:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;

/**
* The synchronization state.
*/
private volatile int state;

volatile Thread thread;

AQS 对象中 维护着, CLH 链表的 头head 和 尾tail, 以及state;

state 表示同步状态。

1
2
3
4
5
state = 0    说明没有线程持有锁

state = 1 说明有线程持有锁,执行了1次lock操作

state = 2 说明持有锁的线程,执行了2次lock操作

ReentrantLock 源码原理

正常我们使用 ReentrantLock ,通过lock() 加锁,unLock()解锁,下面我们之间从这两个方法入口入手,看看ReentrantLock的实现原理。 我们分别查看下FairSync,NonfairSync的实现;

下面又开始了源码搬运工的工作了

1
2
3
4
//ReentrantLock 锁的入口
public void lock() {
sync.lock();
}

sync 的引用对象是FairSync,NonfairSync;

FairSync 公平锁

我们这里假设有两个线程Thread A,B 同时来获取锁;

假设 A获取到了锁,B未获取到锁;

我们分别看下A,B线程 执行lock() 都经历了些什么。

gxs

线程A

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
 static final class FairSync extends Sync {
final void lock() {
// 获取锁
acquire(1);
}

public final void acquire(int arg) {
// 尝试获取锁
if (!tryAcquire(arg) &&
// 尝试获取锁失败 获取队列 并将 线程信息添加到队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 默认 state 是 0
int c = getState();
if (c == 0) {
// 判断队列情况
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// 是否有队列
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

线程A :通过调用acquire(1)获取锁:

  • 1:线程A 通tryAcquire(1)尝试获取锁,因为线程A 第一个来获取锁的,当前AQS 中的 state = 0
  • 2: state = 0 , 没有线程持有该锁;
  • 3:hasQueuedPredecessors() 返回false, 判断是否有队列: 这里我们是第一个获取到锁的线程,暂时还没有等待锁的CLH链表;
  • 4:compareAndSetState(0, acquires); cas 的方式赋值设置 state ,锁状态置为 1,表明 该线程获取到了锁,其他线程不能在获取锁。
  • 5:setExclusiveOwnerThread(current),将当前线程的引用设置给AQS;
  • 6: 获取到了锁,直接调用结束;

线程B

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

//加锁
final void lock() {
acquire(1);
}

//获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

//1:线程B 通过tryAcquire(1),因为state = 1 ,尝试获取锁失败;
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

//2:acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果不是空链 ,则在链表尾部添加一个节点,
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果是空链
enq(node);
return node;
}
//3:addWaiter(Node.EXCLUSIVE) : 先新创建一个空白的Node,作为AQS中CLH链表的头,再创建一个包含等待线程信息的Node 添加到链表。
//如果是链表为null
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 先添加一个 空Node 作为链表的头
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将节点 添加在 链表尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

//4:acquireQueued(): 循环,判断当前线程是否是头head 下一个节点Node , 如果是在尝试tryAcquire(获取一次锁, 我们这里假设获取不到锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {

final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

//shouldParkAfterFailedAcquire(): 将前一个Node 的state 状态改为 Node.SIGNAL;
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//第一次循环 将waitStatus 改为Node.SIGNAL了
return true;
if (ws > 0) {

do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 修改waitStatus
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

//parkAndCheckInterrupt();park 住线程,阻塞住线程,等待唤醒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

线程B:通过调用acquire(1)获取锁:

  • 1:线程B 通过tryAcquire(1),因为state = 1 ,尝试获取锁失败;
  • 2:acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  • 3:addWaiter(Node.EXCLUSIVE) : 先新创建一个空白的Node,作为AQS中CLH链表的头,再创建一个包含等待线程信息的Node 添加到链表。— 循环执行了两次
  • 4:acquireQueued(): 循环,判断当前线程是否是头head 下一个节点Node , 如果是在尝试tryAcquire()获取一次锁, 我们这里假设获取不到锁。
  • 5:shouldParkAfterFailedAcquire(): 将前一个Node 的state 状态改为 Node.SIGNAL;— 循环执行了两次
  • 6:parkAndCheckInterrupt();park 住线程,阻塞住线程,等待唤醒;

线程A释放锁 unLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 前面等待的时候,已经将前面的节点的 waitStatus 全部改为了Node.SIGNAL
if (h != null && h.waitStatus != 0)
// 唤醒线程
unparkSuccessor(h);
return true;
}
return false;
}

//尝试释放锁( 通过计算 state )
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 释放线程引用
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 唤醒线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 循环找到链表 第二个位置的Node 唤醒该节点中的线程
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒线程
LockSupport.unpark(s.thread);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

//线程被唤醒之后 从阻塞位置开始执行
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//这里就获取到锁
if (p == head && tryAcquire(arg)) {
// 获取到了锁 断开和前一个节点的联系, 该节点不被引用,将被GC掉
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 被唤醒的线程 从被阻塞的这个位置开始执行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

NonfairSync 非公平锁

NonfairSync 线程启动,就可以立马去和当前在排队的线程 争夺锁的使用权,不用进入链表(队列)尾部进行排队.
如果争夺失败,则插入在队列尾部,进行排队等待.

下面我们对比看下代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

//非公平锁的 尝试获取锁方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

对比FairSync,NonfairSync 的 tryAcquire(int acquires) 方法, 先比NonfairSync,
tryAcquire(int acquires)中, FairSync 多了 !hasQueuedPredecessors()的判断.

1
2
3
4
5
6
7
8
9
10
11
12
13
// 如果放回false  说明队列只有一个空 head节点 或者 队列是为空
//如果返回ture 说明 队列有线程正在排队
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

  • 1 如果返回false 说明队列只有一个空 head节点 或者 队列是为空;
  • 2 如果返回ture 说明 队列有线程正在排队;

线程中断

假如等待获取锁的线程发生中断,会是个什么样的流程呢?

如果等待获取reentrantlock锁的线程,在获取到锁之前发生了中断,其线程中断标记上会保留其中断的记录.

我们来看看在线程从阻塞状态唤醒后,都发生了写什么.

线程会在 LockSupport.park(this);阻塞的地方,重新开始执行, parkAndCheckInterrupt()的方法会返回中断标记, 中断过就返回 ture, 这里调用获取中断标记的api在获取完中断标记之后,会清除线程中断标记.

1
2
3
4
5
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

伴随着parkAndCheckInterrupt() 返回的ture, 因此 interrupted = true; ,假设这里是公平锁, 线程被唤醒之后, tryAcquire()获取到了锁, acquireQueued()方法直接返回的 ture;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

因为acquireQueued()方法直接返回的 ture, 所以程序会进入if 中执行selfInterrupt();,程序再次调用中断API,自己将自己中断. 至此, 该中断的线程就将在try,catch,finally中执行unlock操作.

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

补充: 如果是非公平锁,被唤醒的中断线程,需要和新来的线程同时竞争锁, 中断线程竞争锁失败,将重新阻塞,等待下次获取锁.