Skip to content

Commit 2e8eed1

Browse files
committed
加入结果存储部分
1 parent bd3f42c commit 2e8eed1

1 file changed

Lines changed: 35 additions & 12 deletions

File tree

6-Stream Pipelines.md

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@ int longestStringLengthStartingWithA
2929

3030
上述代码求出以字母*A*开头的字符串的最大长度,一种直白的方式是为每一次函数调用都执一次迭代,这样做能够实现功能,但效率上肯定是无法接受的。类库的实现着使用流水线(*Pipeline*)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作。为讲解方便我们汇总了Stream的所有操作。
3131

32-
<table align="center"><tr><td colspan="3" align="center" border="0">Stream操作分类</td></tr><tr><td rowspan="2" border="1">中间操作(Intermediate operations)</td><td>无状态(Stateless)</td><td>unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap()<br> flatMapToInt() flatMapToLong() flatMapToDouble() peek()</td></tr><tr><td>有状态(Stateful)</td><td>distinct() sorted() sorted() limit() skip() </td></tr><tr><td rowspan="2" border="1">结束操作(Terminal operations)</td><td>非短路操作</td><td>forEach() forEachOrdered() toArray() reduce() collect() max() min() count()</td></tr><tr><td>短路操作(short-circuiting)</td><td>anyMatch() allMatch() noneMatch() findFirst() findAny()</td></tr></table>
32+
<table width="600"><tr><td colspan="3" align="center" border="0">Stream操作分类</td></tr><tr><td rowspan="2" border="1">中间操作(Intermediate operations)</td><td>无状态(Stateless)</td><td>unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()</td></tr><tr><td>有状态(Stateful)</td><td>distinct() sorted() sorted() limit() skip() </td></tr><tr><td rowspan="2" border="1">结束操作(Terminal operations)</td><td>非短路操作</td><td>forEach() forEachOrdered() toArray() reduce() collect() max() min() count()</td></tr><tr><td>短路操作(short-circuiting)</td><td>anyMatch() allMatch() noneMatch() findFirst() findAny()</td></tr></table>
3333

3434
Stream上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(*Stateless*)和有状态的(*Stateful*),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如*找到第一个满足条件的元素*。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。
3535

3636
## 一种直白的实现方式
3737

3838
<img src="./Figures/Stream_pipeline_naive.png" width="500px" align="right" alt="Stream_pipeline_naive"/>
3939

40-
仍然考虑上述求最长字符串的程序,一种直白的流水线实现方式是为每一次函数调用都执一次迭代,并将处理中间结果放到某种数据结构中(比如数组,容器等)。具体说来,就是调用*filter()*方法后立即执行,选出所有以*A*开头的字符串并放到一个列表list1中,之后让list1传递给*mapToInt()*方法并立即执行,生成的结果放到list2中,最后遍历list2找出最大的数字作为最终结果。程序的执行流程如如所示:
40+
仍然考虑上述求最长字符串的程序,一种直白的流水线实现方式是为每一次函数调用都执一次迭代,并将处理中间结果放到某种数据结构中(比如数组,容器等)。具体说来,就是调用`filter()`方法后立即执行,选出所有以*A*开头的字符串并放到一个列表list1中,之后让list1传递给`mapToInt()`方法并立即执行,生成的结果放到list2中,最后遍历list2找出最大的数字作为最终结果。程序的执行流程如如所示:
4141

4242
这样做实现起来非常简单直观,但有两个明显的弊端:
4343

