线程池使用与实现原理

    线程池在日常的开发中占据着非常的重要作用,即便我们在日常的开发中没有使用过线程池,也肯定听过周边的开发人员提到过过线程池这个东西。那么,线程池到底是用来做什么?为了解决什么问题呢?

前言

    在之前的博客(Synchronized和lock详解)中提到过,线程是最小的运算逻辑单元,合理的使用线程可以充分的利用资源,提升运行的效率,但是使用大量的线程也有相应的缺点。

  • 创建线程和创建普通对象一样,会消耗内存,运行结束后也需要回收资源,因此,在创造大量线程的时候,可能会造成OOM或者应用崩溃。举个例子,如果我们网页请求的时候,对于每个请求创建一个线程,某一刻来了成千上万个请求,就有可能造成OOM。
  • 创建大量的线程不利于管理,每个线程拥有各自的名字,查找问题的时候,可能会花费很长时间找不到问题的具体所在。

    为了防止出现上述问题,国内著名公司阿里巴巴的编程规范就要求,对于使用到线程创建的都应该使用到线程池,而且应使用ThreadPoolExecutor来进行线程池的创建,至于为什么需要使用ThreadPoolExecutor来创建,这个我们之后在详细说明。阿里巴巴公司的编码规范要求必须使用线程池创建和管理线程,足以说明了线程池的重要性,所以使用和掌握线程池很重要。接下来,我会从总体到细致的方式,来共同学习线程池。

总体的架构

线程池类图

线程池类图

  • Executor是线程池的最顶层接口,类似于Spring中BeanFactory的作用,只定义了提交线程任务的方法。
  • ExecutorService是Executor的一个实现,它在Executor的接口上定义了管理线程池和更多样化提交线程任务的方法,包括关闭线程池(shutdown)、终止线程池(awaitTermination)、唤醒线程池中的线程等方法。
  • AbstractExecutorService是个抽象类,实现了ExecutorService中的submit、invoke*等方法,并将Runnable进行包装为FutrueTask可以返回结果,但并未实现execute方法,execute方法由具体的实现类实现。
  • ThreadPoolExecutor是具体的实现类,这个类比较重要,之后会详细的分析这个类,这里先有个印象。ThreadPoolExecutor实现了一个非常重要的方法–execute()方法,在其父类中的submit也是调用ThreadPoolExecutor中的execute方法实现提交任务功能。ThreadPoolExecutor也提供了阻塞队列、拒绝策略、获取任务数等方法。
  • BlockingQueue是个阻塞队列,它不算是线程池中的组件,但是对于线程池来说是一个非常重要的组件。BlockingQueue的实现有ArrayListBlockingQueue、LinkedBlockingQueue、SynchronousQueue等。这个大家先有个印象,之后会专门写一篇关于阻塞队列的实现。
  • ScheduleExecutorService继承ExecutorService接口,其本身也是个接口,在ExecutorService的基础上添加了定时提交任务和按频率执行任务的方法声明,在ScheduleThreadPoolExecutor实现类中,对上述方法进行了实现。
  • ScheduleThreadPoolExecutor是定时执行线程池中任务的具体实现,这个在源码分析章节在做详细的分析。
  • Executors没有实现和继承任何接口,它是一个工具类,类似于CollectionUtils等类,提供了创建一些比较简单的线程池,但是强烈不建议使用Executors创建线程池,具体的原因我们在线程池使用之Executors创建中进行解释。

    上述便是线程池的整体架构,对于使用到组件做了简单的介绍,了解了线程池的整体架构,我们接下来学习怎么使用上述的组件,创建我们的线程池以及使用线程池来管理任务。

线程池使用

