Skip to content

Commit fc3dd4d

Browse files
committed
[dbsp] Use Cell in place of RefCell where we can.
`RefCell` is relatively expensive because it maintains a borrow counter and associated lock-like logic. `Cell` is just a transparent wrapper around the inner type, so it uses less time and space.

Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent db92bc8 commit fc3dd4d

File tree

12 files changed

+196
-180
lines changed

12 files changed

+196
-180
lines changed

crates/dbsp/src/circuit/circuit_builder.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ use serde::{Deserialize, Serialize, Serializer, de::DeserializeOwned};
6262
use std::{
6363
any::{Any, TypeId, type_name_of_val},
6464
borrow::Cow,
65-
cell::{Ref, RefCell, RefMut},
65+
cell::{Cell, Ref, RefCell, RefMut},
6666
collections::{BTreeMap, BTreeSet, HashMap},
6767
fmt::{self, Debug, Display, Write},
6868
future::Future,
@@ -104,15 +104,15 @@ struct StreamValue<D> {
104104
/// The last consumer to read from the stream (`tokens` drops to 0) obtains
105105
/// an owned value rather than a borrow. See description of
106106
/// [ownership-aware scheduling](`OwnershipPreference`) for details.
107-
tokens: RefCell<usize>,
107+
tokens: Cell<usize>,
108108
}
109109

110110
impl<D> StreamValue<D> {
111111
const fn empty() -> Self {
112112
Self {
113113
val: None,
114114
consumers: 0,
115-
tokens: RefCell::new(0),
115+
tokens: Cell::new(0),
116116
}
117117
}
118118

@@ -125,7 +125,7 @@ impl<D> StreamValue<D> {
125125
// If the stream is not connected to any consumers, drop the output
126126
// on the floor.
127127
if self.consumers > 0 {
128-
self.tokens = RefCell::new(self.consumers);
128+
self.tokens = Cell::new(self.consumers);
129129
self.val = Some(val);
130130
}
131131
}
@@ -135,7 +135,7 @@ impl<D> StreamValue<D> {
135135
where
136136
R: Deref<Target = Self>,
137137
{
138-
debug_assert_ne!(*this.tokens.borrow(), 0);
138+
debug_assert_ne!(this.tokens.get(), 0);
139139

140140
this.val.as_ref().unwrap()
141141
}
@@ -146,7 +146,7 @@ impl<D> StreamValue<D> {
146146
where
147147
D: Clone,
148148
{
149-
let tokens = *this.borrow().tokens.borrow();
149+
let tokens = this.borrow().tokens.get();
150150
debug_assert_ne!(tokens, 0);
151151

152152
if tokens == 1 {
@@ -163,9 +163,9 @@ impl<D> StreamValue<D> {
163163
/// consumer can retrieve the value using `Self::take`.
164164
fn consume_token(this: &RefCell<Self>) {
165165
let this_ref = this.borrow();
166-
debug_assert_ne!(*this_ref.tokens.borrow(), 0);
167-
*this_ref.tokens.borrow_mut() -= 1;
168-
if *this_ref.tokens.borrow() == 0 {
166+
debug_assert_ne!(this_ref.tokens.get(), 0);
167+
this_ref.tokens.update(|tokens| tokens - 1);
168+
if this_ref.tokens.get() == 0 {
169169
// We're the last consumer. It's now safe to take a mutable reference to `this`.
170170
drop(this_ref);
171171
this.borrow_mut().val.take();

crates/dbsp/src/operator/condition.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
schedule::{Error as SchedulerError, Scheduler},
1010
},
1111
};
12-
use std::{cell::RefCell, marker::PhantomData, rc::Rc};
12+
use std::{cell::Cell, marker::PhantomData, rc::Rc};
1313

1414
impl<C, D> Stream<C, D>
1515
where
@@ -27,9 +27,9 @@ where
2727
where
2828
F: 'static + Fn(&D) -> bool,
2929
{
30-
let cond = Rc::new(RefCell::new(false));
30+
let cond = Rc::new(Cell::new(false));
3131
let cond_clone = cond.clone();
32-
self.inspect(move |v| *cond_clone.borrow_mut() = condition_func(v));
32+
self.inspect(move |v| cond_clone.set(condition_func(v)));
3333
Condition::new(cond)
3434
}
3535
}
@@ -132,20 +132,20 @@ where
132132
///
133133
/// A condition is created by the [`Stream::condition`] method.
134134
pub struct Condition<C> {
135-
cond: Rc<RefCell<bool>>,
135+
cond: Rc<Cell<bool>>,
136136
_phantom: PhantomData<C>,
137137
}
138138

139139
impl<C> Condition<C> {
140-
fn new(cond: Rc<RefCell<bool>>) -> Self {
140+
fn new(cond: Rc<Cell<bool>>) -> Self {
141141
Self {
142142
cond,
143143
_phantom: PhantomData,
144144
}
145145
}
146146

147147
fn check(&self) -> bool {
148-
*self.cond.borrow()
148+
self.cond.get()
149149
}
150150
}
151151

crates/dbsp/src/operator/dynamic/aggregate.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use futures::Stream as AsyncStream;
66
use std::{
77
any::TypeId,
88
borrow::Cow,
9-
cell::RefCell,
9+
cell::{Cell, RefCell},
1010
cmp::{Ordering, min},
1111
collections::BTreeMap,
1212
marker::PhantomData,
@@ -815,9 +815,9 @@ where
815815
>,
816816
>,
817817
// The last input batch was empty - used in fixedpoint computation.
818-
empty_input: RefCell<bool>,
818+
empty_input: Cell<bool>,
819819
// The last output batch was empty - used in fixedpoint computation.
820-
empty_output: RefCell<bool>,
820+
empty_output: Cell<bool>,
821821
// Keys that may need updating at future times.
822822
keys_of_interest: RefCell<BTreeMap<<IT::Batch as BatchReader>::Time, Box<DynSet<Z::Key>>>>,
823823

@@ -858,8 +858,8 @@ where
858858
output_pairs_factory,
859859
clock,
860860
aggregator: clone_box(aggregator),
861-
empty_input: RefCell::new(false),
862-
empty_output: RefCell::new(false),
861+
empty_input: Cell::new(false),
862+
empty_output: Cell::new(false),
863863
keys_of_interest: RefCell::new(BTreeMap::new()),
864864
input_batch_stats: RefCell::new(BatchSizeStats::new()),
865865
output_batch_stats: RefCell::new(BatchSizeStats::new()),
@@ -989,8 +989,8 @@ where
989989

990990
fn clock_start(&mut self, scope: Scope) {
991991
if scope == 0 {
992-
*self.empty_input.borrow_mut() = false;
993-
*self.empty_output.borrow_mut() = false;
992+
self.empty_input.set(false);
993+
self.empty_output.set(false);
994994
}
995995
}
996996

@@ -1016,8 +1016,8 @@ where
10161016
fn fixedpoint(&self, scope: Scope) -> bool {
10171017
let epoch_end = self.clock.time().epoch_end(scope);
10181018

1019-
*self.empty_input.borrow()
1020-
&& *self.empty_output.borrow()
1019+
self.empty_input.get()
1020+
&& self.empty_output.get()
10211021
&& self
10221022
.keys_of_interest
10231023
.borrow()
@@ -1073,8 +1073,8 @@ where
10731073
// Runtime::worker_index(),
10741074
// self.time
10751075
// );
1076-
*self.empty_input.borrow_mut() = delta.is_empty();
1077-
*self.empty_output.borrow_mut() = true;
1076+
self.empty_input.set(delta.is_empty());
1077+
self.empty_output.set(true);
10781078

10791079
let mut result = self.output_pairs_factory.default_box();
10801080
result.reserve(chunk_size);
@@ -1142,7 +1142,7 @@ where
11421142

11431143
if result.len() >= chunk_size {
11441144
// println!("yield {:?}", result);
1145-
*self.empty_output.borrow_mut() &= result.is_empty();
1145+
self.empty_output.update(|empty_output| empty_output & result.is_empty());
11461146
self.output_batch_stats.borrow_mut().add_batch(result.len());
11471147
yield (result, false, delta_cursor.position());
11481148
result = self.output_pairs_factory.default_box();
@@ -1165,7 +1165,7 @@ where
11651165
if result.len() >= chunk_size {
11661166
// println!("yield {:?}", result);
11671167

1168-
*self.empty_output.borrow_mut() &= result.is_empty();
1168+
self.empty_output.update(|empty_output| empty_output & result.is_empty());
11691169
self.output_batch_stats.borrow_mut().add_batch(result.len());
11701170
yield (result, false, delta_cursor.position());
11711171
result = self.output_pairs_factory.default_box();
@@ -1188,7 +1188,7 @@ where
11881188
if result.len() >= chunk_size {
11891189
// println!("yield {:?}", result);
11901190

1191-
*self.empty_output.borrow_mut() &= result.is_empty();
1191+
self.empty_output.update(|empty_output| empty_output & result.is_empty());
11921192
self.output_batch_stats.borrow_mut().add_batch(result.len());
11931193
yield (result, !(delta_cursor.key_valid() || key_of_interest.is_some()), delta_cursor.position());
11941194
result = self.output_pairs_factory.default_box();

crates/dbsp/src/operator/dynamic/asof_join.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
use std::{
2-
borrow::Cow, cell::RefCell, cmp::Ordering, marker::PhantomData, panic::Location, rc::Rc,
2+
borrow::Cow,
3+
cell::{Cell, RefCell},
4+
cmp::Ordering,
5+
marker::PhantomData,
6+
panic::Location,
7+
rc::Rc,
38
};
49

510
use async_stream::stream;
@@ -228,7 +233,7 @@ where
228233
valts_cmp_func: Box<dyn Fn(&I1::Val, &TS) -> Ordering>,
229234
join_func: Box<AsofJoinFunc<I1::Key, I1::Val, I2::Val, Z::Key, Z::Val>>,
230235
location: &'static Location<'static>,
231-
flush: RefCell<bool>,
236+
flush: Cell<bool>,
232237
delta1: RefCell<Option<SpineSnapshot<I1>>>,
233238
delta2: RefCell<Option<SpineSnapshot<I2>>>,
234239

@@ -266,7 +271,7 @@ where
266271
valts_cmp_func,
267272
join_func,
268273
location,
269-
flush: RefCell::new(false),
274+
flush: Cell::new(false),
270275
delta1: RefCell::new(None),
271276
delta2: RefCell::new(None),
272277
delta1_batch_stats: RefCell::new(BatchSizeStats::new()),
@@ -566,7 +571,7 @@ where
566571
}
567572

568573
fn flush(&mut self) {
569-
*self.flush.borrow_mut() = true;
574+
self.flush.set(true);
570575
}
571576

572577
fn metadata(&self, meta: &mut OperatorMeta) {
@@ -608,13 +613,13 @@ where
608613
*self.delta2.borrow_mut() = Some(delta2.ro_snapshot());
609614
};
610615

611-
let delayed_trace1 = if *self.flush.borrow() {
616+
let delayed_trace1 = if self.flush.get() {
612617
Some(delayed_trace1.as_ref().ro_snapshot())
613618
} else {
614619
None
615620
};
616621

617-
let delayed_trace2 = if *self.flush.borrow() {
622+
let delayed_trace2 = if self.flush.get() {
618623
Some(delayed_trace2.ro_snapshot())
619624
} else {
620625
None
@@ -623,9 +628,7 @@ where
623628
stream! {
624629
let chunk_size = splitter_output_chunk_size();
625630

626-
if *self.flush.borrow() {
627-
*self.flush.borrow_mut() = false;
628-
} else {
631+
if !self.flush.replace(false) {
629632
// println!("yield empty");
630633
yield(Z::dyn_empty(&self.factories.output_factories), true, None);
631634
return;

0 commit comments

Comments
 (0)