Fork me on GitHub

如何以并发方式在同一个流上执行多种操作

前言

Java 8中,流有一个非常大的局限性,使用时,对它操作一次仅能得到一个处理结果。当流进行终端操作后,如果你在试图遍历它,就会出现异常。

1
java.lang.IllegalStateException: stream has already been operated upon or closed

虽然流就是如此设计的,但是我们有时候就希望可以通过流获取多个结果。或者说,你希望一次性向流中传入多个Lambda表达式。 为了达到这一目标,我们应该需要一个fork类型的方法,对每个复制的流应用不同的函数。理想情况下,这些操作也应该支持并行去拿到运算结果。

这一特性在Java 8中是没有的,不过我们可以利用一个通用API,即Spliterator,尤其是它的延迟绑定能力,结合BlockingQueues和Futures来实现这一特性。

正文

复制流

要达到此效果,我们首先应该创建一个StreamForker,它会对原始的流进行封装,在此基础上在执行各种操作。我们来看下代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class StreamForker<T> {
private final Stream<T> stream;
private final Map<Object, Function<Stream<T>,?>> forks=new HashMap<>();
public StreamForker(Stream<T> stream) {
this.stream = stream;
}
public StreamForker<T> fork(Object key,Function<Stream<T>,?> f){
//使用一个键对流上的函数进行索引
forks.put(key,f);
//返回this从而保证多次顺畅的调用fork方法
return this;
}
public Results getResults(){
//TODO
}
}

fork方法接受两个参数。

Function:对流进行处理,转变成这些操作结果的类型。

key: 通过它拿到结果,这些结果被放到内部的一个Map中。

fork方法需要返回自身,这样可以复制多个操作构成流水线。

如图:

upload successful

上图不难理解。

而由fork方法添加的操作如何执行呢,就是通过getResults方法的调用触发,该方法返回一个Results接口的实现。接口定义如下:

1
2
3
public interface Results {
public <R> R get(Object key);
}

实现Results接口

我们使用ForkingStreamConsumer实现Results接口。

1
2
3
4
5
6
7
8
9
public Results getResults(){
ForkingStreamConsumer<T> consumer=build();
try{
stream.sequential().forEach(consumer);
}finally {
consumer.finish();
}
return consumer;
}

ForkingStreamConsumer同时实现了Results和Consumer接口。其主要任务就是来处理流元素,将他们分发到多个BlockingQuenes中处理,BlockingQuenes的数量和通过fork方法提交的操作数是一致的。这里的getResults的实现,流应该是顺序处理的,否则,forEach后元素的顺序就会变化。finish方法用来表明队列中没有更多要处理的元素了。build方法主要用于创建ForkingStreamConsumer。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private ForkingStreamConsumer<T> build(){
//创建由队列组成的列表,每一个队列对应一个操作
List<BlockingQueue<T>> queues=new ArrayList<>();
//建立用于标识操作的键与包含操作结果的Future之间的映射关系
Map<Object,Future<?>> actions=
forks.entrySet().stream().reduce(
new HashMap<Object,Future<?>>(),
(map,e)->{
map.put(e.getKey(),getOperationResult(queues,e.getValue()));
return map;
},
(m1,m2)->{
m1.putAll(m2);
return m1;
}
);
return new ForkingStreamConsumer<>(queues,actions);
}

可以看到,我们先创建了BlockingQuenes列表。接着创建了一个Map,Map的键就是用来标识不同操作的键,值包含着Future里。最终BlockingQuenes和Map会被传递给ForkingStreamConsumer的构造函数。每个Future通过关键方法getOperationResult创建。

来看看getOperationResult的实现。

1
2
3
4
5
6
7
8
9
10
11
private Future<?> getOperationResult(List<BlockingQueue<T>> queues,Function<Stream<T>,?> f){
//创建一个队列,并将其添加到队列的列表中
BlockingQueue<T> queue=new LinkedBlockingDeque<>();
queues.add(queue);
//创建一个Spliterator,遍历队列中的元素
Spliterator<T> spliterator=new BlockingQueueSpliterator<>(queue);
//创建一个流,将Spliterator作为数据源
Stream<T> source= StreamSupport.stream(spliterator,false);
//创建一个Future对象,以异步方式计算在流上执行特定函数的结果
return CompletableFuture.supplyAsync(()->f.apply(source));
}

该方法创建一个新的BlockingQuene,并将其添加到队列列表。队列会被传递给一个新的BlockingQueueSpliterator对象,后者是一个延迟绑定的Spliterator。然后我们创建一个顺序流对Spliterator进行遍历,最终创建一个Future收集结果。

