Skip to content

Commit 3371e35

Browse files
committed
2.x: operator throttleFirst, timeInterval and timeout + bugfixes
1 parent d823c7d commit 3371e35

7 files changed

Lines changed: 1094 additions & 91 deletions

File tree

src/main/java/io/reactivex/internal/operators/OperatorThrottleFirstTimed.java

Lines changed: 34 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ public Subscriber<? super T> apply(Subscriber<? super T> t) {
4545
timeout, unit, scheduler.createWorker());
4646
}
4747

48-
static final class DebounceTimedSubscriber<T> extends AtomicInteger
48+
static final class DebounceTimedSubscriber<T>
49+
extends AtomicLong
4950
implements Subscriber<T>, Subscription, Runnable {
5051
/** */
5152
private static final long serialVersionUID = -9102637559663639004L;
@@ -59,29 +60,21 @@ static final class DebounceTimedSubscriber<T> extends AtomicInteger
5960
volatile Disposable timer;
6061
@SuppressWarnings("rawtypes")
6162
static final AtomicReferenceFieldUpdater<DebounceTimedSubscriber, Disposable> TIMER =
62-
AtomicReferenceFieldUpdater.newUpdater(DebounceTimedSubscriber.class, Disposable.class, "debouncer");
63+
AtomicReferenceFieldUpdater.newUpdater(DebounceTimedSubscriber.class, Disposable.class, "timer");
6364

6465
static final Disposable CANCELLED = () -> { };
6566

6667
static final Disposable NEW_TIMER = () -> { };
6768

6869
volatile boolean gate;
6970

70-
volatile long requested;
71-
@SuppressWarnings("rawtypes")
72-
static final AtomicLongFieldUpdater<DebounceTimedSubscriber> REQUESTED =
73-
AtomicLongFieldUpdater.newUpdater(DebounceTimedSubscriber.class, "requested");
74-
7571
boolean done;
7672

77-
T value;
78-
7973
public DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
8074
this.actual = actual;
8175
this.timeout = timeout;
8276
this.unit = unit;
8377
this.worker = worker;
84-
lazySet(1);
8578
}
8679

8780
public void disposeTimer() {
@@ -109,50 +102,42 @@ public void onNext(T t) {
109102
if (done) {
110103
return;
111104
}
112-
113-
if (gate) {
114-
value = t;
115-
gate = false;
116-
}
117-
118-
Disposable d = timer;
119-
if (d != null) {
120-
d.dispose();
121-
}
122-
123-
if (TIMER.compareAndSet(this, null, NEW_TIMER)) {
124-
d = worker.schedule(this, timeout, unit);
125-
if (!TIMER.compareAndSet(this, NEW_TIMER, d)) {
126-
d.dispose();
127-
value = null;
128-
}
129-
}
130-
}
131-
132-
@Override
133-
public void run() {
105+
134106
if (!gate) {
135-
T v = value;
136-
value = null;
137-
138-
long r = requested;
107+
gate = true;
108+
long r = get();
139109
if (r != 0L) {
140-
actual.onNext(v);
110+
actual.onNext(t);
141111
if (r != Long.MAX_VALUE) {
142-
REQUESTED.decrementAndGet(this);
143-
}
144-
if (decrementAndGet() == 0) {
145-
disposeTimer();
146-
worker.dispose();
147-
actual.onComplete();
148-
return;
112+
decrementAndGet();
149113
}
150-
gate = true;
151114
} else {
115+
done = true;
152116
cancel();
153117
actual.onError(new IllegalStateException("Could not deliver value due to lack of requests"));
118+
return;
119+
}
120+
121+
// FIXME should this be a periodic blocking or a value-relative blocking?
122+
Disposable d = timer;
123+
if (d != null) {
124+
d.dispose();
125+
}
126+
127+
if (TIMER.compareAndSet(this, d, NEW_TIMER)) {
128+
d = worker.schedule(this, timeout, unit);
129+
if (!TIMER.compareAndSet(this, NEW_TIMER, d)) {
130+
d.dispose();
131+
}
154132
}
155133
}
134+
135+
136+
}
137+
138+
@Override
139+
public void run() {
140+
gate = false;
156141
}
157142

158143
@Override
@@ -172,19 +157,17 @@ public void onComplete() {
172157
return;
173158
}
174159
done = true;
175-
if (decrementAndGet() == 0) {
176-
disposeTimer();
177-
worker.dispose();
178-
actual.onComplete();
179-
}
160+
disposeTimer();
161+
worker.dispose();
162+
actual.onComplete();
180163
}
181164

