#29.Java8之Stream 在动态语言中,操作集合是一个非常方便的事情。例如以下的JavaScript代码:
var list = [0,1,2,3,4,5,6,7,8,9];
var rs = list.filter(function(i){return i % 2 == 0;}).map(function(i){return i * i;}).sort(function(x,y){return x < y;})以上JavaScript代码执行结果为:
[64, 36, 16, 4, 0]
而同样的功能使用Java实现的话,必须套用循环:
List<Integer> list = Arrays.asList(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
List<Integer> rs = new ArrayList<Integer>();
for (Integer i : list) {
if (i % 2 == 0) {
rs.add(i * i);
}
}
rs.sort(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o2 - o1;
}
});不难看出,相比JavaScript,Java代码显得笨重且臃肿。Oracle明显也注意到了这一点,于是在Java8中加入了对集合相应的支持。Java8并未在原有的集合类中直接支持filter、map等操作的,而是引入了一个新的类:Stream。使用Stream对集合进行操作。例如:
List<Integer> list = Arrays.asList(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
List<Integer> rs = list.stream().filter(i -> i % 2 == 0).map(i -> i * i).sorted((x,y) -> y - x).collect(Collectors.toList());使用Java8新语法重构后的代码量接近JavaScript的代码,简洁性与可读性方面也有了很大的提高。当然,Java8支持的集合操作远比JavaScript多的多且更加强大。
Stream提供对集合进行函数式编程操作的类。
Stream类和集合类都用于操作集合,但是Stream跟集合类存在很多不同点:
- Stream并不是一个数据结构存储元素;它只是通过管道的方式将数据从数组、函数生成器、I/O通道中进行传输和操作;
- 对Stream的操作并不会对产生Stream的源数据进行修改;
- 懒惰取值(Laziness-seeking),很多Stream操作,例如filter、map、duplicate等并不会实时产生结果集。Stream的操作区分为两种:中间操作和终点操作,中间操作永远是懒惰取值的。这样做主要是为了性能优化。
- Stream可能是无边界的。虽然集合是有大小限制,但Stream是不受大小限制的。短路操作,例如
limit(n)或findFirst()可以从无边界的Stream中获取定长的数据。 - 可消耗的。Stream的元素的在整个生命周期中只能被访问一次。类似于Iterator,如果想要重新访问Stream中的元素则需要重新生成Stream。
Stream可以通过多种方式获取,例如:
- Collection接口的子类可以通过
stream()和parallelStream()获取; - 数组可以通过
Arrays.stream(Object[])获取; - 可以通过
Stream.of(Object[])、IntStream.range(int, int)或Stream.iterate(Object,unaryOperator)这个三个静态方法获取; - 文件行可以通过
BufferedReader.lines()获取; - 指定路径的文件可以通过Files的方法获取;
- 随机数可以通过
Random.ints()获取; - 其他的跟获取Stream相关的方法还有
BitSet.stream,Pattern.splitAsStream(java.lang.Charsequence)和JarFile.stream()等。
流操作被划分为中间操作和终点操作,并且组合形成管道流。管道流由如下组合而成:数据源(例如集合,数组,函数生成器或I/O通道);紧跟在数据源后面的是零或多个中间操作(例如Stream.filter或Stream.map);最终操作紧跟在中间操作后面(例如Stream.forEach或Stream.reduce等)。
中间操作返回的结果是流。中间操作永远是懒惰取值的;例如Stream.filter操作实际上并没有立即执行任何过滤,取而代之的是产生一个新的流,当进行传输时就会根据过滤谓词对原始的数据进行过滤。只有当终点操作被执行时,管道的遍历才会被执行(真正执行中间操作)。
终点操作,例如Stream.forEach或IntStream.sum,会对流进行遍历并产一个结果或者副作用。当终点操作并执行完后,管道流就被消耗掉,无法再进行利用;如果需要对同样的数据进行遍历,则必须使用原始数据产生新的流。在多数情况下,终点操作被认为是饥饿的,一旦被调用,则会对数据源进行彻底的遍历直到结果被返回。最终操作中,只有iterator()和spliterator()不是饥饿的,他们被认为是**"逃生舱"**,可以让客户端(调用者)自由控制管道的遍历,在很多业务逻辑中,这种方式都是非常高效且常见的。
流的懒惰取值模式是非常高效的。例如在filter-map-sum管道操作中,filter、map和sum可以融合为一次原始数据遍历。同时,这种懒惰取值的方式,可以避免对所有的数据进行遍历,例如需求:取数组中值大于100的前十个数值。如果对整个数组都进行遍历将是毫无必要的,而且非常低效。而流的懒惰取值可避免这种低效做法(特别当流很大甚至是无限大时)。例如:list.filter(x -> x > 100).limit(100)。
中间操作可以进一步分解为有状态操作和无状态操作。无状态操作,如filter和map,处理当前的元素不依赖之前处理的元素,每个元素间的处理都是相互独立,互不影响的。有状态操作,如distinct和sorted,当前元素的处理依赖之前的元素。
有状态操作,可能需要对整个数据进行处理才能产生最终结果。举个例子,对流进行排序,在对整个流进行完全遍历之前是不可能产生正确结果的。As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering.(对需要在有限时间内结束的无限流来说,短路操作是必要但不充分的条件。??)
此外,有些操作被认为是短路操作。在中间操作过程中,当输入是无限,而却需要有限流的输出时,则该中间操作被认为是短路操作;在终点操作过程中,当输入是无限,切需要在有限的时间终止操作时,则该操作被认为是短路操作。Having a short-circuiting operation in the pipeline is a necessary, but not sufficient, condition for the processing of an infinite stream to terminate normally in finite time.
使用循环对元素进行处理本质上来说是串行的。Stream通过对管道的聚合操作进行重构处理使流的并行计算变得更加容易实现,而不是简简单单地对每个元素进行迭代处理。所有的流操作都可以选择使用串行模式或并行模式中进行处理。例如,Collection类有Collection.stream()和Collection.parallelStream()两个方法分别对应串行流和并行流。再如IntStream.range(int,int)产出串行流,但是可以通过调用BaseStream.parallel()方法可以产生并行流。
如下例子分别使用串行和并行计算widgets的weigh总和:
// 串行计算
int sumOfWeights = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
// 并行计算
int sumOfWeights = widgets.parallelStream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();两种方式最大的不同在于一个调用stream(),一个调用parallelStream()。当最终操作被执行时,流操作使用串行或并行决定于调用它的流是Stream或parallelStream。可以根据流的isParallel()方法判断当前流是否支持并行操作,以此同时,可以通过修改BaseStream.sequential()和BaseStream.parallel()操作来修改当前流是否支持并行。
除了某些操作具有明确的不确定性(例如findAndy())之外,流的操作是串行或并行并不会影响到最终的结果。
大部分的流操作都接受参数用于实现用户自定义行为,很多情况下都会使用lambdas表达式进行传参。为了保证正确的处理逻辑,这一些参数必须是互不干扰的,很多时候,这些参数也是无状态的。这些参数通常是函数式接口,例如Function接口、lambdas表达式或方法引用。
## 互不干扰
流让你可以对各种数据源中执行并行聚合操作,包活线程不安全的ArrayList类。只要我们能保证在执行管道流操作时,数据源中的元素互不干扰。Except for the escape-hatch operations iterator() and spliterator(), execution begins when the terminal operation is invoked, and ends when the terminal operation completes.对于大多数数据源而言,确保确保数据的互不干扰意味着在执行管道流的过程中并不会对数据进行任何修改。特别值得引起注意的是,当流的数据源是并发集合时,
Streams enable you to execute possibly-parallel aggregate operations over a variety of data sources, including even non-thread-safe collections such as ArrayList. This is possible only if we can prevent interference with the data source during the execution of a stream pipeline. Except for the escape-hatch operations iterator() and spliterator(), execution begins when the terminal operation is invoked, and ends when the terminal operation completes. For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. The notable exception to this are streams whose sources are concurrent collections, which are specifically designed to handle concurrent modification. Concurrent stream sources are those whose Spliterator reports the CONCURRENT characteristic. Accordingly, behavioral parameters in stream pipelines whose source might not be concurrent should never modify the stream's data source. A behavioral parameter is said to interfere with a non-concurrent data source if it modifies, or causes to be modified, the stream's data source. The need for non-interference applies to all pipelines, not just parallel ones. Unless the stream source is concurrent, modifying a stream's data source during execution of a stream pipeline can cause exceptions, incorrect answers, or nonconformant behavior. For well-behaved stream sources, the source can be modified before the terminal operation commences and those modifications will be reflected in the covered elements. For example, consider the following code:
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));First a list is created consisting of two strings: "one"; and "two". Then a stream is created from that list. Next the list is modified by adding a third string: "three". Finally the elements of the stream are collected and joined together. Since the list was modified before the terminal collect operation commenced the result will be a string of "one two three". All the streams returned from JDK collections, and most other JDK classes, are well-behaved in this manner; for streams generated by other libraries, see Low-level stream construction for requirements for building well-behaved
如果流操作的行为参数有状态,那么管道流的操作结果可能不确定或不正确。有状态的lambda表达式(或相应功能的函数接口)的结果依赖的状态可能会在管道流执行过程中改变。
例如下面的一个有状态的lambda表达式:
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...在上面代码中,如果map操作并行执行,这是由于线程调度的差异导致相同的输入多次运行的结果都有可能不同。如果这里采用无状态的lambda表达式,那么结果将总是一致的。需要注意的还有,在行为参数中访问可变状态,你将陷入面临选择安全或性能两难局面:如果你不对数据进行同步访问则会导致资源竞争从而破坏代码的逻辑;如果对数据进行同步访问,则会让并行的性能优势荡然无存。最好的做法就是彻底避免在管道流操作中使用有状态的行为参数,通常的做法就是重构管道流让行为参数无状态。
流操作中,行为参数的副作用,在多数情况下,令人非常沮丧,因为它们会常常会在不知情的情况下破坏无状态的要求,同样的还有对线程安全的隐患。如果行为参数确实存在副作用,除非有明确规定,行为参数不保证副作用会其他线程可见,同时也不会保证多个流对同一个元素的进行操作在同一个线程中执行。再者,这些影响的顺序会很奇怪,让人捉摸不透。 Even when a pipeline is constrained to produce a result that is consistent with the encounter order of the stream source (for example, IntStream.range(0,5).parallel().map(x -> x*2).toArray() must produce [0, 2, 4, 6, 8]), no guarantees are made as to the order in which the mapper function is applied to individual elements, or in what thread any behavioral parameter is executed for a given element.
Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators. However, side-effects such as using println() for debugging purposes are usually harmless. A small number of stream operations, such as forEach() and peek(), can operate only via side-effects; these should be used with care.
As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism. Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!
Streams may or may not have a defined encounter order. Whether or not a stream has an encounter order depends on the source and the intermediate operations. Certain stream sources (such as List or arrays) are intrinsically ordered, whereas others (such as HashSet) are not. Some intermediate operations, such as sorted(), may impose an encounter order on an otherwise unordered stream, and others may render an ordered stream unordered, such as BaseStream.unordered(). Further, some terminal operations may ignore encounter order, such as forEach().
If a stream is ordered, most operations are constrained to operate on the elements in their encounter order; if the source of a stream is a List containing [1, 2, 3], then the result of executing map(x -> x*2) must be [2, 4, 6]. However, if the source has no defined encounter order, then any permutation of the values [2, 4, 6] would be a valid result.
For sequential streams, the presence or absence of an encounter order does not affect performance, only determinism. If a stream is ordered, repeated execution of identical stream pipelines on an identical source will produce an identical result; if it is not ordered, repeated execution might produce different results.
For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.
A reduction operation (also called a fold) takes a sequence of input elements and combines them into a single summary result by repeated application of a combining operation, such as finding the sum or maximum of a set of numbers, or accumulating elements into a list. The streams classes have multiple forms of general reduction operations, called reduce() and collect(), as well as multiple specialized reduction forms such as sum(), max(), or count(). Of course, such operations can be readily implemented as simple sequential loops, as in:
int sum = 0;
for (int x : numbers) {
sum += x;
}
However, there are good reasons to prefer a reduce operation over a mutative accumulation such as the above. Not only is a reduction "more abstract" -- it operates on the stream as a whole rather than individual elements -- but a properly constructed reduce operation is inherently parallelizable, so long as the function(s) used to process the elements are associative and stateless. For example, given a stream of numbers for which we want to find the sum, we can write:
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
or:
int sum = numbers.stream().reduce(0, Integer::sum);
These reduction operations can run safely in parallel with almost no modification:
int sum = numbers.parallelStream().reduce(0, Integer::sum);
Reduction parallellizes well because the implementation can operate on subsets of the data in parallel, and then combine the intermediate results to get the final correct answer. (Even if the language had a "parallel for-each" construct, the mutative accumulation approach would still required the developer to provide thread-safe updates to the shared accumulating variable sum, and the required synchronization would then likely eliminate any performance gain from parallelism.) Using reduce() instead removes all of the burden of parallelizing the reduction operation, and the library can provide an efficient parallel implementation with no additional synchronization required.
The "widgets" examples shown earlier shows how reduction combines with other operations to replace for loops with bulk operations. If widgets is a collection of Widget objects, which have a getWeight method, we can find the heaviest widget with:
OptionalInt heaviest = widgets.parallelStream()
.mapToInt(Widget::getWeight)
.max();
In its more general form, a reduce operation on elements of type yielding a result of type requires three parameters:
U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator combiner);
Here, the identity element is both an initial seed value for the reduction and a default result if there are no input elements. The accumulator function takes a partial result and the next element, and produces a new partial result. The combiner function combines two partial results to produce a new partial result. (The combiner is necessary in parallel reductions, where the input is partitioned, a partial accumulation computed for each partition, and then the partial results are combined to produce a final result.) More formally, the identity value must be an identity for the combiner function. This means that for all u, combiner.apply(identity, u) is equal to u. Additionally, the combiner function must be associative and must be compatible with the accumulator function: for all u and t, combiner.apply(u, accumulator.apply(identity, t)) must be equals() to accumulator.apply(u, t).
The three-argument form is a generalization of the two-argument form, incorporating a mapping step into the accumulation step. We could re-cast the simple sum-of-weights example using the more general form as follows:
int sumOfWeights = widgets.stream()
.reduce(0,
(sum, b) -> sum + b.getWeight())
Integer::sum);
though the explicit map-reduce form is more readable and therefore should usually be preferred. The generalized form is provided for cases where significant work can be optimized away by combining mapping and reducing into a single function.
A mutable reduction operation accumulates input elements into a mutable result container, such as a Collection or StringBuilder, as it processes the elements in the stream. If we wanted to take a stream of strings and concatenate them into a single long string, we could achieve this with ordinary reduction:
String concatenated = strings.reduce("", String::concat)
We would get the desired result, and it would even work in parallel. However, we might not be happy about the performance! Such an implementation would do a great deal of string copying, and the run time would be O(n^2) in the number of characters. A more performant approach would be to accumulate the results into a StringBuilder, which is a mutable container for accumulating strings. We can use the same technique to parallelize mutable reduction as we do with ordinary reduction.
The mutable reduction operation is called collect(), as it collects together the desired results into a result container such as a Collection. A collect operation requires three functions: a supplier function to construct new instances of the result container, an accumulator function to incorporate an input element into a result container, and a combining function to merge the contents of one result container into another. The form of this is very similar to the general form of ordinary reduction:
R collect(Supplier supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
As with reduce(), a benefit of expressing collect in this abstract way is that it is directly amenable to parallelization: we can accumulate partial results in parallel and then combine them, so long as the accumulation and combining functions satisfy the appropriate requirements. For example, to collect the String representations of the elements in a stream into an ArrayList, we could write the obvious sequential for-each form:
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
strings.add(element.toString());
}
Or we could use a parallelizable collect form:
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
(c, e) -> c.add(e.toString()),
(c1, c2) -> c1.addAll(c2));
or, pulling the mapping operation out of the accumulator function, we could express it more succinctly as:
List<String> strings = stream.map(Object::toString)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
Here, our supplier is just the ArrayList constructor, the accumulator adds the stringified element to an ArrayList, and the combiner simply uses addAll to copy the strings from one container into the other. The three aspects of collect -- supplier, accumulator, and combiner -- are tightly coupled. We can use the abstraction of a Collector to capture all three aspects. The above example for collecting strings into a List can be rewritten using a standard Collector as:
List<String> strings = stream.map(Object::toString)
.collect(Collectors.toList());
Packaging mutable reductions into a Collector has another advantage: composability. The class Collectors contains a number of predefined factories for collectors, including combinators that transform one collector into another. For example, suppose we have a collector that computes the sum of the salaries of a stream of employees, as follows:
Collector<Employee, ?, Integer> summingSalaries
= Collectors.summingInt(Employee::getSalary);
(The ? for the second type parameter merely indicates that we don't care about the intermediate representation used by this collector.) If we wanted to create a collector to tabulate the sum of salaries by department, we could reuse summingSalaries using groupingBy:
Map<Department, Integer> salariesByDept
= employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
summingSalaries));
As with the regular reduction operation, collect() operations can only be parallelized if appropriate conditions are met. For any partially accumulated result, combining it with an empty result container must produce an equivalent result. That is, for a partially accumulated result p that is the result of any series of accumulator and combiner invocations, p must be equivalent to combiner.apply(p, supplier.get()).
Further, however the computation is split, it must produce an equivalent result. For any input elements t1 and t2, the results r1 and r2 in the computation below must be equivalent:
A a1 = supplier.get();
accumulator.accept(a1, t1);
accumulator.accept(a1, t2);
R r1 = finisher.apply(a1); // result without splitting
A a2 = supplier.get();
accumulator.accept(a2, t1);
A a3 = supplier.get();
accumulator.accept(a3, t2);
R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
Here, equivalence generally means according to Object.equals(Object). but in some cases equivalence may be relaxed to account for differences in order.
With some complex reduction operations, for example a collect() that produces a Map, such as:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.collect(Collectors.groupingBy(Transaction::getBuyer));
it may actually be counterproductive to perform the operation in parallel. This is because the combining step (merging one Map into another by key) can be expensive for some Map implementations. Suppose, however, that the result container used in this reduction was a concurrently modifiable collection -- such as a ConcurrentHashMap. In that case, the parallel invocations of the accumulator could actually deposit their results concurrently into the same shared result container, eliminating the need for the combiner to merge distinct result containers. This potentially provides a boost to the parallel execution performance. We call this a concurrent reduction.
A Collector that supports concurrent reduction is marked with the Collector.Characteristics.CONCURRENT characteristic. However, a concurrent collection also has a downside. If multiple threads are depositing results concurrently into a shared container, the order in which results are deposited is non-deterministic. Consequently, a concurrent reduction is only possible if ordering is not important for the stream being processed. The Stream.collect(Collector) implementation will only perform a concurrent reduction if
The stream is parallel; The collector has the Collector.Characteristics.CONCURRENT characteristic, and; Either the stream is unordered, or the collector has the Collector.Characteristics.UNORDERED characteristic. You can ensure the stream is unordered by using the BaseStream.unordered() method. For example:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.unordered()
.collect(groupingByConcurrent(Transaction::getBuyer));
(where Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>) is the concurrent equivalent of groupingBy). Note that if it is important that the elements for a given key appear in the order they appear in the source, then we cannot use a concurrent reduction, as ordering is one of the casualties of concurrent insertion. We would then be constrained to implement either a sequential reduction or a merge-based parallel reduction.
An operator or function op is associative if the following holds:
(a op b) op c == a op (b op c)
The importance of this to parallel evaluation can be seen if we expand this to four terms:
a op b op c op d == (a op b) op (c op d)
So we can evaluate (a op b) in parallel with (c op d), and then invoke op on the results. Examples of associative operations include numeric addition, min, and max, and string concatenation.
So far, all the stream examples have used methods like Collection.stream() or Arrays.stream(Object[]) to obtain a stream. How are those stream-bearing methods implemented? The class StreamSupport has a number of low-level methods for creating a stream, all using some form of a Spliterator. A spliterator is the parallel analogue of an Iterator; it describes a (possibly infinite) collection of elements, with support for sequentially advancing, bulk traversal, and splitting off some portion of the input into another spliterator which can be processed in parallel. At the lowest level, all streams are driven by a spliterator.
There are a number of implementation choices in implementing a spliterator, nearly all of which are tradeoffs between simplicity of implementation and runtime performance of streams using that spliterator. The simplest, but least performant, way to create a spliterator is to create one from an iterator using Spliterators.spliteratorUnknownSize(java.util.Iterator, int). While such a spliterator will work, it will likely offer poor parallel performance, since we have lost sizing information (how big is the underlying data set), as well as being constrained to a simplistic splitting algorithm.
A higher-quality spliterator will provide balanced and known-size splits, accurate sizing information, and a number of other characteristics of the spliterator or data that can be used by implementations to optimize execution.
Spliterators for mutable data sources have an additional challenge; timing of binding to the data, since the data could change between the time the spliterator is created and the time the stream pipeline is executed. Ideally, a spliterator for a stream would report a characteristic of IMMUTABLE or CONCURRENT; if not it should be late-binding. If a source cannot directly supply a recommended spliterator, it may indirectly supply a spliterator using a Supplier, and construct a stream via the Supplier-accepting versions of stream(). The spliterator is obtained from the supplier only after the terminal operation of the stream pipeline commences.
These requirements significantly reduce the scope of potential interference between mutations of the stream source and execution of stream pipelines. Streams based on spliterators with the desired characteristics, or those using the Supplier-based factory forms, are immune to modifications of the data source prior to commencement of the terminal operation (provided the behavioral parameters to the stream operations meet the required criteria for non-interference and statelessness). See Non-Interference for more details.