forked from ReactiveX/RxJava
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSafeObserver.java
More file actions
165 lines (153 loc) · 6.56 KB
/
SafeObserver.java
File metadata and controls
165 lines (153 loc) · 6.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import rx.Observer;
import rx.Subscription;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
import rx.util.CompositeException;
import rx.util.OnErrorNotImplementedException;
/**
* Wrapper around Observer to ensure compliance with Rx contract.
* <p>
* The following is taken from the Rx Design Guidelines document: http://go.microsoft.com/fwlink/?LinkID=205219
* <pre>
* Messages sent to instances of the IObserver interface follow the following grammar:
*
* OnNext* (OnCompleted | OnError)?
*
* This grammar allows observable sequences to send any amount (0 or more) of OnNext messages to the subscribed
* observer instance, optionally followed by a single success (OnCompleted) or failure (OnError) message.
*
* The single message indicating that an observable sequence has finished ensures that consumers of the observable
* sequence can deterministically establish that it is safe to perform cleanup operations.
*
* A single failure further ensures that abort semantics can be maintained for operators that work on
* multiple observable sequences (see paragraph 6.6).
* </pre>
*
* <p>
* This wrapper will do the following:
* <ul>
* <li>Allow only a single execution of either onError or onCompleted.</li>
* <li>Once onCompleted or onError has been performed, no further calls are executed.</li>
* <li>If unsubscribe is called, this means we call completed() and don't allow any further onNext calls.</li>
* <li>When onError or onCompleted occurs, it will unsubscribe from the Observable (if executing asynchronously).</li>
* </ul>
* <p>
* It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that.
*
* @param <T> the type of the items accepted by the wrapped Observer
*/
public class SafeObserver<T> implements Observer<T> {
private final Observer<? super T> actual;
private final AtomicBoolean isFinished = new AtomicBoolean(false);
private final Subscription subscription;
public SafeObserver(Observer<? super T> actual) {
this.subscription = Subscriptions.empty();
this.actual = actual;
}
public SafeObserver(SafeObservableSubscription subscription, Observer<? super T> actual) {
this.subscription = subscription;
this.actual = actual;
}
@Override
public void onCompleted() {
if (isFinished.compareAndSet(false, true)) {
try {
actual.onCompleted();
} catch (Throwable e) {
// handle errors if the onCompleted implementation fails, not just if the Observable fails
_onError(e);
} finally {
// auto-unsubscribe
subscription.unsubscribe();
}
}
}
@Override
public void onError(Throwable e) {
if (isFinished.compareAndSet(false, true)) {
_onError(e);
}
}
@Override
public void onNext(T args) {
try {
if (!isFinished.get()) {
actual.onNext(args);
}
} catch (Throwable e) {
// handle errors if the onNext implementation fails, not just if the Observable fails
onError(e);
}
}
/*
* The logic for `onError` without the `isFinished` check so it can be called from within `onCompleted`.
*
* See https://github.com/Netflix/RxJava/issues/630 for the report of this bug.
*/
protected void _onError(Throwable e) {
try {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
actual.onError(e);
} catch (Throwable e2) {
if (e2 instanceof OnErrorNotImplementedException) {
/*
* onError isn't implemented so throw
*
* https://github.com/Netflix/RxJava/issues/198
*
* Rx Design Guidelines 5.2
*
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
* to rethrow the exception on the thread that the message comes out from the observable sequence.
* The OnCompleted behavior in this case is to do nothing."
*/
try {
subscription.unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
}
throw (OnErrorNotImplementedException) e2;
} else {
/*
* throw since the Rx contract is broken if onError failed
*
* https://github.com/Netflix/RxJava/issues/198
*/
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
try {
subscription.unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
}
}
// if we did not throw about we will unsubscribe here, if onError failed then unsubscribe happens in the catch
try {
subscription.unsubscribe();
} catch (RuntimeException unsubscribeException) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
throw unsubscribeException;
}
}
}