排序算法(五)-双调排序

前言

续我们 排序算法(四) 所说的那样,今天我们来看下一种很适合多路归并的排序算法,双调排序(BitonicSort)。

正文

双调排序(BitonicSort)

原理及简介

双调排序(BitonicSort)的原理是使用双调序列(Bitonic sequence)的性质来进行排序。

那什么是双调序列呢?

如果一个序列A=[X0, X1, X2, ...... , Xn-1],存在一个下标i(0≤i≤n-1),使得:

X0 ≤ X1 ≤ ...... ≤ Xi, and Xi ≥ Xi+1 ≥ ...... ≥ Xn-1

那么我们称这个序列是双调的(Bitonic)。

如下面的数据格式:

[5,9,10,17,6,3,2,1]

需要注意的几点:

  • 一个序列如果是完全的非降序或非升序的,它也是Bitonic的。(i为0或者n-1时)
  • Bitonic序列的子序列仍为Bitonic的。
  • 将一个Bitonic序列进行循环移位操作后,也是Bitonic序列。
  • 任意两个实数,都可以组成双调序列。

如下图:

upload successful

因此可以知道,如果一个序列循环移位后可以表示成 up -> down 形式,那么它就是Bitonic的。

如果一个无序序列(满足长度为2的幂),则可以从最小的Bitonic序列(数组中的每两个数为一个Bitonic序列)开始,将整个序列变为Bitonic的。

我们再来看下Batcher定理,这个定理用于双调排序的双调合并。

Batcher定理

将任意一个长为2n的双调序列A分为等长的两半X和Y,将X中的元素与Y中的元素一一按原序比较,即a[i]与a[i+n] (i < n)比较,将较大者放入MAX序列,较小者放入MIN序列。则得到的MAX和MIN序列仍然是双调序列,并且MAX序列中的任意一个元素不小于MIN序列中的任意一个元素。

PS:这个定理的证明和 0-1序列 有关,有兴趣的可以查阅相关资料

PS: 0-1序列,一个只有0,1元素的序列。

根据这个定理,我们继续下去,如果MAX和MIN也是长为2m的,那么它们可以继续拆分成 MAX(max),MAX(min)和MIN(max)和MIN(min)……一直继续下去,直到要拆分的序列长度为1,这时候整个序列A会完全有序排列。

这个定理非常有意思,我们来看下。

比如对于一个序列 A [3,5,7,9,10,17,19,22,17,15,12,10,9,8,5,1],明显看出它是一个双调序列,其中X为[3,5,7,9,10,17,19,22]单调递增,Y为[17,15,12,10,9,8,5,1]单调递减,且X序列长度等于Y序列长度,我们试着按照上面定理将数据按照原序进行比较,得到MAX和MIN两个序列,它们分别为 MAX [17,15,12,10,10,17,19,22],MIN [3,5,7,9,9,8,5,1],对MAX和MIN按照Batcher定理继续拆分……

我们将上述过程用图的形式描述出来,对于无序数组 [8,12,6,18,9,1,3,4],其排序过程如下:

upload successful

代码实现

上述过程我们使用Java代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class BitonicSort {
/**
* 升降序标志
* true 升序
* false 降序
*/
private final static boolean ASCENDING = true, DESCENDING = false;

public static void bitonicSort(int[] a) {
sort(a, 0, a.length, ASCENDING);
}

private static void sort(int[] a, int lo, int n, boolean dir) {
if (n > 1) {
int m = n / 2;
sort(a, lo, m, ASCENDING);
sort(a, lo + m, m, DESCENDING);
bitonicMerge(a, lo, n, dir);
}
}

private static void bitonicMerge(int[] a, int lo, int n, boolean dir) {
if (n > 1) {
int m = n / 2;
for (int i = lo; i < lo + m; i++) {
compare(a, i, i + m, dir);
}
bitonicMerge(a, lo, m, dir);
bitonicMerge(a, lo + m, m, dir);
}
}

private static void compare(int[] a, int i, int j, boolean dir) {
if (dir == (a[i] > a[j])) {
exchange(a, i, j);
}
}

private static void exchange(int[] a, int i, int j) {
int t = a[i];
a[i] = a[j];
a[j] = t;
}

public static void main(String[] args) {
int[] a = new int[128];
Random r = new Random();
for (int i = 0; i < a.length; i++) {
a[i] = r.nextInt(10000);
}
System.out.println("BitonicSort排序前:");
System.out.println(Arrays.toString(a));
bitonicSort(a);
System.out.println("BitonicSort排序后:");
System.out.println(Arrays.toString(a));
}
}

