Skip to content

Commit a719fe4

Browse files
committed
[dbsp] Clear rkyv scratch space from one serialization to the next.
A customer reported large and growing memory use that showed up in heap profiles attributed to rkyv serialization in the storage file writer. Only some of this made sense, in particular the part written to FBufs, which is data blocks that will end up in the cache. The rest was not more specifically attributed. Some investigation showed the possibility that data is accumulating in our per-thread scratch space cache. This cache should get emptied for every use, but that depends on the rkyv serialization implementations being correct both in the common case and the error case. DBSP hits the error case frequently in practice (once per data block), because it uses errors to avoid going over block size limits. Perhaps the rkyv built-in implementations handle errors correctly regarding scratch space; I don't know whether DBSP implementations of serializers do. If this is the problem, this commit will avoid it, by clearing the scratch space every time that we use it. It is larger than otherwise necessary because rkvy's `HeapScratch` and `FallbackScratch` don't provide any way to clear themselves, so this has to copy in their implementations. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent e94b613 commit a719fe4

File tree

1 file changed

+87
-9
lines changed

1 file changed

+87
-9
lines changed

crates/dbsp/src/storage/file.rs

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,20 +72,25 @@ use crate::{
7272
dynamic::{ArchivedDBData, DynVec, LeanVec},
7373
storage::buffer_cache::{FBuf, FBufSerializer},
7474
};
75-
use rkyv::de::deserializers::SharedDeserializeMap;
7675
use rkyv::de::{SharedDeserializeRegistry, SharedPointer};
76+
use rkyv::{
77+
AlignedBytes,
78+
de::deserializers::SharedDeserializeMap,
79+
ser::{ScratchSpace, serializers::BufferScratch},
80+
};
7781
use rkyv::{
7882
Archive, Archived, Deserialize, Fallible, Serialize,
7983
ser::{
8084
Serializer as _,
81-
serializers::{
82-
AllocScratch, CompositeSerializer, FallbackScratch, HeapScratch, SharedSerializeMap,
83-
},
85+
serializers::{AllocScratch, CompositeSerializer, SharedSerializeMap},
8486
},
8587
};
86-
use std::cell::RefCell;
87-
use std::fmt::Debug;
88+
use std::{
89+
alloc::{Layout, alloc_zeroed},
90+
cell::RefCell,
91+
};
8892
use std::{any::Any, sync::Arc};
93+
use std::{fmt::Debug, ptr::NonNull};
8994

9095
pub mod format;
9196
mod item;
@@ -276,8 +281,70 @@ where
276281
/// The particular [`rkyv::ser::Serializer`] that we use.
277282
pub type Serializer = CompositeSerializer<FBufSerializer<FBuf>, DbspScratch, SharedSerializeMap>;
278283

279-
/// The particular [`rkyv::ser::ScratchSpace`] that we use.
280-
pub type DbspScratch = FallbackScratch<HeapScratch<65536>, AllocScratch>;
284+
/// Scratch space type used by DBSP.
285+
///
286+
/// This is `FallbackScratch<HeapScratch<SCRATCH_SIZE>, AllocScratch>` with the
287+
/// added ability to clear it.
288+
pub struct DbspScratch {
289+
main: BufferScratch<Box<AlignedBytes<SCRATCH_SIZE>>>,
290+
fallback: AllocScratch,
291+
}
292+
293+
impl Default for DbspScratch {
294+
fn default() -> Self {
295+
Self::new()
296+
}
297+
}
298+
299+
impl DbspScratch {
300+
fn new() -> Self {
301+
Self {
302+
main: {
303+
// This code is copied from `rkyv::ser::serializers::HeapScratch::new`.
304+
let layout = Layout::new::<AlignedBytes<SCRATCH_SIZE>>();
305+
unsafe {
306+
// SAFETY: `layout` does have nonzero size.
307+
let ptr = alloc_zeroed(layout).cast::<AlignedBytes<SCRATCH_SIZE>>();
308+
assert!(!ptr.is_null());
309+
// SAFETY: We are using the raw pointer only once and the memory is
310+
// allocated by the global allocator using a layout for the correct type.
311+
BufferScratch::new(Box::from_raw(ptr))
312+
}
313+
},
314+
fallback: AllocScratch::new(),
315+
}
316+
}
317+
318+
fn cleared(mut self) -> Self {
319+
self.main.clear();
320+
self.fallback = AllocScratch::new();
321+
self
322+
}
323+
}
324+
325+
impl Fallible for DbspScratch {
326+
type Error = <AllocScratch as Fallible>::Error;
327+
}
328+
329+
impl ScratchSpace for DbspScratch {
330+
#[inline]
331+
unsafe fn push_scratch(&mut self, layout: Layout) -> Result<NonNull<[u8]>, Self::Error> {
332+
unsafe {
333+
self.main
334+
.push_scratch(layout)
335+
.or_else(|_| self.fallback.push_scratch(layout))
336+
}
337+
}
338+
339+
#[inline]
340+
unsafe fn pop_scratch(&mut self, ptr: NonNull<u8>, layout: Layout) -> Result<(), Self::Error> {
341+
unsafe {
342+
self.main
343+
.pop_scratch(ptr, layout)
344+
.or_else(|_| self.fallback.pop_scratch(ptr, layout))
345+
}
346+
}
347+
}
281348

282349
/// The particular [`rkyv`] deserializer that we use.
283350
#[derive(Debug)]
@@ -339,6 +406,13 @@ impl SharedDeserializeRegistry for Deserializer {
339406
}
340407
}
341408

409+
/// Scratch space size.
410+
///
411+
/// This is the amount of space we allocate as base scratch space for rkyv
412+
/// serialization. If more is needed for a particular serialization, then we
413+
/// fall back to [AllocScratch].
414+
pub const SCRATCH_SIZE: usize = 65536;
415+
342416
/// Creates an instance of [Serializer] that will serialize to `serializer` and
343417
/// passes it to `f`. Returns a tuple of the `FBuf` from the [Serializer] and
344418
/// the return value of `f`.
@@ -354,7 +428,11 @@ where
354428
static SCRATCH: RefCell<Option<DbspScratch>> = RefCell::new(Some(Default::default()));
355429
}
356430

357-
let mut serializer = Serializer::new(serializer, SCRATCH.take().unwrap(), Default::default());
431+
let mut serializer = Serializer::new(
432+
serializer,
433+
SCRATCH.take().unwrap().cleared(),
434+
Default::default(),
435+
);
358436
let result = f(&mut serializer);
359437
let (serializer, scratch, _shared) = serializer.into_components();
360438
SCRATCH.replace(Some(scratch));

0 commit comments

Comments
 (0)