开发ForkingStreamConsumer

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ForkingStreamConsumer<T> implements Consumer<T>,Results {
public static final Object END_OF_STREAM=new Object();
private final List<BlockingQueue<T>> queues;
private final Map<Object, Future<?>> actions;
public ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {
this.queues = queues;
this.actions = actions;
}
@Override
public <R> R get(Object key) {
try {
return ((Future<R>)actions.get(key)).get();
}catch(Exception e){
throw new RuntimeException(e);
}
}
@Override
public void accept(T t) {
//将流中遍历的元素添加到所有的队列中
queues.forEach(q->q.add(t));
}
void finish(){
//将最后一个元素添加到队列中,表明该流已经结束
accept((T)END_OF_STREAM);
}
}

这个类同时实现了Consumer接口和Results接口。

Consumer接口要求实现accept方法,每当ForkingStreamConsumer接受流中的一个元素,它就会将元素添加到所有BlockingQuenes中当所有元素都添加到所有队列后,finish方法将最后一个元素添加到所有队列。处理时碰上这个元素表明后面没有元素要处理了。

Results接口需要实现get方法。一旦处理结束,get方法会获取Map中由键索引的Future,解析到结果后返回。

每有一个操作,就会对应一个BlockingQueueSpliterator。我们来看下BlockingQueueSpliterator的实现。

开发BlockingQueueSpliterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class BlockingQueueSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<T> q;
public BlockingQueueSpliterator(BlockingQueue<T> q) {
this.q = q;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
T t;
while (true){
try {
t=q.take();
break;
}catch(InterruptedException e){
}
}
if(t!=ForkingStreamConsumer.END_OF_STREAM){
action.accept(t);
return true;
}
return false;
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return 0;
}
@Override
public int characteristics() {
return 0;
}
}

可以看到该Spliterator未定义任何切割流的策略,仅仅利用了流的延迟绑定能力。也没有实现trySplit方法。由于我们的操作数是不确定的,故estimateSize不能提供任何有意义的数字,返回0.也没有体现Spliterator的特性,故characteristics返回0.

仅仅实现了tryAdvance方法,它从BlockingQueue中取得原始流元素,进一步传给Consumer对象。当返回true时表明还有元素要处理,直到发现最后一个元素时终止。

以上基本上是在一个流上执行多种操作的代码。

我们下面来检测一下正确性。

测试

编写测试类。如下数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
//生成1到1000的数组
List<Integer> list1=IntStream.rangeClosed(1,1000).filter(n->n%2==0).boxed().collect(Collectors.toList());
List<Integer> list2=IntStream.rangeClosed(1,1000).filter(n->n%5==0).boxed().collect(Collectors.toList());
//同时对list1数据求和,统计list1数据数量,统计list1和list2相同元素,统计list1和list2相同元素的最大值和最小值
Results results=new StreamForker<Integer>(list1.stream())
.fork("sum",s->s.mapToInt(Integer::intValue).sum())
.fork("count",s->s.count())
.fork("list3",s->s.flatMap(i->list2.stream().filter(j->i.equals(j))).collect(Collectors.toList()))
.fork("max",s->s.flatMap(i->list2.stream().filter(j->i.equals(j))).max(Comparator.naturalOrder()))
.fork("min",s->s.flatMap(i->list2.stream().filter(j->i.equals(j))).min(Comparator.naturalOrder()))
.getResults();
System.out.println("sum="+results.get("sum"));
System.out.println("count="+results.get("count"));
System.out.println("max="+((Optional) results.get("max")).get());
System.out.println("min="+((Optional)results.get("min")).get());
((List<Integer>)results.get("list3")).stream().forEach(System.out::println);
}

输出结果:

upload successful

可以看到,使用了一个流,通过我们实现的方法进行了多次终端操作返回正确结果。

性能问题

这是我们用一个流实现多种终端操作的方式,当然这并不意味着会比普通的写法效率高,如果对于上述问题,我们可以分个构建若干个流进行一一实现。

这种一个流进行多个终端操作的情况使用,一定是生成流比较耗费资源性能时才会用到,比如操作一个较大文件时生成的字符流,我们想统计字数,检查某些单词出现的次数,统计行数等等操作,重复生成流显然是耗费资源的。这种情况可以考虑使用这种一个流进行多个终端操作的实现。

当然,具体到具体问题优化,建议认真分析两者的资源消耗。这是比较稳妥的做法。




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

SakuraTears wechat
扫一扫关注我的公众号
您的支持就是我创作的动力!
0%