普通线程池创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//使用Sping Boot的自动映射属性将文件中的属性映射到实体属性中
@Component
@PropertySource(value = {"file:config/pool.properties", "classpath:config/pool.properties"}, ignoreResourceNotFound = true, encoding = "UTF-8")
@ConfigurationProperties(prefix = "thread.pool")
@Data
public class ThreadPoolConfig {

private static Logger logger = LoggerFactory.getLogger(ThreadPoolConfig.class);

private Integer core;
private Integer max;
private Integer queues;
private Long time;
private String unit;
private String policy;
private String factory;

public RejectedExecutionHandler getPolicy(){
try {
Class t = Class.forName(policy);
return (RejectedExecutionHandler) t.newInstance();
} catch (Exception e) {
logger.error("解析线程池线程数量过大策略类异常,默认返回抛出异常策略", e);
return new ThreadPoolExecutor.AbortPolicy();
}
}

public ThreadFactory getFactory(){
try {
Class t = Class.forName(policy);
return (ThreadFactory) t.newInstance();
} catch (Exception e) {
logger.error("解析线程池创建工厂类异常", e);
return null;
}
}
}

//创建单实例的线程池
public class ThreadPoolFactory {

private static Logger logger = LoggerFactory.getLogger(ThreadPoolFactory.class);
private static ThreadPoolConfig config;
private volatile static ThreadPoolExecutor exeuctor;

@Autowired
public ThreadPoolFactory(ThreadPoolConfig config) {
logger.debug("线程池初始化信息如下:core={},max={}, keepAliveTime = {}, queueNum={}",
config.getCore(), config.getMax(), config.getTime(), config.getQueues());
ThreadPoolFactory.config = config;
}

public static ThreadPoolExecutor getThreadPool(){
if(exeuctor == null){
synchronized (ThreadPoolFactory.class){
if(exeuctor == null){
exeuctor = new ThreadPoolExecutor(config.getCore(), config.getMax(), config.getTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(config.getQueues()));
}
}
exeuctor.setThreadFactory(config.getFactory());
exeuctor.setRejectedExecutionHandler(config.getPolicy());
}
return exeuctor;
}
}

    上述代码是我在实际开发中创建线程的一个例子,目的是实现一个单例模式的线程池,ThreadPoolConfig是一个属性类,通过上述代码可以将线程池的属性实现在文件中可配置。new ThreadPoolExecutor()是创建线程池的实现,其构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

参数含义如下:

  • corePoolSize:核心线程数量
  • maximumPoolSize: 允许创建最大的线程数
  • keepAliveTime:线程允许最长的空闲时间
  • unit:线程允许最长的空闲时间单位
  • workQueue:阻塞队列,当核心线程数量满了时,向队列中添加
  • threadFactory:线程工厂,定义线程创建的一些格式
  • handler:当线程池不能再添加线程池的拒绝策略。
    • AbortPolicy:当线程池不能再添加线程时,抛出异常
    • CallerRunsPolicy:当线程池不能再添加线程池,由添加线程的调用者执行该线程
    • DiscardOldestPolicy:抛弃在线程池中时间最久的线程
    • DiscardPolicy:丢弃线程,不做任何处理

上边讲了ThreadPoolExecutor的构造方法参数的含义,但到底是在什么地方使用到上述参数呢,这个就要看线程池处理的过程。

  1. 当一个线程提交到线程池时,线程池会先判断线程池的核心线程数是否已超过构造方法中所传入的核心线程数,如果超过了,则执行步骤2,若未超过,则创建线程来执行该任务。
  2. 判断阻塞队列是否已满,如果阻塞队列未满,则将线程任务添加到阻塞队列中,否则执行步骤3
  3. 判断线程池中的最大线程数是否超过了构造方法中所传入的最大线程数,若未超过,则创建线程执行该任务,否则执行步骤4
  4. 根据构造函数中的拒绝策略,执行相应的策略,默认是抛出异常。
  5. 当线程的空闲时间大于构造函数中的空闲时间时,就会回收线程池中的线程,对于未设置允许回收核心线程时,只能回收超过核心线程数的线程,对于设置了允许回收核心线程时,则允许回收全部的已超过空闲时间的线程。

    线程池的创建相对简单,只要掌握了上述参数的含义,相信都会创建一个线程池。但如何创建一个适合自己场景的线程池,需要多次实践和测试,下来只提供几个建议。

  • 对于计算密集型的任务,希望能够充分利用CPU的资源,一般设置线程池的线程数量为N+1,N为CPU的核数。
  • 对于IO密集型的任务,可以将线程数目设置更大,这样即使会有同个线程同时运行,但也可能因为是IO密集型而会将CPU资源让出来。
  • 尽量不要在线程池中将核心线程数固定死,应该通过某种配置机制或者系统中的CPU核数能动态设置线程数量。

