本章内容
用并行流并行处理数据
并行流的性能分析
分支/合并框架
使用
Spliterator
分割流
在前面三章中,我们已经看到了新的Stream
接口可以让你以声明性方式处理数据集。我们还解释了将外部迭代换为内部迭代能够让原生Java库控制流元素的处理。这种方法让Java程序员无需显式实现优化来为数据集的处理加速。到目前为止,最重要的好处是可以对这些集合执行操作流水线,能够自动利用计算机上的多个内核。
例如,在Java 7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结构分成若干子部分。第二,你要给每个子部分分配一个独立的线程。第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起来。Java 7引入了一个叫作分支/合并的框架,让这些操作更稳定、更不易出错。我们会在7.2节探讨这一框架。
在本章中,你将了解Stream
接口如何让你不用太费力气就能对数据集执行并行操作。它允许你声明性地将顺序流变为并行流。此外,你将看到Java是如何变戏法的,或者更实际地来说,流是如何在幕后应用Java 7引入的分支/合并框架的。你还会发现,了解并行流内部是如何工作的很重要,因为如果你忽视这一方面,就可能因误用而得到意外的(很可能是错的)结果。
我们会特别演示,在并行处理数据块之前,并行流被划分为数据块的方式在某些情况下恰恰是这些错误且无法解释的结果的根源。因此,你将会学习如何通过实现和使用你自己的Spliterator
来控制这个划分过程。
7.1 并行流
在第4章中,我们简要地提到了Stream
接口可以让你非常方便地处理它的元素:可以通过对收集源调用parallelStream
方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让它们都忙起来。让我们用一个简单的例子来试验一下这个思想。
假设你需要写一个方法,接受数字n作为参数,并返回从1到给定参数的所有数字的和。一个直接(也许有点土)的方法是生成一个无穷大的数字流,把它限制到给定的数目,然后用对两个数字求和的BinaryOperator
来归约这个流,如下所示:
public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1) ←─生成自然数无限流 .limit(n) ←─限制到前n个数 .reduce(0L, Long::sum); ←─对所有数字求和来归纳流}
用更为传统的Java术语来说,这段代码与下面的迭代等价:
public static long iterativeSum(long n) { long result = 0; for (long i = 1L; i <= n; i++) { result += i; } return result;}
这似乎是利用并行处理的好机会,特别是n很大的时候。那怎么入手呢?你要对结果变量进行同步吗?用多少个线程呢?谁负责生成数呢?谁来做加法呢?
根本用不着担心啦。用并行流的话,这问题就简单多了!
7.1.1 将顺序流转换为并行流
你可以把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用parallel
方法:
public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel ←─将流转换为并行流 .reduce(0L, Long::sum);}
在上面的代码中,对流中所有数字求和的归纳过程的执行方式和5.4.1节中说的差不多。不同之处在于Stream
在内部分成了几块。因此可以对不同的块独立并行进行归纳操作,如图7-1所示。最后,同一个归纳操作会将各个子流的部分归纳结果合并起来,得到整个原始流的归纳结果。
图 7-1 并行归纳操作
请注意,在现实中,对顺序流调用parallel
方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个boolean
标志,表示你想让调用parallel
之后进行的所有操作都并行执行。类似地,你只需要对并行流调用sequential
方法就可以把它变成顺序流。请注意,你可能以为把这两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。例如,你可以这样做:
stream.parallel .filter(...) .sequential .map(...) .parallel .reduce;
但最后一次parallel
或sequential
调用会影响整个流水线。在本例中,流水线会并行执行,因为最后调用的是它。
配置并行流使用的线程池
看看流的
parallel
方法,你可能会想,并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?并行流内部使用了默认的
ForkJoinPool
(7.2节会进一步讲到分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime.availableProcessors
得到的。但是你可以通过系统属性
java.util.concurrent.ForkJoinPool.common. parallelism
来改变线程池大小,如下所示:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让
ForkJoinPool
的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。
回到我们的数字求和练习,我们说过,在多核处理器上运行并行版本时,会有显著的性能提升。现在你有三个方法,用三种不同的方式(迭代式、顺序归纳和并行归纳)做完全相同的操作,让我们看看谁最快吧!
7.1.2 测量流性能
我们声称并行求和方法应该比顺序和迭代方法性能好。然而在软件工程上,靠猜绝对不是什么好办法!特别是在优化性能时,你应该始终遵循三个黄金规则:测量,测量,再测量。为此,你可以开发一个方法,它与6.6.2节中用于比较划分质数的两个收集器性能的测试框架非常类似,如下所示。
代码清单7-1 测量对前 n 个自然数求和的函数的性能
public long measureSumPerf(Function<Long, Long> adder, long n) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime; long sum = adder.apply(n); long duration = (System.nanoTime - start) / 1_000_000; System.out.println("Result: " + sum); if (duration < fastest) fastest = duration; } return fastest;}
这个方法接受一个函数和一个long
作为参数。它会对传给方法的long
应用函数10次,记录每次执行的时间(以毫秒为单位),并返回最短的一次执行时间。假设你把先前开发的所有方法都放进了一个名为ParallelStreams
的类,你就可以用这个框架来测试顺序加法器函数对前一千万个自然数求和要用多久:
System.out.println("Sequential sum done in:" + measureSumPerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");
请注意,我们对这个结果应持保留态度。影响执行时间的因素有很多,比如你的电脑支持多少个内核。你可以在自己的机器上跑一下这些代码。我们在一台四核英特尔i7 2.3 GHz的MacBook Pro上运行它,输出是这样的:
Sequential sum done in: 97 msecs
用传统for
循环的迭代版本执行起来应该会快很多,因为它更为底层,更重要的是不需要对原始类型做任何装箱或拆箱操作。如果你试着测量它的性能,
System.out.println("Iterative sum done in:" + measureSumPerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");
将得到:
Iterative sum done in: 2 msecs
现在我们来对函数的并行版本做测试:
System.out.println("Parallel sum done in: " + measureSumPerf(ParallelStreams::parallelSum, 10_000_000) + " msecs" );
看看会出现什么情况:
Parallel sum done in: 164 msecs
这相当令人失望,求和方法的并行版本比顺序版本要慢很多。你如何解释这个意外的结果呢?这里实际上有两个问题:
iterate
生成的是装箱的对象,必须拆箱成数字才能求和;我们很难把
iterate
分成多个独立块来并行执行。
第二个问题更有意思一点,因为你必须意识到某些流操作比其他操作更容易并行化。具体来说,iterate
很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果,如图7-2所示。
图 7-2 iterate
在本质上是顺序的
这意味着,在这个特定情况下,归纳进程不是像图7-1那样进行的;整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。
这就说明了并行编程可能很复杂,有时候甚至有点违反直觉。如果用得不对(比如采用了一个不易并行化的操作,如iterate
),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的parallel
操作时,了解背后到底发生了什么是很有必要的。
使用更有针对性的方法
那到底要怎么利用多核处理器,用流来高效地并行求和呢?我们在第5章中讨论了一个叫LongStream.rangeClosed
的方法。这个方法与iterate
相比有两个优点。
LongStream.rangeClosed
直接产生原始类型的long
数字,没有装箱拆箱的开销。LongStream.rangeClosed
会生成数字范围,很容易拆分为独立的小块。例如,范围1~20可分为1~5、6~10、11~15和16~20。
让我们先看一下它用于顺序流时的性能如何,看看拆箱的开销到底要不要紧:
public static long rangedSum(long n) { return LongStream.rangeClosed(1, n) .reduce(0L, Long::sum);}
这一次的输出是:
Ranged sum done in: 17 msecs
这个数值流比前面那个用iterate
工厂方法生成数字的顺序执行版本要快得多,因为数值流避免了非针对性流那些没必要的自动装箱和拆箱操作。由此可见,选择适当的数据结构往往比并行化算法更重要。但要是对这个新版本应用并行流呢?
public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n) .parallel .reduce(0L, Long::sum);}
现在把这个函数传给你的测试方法:
System.out.println("Parallel range sum done in:" + measureSumPerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs");
你会得到:
Parallel range sum done in: 1 msecs
终于,我们得到了一个比顺序执行更快的并行归纳,因为这一次归纳操作可以像图7-1那样执行了。这也表明,使用正确的数据结构然后使其并行工作能够保证最佳的性能。
尽管如此,请记住,并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。总而言之,很多情况下不可能或不方便并行化。然而,在使用并行Stream
加速代码之前,你必须确保用得对;如果结果错了,算得快就毫无意义了。让我们来看一个常见的陷阱。
7.1.3 正确使用并行流
错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。下面是另一种实现对前 n 个自然数求和的方法,但这会改变一个共享累加器:
public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator; LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total;}public class Accumulator { public long total = 0; public void add(long value) { total += value; }}
这种代码非常普遍,特别是对那些熟悉指令式编程范式的程序员来说。这段代码和你习惯的那种指令式迭代数字列表的方式很像:初始化一个累加器,一个个遍历列表中的元素,把它们和累加器相加。
那这种代码又有什么问题呢?不幸的是,它真的无可救药,因为它在本质上就是顺序的。每次访问total
都会出现数据竞争。如果你尝试用同步来修复,那就完全失去并行的意义了。为了说明这一点,让我们试着把Stream
变成并行的:
public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator; LongStream.rangeClosed(1, n).parallel.forEach(accumulator::add); return accumulator.total;}
用代码清单7-1中的测试框架来执行这个方法,并打印每次执行的结果:
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) +" msecs" );
你可能会得到类似于下面这种输出:
Result: 5959989000692Result: 7425264100768Result: 6827235020033Result: 7192970417739Result: 6714157975331Result: 7497810541907Result: 6435348440385Result: 6999349840672Result: 7435914379978Result: 7715125932481SideEffect parallel sum done in: 49 msecs
这回方法的性能无关紧要了,唯一要紧的是每次执行都会返回不同的结果,都离正确值50000005000000
差很远。这是由于多个线程在同时访问累加器,执行total += value
,而这一句虽然看似简单,却不是一个原子操作。问题的根源在于,forEach
中调用的方法有副作用,它会改变多个线程共享的对象的可变状态。要是你想用并行Stream
又不想引发类似的意外,就必须避免这种情况。
现在你知道了,共享可变状态会影响并行流以及并行计算。第13章和第14章详细讨论函数式编程的时候,我们还会谈到这一点。现在,记住要避免共享可变状态,确保并行Stream
得到正确的结果。接下来,我们会看到一些实用建议,你可以由此判断什么时候可以利用并行流来提升性能。
7.1.4 高效使用并行流
一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的,因为任何类似于“仅当至少有一千个(或一百万个或随便什么数字)元素的时候才用并行流)”的建议对于某台特定机器上的某个特定操作可能是对的,但在略有差异的另一种情况下可能就是大错特错。尽管如此,我们至少可以提出一些定性意见,帮你决定某个特定情况下是否有必要使用并行流。
如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。我们在本节中已经指出,并行流并不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。
留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(
IntStream
、LongStream
、DoubleStream
)来避免这种操作,但凡有可能都应该用这些流。有些操作本身在并行流上的性能就比顺序流差。特别是
limit
和findFirst
等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny
会比findFirst
性能好,因为它不一定要按顺序来执行。你总是可以调用unordered
方法来把有序流变成无序流。那么,如果你需要流中的n 个元素而不是专门要前n 个的话,对无序并行流调用limit
可能会比单个有序流(比如数据源是一个List
)更高效。还要考虑流的操作流水线的总计算成本。设 N 是要处理的元素的总数,Q 是一个元素通过流水线的大致处理成本,则 N*Q 就是这个对成本的一个粗略的定性估计。Q 值较高就意味着使用并行流时性能好的可能性比较大。
对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
要考虑流背后的数据结构是否易于分解。例如,
ArrayList
的拆分效率比LinkedList
高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range
工厂方法创建的原始类型流也可以快速分解。最后,你将在7.3节中学到,你可以自己实现Spliterator
来完全掌控分解过程。流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个
SIZED
流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。还要考虑终端操作中合并步骤的代价是大是小(例如
Collector
中的combiner
方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。
表7-1按照可分解性总结了一些流数据源适不适于并行。
表7-1 流的数据源和可分解性
源
可分解性
ArrayList
极佳
LinkedList
差
IntStream.range
极佳
Stream.iterate
差
HashSet
好
TreeSet
好
最后,我们还要强调并行流背后使用的基础架构是Java 7中引入的分支/合并框架。并行汇总的示例证明了要想正确使用并行流,了解它的内部原理至关重要,所以我们会在下一节仔细研究分支/合并框架。
7.2 分支/合并框架
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService
接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool
)中的工作线程。首先来看看如何定义任务和子任务。
7.2.1 使用RecursiveTask
要把任务提交到这个池,必须创建RecursiveTask<R>
的一个子类,其中R
是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction
类型(当然它可能会更新其他非局部机构)。要定义RecursiveTask
,只需实现它唯一的抽象方法compute
:
protected abstract R compute;
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:
if (任务足够小或不可分) { 顺序计算该任务} else { 将任务分成两个子任务 递归调用本方法,拆分每个子任务,等待所有子任务完成 合并每个子任务的结果}
一般来说并没有确切的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你做出这一决定。我们会在7.2.1节中进一步澄清。递归的任务拆分过程如图7-3所示。
图 7-3 分支/合并过程
你可能已经注意到,这只不过是著名的分治算法的并行版本而已。这里举一个用分支/合并框架的实际例子,还以前面的例子为基础,让我们试着用这个框架为一个数字范围(这里用一个long
数组表示)求和。如前所述,你需要先为RecursiveTask
类做一个实现,就是下面代码清单中的ForkJoinSumCalculator
。
代码清单7-2 用分支/合并框架执行并行求和
public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> { ←─继承RecursiveTask来创建可以用于分支/合并框架的任务 private final long numbers; ←─要求和的数组 private final int start; ←─子任务处理的数组的起始和终止位置 private final int end; public static final long THRESHOLD = 10_000; ←─不再将任务分解为子任务的数组大小 public ForkJoinSumCalculator(long numbers) { ←─公共构造函数用于创建主任务 this(numbers, 0, numbers.length); } private ForkJoinSumCalculator(long numbers, int start, int end) { ←─私有构造函数用于以递归方式为主任务创建子任务 this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute { ←─覆盖RecursiveTask抽象方法 int length = end - start; ←─该任务负责求和的部分的大小 if (length <= THRESHOLD) { return computeSequentially; ←─如果大小小于或等于阈值,顺序计算结果 } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2); ←─创建一个子任务来为数组的前一半求和 leftTask.fork; ←─利用另一个ForkJoinPool线程异步执行新创建的子任务 ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end); ←─创建一个任务为数组的后一半求和 Long rightResult = rightTask.compute; ←─同步执行第二个子任务,有可能允许进一步递归划分 Long leftResult = leftTask.join; ←─读取第一个子任务的结果,如果尚未完成就等待 return leftResult + rightResult; ←─该任务的结果是两个子任务结果的组合 } private long computeSequentially { ←─在子任务不再可分时计算结果的简单算法 long sum = 0; for (int i = start; i < end; i++) {{ sum += numbers[i]; } return sum; }}
现在编写一个方法来并行对前 n 个自然数求和就很简单了。你只需把想要的数字数组传给ForkJoinSumCalculator
的构造函数:
public static long forkJoinSum(long n) { long numbers = LongStream.rangeClosed(1, n).toArray; ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool.invoke(task);}
这里用了一个LongStream
来生成包含前 n 个自然数的数组,然后创建一个ForkJoinTask
(RecursiveTask
的父类),并把数组传递给代码清单7-2所示ForkJoinSumCalculator
的公共构造函数。最后,你创建了一个新的ForkJoinPool
,并把任务传给它的调用方法 。在ForkJoinPool
中执行时,最后一个方法返回的值就是ForkJoinSumCalculator
类定义的任务结果。
请注意在实际应用时,使用多个ForkJoinPool
是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM能够使用的所有处理器。更确切地说,该构造函数将使用Runtime.availableProcessors
的返回值来决定线程池使用的线程数。请注意availableProcessors
方法虽然看起来是处理器,但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。
运行ForkJoinSumCalculator
当把ForkJoinSumCalculator
任务传给ForkJoinPool
时,这个任务就由池中的一个线程执行,这个线程会调用任务的compute
方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的ForkJoinSumCalculator
,而它们也由ForkJoinPool
安排执行。因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。这一过程如图7-4所示。
图 7-4 分支/合并算法
你可以再用一次本章开始时写的测试框架,来看看显式使用分支/合并框架的求和方法的性能:
System.out.println("ForkJoin sum done in: " + measureSumPerf( ForkJoinSumCalculator::forkJoinSum, 10_000_000) + " msecs" );
它生成以下输出:
ForkJoin sum done in: 41 msecs
这个性能看起来比用并行流的版本要差,但这只是因为必须先要把整个数字流都放进一个long
,之后才能在ForkJoinSumCalculator
任务中使用它。
7.2.2 使用分支/合并框架的最佳做法
虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用。以下是几个有效使用它的最佳做法。
对一个任务调用
join
方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。不应该在
RecursiveTask
内部使用ForkJoinPool
的invoke
方法。相反,你应该始终直接调用compute
或fork
方法,只有顺序代码才应该用invoke
来启动并行计算。对子任务调用
fork
方法可以把它排进ForkJoinPool
。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用compute
低。这样做你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。调试使用分支/合并框架的并行计算可能有点棘手。特别是你平常都在你喜欢的IDE里面看栈跟踪(stack trace)来找问题,但放在分支-合并计算上就不行了,因为调用
compute
的线程并不是概念上的调用方,后者是调用fork
的那个。和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。我们已经说过,一个任务可以分解成多个独立的子任务,才能让性能在并行化时有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编译器优化。这就是为什么在测量性能之前跑几遍程序很重要,我们的测试框架就是这么做的。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死码分析——删去从未被使用的计算)。
对于分支/合并拆分策略还有最后一点补充:你必须选择一个标准,来决定任务是要进一步拆分还是已小到可以顺序求值。我们会在下一节中就此给出一些提示。
7.2.3 工作窃取
在ForkJoinSumCalculator
的例子中,我们决定在要求和的数组中最多包含10 000个项目时就不再创建子任务了。这个选择是很随意的,但大多数情况下也很难找到一个好的启发式方法来确定它,只能试几个不同的值来尝试优化它。在我们的测试案例中,我们先用了一个有1000万项目的数组,意味着ForkJoinSumCalculator
至少会分出1000个子任务来。这似乎有点浪费资源,因为我们用来运行它的机器上只有四个内核。在这个特定例子中可能确实是这样,因为所有的任务都受CPU约束,预计所花的时间也差不多。
但分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。不幸的是,实际中,每个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如磁盘访问慢,或是需要和外部服务协调执行。
分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool
中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。
一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。图7-5展示了这个过程。当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真。
图 7-5 分支/合并框架使用的工作窃取算法
现在你应该清楚流如何使用分支/合并框架来并行处理它的项目了,不过还有一点没有讲。本节中我们分析了一个例子,你明确地指定了将数字数组拆分成多个任务的逻辑。但是,使用本章前面讲的并行流时就用不着这么做了,这就意味着,肯定有一种自动机制来为你拆分流。这种新的自动机制称为Spliterator
,我们会在下一节中讨论。
7.3 Spliterator
Spliterator
是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和Iterator
一样,Spliterator
也用于遍历数据源中的元素,但它是为了并行执行而设计的。虽然在实践中可能用不着自己开发Spliterator
,但了解一下它的实现方式会让你对并行流的工作原理有更深入的了解。Java 8已经为集合框架中包含的所有数据结构提供了一个默认的Spliterator
实现。集合实现了Spliterator
接口,接口提供了一个spliterator
方法。这个接口定义了若干方法,如下面的代码清单所示。
代码清单7-3 Spliterator
接口
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit; long estimateSize; int characteristics;}
与往常一样,T
是Spliterator
遍历的元素的类型。tryAdvance
方法的行为类似于普通的Iterator
,因为它会按顺序一个一个使用Spliterator
中的元素,并且如果还有其他元素要遍历就返回true
。但trySplit
是专为Spliterator
接口设计的,因为它可以把一些元素划出去分给第二个Spliterator
(由该方法返回),让它们两个并行处理。Spliterator
还可通过estimateSize
方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点。
重要的是,要了解这个拆分过程在内部是如何执行的,以便在需要时能够掌控它。因此,我们会在下一节中详细地分析它。
7.3.1 拆分过程
将Stream
拆分成多个部分的算法是一个递归过程,如图7-6所示。第一步是对第一个Spliterator
调用trySplit
,生成第二个Spliterator
。第二步对这两个Spliterator
调用trysplit
,这样总共就有了四个Spliterator
。这个框架不断对Spliterator
调用trySplit
直到它返回null
,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的Spliterator
在调用trySplit
时都返回了null
。
图 7-6 递归拆分过程
这个拆分过程也受Spliterator
本身的特性影响,而特性是通过characteristics
方法声明的。
Spliterator
的特性
Spliterator
接口声明的最后一个抽象方法是characteristics
,它将返回一个int
,代表Spliterator
本身特性集的编码。使用Spliterator
的客户可以用这些特性来更好地控制和优化它的使用。表7-2总结了这些特性。(不幸的是,虽然它们在概念上与收集器的特性有重叠,编码却不一样。)
表7-2 Spliterator
的特性
特性
含义
ORDERED
元素有既定的顺序(例如List
),因此Spliterator
在遍历和划分时也会遵循这一顺序
DISTINCT
对于任意一对遍历过的元素x
和y
,x.equals(y)
返回false
SORTED
遍历的元素按照一个预定义的顺序排序
SIZED
该Spliterator
由一个已知大小的源建立(例如Set
),因此estimatedSize
返回的是准确值
NONNULL
保证遍历的元素不会为null
IMMUTABLE
Spliterator
的数据源不能修改。这意味着在遍历时不能添加、删除或修改任何元素
CONCURRENT
该Spliterator
的数据源可以被其他线程同时修改而无需同步
SUBSIZED
该Spliterator
和所有从它拆分出来的Spliterator
都是SIZED
现在你已经看到了Spliterator
接口是什么以及它定义了哪些方法,你可以试着自己实现一个Spliterator
了。
7.3.2 实现你自己的Spliterator
让我们来看一个可能需要你自己实现Spliterator
的实际例子。我们要开发一个简单的方法来数数一个String
中的单词数。这个方法的一个迭代版本可以写成下面的样子。
代码清单7-4 一个迭代式字数统计方法
public int countWordsIteratively(String s) { int counter = 0; boolean lastSpace = true; for (char c : s.toCharArray) { ←─逐个遍历String中的所有字符 if (Character.isWhitespace(c)) { lastSpace = true; } else { if (lastSpace) counter++; ←─上一个字符是空格,而当前遍历的字符不是空格时,将单词计数器加一 lastSpace = false; } } return counter;}
让我们把这个方法用在但丁的《神曲》的《地狱篇》的第一句话上:1
1请参阅http://en.wikipedia.org/wiki/Inferno_(Dante)。
final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " ché la dritta via era smarrita ";System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
请注意,我们在句子里添加了一些额外的随机空格,以演示这个迭代实现即使在两个词之间存在多个空格时也能正常工作。正如我们所料,这段代码将打印以下内容:
Found 19 words
理想情况下,你会想要用更为函数式的风格来实现它,因为就像我们前面说过的,这样你就可以用并行Stream
来并行化这个过程,而无需显式地处理线程和同步问题。
1. 以函数式风格重写单词计数器
首先你需要把String
转换成一个流。不幸的是,原始类型的流仅限于int
、long
和double
,所以你只能用Stream<Character>
:
Stream<Character> stream = IntStream.range(0, SENTENCE.length) .mapToObj(SENTENCE::charAt);
你可以对这个流做归约来计算字数。在归约流时,你得保留由两个变量组成的状态:一个int
用来计算到目前为止数过的字数,还有一个boolean
用来记得上一个遇到的Character
是不是空格。因为Java没有元组(tuple,用来表示由异类元素组成的有序列表的结构,不需要包装对象),所以你必须创建一个新类WordCounter
来把这个状态封装起来,如下所示。
代码清单7-5 用来在遍历Character
流时计数的类
class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { ←─和迭代算法一样,accumulate 方法一个个遍历Character if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); ←─上一个字符是空格,而当前遍历的字符不是空格时,将单词计数器加一 } else { return lastSpace ? new WordCounter(counter + 1, false) : this; } } public WordCounter combine(WordCounter wordCounter) { ←─合并两个Word-Counter,把其计数器加起来 return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); ←─仅需要计数器的总和,无需关心lastSpace } public int getCounter { return counter; }}
在这个列表中,accumulate
方法定义了如何更改WordCounter
的状态,或更确切地说是用哪个状态来建立新的WordCounter
,因为这个类是不可变的。每次遍历到Stream
中的一个新的Character
时,就会调用accumulate
方法。具体来说,就像代码清单7-4中的countWordsIteratively
方法一样,当上一个字符是空格,新字符不是空格时,计数器就加一。图7-7展示了accumulate
方法遍历到新的Character
时,WordCounter
的状态转换。
调用第二个方法combine
时,会对作用于Character
流的两个不同子部分的两个WordCounter
的部分结果进行汇总,也就是把两个WordCounter
内部的计数器加起来。
图 7-7 遍历到新的Character c
时WordCounter
的状态转换
现在你已经写好了在WordCounter
中累计字符,以及在WordCounter
中把它们结合起来的逻辑,那写一个方法来归约Character
流就很简单了:
private int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter;}
现在你就可以试一试这个方法,给它由包含但丁的《神曲》中《地狱篇》第一句的String
创建的流:
Stream<Character> stream = IntStream.range(0, SENTENCE.length) .mapToObj(SENTENCE::charAt);System.out.println("Found " + countWords(stream) + " words");
你可以和迭代版本比较一下输出:
Found 19 words
到现在为止都很好,但我们以函数式实现WordCounter
的主要原因之一就是能轻松地并行处理,让我们来看看具体是如何实现的。
2. 让WordCounter
并行工作
你可以尝试用并行流来加快字数统计,如下所示:
System.out.println("Found " + countWords(stream.parallel) + " words");
不幸的是,这次的输出是:
Found 25 words
显然有什么不对,可到底是哪里不对呢?问题的根源并不难找。因为原始的String
在任意位置拆分,所以有时一个词会被分为两个词,然后数了两次。这就说明,拆分流会影响结果,而把顺序流换成并行流就可能使结果出错。
如何解决这个问题呢?解决方案就是要确保String
不是在随机位置拆开的,而只能在词尾拆开。要做到这一点,你必须为Character
实现一个Spliterator
,它只能在两个词之间拆开String
(如下所示),然后由此创建并行流。
代码清单7-6 WordCounterSpliterator
class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<? super Character> action) { action.accept(string.charAt(currentChar++)); ←─处理当前字符 return currentChar < string.length; ←─如果还有字符要处理,则返回true } @Override public Spliterator<Character> trySplit { int currentSize = string.length - currentChar; if (currentSize < 10) { return null; ←─返回null表示要解析的String已经足够小,可以顺序处理 } for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length; splitPos++) { ←─将试探拆分位置设定为要解析的String的中间 if (Character.isWhitespace(string.charAt(splitPos))) { ←─让拆分位置前进直到下一个空格 Spliterator<Character> spliterator = ←─创建一个新WordCounter-Spliterator来解析String从开始到拆分位置的部分 new WordCounterSpliterator(string.substring(currentChar, splitPos)); currentChar = splitPos; ←─将这个WordCounter-Spliterator 的起始位置设为拆分位置 return spliterator; } } return null; } @Override public long estimateSize { return string.length - currentChar; } @Override public int characteristics { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; }}
这个Spliterator
由要解析的String
创建,并遍历了其中的Character
,同时保存了当前正在遍历的字符位置。让我们快速回顾一下实现了Spliterator
接口的WordCounterSpliterator
中的各个函数。
tryAdvance
方法把String
中当前位置的Character
传给了Consumer
,并让位置加一。作为参数传递的Consumer
是一个Java内部类,在遍历流时将要处理的Character
传给了一系列要对其执行的函数。这里只有一个归约函数,即WordCounter
类的accumulate
方法。如果新的指针位置小于String
的总长,且还有要遍历的Character
,则tryAdvance
返回true
。trySplit
方法是Spliterator
中最重要的一个方法,因为它定义了拆分要遍历的数据结构的逻辑。就像在代码清单7-1中实现的RecursiveTask
的compute
方法一样(分支/合并框架的使用方式),首先要设定不再进一步拆分的下限。这里用了一个非常低的下限——10个Character
,仅仅是为了保证程序会对那个比较短的String
做几次拆分。在实际应用中,就像分支/合并的例子那样,你肯定要用更高的下限来避免生成太多的任务。如果剩余的Character
数量低于下限,你就返回null
表示无需进一步拆分。相反,如果你需要执行拆分,就把试探的拆分位置设在要解析的String
块的中间。但我们没有直接使用这个拆分位置,因为要避免把词在中间断开,于是就往前找,直到找到一个空格。一旦找到了适当的拆分位置,就可以创建一个新的Spliterator
来遍历从当前位置到拆分位置的子串;把当前位置this
设为拆分位置,因为之前的部分将由新Spliterator
来处理,最后返回。还需要遍历的元素的
estimatedSize
就是这个Spliterator
解析的String
的总长度和当前遍历的位置的差。最后,
characteristic
方法告诉框架这个Spliterator
是ORDERED
(顺序就是String
中各个Character
的次序)、SIZED
(estimatedSize
方法的返回值是精确的)、SUBSIZED
(trySplit
方法创建的其他Spliterator
也有确切大小)、NONNULL
(String
中不能有为null
的Character
)和IMMUTABLE
(在解析String
时不能再添加Character
,因为String
本身是一个不可变类)的。
3. 运用WordCounterSpliterator
现在就可以用这个新的WordCounterSpliterator
来处理并行流了,如下所示:
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);Stream<Character> stream = StreamSupport.stream(spliterator, true);
传给StreamSupport.stream
工厂方法的第二个布尔参数意味着你想创建一个并行流。把这个并行流传给countWords
方法:
System.out.println("Found " + countWords(stream) + " words");
可以得到意料之中的正确输出:
Found 19 words
你已经看到了Spliterator
如何让你控制拆分数据结构的策略。Spliterator
还有最后一个值得注意的功能,就是可以在第一次遍历、第一次拆分或第一次查询估计大小时绑定元素的数据源,而不是在创建时就绑定。这种情况下,它称为延迟绑定(late-binding)的Spliterator
。我们专门用附录C来展示如何开发一个工具类来利用这个功能在同一个流上执行多个操作。
7.4 小结
在本章中,你了解了以下内容。
内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。
虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的行为和性能有时是违反直觉的,因此一定要测量,确保你并没有把程序拖得更慢。
像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时的时候。
从性能角度来看,使用正确的数据结构,如尽可能利用原始流而不是一般化的流,几乎总是比尝试并行化某些操作更为重要。
分支/合并框架让你得以用递归方式将可以并行的任务拆分成更小的任务,在不同的线程上执行,然后将各个子任务的结果合并起来生成整体结果。
Spliterator
定义了并行流如何拆分它要遍历的数据。