Commit 4189b8a

Leonid Ryzhyk authored and committed
Dynamic dispatch
This commit contains a rewrite of the DBSP library to use dynamic dispatch.

Background
==========

DBSP operators manipulate batches of records that represent keys, values, and weights in various collections. Each record is represented as a concrete Rust type, e.g., a struct or a tuple. Operators are parameterized by the types of their input and output batches, which are in turn parameterized by the types of keys, values, and weights in the batch. This means that, had we used static typing (the default in Rust), the Rust compiler would generate a specialized implementation of each operator for each combination of input and output types that occurs in the program. In an earlier version of this library, this approach led to very long compilation times for even medium-sized programs.

The solution we adopt relies on dynamic dispatch instead. We bundle primitive operations on types (comparison, cloning, etc.) into object-safe traits and expose them to operators as trait objects. This way we achieve faster compilation without sacrificing much performance.

Implementing this approach involves some machinery.

First, trait objects cannot be efficiently combined into any kind of container (vectors, sets, or even tuples or structs) without wrapping each object in a `Box` or some other heap-allocated type, leading to many small allocations. To avoid this, we represent such containers as separate trait objects. For instance, options (`Option<T>`), 2-tuples (`(T1, T2)`), vectors (`Vec<T>`), and sets (`BTreeSet<T>`) are modeled as traits.

Second, this design introduces unsafe code that cannot be nicely encapsulated. By replacing concrete types with trait objects, we lose the ability to distinguish between trait objects backed by different types. Operations like comparison and cloning must downcast their arguments to the same concrete type.
This can be done safely by checking the type ids of the arguments, but this check is expensive in Rust when performed on the critical path. We therefore elide it in release builds, making all such operations unsafe.

Third, operators require the ability to create new values, which in turn requires passing a **factory** object for each type the operator may need to instantiate. Most operators (as well as the low-level data structures they use internally, such as batches and traces) require multiple factory objects.

Finally, since trait objects don't have a statically known size, they cannot be returned by value (without boxing them, which we don't want to do in most cases). We therefore pass mutable references for functions to write return values to. This requires the caller to allocate space for the return value using a factory object. Likewise, trait objects cannot be passed by value; therefore functions that would normally take an owned value instead take a mutable reference, which they are allowed to consume without cloning (see, e.g., `ClonableTrait::move_to`, which moves the value, leaving the default value in its place).

Trait hierarchy
===============

Traits and types used to implement dynamic dispatch are found in `/src/dynamic`. We encapsulate common operations in a hierarchy of object-safe traits, i.e., traits for which the compiler can build vtables and expose them as trait objects (`dyn <Trait>`).

Basic traits
============

The following traits must be implemented for all values that DBSP can compute on:

* `AsAny` - converts a reference to a type into `&dyn Any`.
* `Clonable` - an object-safe version of `Clone`.
* `Comparable` - an object-safe version of `Ord`.
* `SerializeDyn` - an object-safe version of `rkyv::Serialize`.
* `DeserializableDyn` - an object-safe version of `rkyv::Deserialize`.
* `Data` - the lowest common denominator for all types that DBSP can compute on. Combines all of the above traits, along with `Debug`, `SizeOf`, and `Send`.
This is an object-safe version of `DBData`.

In addition, the `Erase` trait is provided to convert concrete types into trait objects.

Trait object traits
===================

The following traits are meant to be implemented by **trait objects** rather than regular types, i.e., an instance of one of these traits is a `dyn Trait` (which is a type in Rust). The reason we need these traits is ergonomics, since not all useful behaviors can be captured nicely in an object-safe way. For instance, the `Eq`, `Ord`, and `Clone` traits are not object-safe, so we cannot expose them directly via vtables. We instead introduce the `Comparable` and `Clonable` traits mentioned above. These traits define an unsafe API that accepts the second argument as a raw byte pointer (`*mut u8`). They are not meant to be used directly; instead, we use them to implement `Eq`, `Ord`, and `Clone` for trait objects.

* `DowncastTrait` - casts a trait object reference to a concrete type.
* `ClonableTrait` - trait for trait objects whose inner type can be cloned and moved.
* `ArchiveTrait` - trait for trait objects that can be serialized and deserialized with `rkyv`.
* `DataTrait` - the lowest common denominator for all trait objects that DBSP can compute on. Combines all of the above traits, along with `Data`. This is the trait object version of `crate::DBData`.

Creating trait objects
======================

There are three ways to create a trait object:

* The `Factory` trait can be used to allocate an instance of a concrete type with the default value on the heap or on the stack and return a reference to it as a trait object.
* The `Erase` trait converts a reference to a concrete type into a trait object.
* `DeserializableDyn` and `DeserializeDyn` deserialize values of the concrete type from a byte array or from an archived representation, respectively, and return them as trait objects.

Derived traits
==============

The traits described above are applicable to all DBSP values.
The following traits specify extended functionality available for certain types and their combinations:

