线程池的优势
线程使应用更加充分利用 CPU、内存、IO 等资源,但是线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的空间会带来额外的消耗,线程销毁的时候又要回收资源再次带来额外消耗,如果频繁的出线创建和销毁线程的操作浪费了大量的资源。
所以我们才需要使用线程池,线程池可以协调多个线程,主要优势如下几点:
- 线程池管理并复用线程,控制最大并发数
- 方便实现任务线程队列的缓存策略和拒绝机制
- 实现某些和时间相关的功能,比如定时任务
- 隔离线程环境,比如交易服务和搜索服务在一台服务器上,交易线程的资源消耗明显会更大,因此可以通过配置独立的线程池,将较慢的交易服务和搜索服务隔离开,避免各服务线程互相影响。比如 Hystrix线程池隔离就可以选择使用线程池隔离的方式来解决一个服务访问暴增炸了也不会影响其他俩服务。
线程池的五种基本状态
RUNNING
线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理
状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0
SHUTDOWN
线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务,调用线程池的 shutdown()
方法就进入了 SHUTDOWN
状态。
STOP
线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务,调用 shutdownNow()
就进入 STOP
状态
TIDYING
TIDYING状态是所有的任务都被终止了,工作线程为0,并且正在执行terminated()钩子方法的时候是TIDYING状态。
TERMINATED
线程池彻底终止,就变成TERMINATED状态,线程池处在TIDYING状态时,执行完terminated()之后,就会进入 TERMNATED
状态。
线程池基本使用
先看一下线程池是如何创建线程的,首先就得从 ThreadPoolExecutor
构造方法开始看,如何自定义一个 ThreadPollFactory
和 RejectExecutionHandler
。
ThreadPoolExecutor
的构造方法
ThreadPoolExecutor
一个构造方法如下:
1 | public ThreadPoolExecutor(int corePoolSize, |
corePoolSize
corePoolSize
表示常驻的核心线程数,如果等于0,那么任务执行完成之后,没有任何请求进入线程池的时候会销毁线程池的所有线程。如果大于0,那么就算任务执行完了,核心线程也不会被销毁。
这个值的设置比较关键,设置过大会造成资源浪费,过小会导致频繁地创建和销毁线程。
maximumPoolSize
maximumPoolSize
表示线程池能够同时容纳执行的最大线程数,必须大于0,如果等待执行的任务大于这个值,就会根据 workQueue
参数的设置,将任务缓存在队列中。
如果 corePoolSize
和 maximumPoolSize
相等,就是固定大小的线程池。
keepAliveTime
keepAliveTime
表示的是线程池中线程的空闲时间,当空闲时间达到 keepAliveTime
的时候,线程池就会执行销毁操作,让线程池中只剩下 corePoolSize
数量的线程,避免造成内存和句柄资源的浪费。
默认情况下,当线程池的线程数大于 corePoolSize
数的时候, keepAliveTime
才会生效,但是当 ThreadPoolExecutor
的 allowCoreThreadTimeOut
变量设置为 true
时,核心线程超时也会被回收。
1 | /** |
unit
TimeUnit
表示的是时间单位, keepAliveTime
需要一个时间单位,通常使用 TimeUnit.SECONDS
。
workQueue
workQueue
表示缓存队列,当请求的线程数大于 corePoolSize
的时候,线程进入 BlockingQueue
阻塞队列,通过锁来控制出队入队的原子性。
threadFactory
threadFactory
表示的是线程工厂,用来生产一组相同任务的线程,线程池的名字是通过给这个 factory 增加组名前缀来实现的,有了线程工厂控制多个线程池的生产的线程名字,线程 dump 的时候比较容易分析和区分。
handler
handler
表示执行拒绝策略的对象,当 workQueue
的任务缓存满了之后,并且活动线程数大于 maximumPoolSize
的时候,线程池通过这个策略处理请求。
友好的拒绝策略比如下面三种:
- 保存到数据库进行销峰,空闲的时候再提取出来执行
- 转到某个提示页面
- 打印日志
使用 Executors
从 ThreadPoolExecutor
的构造方法可以看到。队列、线程工厂、拒绝策略handler都必须要有实例对象,某些情况下部分程序员会直接使用方便简洁的 Executors
来创建线程池,这相当于静态工厂类,可以帮我们创建线程池。
线程池相关的一个类图如下:
ExecutorService
接口继承了 Executor
接口,定义了管理线程任务的方法,实现 ExecutorService
接口的抽象类 AbstractExecutorService
提供了 submit()
,invokeAll()
方法的默认实现。
但是 AbstractExecutorService
没有实现 execute()
方法,不同的实现会有不同的执行策略,通过 Executors
的静态工厂方法可以创建三个线程池的包装对象: ForkJoinPool
,ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
。
Executors
有如下的五个核心方法:
Executors.newWorkStealingPool
JDK8引入的方法,创建持有足够线程的线程池支持给定的并行度,通过使用多个队列减少竞争,这个构造方法中默认会把当前宿主机的 CPU 数量设置为默认的并行度。
1 | public static ExecutorService newWorkStealingPool() { |
Executors.newCachedThreadPool
maximumPoolSize
最大可以至 Integer.MAX_VALUE
,是高度可以伸缩的线程池,如果达到这个上限,正常服务器应该已经挂了吧,多半会 OOM,keepAliveTime
默认是 60s,工作线程处于空闲状态就回收工作线程,如果任务数增加再创建新线程处理任务。
SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
1 | public static ExecutorService newCachedThreadPool() { |
Executors.newScheduledThreadPool
maximumPoolSize
最大可以至 Integer.MAX_VALUE
,跟 newCachedThreadPool
类似可能会发生 OOM 异常,返回的ScheduledExecutorService
类的实例,支持定时任务和周期任务的执行,相比 Timer
, ScheduledExecutorService
更安全功能也更多,与 newCachedThreadPool
相比,区别在于 ScheduledExecutorService
不会回收工作线程。
会调用 ScheduledExecutorService
的构造方法
1 | public ScheduledThreadPoolExecutor(int corePoolSize) { |
Executors.newSingleThreadExecutor
创建一个单线程的线程池,相当于单线程执行所有的任务,保证了任务的顺序性。
1 |
|
Executors.newFixedThreadExecutor
输入的参数就是固定线程数,输入参数为 corePoolSize
也是 maximumPoolSize
,不存在空闲线程,keepAliveTime
为0。
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
Executors 创建的线程池分析
Executors 使用的队列
newFixedThreadPool
、newSingleThreadExecutor
使用的队列使用的是 new LinkedBlockingQueue<Runnable>()
。 LinkedBlockingQueue
类的构造方法如下:
1 | public LinkedBlockingDeque() { |
使用这样的队列很有可能造成 OOM 的风险,除了 new WorkStealingPool
外,其他的四种创建方式都存在资源耗尽的风险。
Executors 线程工厂
Executors
中默认的线程工厂和拒绝策略都过于简单,线程工厂对创建的线程必须要有明确的标识,让开发能够在分析线程情况的时候就能分析到到底是哪个线程池的问题,必须为线程指定明确的名字和序号。
Executors 拒绝策略
Executors
中也是直接使用了默认的拒绝策略,ThreadPoolExecutor
中默认的策略是中断
1 | private static final RejectedExecutionHandler defaultHandler = |
拒绝策略
ThreadPoolExecutor
中提供了四个公开的内部静态类
AbortPolicy
(默认): 队列满了之后丢弃任务并且抛出RejectedExecutionException
异常。DiscardPolicy
: 丢弃任务但是不抛出异常,推荐不推荐这个,你自己不要还不给别人说一声的?DiscardOldestPolicy
: 从名字大概也能猜到,抛弃队列中等待最久的任务,然后把当前任务加入到队列。CallerRunsPolicy
: 直接调用任务的run()
方法绕过线程池的执行。
我们可以简单的实现自己的拒绝策略如下,简单的打个日志方便后面测试用:
1 | public class UserRejectHandler implements RejectedExecutionHandler { |
线程工厂
根据上面的分析,我们最好是自定义一个线程工厂,我们自己来定义不同的线程池不同的线程名字,让开发在进行线程分析的时候方便定位问题,我们定义一个简单的线程工厂和一个简单的任务如下:
1 | public class UserThreadFactory implements ThreadFactory { |
自己创建一个线程池
使用之前的自定义的拒绝策略和自定义的线程工厂来创建线程池。
1 | public class UserThreadPool { |
输入结果如下:
1 | UserThreadFactory's 第一组-Worker2 |
根据我们的代码和输出的结果可以分析,因为我们设置最大线程为2,并且两个线程池共用了一个任务队列 queue
,所以两个线程加起来最多 4个线程 + 任务队列 2个任务,可以看到日志中只 running
了6个任务,就有个拒绝的输出(为什么拒绝日志在上面?因为这俩线程还没得到 CPU 片执行,主线程想添加新任务到线程池就失败了),这个拒绝的输出也是我们自定义的 handler
的处理,队列里面已经有了俩,其他的就别想进来了,因为我们定义的队列是 BlockingQueue queue = new LinkedBlockingDeque(2);
。
线程池的 execute() 和 submit()
execute() 提交
execute提交的方式只能提交一个Runnable的对象,且该方法的返回值是void,也即是提交后如果线程运行后,和主线程就脱离了关系了,当然可以设置一些变量来获取到线程的运行结果。并且当线程的执行过程中抛出了异常通常来说主线程也无法获取到异常的信息的,只有通过ThreadFactory主动设置线程的异常处理类才能感知到提交的线程中的异常信息。
submit() 提交
有如下三种情况
<T> Future<T> submit(Callable<T> task)
这种提交的方式会返回一个Future对象,这个Future对象代表这线程的执行结果,当主线程调用Future的get方法的时候会获取到从线程中返回的结果数据。如果在线程的执行过程中发生了异常,get会获取到异常的信息。
Future<?> submit(Runnable task)
也可以提交一个Runable接口的对象,这样当调用get方法的时候,如果线程执行成功会直接返回null,如果线程执行异常会返回异常的信息
<T> Future<T> submit(Runnable task, T result)
这个接口就比较有意思了,除了task之外还有一个result对象,当线程正常结束的时候调用Future的get方法会返回result对象,当线程抛出异常的时候会获取到对应的异常的信息。
线程关于异常的处理
因为线程是独立执行的代码片断,线程的问题应该由线程自己来解决,而不要委托到外部。”基于这样的设计理念,在Java中,线程方法的异常都应该在线程代码边界之内(run方法内)进行try catch并处理掉。换句话说,我们不能捕获从线程中逃逸的异常。
方法一:UncaughtExceptionHandler
Thread
类中存在一个接口 UncaughtExceptionHandler
1 |
|
JDK5之后允许我们在每一个Thread对象上添加一个异常处理器UncaughtExceptionHandler 。Thread.UncaughtExceptionHandler.uncaughtException()方法会在线程因未捕获的异常而面临死亡时被调用。
1 | public class ThreadExceptionTest { |
方法二:在线程工厂中进行设置
在线程工厂创建线程的时候就指定异常处理
1 | public class UserThreadFactory implements ThreadFactory { |
不过有一个需要注意的地方,对于使用 execute()
提交的任务,可以在 handler 中捕获到异常,但是使用 submit()
提交的时候并不能捕获到。比如如下的代码,没有任何输出结果:
1 | public class ThreadPoolExecException { |
需要修改成 Future
的方式进行获取才能看到异常信息:
1 | public class ThreadPoolExecException { |
输出结果为:
1 | java.util.concurrent.ExecutionException: java.lang.NullPointerException: i want test exception |
方法三:使用线程组ThreadGroup
1 | public class DiyThreadFactory implements ThreadFactory { |
方法四:默认的线程异常捕获器
如果我们只需要一个线程异常处理器处理线程的异常,那么我们可以设置一个默认的线程异常处理器,当线程出现异常时,
如果我们没有指定线程的异常处理器,而且线程组也没有设置,那么就会使用默认的线程异常处理器。
只需要像下面这样写就好了
1 | Thread.setDefaultUncaughtExceptionHandler((thread, task) -> { |
方法五:使用FetureTask来捕获异常
1 | public class ThreadExceptionTest2 { |
方法六:利用线程池提交线程时返回的Feature引用
1 | public class ThreadExceptionTest2 { |
方法七: 重写ThreadPoolExecutor的afterExecute方法
1 | public class ThreadExceptionTest2 { |