|
15 | 15 | */ |
16 | 16 | package rx.internal.operators; |
17 | 17 |
|
| 18 | +import java.util.Map; |
18 | 19 | import java.util.Queue; |
19 | 20 | import java.util.concurrent.ConcurrentHashMap; |
20 | 21 | import java.util.concurrent.ConcurrentLinkedQueue; |
@@ -138,6 +139,8 @@ public Observer<T> getObserver() { |
138 | 139 | @SuppressWarnings("rawtypes") |
139 | 140 | static final AtomicLongFieldUpdater<GroupBySubscriber> BUFFERED_COUNT = AtomicLongFieldUpdater.newUpdater(GroupBySubscriber.class, "bufferedCount"); |
140 | 141 |
|
| 142 | + volatile boolean errorEmitted = false; |
| 143 | + |
141 | 144 | @Override |
142 | 145 | public void onStart() { |
143 | 146 | REQUESTED.set(this, MAX_QUEUE_SIZE); |
@@ -166,6 +169,13 @@ public void onCompleted() { |
166 | 169 | @Override |
167 | 170 | public void onError(Throwable e) { |
168 | 171 | if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) { |
| 172 | + errorEmitted = true; |
| 173 | + |
| 174 | + // It's safe to access all groups and emit the error. |
| 175 | + // onNext and onError are in sequence so no group will be created in the loop. |
| 176 | + for (GroupState<K, T> group : groups.values()) { |
| 177 | + emitItem(group, nl.error(e)); |
| 178 | + } |
169 | 179 | try { |
170 | 180 | // we immediately tear everything down if we receive an error |
171 | 181 | child.onError(e); |
@@ -259,6 +269,11 @@ public void onCompleted() { |
259 | 269 | @Override |
260 | 270 | public void onError(Throwable e) { |
261 | 271 | o.onError(e); |
| 272 | + // eagerly cleanup instead of waiting for unsubscribe |
| 273 | + if (once.compareAndSet(false, true)) { |
| 274 | + // done once per instance, either onComplete or onUnSubscribe |
| 275 | + cleanupGroup(key); |
| 276 | + } |
262 | 277 | } |
263 | 278 |
|
264 | 279 | @Override |
@@ -386,7 +401,7 @@ private void completeInner() { |
386 | 401 | if (child.isUnsubscribed()) { |
387 | 402 | // if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up. |
388 | 403 | unsubscribe(); |
389 | | - } else { |
| 404 | + } else if (!errorEmitted) { |
390 | 405 | child.onCompleted(); |
391 | 406 | } |
392 | 407 | } |
|
0 commit comments