@@ -65,6 +65,7 @@ for(String str : strings){
6565
1. 用户的操作如何记录?
6666
2. 操作如何叠加?
6767
3. 叠加之后的操作如何执行?
68+
4. 执行后的结果(如果有)在哪里?
6869

6970
### >> 操作如何记录
7071

@@ -76,21 +77,21 @@ for(String str : strings){
7677

7778
一个可能的流水线示意图如下:
7879

79-
<img src="./Figures/Stream_pipeline_example.png" width="600px" alt="Stream_pipeline_example"/>
80+
<img src="./Figures/Stream_pipeline_example.png" width="700px" alt="Stream_pipeline_example"/>
8081

81-
图中通过*Collection.stream()*方法得到*Head*也就是stage0,紧接着调用一系列的中间操作,不断产生新的Stream。**这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作**。这就是Stream记录操作的方式。
82+
图中通过`Collection.stream()`方法得到*Head*也就是stage0,紧接着调用一系列的中间操作,不断产生新的Stream。**这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作**。这就是Stream记录操作的方式。
8283

8384
### >> 操作如何叠加
8485

8586
以上只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的head开始依次执行每一步的操作(包括回调函数)就行了。这听起来似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底执行了哪种操作,以及回调函数是哪种形式。换句话说,只有当前Stage本身才知道该如何执行自己包含的动作。这就需要有某种协议来协调相邻Stage之间的调用关系。
8687

8788
这种协议由*Sink*接口完成,*Sink*接口包含的方法如下表所示:
8889

89-
<table><tr><td>方法名</td><td>作用</td></tr><tr><td>void begin(long size)</td><td>开始遍历元素之前调用该方法,通知Sink做好准备。</td></tr><tr><td>void end()</td><td>所有元素遍历完成之后调用,通知Sink没有更多的元素了。</td></tr><tr><td>boolean cancellationRequested()</td><td>是否可以结束操作,可以让短路操作尽早结束。</td></tr><tr><td>void accept(T t)</td><td>遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。</td></tr></table>
90+
<table width="600px"><tr><td align="center">方法名</td><td align="center">作用</td></tr><tr><td>void begin(long size)</td><td>开始遍历元素之前调用该方法,通知Sink做好准备。</td></tr><tr><td>void end()</td><td>所有元素遍历完成之后调用,通知Sink没有更多的元素了。</td></tr><tr><td>boolean cancellationRequested()</td><td>是否可以结束操作,可以让短路操作尽早结束。</td></tr><tr><td>void accept(T t)</td><td>遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。</td></tr></table>
9091

91-
有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的`accept()`方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的`begin()``end()`方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的*Sink.begin()*方法可能创建一个乘放结果的容器,*accept()*方法负责将元素添加到该容器,最后*end()*负责对容器进行排序。对于短路操作,`Sink.cancellationRequested()`也是必须实现的,比如*Stream.findFirst()*是短路操作,只要找到一个元素,*cancellationRequested()*就应该返回*true*,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。**实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法**
92+
有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的`accept()`方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的`begin()``end()`方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的Sink.begin()方法可能创建一个乘放结果的容器,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。对于短路操作,`Sink.cancellationRequested()`也是必须实现的,比如*Stream.findFirst()*是短路操作,只要找到一个元素,cancellationRequested()就应该返回*true*,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。**实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法**
9293

93-
有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的*Sink.accept()*方法流程是这样的:
94+
有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的Sink.accept()方法流程是这样的:
9495

9596
```Java
9697
void accept(U u){
@@ -99,7 +100,7 @@ void accept(U u){
99100
}
100101
```
101102

102-
Sink接口的其他几个方法也是按照这种[处理->转发]的模型实现。下面我们结合具体例子看看Stream的中间操作是如何将自身的操作包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。先看*Stream.map()*方法:
103+
Sink接口的其他几个方法也是按照这种[处理->转发]的模型实现。下面我们结合具体例子看看Stream的中间操作是如何将自身的操作包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。先看Stream.map()方法:
103104

104105
```Java
105106
// Stream.map(),调用该方法将产生一个新的Stream
@@ -169,16 +170,17 @@ class RefSortingSink<T> extends AbstractRefSortingSink<T> {
169170

170171
### >> 叠加之后的操作如何执行
171172

172-
<img src="./Figures/Stream_pipeline_Sink.png" width="250px" align="right" hspace="10px" alt="Stream_pipeline_Sink"/>
173+
<img src="./Figures/Stream_pipeline_Sink.png" width="300px" align="right" hspace="10px" alt="Stream_pipeline_Sink"/>
173174

174175
Sink完美封装了Stream每一步操作,并给出了[处理->转发]的模式来叠加操作。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。是什么启动这一连串的操作呢?也许你已经想到了启动的原始动力就是结束操作(Terminal Operation),一旦调用某个结束操作,就会触发整个流水线的执行。
175176

176177
结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延伸了。结束操作会创建一个包装了自己操作的Sink,这也是流水线中最后一个Sink,这个Sink只需要处理数据而不需要将结果传递给下游的Sink(因为没有下游)。对于Sink的[处理->转发]模型,结束操作的Sink就是调用链的出口。
177178

178-
我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在*PipelineHelper*中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。但Stream类库的设计者没有这么做,而是设置了一个`Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)`方法来得到Sink,该方法的作用是产生一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?这是因为使用opWrapSink()可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:
179+
我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在*PipelineHelper*中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。但Stream类库的设计者没有这么做,而是设置了一个`Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)`方法来得到Sink,该方法的作用是返回一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?这是因为使用opWrapSink()可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:
179180

180181
```Java
181-
// 从下游向上游不断包装Sink。如果最初传入的sink是结束操作代表的,
182+
// AbstractPipeline.wrapSink()
183+
// 从下游向上游不断包装Sink。如果最初传入的sink代表结束操作,
182184
// 函数返回时就可以得到一个代表了流水线上所有操作的Sink。
183185
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
184186
...
@@ -192,7 +194,7 @@ final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
192194
现在流水线上从开始到结束的所有的操作都被包装到了一个Sink里,执行这个Sink就相当于执行整个流水线,执行Sink的代码如下:
193195

