创建线程池的方式
- Executors.newFixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。
- Executors.newCachedThreadPool:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。
- Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执行顺序。
- Executors.newScheduledThreadPool:创建一个可以执行延迟任务的线程池。
- Executors.newSingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池。
- Executors.newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定)【JDK 1.8 添加】。
- ThreadPoolExecutor:手动创建线程池的方式,它创建时最多可以设置 7 个参数。
线程池的创建推荐使用最后一种 ThreadPoolExecutor 的方式来创建,因为使用它可以明确线程池的运行规则,规避资源耗尽的风险。
线程池七个参数含义
|
- corePoolSize:核心线程数。
- maximumPoolSize:最大线程数。
- keepAliveTime:空闲线程存活时间。
- TimeUnit:时间单位。
- BlockingQueue:线程池任务队列。
- ThreadFactory:创建线程的工厂。
- RejectedExecutionHandler:拒绝策略。
详细如下
corePoolSize
核心线程数:是指线程池中长期存活的线程数。
maximumPoolSize
最大线程数:线程池允许创建的最大线程数量,当线程池的任务队列满了之后,可以创建的最大线程数。
最大线程数 maximumPoolSize 的值不能小于核心线程数 corePoolSize,否则在程序运行时会报 IllegalArgumentException 非法参数异常。
keepAliveTime
空闲线程存活时间,当线程池中没有任务时,会销毁一些线程,销毁的线程数=maximumPoolSize(最大线程数)-corePoolSize(核心线程数)。
TimeUnit
时间单位:空闲线程存活时间的描述单位,此参数是配合参数 3 使用的。
- TimeUnit.DAYS:天
- TimeUnit.HOURS:小时
- TimeUnit.MINUTES:分
- TimeUnit.SECONDS:秒
- TimeUnit.MILLISECONDS:毫秒
- TimeUnit.MICROSECONDS:微妙
- TimeUnit.NANOSECONDS:纳秒
BlockingQueue
阻塞队列:线程池存放任务的队列,用来存储线程池的所有待执行任务
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
ThreadFactory
线程工厂:线程池创建线程时调用的工厂方法,通过此方法可以设置线程的优先级、线程命名规则以及线程类型(用户线程还是守护线程)等。public static void main(String[] args) {
// 创建线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
// 创建线程池中的线程
Thread thread = new Thread(r);
// 设置线程名称
thread.setName("Thread-" + r.hashCode());
// 设置线程优先级(最大值:10)
thread.setPriority(Thread.MAX_PRIORITY);
//......
return thread;
}
};
// 创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
threadFactory); // 使用自定义的线程工厂
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
Thread thread = Thread.currentThread();
System.out.println(String.format("线程:%s,线程优先级:%d",
thread.getName(), thread.getPriority()));
}
});
}
RejectedExecutionHandler
拒绝策略:当线程池的任务超出线程池队列可以存储的最大值之后,执行的策略。
- AbortPolicy:拒绝并抛出异常。
- CallerRunsPolicy:使用当前调用的线程来执行此任务。
- DiscardOldestPolicy:抛弃队列头部(最旧)的一个任务,并执行当前任务。
- DiscardPolicy:忽略并抛弃当前任务。
线程池的生命周期
线程池的五种状态
状态 | 说明 |
---|---|
RUNNING | 处于RUNNING状态下的线程池能够接收新的任务以及处理阻塞队列中的任务。 |
SHUTDOWN | 处于SHUTDOWN状态下的线程池,不再接收新到来的任务,但是依然能够处理阻塞队列中的任务。 |
STOP | 处于STOP状态下的线程池,不再接收新到来的任务,但是依然能够处理阻塞队列中的任务。 |
TIDYING | 所有任务已经终止,有效线程数为0, 线程转换到TIDYING状态对运行terminated钩子方法 |
TERMINATED | terminated方法执行完成后进入terminated状态 |
ThreadPoolExecutor中表示线程池状态设计
在ThreadPoolExecutor中使用一个AtomicInteger类型的ctl字段来描述线程池地运行状态和线程数量,通过ctl的高3位来表示线程池的5种状态,低29位表示线程池中现有的线程数量。使用最少的变量来减少锁竞争,提高并发效率。private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池线程数地bit数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池中最大线程容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程池地运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取有效工作线程地数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 组装线程数量和线程池状态
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池的执行流程
- 如果工作线程 小于 核心线程,那么就会创建线程执行提交的任务。
- 如果工作线程 大于 核心线程,并且阻塞队列未满,那么就会添加至阻塞队列,等待后续线程来执行提交地任务。
- 如果工作线程 大于 核心线程,并且工作线程 小于 最大线程数,并且阻塞队列已满,那么就会创建非核心线程执行提交的任务。
- 如果工作线程 大于等于 最大线程数,并且阻塞队列已满,那么就会执行拒绝策略。
提交任务
|
addWorker 创建线程加入线程池
|
将一个任务Runnable提交给线程池执行时,主要分三步:
- 如果线程池中的线程数(workCount)< 线程池大小(corePoolSize),则创建一个线程,执行这个任务,并把这个线程放入线程池。添加任务时会对线程池状态进行检查,以防止线程池状态为关闭时还添加线程。
- 如果线程池中的线程数(workCount)>= 线程池大小(corePoolSize),或者上一步添加任务最后失败,将任务放入缓存队列中。当任务成功加入缓存队列,仍需要对线程池状态进行二次检查,防止线程池状态改为关闭或线程池中已经没有可以运行的线程。
- 如果上一步将任务放入缓存队列失败,试着去增加一个新的线程来执行它(超过线程池大小的额外线程)。如果添加新线程失败(可能是线程数已到达maximumPoolSize),则抛出异常拒绝执行该任务。
runWorker执行任务
|
线程池有个内部类 Worker,它实现了 Runnable接口,它自己要 run 起来。 然后它会在合适的时候获取我们提交的 Runnable 任务,然后调用任务的 run()接口。 一个 Worker 不终止的话可以不断执行任务。
Worker 继承了AQS,用的是非公平锁,独占锁(不可重入)。
- Worker执行任务时获得锁,执行完毕释放锁。
- Worker具有不可重入特性,目的是为了防止worker刚好在运行途中,线程池控制类操作(比如setCorePoolSize)时获得锁,这样的话,因为重入性,setCorePoolSize会执行中断操作,会把正在运行的任务中断掉。在空闲时可以响应中断,在执行任务时不可被中断
比如进行shutdown()优雅停机的时候,要进行w.tryLock方法,没有获取到锁,说明正在运行或者干其他事情,是不会被其他事情打断掉的。
实现了runnable来达到具备线程功能。
“线程池中的线程”,其实就是 Worker;等待队列中的元素,是我们提交的 Runnable 任务。每一个 Worker 在创建出来的时候,会调用它本身的 run()方法,实现是 runWorker(this),这个实现的核心是一个 while 循环,这个循环不结束, Worker 线程就不会终止,就是这个基本逻辑。
- 在这个 while 条件中,有个 getTask()方法是核心中的核心,它所做的事情就是从等待队列中取出任务来执行
getTask()
|
- 如果没有达到 corePoolSize,则创建的 Worker 在执行完它承接的任务后,会用workQueue.take()取任务,这个接口是阻塞接口,如果取不到任务, Worker线程一直阻塞。
- 如果超过了 corePoolSize,或者allowCoreThreadTimeOut,一个Worker 在空闲了之后,会用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取任务。注意,这个接口只阻塞等待 keepAliveTime 时间,超过这个时间返回 null,则Worker 的 while 循环执行结束,则被终止了。
核心线程(Worker)即使一直空闲也不终止,是通过workQueue.take()实现的,它会一直阻塞到从等待队列中取到新的任务。 非核心线程空闲指定时间后终止是通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的,一个空闲的 Worker 只等待 keepAliveTime,如果还没有取到任务则循环终止,线程也就运行结束了。
另外,这里针对allowCoreThreadTimeOut
是否允许核心线程超时(被清理)有大量的使用。也可以通过设置这个字段来达到不被清理的效果。
processWorkerExit()
runWorker() 的收尾方法。// completedAbruptly
// true表示当前worker是因为任务出异常退出的
// false表示当前worker是因为没有获取到任务
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 使用CAS + 自旋将ctl的值-1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将当前worker完成的任务总数累加到全局总数上
completedTaskCount += w.completedTasks;
// 将当前worker从线程池(workers就是一个HashSet)移除
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
// 条件成立:当前线程池状态为RUNNING 或者 SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// min表示线程池最低可以持有的线程数量
// allowCoreThreadTimeOut == true => 说明核心线程数内的线程,也会超时被回收 => min == 0
// allowCoreThreadTimeOut == false => min == corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 线程池状态:RUNNING SHUTDOWN
// 条件1:假设mid == 0成立
// 条件2:队列不为空
// 说明队列中还有任务,起码得留一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 条件成立: 线程池中还拥有足够的线程时,后续就不需要调用addWorker()了
// 考虑一个问题: workerCountOf(c) >= min -> (0 >= 0) ?
// 当线程池中的核心线程数是可以被回收的情况下,会出现这种情况,这种情况下,
// 当前线程池中的线程数会变为0,下次在提交任务时,会再创建线程。
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 来到这里的情况
// 1.completedAbruptly为true,表示当前线程在执行task时发生异常退出了,这里一定要创建一个新worker顶上去
// 2.当队列中还有任务时,起码留一个线程,这里就会创建一个线程(worker)
// 3.当前线程数 < corePoolSize,此时会创建线程,维护线程池数量在corePoolSize个
addWorker(null, false);
}
}
最后可以看到无论是否为核心线程,都会执行remove
的操作,来销毁该线程。
allowCoreThreadTimeOut(boolean value)
核心线程数默认是不会被回收的,如果需要回收核心线程数,需要调用下面的方法。//允许核心线程池超时之后回收。
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
//能回收
if (value)
interruptIdleWorkers();
}
}
可以看到allowCoreThreadTimeOut
字段在之前也出现过。
线程池的阻塞队列
1.有界队列
|
2.无界队列
|
3.同步移交队列
|
无缓冲的等待队列,队列不存元素,每个put操作必须等待take操作,否则无法添加元素,支持公平非公平锁,无界。workQueue 不要使用无界队列,尽量使用有界队列。 避免大量任务等待,造成 OOM。
线程池四类拒绝策略
当线程池中的线程和阻塞队列中的任务已经处于饱和状态,线程池则需要执行给定的拒绝策略来拒绝正在提交的任务,ThreadPoolExecutor主要提供了一下四种拒绝策略来拒绝任务。
AbortPolicy
抛出RejectedExecutionException异常拒绝任务提交public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy
什么也不做,直接丢弃任务public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
将阻塞队列中的任务poll出来,然后执行当前任务public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
CallerRunsPolicy
让提交任务的线程来执行任务public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
Excutors的四类线程
1.newCachedThreadPool
创建一个可缓存的无界线程池,如果线程池长度超过处理需要,可灵活回收空线程,若无可回收,则新建线程,核心线程数为0。当线程池中的线程空闲时间超过60s,则会自动回收该线程,当任务超过线程池的线程数则创建新的线程,线程池的大小上限为Integer.MAX_VALUE,可看作无限大。
缺点:使用多少个最大线程,用户是无法控制的。
2.newFixedThreadPool
创建一个指定大小的线程池,可控制线程的最大并发数,超出的线程会在LinkedBlockingQueue阻塞队列中等待。
3.newScheduledThreadPool
创建一个定长的线程池,可以指定线程池核心线程数,支持定时及周期性任务的执行。
方法:
- schedule(Runnable command, long delay, TimeUnit unit),延迟一定时间后执行Runnable任务;
- schedule(Callable callable, long delay, TimeUnit unit),延迟一定时间后执行Callable任务;
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit),延迟一定时间后,以间隔period时间的频率周期性地执行任务;
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit),与scheduleAtFixedRate()方法很类似,但是不同的是scheduleWithFixedDelay()方法的周期时间间隔是以上一个任务执行结束到下一个任务开始执行的间隔,而scheduleAtFixedRate()方法的周期时间间隔是以上一个任务开始执行到下一个任务开始执行的间隔。
4.newSingleThreadExecutor
创建一个单线程化的线程池,它只有一个线程,用仅有的一个线程来执行任务,保证所有的任务按照指定顺序(FIFO,LIFO,优先级)执行,所有的任务都保存在队列LinkedBlockingQueue中,等待唯一的单线程来执行任务。该方法无参数,所有任务都保存队列LinkedBlockingQueue中,核心线程数为1,线程空闲时间为0。
对比
工厂方法 | corePoolSize | maximumPoolSize | keepAliveTime | workQueue |
---|---|---|---|---|
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60s | SynchronousQueue |
newFixedThreadPool | nThreads | nThreads | 0 | LinkedBlockingQueue |
newScheduledThreadPool | 1 | 1 | 0 | LinkedBlockingQueue |
newSingleThreadExecutor | corePoolSize | Integer.MAX_VALUE | 0 | DelayedWorkQueue |
其他参数都相同,其中线程工厂的默认类为DefaultThreadFactory,线程饱和的默认策略为ThreadPoolExecutor.AbortPolicy。
合理设置线程池参数
CPU 密集型任务(N+1)
这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N)
这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。