动图演示

我们来看下排序动图,也很直观,可以看到非常明显的双调序列变化过程。

upload successful

缺陷及优化

上面的代码我们可以发现当数组长度为 2^n 时才会排序正确,长度不为2的幂的情况下排序是不正确的。

这一点是非常好理解的,对于我们上面双调序列和Batcher定理来说,只有当数组长度为2的幂的情况下数组才会被正确分割运算下去。

但是现实中数据长度大部分都不是2的幂次的,如何让我们的双调排序适应非2的幂次的长度的数据呢?

n!=2^k数据的双调排序方案

改造数据长度

有一些想法是改造数组,将数据填充到2的幂次长度,填充值为最大值,排序完成后,删除数组尾部填充长度的数据。

这种做法有一个明显缺陷,比如一个长度为1025的数据,我们要填充到2048长度才能解决问题,对于更大的数据,我们或许填充的更多,这显然不是一个好办法。

那如果只排序前 2^n 部分,对于剩余部分使用经典排序(插入排序、选择排序)处理呢?

这种做法如果对于一个长度为2047的数组,使用双调排序排1024部分,剩下的1023用经典排序处理,好像也不是很完美。

如果开始判断数组长度,接近高次2的幂的数据就用填充法,接近低次2的幂的数据就用部分双调排序法呢?好像也不是特别理想。

n!=2^k数据的双调排序算法

下面我们介绍一种对于任意n的双调排序网络算法,它的定理来源于Batcher定理的引申。

设想如果我们的序列A长度n!=2^k,我们可以找到一个最小的2的幂次长度p=2^q,使得p>n,这时候p-n部分填充最大值Max,同时p/2 之前的序列单调递减,p/2 之后的队列单调递增,如下:

[8,7,6,5,4,3,2,1,7]

[8,7,6,5,4,3,2,1,7,Max,Max,Max,Max,Max,Max,Max]

我们使用Batcher定理可得到如下序列:

X [7,7,6,5,4,3,2,1] Y[8,Max,Max,Max,Max,Max,Max,Max] ->[8]

可以发现Y序列的每一个值都是不小于X序列的每一个值的,即 Xi <= Yj。

我们可以用图表示这个过程,如下:

我们看下[8,12,6,18,9,1,3]这个数组的排序模拟过程。

upload successful

可以看到,我们虚构出来的Max部分是始终不参与交换的。

在实际排序中,我们不用虚构Max部分,直接将数组长度/2 进行分割,保证左半部分单调递减,右半部分单调递增,再使用Batcher定理,就会得到需要的已排序序列。

我们可以写出适用于任意数据长度的双调排序算法代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class BitonicBetterSort {
/**
* sorting direction
*/
private final static boolean ASCENDING = true;

public static void bitonicSort(int[] a) {
sort(a,0, a.length, ASCENDING);
}

private static void sort(int[] a, int lo, int n, boolean dir) {
if (n > 1) {
int m = n / 2;
sort(a,lo, m, !dir);
sort(a,lo + m, n - m, dir);
bitonicMerge(a,lo, n, dir);
}
}

private static void bitonicMerge(int[] a,int lo, int n, boolean dir) {
if (n > 1) {
int m = greatestPowerOfTwoLessThan(n);
for (int i = lo; i < lo + n - m; i++) {
compare(a,i, i + m, dir);
}
bitonicMerge(a,lo, m, dir);
bitonicMerge(a,lo + m, n - m, dir);
}
}

private static void compare(int[] a,int i, int j, boolean dir) {
if (dir == (a[i] > a[j])) {
exchange(a,i, j);
}
}

private static void exchange(int[] a,int i, int j) {
int t = a[i];
a[i] = a[j];
a[j] = t;
}

private static int greatestPowerOfTwoLessThan(int n) {
int k = 1;
while (k < n) {
k = k << 1;
}
return k >> 1;
}

public static void main(String[] args) {
int[] a = new int[9];
Random r = new Random();
for (int i = 0; i < a.length; i++) {
a[i] = r.nextInt(10000);
}
System.out.println("BitonicSort排序前:");
System.out.println(Arrays.toString(a));
bitonicSort(a);
System.out.println("BitonicSort排序后:");
System.out.println(Arrays.toString(a));

}
}