定时线程池创建

Executors创建线程池

    Excutors是线程池的一个工具类,提供了创建一些简单线程池的方法,并且提供了将Runnable转换为带结果集输出的线程任务。Executors提供的创建的线程池如下:

创建单线程的线程池
1
2
3
4
5
6
7
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

    创建单线程池方法的代码内部使用了new ThreadPoolExecutor()的方法,其中corePoolSize和maxPoolSize方法都是1,但是阻塞队列使用的是LinkedBlockingQueue阻塞队列,因此上述只会创建拥有一个线程执行任务的线程池,后续添加的线程都会添加到阻塞队列中。

创建固定线程池
1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

    newFixedThreadPool是创建固定数量线程池的方法,线程的数量由传入的参数决定。和newSingleThreadExecutor一样,核心线程数和最大线程数一样,所以不会创建超过入参大小的线程数量,若超过核心线程数,则会添加到阻塞队列中。这里和newSingleThreadExecutor一样,有个缺陷,之后分析完Executors创建只会讲明是什么缺陷。

创建缓存线程池
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

    newCachedThreadPool和newFixedThreadPool/newSingleThreadExecutor有一定的区别,newCachedThreadPool中核心线程数量初始为0,但是最大的线程数量为整数类型的最大值,并且使用SynchronousQueue作为阻塞队列,这种队列有个特点就是,他不会缓存线程任务,而是直接将队列中的内容给上层,只是充当一个中转站的作用。因此该线程池最大会创建整数类型最大值的线程数量。这种创建方式也有几个比较致命的缺点,后续详细说明。

Excutors创建的缺陷
  1. newSingleThreadExecutor和newFixedThreadPool都是使用LinkedBlockingQueue作为阻塞队列,这个阻塞队列最大允许创建整数类型最大值节点,若在一个拥有大量运行的代码中,可能LinkedBlockingQueue的容器就会达到太大,从而导致内存溢出甚至是应用崩溃。
  2. newCachedThreadPool初始时未创建传入核心线程数量,但最大的线程数量允许很大。我们知道线程也是一个对象,相对于普通对象来说,它甚至更占用内存一些,之前我遇到过一个问题是创建线程足够多时,堆内存没有报溢出异常,但是本地栈却抛出了异常,之后查代码发现创建线程是使用了本地方法中的对象从而导致内存溢出。因此创建数量太大的线程很容易导致内存溢出甚至应用崩溃。
  3. 使用线程池的一个优势是不需要线程对象频繁的创建和销毁,newCachedThreadPool却和这点有点相悖,它允许频繁创建线程以及当不空闲时间超过60s时就会销毁,如果某一刻大量的线程出现空闲,从而导致虚拟机会有很长的一段时间去回收对象,从而影响运行效率。

提交任务

execute
1
2
3
4
public static Object execute(Runnable runnable){
exeuctor.execute(runnable);
return null;
}

    execute(Runnable runnable)方法是最顶层的Executor声明的方法,具体的实现在ThreadPoolExecutor中,之后再源码分析中会具体分析这个方法,这里先做个了解,它可以用做提交方法,但不会返回结果。

submit
1
2
3
4
5
6
7
8
public static <V> V submit(Callable<V> runnable) throws ExecutionException, InterruptedException {
Future<V> future = exeuctor.submit(runnable);
while(true){
if(future.isDone()){
return future.get();
}
}
}

    submit(Callable runnable)/submit(Runnable runnable)也被用作提交线程任务,和execute方法的区别是submit可以返回结果Futrue对象,Future可以使用get()方法获取线程的执行结果,但是get()方法会阻塞获取,因此慎用。submit方法内部也是通过调用execute方法实现,具体的代码分析我们在源码分析章节再做具体的分析。

