Java 8 中引入了 Stream 流,上手使用流对开发人员来说都很简单(大多数场景下,只需要调用 java.util.Collection#stream 方法就可以创建一个流),这是因为流就是一个 DSL (Domain Specific Language,领域特定语言),抛开 Java 的上下文,使用流完成集合的操作,就像编写 SQL 语句一样流畅(fluent),简洁。

JDK 8 中关于 java.util.stream 包的介绍,即关于引入流的一些介绍,包括了对流的认识与使用的注意事项。

流与集合的区别

  1. 流不是一个存储数据的数据结构,流的数据来自于背后绑定的数据源 (data source),可以是一个集合,一个数组,I/O 信道 (I/O Channel), 一个数据生成函数(可以生成一个无限流,Lazy 模式,需要时生成数据),流通过一个计算操作管道 (pipeline of computational operations) 传输数据源中的数据。
  1. 天生的函数式特质,流上的操作生成一个结果,并不修改它背后的数据源. 这句话的意思其实就是因为流不改变作为创建它本身的数据源,无副作用(side affect),符合函数式编程。例如对一个流应用 filter 操作,会生成一个只包含满足指定条件元素的流,并不会删除数据源中的元素。
  1. 懒加载 (Laziness-seeking),许多流操作,如 filtermapdistinct 操作都可以懒惰地实现 (implemented lazily), 给了对这些操作进行优化的机会, 例如流的截断操作 (limit) 和短路操作 (如 findFirst), 使得并不需要遍历流中的所有元素就可以返回结果(流的中间操作 (intermediate) 都是 lazy 的);还有组成流管道的众多操作,可以被作为一个整体,对流中的元素仅应用一次,减少中间状态的使用。
  1. 可能是无界的,Java 中肯定无法包含一个的无限元素的 ArrayList,或者在特定大小的内存下,比如一个所有偶数的列表,但是流可以做到,其实就是因为流是懒加载的,通过生成器函数,在需要时才生成下一个遍历的元素,流的 limitfindFirst 操作可以在一个无限流上进行截断操作有限的数据。
  1. 可消费的 (Consumable). 流中的元素只在流的生命周期中可以访问,和 Iterator 一样,如果需要重新遍历数据源中的元素,需要生成一个新的流。

状态

中间操作可以被进一步分为无状态 (stateless) 和有状态 (stateful) 的操作。无状态的操作,如 filtermap ,处理新元素时不需要保留之前已经处理过元素的任何状态,每一个元素的处理都是独立的。有状态的操作,例如 distinct, sorted, 就需要保留之前已经处理过元素的一些状态。

有状态的操作可能要遍历整个流才能生成一个结果,例如,只有遍历完流的所有元素,才能完成对流的排序操作。在并行计算下,一些包含保留状态的中间操作的流管道可能需要多次处理数据或者需要缓存大量的数据。而仅包含无状态的流管道,可以一次性处理,无论是在串行还是并行情况下,都需要进行很少的数据缓存。

短路操作

一些操作也可以被看作短路操作 short-circuiting,对于中间操作,短路操作意味着可以将一个无限流转换为一个有限流;对于终端操作,短路操作意味着对于一个无限流,可以在有限的时间内结束对流的处理。

并行流

所有的流可以以串行或者并行的方式执行,串行的流就是将显示的外部迭代变为内部迭代,流的并行也就是对数据进行递归划分,使用 ForkJoin 公用池的工作线程来执行。默认创建的流都是串行流,对于集合类,可以使用 Collection.parallelStream() 创建并行流,对于其它流,可以使用 BaseStream.parallel 方法指定该流为并行流,BaseStream.sequential 方法指定为串行流。

除了一些显示标识为结果不确定的一些操作(例如 findAny),流的串行执行和并行执行结果应该保持一致。

大多数流操作都接受描述用户指定行为的参数,通常是 lambda 表达式。为了使得最终的结果正确,这些行为参数应该是无干扰(non-interfering)的,大多数场景下应该是无状态的(stateless)。

Non-interference

流可以让我么你在不同的数据源上可能并行的执行一些聚合操作,甚至包括非线程安全的集合,如 ArrayList。但是只有在六管道执行的时候哦,阻止对数据源的干扰才能做到。流的执行,基本都是通过调用终端操作触发,当终端操作结束后,流的执行也就结束了。对于大多数数据源,保持非干扰性,意味着确保流管道在执行的过程中完全保持不对数据源进行修改。例外就是数据源本身是一个并发集合,本身就被设计为使用在并发场景下。并发的流数据源,它们的 Spliterator 应该具有 CONCURRENT 的特征。

因此,如果流的数据源是不支持并发的, 应用于流管道的行为参数就不应该修改流的数据源。如果行为参数本身或者间接导致了流数据源的修改,则称该行为会干扰非并发数据源的流。这种约束适用于所有的流. 对于行为良好的流的数据源,可以在终端操作开始之前修改数据源,这些修改将反映在涵盖的元素中。

1
2
3
4
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" ")); // 最终 s 的结果为 'one two three'

