@@ -108,21 +108,14 @@ void startEmitting() {
108108
109109 @ Override
110110 public void request (long n ) {
111+ if (requested == Long .MAX_VALUE ) {
112+ return ;
113+ }
111114 long _c ;
112115 if (n == Long .MAX_VALUE ) {
113116 _c = REQUESTED_UPDATER .getAndSet (this , Long .MAX_VALUE );
114117 } else {
115- for (;;) {
116- _c = requested ;
117- if (_c == Long .MAX_VALUE ) {
118- // If `requested` is Long.MAX_VALUE, `c+n` will be overflow.
119- // Therefore, always check before setting to `c+n`
120- return ;
121- }
122- if (REQUESTED_UPDATER .compareAndSet (this , _c , _c + n )) {
123- break ;
124- }
125- }
118+ _c = REQUESTED_UPDATER .getAndAdd (this , n );
126119 }
127120 if (!emittingStarted ) {
128121 // we haven't started yet, so record what was requested and return
@@ -169,10 +162,21 @@ void emit(long previousRequested) {
169162 emitted ++;
170163 }
171164 }
172-
173- if (REQUESTED_UPDATER .addAndGet (this , -emitted ) == 0 ) {
174- // we're done emitting the number requested so return
175- return ;
165+ for (;;) {
166+ long oldRequested = requested ;
167+ long newRequested = oldRequested - emitted ;
168+ if (oldRequested == Long .MAX_VALUE ) {
169+ // became unbounded during the loop
170+ // continue the outer loop to emit the rest events.
171+ break ;
172+ }
173+ if (REQUESTED_UPDATER .compareAndSet (this , oldRequested , newRequested )) {
174+ if (newRequested == 0 ) {
175+ // we're done emitting the number requested so return
176+ return ;
177+ }
178+ break ;
179+ }
176180 }
177181 }
178182 }
0 commit comments