* `DynWeight` - types with an addition operation and a zero element. This is an object-safe version of `DBWeight`.
* `DynOpt` - a dynamically typed version of the `Option` type.
* `DynPair` - a dynamically typed version of a two-tuple `(T1, T2)`.
* `DynVec` - a dynamically typed vector.
* `DynSet` - a dynamically typed B-tree set.
* `DynPairs` - a vector of pairs.
* `DynWeightedPairs` - a vector of key-value pairs, where the value behaves as a weight, meaning that tuples with the same key can be consolidated by adding their weights.

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
1 parent dfa7e85 commit 4189b8a

300 files changed: +42816 additions, -33672 deletions


Cargo.lock

Lines changed: 916 additions & 1123 deletions
(Generated file; diff not rendered.)

Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -2,7 +2,6 @@
 members = [
     "crates/adapters",
     "crates/dbsp",
-    "crates/feldera-storage",
     "crates/nexmark",
     "crates/pipeline-types",
     "crates/pipeline_manager",

Earthfile

Lines changed: 0 additions & 8 deletions
@@ -131,7 +131,6 @@ build-dbsp:
     FROM +rust-sources
     DO rust+CARGO --args="build --package dbsp"
     DO rust+CARGO --args="build --package pipeline_types"
-    DO rust+CARGO --args="build --package feldera-storage"
 
 build-sql:
     FROM +build-dbsp
@@ -177,13 +176,8 @@ build-nexmark:
 
 test-dbsp:
     FROM +build-dbsp
-    # Limit test execution to tests in trace::persistent::tests, because
-    # executing everything takes too long and (in theory) the proptests we have
-    # should ensure equivalence with the DRAM trace implementation:
     ENV RUST_BACKTRACE 1
-    DO rust+CARGO --args="test --package=dbsp --features=persistence -- trace::persistent::tests"
     DO rust+CARGO --args="test --package dbsp"
-    DO rust+CARGO --args="test --package feldera-storage"
 
 test-nexmark:
     FROM +build-nexmark
@@ -288,7 +282,6 @@ build-pipeline-manager-container:
     COPY crates/dbsp database-stream-processor/crates/dbsp
     COPY crates/pipeline-types database-stream-processor/crates/pipeline-types
     COPY crates/adapters database-stream-processor/crates/adapters
-    COPY crates/feldera-storage database-stream-processor/crates/feldera-storage
     COPY README.md database-stream-processor/README.md
 
     # Then copy over the required SQL compiler files
@@ -514,7 +507,6 @@ benchmark:
     RUN bash scripts/bench.bash
     SAVE ARTIFACT crates/nexmark/nexmark_results.csv AS LOCAL .
     SAVE ARTIFACT crates/nexmark/dram_nexmark_results.csv AS LOCAL .
-    SAVE ARTIFACT crates/nexmark/persistence_nexmark_results.csv AS LOCAL .
     SAVE ARTIFACT crates/dbsp/galen_results.csv AS LOCAL .
     SAVE ARTIFACT crates/dbsp/ldbc_results.csv AS LOCAL .

crates/adapters/src/catalog.rs

Lines changed: 6 additions & 4 deletions
@@ -55,16 +55,16 @@ where
 /// stream.
 ///
 /// A trait for a type that wraps a
-/// [`CollectionHandle`](`dbsp::CollectionHandle`) or
-/// an [`UpsertHandle`](`dbsp::UpsertHandle`) and pushes serialized relational
+/// [`ZSetHandle`](`dbsp::ZSetHandle`) or
+/// an [`MapHandle`](`dbsp::MapHandle`) and pushes serialized relational
 /// data to the associated input stream record-by-record. The client passes a
 /// byte array with a serialized data record (e.g., in JSON or CSV format)
 /// to [`insert`](`Self::insert`), [`delete`](`Self::delete`), and
 /// [`update`](`Self::update`) methods. The record gets deserialized into the
 /// strongly typed representation expected by the input stream and gets buffered
 /// inside the handle. The [`flush`](`Self::flush`) method pushes all buffered
-/// data to the underlying [`CollectionHandle`](`dbsp::CollectionHandle`) or
-/// [`UpsertHandle`](`dbsp::UpsertHandle`).
+/// data to the underlying [`ZSetHandle`](`dbsp::ZSetHandle`) or
+/// [`MapHandle`](`dbsp::MapHandle`).
 ///
 /// Instances of this trait are created by calling
 /// [`DeCollectionHandle::configure_deserializer`].
@@ -154,6 +154,8 @@ pub trait DeCollectionHandle: Send {
 /// This is a wrapper around the DBSP `Batch` trait that returns a cursor that
 /// yields `erased_serde::Serialize` trait objects that can be used to serialize
 /// the contents of the batch without knowing its key and value types.
+// The reason we need the `Sync` trait below is so that we can wrap batches
+// in `Arc` and send the same batch to multiple output endpoint threads.
 pub trait SerBatch: Send + Sync {
     /// Number of keys in the batch.
     fn key_count(&self) -> usize;

crates/adapters/src/format/json/output.rs

Lines changed: 16 additions & 18 deletions
@@ -14,8 +14,7 @@ use rand::{rngs::StdRng, Rng, SeedableRng};
 use serde::Deserialize;
 use serde_urlencoded::Deserializer as UrlDeserializer;
 use serde_yaml::Value as YamlValue;
-use std::io::Write;
-use std::{borrow::Cow, mem::take, sync::Arc};
+use std::{borrow::Cow, io::Write, mem::take, sync::Arc};
 
 /// JSON format encoder.
 pub struct JsonOutputFormat;
@@ -376,24 +375,23 @@ impl Encoder for JsonEncoder {
 mod test {
     use super::{JsonEncoder, JsonEncoderConfig};
     use crate::format::json::{DebeziumOp, DebeziumPayload, DebeziumUpdate};
-    use crate::test::{generate_test_batches_with_weights, test_struct_schema};
     use crate::{
         catalog::SerBatch,
         format::{
             json::{InsDelUpdate, SnowflakeAction, SnowflakeUpdate},
             Encoder,
         },
         static_compile::seroutput::SerBatchImpl,
-        test::{MockOutputConsumer, TestStruct},
+        test::{
+            generate_test_batches_with_weights, test_struct_schema, MockOutputConsumer, TestStruct,
+        },
     };
-    use dbsp::{trace::Batch, IndexedZSet, OrdZSet};
+    use dbsp::{utils::Tup2, OrdZSet};
     use log::trace;
     use pipeline_types::format::json::JsonUpdateFormat;
     use proptest::prelude::*;
     use serde::Deserialize;
-    use std::cell::RefCell;
-    use std::rc::Rc;
-    use std::{fmt::Debug, sync::Arc};
+    use std::{cell::RefCell, fmt::Debug, rc::Rc, sync::Arc};
 
     trait OutputUpdate: Debug + for<'de> Deserialize<'de> + Eq + Ord {
         type Val;
@@ -482,7 +480,7 @@ mod test {
 
     fn test_json<U: OutputUpdate<Val = TestStruct>>(
         array: bool,
-        batches: Vec<Vec<(TestStruct, i64)>>,
+        batches: Vec<Vec<Tup2<TestStruct, i64>>>,
     ) {
         let config = JsonEncoderConfig {
             update_format: U::update_format(),
@@ -502,7 +500,7 @@ mod test {
             (),
             batch
                 .iter()
-                .map(|(x, w)| (x.clone(), *w))
+                .map(|Tup2(x, w)| Tup2(x.clone(), *w))
                 .collect::<Vec<_>>(),
         );
         Arc::new(<SerBatchImpl<_, TestStruct, ()>>::new(zset)) as Arc<dyn SerBatch>
@@ -518,7 +516,7 @@ mod test {
             (),
             batch
                 .iter()
-                .map(|(x, w)| (x.clone(), *w))
+                .map(|Tup2(x, w)| Tup2(x.clone(), *w))
                 .collect::<Vec<_>>(),
         );
         let mut deletes = zset
@@ -588,10 +586,10 @@ mod test {
         assert_eq!(actual_output, expected_output);
     }
 
-    fn test_data() -> Vec<Vec<(TestStruct, i64)>> {
+    fn test_data() -> Vec<Vec<Tup2<TestStruct, i64>>> {
         vec![
             vec![
-                (
+                Tup2(
                     TestStruct {
                         id: 0,
                         b: true,
@@ -600,7 +598,7 @@ mod test {
                     },
                     1,
                 ),
-                (
+                Tup2(
                     TestStruct {
                         id: 1,
                         b: false,
@@ -611,7 +609,7 @@ mod test {
                 ),
             ],
             vec![
-                (
+                Tup2(
                     TestStruct {
                         id: 2,
                         b: true,
@@ -620,7 +618,7 @@ mod test {
                     },
                     -2,
                 ),
-                (
+                Tup2(
                     TestStruct {
                         id: 3,
                         b: false,
@@ -631,7 +629,7 @@ mod test {
                 ),
             ],
             vec![
-                (
+                Tup2(
                     TestStruct {
                         id: 4,
                         b: true,
@@ -640,7 +638,7 @@ mod test {
                     },
                     -1,
                 ),
-                (
+                Tup2(
                     TestStruct {
                         id: 5,
                         b: false,

crates/adapters/src/format/parquet/test.rs

Lines changed: 2 additions & 2 deletions
@@ -10,7 +10,7 @@ use arrow::array::{
 };
 use arrow::datatypes::{DataType, Schema, TimeUnit};
 use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
-use dbsp::trace::Batch;
+use dbsp::utils::Tup2;
 use dbsp::OrdZSet;
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
@@ -469,7 +469,7 @@ fn parquet_output() {
     .expect("Can't create encoder");
     let zset = OrdZSet::from_keys(
         (),
-        vec![(test_data[0].clone(), 2), (test_data[1].clone(), 1)],
+        vec![Tup2(test_data[0].clone(), 2), Tup2(test_data[1].clone(), 1)],
     );
 
     let zset = Arc::new(<SerBatchImpl<_, TestStruct, ()>>::new(zset)) as Arc<dyn SerBatch>;
