Java 8中,流有一个非常大的(也可能是最大的)局限性,使用时,对它操作一次仅能得到一个处理结果。实际操作中,如果你试图多次遍历同一个流,结果只有一个,那就是遭遇下面这样的异常:
java.lang.IllegalStateException: stream has already been operated upon or closed
虽然流的设计就是如此,但我们在处理流时经常希望能同时获取多个结果。譬如,你可能会用一个流来解析日志文件,就像我们在5.7.3节中所做的那样,而不是在某个单一步骤中收集多个数据。或者,你想要维持菜单的数据模型,就像我们第4章到第6章用于解释流特性的那个例子,你希望在遍历由“佳肴”构成的流时收集多种信息。
换句话说,你希望一次性向流中传递多个Lambda表达式。为了达到这一目标,你需要一个fork
类型的方法,对每个复制的流应用不同的函数。更理想的情况是你能以并发的方式执行这些操作,用不同的线程执行各自的运算得到对应的结果。
不幸的是,这些特性目前还没有在Java 8的流实现中提供。不过,本附录会为你展示一种方法,利用一个通用API1,即Spliterator
,尤其是它的延迟绑定能力,结合BlockingQueues
和Futures
来实现这一大有裨益的特性。
1本附录接下来介绍的实现基于Paul Sandoz向lambda-dev邮件列表http://mail.openjdk.java.net/pipermail/lambda-dev/2013-November/011516.html提供的解决方案。
C.1 复制流
要达到在一个流上并发地执行多个操作的效果,你需要做的第一件事就是创建一个StreamForker
,这个StreamForker
会对原始的流进行封装,在此基础之上你可以继续定义你希望执行的各种操作。我们看看下面这段代码。
代码清单C-1 定义一个StreamForker
,在一个流上执行多个操作
public class StreamForker<T> { private final Stream<T> stream; private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>; public StreamForker(Stream<T> stream) { this.stream = stream; } public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) { forks.put(key, f); ←─使用一个键对流上的函数进行索引 return this; ←─返回this 从而保证多次流畅地调用fork方法 } public Results getResults { // To be implemented }}
这里的fork
方法接受两个参数。
Function
参数,它对流进行处理,将流转变为代表这些操作结果的任何类型。key
参数,通过它你可以取得操作的结果,并将这些键/函数对累积到一个内部的Map
中。
fork
方法返回StreamForker
自身,因此,你可以通过复制多个操作构造一个流水线。图C-1展示了StreamForker
背后的主要思想。
图 C-1 StreamForker
详解
这里用户定义了希望在流上执行的三种操作,这三种操作通过三个键索引标识。StreamForker
会遍历原始的流,并创建它的三个副本。这时就可以并行地在复制的流上执行这三种操作,这些函数运行的结果由对应的键进行索引,最终会填入到结果的Map
。
所有由fork
方法添加的操作的执行都是通过getResults
方法的调用触发的,该方法返回一个Results
接口的实现,具体的定义如下:
public static interface Results { public <R> R get(Object key);}
这一接口只有一个方法,你可以将fork
方法中使用的key
对象作为参数传入,方法会返回该键对应的操作结果。
C.1.1 使用ForkingStreamConsumer
实现Results
接口
你可以用下面的方式实现getResults
方法:
public Results getResults { ForkingStreamConsumer<T> consumer = build; try { stream.sequential.forEach(consumer); } finally { consumer.finish; } return consumer;}
ForkingStreamConsumer
同时实现了前面定义的Results
接口和Consumer
接口。随着我们进一步剖析它的实现细节,你会看到它主要的任务就是处理流中的元素,将它们分发到多个BlockingQueues
中处理,BlockingQueues
的数量和通过fork
方法提交的操作数是一致的。注意,我们很明确地知道流是顺序处理的,不过,如果你在一个并发流上执行forEach
方法,它的元素可能就不是顺序地被插入到队列中了。finish
方法会在队列的末尾插入特殊元素表明该队列已经没有更多需要处理的元素了。build
方法主要用于创建ForkingStreamConsumer
,详细内容请参考下面的代码清单。
代码清单C-2 使用build
方法创建ForkingStreamConsumer
private ForkingStreamConsumer<T> build { List<BlockingQueue<T>> queues = new ArrayList<>; ←─创建由队列组成的列表,每一个队列对应一个操作 Map<Object, Future<?>> actions = ←─建立用于标识操作的键与包含操作结果的Future之间的映射关系 forks.entrySet.stream.reduce( new HashMap<Object, Future<?>>, (map, e) -> { map.put(e.getKey, getOperationResult(queues, e.getValue)); return map; }, (m1, m2) -> { m1.putAll(m2); return m1; }); return new ForkingStreamConsumer<>(queues, actions);}
代码清单C-2中,你首先创建了我们前面提到的由BlockingQueues
组成的列表。紧接着,你创建了一个Map
,Map
的键就是你在流中用于标识不同操作的键,值包含在Future
中,Future
中包含了这些操作对应的处理结果。BlockingQueues
的列表和Future
组成的Map
会被传递给ForkingStreamConsumer
的构造函数。每个Future
都是通过getOperationResult
方法创建的,代码清单如下。
代码清单C-3 使用getOperationResult
方法创建Future
private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f) { BlockingQueue<T> queue = new LinkedBlockingQueue<>; queues.add(queue); ←─创建一个队列,并将其添加到队列的列表中 Spliterator<T> spliterator =new BlockingQueueSpliterator<>(queue); ←─创建一个Spliterator,遍历队列中的元素 Stream<T> source = StreamSupport.stream(spliterator, false); ←─创建一个流,将Spliterator作为数据源 return CompletableFuture.supplyAsync( -> f.apply(source) ); ←─创建一个Future对象,以异步方式计算在流上执行特定函数的结果}
getOperationResult
方法会创建一个新的BlockingQueue
,并将其添加到队列的列表。这个队列会被传递给一个新的BlockingQueueSpliterator
对象,后者是一个延迟绑定的Spliterator
,它会遍历读取队列中的每个元素;我们很快会看到这是如何做到的。
接下来你创建了一个顺序流对该Spliterator
进行遍历,最终你会创建一个Future
在流上执行某个你希望的操作并收集其结果。这里的Future
使用CompletableFuture
类的一个静态工厂方法创建,CompletableFuture
实现了Future
接口。这是Java 8新引入的一个类,我们在第11章对它进行过详细的介绍。
C.1.2 开发ForkingStreamConsumer
和BlockingQueueSpliterator
还有两个非常重要的部分你需要实现,分别是前面提到过的ForkingStreamConsumer
类和BlockingQueueSpliterator
类。你可以用下面的方式实现前者。
代码清单C-4 实现ForkingStreamConsumer
类,为其添加处理多个队列的流元素
static class ForkingStreamConsumer<T> implements Consumer<T>, Results { static final Object END_OF_STREAM = new Object; private final List<BlockingQueue<T>> queues; private final Map<Object, Future<?>> actions; ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) { this.queues = queues; this.actions = actions; } @Override public void accept(T t) { queues.forEach(q -> q.add(t)); ←─将流中遍历的元素添加到所有的队列中 } void finish { accept((T) END_OF_STREAM); ←─将最后一个元素添加到队列中,表明该流已经结束 } @Override public <R> R get(Object key) { try { return ((Future<R>) actions.get(key)).get; ←─等待Future完成相关的计算,返回由特定键标识的处理结果 } catch (Exception e) { throw new RuntimeException(e); } }}
这个类同时实现了Consumer
和Results
接口,并持有两个引用,一个指向由BlockingQueues
组成的列表,另一个是执行了由Future
构成的Map
结构,它们表示的是即将在流上执行的各种操作。
Consumer
接口要求实现accept
方法。这里,每当ForkingStreamConsumer
接受流中的一个元素,它就会将该元素添加到所有的BlockingQueues
中。另外,当原始流中的所有元素都添加到所有队列后,finish
方法会将最后一个元素添加所有队列。BlockingQueueSpliterators
碰到最后这个元素时会知道队列中不再有需要处理的元素了。
Results
接口需要实现get
方法。一旦处理结束,get
方法会获得Map
中由键索引的Future
,解析处理的结果并返回。
最后,流上要进行的每个操作都会对应一个BlockingQueueSpliterator
。每个BlockingQueueSpliterator
都持有一个指向BlockingQueues
的引用,这个BlockingQueues
是由ForkingStreamConsumer
生成的,你可以用下面这段代码清单类似的方法实现一个BlockingQueueSpliterator
。
代码清单C-5 一个遍历BlockingQueue
并读取其中元素的Spliterator
class BlockingQueueSpliterator<T> implements Spliterator<T> { private final BlockingQueue<T> q; BlockingQueueSpliterator(BlockingQueue<T> q) { this.q = q; } @Override public boolean tryAdvance(Consumer<? super T> action) { T t; while (true) { try { t = q.take; break; } catch (InterruptedException e) { } } if (t != ForkingStreamConsumer.END_OF_STREAM) { action.accept(t); return true; } return false; } @Override public Spliterator<T> trySplit { return null; } @Override public long estimateSize { return 0; } @Override public int characteristics { return 0; }}
这段代码实现了一个Spliterator
,不过它并未定义如何切分流的策略,仅仅利用了流的延迟绑定能力。由于这个原因,它也没有实现trySplit
方法。
由于无法预测能从队列中取得多少个元素,所以estimatedSize
方法也无法返回任何有意义的值。更进一步,由于你没有试图进行任何切分,所以这时的估算也没什么用处。
这一实现并没有体现表7-2中列出的Spliterator
的任何特性,因此characteristic
方法返回0
。
这段代码中提供了实现的唯一方法是tryAdvance
,它从BlockingQueue
中取得原始流中的元素,而这些元素最初由ForkingSteamConsumer
添加。依据getOperationResult
方法创建Spliterator
同样的方式,这些元素会被作为进一步处理流的源头传递给Consumer
对象(在流上要执行的函数会作为参数传递给某个fork
方法调用)。tryAdvance
方法返回true
通知调用方还有其他的元素需要处理,直到它发现由ForkingSteamConsumer
添加的特殊对象,表明队列中已经没有更多需要处理的元素了。图C-2展示了StreamForker
及其构建模块的概述。
图 C-2 StreamForker
及其合作的构造块
这幅图中,左上角的StreamForker
中包含一个Map
结构,以方法的形式定义了流上要执行的操作,这些方法分别由对应的键索引。右边的ForkingStreamConsumer
为每一种操作的对象维护了一个队列,原始流中的所有元素会被分发到这些队列中。
图的下半部分,每一个队列都有一个BlockingQueueSpliterator
从队列中提取元素作为各个流处理的源头。最后,由原始流复制创建的每个流,都会被作为参数传递给某个处理函数,执行对应的操作。至此,你已经实现了 StreamForker
所有组件,可以开始工作了。
C.1.3 将StreamForker
运用于实战
我们将StreamForker
应用到第4章中定义的menu
数据模型上,希望对它进行一些处理。通过复制原始的菜肴(dish)流,我们想以并发的方式执行四种不同的操作,代码清单如下所示。这尤其适用于以下情况:你想要生成一份由逗号分隔的菜肴名列表,计算菜单的总热量,找出热量最高的菜肴,并按照菜的类型对这些菜进行分类。
代码清单C-6 将StreamForker
运用于实战
Stream<Dish> menuStream = menu.stream;StreamForker.Results results = new StreamForker<Dish>(menuStream) .fork("shortMenu", s -> s.map(Dish::getName) .collect(joining(", "))) .fork("totalCalories", s -> s.mapToInt(Dish::getCalories).sum) .fork("mostCaloricDish", s -> s.collect(reducing( (d1, d2) -> d1.getCalories > d2.getCalories ? d1 : d2)) .get) .fork("dishesByType", s -> s.collect(groupingBy(Dish::getType))) .getResults;String shortMenu = results.get("shortMenu");int totalCalories = results.get("totalCalories");Dish mostCaloricDish = results.get("mostCaloricDish");Map<Dish.Type, List<Dish>> dishesByType = results.get("dishesByType");System.out.println("Short menu: " + shortMenu);System.out.println("Total calories: " + totalCalories);System.out.println("Most caloric dish: " + mostCaloricDish);System.out.println("Dishes by type: " + dishesByType);
StreamForker
提供了一种使用简便、结构流畅的API,它能够复制流,并对每个复制的流施加不同的操作。这些应用在流上以函数的形式表示,可以用任何对象的方式标识,在这个例子里,我们选择使用String
的方式。如果你没有更多的流需要添加,可以调用StreamForker
的getResults
方法,触发所有定义的操作开始执行,并取得StreamForker.Results
。由于这些操作的内部实现就是异步的,getResults
方法调用后会立刻返回,不会等待所有的操作完成,拿到所有的执行结果才返回。
你可以通过向StreamForker.Results
接口传递标识特定操作的键取得某个操作的结果。如果该时刻操作已经完成,get
方法会返回对应的结果;否则,该方法会阻塞,直到计算结束,取得对应的操作结果。
正如我们所预期的,这段代码会产生下面这些输出:
Short menu: pork, beef, chicken, french fries, rice, season fruit, pizza, prawns, salmonTotal calories: 4300Most caloric dish: porkDishes by type: {OTHER=[french fries, rice, season fruit, pizza], MEAT=[pork, beef, chicken], FISH=[prawns, salmon]}
C.2 性能的考量
提起性能,你不应该想当然地认为这种方法比多次遍历流的方式更加高效。如果构成流的数据都保存在内存中,阻塞式队列所引发的开销很容易就抵消了由并发执行操作所带来的性能提升。
与此相反,如果操作涉及大量的I/O,譬如流的源头是一个巨型文件,那么单次访问流可能是个不错的选择;因此(大多数情况下)优化应用性能唯一有意义的规则是“好好地度量它”。
通过这个例子,我们展示了怎样一次性地在同一个流上执行多个操作。更重要地是,我们相信这个例子也证明了一点,即使某个特性原生的Java API暂时还不支持,充分利用Lambda表达式的灵活性和一点点的创意,整合现有的功能,你完全可以实现想要的新特性。