|
14 | 14 | package io.reactivex.internal.operators.flowable; |
15 | 15 |
|
16 | 16 | import static org.junit.Assert.*; |
| 17 | +import static org.mockito.ArgumentMatchers.*; |
17 | 18 | import static org.mockito.Mockito.*; |
18 | 19 |
|
19 | 20 | import java.io.IOException; |
|
29 | 30 | import com.google.common.cache.*; |
30 | 31 |
|
31 | 32 | import io.reactivex.*; |
32 | | -import io.reactivex.exceptions.TestException; |
| 33 | +import io.reactivex.exceptions.*; |
33 | 34 | import io.reactivex.flowables.GroupedFlowable; |
34 | 35 | import io.reactivex.functions.*; |
35 | 36 | import io.reactivex.internal.functions.Functions; |
36 | | -import io.reactivex.internal.fuseable.QueueFuseable; |
| 37 | +import io.reactivex.internal.fuseable.*; |
37 | 38 | import io.reactivex.internal.subscriptions.BooleanSubscription; |
| 39 | +import io.reactivex.plugins.RxJavaPlugins; |
38 | 40 | import io.reactivex.processors.PublishProcessor; |
39 | 41 | import io.reactivex.schedulers.Schedulers; |
40 | 42 | import io.reactivex.subjects.PublishSubject; |
@@ -2205,4 +2207,83 @@ public void accept(Object object) { |
2205 | 2207 | }}; |
2206 | 2208 | return evictingMapFactory; |
2207 | 2209 | } |
| 2210 | + |
| 2211 | + @Test |
| 2212 | + public void fusedNoConcurrentCleanDueToCancel() { |
| 2213 | + for (int j = 0; j < TestHelper.RACE_LONG_LOOPS; j++) { |
| 2214 | + List<Throwable> errors = TestHelper.trackPluginErrors(); |
| 2215 | + try { |
| 2216 | + final PublishProcessor<Integer> pp = PublishProcessor.create(); |
| 2217 | + |
| 2218 | + final AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>> qs = new AtomicReference<QueueSubscription<GroupedFlowable<Object, Integer>>>(); |
| 2219 | + |
| 2220 | + final TestSubscriber<Integer> ts2 = new TestSubscriber<Integer>(); |
| 2221 | + |
| 2222 | + pp.groupBy(Functions.identity(), Functions.<Integer>identity(), false, 4) |
| 2223 | + .subscribe(new FlowableSubscriber<GroupedFlowable<Object, Integer>>() { |
| 2224 | + |
| 2225 | + boolean once; |
| 2226 | + |
| 2227 | + @Override |
| 2228 | + public void onNext(GroupedFlowable<Object, Integer> g) { |
| 2229 | + if (!once) { |
| 2230 | + try { |
| 2231 | + GroupedFlowable<Object, Integer> t = qs.get().poll(); |
| 2232 | + if (t != null) { |
| 2233 | + once = true; |
| 2234 | + t.subscribe(ts2); |
| 2235 | + } |
| 2236 | + } catch (Throwable ignored) { |
| 2237 | + // not relevant here |
| 2238 | + } |
| 2239 | + } |
| 2240 | + } |
| 2241 | + |
| 2242 | + @Override |
| 2243 | + public void onError(Throwable t) { |
| 2244 | + } |
| 2245 | + |
| 2246 | + @Override |
| 2247 | + public void onComplete() { |
| 2248 | + } |
| 2249 | + |
| 2250 | + @Override |
| 2251 | + public void onSubscribe(Subscription s) { |
| 2252 | + @SuppressWarnings("unchecked") |
| 2253 | + QueueSubscription<GroupedFlowable<Object, Integer>> q = (QueueSubscription<GroupedFlowable<Object, Integer>>)s; |
| 2254 | + qs.set(q); |
| 2255 | + q.requestFusion(QueueFuseable.ANY); |
| 2256 | + q.request(1); |
| 2257 | + } |
| 2258 | + }) |
| 2259 | + ; |
| 2260 | + |
| 2261 | + Runnable r1 = new Runnable() { |
| 2262 | + @Override |
| 2263 | + public void run() { |
| 2264 | + qs.get().cancel(); |
| 2265 | + qs.get().clear(); |
| 2266 | + } |
| 2267 | + }; |
| 2268 | + Runnable r2 = new Runnable() { |
| 2269 | + @Override |
| 2270 | + public void run() { |
| 2271 | + ts2.cancel(); |
| 2272 | + } |
| 2273 | + }; |
| 2274 | + |
| 2275 | + for (int i = 0; i < 100; i++) { |
| 2276 | + pp.onNext(i); |
| 2277 | + } |
| 2278 | + |
| 2279 | + TestHelper.race(r1, r2); |
| 2280 | + |
| 2281 | + if (!errors.isEmpty()) { |
| 2282 | + throw new CompositeException(errors); |
| 2283 | + } |
| 2284 | + } finally { |
| 2285 | + RxJavaPlugins.reset(); |
| 2286 | + } |
| 2287 | + } |
| 2288 | + } |
2208 | 2289 | } |
0 commit comments