Fork me on GitHub

CountDownLatch和CylicBarrier以及Semaphare

前言

再说这三个工具时,需要先了解一下AQS框架,所谓AQS,指的是AbstractQueuedSynchronizer,它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLockSemaphoreCountDownLatchCyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。

我们这篇文章先了解下这三个AQS框架工具类,后面在整体入手了解AQS框架的一些特点及源码。

无论是这几个工具类,还是AQS框架,都是JDK1.5之后出现的,位于java.util.concurrent包下,是由著名的Doug Lea操刀实现并完成设计的。

PS:ReentrantLock我在另一篇文章有介绍,这儿就不过多叙述。

正文

CountDownLatch

CountDownLatch 作为一种同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

我们可以使用给定的count初始化一个CountDownLatch

CountDownLatch有两个关键方法,countDownawait

当我们调用countDown方法时,await方法会被阻塞,直到计数count为0后,所有的等待线程才会都被释放。

需要注意CountDownLatch是一次性的,计数不会重置。如果我们需要一个可以重置的计数工具版本,我们可以使用CyclicBarrier

我们可以使用这个同步工具做一些事情,实现一些功能。

比如初始化一个计数为1的CountDownLatch,可以作为一个简单的开/关锁,或者阀门。所有调用await的线程都在阀门处等待,直到它被一个调用countDown的线程打开。
初始化为N的CountDownLatch可以让一个线程等待,直到N个线程已经完成某个动作,或者某个动作已经完成N次。

CountDownLatch的一个特点是,它不要求调用countDown的线程在继续执行之前等待计数为零,它只是阻止任何线程通过await,直到所有线程都通过。

另一种典型的用法是将一个问题分成N个部分,用Runnable(或Callable)描述每个部分,该Runnable执行该部分并在CountDownLatch上向下计数,然后将所有Runnable排队到一个执行器。当所有子任务完成后,协调线程将能够通过await方法。(当需要重复计数时,请使用CyclicBarrier)

我们以一个例子来看下CountDownLatch的使用。

例子:我们以田径赛跑为例,运动员在裁判枪响后同时起跑,因此起跑时需要使用CountDownLatch来控制,跑完后,每个运动员结束时间是不同的,我们需要等所有运动员完成后统计他们的时间进行比较。

相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class CountDownLatchDemo {
//我们以田径跑步运动来进行举例,假设有十名运动员,裁判枪响后起跑,全部跑完后统计成绩。

private final static int N = 10;
/**
* 创建一个定长线程池
*/
private static final ExecutorService executorService = Executors.newFixedThreadPool(N);


public static void main(String[] args) throws InterruptedException{
//开始标志
CountDownLatch startSignal = new CountDownLatch(1);
//结束标志
CountDownLatch doneSignal = new CountDownLatch(N);

for (int i = 0; i < N; ++i) {
executorService.submit(new Runner(startSignal,doneSignal,i));
}

System.out.println("起跑预备阶段--->");
TimeUnit.SECONDS.sleep(10);

startSignal.countDown();
System.out.println("裁判枪响,所有人开跑--->");
doneSignal.await();
System.out.println("跑步完成,统计每个人的时间");
}
}
class Runner implements Callable<Map<Integer,Integer>> {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
private final Integer i;

Runner(CountDownLatch startSignal, CountDownLatch doneSignal,Integer i) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
this.i = i;
}

@Override
public Map<Integer,Integer> call() {
int time;
try {
startSignal.await();
time = doRun();
} catch (InterruptedException ex) {
ex.printStackTrace();
time = -1;
}finally {
doneSignal.countDown();
}
Map<Integer,Integer> map = new HashMap<>();
map.put(i,time);
return map;
}

private int doRun() throws InterruptedException{
Random random = new Random();
int time = random.nextInt(2000)+10000;
TimeUnit.MILLISECONDS.sleep(time);
System.out.println("第" +i+ "位运动员" + "running......耗时"+ (double)(time)/1000 + "秒");
return time;
}
}

其具体逻辑如下:

  1. 预备起跑阶段(主线程sleep 10s用于模拟准备),一个名为startSignalCountDownLatch模拟裁判,计数为1。
  2. 使用startSignal.countDown()方法对每个RunnerdoRun();进行阻塞(startSignal.await();),模拟裁判发枪。
  3. 发枪后startSignal.await();方法通过,每个Runner执行自己的doRun();方法完成跑步。
  4. 在统计最后成绩时,使用了doneSignal,计数为N,我们只有当全部运动员完成后才能对其进行排名。