源码实现

    ThreadPoolExecutor是线程池的最终实现,之前我们在介绍线程池使用的时候说过ThreadPoolExecutor的入参,因此我们在开始的时候,先介绍下ThreadPoolExecutor的核心属性。

ThreadPoolExecutor核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//线程的状态和数量的表示
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//表示线程数量的二进制位数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最大容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池当前状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;


//阻塞队列
private final BlockingQueue<Runnable> workQueue;
//可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
//执行线程任务的工作线程
private final HashSet<Worker> workers = new HashSet<Worker>();
//条件变量
private final Condition termination = mainLock.newCondition();

private int largestPoolSize;
//完成的线程任务数量
private long completedTaskCount;
//线程创建工厂
private volatile ThreadFactory threadFactory;
//拒绝策略
private volatile RejectedExecutionHandler handler;
//线程的最大的空闲时间
private volatile long keepAliveTime;
//是否允许核心线程回收
private volatile boolean allowCoreThreadTimeOut;
//核心线程数量
private volatile int corePoolSize;
最大线程数量
private volatile int maximumPoolSize;
//默认的拒绝策略是抛出异常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");

private final AccessControlContext acc;

    ThreadPoolExecutor中最为重要的一个属性是ctl,ctl用于描述线程池线程任务的数量和线程状态,它是怎样通过一个属性来描述状态和数量的呢?其实这个和可读写锁很相似,使用属性的二进制位,高3位用来表示线程池的状态,低29位用来描述线程池中线程任务的数量。线程池一共拥有5种状态,分别如下:

属性名|值|描述|备注|
|:–|:–:|:–|:–|
RUNNING | -1 << COUNT_BITS | 允许提交并处理任务,线程池创建时的状态 |即1111……1111左移29位,最高的3位为111
SHUTDOWN | 0 << COUNT_BITS | 不允许提交新的任务,但是会处理完已提交的任务 |即0000……0000左移29位,最高的3位为000
STOP | 1 << COUNT_BITS | 不允许提交新的任务,也不会处理阻塞队列中未执行的任务,并设置正在执行的线程的中断标志位 |即0000……0001左移29位,最高的3位为001
TIDYING| 2 << COUNT_BITS | 所有任务执行完毕,线程池池中工作的线程数为0,等待执行terminated()勾子方法 |即0000……0010左移29位,最高的3位为010
TERMINATED| 3 << COUNT_BITS | terminated()勾子方法执行完毕 |即0000……0011左移29位,最高的3位为011。

    ThreadPoolExecutor较为重要的便是ctl属性,掌握了ctl属性则掌握ThreadoPoolExecutor的一部分了,当然不是说其它的属性不够重要,其他的属性很多都是创建ThreadPoolExecutor时传进来的,在线程池使用的章节已介绍过相关参数的含义。除了上述的核心属性,ThreadPoolExecutor也提供了很多基础的方法和内部类,在之后的源码实现中会经常看到,所以我们先看看这些方法。

ThreadPoolExecutor基础方法和内部类

基础方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//获取当前线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取当前线程池中任务的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
//初始化ctl属性
private static int ctlOf(int rs, int wc) { return rs | wc; }

//判断当前的状态c是否小于预期的预期状态
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//判断当前的状态c是否至少满足预期的状态s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

//判断当前线程池是否还处于运行中,即判断入参是否小于SHUTDOWN
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//CAS增加线程池的任务数量
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

//CAS减少线程池的任务数量
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

//循环执行减少线程池的任务数量,防止减少线程池任务数量失败
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

    ThreadPoolExecutor的基础方法基本都是操作线程池状态和线程池数量的,在后续的源码分析环节会详细介绍其起了什么作用。

