0%

图解ThreadPoolExecutor

Java线程的创建非常昂贵, 为了在有限资源的情况下, 又有线程需求的时候,我们希望线程能复用,这样就可以尽可能的减少创建,消耗线程的开销.因此就有了线程池 –ThreadPoolExecutor.

这里为什么一反常态,既不是放的类关系, 也不是放的柯南, 而是放的筷子,勺子的照片呢?

首先是真的想不到,怎么和柯南关联上.其二是类关系好像也没啥好放的.反倒是筷子到时很时候理解ThreadPoolExecutor的.在这里,我们把线程当做是筷子或者勺子, 筷子篓当做线程池.当我们需要筷子的时候, 但是我们又没有筷子,我们需要去买筷子,或者自己弄材料做筷子. 自己弄筷子的这个事情,比较麻烦, 就相当于系统创建线程一样,开销大.

如果我们用完筷子之后,直接丢弃, 那我们下次再有需要筷子的时候,我们又得麻烦一遍.

如果我们将筷子洗刷干净,放在筷子篓中,下次有需要我们直接从筷子篓中拿就好了,这样是不是更方便呢.

线程池也差不多是这个意思,下面我们就来图解ThreadPoolExecutor吧.

筷子

为什么创建线程开销大?

在图解之前,我们先了解下.为什么创建线程开销大?

Java 创建线程需要执行大量的工作, 想系统申请资源,然后等待操作系统为完成准备工作:

  • 为线程堆栈分配和初始化内存块
  • 需要进行系统调用,以便在主机OS中创建/注册本机线程
  • 创建、初始化并添加到JVM内部数据结构中

阿里规范–线程池的使用

阿里规范中强制规定使用线程池要根据自己的需求场景自己构造线程池,而不去使用Executors工具类去创建线程池.

1
2
3
4
5
6
7
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下:

1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

线程池的使用

线程池构造参数

通过构造方法,创建线程池,其有7个参数. 下面就来认识下线程池的构造方法.

1
2
3
4
5
6
7
8
9
10
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
...
}

  • int corePoolSize 核心线程数
  • int maximumPoolSize 非核心线程数
  • long keepAliveTime 运行线程等待任务时间
  • TimeUnit unit 时间单位
  • BlockingQueue workQueue 任务队列
  • ThreadFactory threadFactory 创建线程的工厂
  • RejectedExecutionHandler handler 拒绝策略

execute

