Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[dbsp] Use new protocol for exchanging data among hosts.
Until now, we've used `tarpc`, a Rust crate for RPC, to exchange data
among hosts.  This crate serializes everything we pass into it, but we've
already serialized it, and this wastes CPU time and memory.

`tarpc` also doesn't guarantee alignment of the data at receive time, but
rkyv needs data to be aligned on 16-byte boundaries.  We don't actually
take care to ensure that alignment ourselves; we're just lucky that the
Rust allocator (or at least the one we use) happens to provide it for
`Vec<u8>` anyhow.

This commit replaces `tarpc` by a simple protocol that just writes the
data we need to exchange to a socket with a short fixed header in front of
it.

We also copy a lot of the data we send twice in another way: we first
partition it into a batch, and then we serialize the batch.  This
commit fixes that problem too.

This reduced commit time for my test case from 48 seconds to 23 seconds.

Signed-off-by: Ben Pfaff <blp@feldera.com>
  • Loading branch information
blp committed Mar 24, 2026
commit 3fdae5a56b74b31463cc59f6f9496e0e48d1a6d2
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions crates/adapters/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,9 +1054,8 @@ fn get_env_filter(config: &PipelineConfig) -> EnvFilter {
}
}

// Otherwise, fall back to `INFO`, except for `tarpc`, which is too verbose
// at that level.
EnvFilter::try_new("tarpc=warn,object_store=warn,info").unwrap()
// Otherwise, fall back to `INFO`
EnvFilter::try_new("object_store=warn,info").unwrap()
}

fn do_bootstrap(
Expand Down
3 changes: 2 additions & 1 deletion crates/dbsp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ size-of = { workspace = true, features = [
"xxhash-xxh3",
"arcstr"
] }
tarpc = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }
paste = { workspace = true }
Expand Down Expand Up @@ -122,6 +122,7 @@ indicatif = { workspace = true }
reqwest = { workspace = true, features = ["blocking"] }
chrono = { workspace = true, features = ["serde"] }
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
tarpc = { workspace = true, features = ["full"] }

[[bench]]
name = "galen"
Expand Down
1 change: 1 addition & 0 deletions crates/dbsp/src/circuit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub use dbsp_handle::{
};
pub use runtime::{
Error as RuntimeError, LocalStore, LocalStoreMarker, Runtime, RuntimeHandle, WeakRuntime,
WorkerLocation, WorkerLocations,
};

