前言
Java 8中,流有一个非常大的局限性,使用时,对它操作一次仅能得到一个处理结果。当流进行终端操作后,如果你在试图遍历它,就会出现异常。
1
   | java.lang.IllegalStateException: stream has already been operated upon or closed
   | 
虽然流就是如此设计的,但是我们有时候就希望可以通过流获取多个结果。或者说,你希望一次性向流中传入多个Lambda表达式。 为了达到这一目标,我们应该需要一个fork类型的方法,对每个复制的流应用不同的函数。理想情况下,这些操作也应该支持并行去拿到运算结果。
这一特性在Java 8中是没有的,不过我们可以利用一个通用API,即Spliterator,尤其是它的延迟绑定能力,结合BlockingQueues和Futures来实现这一特性。
正文
复制流
要达到此效果,我们首先应该创建一个StreamForker,它会对原始的流进行封装,在此基础上在执行各种操作。我们来看下代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
   | public class StreamForker<T> { 	private final Stream<T> stream; 	private final Map<Object, Function<Stream<T>,?>> forks=new HashMap<>(); 	public StreamForker(Stream<T> stream) { 		this.stream = stream; 	} 	public StreamForker<T> fork(Object key,Function<Stream<T>,?> f){ 		 		forks.put(key,f); 		 		return this; 	} 	public Results getResults(){ 		 	} }
  | 
fork方法接受两个参数。
**Function:**对流进行处理,转变成这些操作结果的类型。
key: 通过它拿到结果,这些结果被放到内部的一个Map中。
fork方法需要返回自身,这样可以复制多个操作构成流水线。
如图:
![upload successful]()
上图不难理解。
而由fork方法添加的操作如何执行呢,就是通过getResults方法的调用触发,该方法返回一个Results接口的实现。接口定义如下:
1 2 3
   | public interface Results { 	public <R> R get(Object key); }
  | 
实现Results接口
我们使用ForkingStreamConsumer实现Results接口。
1 2 3 4 5 6 7 8 9
   | public Results getResults(){ 	ForkingStreamConsumer<T> consumer=build(); 	try{ 		stream.sequential().forEach(consumer); 	}finally { 		consumer.finish(); 	} 	return consumer; }
  | 
ForkingStreamConsumer同时实现了Results和Consumer接口。其主要任务就是来处理流元素,将他们分发到多个BlockingQuenes中处理,BlockingQuenes的数量和通过fork方法提交的操作数是一致的。这里的getResults的实现,流应该是顺序处理的,否则,forEach后元素的顺序就会变化。finish方法用来表明队列中没有更多要处理的元素了。build方法主要用于创建ForkingStreamConsumer。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | private ForkingStreamConsumer<T> build(){ 	 	List<BlockingQueue<T>> queues=new ArrayList<>(); 	 	Map<Object,Future<?>> actions=    		forks.entrySet().stream().reduce( 			new HashMap<Object,Future<?>>(), 			(map,e)->{ 				map.put(e.getKey(),getOperationResult(queues,e.getValue())); 				return map; 			}, 			(m1,m2)->{ 				m1.putAll(m2); 				return m1; 			} 		); 	return new ForkingStreamConsumer<>(queues,actions); }
  | 
可以看到,我们先创建了BlockingQuenes列表。接着创建了一个Map,Map的键就是用来标识不同操作的键,值包含着Future里。最终BlockingQuenes和Map会被传递给ForkingStreamConsumer的构造函数。每个Future通过关键方法getOperationResult创建。
来看看getOperationResult的实现。
1 2 3 4 5 6 7 8 9 10 11
   | private Future<?> getOperationResult(List<BlockingQueue<T>> queues,Function<Stream<T>,?> f){ 	 	BlockingQueue<T> queue=new LinkedBlockingDeque<>(); 	queues.add(queue); 	 	Spliterator<T> spliterator=new BlockingQueueSpliterator<>(queue); 	 	Stream<T> source= StreamSupport.stream(spliterator,false); 	 	return CompletableFuture.supplyAsync(()->f.apply(source)); }
  | 
