From 904c198cc55b9586095f03ff564bdf2d4d2876b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 1 Jun 2016 22:39:45 +0200 Subject: [PATCH 1/4] 2.x: Design.md +extension +fusion --- DESIGN.md | 241 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 233 insertions(+), 8 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index 77b1fddb16..06bf4175b0 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -325,18 +325,149 @@ In the addition of the previous rules, an operator for `Flowable`: ### Creation -Creation of the various types should be exposed through factory methods that provide safe construction. +Unlike RxJava 1.x, 2.x base classes are to be abstract, stateless and generally no longer wrap an `OnSubscribe` callback - this saves allocation in assembly time without limiting the expressiveness. Operator methods and standard factories still live as final on the base classes. + +Instead of the indirection of an `OnSubscribe` and `lift`, operators are to be implemented by extending the base classes. For example, the `map` +operator will look like this: ```java -Flowable.create(SyncGenerator generator) +public final class FlowableMap extends Flowable { + + final Flowable source; + + final Function mapper; + + public FlowableMap(Flowable source, Function mapper) { + this.source = source; + this.mapper = mapper; + } + + @Override + protected void subscribeActual(Subscriber subscriber) { + source.subscribe(new FlowableMapSubscriber(subscriber, mapper)); + } + + static final class FlowableMapSubscriber implements Subscriber, Subscription { + // ... + } +} +``` + +Since Java still doesn't have extension methods, "adding" more operators can only happen through helper methods such as `lift(C -> C)` and `compose(R -> P)` where `C` is the default consumer type (i.e., `rs.Subscriber`), `R` is the base type (i.e., `Flowable`) and `P` is the base interface (i.e., `rs.Publisher`). 
As before, the library itself may gain or lose standard operators and/or overloads through the same community process. + +In concert, `create(OnSubscribe)` will not be available; standard operators extend the base types directly. The conversion of other RS-based libraries will happen through the `Flowable.wrap(Publisher)` static method. + +(*The unfortunate effect of `create` in 1.x was the ignorance of the Observable contract and beginner's first choice as an entry point. We can't eliminate this path since `rs.Publisher` is a single method functional interface that can be implemented just as badly.*) -Flowable.create(AsyncGenerator generator) +Therefore, new standard factory methods will try to address the common entry point requirements: + - `create(SyncGenerator)` to safe, synchronous generation of signals, one-by-one + - `create(AsyncGenerator)` to batch-create signals based on request patterns + - `create(Consumer>)` to relay a single value or error from other reactive sources (i.e., addListener callbacks) + - `create(Consumer>)` to relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.). 
+ - `create(Consumer` signal a completion or error from valueless reactive sources + +The following table lists where these create methods will be available in respect of the base types: + +| Method | Flowable | Observable | Single | Completable | +|--------|----------|------------|--------|-------------| +| `create(SyncGenerator)` | Yes | Yes | No | No | +| `create(AsyncOnSubscribe)` | Yes | No | No | No | +| `create(Consumer>)` | Yes | Yes | Yes | No | +| `create(Consumer>)` | Yes | Yes | No | No | +| `create(Consumer)` | Yes | Yes | No | Yes | + +The first two `create` methods take an implementation of an interface which provides state and the generator methods: + +```java +interface SyncGenerator { -Observable.create(OnSubscribe> onSubscribe) + S createState(); + + S generate(S state, Observer output); + + void disposeState(S state); +} -Single.create(OnSubscribe> onSubscribe) +interface AsyncGenerator { -Completable.create(OnSubscribe> onSubscribe) + S createState(); + + S generate(S state, long requested, Observer> output); + + void disposeState(S state); +} +``` + +The latter three `create` methods will provide the following interaction interfaces to the `java.util.function.Consumer`: + +```java +interface SingleEmitter { + + void complete(T value); + + void fail(Throwable error); + + void stop(); + + void setDisposable(Disposable d); + +} + +interface FlowEmitter { + + void next(T value); + + void fail(Throwable error); + + void complete(); + + void stop(); + + void setDisposable(Disposable d); + + enum BackpressureHandling { + IGNORE, + ERROR, + DROP, + LATEST, + BUFFER + } + + void setBackpressureHandling(BackpressureHandling mode); + +} + +interface CompletableEmitter { + + void complete(); + + void fail(Throwable error); + + void stop(); + + void setDisposable(Disposable d); + +} + +``` + +By extending the base classes, operator implementations would lose the tracking/wrapping features of 1.x. 
To avoid this, the methods `subscribe(C)` will be final and operators have to implement a protected `subscribeActual` (or any other reasonable name). + +```java +@Override +public final void subscribe(Subscriber s) { + subscribeActual(hook.onSubscribe(s)); +} + +protected abstract void subscribeActual(Subscriber s); +``` + +Assembly-time hooks will be moved into the individual standard methods on the base types: + +```java +public final Flowable map(Function mapper) { + return hook.onAssembly(new FlowableMap(this, mapper)); +} ``` ### Terminal behavior @@ -359,6 +490,100 @@ This section contains current design work which needs more discussion and elabor We are investigate a base interface (similar to `Publisher`) for the `Observable`, `Single`, and `Completable` (currently referred to as `Consumable` or `ConsumableObservable`). This would empower library owners and api developers to implement their own type of `Observable`, `Single`, or `Completable` without extending the class. This would result in a change the type signatures of `subscribe` as well as any operator that operates over an `Observable`, `Single`, or `Completable` to accept a more generic type (i.e. `ConsumableObservable`). For more information see the proof of concept project [Consumable](https://github.com/stealthcode/Consumable). -#### Fusion (To be confirmed) +#### Fusion + +Operator fusion exploits the declarative nature of building flows; the developer specifies the "what", "where" and "when", the library then tries to optimize the "how". + +There are two main levels of operator fusion: *macro* and *micro*. + +##### Macro-fusion + +Macro fusion deals with the higher level view of the operators, their identity and their combination (mostly in the form of subsequence). This is partially an internal affair of the operators, triggered by the downstream operator and may work with several cases. 
Given an operator application pair `a().b()` where `a` could be a source or an intermediate operator itself, when the application of `b` happens at assembly time, the following can happen: + + - `b` identifies `a` and decides to not apply itself. Example: `empty().flatMap()` is functionally a no-op. + - `b` identifies `a` and decides to apply a different, conventional operator. Example: `just().subscribeOn()` is turned into `just().observeOn()`. + - `b` decides to apply a new custom operator, combining and inlining existing behavior. Example: `just().subscribeOn()` internally goes to `ScalarScheduledPublisher`. + - `a` is the same operator as `b` and the two operators' parameter sets can be combined into a single application. Example: `filter(p1).filter(p2)` combined into `filter(p1 && p2)`. + +Participating in the macro-fusion externally is possible by implementing a marker interface when extending `Flowable`. Two kinds of interfaces are available: + + - `java.util.Callable`: the Java standard, throwing interface, indicating the single value has to be extracted at subscription time (or later). + - `ScalarCallable`: to indicate the single value can be safely extracted during assembly time and used/inlined in other operators: + +```java +interface ScalarCallable extends java.util.Callable { + @Override + T call(); +} +``` + +`ScalarCallable` is also a `Callable` and thus its value can be extracted practically anytime. For convenience (and for sense), `ScalarCallable` overrides `call()` without the superinterface's `throws Exception` clause - throwing during assembly time is likely unreasonable for scalars. + +Since Reactive-Streams doesn't allow `null`s in the value flow, we have the opportunity to define that `ScalarCallable`s and `Callable`s returning `null` are to be considered an empty source - allowing operators to dispatch on the type `Callable` first, then branch on the nullness of `call()`. + +Interoperating with other libraries at this level is possible. 
Reactor-Core uses the same pattern and the two libraries can work with each other's `Publisher+Callable` types. Unfortunately, this means subscription-time only fusion as `ScalarCallable`s live locally in each library. + +##### Micro-fusion + +Micro-fusion goes a step deeper and tries to reuse internal structures, mostly queues, in operator pairs, saving on allocation and sometimes on atomic operations. Its defining property is that it, in a way, subverts the standard Reactive-Streams protocol between subsequent operators that both support fusion. However, from the outside world's view, they still work according to the RS protocol. + +Currently, two main kinds of micro-fusion opportunities are available. + +###### 1) Conditional Subscriber + +This extends the RS `Subscriber` interface with an extra method, `boolean tryOnNext(T value)`, and can help avoid small request amounts when an operator drops a value instead of forwarding it. The canonical use is for the `filter()` operator where, if the predicate returns false, the operator has to request 1 from upstream (since the downstream doesn't know a value was dropped and thus won't request a replacement itself). Operators wanting to participate in this fusion have to implement and subscribe with an extended Subscriber interface: + +```java +interface ConditionalSubscriber { + boolean tryOnNext(T value); +} + +//... +@Override +protected void subscribeActual(Subscriber s) { + if (s instanceof ConditionalSubscriber) { + source.subscribe(new FilterConditionalSubscriber<>(s, predicate)); + } else { + source.subscribe(new FilterRegularSubscriber<>(s, predicate)); + } +} +``` + +(Note that this may lead to extra case-implementations in operators that have some kind of queue-drain emission model.) + +###### 2) Queue-fusion + +The second category is when two (or more) operators share the same underlying queue and each appends its activity at the exit point (i.e., poll()) of the queue. This can work in two modes: synchronous and asynchronous. 
+ +In synchronous mode, the elements of the sequence are already available (i.e., a fixed `range()` or `fromArray()`) or can be synchronously calculated in a pull fashion, as in `fromIterable`. In this mode, requesting and the regular `onError` path are bypassed and forbidden. Sources have to return null from `poll()` and true from `isEmpty()` if they have no more values, and throw from these methods if they want to indicate an exceptional case. + +In asynchronous mode, elements may become available at any time; therefore, `poll()` returning null, as with regular queue-drain, is just the indication of a temporary lack of source values. Completion and error still have to go through `onComplete` and `onError` as usual, and requesting still happens as usual, but when a value is available in the shared queue, it is indicated by an `onNext(null)` call. This can trigger a chain of `drain` calls without moving values in or out of different queues. + +In both modes, `cancel` works and behaves as usual. + +Since this fusion mode is an optional extension, the mode switch has to be negotiated and the shared queue interface established. Operators already working with internal queues then can, mostly, keep their current `drain()` algorithm. Queue-fusion has its own interface and protocol built on top of the existing `onSubscribe`-`Subscription` rail: + +```java +interface QueueSubscription extends Queue, Subscription { + int NONE = 0; + int SYNC = 1; + int ASYNC = 2; + int ANY = SYNC | ASYNC; + int BOUNDARY = 4; + + int requestFusion(int mode); +} +``` + +For performance, the mode is a set of integer bit flags; `requestFusion` is called early during subscription time and negotiates the fusion mode. Usually, producers can do only one mode and consumers can do both modes. Because fused, intermediate operators attach logic (often a user callback) to the exit point of the queue interface (poll()), fusion may change the computation location of those callbacks in an unwanted way. 
The flag `BOUNDARY` is added by consumers indicating that they will consume the queue over an async boundary. Intermediate operators, such as `map` and `filter` then can reject the fusion in such sequences. + +Since RxJava 2.x is still JDK 6 compatible, the `QueueSubscription` can't itself default unnecessary methods and implementations are required to throw `UnsupportedOperationException` for `Queue` methods other than the following: + + - `poll()` + - `isEmpty()` + - `clear()` + - `size()` + +Even though other modern libraries also define this interface, they live in local packages and thus non-reusable without dragging in the whole library. Therefore, until externalized and standardized, cross-library micro-fusion won't happen. -We intend to enable operator fusion, but we don't have any specification yet. Nothing we do here should prevent the implementation of fusion. +A consequence of the extension of the `onSubscribe`-`Subscription` rail is that intermediate operators are no longer allowed to pass an upstream `Subscription` directly to its downstream `Subscriber.onSubscribe`. Doing so is likely to have the fused sequence skip the operator completely, losing behavior or causing runtime exceptions. Since RS `Subscriber` is an interface, operators can simply implement both `Subscriber` and `Subscription` on themselves, delegating the `request` and `cancel` calls to the upstream and calling `child.onSubscribe(this)`. 
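The last rule above (an intermediate operator hands itself, not the upstream `Subscription`, to its downstream) can be sketched roughly as follows. This is only an illustration under stated assumptions: the `Subscriber` and `Subscription` interfaces are local stand-ins that mirror the Reactive-Streams method names so the snippet compiles without the RS jar, and `PassThrough` and `PassThroughDemo` are hypothetical names, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

final class PassThroughDemo {

    // Local stand-ins mirroring the Reactive-Streams interfaces (assumption:
    // declared here only to keep the sketch self-contained).
    interface Subscription {
        void request(long n);
        void cancel();
    }

    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T t);
        void onError(Throwable t);
        void onComplete();
    }

    // Hypothetical intermediate operator: it is both the Subscriber of the
    // upstream and the Subscription seen by the downstream.
    static final class PassThrough<T> implements Subscriber<T>, Subscription {
        final Subscriber<? super T> child;
        Subscription upstream;

        PassThrough(Subscriber<? super T> child) {
            this.child = child;
        }

        @Override public void onSubscribe(Subscription s) {
            upstream = s;
            child.onSubscribe(this); // pass ourselves, never the upstream s
        }

        @Override public void onNext(T t) { child.onNext(t); }
        @Override public void onError(Throwable t) { child.onError(t); }
        @Override public void onComplete() { child.onComplete(); }

        // Subscription side: plain delegation to the upstream.
        @Override public void request(long n) { upstream.request(n); }
        @Override public void cancel() { upstream.cancel(); }
    }

    // Wires a fake upstream, the operator and a logging child together.
    static List<String> run() {
        final List<String> log = new ArrayList<>();
        Subscription upstream = new Subscription() {
            @Override public void request(long n) { log.add("upstream.request(" + n + ")"); }
            @Override public void cancel() { log.add("upstream.cancel()"); }
        };
        Subscriber<Integer> child = new Subscriber<Integer>() {
            @Override public void onSubscribe(Subscription s) {
                log.add("child got operator: " + (s instanceof PassThrough));
                s.request(2);
            }
            @Override public void onNext(Integer t) { log.add("onNext(" + t + ")"); }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        };
        PassThrough<Integer> op = new PassThrough<>(child);
        op.onSubscribe(upstream);
        op.onNext(1);
        op.onComplete();
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Because the child only ever sees the operator, a later fusion negotiation on that `Subscription` would reach the operator first instead of silently skipping it.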
\ No newline at end of file From a8afb07c7094c829df4074642f48f90e968bd0e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 1 Jun 2016 23:24:27 +0200 Subject: [PATCH 2/4] Flatten the create-table --- DESIGN.md | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index 06bf4175b0..da80a7ea79 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -366,15 +366,29 @@ Therefore, new standard factory methods will try to address the common entry poi - `create(Consumer>)` to relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.). - `create(Consumer` signal a completion or error from valueless reactive sources -The following table lists where these create methods will be available in respect of the base types: - -| Method | Flowable | Observable | Single | Completable | -|--------|----------|------------|--------|-------------| -| `create(SyncGenerator)` | Yes | Yes | No | No | -| `create(AsyncOnSubscribe)` | Yes | No | No | No | -| `create(Consumer>)` | Yes | Yes | Yes | No | -| `create(Consumer>)` | Yes | Yes | No | No | -| `create(Consumer)` | Yes | Yes | No | Yes | +The `Flowable` will contain the following `create` methods: + + - `create(SyncGenerator)` + - `create(AsyncOnSubscribe)` + - `create(Consumer>)` + - `create(Consumer>)` + - `create(Consumer)` + +The `Observable` will contain the following `create` methods: + + - `create(SyncGenerator)` + - `create(Consumer>)` + - `create(Consumer>)` + - `create(Consumer)` + +The `Single` will contain the following `create` method: + + - `create(Consumer>)` + +The `Completable` will contain the following `create` method: + + - `create(Consumer)` + The first two `create` methods take an implementation of an interface which provides state and the generator methods: From d8a716ef1b40b4b45a2116cb5ba70f447071cf16 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 1 Jun 2016 23:32:05 +0200 Subject: [PATCH 3/4] Update create naming to avoid overload conflicts --- DESIGN.md | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index da80a7ea79..2cf175040b 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -359,35 +359,30 @@ In concert, `create(OnSubscribe)` will not be available; standard operators exte (*The unfortunate effect of `create` in 1.x was the ignorance of the Observable contract and beginner's first choice as an entry point. We can't eliminate this path since `rs.Publisher` is a single method functional interface that can be implemented just as badly.*) -Therefore, new standard factory methods will try to address the common entry point requirements: - - `create(SyncGenerator)` to safe, synchronous generation of signals, one-by-one - - `create(AsyncGenerator)` to batch-create signals based on request patterns - - `create(Consumer>)` to relay a single value or error from other reactive sources (i.e., addListener callbacks) - - `create(Consumer>)` to relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.). - - `create(Consumer` signal a completion or error from valueless reactive sources +Therefore, new standard factory methods will try to address the common entry point requirements. 
The `Flowable` will contain the following `create` methods: - - `create(SyncGenerator)` - - `create(AsyncOnSubscribe)` - - `create(Consumer>)` - - `create(Consumer>)` - - `create(Consumer)` + - `create(SyncGenerator)`: safe, synchronous generation of signals, one-by-one + - `create(AsyncOnSubscribe)`: to batch-create signals based on request patterns + - `create(Consumer>)`: to relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.). + - `createSingle(Consumer>)`: to relay a single value or error from other reactive sources (i.e., addListener callbacks) + - `createEmpty(Consumer)` The `Observable` will contain the following `create` methods: - - `create(SyncGenerator)` - - `create(Consumer>)` - - `create(Consumer>)` - - `create(Consumer)` + - `create(SyncGenerator)`: safe, synchronous generation of signals, one-by-one + - `create(Consumer>)`: to relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.). 
+ - `createSingle(Consumer>)`: to relay a single value or error from other reactive sources (i.e., addListener callbacks) + - `createEmpty(Consumer)`: signal a completion or error from valueless reactive sources The `Single` will contain the following `create` method: - - `create(Consumer>)` + - `create(Consumer>)`: to relay a single value or error from other reactive sources (i.e., addListener callbacks) The `Completable` will contain the following `create` method: - - `create(Consumer)` + - `create(Consumer)`: signal a completion or error from valueless reactive sources The first two `create` methods take an implementation of an interface which provides state and the generator methods: From cd17a205deb13edbb3fcbe3b97902fc8dfa749eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Wed, 1 Jun 2016 23:33:11 +0200 Subject: [PATCH 4/4] Add missing explanation, remove words --- DESIGN.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/DESIGN.md b/DESIGN.md index 2cf175040b..7d65f88461 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -364,21 +364,21 @@ Therefore, new standard factory methods will try to address the common entry poi The `Flowable` will contain the following `create` methods: - `create(SyncGenerator)`: safe, synchronous generation of signals, one-by-one - - `create(AsyncOnSubscribe)`: to batch-create signals based on request patterns - - `create(Consumer>)`: to relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.). 
- - `createSingle(Consumer>)`: to relay a single value or error from other reactive sources (i.e., addListener callbacks) - - `createEmpty(Consumer)` + - `create(AsyncOnSubscribe)`: batch-create signals based on request patterns + - `create(Consumer>)`: relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.). + - `createSingle(Consumer>)`: relay a single value or error from other reactive sources (i.e., addListener callbacks) + - `createEmpty(Consumer)`: signal a completion or error from valueless reactive sources The `Observable` will contain the following `create` methods: - `create(SyncGenerator)`: safe, synchronous generation of signals, one-by-one - - `create(Consumer>)`: to relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.). - - `createSingle(Consumer>)`: to relay a single value or error from other reactive sources (i.e., addListener callbacks) + - `create(Consumer>)`: relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.). + - `createSingle(Consumer>)`: relay a single value or error from other reactive sources (i.e., addListener callbacks) - `createEmpty(Consumer)`: signal a completion or error from valueless reactive sources The `Single` will contain the following `create` method: - - `create(Consumer>)`: to relay a single value or error from other reactive sources (i.e., addListener callbacks) + - `create(Consumer>)`: relay a single value or error from other reactive sources (i.e., addListener callbacks) The `Completable` will contain the following `create` method: