@@ -31,7 +31,12 @@ public void oneStreamOfNthatMergesIn1(final InputMillion input) throws Interrupt
3131 Observable <Observable <Integer >> os = Observable .range (1 , input .size ).map (Observable ::just );
3232 LatchedObserver <Integer > o = input .newLatchedObserver ();
3333 Observable .merge (os ).subscribe (o );
34- o .latch .await ();
34+
35+ if (input .size == 1 ) {
36+ while (o .latch .getCount () != 0 );
37+ } else {
38+ o .latch .await ();
39+ }
3540 }
3641
3742 // flatMap
@@ -42,7 +47,12 @@ public void merge1SyncStreamOfN(final InputMillion input) throws InterruptedExce
4247 });
4348 LatchedObserver <Integer > o = input .newLatchedObserver ();
4449 Observable .merge (os ).subscribe (o );
45- o .latch .await ();
50+
51+ if (input .size == 1 ) {
52+ while (o .latch .getCount () != 0 );
53+ } else {
54+ o .latch .await ();
55+ }
4656 }
4757
4858 @ Benchmark
@@ -52,7 +62,11 @@ public void mergeNSyncStreamsOfN(final InputThousand input) throws InterruptedEx
5262 });
5363 LatchedObserver <Integer > o = input .newLatchedObserver ();
5464 Observable .merge (os ).subscribe (o );
55- o .latch .await ();
65+ if (input .size == 1 ) {
66+ while (o .latch .getCount () != 0 );
67+ } else {
68+ o .latch .await ();
69+ }
5670 }
5771
5872 @ Benchmark
@@ -62,22 +76,34 @@ public void mergeNAsyncStreamsOfN(final InputThousand input) throws InterruptedE
6276 });
6377 LatchedObserver <Integer > o = input .newLatchedObserver ();
6478 Observable .merge (os ).subscribe (o );
65- o .latch .await ();
79+ if (input .size == 1 ) {
80+ while (o .latch .getCount () != 0 );
81+ } else {
82+ o .latch .await ();
83+ }
6684 }
6785
6886 @ Benchmark
6987 public void mergeTwoAsyncStreamsOfN (final InputThousand input ) throws InterruptedException {
7088 LatchedObserver <Integer > o = input .newLatchedObserver ();
7189 Observable <Integer > ob = Observable .range (0 , input .size ).subscribeOn (Schedulers .computation ());
7290 Observable .merge (ob , ob ).subscribe (o );
73- o .latch .await ();
91+ if (input .size == 1 ) {
92+ while (o .latch .getCount () != 0 );
93+ } else {
94+ o .latch .await ();
95+ }
7496 }
7597
7698 @ Benchmark
7799 public void mergeNSyncStreamsOf1 (final InputForMergeN input ) throws InterruptedException {
78100 LatchedObserver <Integer > o = input .newLatchedObserver ();
79101 Observable .merge (input .observables ).subscribe (o );
80- o .latch .await ();
102+ if (input .size == 1 ) {
103+ while (o .latch .getCount () != 0 );
104+ } else {
105+ o .latch .await ();
106+ }
81107 }
82108
83109 @ State (Scope .Thread )
0 commit comments