Commit 257544a — "4.4 added examples" (parent 52150e0)

File changed: Part 4 - Concurrency/4. Backpressure.md (139 additions & 12 deletions)
Here, the producer has its values ready and can emit them with no delay. The consumer is very slow by comparison. But this isn't going to cause problems, because the synchronous nature of the code above automatically regulates the rates of the producer and consumer. When `o.onNext(1);` is called, execution for the producer is blocked until the entire Rx chain completes. Only when that expression returns can the execution proceed to `o.onNext(2);`.
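This synchronous regulation is not specific to Rx; a plain Java callback behaves the same way. In the following sketch, the `produce` helper is a made-up stand-in for the observable above, not an Rx API: each emission blocks until the consumer returns, so the producer can never get ahead.

```java
import java.util.function.IntConsumer;

class SyncExample {
    // A hypothetical stand-in for the Rx chain: the producer invokes the
    // consumer directly, so each emission blocks until handling completes.
    static void produce(IntConsumer consumer) {
        consumer.accept(1); // returns only after the consumer is done with 1
        consumer.accept(2);
        consumer.accept(3);
    }

    public static void main(String[] args) {
        produce(i -> {
            try {
                Thread.sleep(100); // a slow consumer
            } catch (InterruptedException e) { }
            System.out.println(i);
        });
    }
}
```

However slow the consumer is, the values arrive in order and nothing is lost, because the producer is held up by each call.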
But this is only for synchronous execution. It is very common for the producer and the consumer to be asynchronous. So, what happens when a producer and a consumer operate asynchronously at different speeds?
Let's first consider the traditional pull-based model, such as an iterator. In a pull-based model, the consumer requests the values. If the producer is slower, the consumer will block on request and resume when the next value arrives. If the producer is faster, then it will have idle time waiting for the consumer to request the next value.
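For comparison, here is the pull model with a plain `Iterator` (no Rx involved): nothing is handed over until the consumer asks for the next value, so the consumer's pace sets the overall rate.

```java
import java.util.Iterator;
import java.util.List;

class PullExample {
    public static void main(String[] args) throws InterruptedException {
        Iterator<Integer> it = List.of(1, 2, 3).iterator();
        while (it.hasNext()) {
            int value = it.next(); // the consumer explicitly requests the next value
            Thread.sleep(10);      // a slow consumer: the producer simply waits
            System.out.println(value);
        }
    }
}
```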
Rx is push-based, not pull-based. In Rx, it is the producer that pushes values to the consumer when the values are ready. If the producer is slower, then the consumer will have idle time waiting for the next value to arrive. If the producer is faster, without any provisions, it will keep force-feeding data to the consumer without ever knowing about the consumer's difficulties.
```java
Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);
```
Output
```
0
1
rx.exceptions.MissingBackpressureException
```
Here, the `MissingBackpressureException` is letting us know that the producer is too fast and the operators that we used can't deal with it.
## Remedies for the consumer
Some of the operators we've seen in previous chapters can help the consumer lessen the stress caused by too much input.
#### Thin out the data
The [sample](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#sample) operator allows you to specify a maximum rate of input; values above that rate are ignored.
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.o.png)
```java
Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .sample(100, TimeUnit.MILLISECONDS)
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);
```
Output
```
82
182
283
...
```
The [throttle](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#throttling) family of operators also filters on rate, but allows you to specify in a different way which element to let through when stressed. [Debounce](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#debouncing) does not cut the rate to a fixed maximum. Instead, it completely removes every burst of information and replaces it with a single value.
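To make the debounce behaviour concrete, here is a toy offline model of it (this is not the Rx implementation; the `debounce` helper and its event arrays are invented for illustration): given timestamped events, it keeps only the last value of each burst, where a burst ends after a quiet period of at least `window` milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

class DebounceSketch {
    // Keep a value only if it is followed by at least `window` ms of silence,
    // i.e. it is the last value of its burst.
    static List<Integer> debounce(long[] times, int[] values, long window, long end) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            long next = (i + 1 < times.length) ? times[i + 1] : end;
            if (next - times[i] >= window) {
                out.add(values[i]); // a burst ended here
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // a burst of 0, 1, 2 within 20 ms, then silence, then a lone 3
        long[] t = {0, 10, 20, 200};
        int[] v = {0, 1, 2, 3};
        System.out.println(debounce(t, v, 100, 400)); // only the last of each burst survives
    }
}
```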
#### Collect
You can also use `buffer` and `window` to collect overflowing data while the consumer is busy. This is useful if processing items in batches is faster. Alternatively, you can decide manually how many and which of the buffered items to process.
Our consumer processes a whole batch with the same speed that it would process a single item. Here, we slowed down the producer so that each batch fits on a single line.
```java
Observable.interval(10, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .buffer(100, TimeUnit.MILLISECONDS)
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);
```
Output
```
[0, 1, 2, 3, 4, 5, 6, 7]
[8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27]
...
```
## Reactive pull
The above remedies are legitimate solutions to the problem. However, they aren't always the best way to deal with an overproducing observable. Sometimes the problem can be better handled on the side of the producer. Backpressure is a way for the pipeline to resist the emission of values.
> Back pressure refers to pressure opposed to the desired flow of a fluid in a confined place such as a pipe. It is often caused by obstructions or tight bends in the confinement vessel along which it is moving, such as piping or air vents. _- Wikipedia_
RxJava has implemented a way for a subscriber to regulate the rate of an observable. The `Subscriber` has a `request(n)` method, with which it notifies the observable that it is ready to accept `n` more values. By calling `request` in the `onStart` method of your `Subscriber`, you establish reactive pull backpressure. This isn't a pull in the sense of a pull-based model: it doesn't return any values and will not block if values are not ready. It merely notifies the observable of how many values the `Subscriber` is ready to accept, and asks it to hold the rest. Subsequent calls to `request` will allow more values through.
This is a `Subscriber` that takes values one at a time:
```java
class MySubscriber extends Subscriber<Integer> {
    @Override
    public void onStart() {
        request(1); // accept only one value to begin with
    }

    @Override
    public void onCompleted() {}

    @Override
    public void onError(Throwable e) {}

    @Override
    public void onNext(Integer i) {
        System.out.println(i);
        request(1); // we are done with this value; request the next one
    }
}
```

The `request(1)` in `onStart` establishes backpressure and signals that the observable should emit only the first value to begin with.
## Backpressure policies
Many Rx operators use backpressure internally to avoid overfilling their internal queues. That way, the problem of a slow consumer is propagated backwards in the chain of operators. Backpressure doesn't make the problem go away. It merely moves it where it may be handled better. We still need to decide what to do with the values of an overproducing observable.
There are Rx operators that declare how you want to deal with situations where a subscriber cannot accept the values coming through.
### onBackpressureBuffer
The `onBackpressureBuffer` operator will cause every value that can't be consumed to be stored until the observer can consume it.
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png)
You can have a buffer of infinite size, or a buffer with a maximum capacity. If the buffer overflows, the observable will fail.

```java
Observable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureBuffer(1000)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);
```
Output
```
0
1
2
3
4
5
6
7
8
9
10
11
rx.exceptions.MissingBackpressureException: Overflowed buffer of 1000
```
What happens here is that the producer is 100 times faster than the consumer. We try to deal with that by buffering up to 1000 items. It is easy to calculate that, by the time that the consumer consumes the 11th item, the producer has produced 1100 items, well over our buffer's capacity. The observable then fails, as it can't deal with the backpressure.
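That calculation can be written out explicitly; the constants below mirror the example above.

```java
class OverflowMath {
    public static void main(String[] args) {
        long producerIntervalMs = 1; // interval(1, MILLISECONDS): one value per ms
        long consumerDelayMs = 100;  // Thread.sleep(100): one value per 100 ms
        int bufferCapacity = 1000;   // onBackpressureBuffer(1000)

        // by the time the consumer has handled `consumed` items, the producer
        // has emitted roughly consumed * (consumer rate / producer rate) items
        int consumed = 11;
        long produced = consumed * consumerDelayMs / producerIntervalMs;
        long waitingInBuffer = produced - consumed;

        System.out.println(produced);                         // total items emitted so far
        System.out.println(waitingInBuffer > bufferCapacity); // has the buffer overflowed?
    }
}
```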
### onBackpressureDrop
The `onBackpressureDrop` operator discards items if the consumer can't receive them.
![](https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.drop.png)

```java
Observable.interval(1, TimeUnit.MILLISECONDS)
    .onBackpressureDrop()
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);
```
Output
```
0
1
2
...
126
127
12861
12862
...
```
What we see here is that the first 128 items were consumed normally, but then we jumped forward. The items in between were dropped by `onBackpressureDrop`. Even though we did not request it, the first 128 items were still buffered: Rx employs small internal buffers even when we don't ask for them.
| Previous | Next |
| --- | --- |
