前言
Java中,创建线程的方式一般有三种方法:
继承Thread类创建线程
实现Runnable接口创建线程
使用Callable和Future创建线程
关于三种创建方法本文不再赘述。
可以看出,以上创建线程的方式,都缺乏对线程的管理,我们设想,如果线程在调用过程中使用了某一资源,当该资源处理缓慢或异常时,可能产生大量线程等待的情况,严重时可能造成OOM异常。
针对以上情况,应该对创建线程进行管理,这样线程池便产生了,好在在jdk1.5时,Doug Lea大神已经帮我们实现了这些功能,它们均在java.util.concurrent包下。建议大家想学习多线程,把该包下的源码理解,一定对多线程会有更深入的理解。
本文重点讲述线程池,会对以下这几个类(接口)进行重点讲解。
Executor,ExecutorService,Executors,AbstractExecutorService,ThreadPoolExecutor
线程池的创建
我们先来简单说下线程池的使用:
缓存型线程池
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
Executors.newCachedThreadPool简单使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int index = i;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println("Thread id=" + Thread.currentThread().getId() + ";index=" + index);
}
});
}定长线程池
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
Executors.newFixedThreadPool简单使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println("Thread id=" + Thread.currentThread().getId() + ";index=" + index);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
定时及周期性任务性线程池
创建一个定长线程池,支持定时及周期性任务执行。
Executors.newScheduledThreadPool简单使用:
1 | ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); |
单线程型线程池
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
Executors.newSingleThreadExecutor简单使用:
1 | ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); |
自定义线程池
创建一个自定义线程池,以优化线程池。
根据Executors源码,可以看出:
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
线程池的实现使用ThreadPoolExecutor这个类实现的。这个类全参参数有以下几个:
1 | public ThreadPoolExecutor(int corePoolSize, |
① corePoolSize:核心线程池大小
取值范围 0-Integer.MaxValue
② maximumPoolSize:最大线程池大小
取值范围 0-Integer.MaxValue
③ keepAliveTime:线程空闲时存活时间
④ unit:线程空闲时存活时间单位
⑤ workQueue:工作队列类型,线程队列类型
队列分类:
直接提交策略:SynchronousQueue,其无法设置队列长度,所有线程均直接提交给线程池。
无界队列:LinkedBlockingQueue,如果默认不设置初始长度,这个队列是无界的,可缓存大量等待线程。
有界队列:ArrayBlockingQueue,必须设置初始长度,线程池满,且达到队列最大长度后执行拒绝策略。
⑥ threadFactory:线程工厂
⑦ handler:线程池饱和后的拒绝策略
ThreadPoolExecutor定义了四种,我们也可以自己定义:
ThreadPoolExecutor.AbortPolicy:拒绝该任务并抛出异常
ThreadPoolExecutor.CallerRunsPolicy:直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
ThreadPoolExecutor.DiscardOldestPolicy:直接丢弃正在执行的任务,并执行该任务
ThreadPoolExecutor.DiscardPolicy:丢弃该任务
可以看出,当业务情况复杂时,Executors里提供的几种基本的线程池已经不能满足我们的要求,需要我们根据情况自定义线程池,而且可以举个例子,比如对于newCachedThreadPool创建线程池的方法,它传入的maximumPoolSize为Integer的Max值,如果业务资源异常,创建大量线程而不释放,newCachedThreadPool这种创建线程池的方法也能导致OOM异常。
而我们声明最大线程池大小,并声明拒绝策略。如下:
1 | ExecutorService myExecutor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, |
可以有效防止OOM异常以及及时发现系统运行问题。
自定义线程池也是被推荐的创建线程池的方法。
源码分析
下面我们主要对ThreadPoolExecutor这个类进行分析。
我们先看下它的execute方法:
1 | public void execute(Runnable command) { |
再看下addWorker方法
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
更多代码不一一赘述。上面代码基本是线程池的核心原理。
通俗点讲,线程池工作分为下面几步:
- 根据传入参数,设置核心线程池数量,最大线程池数量,拒绝策略,线程工作队列
- 当添加一个线程时,如果线程池线程数小于核心线程数,直接开启一个新线程执行任务。
- 如果核心线程池满了,那么把它添加到工作队列中。
- 如果核心线程池和工作队列都满了,则开启非核心线程执行任务。
- 如果全部都满了,执行拒绝策略。
以上就是对线程池的全部分析。
关于
我的个人博客:
GitHub地址:
欢迎关注。