内部类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{

private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

//
public void run() {
runWorker(this);
}

//判断当前工作线程状态,是否处于运行中
protected boolean isHeldExclusively() {
return getState() != 0;
}

//尝试获取工作线程
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

//释放工作线程资源
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

    工作线程Worker则是我们在实例化线程池所指的创建核心线程数量或最大线程数量的那个工作线程的数量。其一,Worker继承了AbstractQueuedSynchronizer(同步队列器),继承同步器的作用是为了实现当前工作线程是否正在被占用,防止其他线任务加入到工作线程中,如果不了解其中同步器的实现,可以看下之前我写的《Synchronized和lock详解》,里边详细分析了队列同步器的作用和实现。其二是Worker实现了Runnable方法,我们在使用线程池提交任务时候,并不是调用了我们提交线程的start方法,而是调用了Worker工作现成的start方法,原因是如果调用线程的start方法,相当于创建了线程,没有达到控制线程数量的状况。
    介绍了ThreadPool的基础方法和核心内部类,接下来进入我们的主题,提交任务方法和关闭线程池方法。

execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取当前clt的值
int c = ctl.get();
//若ctl的工作线程小于核心线程数,则添加工作线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//判断当前线程池是否正处于运行中,若处于运行中,则像阻塞队列中添加当前任务的信息
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次判断当前的线程池状态,若不是运行中则移除该任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//添加工作线是否满足最大线程数,若添加失败则执行拒绝任务的方法
else if (!addWorker(command, false))
reject(command);
}
  • 判断核心工作线程数量,若未超过则添加工作线程,若失败则重新获取线程池的ctl属性,重新获取ctl属性确保当前线程池的状态,避免没必要的操作。
  • 判断当前线程池的状态是否还处于运行中,若处于运行中则向队列中添加任务,添加成功后再次获取线程池的ctl属性,继续检查当前线程池的状态,提高响应速度。
  • 若添加阻塞队列失败或是线程池不是运行中的状态,则添加工作线程,此时的false表示是是否是添加核心线程,true表示添加的的核心的工作线程,false表示此时添加的是最大工作线程。

    从上述代码可以看出,execute方法和我们在介绍线程池使用之创建线程池提交任务的处理流程很相似。不知道大家有没有注意到,其中多次获取线程池的ctl属性来判断其线程池状态,其目的是避免线程池已被某个请求执行了shutdown方法,若此时还是添加工作线程或向队列添加任务,势必违反了shutdown的原理(不允许提交任务,但会处理完已提交的任务)。execute方法中调用最多的是addWorker(Runnable, boolean)方法,我们继续看看addWorker方法。

