Java线程池

前言

Java中,创建线程的方式一般有三种方法:

  1. 继承Thread类创建线程

  2. 实现Runnable接口创建线程

  3. 使用Callable和Future创建线程

关于三种创建方法本文不再赘述。

可以看出,以上创建线程的方式,都缺乏对线程的管理,我们设想,如果线程在调用过程中使用了某一资源,当该资源处理缓慢或异常时,可能产生大量线程等待的情况,严重时可能造成OOM异常。

针对以上情况,应该对创建线程进行管理,这样线程池便产生了,好在在jdk1.5时,Doug Lea大神已经帮我们实现了这些功能,它们均在java.util.concurrent包下。建议大家想学习多线程,把该包下的源码理解,一定对多线程会有更深入的理解。

本文重点讲述线程池,会对以下这几个类(接口)进行重点讲解。

Executor,ExecutorService,Executors,AbstractExecutorService,ThreadPoolExecutor

线程池的创建

我们先来简单说下线程池的使用:

  1. 缓存型线程池

    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    Executors.newCachedThreadPool

    简单使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i = 0; i < 5; i++) {
    final int index = i;
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    cachedThreadPool.execute(new Runnable() {
    @Override
    public void run() {
    System.out.println("Thread id=" + Thread.currentThread().getId() + ";index=" + index);
    }
    });
    }
  2. 定长线程池

    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    Executors.newFixedThreadPool

    简单使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("Thread id=" + Thread.currentThread().getId() + ";index=" + index);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
  1. 定时及周期性任务性线程池 创建一个定长线程池,支持定时及周期性任务执行。
    Executors.newScheduledThreadPool 简单使用:
1
2
3
4
5
6
7
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("Thread id="+Thread.currentThread().getId()+";5s后,每2s执行一次");
}
}, 5, 2, TimeUnit.SECONDS);
  1. 单线程型线程池

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    Executors.newSingleThreadExecutor

    简单使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("Thread id="+Thread.currentThread().getId()+";index="+index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
  1. 自定义线程池

    创建一个自定义线程池,以优化线程池。

    根据Executors源码,可以看出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

线程池的实现使用ThreadPoolExecutor这个类实现的。这个类全参参数有以下几个:

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

① corePoolSize:核心线程池大小
取值范围 0-Integer.MaxValue
② maximumPoolSize:最大线程池大小
取值范围 0-Integer.MaxValue
③ keepAliveTime:线程空闲时存活时间
④ unit:线程空闲时存活时间单位
⑤ workQueue:工作队列类型,线程队列类型
队列分类:
直接提交策略:SynchronousQueue,其无法设置队列长度,所有线程均直接提交给线程池。
无界队列:LinkedBlockingQueue,如果默认不设置初始长度,这个队列是无界的,可缓存大量等待线程。
有界队列:ArrayBlockingQueue,必须设置初始长度,线程池满,且达到队列最大长度后执行拒绝策略。
⑥ threadFactory:线程工厂
⑦ handler:线程池饱和后的拒绝策略
ThreadPoolExecutor定义了四种,我们也可以自己定义:
ThreadPoolExecutor.AbortPolicy:拒绝该任务并抛出异常
ThreadPoolExecutor.CallerRunsPolicy:直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
ThreadPoolExecutor.DiscardOldestPolicy:直接丢弃正在执行的任务,并执行该任务
ThreadPoolExecutor.DiscardPolicy:丢弃该任务

可以看出,当业务情况复杂时,Executors里提供的几种基本的线程池已经不能满足我们的要求,需要我们根据情况自定义线程池,而且可以举个例子,比如对于newCachedThreadPool创建线程池的方法,它传入的maximumPoolSize为Integer的Max值,如果业务资源异常,创建大量线程而不释放,newCachedThreadPool这种创建线程池的方法也能导致OOM异常。

而我们声明最大线程池大小,并声明拒绝策略。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ExecutorService myExecutor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(5), new AbortPolicy());
for (int i = 0; i < 11; i++) {
final int index = i;
try {
myExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("Thread id=" + Thread.currentThread().getId() + ";index=" + index);
try {
Thread.sleep(10000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}

可以有效防止OOM异常以及及时发现系统运行问题。

自定义线程池也是被推荐的创建线程池的方法。

源码分析

下面我们主要对ThreadPoolExecutor这个类进行分析。

我们先看下它的execute方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取线程数量
int c = ctl.get();
//如果线程池线程数量小于核心线程数,那么试着向核心线程池添加一个线程
if (workerCountOf(c) < corePoolSize) {
//添加成功就返回
if (addWorker(command, true))
return;
//添加不成功就再次拿到线程数量
c = ctl.get();
}
//如果添加失败了,或者线程池数量达到了核心线程池数量
//那么判断下运行状态,然后试着向工作等待队列里添加此线程
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果不是运行状态,那么试着从工作队列移除此线程
if (! isRunning(recheck) && remove(command))
//成功就进行拒绝策略处理
reject(command);
//如果核心线程池和队列都满了,达到CAPACITY
else if (workerCountOf(recheck) == 0)
//那么尝试将任务添加至非核心线程池
addWorker(null, false);
}
//如果添加失败,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}

再看下addWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//sakuratears
int c = ctl.get();
//拿下线程运行状态
int rs = runStateOf(c);
//如果程序停止或者状态不是暂停并且任务不为空并且任务队列不为空,添加失败
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
//数量大于CAPACITY或者数量大于corePoolSize(向核心线程池添加时)或者maximumPoolSize(向非核心线程池添加时),返回添加失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//试着计算下当前线程数量
if (compareAndIncrementWorkerCount(c))
//成功跳出循环
break retry;
//不成功就重读ctl
c = ctl.get();
//如果当前状态与开始线程状态不一致
if (runStateOf(c) != rs)
//重试循环
continue retry;
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//获得锁
final ReentrantLock mainLock = this.mainLock;
//创建一个新的Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加锁
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
//如果是运行状态,或者核心线程池暂停,但要将线程添加到非核心线程池中
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
//添加线程
workers.add(w);
int s = workers.size();
//记录曾经有过的最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
if (workerAdded) {
//启动
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//线程启动失败,执行失败操作
addWorkerFailed(w);
}
return workerStarted;
}

更多代码不一一赘述。上面代码基本是线程池的核心原理。

通俗点讲,线程池工作分为下面几步:

  1. 根据传入参数,设置核心线程池数量,最大线程池数量,拒绝策略,线程工作队列
  2. 当添加一个线程时,如果线程池线程数小于核心线程数,直接开启一个新线程执行任务。
  3. 如果核心线程池满了,那么把它添加到工作队列中。
  4. 如果核心线程池和工作队列都满了,则开启非核心线程执行任务。
  5. 如果全部都满了,执行拒绝策略。

以上就是对线程池的全部分析。

关于

我的个人博客:

https://www.sakuratears.top

GitHub地址:

https://github.com/javazwt

欢迎关注。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

您的支持就是我创作的动力!

欢迎关注我的其它发布渠道