diff --git a/crates/dbsp/benches/star_join.rs b/crates/dbsp/benches/star_join.rs index 2280e6eb143..358b9fa4626 100644 --- a/crates/dbsp/benches/star_join.rs +++ b/crates/dbsp/benches/star_join.rs @@ -189,7 +189,7 @@ fn run_star_join( .unwrap(); let start = Instant::now(); - for (i, mut batches) in data.into_iter().enumerate() { + for mut batches in data { // println!("step: {}", i); h1.append(&mut batches[0]); h2.append(&mut batches[1]); diff --git a/crates/dbsp/src/circuit/metadata.rs b/crates/dbsp/src/circuit/metadata.rs index 2af9d66ffd5..f9af66c952b 100644 --- a/crates/dbsp/src/circuit/metadata.rs +++ b/crates/dbsp/src/circuit/metadata.rs @@ -33,6 +33,7 @@ pub enum CircuitMetricCategory { Cache, Time, Balancer, + Multihost, } #[derive(Debug, Clone, PartialEq, Eq, Serialize)] @@ -72,6 +73,14 @@ pub const INPUT_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("input_batches_ pub const OUTPUT_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("output_batches_stats")); pub const EXCHANGE_WAIT_TIME_SECONDS: MetricId = MetricId(Cow::Borrowed("exchange_wait_time_seconds")); +pub const EXCHANGE_SERIALIZATION_TIME_SECONDS: MetricId = + MetricId(Cow::Borrowed("exchange_serialization_time_seconds")); +pub const EXCHANGE_SERIALIZED_BYTES: MetricId = + MetricId(Cow::Borrowed("exchange_serialized_bytes")); +pub const EXCHANGE_DESERIALIZATION_TIME_SECONDS: MetricId = + MetricId(Cow::Borrowed("exchange_deserialization_time_seconds")); +pub const EXCHANGE_DESERIALIZED_BYTES: MetricId = + MetricId(Cow::Borrowed("exchange_deserialized_bytes")); pub const KEY_DISTRIBUTION: MetricId = MetricId(Cow::Borrowed("key_distribution")); pub const SIZE_DISTRIBUTION: MetricId = MetricId(Cow::Borrowed("size_distribution")); pub const LOCAL_SHARD_RECORDS_COUNT: MetricId = @@ -151,7 +160,7 @@ pub const PREFIX_BATCHES_STATS: MetricId = MetricId(Cow::Borrowed("prefix_batche pub const INPUT_INTEGRAL_RECORDS_COUNT: MetricId = MetricId(Cow::Borrowed("input_integral_records_count")); -pub const CIRCUIT_METRICS: [CircuitMetric; 61] = [ +pub const CIRCUIT_METRICS: [CircuitMetric; 65] = [ // State CircuitMetric { name: USED_MEMORY_BYTES, @@ -524,6 +533,30 @@ pub const CIRCUIT_METRICS: [CircuitMetric; 61] = [ advanced: false, description: "Occupancy of the background cache.", }, + CircuitMetric { + name: EXCHANGE_SERIALIZATION_TIME_SECONDS, + category: CircuitMetricCategory::Multihost, + advanced: false, + description: "Time spent serializing data to send to other hosts.", + }, + CircuitMetric { + name: EXCHANGE_SERIALIZED_BYTES, + category: CircuitMetricCategory::Multihost, + advanced: false, + description: "Amount of data serialized to send to other hosts.", + }, + CircuitMetric { + name: EXCHANGE_DESERIALIZATION_TIME_SECONDS, + category: CircuitMetricCategory::Multihost, + advanced: false, + description: "Time spent deserializing data received from other hosts.", + }, + CircuitMetric { + name: EXCHANGE_DESERIALIZED_BYTES, + category: CircuitMetricCategory::Multihost, + advanced: false, + description: "Amount of serialized data received from other hosts.", + }, ]; /// An operator's location within the source program diff --git a/crates/dbsp/src/operator/communication/exchange.rs b/crates/dbsp/src/operator/communication/exchange.rs index 42d775b563c..eab1f023746 100644 --- a/crates/dbsp/src/operator/communication/exchange.rs +++ b/crates/dbsp/src/operator/communication/exchange.rs @@ -10,8 +10,10 @@ use crate::{ circuit::{ Host, LocalStoreMarker, OwnershipPreference, Runtime, Scope, metadata::{ - BatchSizeStats, EXCHANGE_WAIT_TIME_SECONDS, INPUT_BATCHES_STATS, MetaItem, - OUTPUT_BATCHES_STATS, OperatorLocation, OperatorMeta, + BatchSizeStats, EXCHANGE_DESERIALIZATION_TIME_SECONDS, EXCHANGE_DESERIALIZED_BYTES, + EXCHANGE_SERIALIZATION_TIME_SECONDS, EXCHANGE_SERIALIZED_BYTES, + EXCHANGE_WAIT_TIME_SECONDS, INPUT_BATCHES_STATS, MetaItem, OUTPUT_BATCHES_STATS, + OperatorLocation, OperatorMeta, }, operator_traits::{Operator, SinkOperator, SourceOperator}, tokio::TOKIO, @@ -236,6 +238,10 @@ struct InnerExchange { /// A callback that takes the raw data exchanged over RPC and deserializes /// and delivers it to the receiver's mailbox. deliver: Box, usize, usize) + Send + Sync + 'static>, + /// The amount of time spent in `deliver`. + delivery_usecs: AtomicU64, + /// The number of bytes passed to `deliver`. + delivered_bytes: AtomicUsize, } impl InnerExchange { @@ -264,6 +270,8 @@ impl InnerExchange { .collect(), sender_callbacks: (0..npeers).map(|_| Callback::empty()).collect(), deliver: Box::new(deliver), + delivery_usecs: AtomicU64::new(0), + delivered_bytes: AtomicUsize::new(0), sent: AtomicUsize::new(0), } } @@ -296,12 +304,19 @@ impl InnerExchange { let receivers = &self.local_workers; // Deliver all of the data into the exchange's mailboxes. + let start = Instant::now(); + let mut delivered_bytes = 0; for (sender, data) in senders.clone().zip(data.into_iter()) { assert_eq!(data.len(), receivers.len()); for (receiver, data) in receivers.clone().zip(data.into_iter()) { + delivered_bytes += data.len(); (self.deliver)(data, sender, receiver); } } + self.delivery_usecs + .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed); + self.delivered_bytes + .fetch_add(delivered_bytes, Ordering::Relaxed); // Increment the receiver counters and deliver callbacks if necessary. for receiver in receivers.clone() { @@ -404,6 +419,12 @@ pub(crate) struct Exchange { /// ``` mailboxes: Arc>>>, serialize: Box Vec + Send + Sync>, + + /// The amount of time we've spent calling `serialize`. + serialization_usecs: AtomicU64, + + /// The number of bytes produced by `serialize`. + serialized_bytes: AtomicUsize, } async fn spawn(fut: impl Future + Send + 'static) { @@ -475,6 +496,8 @@ where inner, mailboxes, serialize, + serialization_usecs: AtomicU64::new(0), + serialized_bytes: AtomicUsize::new(0), } } @@ -596,8 +619,10 @@ where // accumulate all of the data from our local `senders` to all // of the `receivers` on that host. let senders = &this.inner.local_workers; + let start = Instant::now(); for host in runtime.layout().other_hosts() { let receivers = &host.workers; + let mut serialized_bytes = 0; let items: Vec> = senders .clone() .map(|sender| { @@ -610,11 +635,17 @@ where .unwrap() .take() .unwrap(); - (this.serialize)(item) + let serialized = (this.serialize)(item); + serialized_bytes += serialized.len(); + serialized }) .collect() }) .collect(); + this.serialization_usecs + .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed); + this.serialized_bytes + .fetch_add(serialized_bytes, Ordering::Relaxed); let client = this.inner.clients.connect(receivers.start).await; @@ -926,6 +957,8 @@ where fn metadata(&self, meta: &mut OperatorMeta) { meta.extend(metadata! { INPUT_BATCHES_STATS => self.input_batch_stats.metadata(), + EXCHANGE_SERIALIZATION_TIME_SECONDS => MetaItem::Duration(Duration::from_micros(self.exchange.serialization_usecs.load(Ordering::Acquire))), + EXCHANGE_SERIALIZED_BYTES => MetaItem::bytes(self.exchange.serialized_bytes.load(Ordering::Acquire)) }); } @@ -1072,7 +1105,9 @@ where fn metadata(&self, meta: &mut OperatorMeta) { meta.extend(metadata! { OUTPUT_BATCHES_STATS => self.output_batch_stats.metadata(), - EXCHANGE_WAIT_TIME_SECONDS => MetaItem::Duration(Duration::from_micros(self.total_wait_time.load(Ordering::Acquire))) + EXCHANGE_WAIT_TIME_SECONDS => MetaItem::Duration(Duration::from_micros(self.total_wait_time.load(Ordering::Acquire))), + EXCHANGE_DESERIALIZATION_TIME_SECONDS => MetaItem::Duration(Duration::from_micros(self.exchange.inner.delivery_usecs.load(Ordering::Acquire))), + EXCHANGE_DESERIALIZED_BYTES => MetaItem::bytes(self.exchange.inner.delivered_bytes.load(Ordering::Acquire)), }); } diff --git a/js-packages/profiler-lib/src/profile.ts b/js-packages/profiler-lib/src/profile.ts index a0df203191e..57b176c63b6 100644 --- a/js-packages/profiler-lib/src/profile.ts +++ b/js-packages/profiler-lib/src/profile.ts @@ -6,7 +6,7 @@ import { type MirNode, SourcePositionRanges, SourcePositionRange, Sources, type type JsonMeasurement = Array; export type NodeId = string; -export type CircuitMetricCategory = "State" | "Inputs" | "Outputs" | "Cache" | "Time" | "Balancer"; +export type CircuitMetricCategory = string; export interface ProfileMetricDescription { readonly name: string;