JDK 集合类返回的流和 JDK 中包含的大多数类返回的流都是 well-behave.

Stateless behaviors

如果行为参数是有状态的, 那么流的计算结果可能是不确定的或者不正确的. 一个有状态的行为指的是它的结果取决于任何在流的执行期间有可能改变的任何状态.

1
2
3
4

Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());

stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...

这个例子中的流是一个并行流, 针对一个相同的元素, 每次调用 map 返回的结果可能是不同的, 这是因为线程的调度是不确定的, 而无状态的行为就会保证行为的结果总是一样的.

同样,应该注意的是, 在行为参数中方位可变的状态,会造成安全和性能方面的问题,如果你尝试使用同步机制来进行访问,那意味着要在数据竞争的情况下损失一定的性能。所以最好的办好就是使用无状态的行为参数。

Side-effects

不鼓励在流的计算中使用具有副作用的行为参数,因为它们通常会导致在不知情的情况下违反无状态要求,并危害线程安全。

如果行为参数具有副作用,除非显示声明,不能保证这些副作用对其它线程的可见性,也不能保证同一个流的同一个元素的不同的操作的执行是在同一个线程中. 这些副作用的影响顺序也可能令人意外.

即使一个流被限制为它的计算结果于流绑定的数据源元素的顺序一致, (如 IntStream.range(0,5).parallel().map(x -> x*2).toArray() 的结果必须是 [0, 2, 4, 6, 8]), 也不保证映射器函数应用于单个元素的顺序,或者对于给定元素在哪个线程中执行任何行为参数。

使用流提供的 reduce 操作, 可以让一些倾向于使用副作用的计算可以更安全、更有效. 当然, forEach, peek 这样的操作必定是副作用的, 使用时需要注意. 一些具有副作用的语句, 如处于 debug 使用 println , 通常没有什么危害性.

将具有副作用的操作转换为无副作用

before

1
2
3
4
5
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!

// 运行在并行流的情况下, ArrayList 不是线程安全的, 会导致不正确的结果或者抛出异常, 添加同步机制,会导致数据竞争

after

1
2
3
4
     List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!
// 使用无副作用的 toList 收集器, 更安全

Ordering

流是否有序 (encounter order, 遇到每个元素的顺序), 取决于绑定的数据源以及中间操作, 特定的流数据源 (例如 List 或者数组) 本来就是有序的, 而其它的 (如 HashSet) 是无序的. 一些中间操作, 如 sorted, 会将一个无序的流变为有序的, 其它的操作可能与之相反, 将一个有序流变为无需流, 如 BaseStream.unordered. 此外, 一些终端操作可能忽略有序性, 如 forEach 操作.

如果流是有序的,则大多数操作都被限制为按元素遇到的顺序对元素进行操作, 如果流的源是一个包含 [1, 2, 3] 的 List,那么执行 map(x -> x*2) 的结果一定是 [2, 4, 6]. 但是,如果源没有定义的相遇顺序,则值 [2, 4, 6] 的任何排列都将是有效结果。

对串行流, 一个流是否有序, 并不会影响性能, 影响的是确定性. 如果一个流是有序的, 相同数据源的生成流的重复执行会生成一致的结果, 如果是无序的, 则可能会生成不一致的结果.

对于并行流, 如果流是无序的, 可以更高效地执行. 一些特定的聚合操作, 如 distinct, Collectors.groupingBy, 如果流是无序的, 可以被更高效的实现. 类似地, 与遇到顺序相关的操作,例如 limit,可能需要缓存以确保正确排序,从而破坏了并行性的好处。

如果某个流是有序的, 但是用户并不关心流中元素被处理地顺序, 使用 unordered() 显式将流变为无序, 可能会提高某些有状态或终端操作的并行性能. 然而,大多数流管道,例如计算一个整数流的总和,即使流是有序的, 仍然可以有效地并行化。

这里的有序和无序指的是流中的元素的顺序是确定的还是不确定的.