Java线程池解析

线程池:可以根据字面意思简单的理解,一个管理线程的池子。

  • 帮我们管理线程,避免增加创建线程和销毁线程的资源损耗。
  • 提高响应速度。
  • 重复利用。

构造函数

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
  • corePoolSize:线程池核心线程数最大值
  • maximumPoolSize: 线程池最大线程数大小
  • keepAliveTime: 线程池中非核心线程空闲的存活时间大小
  • unit: 线程空闲存活时间单位
  • workQueue: 存放任务的阻塞队列
  • threadFactory: 用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,可方便排查问题。
  • handler: 线城池的饱和策略事件,主要有四种类型。

任务执行流程

线程池执行流程,对应execute()方法。
Z7iOzt.md.jpg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

从execute代码注释中可以看出,任务执行流程主要是以下几步:

  • 提交任务,如果线程池里存活的核心线程数小于线程数corePoolSize时,线程池就会创建一个核心线程去处理提交的任务
  • 如果线程池核心线程数满了,那么新提交的任务就会被放进任务队列workQueue中排队等待执行。
  • 当线程池里面存活的线程数已经等于corePoolSize,并且任务队列workQueue也满了,判断线程数是否达到maximumPoolSize,如果没有达到,创建一个非核心线程执行提交的任务。
  • 如果当前线程数达到了maximumPoolSize,那么直接采取拒绝策略处理。

线程池的工作队列

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • DelayQueue
  • PriorityBlockingQueue
  • SynchronousQueue

ArrayBlockingQueue

ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按照FIFO排序。

LinkedBlockingQueue

LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,就是一个没有边界的阻塞队列,最大长度为Integer.MAX_VALUE。

DelayQueue

DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后顺序排序。

PriorityBlockingQueue

PriorityBlockingQueue(优先阻塞队列)具有优先级的无界阻塞队列。

SynchronousQueue

SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等另一个线程调用移除操作,否则插入操作一直处于阻塞状态。

几种常用的线程池

newSingleThreadPool

1
2
3
4
5
6
7
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

特点

这个线程池只有一个核心线程在工作,也就是相当于单线程串化执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。阻塞队列使用的是LinkedBlockingQueue
Z7i4sK.jpg

使用场景

适用于串行执行任务的场景,一个任务一个任务地执行。

newFixedThreadPool

1
2
3
4
5
6
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

特点

这个线程池的大小(核心线程)是固定的。每次提交一个任务就创建一个线程,直到线程数达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,会将后面的任务加入到阻塞队列。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。使用的阻塞队列是LinkedBlockingQueue
Z7ihM6.md.jpg

使用场景

适用于处理CPU密集型任务,确保CPU在长期被工作线程使用的情况下,尽可能少的分配线程,适用于执行长期的任务。

newCachedThreadPool

1
2
3
4
5
6
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

特点

创建一个可缓存的线程池(即核心线程数为0),如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60s不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新的线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统能够创建的最大线程大小。阻塞队列使用的是SynchronousQueue
Z7iWxx.md.jpg

使用场景

用于并发执行大量短期的小任务

newScheduleThreadPool

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

特点

创建一个大小为Integer.MAX_VALUE的线程池,此线程池支持定时以及周期性执行任务的需求。阻塞队列使用的是DelayedWorkQueue

  1. 添加一个任务
  2. 线程池中的线程从DelayQueue中取任务
  3. 线程从DelayQueue中获取time大于等于当前时间的task
  4. 执行完后修改这个task的time为下一次被执行的时间
  5. 这个task返回到DelayQueue队列中

使用场景

需要周期性执行任务的场景,需要限制线程数量的场景

https://juejin.im/post/5d1882b1f265da1ba84aa676?utm_source=gold_browser_extension#heading-37