Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
[dbsp] Fix occasional panic on background task shutdown.
Sometimes, running the unit tests failed with:

```
panicked at crates/dbsp/src/circuit/runtime.rs:972:64:
cannot access a task-local storage value without setting it first
thread panicked while processing panic. aborting.
Aborted (core dumped)
```

The panic comes from inside Drop for ImmutableFileRef, which attempts
to evict the file being dropped from the buffer cache. But
ImmutableFileRef doesn't hold a reference to the BufferCache; it only
has a fn() -> Arc<BufferCache> that it can call to obtain one. This
fn is actually Runtime::buffer_cache, which is what's
panicking.

The problem is that tokio task shutdown does not provide values for
task-local variables, and a task-local variable is what stores the
buffer cache.  This commit avoids the problem by allowing the buffer
cache to be unavailable during Drop of ImmutableFileRef.  This
requires some refactoring so that the cache fn, and
Runtime::buffer_cache(), return an Option.

Signed-off-by: Ben Pfaff <blp@feldera.com>
  • Loading branch information
blp committed Apr 3, 2026
commit 158253d1d48d7fd2552b26a4e66f453614d5090e
88 changes: 50 additions & 38 deletions crates/dbsp/src/circuit/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,10 @@ impl Runtime {
})
}

/// Returns this thread's buffer-cache handle, but:
/// Returns this thread's buffer-cache handle:
///
/// - If the thread is a foreground thread or a background task, returns
/// that thread or task's buffer cache.
///
/// - If the thread's [Runtime] does not have storage configured, the cache
/// size is trivially small.
Expand All @@ -823,49 +826,58 @@ impl Runtime {
/// all such threads. (Such a thread might be in a circuit that uses
/// storage, but there's no way to know because only [Runtime] makes that
/// available at a thread level.)
pub fn buffer_cache() -> Arc<BufferCache> {
// This cache is shared by all auxiliary threads in the runtime.
// In particular, output connector threads use it to maintain their output buffers.

// FIXME: We may need a tunable strategy for aux threads. We cannot simply give each of them the
// same cache as DBSP worker threads, as there can be dozens of aux threads (currently one per
// output connector), which do not necessarily need a large cache. OTOH, sharing the same cache
// across all of them may potentially cause performance issues.
static AUXILIARY_CACHE: LazyLock<Arc<BufferCache>> =
LazyLock::new(|| Arc::new(BufferCache::new(1024 * 1024 * 256)));

// Fast path, look up from TLS

if current_thread_type() == Some(ThreadType::Background) {
// The TOKIO_BUFFER_CACHE variable should be set on task startup.
return TOKIO_BUFFER_CACHE.get().clone();
} else if let Some(buffer_cache) = BUFFER_CACHE.with(|bc| bc.borrow().clone()) {
return buffer_cache;
}

// Slow path for initializing the thread-local.
if let Some(rt) = Runtime::runtime() {
match current_thread_type() {
None => {
let buffer_cache = AUXILIARY_CACHE.clone();
BUFFER_CACHE.set(Some(buffer_cache.clone()));
buffer_cache
}
Some(ThreadType::Background) => {
// The TOKIO_BUFFER_CACHE variable should be set on task startup.
panic!("background thread should not call buffer_cache()");
}
Some(thread_type) => {
///
/// - If the thread is a background thread, but no buffer cache is set, this
/// ordinarily indicates a bug. However, this can happen when a
/// background task is dropped, because it's normal for task-local
/// variables to be unavailable in `Drop` during task shutdown. The
/// function returns `None` in this case (and only in this case).
pub fn buffer_cache() -> Option<Arc<BufferCache>> {
    if let Some(buffer_cache) = BUFFER_CACHE.with(|bc| bc.borrow().clone()) {
        // Fast path common case for foreground threads: the thread-local
        // was already initialized by an earlier call.
        Some(buffer_cache)
    } else if let Ok(buffer_cache) = TOKIO_BUFFER_CACHE.try_get() {
        // Background tasks case. The task-local is set on task startup;
        // `try_get` avoids panicking when it is absent.
        Some(buffer_cache)
    } else if let Some(rt) = Runtime::runtime()
        && let Some(thread_type) = current_thread_type()
    {
        // Slow path for threads running in a [Runtime].
        match thread_type {
            ThreadType::Foreground => {
                // Initialize the thread-local from the runtime so later
                // calls take the fast path above.
                let buffer_cache =
                    rt.get_buffer_cache(Runtime::local_worker_offset(), thread_type);
                BUFFER_CACHE.set(Some(buffer_cache.clone()));
                Some(buffer_cache)
            }
            ThreadType::Background => {
                // We are in a background thread in a Tokio runtime but no
                // buffer cache is set in [TOKIO_BUFFER_CACHE]. This is
                // probably a bug, but it can happen when a background task
                // is canceled, because task-local values are unavailable
                // in `Drop` during task shutdown. It is the reason that
                // this function returns an `Option`.
                None
            }
        }
    } else {
        // Fallback path for threads outside a [Runtime].
        //
        // This cache is shared by all auxiliary threads in the runtime. In
        // particular, output connector threads use it to maintain their
        // output buffers.
        //
        // FIXME: We may need a tunable strategy for aux threads. We cannot
        // simply give each of them the same cache as DBSP worker threads,
        // as there can be dozens of aux threads (currently one per output
        // connector), which do not necessarily need a large cache. OTOH,
        // sharing the same cache across all of them may potentially cause
        // performance issues.
        static AUXILIARY_CACHE: LazyLock<Arc<BufferCache>> =
            LazyLock::new(|| Arc::new(BufferCache::new(1024 * 1024 * 256)));

        let buffer_cache = AUXILIARY_CACHE.clone();
        BUFFER_CACHE.set(Some(buffer_cache.clone()));
        Some(buffer_cache)
    }
}

Expand Down
33 changes: 20 additions & 13 deletions crates/dbsp/src/storage/file/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ where

fn new(file: &ImmutableFileRef, node: &TreeNode) -> Result<Arc<Self>, Error> {
let start = Instant::now();
let cache = (file.cache)();
let cache = file.cache();
#[allow(clippy::borrow_deref_ref)]
let (access, entry) = match cache.get(&*file.file_handle, node.location) {
Some(entry) => (
Expand Down Expand Up @@ -1030,7 +1030,7 @@ where

fn new(file: &ImmutableFileRef, node: &TreeNode) -> Result<Arc<Self>, Error> {
let start = Instant::now();
let cache = (file.cache)();
let cache = file.cache();
let first_row = node.rows.start;
#[allow(clippy::borrow_deref_ref)]
let (access, entry) = match cache.get(&*file.file_handle, node.location) {
Expand Down Expand Up @@ -1311,13 +1311,13 @@ impl FileTrailer {
})
}
fn new(
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
file_handle: &dyn FileReader,
location: BlockLocation,
stats: &AtomicCacheStats,
) -> Result<Arc<FileTrailer>, Error> {
let start = Instant::now();
let cache = cache();
let cache = cache().expect("Should have a buffer cache");
#[allow(clippy::borrow_deref_ref)]
let (access, entry) = match cache.get(&*file_handle, location) {
Some(entry) => {
Expand Down Expand Up @@ -1397,7 +1397,7 @@ impl Column {
/// Encapsulates storage and a file handle.
#[derive(SizeOf)]
struct ImmutableFileRef {
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
#[size_of(skip)]
file_handle: Arc<dyn FileReader>,
compression: Option<Compression>,
Expand All @@ -1412,15 +1412,17 @@ impl Debug for ImmutableFileRef {
}
impl Drop for ImmutableFileRef {
fn drop(&mut self) {
if Arc::strong_count(&self.file_handle) == 1 {
self.evict();
if Arc::strong_count(&self.file_handle) == 1
&& let Some(cache) = (self.cache)()
{
cache.evict(&*self.file_handle);
}
}
}

impl ImmutableFileRef {
fn new(
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
file_handle: Arc<dyn FileReader>,
compression: Option<Compression>,
stats: AtomicCacheStats,
Expand All @@ -1435,8 +1437,13 @@ impl ImmutableFileRef {
}
}

/// Returns the buffer cache for this file.
///
/// # Panics
///
/// Panics if the cache callback yields `None` (i.e. no buffer cache is
/// currently available).
fn cache(&self) -> Arc<BufferCache> {
    let Some(cache) = (self.cache)() else {
        panic!("Should have a buffer cache");
    };
    cache
}

/// Test-only helper: evicts this file from the buffer cache.
#[cfg(test)]
pub fn evict(&self) {
    self.cache().evict(&*self.file_handle);
}

pub fn read_block(&self, location: BlockLocation) -> Result<Arc<FBuf>, Error> {
Expand Down Expand Up @@ -1573,7 +1580,7 @@ where
/// Creates and returns a new `Reader` for `file`.
pub(crate) fn new(
factories: &[&AnyFactories],
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
file: Arc<dyn FileReader>,
) -> Result<Self, Error> {
let (reader, _membership_filter) = Self::new_with_filter(factories, cache, file, None)?;
Expand All @@ -1582,7 +1589,7 @@ where

pub(crate) fn new_with_filter(
factories: &[&AnyFactories],
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
file: Arc<dyn FileReader>,
membership_filter: Option<TrackingBloomFilter>,
) -> Result<(Self, Option<TrackingBloomFilter>), Error> {
Expand Down Expand Up @@ -1709,7 +1716,7 @@ where
/// Instantiates a reader given an existing path.
pub fn open(
factories: &[&AnyFactories],
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
storage_backend: &dyn StorageBackend,
path: &StoragePath,
) -> Result<Self, Error> {
Expand All @@ -1718,7 +1725,7 @@ where

pub(crate) fn open_with_filter(
factories: &[&AnyFactories],
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
storage_backend: &dyn StorageBackend,
path: &StoragePath,
) -> Result<(Self, Option<TrackingBloomFilter>), Error> {
Expand Down
2 changes: 1 addition & 1 deletion crates/dbsp/src/storage/file/reader/bulk_rows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ where
let (sender, receiver) = channel();
let mut this = Self {
reader,
cache: (reader.file.cache)(),
cache: reader.file.cache(),
factories: reader.columns[column].factories.factories(),
column,
row: 0,
Expand Down
2 changes: 1 addition & 1 deletion crates/dbsp/src/storage/file/reader/fetch_indexed_zset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ where
let mut this = Self {
keys,
reader,
cache: (reader.file.cache)(),
cache: reader.file.cache(),
factories,
sender,
receiver,
Expand Down
2 changes: 1 addition & 1 deletion crates/dbsp/src/storage/file/reader/fetch_zset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ where
let mut this = Self {
keys,
reader,
cache: (reader.file.cache)(),
cache: reader.file.cache(),
factories,
sender,
receiver,
Expand Down
4 changes: 2 additions & 2 deletions crates/dbsp/src/storage/file/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ fn tup65_from_bits(bits: u128) -> Tup65OptString {
)
}

fn test_buffer_cache() -> Arc<BufferCache> {
fn test_buffer_cache() -> Option<Arc<BufferCache>> {
thread_local! {
static BUFFER_CACHE: Arc<BufferCache> = Arc::new(BufferCache::new(1024 * 1024));
}
BUFFER_CACHE.with(|cache| cache.clone())
Some(BUFFER_CACHE.with(|cache| cache.clone()))
}

fn for_each_compression_type<F>(parameters: Parameters, f: F)
Expand Down
17 changes: 10 additions & 7 deletions crates/dbsp/src/storage/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ impl BlockWriter {
/// all the same type. Thus, [`Writer1`] and [`Writer2`] exist for writing
/// 1-column and 2-column layer files, respectively, with added type safety.
struct Writer {
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
writer: BlockWriter,
bloom_filter: Option<TrackingBloomFilter>,
cws: Vec<ColumnWriter>,
Expand All @@ -1164,7 +1164,7 @@ impl Writer {

pub fn new(
factories: &[&AnyFactories],
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
storage_backend: &dyn StorageBackend,
parameters: Parameters,
n_columns: usize,
Expand Down Expand Up @@ -1193,7 +1193,10 @@ impl Writer {
let worker = format!("w{}-", Runtime::worker_index());
let writer = Self {
cache,
writer: BlockWriter::new(cache(), storage_backend.create_with_prefix(&worker.into())?),
writer: BlockWriter::new(
cache().expect("Should have a buffer cache"),
storage_backend.create_with_prefix(&worker.into())?,
),
bloom_filter,
cws,
finished_columns,
Expand Down Expand Up @@ -1351,7 +1354,7 @@ impl Writer {
/// }, &StorageOptions::default()).unwrap();
/// let parameters = Parameters::default();
/// let mut file =
/// Writer1::new(&factories, || Arc::new(BufferCache::new(1024 * 1024)), &*storage_backend, parameters, 1_000_000).unwrap();
/// Writer1::new(&factories, || Some(Arc::new(BufferCache::new(1024 * 1024))), &*storage_backend, parameters, 1_000_000).unwrap();
/// for i in 0..1000_u32 {
/// file.write0((i.erase(), ().erase())).unwrap();
/// }
Expand All @@ -1377,7 +1380,7 @@ where
/// Creates a new writer with the given parameters.
pub fn new(
factories: &Factories<K0, A0>,
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
storage_backend: &dyn StorageBackend,
parameters: Parameters,
estimated_keys: usize,
Expand Down Expand Up @@ -1515,7 +1518,7 @@ where
/// }, &StorageOptions::default()).unwrap();
/// let parameters = Parameters::default();
/// let mut file =
/// Writer2::new(&factories, &factories, || Arc::new(BufferCache::new(1024 * 1024)), &*storage_backend, parameters, 1_000_000).unwrap();
/// Writer2::new(&factories, &factories, || Some(Arc::new(BufferCache::new(1024 * 1024))), &*storage_backend, parameters, 1_000_000).unwrap();
/// for i in 0..1000_u32 {
/// for j in 0..10_u32 {
/// file.write1((&j, &())).unwrap();
Expand Down Expand Up @@ -1552,7 +1555,7 @@ where
pub fn new(
factories0: &Factories<K0, A0>,
factories1: &Factories<K1, A1>,
cache: fn() -> Arc<BufferCache>,
cache: fn() -> Option<Arc<BufferCache>>,
storage_backend: &dyn StorageBackend,
parameters: Parameters,
estimated_keys: usize,
Expand Down
7 changes: 5 additions & 2 deletions crates/sqllib/tests/tuple_proptest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,11 @@ where
Ok(())
}

fn buffer_cache() -> Arc<BufferCache> {
Arc::new(BufferCache::new(1024 * 1024))
fn buffer_cache() -> Option<Arc<BufferCache>> {
thread_local! {
static BUFFER_CACHE: Arc<BufferCache> = Arc::new(BufferCache::new(1024 * 1024));
}
Some(BUFFER_CACHE.with(|cache| cache.clone()))
}

fn storage_roundtrip_eq<T>(mut values: Vec<T>) -> Result<(), TestCaseError>
Expand Down
4 changes: 2 additions & 2 deletions crates/storage-test-compat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ pub fn storage_base_and_path(output: &Path) -> (PathBuf, StoragePath) {
)
}

pub fn buffer_cache() -> Arc<BufferCache> {
pub fn buffer_cache() -> Option<Arc<BufferCache>> {
thread_local! {
static BUFFER_CACHE: Arc<BufferCache> = Arc::new(BufferCache::new(1024 * 1024));
}
BUFFER_CACHE.with(|cache| cache.clone())
Some(BUFFER_CACHE.with(|cache| cache.clone()))
}

fn maybe<T>(row: usize, value: T) -> Option<T> {
Expand Down
Loading