1+ /**
2+ * Copyright 2015 Netflix, Inc.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+ * compliance with the License. You may obtain a copy of the License at
6+ *
7+ * http://www.apache.org/licenses/LICENSE-2.0
8+ *
9+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
10+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+ * the License for the specific language governing permissions and limitations under the License.
12+ */
13+
14+ package io .reactivex ;
15+
16+ import java .util .*;
17+ import java .util .concurrent .TimeUnit ;
18+
19+ import org .openjdk .jmh .annotations .*;
20+ import org .openjdk .jmh .infra .Blackhole ;
21+
22+ @ BenchmarkMode (Mode .Throughput )
23+ @ Warmup (iterations = 5 )
24+ @ Measurement (iterations = 5 , time = 5 , timeUnit = TimeUnit .SECONDS )
25+ @ OutputTimeUnit (TimeUnit .SECONDS )
26+ @ Fork (value = 1 )
27+ @ State (Scope .Thread )
28+ public class RxVsStreamPerf {
29+ @ Param ({ "1" , "1000" , "1000000" })
30+ public int times ;
31+
32+ Observable <Integer > range ;
33+
34+ NbpObservable <Integer > rangeNbp ;
35+
36+ Observable <Integer > rangeFlatMap ;
37+
38+ NbpObservable <Integer > rangeNbpFlatMap ;
39+
40+ List <Integer > values ;
41+
42+ @ Setup
43+ public void setup () {
44+ range = Observable .range (1 , times );
45+
46+ rangeFlatMap = range .flatMap (v -> Observable .range (v , 2 ));
47+
48+ rangeNbp = NbpObservable .range (1 , times );
49+
50+ rangeNbpFlatMap = rangeNbp .flatMap (v -> NbpObservable .range (v , 2 ));
51+
52+ values = range .toList ().toBlocking ().first ();
53+ }
54+
55+ @ Benchmark
56+ public void range (Blackhole bh ) {
57+ range .subscribe (new LatchedObserver <>(bh ));
58+ }
59+
60+ @ Benchmark
61+ public void rangeNbp (Blackhole bh ) {
62+ rangeNbp .subscribe (new LatchedNbpObserver <>(bh ));
63+ }
64+
65+ @ Benchmark
66+ public void rangeFlatMap (Blackhole bh ) {
67+ rangeFlatMap .subscribe (new LatchedObserver <>(bh ));
68+ }
69+
70+ @ Benchmark
71+ public void rangeNbpFlatMap (Blackhole bh ) {
72+ rangeNbpFlatMap .subscribe (new LatchedNbpObserver <>(bh ));
73+ }
74+
75+ @ Benchmark
76+ public void stream (Blackhole bh ) {
77+ values .stream ().forEach (bh ::consume );
78+ }
79+
80+ @ Benchmark
81+ public void streamFlatMap (Blackhole bh ) {
82+ values .stream ()
83+ .flatMap (v -> Arrays .asList (v , v + 1 ).stream ())
84+ .forEach (bh ::consume );
85+ }
86+
87+ @ Benchmark
88+ public void streamParallel (Blackhole bh ) {
89+ values .stream ().parallel ().forEach (bh ::consume );
90+ }
91+
92+ @ Benchmark
93+ public void streamParallelFlatMap (Blackhole bh ) {
94+ values .stream ()
95+ .flatMap (v -> Arrays .asList (v , v + 1 ).stream ())
96+ .parallel ()
97+ .forEach (bh ::consume );
98+ }
99+ }
0 commit comments