You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: Part 1 - Getting Started/2. Key types.md
+58-26Lines changed: 58 additions & 26 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -1,8 +1,8 @@
1
1
# Key types
2
2
3
-
Rx is based around two fundamental types, and several others that expand the functionality around them. Those two types are the `Observable` and the `Observer`. we will introduce those, as well as`Subject`s, which ease the learning curve.
3
+
Rx is based around two fundamental types, while several others expand the functionality around the core types. Those two core types are the `Observable` and the `Observer`, which will be introduced in this chapter. We will also introduce`Subject`s, which ease the learning curve.
4
4
5
-
Rx builds upon the [Observer](http://en.wikipedia.org/wiki/Observer_pattern) pattern. It is not unique in doing so. Event handling already exists in Java (e.g. JavaFX's EventHandler). These simpler approaches suffer in comparison to Rx:
5
+
Rx builds upon the [Observer](http://en.wikipedia.org/wiki/Observer_pattern) pattern. It is not unique in doing so. Event handling already exists in Java (e.g. JavaFX's EventHandler). Those are simpler approaches, which suffer in comparison to Rx:
6
6
7
7
* Events through event handlers are hard to compose.
8
8
* They cannot be queried over time
@@ -12,13 +12,13 @@ Rx builds upon the [Observer](http://en.wikipedia.org/wiki/Observer_pattern) pat
12
12
13
13
## Observable
14
14
15
-
[Observable](http://reactivex.io/RxJava/javadoc/rx/Observable) is the first core element that we will see. Since Java does not support extension methods, this class contains a lot of the implementation in Rx. We will be examining it step by step throughout this book. For now, we must understand the`Subscribe` method. Here is one key overload of the method:
15
+
[Observable](http://reactivex.io/RxJava/javadoc/rx/Observable) is the first core element that we will see. This class contains a lot of the implementation of Rx, including all of the core operators. We will be examining it step by step throughout this book. For now, we must understand the `Subscribe` method. Here is one key overload of the method:
16
16
17
17
```java
18
18
publicfinalSubscription subscribe(Subscriber<? super T> subscriber)
19
19
```
20
20
21
-
This is the method that you use to receive the values emitted by the observable. As the values come to be pushed (through policies that we will discuss throughout this book), they are pushed to the subscriber, which is then responsible for the behaviour based on this example. The `Subscriber` here is an implementation of the `Observer`.
21
+
This is the method that you use to receive the values emitted by the observable. As the values come to be pushed (through policies that we will discuss throughout this book), they are pushed to the subscriber, which is then responsible for the behaviour intended by the consumer. The `Subscriber` here is an implementation of the `Observer` interface.
22
22
23
23
An observable pushes 3 kinds of events
24
24
* Values
@@ -28,7 +28,7 @@ An observable pushes 3 kinds of events
28
28
29
29
## Observer
30
30
31
-
We already saw one abstract implementation of the [Observer](http://reactivex.io/RxJava/javadoc/rx/Observer.html), `Subscriber`. `Subscriber` implements some extra functionality and we will see why it is needer later. For now, we should understand the simpler interface.
31
+
We already saw one abstract implementation of the [Observer](http://reactivex.io/RxJava/javadoc/rx/Observer.html), `Subscriber`. `Subscriber` implements some extra functionality and should be used as the basis for our implementations of `Observer`. For now, it is simpler to first understand the interface.
32
32
33
33
```java
34
34
interfaceObserver<T> {
@@ -38,42 +38,68 @@ interface Observer<T> {
38
38
}
39
39
```
40
40
41
-
Those three methods are the behaviour that is executed every time the observable pushes a value. The observer's will have its `onNext` called zero or more times, optionally followed by an `onCompleted` or an `onError`. No calls happen after a call to `onError` or `onCompleted`.
41
+
Those three methods are the behaviour that is executed every time the observable pushes a value. The observer will have its `onNext` called zero or more times, optionally followed by an `onCompleted` or an `onError`. No calls happen after a call to `onError` or `onCompleted`.
42
42
43
-
You'll see a lot of the `Observable`, but not so much of the `Observer`. While it is important to understand the `Observer`, you'll be using a lot of method overloads that hide it.
43
+
When developing Rx code, you'll see a lot of `Observable`, but not so much of `Observer`. While it is important to understand the `Observer`, there are shorthands that that remove the need to instantiate it yourself.
44
44
45
45
46
46
## Implementing Observable and Observer
47
47
48
-
You could manually implement `Observer` or extend `Observable`. In reality that may be unnecessary, since Rx provides all the building blocks you need. It is also dangerous, as interaction between parts of Rx includes conventions and internal plumming that are not obvious. It is both simpler and safer to use the many tools that Rx gives you for generating the functionality that you need.
48
+
You could manually implement `Observer` or extend `Observable`. In reality that will usually be unnecessary, since Rx already provides all the building blocks you need. It is also dangerous, as interaction between parts of Rx includes conventions and internal plumming that are not obvious to a beginner. It is both simpler and safer to use the many tools that Rx gives you for generating the functionality that you need.
49
49
50
-
To subscribe to an observable, it is not necessary to provide instances of Observer at all. There are overloads to `subscribe` that take the functions to be executed for `onNext`, `onError` and `onSubscribe`, hiding the creation of instances. It is not even necessary to provide each of those functions. You can even provide a subset of them, i.e. for`onNext` or for`onNext` and `onError`.
50
+
To subscribe to an observable, it is not necessary to provide instances of `Observer` at all. There are overloads to `subscribe` that simply take the functions to be executed for `onNext`, `onError` and `onSubscribe`, hiding away the instantiation of the corresponding `Observer`. It is not even necessary to provide each of those functions. You can provide a subset of them, i.e. just`onNext` or just`onNext` and `onError`.
51
51
52
-
The introduction of lambda functions in Java 1.8 makes these overload very convenient for our examples.
52
+
The introduction of lambda functions in Java 1.8 makes these overloads very convenient for the short examples that exist in this book.
53
53
54
54
## Subject
55
55
56
-
Subjects are an extension of the `Observable` that also implements the `Observer` interface. The idea may sound odd at first, but they make things a lot simpler in some cases. They can have events pushed to them, which they then push further to their own subscribers. This makes them ideal entry points into Rx code: when you have values coming in from outside of Rx, you can push them into a `Subject`, turning them into an observable. You can think of them as entry points to Rx code.
56
+
Subjects are an extension of the `Observable` that also implements the `Observer` interface. The idea may sound odd at first, but they make things a lot simpler in some cases. They can have events pushed to them (like observers), which they then push further to their own subscribers (like observables). This makes them ideal entry points into Rx code: when you have values coming in from outside of Rx, you can push them into a `Subject`, turning them into an observable. You can think of them as entry points to an Rx pipeline.
57
57
58
-
`Subject` has two parameter types: the input type and the output type. This is not because `Subject` is the right place transform your values. There are transformation operators to do that, which we will see later.
58
+
`Subject` has two parameter types: the input type and the output type. This was designed so for the sake of abstraction and not because the common uses for subjects involve transforming values. There are transformation operators to do that, which we will see later.
59
59
60
60
There are a few different implementations of `Subject`. We will now examine the most important ones and their differences.
61
61
62
-
### ReplaySubject
62
+
### PublishSubject
63
63
64
-
`ReplaySubject` has the special feature of caching all the values pushed to it. When a new subscription is made, the event sequence is replayed from the start for the new subscriber.
64
+
`PublishSubject` is the most straight-forward kind of subject. When a value is pushed into a `PublishSubject`, the subject pushes it to every subscriber that is subscribed to it at that moment.
As we can see in the example, `1` isn't printed because we weren't subscribed when it was pushed. After we subscribed, we began receiving the values that were pushed to the subject.
84
+
85
+
This is the first time we see `subscribe` being used, so it is worth paying attention to how it was used. In this case, we used the overload which expects one [Function](http://reactivex.io/RxJava/javadoc/rx/functions/Function.html) for the case of onNext. That function takes an argument of type `Integer` and returns nothing. Functions without a return type are also called actions. We can provide that function in different ways:
86
+
* we can supply an instance of `Action1<Integer>`,
87
+
* implicitly create one using a [lambda expression](http://en.wikipedia.org/wiki/Anonymous_function#Java) or
88
+
* pass a reference to an existing method that fits the signature.
89
+
In this case, `System.out::println` has an overload that accepts `Object`, so we passed a reference to it. `subscribe` will call `println` with the arriving values as the argument.
90
+
91
+
### ReplaySubject
76
92
93
+
`ReplaySubject` has the special feature of caching all the values pushed to it. When a new subscription is made, the event sequence is replayed from the start for the new subscriber. After catching up, every subscriber receives new events as they come.
94
+
95
+
```java
96
+
ReplaySubject<Integer> s =ReplaySubject.create();
97
+
s.subscribe(v ->System.out.println("Early:"+ v));
98
+
s.onNext(0);
99
+
s.onNext(1);
100
+
s.subscribe(v ->System.out.println("Late: "+ v));
101
+
s.onNext(2);
102
+
```
77
103
Output
78
104
```
79
105
Early:0
@@ -104,10 +130,12 @@ Late: 2
104
130
Late: 3
105
131
```
106
132
107
-
Our late subscriber now missed the first value, which fell off our buffer of size 2.
133
+
Our late subscriber now missed the first value, which fell off the buffer of size 2. Similarily, old values fall off the buffer as time passes, when the subject is created with `createWithTime`
108
134
109
135
```java
110
-
ReplaySubject<Integer> s =ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS, Schedulers.immediate());
136
+
ReplaySubject<Integer> s =ReplaySubject.createWithTime(
137
+
150, TimeUnit.MILLISECONDS,
138
+
Schedulers.immediate());
111
139
s.onNext(0);
112
140
Thread.sleep(100);
113
141
s.onNext(1);
@@ -124,11 +152,13 @@ Late: 2
124
152
Late: 3
125
153
```
126
154
155
+
Creating a `ReplaySubject` with time requires a `Scheduler`, which is Rx's way of keeping time. Feel free to ignore this for now, as we will properly introduce schedulers in the chapter about concurrency.
156
+
127
157
`ReplaySubject.createWithTimeAndSize` limits both, which ever comes first.
128
158
129
159
### BehaviorSubject
130
160
131
-
`BehaviorSubject` only remembers the last value. It is similar to a `ReplaySubject` with a buffer of size 1. An initial value can be provided on creation, therefore guaranteeing that a value will be available immediately on subscription.
161
+
`BehaviorSubject` only remembers the last value. It is similar to a `ReplaySubject` with a buffer of size 1. An initial value can be provided on creation, therefore guaranteeing that a value always will be available immediately on subscription.
132
162
133
163
```java
134
164
BehaviorSubject<Integer> s =BehaviorSubject.create();
@@ -160,7 +190,7 @@ s.subscribe(
160
190
);
161
191
```
162
192
163
-
An initial value is available if anyone subscribes before the first value is pushed
193
+
An initial value is provided to be available if anyone subscribes before the first value is pushed.
164
194
165
195
```java
166
196
BehaviorSubject<Integer> s =BehaviorSubject.create(0);
@@ -178,7 +208,7 @@ Since the defining role of a `BehaviorSubject` is to always have a value readily
178
208
179
209
### AsyncSubject
180
210
181
-
`AsyncSubject` also caches the single last value. The difference now is that it doesn't emit anything until the sequence completes. Its use is to emit a single value and immediately complete.
211
+
`AsyncSubject` also caches the last value. The difference now is that it doesn't emit anything until the sequence completes. Its use is to emit a single value and immediately complete.
182
212
183
213
```java
184
214
AsyncSubject<Integer> s =AsyncSubject.create();
@@ -198,7 +228,7 @@ Note that, if we didn't do `s.onCompleted();`, this example would have printed n
198
228
199
229
## Implicit contracts
200
230
201
-
As we already mentioned, there are contracts in Rx that are not obvious in the code. An important one is that no events are emitted after a termination event (`onError` or `onCompleted`). The implemented subjects respect that:
231
+
As we already mentioned, there are contracts in Rx that are not obvious in the code. An important one is that no events are emitted after a termination event (`onError` or `onCompleted`). The implemented subjects respect that, and the `subscribe` method also prevents some violations of the contract.
202
232
203
233
```java
204
234
Subject<Integer, Integer> s =ReplaySubject.create();
@@ -213,6 +243,8 @@ Output
213
243
0
214
244
```
215
245
246
+
Safety nets like these are not guaranteed in the entirety of the implementation of Rx. It is best that you are mindful not to violate the contract, as this may lead to undefined behaviour.
Copy file name to clipboardExpand all lines: Part 1 - Getting Started/3. Lifetime management.md
+14-10Lines changed: 14 additions & 10 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -15,7 +15,7 @@ Subscription subscribe(Observer<? super T> observer)
15
15
Subscription subscribe(Subscriber<? super T> subscriber)
16
16
```
17
17
18
-
`subscribe()` consumes events but performs no actions. The overloads that take an `Action1`construct a `Subscriber` with the functions that you provide, or doing nothing where the action is missing.
18
+
`subscribe()` consumes events but performs no actions. The overloads that take one or more `Action` will construct a `Subscriber` with the functions that you provide. Where you don't give an action, the no action is performed.
19
19
20
20
In the following example, we handle the error of a sequence that failed.
21
21
@@ -34,15 +34,15 @@ Output
34
34
java.lang.Exception: Oops
35
35
```
36
36
37
-
If we do not provide a function for error handling, an `OnErrorNotImplementedException` will be *thrown* at the point where `s.onError` is called. It happens here that the producer and the consumer are side-by-side, so you could do a try-catch. However, on a compartmentalised system you won't receive the exception unless you provide a handler to `subscribe`.
37
+
If we do not provide a function for error handling, an `OnErrorNotImplementedException` will be *thrown* at the point where `s.onError` is called, which is the producer's side. It happens here that the producer and the consumer are side-by-side, so we could do a traditional try-catch. However, on a compartmentalised system, the producer and the subscriber very often are in different places. Unless the consumer provides a handle for errors to `subscribe`, they will never know that an error has occured and that the sequence was terminated.
38
38
39
39
## Unsubscribing
40
40
41
41
You can also stop receiving values *before* a sequence terminates. Every `subscribe` overload returns an instance of `Subscription`, which is an interface with 2 methods:
42
42
43
43
```java
44
-
booleanisUnsubscribed()
45
-
voidunsubscribe()
44
+
boolean isUnsubscribed()
45
+
void unsubscribe()
46
46
```
47
47
48
48
Calling `unsubscribe` will stop events from being pushed to your observer.
@@ -66,7 +66,7 @@ Output
66
66
1
67
67
```
68
68
69
-
Unsubscribing one observer does not interfere with other observers
69
+
Unsubscribing one observer does not interfere with other observers on the same observable.
`onError` and `onCompleted` mean the termination of a sequence. An observable that complies with the standard will not emit anything after either of those events. This is something to note both when consuming in Rx and when implementing your own observable.
98
+
`onError` and `onCompleted` mean the termination of a sequence. An observable that complies with the Rx contract will not emit anything after either of those events. This is something to note both when consuming in Rx and when implementing your own observable.
A `Subscription`can be tied to the resources it uses. For that reason, you should remember to dispose of subscriptions. You can create that binding a `Subscription`with the necessary using the [Subscriptions](http://reactivex.io/RxJava/javadoc/rx/subscriptions/Subscriptions.html) factory.
122
+
A `Subscription`is tied to the resources it uses. For that reason, you should remember to dispose of subscriptions. You can create the binding between a `Subscription`and the necessary resources using the [Subscriptions](http://reactivex.io/RxJava/javadoc/rx/subscriptions/Subscriptions.html) factory.
123
123
124
124
```java
125
125
Subscription s =Subscriptions.create(() ->System.out.println("Clean"));
126
126
s.unsubscribe();
127
127
```
128
-
129
128
Output
130
129
```
131
130
Clean
132
131
```
133
132
134
-
The `Subscriptions` factory has more methods, which allow you to compose the use of resources into a single `Subscription`. `Subscription` itself has several implementations.
133
+
`Subscriptions.create` takes an action that will be executed on unsubscription to release the resources. There also are shorthand for common actions when creating a sequence.
134
+
*`Subscriptions.empty()` returns a `Subscription` that does nothing when disposed. This is useful when you are required to return an instance of `Subscription`, but your implementation doesn't actually need to release any resources.
135
+
*`Subscriptions.from(Subscription... subscriptions)` returns a `Subscription` that will dispose of multiple other subscriptions when it is disposed.
136
+
*`Subscriptions.unsubscribed()` returns a `Subscription` that is already disposed of.
137
+
138
+
There are several implementations of `Subscription`.
135
139
136
140
*`BooleanSubscription`
137
141
*`CompositeSubscription`
@@ -144,7 +148,7 @@ The `Subscriptions` factory has more methods, which allow you to compose the use
144
148
*`Subscriber`
145
149
*`TestSubscriber`
146
150
147
-
We will see some more later on.
151
+
We will see more of them later in this book. It is interesting to note that `Subscriber` also implements `Subscription`. This means that we can also use a reference to a `Subscriber` to terminate a subscription.
0 commit comments