Java多线程编程-线程池

线程池的优势

线程使应用更加充分利用 CPU、内存、IO 等资源,但是线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的空间会带来额外的消耗,线程销毁的时候又要回收资源再次带来额外消耗,如果频繁的出线创建和销毁线程的操作浪费了大量的资源。

所以我们才需要使用线程池,线程池可以协调多个线程,主要优势如下几点:

  • 线程池管理并复用线程,控制最大并发数
  • 方便实现任务线程队列的缓存策略和拒绝机制
  • 实现某些和时间相关的功能,比如定时任务
  • 隔离线程环境,比如交易服务和搜索服务在一台服务器上,交易线程的资源消耗明显会更大,因此可以通过配置独立的线程池,将较慢的交易服务和搜索服务隔离开,避免各服务线程互相影响。比如 Hystrix线程池隔离就可以选择使用线程池隔离的方式来解决一个服务访问暴增炸了也不会影响其他俩服务。

线程池的五种基本状态

RUNNING

线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理

状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0

SHUTDOWN

线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务,调用线程池的 shutdown() 方法就进入了 SHUTDOWN 状态。

STOP

线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务,调用 shutdownNow() 就进入 STOP 状态

TIDYING

TIDYING状态是所有的任务都被终止了,工作线程为0,并且正在执行terminated()钩子方法的时候是TIDYING状态。

TERMINATED

线程池彻底终止,就变成TERMINATED状态,线程池处在TIDYING状态时,执行完terminated()之后,就会进入 TERMNATED 状态。

线程池基本使用

先看一下线程池是如何创建线程的,首先就得从 ThreadPoolExecutor 构造方法开始看,如何自定义一个 ThreadPollFactoryRejectExecutionHandler

ThreadPoolExecutor 的构造方法

ThreadPoolExecutor 一个构造方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//corePoolSize 和 maximumPoolSize 必须大于0,并且 maximumPoolSize 大于等于 corePoolSize
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

corePoolSize

corePoolSize 表示常驻的核心线程数,如果等于0,那么任务执行完成之后,没有任何请求进入线程池的时候会销毁线程池的所有线程。如果大于0,那么就算任务执行完了,核心线程也不会被销毁。

这个值的设置比较关键,设置过大会造成资源浪费,过小会导致频繁地创建和销毁线程。

maximumPoolSize

maximumPoolSize 表示线程池能够同时容纳执行的最大线程数,必须大于0,如果等待执行的任务大于这个值,就会根据 workQueue 参数的设置,将任务缓存在队列中。

如果 corePoolSizemaximumPoolSize 相等,就是固定大小的线程池。

keepAliveTime

keepAliveTime 表示的是线程池中线程的空闲时间,当空闲时间达到 keepAliveTime 的时候,线程池就会执行销毁操作,让线程池中只剩下 corePoolSize 数量的线程,避免造成内存和句柄资源的浪费。

默认情况下,当线程池的线程数大于 corePoolSize 数的时候, keepAliveTime 才会生效,但是当 ThreadPoolExecutorallowCoreThreadTimeOut 变量设置为 true时,核心线程超时也会被回收。

1
2
3
4
5
6
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;

unit

TimeUnit 表示的是时间单位, keepAliveTime 需要一个时间单位,通常使用 TimeUnit.SECONDS

workQueue

workQueue 表示缓存队列,当请求的线程数大于 corePoolSize 的时候,线程进入 BlockingQueue 阻塞队列,通过锁来控制出队入队的原子性。

threadFactory

threadFactory 表示的是线程工厂,用来生产一组相同任务的线程,线程池的名字是通过给这个 factory 增加组名前缀来实现的,有了线程工厂控制多个线程池的生产的线程名字,线程 dump 的时候比较容易分析和区分。

handler

handler 表示执行拒绝策略的对象,当 workQueue 的任务缓存满了之后,并且活动线程数大于 maximumPoolSize 的时候,线程池通过这个策略处理请求。

友好的拒绝策略比如下面三种:

  • 保存到数据库进行销峰,空闲的时候再提取出来执行
  • 转到某个提示页面
  • 打印日志

使用 Executors

ThreadPoolExecutor 的构造方法可以看到。队列、线程工厂、拒绝策略handler都必须要有实例对象,某些情况下部分程序员会直接使用方便简洁的 Executors 来创建线程池,这相当于静态工厂类,可以帮我们创建线程池。

线程池相关的一个类图如下:

线程池

ExecutorService 接口继承了 Executor 接口,定义了管理线程任务的方法,实现 ExecutorService 接口的抽象类 AbstractExecutorService 提供了 submit(),invokeAll() 方法的默认实现。

但是 AbstractExecutorService 没有实现 execute() 方法,不同的实现会有不同的执行策略,通过 Executors 的静态工厂方法可以创建三个线程池的包装对象: ForkJoinPoolThreadPoolExecutorScheduledThreadPoolExecutor

Executors 有如下的五个核心方法:

Executors.newWorkStealingPool

JDK8引入的方法,创建持有足够线程的线程池支持给定的并行度,通过使用多个队列减少竞争,这个构造方法中默认会把当前宿主机的 CPU 数量设置为默认的并行度。

1
2
3
4
5
6
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

Executors.newCachedThreadPool

maximumPoolSize 最大可以至 Integer.MAX_VALUE,是高度可以伸缩的线程池,如果达到这个上限,正常服务器应该已经挂了吧,多半会 OOM,keepAliveTime 默认是 60s,工作线程处于空闲状态就回收工作线程,如果任务数增加再创建新线程处理任务。

SynchronousQueue没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

Executors.newScheduledThreadPool

maximumPoolSize 最大可以至 Integer.MAX_VALUE,跟 newCachedThreadPool 类似可能会发生 OOM 异常,返回的ScheduledExecutorService 类的实例,支持定时任务和周期任务的执行,相比 TimerScheduledExecutorService 更安全功能也更多,与 newCachedThreadPool 相比,区别在于 ScheduledExecutorService 不会回收工作线程。

会调用 ScheduledExecutorService 的构造方法

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

Executors.newSingleThreadExecutor

创建一个单线程的线程池,相当于单线程执行所有的任务,保证了任务的顺序性。

1
2
3
4
5
6
7

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

Executors.newFixedThreadExecutor

输入的参数就是固定线程数,输入参数为 corePoolSize 也是 maximumPoolSize,不存在空闲线程,keepAliveTime 为0。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

Executors 创建的线程池分析

Executors 使用的队列

newFixedThreadPoolnewSingleThreadExecutor 使用的队列使用的是 new LinkedBlockingQueue<Runnable>()LinkedBlockingQueue 类的构造方法如下:

1
2
3
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}

使用这样的队列很有可能造成 OOM 的风险,除了 new WorkStealingPool 外,其他的四种创建方式都存在资源耗尽的风险。

Executors 线程工厂

Executors 中默认的线程工厂和拒绝策略都过于简单,线程工厂对创建的线程必须要有明确的标识,让开发能够在分析线程情况的时候就能分析到到底是哪个线程池的问题,必须为线程指定明确的名字和序号。

Executors 拒绝策略

Executors 中也是直接使用了默认的拒绝策略,ThreadPoolExecutor 中默认的策略是中断

1
2
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();

拒绝策略

ThreadPoolExecutor 中提供了四个公开的内部静态类

  • AbortPolicy (默认): 队列满了之后丢弃任务并且抛出 RejectedExecutionException 异常。
  • DiscardPolicy: 丢弃任务但是不抛出异常,推荐不推荐这个,你自己不要还不给别人说一声的?
  • DiscardOldestPolicy: 从名字大概也能猜到,抛弃队列中等待最久的任务,然后把当前任务加入到队列。
  • CallerRunsPolicy: 直接调用任务的 run() 方法绕过线程池的执行。

我们可以简单的实现自己的拒绝策略如下,简单的打个日志方便后面测试用:

1
2
3
4
5
6
7
public class UserRejectHandler implements RejectedExecutionHandler {

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("task rejected. " + executor.toString());
}
}

线程工厂

根据上面的分析,我们最好是自定义一个线程工厂,我们自己来定义不同的线程池不同的线程名字,让开发在进行线程分析的时候方便定位问题,我们定义一个简单的线程工厂和一个简单的任务如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class UserThreadFactory implements ThreadFactory {

//名字前缀
private String namePrefix;

//序号
private AtomicInteger nextId = new AtomicInteger(1);

UserThreadFactory(String groupName) {
namePrefix = "UserThreadFactory's " +groupName + "-Worker";
}

@Override
public Thread newThread(Runnable r) {
String name = namePrefix + nextId.incrementAndGet();
Thread thread = new Thread(null, r, name, 0);
System.out.println(thread.getName());
return thread;
}

}

//任务
class Task implements Runnable {

private static final AtomicLong count = new AtomicLong(0L);

@Override
public void run() {
System.out.println("running_" + count.getAndIncrement());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

自己创建一个线程池

使用之前的自定义的拒绝策略和自定义的线程工厂来创建线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class UserThreadPool {

public static void main(String[] args) {
//任务队列长度为2
BlockingQueue queue = new LinkedBlockingDeque(2);

UserThreadFactory f1 = new UserThreadFactory("第一组");
UserThreadFactory f2 = new UserThreadFactory("第二组");

UserRejectHandler userRejectHandler = new UserRejectHandler();

//核心线程1,最大线程2,为了测试下拒绝策略
ThreadPoolExecutor threadPoolExecutorFirst = new ThreadPoolExecutor(1, 2,
60, TimeUnit.SECONDS, queue, f1, userRejectHandler);

//核心线程1,最大线程2
ThreadPoolExecutor threadPoolExecutorSecond = new ThreadPoolExecutor(1, 2,
60, TimeUnit.SECONDS, queue, f2, userRejectHandler);

Runnable task = new Task();

//创建40个任务线程
for (int i = 0; i < 20; i++) {
threadPoolExecutorFirst.execute(task);
threadPoolExecutorSecond.execute(task);
}

}

}

输入结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
UserThreadFactory's 第一组-Worker2
UserThreadFactory's 第二组-Worker2
running_0
running_1
UserThreadFactory's 第一组-Worker3
UserThreadFactory's 第二组-Worker3
running_2
running_3
task rejected. java.util.concurrent.ThreadPoolExecutor@3fee733d[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
~~~
running_4
running_5

根据我们的代码和输出的结果可以分析,因为我们设置最大线程为2,并且两个线程池共用了一个任务队列 queue,所以两个线程加起来最多 4个线程 + 任务队列 2个任务,可以看到日志中只 running 了6个任务,就有个拒绝的输出(为什么拒绝日志在上面?因为这俩线程还没得到 CPU 片执行,主线程想添加新任务到线程池就失败了),这个拒绝的输出也是我们自定义的 handler 的处理,队列里面已经有了俩,其他的就别想进来了,因为我们定义的队列是 BlockingQueue queue = new LinkedBlockingDeque(2);

线程池的 execute() 和 submit()

execute() 提交

execute提交的方式只能提交一个Runnable的对象,且该方法的返回值是void,也即是提交后如果线程运行后,和主线程就脱离了关系了,当然可以设置一些变量来获取到线程的运行结果。并且当线程的执行过程中抛出了异常通常来说主线程也无法获取到异常的信息的,只有通过ThreadFactory主动设置线程的异常处理类才能感知到提交的线程中的异常信息。

submit() 提交

有如下三种情况

<T> Future<T> submit(Callable<T> task)

这种提交的方式会返回一个Future对象,这个Future对象代表这线程的执行结果,当主线程调用Future的get方法的时候会获取到从线程中返回的结果数据。如果在线程的执行过程中发生了异常,get会获取到异常的信息。

Future<?> submit(Runnable task)

也可以提交一个Runable接口的对象,这样当调用get方法的时候,如果线程执行成功会直接返回null,如果线程执行异常会返回异常的信息

<T> Future<T> submit(Runnable task, T result)

这个接口就比较有意思了,除了task之外还有一个result对象,当线程正常结束的时候调用Future的get方法会返回result对象,当线程抛出异常的时候会获取到对应的异常的信息。

线程关于异常的处理

因为线程是独立执行的代码片断,线程的问题应该由线程自己来解决,而不要委托到外部。”基于这样的设计理念,在Java中,线程方法的异常都应该在线程代码边界之内(run方法内)进行try catch并处理掉。换句话说,我们不能捕获从线程中逃逸的异常。

方法一:UncaughtExceptionHandler

Thread 类中存在一个接口 UncaughtExceptionHandler

1
2
3
4
5
6
7
8
9
10
11
12
@FunctionalInterface
public interface UncaughtExceptionHandler {
/**
* Method invoked when the given thread terminates due to the
* given uncaught exception.
* <p>Any exception thrown by this method will be ignored by the
* Java Virtual Machine.
* @param t the thread
* @param e the exception
*/
void uncaughtException(Thread t, Throwable e);
}

JDK5之后允许我们在每一个Thread对象上添加一个异常处理器UncaughtExceptionHandler 。Thread.UncaughtExceptionHandler.uncaughtException()方法会在线程因未捕获的异常而面临死亡时被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadExceptionTest {

public static void main(String[] args) {
Thread t = new Thread(() -> {
throw new NullPointerException("Runnable抛出个空指针异常");
});

t.setUncaughtExceptionHandler((thread, throwable)-> {
System.out.println("Thread:" + thread + " Exception message:" + throwable);
});
t.start();
}

}

方法二:在线程工厂中进行设置

在线程工厂创建线程的时候就指定异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class UserThreadFactory implements ThreadFactory {

//名字前缀
private String namePrefix;

//序号
private AtomicInteger nextId = new AtomicInteger(1);

UserThreadFactory(String groupName) {
namePrefix = "UserThreadFactory's " +groupName + "-Worker";
}

@Override
public Thread newThread(Runnable r) {
String name = namePrefix + nextId.incrementAndGet();
Thread thread = new Thread(null, r, name, 0);
//指定异常处理方式
thread.setUncaughtExceptionHandler( (threadTemp, throwable)-> {
System.out.println("Thread:" + threadTemp + " Exception message:" + throwable);
});
System.out.println(thread.getName());
return thread;
}

}

不过有一个需要注意的地方,对于使用 execute() 提交的任务,可以在 handler 中捕获到异常,但是使用 submit() 提交的时候并不能捕获到。比如如下的代码,没有任何输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ThreadPoolExecException {

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);

Thread exceptionThread = new Thread(() -> {
throw new NullPointerException("i want test exception");
});

exceptionThread.setUncaughtExceptionHandler( (thread, throwable)-> {
System.out.println("Thread:" + thread + " Exception message:" + throwable);
});

executorService.submit(exceptionThread);
}

}

需要修改成 Future 的方式进行获取才能看到异常信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadPoolExecException {

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);

Thread exceptionThread = new Thread(() -> {
throw new NullPointerException("i want test exception");
});

exceptionThread.setUncaughtExceptionHandler( (thread, throwable)-> {
System.out.println("Thread:" + thread + " Exception message:" + throwable);
});

Future result = executorService.submit(exceptionThread);
try {
result.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

}

输出结果为:

1
2
3
4
5
6
7
8
9
10
11
12
java.util.concurrent.ExecutionException: java.lang.NullPointerException: i want test exception
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.gzr.thread.ThreadPoolExecException.main(ThreadPoolExecException.java:20)
Caused by: java.lang.NullPointerException: i want test exception
at com.gzr.thread.ThreadPoolExecException.lambda$main$0(ThreadPoolExecException.java:11)
at java.lang.Thread.run(Thread.java:748)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

方法三:使用线程组ThreadGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class DiyThreadFactory implements ThreadFactory {

@Override
public Thread newThread(Runnable r) {
ThreadGroup threadGroup = new ThreadGroup("group") {
// 继承ThreadGroup并重新定义以下方法
// 在线程成员抛出unchecked exception 会执行此方法
@Override
public void uncaughtException(Thread t, Throwable e) {
//4.处理捕获的线程异常
System.out.println("get exception");
}
};

return new Thread(threadGroup, r, "test", 0);
}
}

方法四:默认的线程异常捕获器

如果我们只需要一个线程异常处理器处理线程的异常,那么我们可以设置一个默认的线程异常处理器,当线程出现异常时,
如果我们没有指定线程的异常处理器,而且线程组也没有设置,那么就会使用默认的线程异常处理器。

只需要像下面这样写就好了

1
2
3
Thread.setDefaultUncaughtExceptionHandler((thread, task) -> {
System.out.println("default thread exception method");
});

方法五:使用FetureTask来捕获异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadExceptionTest2 {

public static void main(String[] args) {
//1.创建FeatureTask
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1/0;
}
});
//2.创建Thread
Thread thread = new Thread(futureTask);
//3.启动线程
thread.start();
try {
Integer result = futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
//4.处理捕获的线程异常
}
}

}

方法六:利用线程池提交线程时返回的Feature引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadExceptionTest2 {

public static void main(String[] args) {
//1.创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
//2.创建Callable,有返回值的,你也可以创建一个线程实现Callable接口。
// 如果你不需要返回值,这里也可以创建一个Thread即可,在第3步时submit这个thread。
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1/0;
}
};
//3.提交待执行的线程
Future<Integer> future = executorService.submit(callable);
try {
Integer result = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
//4.处理捕获的线程异常
}
}

}

方法七: 重写ThreadPoolExecutor的afterExecute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadExceptionTest2 {

public static void main(String[] args) {
//1.创建线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (r instanceof Thread) {
if (t != null) {
//处理捕获的异常
}
} else if (r instanceof FutureTask) {
FutureTask futureTask = (FutureTask) r;
try {
futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
//处理捕获的异常
}
}

}
};
Thread t1 = new Thread(() -> {
int c = 1 / 0;
});
threadPoolExecutor.execute(t1);

Callable<Integer> callable = () -> 2 / 0;
threadPoolExecutor.submit(callable);
}

}