182165
@Override
183166
public void request(long n) {
184167
if (SubscriptionHelper.validateRequest(n)) {
185168
return;
186169
}
187-
BackpressureHelper.add(REQUESTED, this, n);
170+
BackpressureHelper.add(this, n);
188171
}
189172

190173
@Override

src/main/java/io/reactivex/internal/operators/OperatorTimeout.java

Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -89,25 +89,29 @@ public void onSubscribe(Subscription s) {
8989

9090
Publisher<U> p;
9191

92-
try {
93-
p = firstTimeoutSelector.get();
94-
} catch (Exception ex) {
95-
cancel();
96-
EmptySubscription.error(ex, a);
97-
return;
98-
}
99-
100-
if (p == null) {
101-
cancel();
102-
EmptySubscription.error(new NullPointerException("The first timeout publisher is null"), a);
103-
return;
104-
}
105-
106-
TimeoutInnerSubscriber<T, U, V> tis = new TimeoutInnerSubscriber<>(this, 0);
107-
108-
if (TIMEOUT.compareAndSet(this, null, tis)) {
92+
if (firstTimeoutSelector != null) {
93+
try {
94+
p = firstTimeoutSelector.get();
95+
} catch (Exception ex) {
96+
cancel();
97+
EmptySubscription.error(ex, a);
98+
return;
99+
}
100+
101+
if (p == null) {
102+
cancel();
103+
EmptySubscription.error(new NullPointerException("The first timeout publisher is null"), a);
104+
return;
105+
}
106+
107+
TimeoutInnerSubscriber<T, U, V> tis = new TimeoutInnerSubscriber<>(this, 0);
108+
109+
if (TIMEOUT.compareAndSet(this, null, tis)) {
110+
a.onSubscribe(s);
111+
p.subscribe(tis);
112+
}
113+
} else {
109114
a.onSubscribe(s);
110-
p.subscribe(tis);
111115
}
112116
}
113117

@@ -272,27 +276,31 @@ public void onSubscribe(Subscription s) {
272276
}
273277
Subscriber<? super T> a = actual;
274278

275-
Publisher<U> p;
276-
277-
try {
278-
p = firstTimeoutSelector.get();
279-
} catch (Exception ex) {
280-
dispose();
281-
EmptySubscription.error(ex, a);
282-
return;
283-
}
284-
285-
if (p == null) {
286-
dispose();
287-
EmptySubscription.error(new NullPointerException("The first timeout publisher is null"), a);
288-
return;
289-
}
290-
291-
TimeoutInnerSubscriber<T, U, V> tis = new TimeoutInnerSubscriber<>(this, 0);
292-
293-
if (TIMEOUT.compareAndSet(this, null, tis)) {
279+
if (firstTimeoutSelector != null) {
280+
Publisher<U> p;
281+
282+
try {
283+
p = firstTimeoutSelector.get();
284+
} catch (Exception ex) {
285+
dispose();
286+
EmptySubscription.error(ex, a);
287+
return;
288+
}
289+
290+
if (p == null) {
291+
dispose();
292+
EmptySubscription.error(new NullPointerException("The first timeout publisher is null"), a);
293+
return;
294+
}
295+
296+
TimeoutInnerSubscriber<T, U, V> tis = new TimeoutInnerSubscriber<>(this, 0);
297+
298+
if (TIMEOUT.compareAndSet(this, null, tis)) {
299+
a.onSubscribe(arbiter);
300+
p.subscribe(tis);
301+
}
302+
} else {
294303
a.onSubscribe(arbiter);
295-
p.subscribe(tis);
296304
}
297305
}
298306

src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,20 @@ public final class FullArbiter<T> extends FullArbiterPad2 implements Subscriptio
3333
final SpscLinkedArrayQueue<Object> queue;
3434

3535
long requested;
36-
Subscription s;
36+
37+
volatile Subscription s;
38+
static final Subscription INITIAL = new Subscription() {
39+
@Override
40+
public void request(long n) {
41+
42+
}
43+
@Override
44+
public void cancel() {
45+
// TODO Auto-generated method stub
46+
47+
}
48+
};
49+
3750

3851
Disposable resource;
3952

@@ -45,6 +58,7 @@ public FullArbiter(Subscriber<? super T> actual, Disposable resource, int capaci
4558
this.actual = actual;
4659
this.resource = resource;
4760
this.queue = new SpscLinkedArrayQueue<>(capacity);
61+
this.s = INITIAL;
4862
}
4963

5064
@Override
@@ -78,7 +92,7 @@ public boolean setSubscription(Subscription s) {
7892
return false;
7993
}
8094

81-
queue.offer(s, NotificationLite.subscription(s));
95+
queue.offer(this.s, NotificationLite.subscription(s));
8296
drain();
8397
return true;
8498
}

0 commit comments

Comments
 (0)