下面的动图给出了数组长度为25的数组排序演示:

upload successful

算法复杂度

为了从两个长度为n/2的排序序列中形成长度为n的排序序列,需要进行log(n)次比较。整个排序网络的比较方程T(n)为:

T(n) = log(n) + T(n/2)

这个递归方程的解是:

T(n) = log(n) + log(n)-1 + log(n)-2 +…+1 = log(n)·(log(n)+1) / 2

排序网络的每个阶段需要进行n/2个比较。

总的来说,复杂度为 [log(n)·(log(n)+1) / 2] · n/2 = [ n·log(n)·(log(n)+1) ]/4 = O(n·(log n)^2)

算法的空间复杂度为O(1)。

该算法是一种不稳定排序算法。

以上是在串行运行下的复杂度情况,可以看出它比一般的归并排序(O(n·(log n)))要慢。

并行条件下的双调排序

既然说到双调排序十分适用于并行排序,我们就简单改造下代码,使其使用并行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
public class BitonicParallelSort {
/**
* 可用CPU核数
*/
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* 分片数,取偶数,如果核数过少就使用默认值
*/
private static final int MIN_PROCESSORS = NCPU >= 16 ? (NCPU % 2 == 0 ? NCPU : NCPU + 1) : 16;
/**
* 工作线程池
*/
private final static ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 分片大小
*/
private static int PIECE = 10000;
/**
* sorting direction
*/
private final static boolean ASCENDING = true;

public static void bitonicSort(int[] a) {
//初始化分片大小
PIECE = a.length/MIN_PROCESSORS;
sort(a,0, a.length, ASCENDING);
executorService.shutdown();
}

private static void sort(int[] a, int lo, int n, boolean dir) {
if (n > 1) {
int m = n / 2;
if(n > PIECE){
Future future1 = parallelSort(a, lo, m, !dir);
Future future2 = parallelSort(a, lo + m, n - m, dir);
while (true){
if(future1.isDone()&&future2.isDone()){
break;
}
}
}else{
sort(a, lo, m, !dir);
sort(a, lo + m, n - m, dir);
}
bitonicMerge(a, lo, n, dir);
}
}

private static void bitonicMerge(int[] a, int lo, int n, boolean dir) {
if (n > 1) {
int m = greatestPowerOfTwoLessThan(n);
for (int i = lo; i < lo + n - m; i++) {
compare(a, i, i + m, dir);
}
if(n > PIECE){
Future future1 = parallelMerge(a, lo, m, dir);
Future future2 = parallelMerge(a, lo + m, n - m, dir);
while (true){
if(future1.isDone()&&future2.isDone()){
break;
}
}
}else{
bitonicMerge(a, lo, m, dir);
bitonicMerge(a, lo + m, n - m, dir);
}
}
}

private static void compare(int[] a, int i, int j, boolean dir) {
if (dir == (a[i] > a[j])) {
exchange(a, i, j);
}
}

private static void exchange(int[] a, int i, int j) {
int t = a[i];
a[i] = a[j];
a[j] = t;
}

private static int greatestPowerOfTwoLessThan(int n) {
int k = 1;
while (k < n) {
k = k << 1;
}
return k >> 1;
}

private static Future parallelSort(int[] a, int lo, int n, boolean dir) {
return executorService.submit(()->sort(a,lo,n,dir));
}

private static Future parallelMerge(int[] a, int lo, int n, boolean dir){
return executorService.submit(()->bitonicMerge(a,lo,n,dir));
}


public static void main(String[] args) {
int[] a = new int[100000000];
Random r = new Random();
for (int i = 0; i < a.length; i++) {
a[i] = r.nextInt(100000000);
}
System.out.println("BitonicParallelSort排序开始:");
long start = System.currentTimeMillis();
bitonicSort(a);
System.out.println("BitonicParallelSort耗时:"+(System.currentTimeMillis()-start)+"ms");
System.out.println("BitonicParallelSort排序完成!");
System.out.println("数组是否有序:"+isOrdered(a));
}

public static boolean isOrdered(int[] a){
for(int i=0;i<a.length -1;i++){
if(a[i]<=a[i+1] != ASCENDING){
System.out.println(a[i]+"....."+a[i+1]);
return false;
}
}
return true;
}
}