相关输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
起跑预备阶段--->
裁判枪响,所有人开跑--->
第9位运动员running......耗时10.169秒
第8位运动员running......耗时10.279秒
第7位运动员running......耗时10.432秒
第0位运动员running......耗时10.465秒
第1位运动员running......耗时10.633秒
第5位运动员running......耗时10.748秒
第2位运动员running......耗时10.993秒
第3位运动员running......耗时11.333秒
第6位运动员running......耗时11.337秒
第4位运动员running......耗时11.847秒
跑步完成,统计每个人的时间

CyclicBarrier

CyclicBarrier一种同步辅助工具,它允许一组线程都等待对方到达公共障碍点。
在涉及固定大小的线程的程序中,CyclicBarriers非常有用,这些线程必须偶尔相互等待。

该屏障之所以称为循环屏障(CyclicBarrier),是因为它可以在等待的线程被释放之后被重用。

CyclicBarrier支持一个可选的“Runnable”命令,该命令在每个屏障点运行一次,在队列中的最后一个线程到达之后,但是在所有线程被释放之前运行。可通过构造函数传入。

1
2
3
public CyclicBarrier(int parties, Runnable barrierAction) {
//......
}

这个屏障操作对于在任何一个线程继续之前更新共享状态非常有帮助的。

我们下面来看个例子,这儿我们使用大文件拆分成小文件进行上传来举例。如下,先上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class CyclicBarrierDemo {
/**
* 创建一个可缓存的线程池
*/
private static final ExecutorService executorService = Executors.newCachedThreadPool();

public static void main(String[] args) throws Exception{
List<byte[]> list = new ArrayList<>();

try (FileInputStream input = new FileInputStream("C:\\Users\\DELL-3020\\Desktop\\hacker's delight.pdf")){
byte[] buffer = new byte[1024000];
// 读入多个字节到字节数组中,byteRead为一次读入的字节数
while (input.read(buffer) != -1) {
list.add(buffer);
}
}

CyclicBarrier cyclicBarrier = new CyclicBarrier(list.size(),()->{
System.out.println("上传已全部完成");
});

List<Future<Boolean>> futures = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
Future<Boolean> future = executorService.submit(new UploadHelper(i,list.get(i),cyclicBarrier));
futures.add(future);
}

boolean flag = true;
for (Future<Boolean> future : futures) {
if(!future.get().equals(Boolean.TRUE)){
flag = false;
}
}
System.out.println("上传结果"+flag);

}
}
class UploadHelper implements Callable<Boolean> {
private final int N;
private final byte[] bytes;
private final CyclicBarrier barrier;

public UploadHelper(int n, byte[] bytes, CyclicBarrier barrier) {
N = n;
this.bytes = bytes;
this.barrier = barrier;
}

@Override
public Boolean call() throws Exception {
boolean result = doUpload(N,bytes);
barrier.await();
return result;
}

/**
* 上传逻辑
* @param n
* @param bytes
* @return
*/
private boolean doUpload(int n,byte[] bytes){
//假设的上传逻辑
try{

Random random = new Random();
int time = random.nextInt(2000)+10000;
TimeUnit.MILLISECONDS.sleep(time);
System.out.println("第"+ n + "份数据上传耗时"+ (double)(time)/1000 + "秒");
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
}

UploadHelper类用来处理上传逻辑,我们将文件按照1M(1024000bit)进行分割,UploadHelper接受三个参数,n表示第多少份,用于上传完成后数据的按顺序拼接。bytes表示每份的数据。barrier屏障用于监测所有上传是否完成,完成后执行新的逻辑(比如通知用户上传完成等)。

我们运行后,结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
第8份数据上传耗时10.399秒
第3份数据上传耗时10.466秒
第0份数据上传耗时10.609秒
第2份数据上传耗时10.71秒
第6份数据上传耗时10.915秒
第9份数据上传耗时11.093秒
第10份数据上传耗时11.167秒
第5份数据上传耗时11.2秒
第1份数据上传耗时11.521秒
第7份数据上传耗时11.536秒
第4份数据上传耗时11.905秒
上传已全部完成
上传结果true

可以看到上传已全部完成是在上传全部完成后输出的,这句话在CyclicBarrier构造器的Runnable参数里。

其实这儿我们也可以模拟一个接受逻辑,通过屏障操作,当我们接收到全部数据后,最后将所有数据合并在一起。

这儿的代码就交给大家去实现了。

这儿有一点需要注意,如果屏障操作(barrier)不依赖于被挂起的线程,那么任何线程都可以在它被释放时执行我们想要的动作。为实现这一点,调用await方法会返回barrier处该线程的到达索引。然后我们可以选择哪个线程执行barrier操作。

比如我们在UploadHelpercall()方法里添上如下代码:

1
2
3
4
5
6
7
public Boolean call() throws Exception {
boolean result = doUpload(N,bytes);
if(barrier.await()==N){
System.out.println("第"+N+"个线程执行barrier操作");
};
return result;
}

我们可以多次运行,可以发现每次barrier的线程都是不一样的。有可能有一个,也有可能多个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
第4份数据上传耗时10.04秒
第1份数据上传耗时10.196秒
第2份数据上传耗时10.235秒
第3份数据上传耗时10.282秒
第6份数据上传耗时10.547秒
第9份数据上传耗时10.713秒
第5份数据上传耗时10.784秒
第7份数据上传耗时11.146秒
第10份数据上传耗时11.68秒
第8份数据上传耗时11.787秒
第0份数据上传耗时11.864秒
上传已全部完成
第0个线程执行barrier操作
第6个线程执行barrier操作
上传结果true

这时候我们可以把barrier.await()==N 变为barrier.await()==5就可以让第5个线程执行barrier操作,而不是其他的线程执行barrier

CyclicBarrier使用了“all-or-none breakage”模型,指的是所有互相等待的线程,要么一起通过barrier,要么一个都不要通过。

如果有一个线程因为中断、失败或者超时等原因失败了,则barrier会被broken,所有等待在该barrier上的线程都会抛出BrokenBarrierException(或者InterruptedException)。

比如我们在UploadHelper类里doUpload方法里添加如下代码:

1
2
3
4
//假设第5个线程异常了
if(N==5){
throw new RuntimeException("第"+N+"个线程处理出现异常");
}

我们执行后可以发现上传永远完成不了,其它线程会在barrier处永远等待。

所以我们在编写代码时在内部方法要处理好可能出现的异常问题。

另外CyclicBarrier提供了一个await(long timeout, TimeUnit unit)方法,我们可以指定await的最长时间。

我们指定await时间为15s,同时在doUpload方法里设定第5个线程执行出现异常。

1
2
3
4
5
public Boolean call() throws Exception {
boolean result = doUpload(N,bytes);
barrier.await(15,TimeUnit.SECONDS);
return result;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private boolean doUpload(int n,byte[] bytes){
//假设的上传逻辑
if(N==5){
throw new RuntimeException("第"+N+"个线程处理出现异常");
}
try{
Random random = new Random();
int time = random.nextInt(2000)+10000;
TimeUnit.MILLISECONDS.sleep(time);
System.out.println("第"+ n + "份数据上传耗时"+ (double)(time)/1000 + "秒");
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}

执行后我们就会看到BrokenBarrierException的出现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
第10份数据上传耗时10.037秒
第2份数据上传耗时10.229秒
第9份数据上传耗时10.287秒
第0份数据上传耗时10.799秒
第3份数据上传耗时11.025秒
第4份数据上传耗时11.111秒
第6份数据上传耗时11.863秒
第8份数据上传耗时11.887秒
第1份数据上传耗时11.904秒
第7份数据上传耗时11.961秒
Exception in thread "main" java.util.concurrent.ExecutionException: java.util.concurrent.BrokenBarrierException
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.zwt.helputils.utils.aqs.CyclicBarrierDemo.main(CyclicBarrierDemo.java:44)
Caused by: java.util.concurrent.BrokenBarrierException
at java.base/java.util.concurrent.CyclicBarrier.dowait(CyclicBarrier.java:251)
at java.base/java.util.concurrent.CyclicBarrier.await(CyclicBarrier.java:436)
at com.zwt.helputils.utils.aqs.UploadHelper.call(CyclicBarrierDemo.java:66)
at com.zwt.helputils.utils.aqs.UploadHelper.call(CyclicBarrierDemo.java:52)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
at java.base/java.lang.Thread.run(Thread.java:844)

Semaphore

Semaphore也被称为计数信号量,从概念上来讲,它维护一组许可。

主要方法有两个,acquirereleaseacquire方法用来阻止获取者获得资源许可,直到许可存在可用的为止。release用于添加一个许可,潜在的意义相当于释放了一个阻塞的获取者。

需要注意的是Semaphore并不会保存可用资源对象,而是只保留可用资源数量的计数,并进行许可或阻塞操作。

Semaphore通常用于限制能够访问某些(物理或逻辑)资源的线程数量。

比如我们有一个方法,如果需要对其进行并发限制,可以使用Semaphore来实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
public class SemaphoreDemo {

/**
* 指定数量为5的Semaphore
*/
private static final Semaphore semaphore = new Semaphore(5,true);

/**
* 测试方法
*/
public void doSomething()throws InterruptedException{
//假设方法需要执行5s
TimeUnit.SECONDS.sleep(5);
log.info("这是test方法");
}

/**
* 并发限制
* @throws InterruptedException
*/
public void limit() throws InterruptedException{
semaphore.acquire();
doSomething();
semaphore.release();
}

public static void main(String[] args) throws Exception{
SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
for (int i = 0; i < 15; i++) {
new Thread(()->{
try {
semaphoreDemo.limit();
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
}
}

上述代码指定数量为5的Semaphorelimit()方法用来进行并发控制,如上代码的意思是最多允许5个线程调用doSomething()方法。

输出如下结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
15:14:50.471 [Thread-1] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:14:50.470 [Thread-0] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:14:50.471 [Thread-7] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:14:50.471 [Thread-2] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:14:50.470 [Thread-3] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
//...注意这儿停顿5s
15:14:55.476 [Thread-6] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:14:55.476 [Thread-11] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:14:55.476 [Thread-5] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:14:55.476 [Thread-4] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:14:55.477 [Thread-13] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
//...注意这儿停顿5s
15:15:00.477 [Thread-12] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:15:00.477 [Thread-14] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:15:00.477 [Thread-10] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:15:00.478 [Thread-9] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法
15:15:00.478 [Thread-8] INFO com.zwt.helputils.utils.aqs.SemaphoreDemo - 这是test方法

当我们初始化数量为1的Semaphore时,此时只有一个可用许可,因此可以当做互斥锁来用。这种通常称为二进制信号量,因为它只有两种状态:1个可用许可,0个可用许可。

当我们使用这种方式时,相比较java.util.concurrent.locks包的锁实现,它有一些额外的特点:即“锁”可以由所有者以外的线程释放,因为Semaphore没有所有权的概念。

这在某些情况下是十分有用的,比如死锁恢复。

除了数量以外,Semaphore构造函数还可以接受一个“公平性”参数。

当参数设置为false时,Semaphore不能保证线程获得许可的顺序。也就是说,抢占是允许的,调用acquire的线程可以比正在等待的线程提前分配许可 ———— 逻辑上,新线程将自己放在等待线程队列的头部。

当参数为true时,Semaphore保证调用acquire方法的线程按照顺序获得许可(先进先出FIFO)。需要注意FIFO的顺序适用于这些方法中特定的内部执行点。因此,一个线程可能在另一个线程之前调用acquire,但是在另一个线程之后到达顺序点,从该方法返回时也是如此。另外,tryAcquire方法不支持公平性设置,但是会接受任何可用的许可。

通常,用于控制资源访问的Semaphore应该初始化为公平,以确保没有线程因为访问资源而耗尽。当将Semaphore用于其他类型的同步控制时,非公平的吞吐量优势常常超过公平性Semaphore

另外,Semaphore还提供了acquire(int)release(int)等方便方法,这些方法通常比循环更有效。

但是,它们没有优先顺序,例如,如果线程A调用s.acquire(3),线程B调用s.acquire(2),并且有两个许可可用,那么不能保证线程B会获得它们,除非它的acquire先来,并且信号量s处于公平模式。

AbstractQueuedSynchronizer

我们可以看到无论是CountDownLatchCyclicBarrier(底层ReentrantLock)、Semaphore还是ReentrantLock,底层均是由AbstractQueuedSynchronizer实现的。

我们将在后面来分析下AQS框架的一些特点和源码。

总结

本文对CountDownLatchCyclicBarrierSemaphore进行了一些分析,列举的它们的一些特点和用法。

对于CountDownLatch,其是在做减法,本身不能重置计数,只能使用一次。

CyclicBarrier是做加法,通过reset()方法可以重置计数,可以多次使用,不过重置需要等到计数结束。

Semaphore主要控制对资源的访问,可以实现控制并发线程的数量,而CountDownLatchCyclicBarrier是无法实现此功能的,而且Semaphore在控制资源访问时,本身相当于信号量控制标识,不会对资源本身加锁。

以上就是三种线程同步工具的全部内容。

相关代码

相关代码可在我的GitHub上找到。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%