前言
Java 8中,流有一个非常大的局限性,使用时,对它操作一次仅能得到一个处理结果。当流进行终端操作后,如果你在试图遍历它,就会出现异常。
1 | java.lang.IllegalStateException: stream has already been operated upon or closed |
虽然流就是如此设计的,但是我们有时候就希望可以通过流获取多个结果。或者说,你希望一次性向流中传入多个Lambda表达式。 为了达到这一目标,我们应该需要一个fork类型的方法,对每个复制的流应用不同的函数。理想情况下,这些操作也应该支持并行去拿到运算结果。
这一特性在Java 8中是没有的,不过我们可以利用一个通用API,即Spliterator,尤其是它的延迟绑定能力,结合BlockingQueues和Futures来实现这一特性。
正文
复制流
要达到此效果,我们首先应该创建一个StreamForker,它会对原始的流进行封装,在此基础上在执行各种操作。我们来看下代码。
1 | public class StreamForker<T> { |
fork方法接受两个参数。
Function:对流进行处理,转变成这些操作结果的类型。
key: 通过它拿到结果,这些结果被放到内部的一个Map中。
fork方法需要返回自身,这样可以复制多个操作构成流水线。
如图:
上图不难理解。
而由fork方法添加的操作如何执行呢,就是通过getResults方法的调用触发,该方法返回一个Results接口的实现。接口定义如下:
1 | public interface Results { |
实现Results接口
我们使用ForkingStreamConsumer实现Results接口。
1 | public Results getResults(){ |
ForkingStreamConsumer同时实现了Results和Consumer接口。其主要任务就是来处理流元素,将他们分发到多个BlockingQuenes中处理,BlockingQuenes的数量和通过fork方法提交的操作数是一致的。这里的getResults的实现,流应该是顺序处理的,否则,forEach后元素的顺序就会变化。finish方法用来表明队列中没有更多要处理的元素了。build方法主要用于创建ForkingStreamConsumer。代码如下:
1 | private ForkingStreamConsumer<T> build(){ |
可以看到,我们先创建了BlockingQuenes列表。接着创建了一个Map,Map的键就是用来标识不同操作的键,值包含着Future里。最终BlockingQuenes和Map会被传递给ForkingStreamConsumer的构造函数。每个Future通过关键方法getOperationResult创建。
来看看getOperationResult的实现。
1 | private Future<?> getOperationResult(List<BlockingQueue<T>> queues,Function<Stream<T>,?> f){ |
该方法创建一个新的BlockingQuene,并将其添加到队列列表。队列会被传递给一个新的BlockingQueueSpliterator对象,后者是一个延迟绑定的Spliterator。然后我们创建一个顺序流对Spliterator进行遍历,最终创建一个Future收集结果。
开发ForkingStreamConsumer
代码如下:
1 | public class ForkingStreamConsumer<T> implements Consumer<T>,Results { |
这个类同时实现了Consumer接口和Results接口。
Consumer接口要求实现accept方法,每当ForkingStreamConsumer接受流中的一个元素,它就会将元素添加到所有BlockingQuenes中当所有元素都添加到所有队列后,finish方法将最后一个元素添加到所有队列。处理时碰上这个元素表明后面没有元素要处理了。
Results接口需要实现get方法。一旦处理结束,get方法会获取Map中由键索引的Future,解析到结果后返回。
每有一个操作,就会对应一个BlockingQueueSpliterator。我们来看下BlockingQueueSpliterator的实现。
开发BlockingQueueSpliterator
1 | public class BlockingQueueSpliterator<T> implements Spliterator<T> { |
可以看到该Spliterator未定义任何切割流的策略,仅仅利用了流的延迟绑定能力。也没有实现trySplit方法。由于我们的操作数是不确定的,故estimateSize不能提供任何有意义的数字,返回0.也没有体现Spliterator的特性,故characteristics返回0.
仅仅实现了tryAdvance方法,它从BlockingQueue中取得原始流元素,进一步传给Consumer对象。当返回true时表明还有元素要处理,直到发现最后一个元素时终止。
以上基本上是在一个流上执行多种操作的代码。
我们下面来检测一下正确性。
测试
编写测试类。如下数据:
1 | public static void main(String[] args) { |
输出结果:
可以看到,使用了一个流,通过我们实现的方法进行了多次终端操作返回正确结果。
性能问题
这是我们用一个流实现多种终端操作的方式,当然这并不意味着会比普通的写法效率高,如果对于上述问题,我们可以分个构建若干个流进行一一实现。
这种一个流进行多个终端操作的情况使用,一定是生成流比较耗费资源性能时才会用到,比如操作一个较大文件时生成的字符流,我们想统计字数,检查某些单词出现的次数,统计行数等等操作,重复生成流显然是耗费资源的。这种情况可以考虑使用这种一个流进行多个终端操作的实现。
当然,具体到具体问题优化,建议认真分析两者的资源消耗。这是比较稳妥的做法。