Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/dbsp/benches/star_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn run_star_join(
.unwrap();

let start = Instant::now();
for (i, mut batches) in data.into_iter().enumerate() {
for mut batches in data {
// println!("step: {}", i);
h1.append(&mut batches[0]);
h2.append(&mut batches[1]);
Expand Down
35 changes: 34 additions & 1 deletion crates/dbsp/src/circuit/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub enum CircuitMetricCategory {
Cache,
Time,
Balancer,
Multihost,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
Expand Down Expand Up @@ -72,6 +73,14 @@ pub const INPUT_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("input_batches_
pub const OUTPUT_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("output_batches_stats"));
pub const EXCHANGE_WAIT_TIME_SECONDS: MetricId =
MetricId(Cow::Borrowed("exchange_wait_time_seconds"));
/// Metric identifier for the time spent serializing data to send to other
/// hosts.
pub const EXCHANGE_SERIALIZATION_TIME_SECONDS: MetricId =
MetricId(Cow::Borrowed("exchange_serialization_time_seconds"));
/// Metric identifier for the amount of data serialized to send to other
/// hosts.
pub const EXCHANGE_SERIALIZED_BYTES: MetricId =
MetricId(Cow::Borrowed("exchange_serialized_bytes"));
/// Metric identifier for the time spent deserializing data received from
/// other hosts.
pub const EXCHANGE_DESERIALIZATION_TIME_SECONDS: MetricId =
MetricId(Cow::Borrowed("exchange_deserialization_time_seconds"));
/// Metric identifier for the amount of serialized data received from other
/// hosts.
pub const EXCHANGE_DESERIALIZED_BYTES: MetricId =
MetricId(Cow::Borrowed("exchange_deserialized_bytes"));
pub const KEY_DISTRIBUTION: MetricId = MetricId(Cow::Borrowed("key_distribution"));
pub const SIZE_DISTRIBUTION: MetricId = MetricId(Cow::Borrowed("size_distribution"));
pub const LOCAL_SHARD_RECORDS_COUNT: MetricId =
Expand Down Expand Up @@ -151,7 +160,7 @@ pub const PREFIX_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("prefix_batche
pub const INPUT_INTEGRAL_RECORDS_COUNT: MetricId =
MetricId(Cow::Borrowed("input_integral_records_count"));

pub const CIRCUIT_METRICS: [CircuitMetric; 61] = [
pub const CIRCUIT_METRICS: [CircuitMetric; 65] = [
// State
CircuitMetric {
name: USED_MEMORY_BYTES,
Expand Down Expand Up @@ -524,6 +533,30 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 61] = [
advanced: false,
description: "Occupancy of the background cache.",
},
CircuitMetric {
name: EXCHANGE_SERIALIZATION_TIME_SECONDS,
category: CircuitMetricCategory::Multihost,
advanced: false,
description: "Time spent serializing data to send to other hosts.",
},
CircuitMetric {
name: EXCHANGE_SERIALIZED_BYTES,
category: CircuitMetricCategory::Multihost,
advanced: false,
description: "Amount of data serialized to send to other hosts.",
},
CircuitMetric {
name: EXCHANGE_DESERIALIZATION_TIME_SECONDS,
category: CircuitMetricCategory::Multihost,
advanced: false,
description: "Time spent deserializing data received from other hosts.",
},
CircuitMetric {
name: EXCHANGE_DESERIALIZED_BYTES,
category: CircuitMetricCategory::Multihost,
advanced: false,
description: "Amount of serialized data received from other hosts.",
},
];

/// An operator's location within the source program
Expand Down
43 changes: 39 additions & 4 deletions crates/dbsp/src/operator/communication/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use crate::{
circuit::{
Host, LocalStoreMarker, OwnershipPreference, Runtime, Scope,
metadata::{
BatchSizeStats, EXCHANGE_WAIT_TIME_SECONDS, INPUT_BATCHES_STATS, MetaItem,
OUTPUT_BATCHES_STATS, OperatorLocation, OperatorMeta,
BatchSizeStats, EXCHANGE_DESERIALIZATION_TIME_SECONDS, EXCHANGE_DESERIALIZED_BYTES,
EXCHANGE_SERIALIZATION_TIME_SECONDS, EXCHANGE_SERIALIZED_BYTES,
EXCHANGE_WAIT_TIME_SECONDS, INPUT_BATCHES_STATS, MetaItem, OUTPUT_BATCHES_STATS,
OperatorLocation, OperatorMeta,
},
operator_traits::{Operator, SinkOperator, SourceOperator},
tokio::TOKIO,
Expand Down Expand Up @@ -236,6 +238,10 @@ struct InnerExchange {
/// A callback that takes the raw data exchanged over RPC and deserializes
/// and delivers it to the receiver's mailbox.
deliver: Box<dyn Fn(Vec<u8>, usize, usize) + Send + Sync + 'static>,
/// The amount of time spent in `deliver`.
delivery_usecs: AtomicU64,
/// The number of bytes passed to `deliver`.
delivered_bytes: AtomicUsize,
}

impl InnerExchange {
Expand Down Expand Up @@ -264,6 +270,8 @@ impl InnerExchange {
.collect(),
sender_callbacks: (0..npeers).map(|_| Callback::empty()).collect(),
deliver: Box::new(deliver),
delivery_usecs: AtomicU64::new(0),
delivered_bytes: AtomicUsize::new(0),
sent: AtomicUsize::new(0),
}
}
Expand Down Expand Up @@ -296,12 +304,19 @@ impl InnerExchange {
let receivers = &self.local_workers;

// Deliver all of the data into the exchange's mailboxes.
let start = Instant::now();
let mut delivered_bytes = 0;
for (sender, data) in senders.clone().zip(data.into_iter()) {
assert_eq!(data.len(), receivers.len());
for (receiver, data) in receivers.clone().zip(data.into_iter()) {
delivered_bytes += data.len();
(self.deliver)(data, sender, receiver);
}
}
self.delivery_usecs
.fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
self.delivered_bytes
.fetch_add(delivered_bytes, Ordering::Relaxed);

// Increment the receiver counters and deliver callbacks if necessary.
for receiver in receivers.clone() {
Expand Down Expand Up @@ -404,6 +419,12 @@ pub(crate) struct Exchange<T> {
/// ```
mailboxes: Arc<Vec<Mutex<Option<T>>>>,
serialize: Box<dyn Fn(T) -> Vec<u8> + Send + Sync>,

/// The amount of time we've spent calling `serialize`.
serialization_usecs: AtomicU64,

/// The number of bytes produced by `serialize`.
serialized_bytes: AtomicUsize,
}

async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
Expand Down Expand Up @@ -475,6 +496,8 @@ where
inner,
mailboxes,
serialize,
serialization_usecs: AtomicU64::new(0),
serialized_bytes: AtomicUsize::new(0),
}
}

Expand Down Expand Up @@ -596,8 +619,10 @@ where
// accumulate all of the data from our local `senders` to all
// of the `receivers` on that host.
let senders = &this.inner.local_workers;
let start = Instant::now();
for host in runtime.layout().other_hosts() {
let receivers = &host.workers;
let mut serialized_bytes = 0;
let items: Vec<Vec<_>> = senders
.clone()
.map(|sender| {
Expand All @@ -610,11 +635,17 @@ where
.unwrap()
.take()
.unwrap();
(this.serialize)(item)
let serialized = (this.serialize)(item);
serialized_bytes += serialized.len();
serialized
})
.collect()
})
.collect();
this.serialization_usecs
.fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
this.serialized_bytes
.fetch_add(serialized_bytes, Ordering::Relaxed);

let client = this.inner.clients.connect(receivers.start).await;

Expand Down Expand Up @@ -926,6 +957,8 @@ where
fn metadata(&self, meta: &mut OperatorMeta) {
meta.extend(metadata! {
INPUT_BATCHES_STATS => self.input_batch_stats.metadata(),
EXCHANGE_SERIALIZATION_TIME_SECONDS => MetaItem::Duration(Duration::from_micros(self.exchange.serialization_usecs.load(Ordering::Acquire))),
EXCHANGE_SERIALIZED_BYTES => MetaItem::bytes(self.exchange.serialized_bytes.load(Ordering::Acquire))
});
}

Expand Down Expand Up @@ -1072,7 +1105,9 @@ where
fn metadata(&self, meta: &mut OperatorMeta) {
meta.extend(metadata! {
OUTPUT_BATCHES_STATS => self.output_batch_stats.metadata(),
EXCHANGE_WAIT_TIME_SECONDS => MetaItem::Duration(Duration::from_micros(self.total_wait_time.load(Ordering::Acquire)))
EXCHANGE_WAIT_TIME_SECONDS => MetaItem::Duration(Duration::from_micros(self.total_wait_time.load(Ordering::Acquire))),
EXCHANGE_DESERIALIZATION_TIME_SECONDS => MetaItem::Duration(Duration::from_micros(self.exchange.inner.delivery_usecs.load(Ordering::Acquire))),
EXCHANGE_DESERIALIZED_BYTES => MetaItem::bytes(self.exchange.inner.delivered_bytes.load(Ordering::Acquire)),
});
}

Expand Down
2 changes: 1 addition & 1 deletion js-packages/profiler-lib/src/profile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { type MirNode, SourcePositionRanges, SourcePositionRange, Sources, type
type JsonMeasurement = Array<any>;
export type NodeId = string;

export type CircuitMetricCategory = "State" | "Inputs" | "Outputs" | "Cache" | "Time" | "Balancer";
export type CircuitMetricCategory = string;

export interface ProfileMetricDescription {
readonly name: string;
Expand Down
Loading