我这边使用了线程池来进行处理,我们定义了最小分片长度,这个长度需要初始化,一般取数组长度与每级分片数量之比;

可以看到我们上面生成双调序列(sort方法)和Batcher合并(bitonicMerge方法)均使用了多线程进行处理。

上面的方法按照理论上应该比正常的串行双调排序快1倍时间。

我们来验证下,我们准备1亿数据量,数据大小区间在[0 - 100000000],对上述的 BitonicBetterSort(串行)和 BitonicParallelSort(并行)进行测试验证。

测试结果大致如下:

upload successful

upload successful

可以看到BitonicBetterSort(串行)耗时在55s左右,BitonicParallelSort(并行)耗时在30s左右,还是比较符合预期的。

我们看到,无论sort方法还是bitonicMerge方法,并行情况下(N > PEICE),递归的每级都丢给了两个线程(一个负责升序排列的线程、一个负责降序排列的线程)。

原来的串行程序是执行完升序,再执行降序;现在是升序降序同时执行,所以快1倍时间由此得来。

我们可以看下双调并行排序动图演示过程。

upload successful

并行条件下的双调排序调优

以上就是双调排序(BitonicSort)的全部内容,通过这篇文章我们了解到了双调排序的原理和一些特点。

PS:本来这儿的标题是总结,但是写着写着又想到了一些东西,就把标题改了。

双调排序可以说是非常适合并行运行的一种排序算法了,它的无论开始先生成双调序列的方法,还是后面Batcher合并,都可以做到在同一数据组里处理而不相互干扰。

就如对于一个长度为64的序列,分为前32部分和后32部分,这两部分最后生成1个升序序列和一个降序序列(或者一个降序一个升序)即可,最后它们在进行Batcher合并,因此双调序列并行情况下也可以结合其他一些就地排序算法进行处理。

比如我们上述并行代码中,当数据长度不足以分片时,我们是继续按照双调排序进行的,其实这样效率会降低。

比如1亿量的数据,如果我们分了1000次片,每个片就有10w数据,对于这些数据,我们目的就是生成一个升序(或者降序)的序列,此时使用串行的双调排序效率不高,继续分片的话线程数量(系统开销)又大大增加,得到的时间提升甚微。

这种情况我们就可以使用一些O(n * log n)级别的就地排序算法,比如快速排序

