@@ -120,31 +120,38 @@ public void testMultiThreadedWithNPE() {
120120
121121 @ Test
122122 public void testMultiThreadedWithNPEinMiddle () {
123- TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable ("one" , "two" , "three" , null , "four" , "five" , "six" , "seven" , "eight" , "nine" );
124- Observable <String > w = Observable .create (onSubscribe );
125-
126- BusyObserver busyobserver = new BusyObserver ();
127-
128- w .serialize ().subscribe (busyobserver );
129- onSubscribe .waitToFinish ();
130-
131- System .out .println ("maxConcurrentThreads: " + onSubscribe .maxConcurrentThreads .get ());
132- // this should not be the full number of items since the error should stop it before it completes all 9
133- System .out .println ("onNext count: " + busyobserver .onNextCount .get ());
134- assertTrue (busyobserver .onNextCount .get () < 9 );
135- assertTrue (busyobserver .onError );
136- // no onCompleted because onError was invoked
137- assertFalse (busyobserver .onCompleted );
138- // non-deterministic because unsubscribe happens after 'waitToFinish' releases
139- // so commenting out for now as this is not a critical thing to test here
140- // verify(s, times(1)).unsubscribe();
141-
142- // we can have concurrency ...
143- assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
144- // ... but the onNext execution should be single threaded
145- assertEquals (1 , busyobserver .maxConcurrentThreads .get ());
123+ boolean lessThan9 = false ;
124+ for (int i = 0 ; i < 3 ; i ++) {
125+ TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable ("one" , "two" , "three" , null , "four" , "five" , "six" , "seven" , "eight" , "nine" );
126+ Observable <String > w = Observable .create (onSubscribe );
127+
128+ BusyObserver busyobserver = new BusyObserver ();
129+
130+ w .serialize ().subscribe (busyobserver );
131+ onSubscribe .waitToFinish ();
132+
133+ System .out .println ("maxConcurrentThreads: " + onSubscribe .maxConcurrentThreads .get ());
134+ // this should not always be the full number of items since the error should (very often)
135+ // stop it before it completes all 9
136+ System .out .println ("onNext count: " + busyobserver .onNextCount .get ());
137+ if (busyobserver .onNextCount .get () < 9 ) {
138+ lessThan9 = true ;
139+ }
140+ assertTrue (busyobserver .onError );
141+ // no onCompleted because onError was invoked
142+ assertFalse (busyobserver .onCompleted );
143+ // non-deterministic because unsubscribe happens after 'waitToFinish' releases
144+ // so commenting out for now as this is not a critical thing to test here
145+ // verify(s, times(1)).unsubscribe();
146+
147+ // we can have concurrency ...
148+ assertTrue (onSubscribe .maxConcurrentThreads .get () > 1 );
149+ // ... but the onNext execution should be single threaded
150+ assertEquals (1 , busyobserver .maxConcurrentThreads .get ());
151+ }
152+ assertTrue (lessThan9 );
146153 }
147-
154+
148155 /**
149156 * A thread that will pass data to onNext
150157 */
@@ -276,6 +283,7 @@ public TestMultiThreadedObservable(String... values) {
276283 @ Override
277284 public void call (final Subscriber <? super String > observer ) {
278285 System .out .println ("TestMultiThreadedObservable subscribed to ..." );
286+ final NullPointerException npe = new NullPointerException ();
279287 t = new Thread (new Runnable () {
280288
281289 @ Override
@@ -290,11 +298,12 @@ public void run() {
290298 threadsRunning .incrementAndGet ();
291299 try {
292300 // perform onNext call
293- System .out .println ("TestMultiThreadedObservable onNext: " + s );
294301 if (s == null ) {
302+ System .out .println ("TestMultiThreadedObservable onNext: null" );
295303 // force an error
296- throw new NullPointerException ();
297- }
304+ throw npe ;
305+ } else
306+ System .out .println ("TestMultiThreadedObservable onNext: " + s );
298307 observer .onNext (s );
299308 // capture 'maxThreads'
300309 int concurrentThreads = threadsRunning .get ();
0 commit comments