Skip to content

Commit cc69260

Browse files
committed
fix reactor#947 Add getContext() to Signal
- filled for doOnEach - not filled for materialize for now - defaults to Context.empty() when not filled - use singleton if Signal.complete() built with empty Context
1 parent b9e3aa9 commit cc69260

11 files changed

Lines changed: 656 additions & 33 deletions

File tree

reactor-core/src/main/java/reactor/core/publisher/Flux.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3526,7 +3526,8 @@ public final Flux<T> doOnComplete(Runnable onComplete) {
35263526
* Add behavior (side-effects) triggered when the {@link Flux} emits an item, fails with an error
35273527
* or completes successfully. All these events are represented as a {@link Signal}
35283528
* that is passed to the side-effect callback. Note that this is an advanced operator,
3529-
* typically used for monitoring of a Flux.
3529+
* typically used for monitoring of a Flux. These {@link Signal} have a {@link Context}
3530+
* associated to them.
35303531
*
35313532
* @param signalConsumer the mandatory callback to call on
35323533
* {@link Subscriber#onNext(Object)}, {@link Subscriber#onError(Throwable)} and
@@ -4910,6 +4911,8 @@ public final <V> Flux<V> map(Function<? super T, ? extends V> mapper) {
49104911
* materializing these signals.
49114912
* Since the error is materialized as a {@code Signal}, the propagation will be stopped and onComplete will be
49124913
* emitted. Complete signal will first emit a {@code Signal.complete()} and then effectively complete the flux.
4914+
* <p>
4915+
* These {@link Signal} don't have a {@link Context} associated with them (empty {@link Context}).
49134916
*
49144917
* <p>
49154918
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/materialize.png" alt="">

reactor-core/src/main/java/reactor/core/publisher/FluxDoOnEach.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import reactor.core.CoreSubscriber;
2424
import reactor.core.Exceptions;
2525
import reactor.util.annotation.Nullable;
26+
import reactor.util.context.Context;
2627

2728
/**
2829
* Peek into the lifecycle events and signals of a sequence
@@ -122,7 +123,7 @@ public void onError(Throwable t) {
122123
done = true;
123124
try {
124125
//noinspection ConstantConditions
125-
onSignal.accept(Signal.error(t));
126+
onSignal.accept(Signal.error(t, actual.currentContext()));
126127
}
127128
catch (Throwable e) {
128129
//this performs a throwIfFatal or suppresses t in e
@@ -147,8 +148,7 @@ public void onComplete() {
147148
}
148149
done = true;
149150
try {
150-
//noinspection ConstantConditions
151-
onSignal.accept(Signal.complete());
151+
onSignal.accept(Signal.complete(actual.currentContext()));
152152
}
153153
catch (Throwable e) {
154154
done = false;
@@ -182,9 +182,19 @@ public T get() {
182182
return t;
183183
}
184184

185+
@Override
186+
public Context getContext() {
187+
return actual.currentContext();
188+
}
189+
185190
@Override
186191
public SignalType getType() {
187192
return SignalType.ON_NEXT;
188193
}
194+
195+
@Override
196+
public String toString() {
197+
return "doOnEach_onNext(" + t + ")";
198+
}
189199
}
190200
}

reactor-core/src/main/java/reactor/core/publisher/FluxMaterialize.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.reactivestreams.Subscription;
2525
import reactor.core.CoreSubscriber;
2626
import reactor.util.annotation.Nullable;
27+
import reactor.util.context.Context;
2728

2829
/**
2930
* @author Stephane Maldini
@@ -182,6 +183,6 @@ public int size() {
182183
return terminalSignal == null || terminalSignal == empty ? 0 : 1;
183184
}
184185

185-
static final Signal empty = new ImmutableSignal<>(SignalType.ON_NEXT, null, null, null);
186+
static final Signal empty = new ImmutableSignal<>(Context.empty(), SignalType.ON_NEXT, null, null, null);
186187
}
187188
}

reactor-core/src/main/java/reactor/core/publisher/ImmutableSignal.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.reactivestreams.Subscription;
2323
import reactor.util.annotation.Nullable;
24+
import reactor.util.context.Context;
2425

2526
/**
2627
* The common implementation of a {@link Signal} (serializable and immutable).
@@ -33,14 +34,16 @@ final class ImmutableSignal<T> implements Signal<T>, Serializable {
3334

3435
private static final long serialVersionUID = -2004454746525418508L;
3536

37+
private final Context context;
3638
private final SignalType type;
3739
private final Throwable throwable;
3840

3941
private final T value;
4042

4143
private transient final Subscription subscription;
4244

43-
ImmutableSignal(SignalType type, @Nullable T value, @Nullable Throwable e, @Nullable Subscription subscription) {
45+
ImmutableSignal(Context context, SignalType type, @Nullable T value, @Nullable Throwable e, @Nullable Subscription subscription) {
46+
this.context = context;
4447
this.value = value;
4548
this.subscription = subscription;
4649
this.throwable = e;
@@ -70,6 +73,11 @@ public SignalType getType() {
7073
return type;
7174
}
7275

76+
@Override
77+
public Context getContext() {
78+
return context;
79+
}
80+
7381
@Override
7482
public boolean equals(@Nullable Object o) {
7583
if (this == o) {
@@ -133,6 +141,10 @@ public String toString() {
133141
}
134142
}
135143

144+
/**
145+
* @deprecated as Signal is now associated with {@link Context}, prefer using per-subscription instances.
146+
*/
147+
@Deprecated
136148
static final Signal<Void> ON_COMPLETE =
137-
new ImmutableSignal<>(SignalType.ON_COMPLETE, null, null, null);
149+
new ImmutableSignal<>(Context.empty(), SignalType.ON_COMPLETE, null, null, null);
138150
}

reactor-core/src/main/java/reactor/core/publisher/Mono.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1673,6 +1673,7 @@ public final Mono<T> doOnSuccess(Consumer<? super T> onSuccess) {
16731673
* or completes successfully. All these events are represented as a {@link Signal}
16741674
* that is passed to the side-effect callback. Note that this is an advanced operator,
16751675
* typically used for monitoring of a Mono.
1676+
* These {@link Signal} have a {@link Context} associated to them.
16761677
*
16771678
* @param signalConsumer the mandatory callback to call on
16781679
* {@link Subscriber#onNext(Object)}, {@link Subscriber#onError(Throwable)} and
@@ -1685,11 +1686,8 @@ public final Mono<T> doOnSuccess(Consumer<? super T> onSuccess) {
16851686
*/
16861687
public final Mono<T> doOnEach(Consumer<? super Signal<T>> signalConsumer) {
16871688
Objects.requireNonNull(signalConsumer, "signalConsumer");
1688-
return doOnSignal(this,
1689-
null,
1690-
v -> signalConsumer.accept(Signal.next(v)),
1691-
e -> signalConsumer.accept(Signal.error(e)),
1692-
() -> signalConsumer.accept(Signal.complete()), null, null);
1689+
return onAssembly(new MonoDoOnEach<>(this, signalConsumer));
1690+
16931691
}
16941692

16951693
/**
@@ -2320,7 +2318,8 @@ public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) {
23202318
* materializing these signals.
23212319
* Since the error is materialized as a {@code Signal}, the propagation will be stopped and onComplete will be
23222320
* emitted. Complete signal will first emit a {@code Signal.complete()} and then effectively complete the flux.
2323-
*
2321+
* <p>
2322+
* These {@link Signal} don't have a {@link Context} associated with them (empty {@link Context}).
23242323
* <p>
23252324
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.1.RELEASE/src/docs/marble/materialize1.png" alt="">
23262325
*

reactor-core/src/main/java/reactor/core/publisher/MonoCacheTime.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import reactor.util.Logger;
2727
import reactor.util.Loggers;
2828
import reactor.util.annotation.Nullable;
29+
import reactor.util.context.Context;
2930

3031
/**
3132
* An operator that caches the value from a source Mono with a TTL, after which the value
@@ -44,7 +45,7 @@ class MonoCacheTime<T> extends MonoOperator<T, T> implements Runnable {
4445
static final AtomicReferenceFieldUpdater<MonoCacheTime, Signal> STATE =
4546
AtomicReferenceFieldUpdater.newUpdater(MonoCacheTime.class, Signal.class, "state");
4647

47-
static final Signal<?> EMPTY = new ImmutableSignal<>(SignalType.ON_NEXT, null, null, null);
48+
static final Signal<?> EMPTY = new ImmutableSignal<>(Context.empty(), SignalType.ON_NEXT, null, null, null);
4849

4950
MonoCacheTime(Mono<? extends T> source, Duration ttl, Scheduler clock) {
5051
super(source);
@@ -156,6 +157,15 @@ public SignalType getType() {
156157
throw new UnsupportedOperationException("illegal signal use");
157158
}
158159

160+
/**
161+
* unused in this context as the {@link Signal} interface is only
162+
* implemented for use in the main's STATE compareAndSet.
163+
*/
164+
@Override
165+
public Context getContext() {
166+
throw new UnsupportedOperationException("illegal signal use: getContext");
167+
}
168+
159169
final boolean add(Operators.MonoSubscriber<T, T> toAdd) {
160170
for (; ; ) {
161171
Operators.MonoSubscriber<T, T>[] a = subscribers;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.core.publisher;
18+
19+
import java.util.Objects;
20+
import java.util.function.Consumer;
21+
22+
import reactor.core.CoreSubscriber;
23+
24+
/**
25+
* Peek into the lifecycle events and signals of a sequence
26+
*
27+
* @param <T> the value type
28+
*
29+
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
30+
*/
31+
final class MonoDoOnEach<T> extends MonoOperator<T, T> {
32+
33+
final Consumer<? super Signal<T>> onSignal;
34+
35+
MonoDoOnEach(Mono<? extends T> source, Consumer<? super Signal<T>> onSignal) {
36+
super(source);
37+
this.onSignal = Objects.requireNonNull(onSignal, "onSignal");
38+
}
39+
40+
@Override
41+
public void subscribe(CoreSubscriber<? super T> actual) {
42+
//TODO fuseable version?
43+
//TODO conditional version?
44+
source.subscribe(new FluxDoOnEach.DoOnEachSubscriber<>(actual, onSignal));
45+
}
46+
}

0 commit comments

Comments
 (0)