Fork me on GitHub

一道Java试题引发的思考

前言

无意中看到了自己入门Java时的一本Java基础书,看到了其中一个小结的习题,颇为简单。
求一个数组所有数据的和。

分析

那时入门Java,看到以前自己在书上写下的for循环。大致如下:

1
2
3
4
5
6
7
public static Long sum(int [] numbers){
long result=0L;
for(int i=0;i<numbers.length;i++){
result+=numbers[i];
}
return result;
}

现已入行Java颇有时日,遂想到当n逐渐变大时,其执行效率会逐渐降低的。

近日对多线程有些许研究,故想到可以将数据平均分为2份,3份或者多份,每份进行和值,最后相加得到结果。也是一个不错的想法。

好在Java里有这么一个框架,名字叫分支/合并框架。我们来配合例子研究下。

分支/合并框架

分支/合并框架的目的是以递归的方式将可以并行的任务分为更小的任务,然后将每个子任务的结果合并起来生成整体的结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(ForkJoinPool)中的工作线程。那我们如何来定义任务和子任务呢?

要把任务提交到ForkJoinPool,必须创建RecursiveTask< R >的一个子类,其中R是并行化任务产生的结果类型。它其中有唯一一个抽象方法compute,用来定义将任务拆分成子任务的逻辑,以及无法再拆分或不方便拆分时,生成单个子任务结果的逻辑。

伪代码如下:

1
2
3
4
5
6
7
if(任务足够小或者不可分){
顺序执行该任务
}else{
将任务分为两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务结果
}

那如何定义一个任务是否可以在拆分呢?

一般来说没有明确的标准决定一个任务是否可以在拆分,但是有几种试探方法可以帮助我们做出决定,分支/合并框架采用了一种称为工作窃取的技术来解决这个问题。每个线程的子任务都保存在一个双向链式队列里,每完成一个任务,就从队列头部获取一个任务,如果有线程先执行完成,它不会“闲”下来,而是去未完成的队列尾部“窃取”一个任务进行完成,直至所有子任务完成返回结果。

实践

我们用分支/合并框架对数组数据进行并行求和。

代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class ForkJoinSumCalculator extends RecursiveTask<Long> {

//要求和的数组
private final int[] numbers;
//子任务处理的数组的起始位置
private final int start;
//子任务处理的数组的终止位置
private final int end;

//不再将任务划分的子任务数组大小
public static final long THRESHOLD=10000;

public ForkJoinSumCalculator(int[] numbers){
this(numbers,0,numbers.length);
}
private ForkJoinSumCalculator(int[] numbers,int start,int end){
this.numbers=numbers;
this.start=start;
this.end=end;
}

@Override
protected Long compute() {
int length=end-start;
//小于等于阈值,计算结果
if(length<=THRESHOLD){
return computeSequentially();
}
//创建一个子任务来为数组的前一半求和
ForkJoinSumCalculator leftTask=new ForkJoinSumCalculator(numbers,start,start+length/2);
//利用另一个ForkJoinPool线程异步执行新创建的子任务
leftTask.fork();
//创建一个任务为数组的后一半求和
ForkJoinSumCalculator rightTask=new ForkJoinSumCalculator(numbers,start+length/2,end);
//同步执行第二个子任务,有可能允许进一步递归划分
Long rightResult=rightTask.compute();
//读取第一个子任务的结果,没有完成就等待
Long leftResult=leftTask.join();
//合并结果
return rightResult+leftResult;
}

//子任务数组求和
private long computeSequentially(){
long sum=0;
for(int i=start;i<end;i++){
sum+=numbers[i];
}
return sum;
}
}

这样,我们在编写一个方法并行对数组求和就很简单了。

1
2
3
4
public static long forkJoinSum(int [] numbers){
ForkJoinTask<Long> task=new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}

我们可以写一个测试方法,测试这两种方法的执行效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {
//构建一个数组
int [] numbers=new int[100000000];
for(int i=0;i<numbers.length;i++){
numbers[i]=(int)(Math.random() * Integer.MAX_VALUE);
}

//分支/合并框架计算执行速度
long fastest=Long.MAX_VALUE;
for(int i=0;i<10;i++){
long start=System.nanoTime();
forkJoinSum(numbers);
long duration=(System.nanoTime()-start);
if(duration<fastest){
fastest=duration;
}
}
System.out.println("分支/合并最快执行速度为"+fastest+"ns");


//普通方法计算执行速度
long fastest1=Long.MAX_VALUE;
for(int i=0;i<10;i++){
long start=System.nanoTime();
sum(numbers);
long duration=(System.nanoTime()-start);
if(duration<fastest1){
fastest1=duration;
}
}
System.out.println("普通算法最快执行速度为"+fastest1+"ns");
}

输出如下:

1
2
分支/合并最快执行速度为25894038ns
普通算法最快执行速度为38811709ns

可以看到速度是有明显提升的。

其他

源数据问题

这个计算的数组之所以随机,是因为我之前测试了1-n的和计算,发现for循环居然比分支/合并框架快!!我加大了n值也是如此,所以对于这种比较特殊的计算,Java虚拟机或者编译器对它们的计算做了优化,因此用这些数据测试时,可能得不到我们想要的结果,这也是并发处理比较难的地方。有的时候我们多线程处理的代码可能还没有单线程快,或者快是快了,但结果错了。

拥抱Java8

看到上面我们为了求个和冒着出错的风险写了一个sum类,是不是心里退却了?不过啊,Java 8已经给我们提供了类似的功能啦。

它就是全新的Stream API,这个我们有时间在介绍。先看看Stream API对于这个是如何求和的。

1
2
Arrays.stream(numbers).sum();
Arrays.stream(numbers).parallel().sum();

是不是很简单??

第一个为串行求和,即单线程,第二个为并行求和。

我们来测试下他们的效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//Stream API 串行方法计算执行速度
long fastest2=Long.MAX_VALUE;
for(int i=0;i<10;i++){
long start=System.nanoTime();
//sum(numbers);
Arrays.stream(numbers).sum();
long duration=(System.nanoTime()-start);
if(duration<fastest2){
fastest2=duration;
}
}
System.out.println("Stream API 串行 最快执行速度为"+fastest2+"ns");


//Stream API 并行方法计算执行速度
long fastest3=Long.MAX_VALUE;
for(int i=0;i<10;i++){
long start=System.nanoTime();
//sum(numbers);
Arrays.stream(numbers).parallel().sum();
long duration=(System.nanoTime()-start);
if(duration<fastest3){
fastest3=duration;
}
}
System.out.println("Stream API 并行 最快执行速度为"+fastest3+"ns");

输出如下结果:

1
2
3
4
分支/合并最快执行速度为25316712ns
普通算法最快执行速度为38812671ns
Stream API 串行 最快执行速度为36572646ns
Stream API 并行 最快执行速度为24291637ns

可以看到,并行情况下时间与刚才写的分支/合并框架相近。

其实:并行流背后使用的基础框架就是分支/合并框架。

这只是最简单的求和例子,遇到实际问题,可能使用Stream流比普通实现简单快速,因此,Stream是值得我们学习的。

结论

学习就是不断思考不断进步的过程,有的时候看看自己之前写的代码,应该少一些“我去,写的什么玩意,原来?”之类的抱怨,多一些深入的思考及优化。

相关代码均已上传Github : https://github.com/JavaZWT/framework-base




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%