该方法创建一个新的BlockingQuene,并将其添加到队列列表。队列会被传递给一个新的BlockingQueueSpliterator对象,后者是一个延迟绑定的Spliterator。然后我们创建一个顺序流对Spliterator进行遍历,最终创建一个Future收集结果。
开发ForkingStreamConsumer
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
   | public class ForkingStreamConsumer<T> implements Consumer<T>,Results { 	public static final Object END_OF_STREAM=new Object(); 	private final List<BlockingQueue<T>> queues; 	private final Map<Object, Future<?>> actions; 	public ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) { 		this.queues = queues; 		this.actions = actions; 	} 	@Override 	public <R> R get(Object key) { 		try { 			return ((Future<R>)actions.get(key)).get(); 		}catch(Exception e){ 			throw new RuntimeException(e); 		} 	} 	@Override 	public void accept(T t) { 		 		queues.forEach(q->q.add(t)); 	} 	void finish(){ 		 		accept((T)END_OF_STREAM); 	} }
  | 
这个类同时实现了Consumer接口和Results接口。
Consumer接口要求实现accept方法,每当ForkingStreamConsumer接受流中的一个元素,它就会将元素添加到所有BlockingQuenes中当所有元素都添加到所有队列后,finish方法将最后一个元素添加到所有队列。处理时碰上这个元素表明后面没有元素要处理了。
Results接口需要实现get方法。一旦处理结束,get方法会获取Map中由键索引的Future,解析到结果后返回。
每有一个操作,就会对应一个BlockingQueueSpliterator。我们来看下BlockingQueueSpliterator的实现。
开发BlockingQueueSpliterator
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
   | public class BlockingQueueSpliterator<T> implements Spliterator<T> { 	private final BlockingQueue<T> q; 	public BlockingQueueSpliterator(BlockingQueue<T> q) { 		this.q = q; 	} 	@Override 	public boolean tryAdvance(Consumer<? super T> action) { 		T t; 		while (true){ 			try { 				t=q.take(); 				break; 			}catch(InterruptedException e){ 			} 		} 		if(t!=ForkingStreamConsumer.END_OF_STREAM){ 			action.accept(t); 			return true; 		} 		return false; 	} 	@Override 	public Spliterator<T> trySplit() { 		return null; 	} 	@Override 	public long estimateSize() { 		return 0; 	} 	@Override 	public int characteristics() { 		return 0; 	} }
  | 
可以看到该Spliterator未定义任何切割流的策略,仅仅利用了流的延迟绑定能力。也没有实现trySplit方法。由于我们的操作数是不确定的,故estimateSize不能提供任何有意义的数字,返回0.也没有体现Spliterator的特性,故characteristics返回0.
仅仅实现了tryAdvance方法,它从BlockingQueue中取得原始流元素,进一步传给Consumer对象。当返回true时表明还有元素要处理,直到发现最后一个元素时终止。
以上基本上是在一个流上执行多种操作的代码。
我们下面来检测一下正确性。
测试
编写测试类。如下数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | public static void main(String[] args) { 	 	List<Integer> list1=IntStream.rangeClosed(1,1000).filter(n->n%2==0).boxed().collect(Collectors.toList()); 	List<Integer> list2=IntStream.rangeClosed(1,1000).filter(n->n%5==0).boxed().collect(Collectors.toList()); 	 	Results results=new StreamForker<Integer>(list1.stream()) 		.fork("sum",s->s.mapToInt(Integer::intValue).sum()) 		.fork("count",s->s.count()) 		.fork("list3",s->s.flatMap(i->list2.stream().filter(j->i.equals(j))).collect(Collectors.toList())) 		.fork("max",s->s.flatMap(i->list2.stream().filter(j->i.equals(j))).max(Comparator.naturalOrder())) 		.fork("min",s->s.flatMap(i->list2.stream().filter(j->i.equals(j))).min(Comparator.naturalOrder())) 		.getResults(); 	System.out.println("sum="+results.get("sum")); 	System.out.println("count="+results.get("count")); 	System.out.println("max="+((Optional) results.get("max")).get()); 	System.out.println("min="+((Optional)results.get("min")).get());     ((List<Integer>)results.get("list3")).stream().forEach(System.out::println); }
  | 
输出结果:
![upload successful]()
可以看到,使用了一个流,通过我们实现的方法进行了多次终端操作返回正确结果。
性能问题
这是我们用一个流实现多种终端操作的方式,当然这并不意味着会比普通的写法效率高,如果对于上述问题,我们可以分个构建若干个流进行一一实现。
这种一个流进行多个终端操作的情况使用,一定是生成流比较耗费资源性能时才会用到,比如操作一个较大文件时生成的字符流,我们想统计字数,检查某些单词出现的次数,统计行数等等操作,重复生成流显然是耗费资源的。这种情况可以考虑使用这种一个流进行多个终端操作的实现。
当然,具体到具体问题优化,建议认真分析两者的资源消耗。这是比较稳妥的做法。