所谓线程池通俗的理解就是有一个池子,里面存放着已经创建好的线程,当有任务提交给线程池执行时,池子中的 某个线程会主动执行该任务。如果池子中的线程数量不够应付数量众多的任务时,则需要自动扩充新的线程到池子中,但是该数量是有限的,就好比池塘的水界线一样。当任务比较少的时候,池子中的线程能够自动回收,释放 资源。为了能够异步地提交任务和缓存未被处理的任务,需要有一个任务队列。
¶前言
线程的使用:
1 | Thread thread = new Thread(()->{ |
缺点:
- 使用一次创建一次
- 使用之后销毁线程
- 同时创建大量线程可能导致系统资源耗尽
¶线程池
在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。
在Java中虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。所以提高服务程序效率的一个
手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁。
如何利用已有对象来服务就是一个解决的关键问题,这也就是"池化资源"技术产生的原因。
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用。
可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃。
一个线程池包括以下四个基本组成部分:
- 线程池管理器(ThreadPool):用于创建并管理线程池,包括创建线程池、销毁线程池,添加新任务;
- 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
- 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
- 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
¶线程池的五种状态
线程池状态示意图以及五种状态的说明摘自CSDN一只逗比的程序猿
一共有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
线程池状态切换示意图
- RUNNING
状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理
状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0
- SHUTDOWN
状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务
状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN
注:虽然状态已经不是RUNNING了,但是如果任务队列中还有任务的时候,线程池仍然会继续执行,具体分析请见ThreadPoolExecutor.execute()方法解析
- STOP
状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP
- TIDYING
状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现
状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING
- TERMINATED
状态说明:线程池彻底终止,就变成TERMINATED状态
状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED
线程池五种状态的二进制表示
线程池状态 | 二进制 |
---|---|
RUNNING | 111 |
SHUTDOWN | 000 |
STOP | 001 |
TIDYING | 010 |
TERMINATED | 011 |
1 | COUNT_BITS :29 |
¶工作线程(PoolWorker)
工作线程是由ThreadPoolExecutor的内部类Worker类实现:
1 | /** |
Worker是实现了Runnable接口,每个Worker中有一个线程属性thread
,添加Worker的时候会启动该线程,该线程会循环执行任务,直到线程池停止。
¶任务接口(Task)
- Runnable
1 | public interface Runnable { |
任务需要实现Runnable接口,实现抽象方法run
,在其中编写具体的任务实现。
- Callable
1 | public interface Callable<V> { |
任务实现Callable接口,实现抽象方法call
,这个方法有一个返回值,用于获取任务执行的结果。
提交到线程池后,会返回另一个实现了Runnable接口的RunnableFuture
,其定义如下:
1 | public interface RunnableFuture<V> extends Runnable, Future<V> { |
RunnableFuture
接口继承了Runnable
接口和Future
接口,可以调用提交到线程池后返回的RunnableFuture
的get
方法,该方法是阻塞的,直到该任务执行完成。
1 | public V get() throws InterruptedException, ExecutionException { |
¶任务队列
任务提交到线程池后,如果没有空闲的线程来执行该任务,则会将其放到任务缓冲队列里面,该队列是一个阻塞队列:
1 | private final BlockingQueue<Runnable> workQueue; |
BlockingQueue
4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
抛出异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
移除 | remove() |
poll() |
take() |
poll(time, unit) |
检查 | element() |
peek() |
不可用 | 不可用 |
四组不同的行为方式解释:
- 异常
如果试图的操作无法立即执行,抛一个异常。
- 特定值
如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
- 阻塞
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
- 超时
如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。
具有以下特点:
- 先进先出(FIFO)
- 不接受 null 元素
- 可以是限定容量的
- 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口
- 实现是线程安全的
BlockingQueue
有多种实现,分别满足不同功能的线程池,这里只介绍线程池中常用的队列(其它实现有兴趣的可以深入具体学习):
- ArrayBlockingQueue
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
- DelayQueue
Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。
- LinkedBlockingQueue
内部以一个链式结构(链接节点)对其元素进行存储,满足FIFO(先进先出)原则。
- SynchronousQueue
SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。
¶线程池管理器
¶类图
- Executor:负责线程的使用与调度的根接口
- ExecutorService:Executor的子接口,线程池的主要接口
- AbstractExecutorService:实现了ExecutorService接口,基本实现了ExecutorService其中声明的所有方法,另有添加其他方法
- ThreadPoolExecutor:继承了AbstractExecutorService,线程池常用实现类
- ScheduledExecutorService:继承了ExecutorService,负责线程调度的接口
- ScheduledThreadPoolExecutor:继承了ThreadPoolExecutor同时实现了ScheduledExecutorService
¶Executors
Executors利用工厂模式向我们提供了4种线程池实现方式:
- newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
拥有一个存活时间无限长的线程,排队的任务将放入无界队列,适用于一个任务一个任务执行的场景。
- newFixedThreadPool
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。
线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
每个线程存活的时间无限,适用于执行长期的任务,例如服务器。
- newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。
此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
可以无限增加线程数,适用于执行很多短期异步的小程序或者负载较轻的服务器。
- newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行.
!> 阿里巴巴编码规约有一条:
!> 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
大家对线程池的了解吗,newSingleThreadExecutor
、newFixedThreadPool
、newCachedThreadPool
、newScheduledThreadPool
- 这些线程池是如何实现的呢?
- 创建的时候都有哪些参数呢?
¶ThreadPoolExecutor
¶构造函数
先来看一下ThreadPoolExecutor的构造函数吧。
在这里着重介绍几个参数
- 阻塞队列
没有空闲的Worker时,将到达的任务加入队列中。
1.直接传递。SynchronousQueue队列的默认方式,一个存储元素的阻塞队列而是直接投递到线程中。
每一个入队操作必须等到另一个线程调用移除操作,否则入队将一直阻塞。
当处理一些可能有内部依赖的任务时,这种策略避免了加锁操作。
直接传递一般不能限制maximumPoolSizes以避免拒绝 接收新的任务。
如果新增任务的速度大于任务处理的速度就会造成增加无限多的线程的可能性。
2.无界队列。如LinkedBlockingQueue,当核心线程正在工作时,使用不用预先定义大小的无界队列将使新到来的任务处理等到中,
所以如果线程数是小于corePoolSize时,将不会创建有入队操作。这种策略将很适合那些相互独立的任务,
如Web服务器。如果新增任务的速度大于任务处理的速度就会造成无界队列一直增长的可能性。
3.有界队列。如ArrayBlockingQueue,当定义了maximumPoolSizes时使用有界队列可以预防资源的耗尽,
但是增加了调整和控制队列的难度,队列的大小和线程池的大小是相互影响的,
使用很大的队列和较小的线程池会减少CPU消耗、操作系统资源以及线程上下文开销,但却人为的降低了吞吐量。
如果任务是频繁阻塞型的(I/O),系统是可以把时间片分给多个线程的。而采用较小的队列和较大的线程池,
虽会造成CPU繁忙,但却会遇到调度开销,这也会降低吞吐量。
- 饱和策略(拒绝接收任务)
当Executor调用shutdown方法后或者达到工作队列的最容量时,线程池则已经饱和了,此时则不会接收新的task。但无论是何种情 况,execute方法会调用RejectedExecutionHandler#rejectedExecution方法来执行饱和策略,在线程池内部预定义了几种处理策略:
1.终止执行(AbortPolicy)。默认策略, Executor会抛出一个RejectedExecutionException运行异常到调用者线程来完成终止。
2.调用者线程来运行任务(CallerRunsPolicy)。这种策略会由调用execute方法的线程自身来执行任务,
它提供了一个简单的反馈机制并能降低新任务的提交频率。
3.丢弃策略(DiscardPolicy)。不处理,直接丢弃提交的任务。
4.丢弃队列里最近的一个任务(DiscardOldestPolicy)。如果Executor还未shutdown的话,
则丢弃工作队列的最近的一个任务,然后执行当前任务。
¶主要属性
1 | /** |
¶任务的执行
线程池执行任务的主要方法有:execute
、submit
,其中submit方法会将传入的Runnable或者Callable封装成RunnableFuture然后调用execute方法,
那么任务具体是如何执行的呢?我的理解大致如下:
其代码如下:
1 | public void execute(Runnable command) { |
execute方法简单总结如下:
- 如果当前线程池里面运行的线程数量小于corePoolSize,则创建新的线程(需要获取全局锁)。
- 如果当前线程池里面运行的线程数量大于或等于corePoolSize,则将任务加入workQueue中,缓存起来。
- 如果workQueue已满,但是线程数量小于maximumPoolSize,则继续添加线程(需要再次获取全局锁)。
- 如果线程数已达到最大线程数了,任务队列也满了,任务将被拒绝,并调用RejectedExecutionHandler的rejectExecution方法。
¶添加工作线程addWorker
1 | // 两个参数,firstTask表示需要跑的任务。boolean类型的core参数为true的话表示使用corePoolSize,为false使用maximumPoolSize |
工作线程添加的方法基本了解了,那么这个Worker是如何运行的呢,又是如何重用,如何执行多个任务的呢?
¶工作线程的运行
Worker在addWorker方法中,当Worker成功添加到workers后,调用Worker.thread启动Worker,在前面Worker的介绍中了解到run方法直接调用了
ThreadPoolExecutor.runWorker方法具体执行任务,ThreadPoolExecutor.runWorker代码如下:
1 | final void runWorker(Worker w) { |
Worker正常结束或者异常结束时都会调用processWorkerExit方法,当Worker异常结束时(比如执行的任务中抛出了未处理的异常)可能会重新创建一个新的Worker替换上。
processWorkerExit具体实现如下:
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
¶线程池的关闭
shutdown方法,关闭线程池,关闭之后阻塞队列里的任务不受影响,会继续被Worker处理,但是新的任务不会被接受,方法实现如下:
1 | public void shutdown() { |
interruptIdleWorkers方法,注意,这个方法打断的是闲置Worker,打断闲置Worker之后,getTask方法会返回null,然后Worker会被回收。那什么是闲置Worker呢?
闲置Worker是这样解释的:Worker运行的时候会去阻塞队列拿数据(getTask方法),拿的时候如果没有设置超时时间,那么会一直阻塞等待阻塞队列进数据,这样的Worker就被称为闲置Worker。
由于Worker也是一个AQS(AbstractQueuedSynchronizer,详解点击这里),在runWorker方法里会有一对lock和unlock操作,这对lock操作是为了确保Worker不是一个闲置Worker。
所以Worker被设计成一个AQS是为了根据Worker的锁来判断是否是闲置线程,是否可以被强制中断。我们来看看它的实现:
1 | // 调用他的一个重载方法,传入了参数false,表示要中断所有的正在运行的闲置Worker,如果为true表示只打断一个闲置Worker |
从上面的代码可以看到,shutdown并不会立即停止线程池(shutdownNow会立即停止线程池),而是先将线程状态置于SHUTDOWN,然后中断闲置的Worker。
然后尝试结束线程池,tryTerminate的实现如下:
1 | final void tryTerminate() { |
¶简单实战
好了关于线程池的简单介绍就到这里了,我们接下来看一个小小的例子,顺便了解一下线程池的参数选择。
线程数的选择常见策略如下:
- CPU密集型任务
尽量使用较小的线程池,一般为CPU核心数+1。
因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,只能增加上下文切换的次数,因此会带来额外的开销。
- IO密集型任务
可以使用稍大的线程池,一般为2*CPU核心数。
IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候去处理别的任务,充分利用CPU时间。
- 混合型任务
可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。
只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。
因为如果划分之后两个任务执行时间相差甚远,那么先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
通过前面了解到ThreadPoolExecutor的饱和策略默认选择的是AbortPolicy策略,新来的任务将被丢弃,即使保证任务不丢弃的策略CallerRunsPolicy也是直接调用任务的run方法来实现。
我们如何实现一个满足生产者消费者模型的线程池呢,这里将Worker当做消费者,可以同时处理多个生产者提交的任务,同时要保证生产者过多时任务不被丢弃。具体代码如下:
1 | /** |