|
| 1 | +# Backpressure |
| 2 | + |
| 3 | +Rx leads events from end of a pipeline to the other end. The actions that take place on each end can be very dissimilar. What happens when the producer and the consumer require different amounts of time to process a value? In a synchronous model this question isn't an issue. Consider the following example: |
| 4 | + |
| 5 | +```java |
| 6 | +// Produce |
| 7 | +Observable<Integer> producer = Observable.create(o -> { |
| 8 | + o.onNext(1); |
| 9 | + o.onNext(2); |
| 10 | + o.onCompleted(); |
| 11 | +}); |
| 12 | +// Consume |
| 13 | +producer.subscribe(i -> { |
| 14 | + try { |
| 15 | + Thread.sleep(1000); |
| 16 | + System.out.println(i); |
| 17 | + } catch (Exception e) { } |
| 18 | +}); |
| 19 | +``` |
| 20 | + |
| 21 | +Here, the producer has its values ready and can emit them with no delay. The consumer is very slow by comparison. But this isn't going to cause problems, because the synchronous nature of the code above automatically regulates the rates of the producer and consumer. When `o.onNext(1);` is called, execution for the producer is blocked until the entire Rx chain completes. Only when that expression returns can the execution proceed to `o.onNext(2);`. |
| 22 | + |
| 23 | +But this is only for synchronous execution. It is very common for the producer and the consumer to be asynchronous and we've already seen Rx's elegant way of handling and introducing asynchronicity. So, what happens when a producer and a consumer operate asynchronously at different speeds? |
| 24 | + |
| 25 | +Let's first consider a pull-based model, such as an iterator. In a pull-based model, the consumer requests the values. If the producer is slower, the consumer will block on request and resume when the next value arrives. If the procuder is faster, then it will have idle time waiting for the consumer request the next value. |
| 26 | + |
| 27 | +Rx push-based, not pull-based. In Rx it is the producer that pushes values to the consumer when the values are ready. If the producer is slower, then the consumer will have idle time waiting for the next value to arrive. If producer is faster, without any provisions, it will keep force-feeding data to consumer without ever knowing about the consumer's difficulties. |
| 28 | + |
| 29 | +## Remedies for the consumer |
| 30 | + |
| 31 | +Some of the operators we've seen in previous chapters can help the consumer lessen the stress caused by too much input. |
| 32 | + |
| 33 | +#### Thin out the data |
| 34 | + |
| 35 | +The [sample](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#sample) operator allows you to specify a maximum rate of input, while the rest will be ignored. |
| 36 | + |
| 37 | +There other similar operators that will thin out your data, such as [throttle](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#throttling) and [debounce](/Part 3 - Taming the sequence/5. Time-shifted sequences.md#debouncing). |
| 38 | + |
| 39 | +#### Collect |
| 40 | + |
| 41 | +You can also use `buffer` and `window` to collect overflowing data while the consumer is busy. Then you can decide manually how many and which of the buffered items are worth processing. |
| 42 | + |
| 43 | + |
| 44 | +## Reactive pull |
| 45 | + |
| 46 | +The above remedies are legitimate solutions to the problem. However, they aren't always the best way to deal with an overproducing observable. Sometimes the problem can be better handled on the side of the producer. We call the process of resisting emissions by an observable _backpressure_. |
| 47 | + |
| 48 | +> Back pressure refers to pressure opposed to the desired flow of a fluid in a confined place such as a pipe. It is often caused by obstructions or tight bends in the confinement vessel along which it is moving, such as piping or air vents. _- Wikipedia_ |
| 49 | +
|
| 50 | +RxJava has implemented a way for a subscriber to regulate the rate of an observable. The `Subscriber` has a `request(n)` method, with which it notifies the observable that it is ready to accept `n` more values. By calling `request` on the `onStart` method of your `Subscriber` you establish reactive pull backpressure. This isn't a pull: it doesn't return any values and will not block if values are not ready. Instead, it merely notifies the observable of how many values the `Subscriber` is ready to accept and to hold the rest. Subsequent calls to `request` will allow more values through. |
| 51 | + |
| 52 | +This is a `Subscriber` that takes values one at a time: |
| 53 | +```java |
| 54 | +class MySubscriber extends Subscriber<T> { |
| 55 | + @Override |
| 56 | + public void onStart() { |
| 57 | + request(1); |
| 58 | + } |
| 59 | + |
| 60 | + @Override |
| 61 | + public void onCompleted() { |
| 62 | + ... |
| 63 | + } |
| 64 | + |
| 65 | + @Override |
| 66 | + public void onError(Throwable e) { |
| 67 | + ... |
| 68 | + } |
| 69 | + |
| 70 | + @Override |
| 71 | + public void onNext(T n) { |
| 72 | + ... |
| 73 | + request(1); |
| 74 | + } |
| 75 | +} |
| 76 | +``` |
| 77 | + |
| 78 | +The `request(1)` in `onStart` establishes backpressure and that the observable should only emit the first value. After processing in `onNext`, we request the next item to be sent, if and when it is available. Calling `request(Long.MAX_VALUE)` disables backpressure. |
| 79 | + |
| 80 | +## Backpressure policies |
| 81 | + |
| 82 | +Many Rx operators use backpressure internally to avoid overfilling their internal queues. That way, the problem is propagated backwards into the chain of operators. Backpressure doesn't make the problem go away. It merely moves it where it may be handled better. An overproducing observable can still overwhelm the internal buffers of operators. |
| 83 | + |
| 84 | +There are Rx operators that declare how you want to deal with situations where values are not consumed fast enough. |
| 85 | + |
| 86 | +### onBackpressureBuffer |
| 87 | + |
| 88 | +The `onBackpressureBuffer` operator with cause every value that can't be consumed to stored until the observer can consume it. |
| 89 | + |
| 90 | + |
| 91 | + |
| 92 | +You can have a buffer of infinite size of a buffer with a maximum capacity. If the buffer overflows, it will fail with a `BufferOverflowException`. |
| 93 | + |
| 94 | +### onBackpressureDrop |
| 95 | + |
| 96 | +The `onBackpressureDrop` operator discards items that are not ready to be received. |
| 97 | + |
| 98 | + |
1 | 99 |
|
2 | 100 |
|
3 | 101 | | Previous | Next | |
|
0 commit comments