Skip to content

Commit c0aff9c

Browse files
authored
2.x: fix SubscriptionArbiter reentrancy/cancel bug (ReactiveX#4310)
* 2.x: fix SubscriptionArbiter reentrancy/cancel bug * Use an infinite source with timeout as 1G element takes only 2 secs.
1 parent fe2445d commit c0aff9c

File tree

3 files changed

+56
-19
lines changed

3 files changed

+56
-19
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableRedo.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.functions.*;
2323
import io.reactivex.internal.subscribers.flowable.ToNotificationSubscriber;
2424
import io.reactivex.internal.subscriptions.SubscriptionArbiter;
25-
import io.reactivex.processors.BehaviorProcessor;
25+
import io.reactivex.processors.*;
2626

2727
// FIXME split and update to the Rsc version
2828
public final class FlowableRedo<T> extends Flowable<T> {
@@ -39,7 +39,7 @@ public FlowableRedo(Publisher<? extends T> source,
3939
public void subscribeActual(Subscriber<? super T> s) {
4040

4141
// FIXE use BehaviorSubject? (once available)
42-
BehaviorProcessor<Try<Optional<Object>>> subject = BehaviorProcessor.create();
42+
FlowProcessor<Try<Optional<Object>>> subject = BehaviorProcessor.<Try<Optional<Object>>>create().toSerialized();
4343

4444
final RedoSubscriber<T> parent = new RedoSubscriber<T>(s, subject, source);
4545

@@ -70,13 +70,13 @@ static final class RedoSubscriber<T> extends AtomicBoolean implements Subscriber
7070
/** */
7171
private static final long serialVersionUID = -1151903143112844287L;
7272
final Subscriber<? super T> actual;
73-
final BehaviorProcessor<Try<Optional<Object>>> subject;
73+
final FlowProcessor<Try<Optional<Object>>> subject;
7474
final Publisher<? extends T> source;
7575
final SubscriptionArbiter arbiter;
7676

7777
final AtomicInteger wip = new AtomicInteger();
7878

79-
public RedoSubscriber(Subscriber<? super T> actual, BehaviorProcessor<Try<Optional<Object>>> subject, Publisher<? extends T> source) {
79+
public RedoSubscriber(Subscriber<? super T> actual, FlowProcessor<Try<Optional<Object>>> subject, Publisher<? extends T> source) {
8080
this.actual = actual;
8181
this.subject = subject;
8282
this.source = source;

src/main/java/io/reactivex/internal/subscriptions/SubscriptionArbiter.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,14 @@ public final void setSubscription(Subscription s) {
9696
actual = s;
9797

9898
long r = requested;
99-
if (r != 0L) {
100-
s.request(r);
101-
}
10299

103-
if (decrementAndGet() == 0) {
104-
return;
100+
if (decrementAndGet() != 0) {
101+
drainLoop();
105102
}
106103

107-
drainLoop();
104+
if (r != 0L) {
105+
s.request(r);
106+
}
108107

109108
return;
110109
}
@@ -133,15 +132,14 @@ public final void request(long n) {
133132
}
134133
}
135134
Subscription a = actual;
136-
if (a != null) {
137-
a.request(n);
138-
}
139135

140-
if (decrementAndGet() == 0) {
141-
return;
136+
if (decrementAndGet() != 0) {
137+
drainLoop();
142138
}
143139

144-
drainLoop();
140+
if (a != null) {
141+
a.request(n);
142+
}
145143

146144
return;
147145
}
@@ -235,6 +233,9 @@ final void drain() {
235233
final void drainLoop() {
236234
int missed = 1;
237235

236+
long requestAmount = 0L;
237+
Subscription requestTarget = null;
238+
238239
for (; ; ) {
239240

240241
Subscription ms = missedSubscription.get();
@@ -287,15 +288,20 @@ final void drainLoop() {
287288
}
288289
actual = ms;
289290
if (r != 0L) {
290-
ms.request(r);
291+
requestAmount = BackpressureHelper.addCap(requestAmount, r);
292+
requestTarget = ms;
291293
}
292-
} else if (mr != 0L && a != null) {
293-
a.request(mr);
294+
} else if (a != null && mr != 0L) {
295+
requestAmount = BackpressureHelper.addCap(requestAmount, mr);
296+
requestTarget = a;
294297
}
295298
}
296299

297300
missed = addAndGet(-missed);
298301
if (missed == 0) {
302+
if (requestAmount != 0L) {
303+
requestTarget.request(requestAmount);
304+
}
299305
return;
300306
}
301307
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,4 +1007,35 @@ public void startWith() throws Exception {
10071007
ts.assertComplete();
10081008
}
10091009
}
1010+
1011+
static final class InfiniteIterator implements Iterator<Integer>, Iterable<Integer> {
1012+
1013+
int count;
1014+
1015+
@Override
1016+
public boolean hasNext() {
1017+
return true;
1018+
}
1019+
1020+
@Override
1021+
public Integer next() {
1022+
return count++;
1023+
}
1024+
1025+
@Override
1026+
public void remove() {
1027+
}
1028+
1029+
@Override
1030+
public Iterator<Integer> iterator() {
1031+
return this;
1032+
}
1033+
}
1034+
1035+
@Test(timeout = 5000)
1036+
public void veryLongTake() {
1037+
Flowable.fromIterable(new InfiniteIterator()).concatWith(Flowable.<Integer>empty()).take(10)
1038+
.test()
1039+
.assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
1040+
}
10101041
}

0 commit comments

Comments
 (0)