pub use schedule::Error as SchedulerError;
Expand Down
94 changes: 85 additions & 9 deletions crates/dbsp/src/circuit/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use crate::error::Error as DbspError;
use crate::operator::communication::Exchange;
use crate::storage::backend::StorageBackend;
use crate::storage::file::format::Compression;
use crate::storage::file::to_bytes;
use crate::storage::file::writer::Parameters;
use crate::trace::unaligned_deserialize;
use crate::trace::aligned_deserialize;
use crate::utils::process_rss_bytes;
use crate::{
DetailedError,
Expand All @@ -36,6 +35,7 @@ use indexmap::IndexSet;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::iter::repeat;
use std::ops::Range;
use std::path::Path;
use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize};
use std::sync::{LazyLock, Mutex};
Expand Down Expand Up @@ -1290,12 +1290,11 @@ impl Consensus {
match Runtime::runtime() {
Some(runtime) if Runtime::num_workers() > 1 => {
let worker_index = Runtime::worker_index();
let exchange_id = runtime.sequence_next();
let exchange_id = runtime.sequence_next().try_into().unwrap();
let exchange = Exchange::with_runtime(
&runtime,
exchange_id,
Box::new(|vote| to_bytes(&vote).unwrap().into_vec()),
Box::new(|data| unaligned_deserialize(&data[..])),
Box::new(|data| aligned_deserialize(&data[..])),
);

let notify_sender = Arc::new(Notify::new());
Expand Down Expand Up @@ -1334,7 +1333,11 @@ impl Consensus {
notify_receiver,
exchange,
} => {
while !exchange.try_send_all(Runtime::worker_index(), repeat(local)) {
while !exchange.try_send_all_with_serializer(
Runtime::worker_index(),
repeat(local),
|local| vec![local as u8],
) {
if Runtime::kill_in_progress() {
return Err(SchedulerError::Killed);
}
Expand Down Expand Up @@ -1376,12 +1379,11 @@ where
match Runtime::runtime() {
Some(runtime) if Runtime::num_workers() > 1 => {
let worker_index = Runtime::worker_index();
let exchange_id = runtime.sequence_next();
let exchange_id = runtime.sequence_next().try_into().unwrap();
let exchange = Exchange::with_runtime(
&runtime,
exchange_id,
// TODO: handle serialization/deserialization errors better.
Box::new(|x| rmp_serde::to_vec(&x).unwrap()),
Box::new(|data| rmp_serde::from_slice(&data).unwrap()),
);

Expand Down Expand Up @@ -1421,7 +1423,11 @@ where
notify_receiver,
exchange,
} => {
while !exchange.try_send_all(Runtime::worker_index(), repeat(local.clone())) {
while !exchange.try_send_all_with_serializer(
Runtime::worker_index(),
repeat(local.clone()),
|local| rmp_serde::to_vec(&local).unwrap(),
) {
if Runtime::kill_in_progress() {
return Err(SchedulerError::Killed);
}
Expand Down Expand Up @@ -1598,6 +1604,76 @@ impl RuntimeHandle {
}
}

/// The placement of a worker thread relative to the current process.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum WorkerLocation {
    /// The worker runs inside this process.
    Local,
    /// The worker is reached across the network.
    Remote,
}

/// Iterator over the locations of every worker in a runtime.
///
/// Each worker in a [Runtime] produces one [WorkerLocation], in worker-index
/// order.
#[derive(Debug, Clone)]
pub struct WorkerLocations {
    // Worker indexes that the iterator has not yielded yet.
    workers: Range<usize>,
    // Worker indexes that are reported as `WorkerLocation::Local`.
    local_workers: Range<usize>,
}

impl Default for WorkerLocations {
fn default() -> Self {
Self::new()
}
}

impl WorkerLocations {
    /// Builds an iterator over the locations of all workers in the current
    /// [Runtime].
    ///
    /// When there is no current runtime, the result describes a single
    /// worker (index 0) that is local. Combine with [Iterator::enumerate] to
    /// get each worker's index alongside its location.
    pub fn new() -> Self {
        match Runtime::runtime() {
            Some(runtime) => {
                let layout = runtime.layout();
                Self {
                    workers: 0..layout.n_workers(),
                    local_workers: layout.local_workers(),
                }
            }
            // No runtime: act as a lone worker living in this process.
            None => Self {
                workers: 0..1,
                local_workers: 0..1,
            },
        }
    }

    /// Looks up the location of the worker whose index is `worker`.
    ///
    /// Debug builds assert that `worker` is below the end of the worker
    /// range.
    pub fn get(&self, worker: usize) -> WorkerLocation {
        debug_assert!(worker < self.workers.end);
        if !self.local_workers.contains(&worker) {
            return WorkerLocation::Remote;
        }
        WorkerLocation::Local
    }
}

/// Yields one [WorkerLocation] per remaining worker index, in ascending
/// index order.
impl Iterator for WorkerLocations {
    type Item = WorkerLocation;

    fn next(&mut self) -> Option<Self::Item> {
        // Advance the index range first, then classify the index it produced.
        let index = self.workers.next();
        index.map(|worker| self.get(worker))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // One item per remaining index, and `Range<usize>` reports an exact
        // `(len, Some(len))` hint.
        self.workers.size_hint()
    }
}

// `size_hint` reports the exact number of remaining workers (lower bound
// equals upper bound), so the default `ExactSizeIterator::len` applies.
impl ExactSizeIterator for WorkerLocations {}

#[cfg(test)]
mod tests {
use super::{Runtime, RuntimeInner};
Expand Down
2 changes: 1 addition & 1 deletion crates/dbsp/src/operator/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ mod gather;
mod shard;

pub(crate) use exchange::Exchange;
pub use exchange::{ExchangeReceiver, ExchangeSender, new_exchange_operators};
pub use exchange::{ExchangeReceiver, ExchangeSender, Mailbox, new_exchange_operators};
Loading