addWorker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private boolean addWorker(Runnable firstTask, boolean core) {
//定位一个标志位,类似于C中的goto
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 若满足以下条件,则创建工作线程,否则返回
* 线程池处于运行状态,则创建工作线程
* 线程池处于关闭、执行的任务为空并且工作队列不为空,则创建工作线程
* /
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//获取工作线程数量,判断是否创建工作线程,若工作线程数量已超过或者确认要创建线程则跳出循环。
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//判断当前工作现成的状态是否和刚开始获取的状态一致,若不一致,则跳转到开始节点判断线程池状态是否满足
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

//
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取当前线程池状态,此时获取的线程池状态是最准确的线程池状态,因为加了锁控制
int rs = runStateOf(ctl.get());
//若线程池处于运行中或处于关闭状态并且添加的任务为空,则创建讲工作线程添加到工作队列集合中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//启动工作线程任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//若启动工作线程失败,执行工作队列添加失败方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
  1. 首先定义一个标志位,为了之后出现获取的状态和初始获取的状态不一致时能够快速跳转到重新获取状态判断的情况
  2. 判断工作线程的数量是否超过最大线程数或线程池内部允许的工作线程数,若未超过,则增加工作线程数量,如果此时执行CAS操作增加数量成功,则跳出retry循环,执行步骤4,否则执行步骤3
  3. 重新获取当前线程池的ctl属性,判断当前状态是否和初始获取的状态一致,若不一致,则跳转到retry标志位,重新判断线程池当前状态是否满足创建工作线程的条件;若状态一致,继续执行步骤2
  4. 创建工作线程,判断工作线程的中的线程是否为空,若不为空,则进行加锁,这里加锁的目的是,为了防止在添加任务时,线程池执行关闭、终止方法,导致最后的执行结果不准确。
  5. 获取当前线程池的状态,这个状态是一个准确的状态,因为有加锁逻辑,然后判断状态是否满足,若满足则向工作队列集合中添加该工作线程,并更新最大工作线程数量。
  6. 启动工作线程,执行任务
  7. 若启动工作线程的标志位失败,则执行添加工作线程失败的方法。

    上述方法有几处比较难理解。其一是刚开始判断是否创建工作线程的逻辑,我们可以将这段代码逻辑反着过来,即什么条件可以创建工作线程,反过来的代码逻辑如下:

1
2
3
4
5
if (rs < SHUTDOWN ||
(rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

这样子是不是一目了然,处于运行中或者运行状态是关闭并且当前任务为空但是工作队列不为空的允许创建工作线程,和我们之前讲的逻辑一致。
    其二比较难理解的是在创建工作线程方法内判断工作线程中t是否还存活,这点不能理解,希望之后有大佬看到能给解答,在这里先谢谢了。
    创建工作线程之后会调用t.start()方法启动工作线程,这时候才是额外创建一个线程来执行任务,我们继续看下t.start()方法内部都做了什么?

runWorker(Worker w)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public void run() {
runWorker(this);
}

//
final void runWorker(Worker w) {
//获取当前的线程信息
Thread wt = Thread.currentThread();
//获取创建工作线程中的任务
Runnable task = w.firstTask;
//然后将其置为空,表示该工作线程可以接受其他任务
w.firstTask = null;
//再次释放锁,这个是个确保行为
w.unlock();
boolean completedAbruptly = true;
try {
//如果工作线程中的任务不为空或者获取工作队列中的任务不为空,则处理任务
while (task != null || (task = getTask()) != null) {
w.lock();
//对于执行了shutdownNow()方法的尝试中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//为做任何操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务,调用的是run方法,而不是start方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
  1. 获取当前的线程,并获工作线程的任务,将工作线程的任务置为null,目的是为后续的执行队列中的任务做铺垫
  2. 判断工作线程的任务不为空或者队列中的任务不为空,则执行步骤3,否则执行步骤
  3. 判断当前线程池的状态,如果当前线程池的状态为正在关闭中(不允许提交任务,并且不执行已提交的任务)或当前线程被尝试中断并且线程池的状态变为关闭中,则尝试中断该线程,这段代码就是实现shutdownNow()的逻辑。
  4. 执行任务,调用的是run方法,而不是start方法,如果此处调用的是start方法,则又会创建一个线程,这是和逻辑相悖的。
  5. 如果获取任务为空,则执行关闭工作线程方法。

    上述代码中比较难理解是对于线程池正在处于关闭中状态的判断的逻辑难理解。这段代码需要搞清楚Thread.interrupted和object.isInterrupted()方法的区别。

  • Thread.interrupted()是个静态方法,目的是调用当前线程的isInterrupted()方法并且重置中断标志位
  • object.isInterrupted():判断线程是否被中断,不会重置标志位。

理解了上述两个方法的区别,则这段判断逻辑非常好理解。

  • (runStateAtLeast(ctl.get(), STOP):判断当前线程池的状态是否是关闭中之上的状态,若是则直接执行wt.interrupt()尝试中断线程。
  • Thread.interrupted():判断当前的线程是否被中断,当前线程不一定指的是代码刚开始获取到的wt,有可能是其他正在运行的线程。如果当前线程的中断标志位为true,则继续判断当前线程池的状态,然后判断wt的中断标志位是否被中断,若未false,则执行wt.interrupt()尝试中断线程。

    明白了这段代码逻辑,则对于shutdownNow方法,则已经知道了大部分的内容了,等之后我们在详细分析shutdownNow()方法。上述方法除了这段逻辑,还尝试从工作队列中获取任务getTask(),具体的代码如下:

getTask()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private Runnable getTask() {
//是否已超时标志
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//获取当前线程池的状态,若线程池满足以下状态则直接减少工作线程数量并返回空
//如果该线程池处于shutdown状态并且工作队列是空
//如果工作队列处于STOP之上的状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取工作线程的数量
int wc = workerCountOf(c);

//判断是否需要回收工作线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//判断是否需要回收工作线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//从队列中获取,如果需要回收工作线程,则尝试从工作队列中等待keepAliveTime时间来获取任务,如果还未获取到则认为需释放工作线程
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

    上述代码相对之前的几个方法来说,应该是最简单一个方法,具体的代码逻辑如下。

  • 在一个循环中,获取当前线程的状态,如果该线程池处于shutdown状态并且工作队列是空或者如果线程池处于STOP之上的状态,则执行步骤4
  • 获取工作线程的数量, 判断是否已大于最大线程数并且回收线程标志timed和超时标志timeout都为true,队列不为空并且工作线程数量大于1时,则执行步骤4。回收线程标志timed的判断是是否核心工作线程,若允许则直接返回ture,否则判断是否超过最大工作线程数,若超过,则返回true。
  • 如果工作线程回收标志timed为true,则尝试从队列中等待keepAliveTime时间来获取任务,若获取到则返回任务,没有则更新timeout为true并继续执行步骤1。若工作线程回收标志timed为false,则一直等待直到获取到任务返回。
小结

    上述便是线程池提交任务execute(Runnable r)的方法,涵盖了创建工作线程,工作线程执行任务、从工作队列中获取任务、遇到关闭中的线程池时工作线程的操作以及工作线程满足回收条件时等场景,虽然代码比较长,但涵盖的也比较多,之后的源码分析会相对简单很多。

submit

1
2
3
4
5
6
7
8
9
10
11
12
13
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

    之前介绍了execute(Runnable r)方法,execute方法没有返回结果,但submit方法是有返回结果的,其内部实现主要是调用newTaskFor(task)方法创建了一个RunnableFuture实例,我们看看newTaskFor(task, result)方法。

1
2
3
4
5
6
7
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

    newTaskFor方法内部实际上创建了一个FutureTask实例,FutureTask分析起来不是一两句话分析清楚,若果要掌握FutureTask,至少还需要这样一篇文章,所以这里我们先了解下FutureTask是实现了Runnable接口和Future接口的一个实例,可以返回一个线程的执行结果,之后我们再写一篇关于FutureTask的文章。
    抛开FutureTask对象来说,submit方法还是非常简单的,创建FutureTask对象,调用ThreadPoolExecutor中的execute方法,然后返回Futrue对象,之后开发人员可以通过Futrue对象执行响应的逻辑。

小结

    execute和submit都可以提交方法,但是submit相比execute来说更加强大,支持返回结果,因此如果业务场景需要线程的执行结果,则应该使用submit方法,一般开发也建议使用submit方法。

线程池优势

    在开篇说道,创建大量的线程会有很大的缺点,而上述的缺点也就是使用线程池的优势。

  1. 降低资源消耗。通过重复利用已创建的线从而程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,如果工作线程有空闲则不需要等待创建线程而直接提交。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,则会造成内存溢出甚至应用崩溃。并且使用线程池可以快速的关闭或立即关闭全部线程的运行,管理比较简单。

总结

      这次我们分析了线程池使用和实现原理,知道了使用线程池的好处,使用Executors创建线程池的缺点,以及线程池是如何创建、提交任务的内部实现原理。但是如何使用好线程池,使之能够更好、更快的处理业务,这个需要多次的实验测试,并且线程池也有多线程的风险,并发执行、死锁等,所以在使用线程池也要多加注意,争取做到更好。
      之后每写完一篇文章都会分享一个自己觉得不错句子,可能是歌词,可能是评论,可能是电影的对白,带给你的可能是感动,伤心、开心等等。不管你怎么想,我就是想分享,所以今天的分享话语是……

      “整日想着一了百了的事,一定是因为获得太过认真吧。”

--《曾经我也想过一了百了》评论