我下面写了一个当分片不足时采用并行快排的双调并行排序算法代码,大家可以看下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
public class BitonicParallelMixtureSort {
/**
* 可用CPU核数
*/
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* 分片数,取偶数,如果核数过少就使用默认值
*/
private static final int MIN_PROCESSORS = NCPU >= 16 ? (NCPU % 2 == 0 ? NCPU : NCPU + 1) : 16;
/**
* 工作线程池
*/
private final static ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 分片大小
*/
private static int PIECE = 10000;
/**
* sorting direction
*/
private final static boolean ASCENDING = true;

public static void bitonicSort(int[] a) {
//初始化分片大小
PIECE = a.length/MIN_PROCESSORS;
sort(a,0, a.length, ASCENDING);
executorService.shutdown();
}

private static void sort(int[] a, int lo, int n, boolean dir) {
if (n > 1) {
int m = n / 2;
if(n > PIECE){
Future future1 = parallelSort(a, lo, m, !dir);
Future future2 = parallelSort(a, lo + m, n - m, dir);
while (true){
if(future1.isDone()&&future2.isDone()){
break;
}
}
}else{
Future future1 = parallelQuickSort(a, lo, lo + m - 1, !dir);
Future future2 = parallelQuickSort(a, lo + m, lo + n - 1, dir);
while (true){
if(future1.isDone()&&future2.isDone()){
break;
}
}
}
bitonicMerge(a, lo, n, dir);
}
}

private static void bitonicMerge(int[] a, int lo, int n, boolean dir) {
if (n > 1) {
int m = greatestPowerOfTwoLessThan(n);
for (int i = lo; i < lo + n - m; i++) {
compare(a, i, i + m, dir);
}
if(n > PIECE){
Future future1 = parallelMerge(a, lo, m, dir);
Future future2 = parallelMerge(a, lo + m, n - m, dir);
while (true){
if(future1.isDone()&&future2.isDone()){
break;
}
}
}else{
bitonicMerge(a, lo, m, dir);
bitonicMerge(a, lo + m, n - m, dir);
}
}
}

private static void compare(int[] a, int i, int j, boolean dir) {
if (dir == (a[i] > a[j])) {
exchange(a, i, j);
}
}

private static void exchange(int[] a, int i, int j) {
int t = a[i];
a[i] = a[j];
a[j] = t;
}

private static int greatestPowerOfTwoLessThan(int n) {
int k = 1;
while (k < n) {
k = k << 1;
}
return k >> 1;
}

private static Future parallelSort(int[] a, int lo, int n, boolean dir) {
return executorService.submit(()->sort(a,lo,n,dir));
}

private static Future parallelMerge(int[] a, int lo, int n, boolean dir){
return executorService.submit(()->bitonicMerge(a,lo,n,dir));
}

private static Future parallelQuickSort(int[] a,int low,int high,boolean dir){
return executorService.submit(()->quickSort(a,low,high,dir));
}

/**
* 快排
* @param a
* @param low
* @param high
* @param dir true 表示从小到大 正序排序
*/
private static void quickSort(int[] a,int low,int high,boolean dir){
if(low<high){
//将表一分为二
int privotLoc=partition(a,low,high,dir);
quickSort(a,low,privotLoc-1,dir);
quickSort(a,privotLoc+1,high,dir);
}
}
private static int partition(int[] a, int low, int high, boolean dir) {
int pivot = a[high];
while (low < high) {
//升序
if (dir) {
while (a[low] < pivot && low < high) {
low++;
}
if (low < high) {
a[high--] = a[low];
}
while (a[high] > pivot && low < high) {
high--;
}
if (low < high) {
a[low++] = a[high];
}
} else {
//降序
while (a[low] > pivot && low < high) {
low++;
}
if (low < high) {
a[high--] = a[low];
}
while (a[high] < pivot && low < high) {
high--;
}
if (low < high) {
a[low++] = a[high];
}
}
}
a[low] = pivot;
return low;
}

public static void main(String[] args) {
int[] a = new int[100000000];
Random r = new Random();
for (int i = 0; i < a.length; i++) {
a[i] = r.nextInt(100000000);
}
System.out.println("BitonicParallelMixtureSort排序开始:");
long start = System.currentTimeMillis();
bitonicSort(a);
System.out.println("BitonicParallelMixtureSort耗时:"+(System.currentTimeMillis()-start)+"ms");
System.out.println("BitonicParallelMixtureSort排序完成!");
System.out.println("数组是否有序:"+isOrdered(a));
}

public static boolean isOrdered(int[] a){
for(int i=0;i<a.length -1;i++){
if(a[i]<=a[i+1] != ASCENDING){
System.out.println(a[i]+"....."+a[i+1]);
return false;
}
}
return true;
}
}

我们同样运行1亿数据量,测试该算法的耗时,如下:

upload successful

可以看到处理1亿数据时间缩短到了15s,相比之前又提高了一倍。

上述代码主要在分片小于a.length/MIN_PROCESSORS的地方进行了优化,由于两部分不冲突,快速排序部分也使用了并行处理。

可以看到,我们不知不觉的写了一个属于自己的混合排序算法,emmmm…..

当然,这里不是结束,算法还是可以优化的,比如当处理到的数据过小时(比如处理长度小于16),快排的优势无法显著体现,我们这时候可以使用插入排序处理数据等,可以参考内省排序(IntroSort)的处理。

总结

说到这里,基本上算是结束了,本文通过介绍双调排序,理解了它的运行原理的同时,又提供了一个并行版本,以及并行版本的一些优化,让我们了解到了排序算法非常有意思的一些地方。

本来以为到这儿就结束了,我手贱测试了下快排和内省排序1亿数据量的运行情况。

如下图:

upload successful

upload successful

内省排序不愧是C++标准模板库的排序算法,只耗时10s就完成了,比我们的并行排序要快,哈哈!!

快速排序耗时12s左右,瞬间感觉到将快排应用到我们双调并行排序中是大材小用了。

同时也可以看到内省排序确实对快排做了优化,数据量越大越明显。

总的来说,快排大法好(内省也主要使用了快排)!

源码

本文中提到的所有源代码均可以在我的Github上看到。

Github-JavaZWT




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

您的支持就是我创作的动力!

欢迎关注我的其它发布渠道