194196
```Java
195-
// 对spliterator代表的数据执行wrappedSink代表的操作。
197+
// AbstractPipeline.copyInto(), 对spliterator代表的数据执行wrappedSink代表的操作。
196198
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
197199
...
198200
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
@@ -206,9 +208,30 @@ final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator
206208

207209
上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,然后调用spliterator.forEachRemaining()方法对数据进行迭代(Spliterator是容器的一种迭代器,[参阅](https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/3-Lambda%20and%20Collections.md#spliterator)),最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。
208210

211+
### >> 执行后的结果在哪里
209212

213+
最后一个问题是流水线上所有操作都执行后,用户所需要的结果(如果有)在哪里?首先要说明的是不是所有的Stream结束操作都需要返回结果,有些操作只是为了使用其副作用(*Side-effects*),比如使用`Stream.forEach()`方法将结果打印出来就是常见的使用副作用的场景(事实上,除了打印其他场景都应避免使用副作用),对于真正需要返回结果的结束操作结果存在哪里呢?
210214

215+
> 特别说明:副作用不应该被滥用,也许你会觉得在Stream.forEach()里进行元素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都无法保证,因为Stream可能会并行执行。大多数使用副作用的地方都可以使用归约操作([参阅](./5-Streams%20API(II).md))更安全和有效的完成。
211216
217+
```Java
218+
// 错误的收集方式
219+
ArrayList<String> results = new ArrayList<>();
220+
stream.filter(s -> pattern.matcher(s).matches())
221+
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
222+
// 正确的收集方式
223+
List<String>results =
224+
stream.filter(s -> pattern.matcher(s).matches())
225+
.collect(Collectors.toList()); // No side-effects!
226+
```
227+
228+
回到流水线执行结果的问题上来,需要返回结果的流水线结果存在哪里呢?这要分不同的情况讨论,下表给出了各种有返回结果的Stream结束操作。
229+
230+
<table width="350px" align="left"><tr><td align="center">返回类型</td><td align="center">对应的结束操作</td></tr><tr><td>boolean</td><td>anyMatch() allMatch() noneMatch()</td></tr><tr><td>Optional</td><td>findFirst() findAny()</td></tr><tr><td>归约结果</td><td>reduce() collect()</td></tr><tr><td>数组</td><td>toArray()</td></tr></table>
231+
232+
1. 对于表中返回boolean或者Optional的操作(Optional是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的Sink中记录这个值,等到执行结束时返回就可以了。
233+
2. 对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定,[参阅](./5-Streams%20API(II).md#收集器))。collect(), reduce(), max(), min()都是归约操作,虽然max()和min()也是返回一个Optional,但事实上底层是通过直接调用[reduce()](./5-Streams%20API(II).md#多面手reduce)方法实现的。
234+
3. 对于返回是数组的情况,是不是会以为毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做*Node*的数据结构中的,Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。关于Node的具体结构,我们会在下一节讲解Stream如何并行执行时详细说明。
212235

213236

214237

0 commit comments

Comments
 (0)