1
2
3

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(4, 14, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(6), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

这里我们定义

  • corePoolSize = 4
  • maximumPoolSize = 14
  • workQueue 容量 = 6

至于其他的4个参数就随便定义的,图解这一块不需要用的其他4个参数.

当已经获取到线程池实例对象的情况下,我们只需要执行execute 方法,将任务交给线程池,等待其完成就好了.

下面是execute的源码.

如果英语能力在线的,可以直接阅读代码作者的注释.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/

//获取状态
int c = ctl.get();

// 如果工作线程数 < 核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// 如果是运行状态 且 队列未满,能添加进队列
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取状态
int recheck = ctl.get();
//检查是否是运行状态, 如果不是,需要将已经添加进队列的 任务移除
if (! isRunning(recheck) && remove(command))
// 如果移除失败,交给拒绝粗略
reject(command);
else if (workerCountOf(recheck) == 0) // 如果工作线程数为 0 ,( 因为有些线程可能已经消亡)
// 补充工作线程
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

execute 的源码中可以看出,随着任务数量的增加线程池中创建线程有自己特定的一套机制. 下面我们就通过图的方式,来强化对这套机制的理解.

这里我先列下 机制规则:

  • 如果当前工作线程数 < 核心线程数 , 则创建核心工作线程
  • 如果当前工作线程数 > 核心线程数 , 则将任务添加进BlockingQueue
  • 如果BlockingQueue 添加满 , 则创建非核心线线程
  • 如果当前工作线程数 > 非核心线程数, 则将任务交给拒绝策略

首先我们来看看,线程池对象内部拥有哪些成员变量,这里我只列出了和图有关的变量.

1
2
private final BlockingQueue<Runnable> workQueue;
private final HashSet<Worker> workers = new HashSet<Worker>();

线程信息 , 封装在Worker中.

图解线程池创建线程机制

线程池为空

当线程刚刚被创建的时候,其内部线程数,以及worker工人数都是0.

来了两个待添加的Runnable.如下图所示:

空的

如果当前工作线程数 < 核心线程数 , 则创建核心工作线程

创建第一个核心线程到线程.

核心线程数

当提交的任务达到,核心线程数4时.核心线程数创建满.

核心线程数

如果当前工作线程数 > 核心线程数 , 则将任务添加进BlockingQueue

执行逻辑2的过程中,只是将任务添加打阻塞队列,并没有创建线程.

添加一个任务到阻塞队列.

进入队列

最终阻塞队列也被添加满.

进入队列

如果BlockingQueue 添加满 , 则创建非核心线线程

当阻塞队列被添加满.线程池开始创建非核心线程开始执行任务.

非核心线程数满

最终创建的线程总数(核心线程数 + 非核心线程数) 达到定义的最多线程数.

非核心线程数满

如果当前工作线程数 > 非核心线程数, 则将任务交给拒绝策略

进入拒绝策略

线程池提供了4个拒绝策略,可以按需选择,一般拒绝策略需要根据自己的业务需要自行实现.

1
2
3
4
5
6
7
DiscardOldestPolicy 抛弃最早提交的任务,执行当前提交的任务

AbortPolicy 直接抛出异常报错处理

CallerRunsPolicy 使用调用者线程执行任务

DiscardPolicy 不做任何处理的策略

线程池状态 与 工作线程数

线程池将其状态与工作线程数信息都封装在 一个原子类Integer 中. 其高3位表示其状态, 低29位表示工作线程数的数量.

1
2
3
4
5
6
7
8
9
10
11
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

线程池定义了5中线程池状态. 分别是

  • RUNNING : 允许执行新任务和任务队列中的任务
  • SHUTDOWN : 不允许执行新任务, 但是执行任务队列中的任务
  • STOP : 不允许执行新任务和任务队列中的任务
  • TIDYING : 所以任务都将终止,工作线程数将置为0
  • TERMINATED : 线程池终止完成

下面的Worker运作模式,将不会重点关注这些状态,重点关注Worker 运作模式.其创建 –>执行任务 –> 领取任务 –> 最终结束的过程.

Worker运作模式

线程信息,已经要执行的任务, 都封装在Worker对象中.

Worker

就如命名一样,Worker 工人, 一个工人对应着一个线程, 启用工人干活, 工人负责将分配给自己的工作做完, 当工人手头上的工作做完了, 就去领取新的任务, 知道没有任务可以领取. 没有工作了,工人可能就没有存在的必要了.

下面我们从源码的层面来看看线程池中Worker运作模式.

当执行 addWorker(Runnable firstTask, boolean core) 如果添加成功之后,最终会start其线程,最好等待线程获取CPU执行权,运行其run()中的runWorker(this)方法.

Worker运作模式的工作流程大概如下:

运作模式

下面是 addWorker的源码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69

private boolean addWorker(Runnable firstTask, boolean core) {
// 先确保工作数量 添加上去了
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 确认状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 确认 工作线程数 添加成功
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 在确保当前线程池状态, 已经 工作线程数的条件下 尝试添加工作线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

addWorker 的过程中, 先是通过 CAS的方式设置工作线程数. 如果修改成功才会真正进入创建Worker的流程.

下面是 WorkerrunWorker方法源码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//回调方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行runnable 的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//回调方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 处理worker退出
processWorkerExit(w, completedAbruptly);
}
}

getTask() 决定哪些线程需要去执行销毁流程, 在不允许核心线程数超时的情况下, 将有corePoolSize个线程将阻塞在循环中.直到阻塞队列添加任务进来才可能被唤醒.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果核心线程数允许超时 最终只有留下一个线程
// 如果核心线程数不允许超时 最终留下 corePoolSize 个 worker
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}


Worker创建的时候,可能会分配一个任务firstTask,如果 firstTask 执行完成, Worker就会去 getTask() 获取 BlockingQueue 中的任务,直到获取getTask()返回 null结束循环,最终去执行 processWorkerExit.

期间如果出现异常,导致 Worker 中的线程已经死亡.需要修改工作线程数 并且 添加一个没有firstTaskWorker来线程数量.

getTask()方法中可以看出, 不管是否是核心线程,没有任务的时候,都是一样的退出,就看保留多少个Worker而已.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

最后

在追踪完 ThreadPoolExecutor 发现其实现思路也是很简单,只是其中有很多需要考虑并发的细节, 而这里这些细节我们这里都没有怎么关注.

总结来说ThreadPoolExecutor 就是通过了一个机制来控制创建线程, 创建处理的线程,具体在Worker类中, 执行任务也是交个Worker,Worker在有任务的情况下,会一直循环执行任务,直到没有任务退出,销毁,期间Worker 存在复用,只是具体的任务会发生改变.

最后也说一个相关又不相干的: 鉴于当前互联网环境, ThreadPoolExecutor 假如是一个公司, 你就是那个Worker, 不管你是不是核心 ,如果没有任务可做的时候,大家都是危险的.