forked from ReactiveX/RxJava
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathOperator.java
More file actions
70 lines (55 loc) · 1.71 KB
/
Operator.java
File metadata and controls
70 lines (55 loc) · 1.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package rx;
import rx.subscriptions.CompositeSubscription;
/**
 * An {@link Observer} that is also a {@link Subscription}.
 * <p>
 * Every instance holds a {@link CompositeSubscription}; {@link #add(Subscription)},
 * {@link #unsubscribe()} and {@link #isUnsubscribed()} all delegate to it.
 *
 * @param <T> the type of item this operator observes
 */
public abstract class Operator<T> implements Observer<T>, Subscription {

    /** Backing composite that all subscription-related calls are forwarded to. Never null. */
    private final CompositeSubscription cs;

    // TODO I'm questioning this API, it could be confusing and misused
    /**
     * Creates an operator that uses the very same {@link CompositeSubscription} instance as
     * {@code op}, so both operators forward {@code add}/{@code unsubscribe} to one composite.
     *
     * @param op the operator whose composite subscription is reused
     * @throws IllegalArgumentException if {@code op} is null
     */
    protected Operator(Operator<?> op) {
        // Fail fast with the same exception type the factories use, instead of a bare NPE.
        if (op == null) {
            throw new IllegalArgumentException("Operator can not be null");
        }
        this.cs = op.cs;
    }

    /**
     * Creates an operator backed by the given composite subscription.
     *
     * @param cs the composite that {@code add}/{@code unsubscribe}/{@code isUnsubscribed} delegate to
     * @throws IllegalArgumentException if {@code cs} is null
     */
    protected Operator(CompositeSubscription cs) {
        // Without this check a null composite would only surface later as an NPE in
        // add()/unsubscribe()/isUnsubscribed(), far from the faulty constructor call.
        if (cs == null) {
            throw new IllegalArgumentException("CompositeSubscription can not be null");
        }
        this.cs = cs;
    }

    /**
     * Wraps an {@link Observer} in an operator that forwards every event to it and uses
     * {@code cs} as its composite subscription.
     *
     * @param o  the observer that receives {@code onNext}/{@code onError}/{@code onCompleted}
     * @param cs the composite subscription backing the returned operator
     * @param <T> the item type
     * @return a new operator delegating to {@code o}
     * @throws IllegalArgumentException if {@code o} or {@code cs} is null
     */
    public static <T> Operator<T> create(final Observer<? super T> o, CompositeSubscription cs) {
        if (o == null) {
            throw new IllegalArgumentException("Observer can not be null");
        }
        if (cs == null) {
            throw new IllegalArgumentException("CompositeSubscription can not be null");
        }
        return new Operator<T>(cs) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T v) {
                o.onNext(v);
            }
        };
    }

    /**
     * Wraps an {@link Observer} in an operator whose composite subscription is newly created
     * and initially contains {@code s}.
     *
     * @param o the observer that receives the events
     * @param s the subscription added to the new composite
     * @param <T> the item type
     * @return a new operator delegating to {@code o}
     * @throws IllegalArgumentException if {@code s} or {@code o} is null
     *         ({@code s} is checked first)
     */
    public static <T> Operator<T> create(final Observer<? super T> o, Subscription s) {
        if (s == null) {
            throw new IllegalArgumentException("Subscription can not be null");
        }
        CompositeSubscription cs = new CompositeSubscription();
        cs.add(s);
        // The observer null check is performed by the overload we delegate to.
        return create(o, cs);
    }

    /**
     * Used to register an unsubscribe callback.
     *
     * @param s the subscription to add to this operator's composite subscription
     */
    public final void add(Subscription s) {
        cs.add(s);
    }

    /** Unsubscribes the backing composite subscription. */
    @Override
    public final void unsubscribe() {
        cs.unsubscribe();
    }

    /**
     * Reports the state of the backing composite subscription.
     *
     * @return whatever the composite's {@code isUnsubscribed()} returns
     */
    public final boolean isUnsubscribed() {
        return cs.isUnsubscribed();
    }
}