forked from ReactiveX/RxJava
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathObservers.java
More file actions
129 lines (102 loc) · 3.27 KB
/
Observers.java
File metadata and controls
129 lines (102 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package rx.observers;
import rx.Observer;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.functions.Action1;
public class Observers {
private static final Observer<Object> EMPTY = new Observer<Object>() {
@Override
public final void onCompleted() {
// do nothing
}
@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}
@Override
public final void onNext(Object args) {
// do nothing
}
};
@SuppressWarnings("unchecked")
public static <T> Observer<T> empty() {
return (Observer<T>) EMPTY;
}
/**
* Create an Observer that receives `onNext` and ignores `onError` and `onCompleted`.
*/
public static final <T> Observer<T> create(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
return new Observer<T>() {
@Override
public final void onCompleted() {
// do nothing
}
@Override
public final void onError(Throwable e) {
throw new OnErrorNotImplementedException(e);
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
};
}
/**
* Create an Observer that receives `onNext` and `onError` and ignores `onCompleted`.
*
*/
public static final <T> Observer<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
return new Observer<T>() {
@Override
public final void onCompleted() {
// do nothing
}
@Override
public final void onError(Throwable e) {
onError.call(e);
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
};
}
/**
* Create an Observer that receives `onNext`, `onError` and `onCompleted`.
*
*/
public static final <T> Observer<T> create(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
if (onError == null) {
throw new IllegalArgumentException("onError can not be null");
}
if (onComplete == null) {
throw new IllegalArgumentException("onComplete can not be null");
}
return new Observer<T>() {
@Override
public final void onCompleted() {
onComplete.call();
}
@Override
public final void onError(Throwable e) {
onError.call(e);
}
@Override
public final void onNext(T args) {
onNext.call(args);
}
};
}
}