File tree Expand file tree Collapse file tree 3 files changed +35
-2
lines changed
main/java/io/reactivex/internal/operators
test/java/io/reactivex/internal/operators Expand file tree Collapse file tree 3 files changed +35
-2
lines changed Original file line number Diff line number Diff line change @@ -62,9 +62,10 @@ public void onSubscribe(Subscription s) {
6262 }
6363 @ Override
6464 public void onNext (T t ) {
65- if (!done ) {
65+ if (!done && remaining -- > 0 ) {
66+ boolean stop = remaining == 0 ;
6667 actual .onNext (t );
67- if (-- remaining == 0L ) {
68+ if (stop ) {
6869 onComplete ();
6970 }
7071 }
Original file line number Diff line number Diff line change 3030import io .reactivex .exceptions .TestException ;
3131import io .reactivex .internal .subscriptions .*;
3232import io .reactivex .schedulers .Schedulers ;
33+ import io .reactivex .subjects .PublishSubject ;
3334import io .reactivex .subscribers .TestSubscriber ;
3435
3536public class OperatorTakeTest {
@@ -420,4 +421,19 @@ public void onNext(Integer t) {
420421 ts .assertError (TestException .class );
421422 ts .assertNotComplete ();
422423 }
424+
425+ @ Test
426+ public void testReentrantTake () {
427+ PublishSubject <Integer > source = PublishSubject .create ();
428+
429+ TestSubscriber <Integer > ts = new TestSubscriber <>();
430+
431+ source .take (1 ).doOnNext (v -> source .onNext (2 )).subscribe (ts );
432+
433+ source .onNext (1 );
434+
435+ ts .assertValue (1 );
436+ ts .assertNoErrors ();
437+ ts .assertComplete ();
438+ }
423439}
Original file line number Diff line number Diff line change 3131import io .reactivex .exceptions .TestException ;
3232import io .reactivex .internal .disposables .EmptyDisposable ;
3333import io .reactivex .schedulers .Schedulers ;
34+ import io .reactivex .subjects .nbp .NbpPublishSubject ;
3435import io .reactivex .subscribers .nbp .NbpTestSubscriber ;
3536
3637public class NbpOperatorTakeTest {
@@ -338,4 +339,19 @@ public void onNext(Integer t) {
338339 ts .assertError (TestException .class );
339340 ts .assertNotComplete ();
340341 }
342+
343+ @ Test
344+ public void testReentrantTake () {
345+ NbpPublishSubject <Integer > source = NbpPublishSubject .create ();
346+
347+ NbpTestSubscriber <Integer > ts = new NbpTestSubscriber <>();
348+
349+ source .take (1 ).doOnNext (v -> source .onNext (2 )).subscribe (ts );
350+
351+ source .onNext (1 );
352+
353+ ts .assertValue (1 );
354+ ts .assertNoErrors ();
355+ ts .assertComplete ();
356+ }
341357}
You can’t perform that action at this time.
0 commit comments