Skip to content

Commit d140cbf

Browse files
committed
[dbsp] Fix occasional panic on background task shutdown.
Sometimes, running the unit tests failed with: "panicked at crates/dbsp/src/circuit/runtime.rs:972:64: cannot access a task-local storage value without setting it first / thread panicked while processing panic. aborting. / Aborted (core dumped)". The panic comes from inside `Drop` for `ImmutableFileRef`, which attempts to evict the file being dropped from the buffer cache. But `ImmutableFileRef` doesn't hold a reference to the `BufferCache`; it only has a `fn() -> Arc<BufferCache>` that it can call to get a buffer cache. This fn is actually `Runtime::buffer_cache`, which is what panics. The root cause is that tokio task shutdown does not provide values for task-local variables, and a task-local variable is what stores the buffer cache. This commit avoids the problem by allowing the buffer cache to be unavailable during `Drop` for `ImmutableFileRef`. This requires some refactoring so that the cache fn, and `Runtime::buffer_cache()`, return an `Option`. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 3ab4236 commit d140cbf

File tree

7 files changed

+83
-61
lines changed

7 files changed

+83
-61
lines changed

crates/dbsp/src/circuit/runtime.rs

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -814,7 +814,10 @@ impl Runtime {
814814
})
815815
}
816816

817-
/// Returns this thread's buffer-cache handle, but:
817+
/// Returns this thread's buffer-cache handle:
818+
///
819+
/// - If the thread is a foreground thread or a background task, returns
820+
/// that thread or task's buffer cache.
818821
///
819822
/// - If the thread's [Runtime] does not have storage configured, the cache
820823
/// size is trivially small.
@@ -823,49 +826,58 @@ impl Runtime {
823826
/// all such threads. (Such a thread might be in a circuit that uses
824827
/// storage, but there's no way to know because only [Runtime] makes that
825828
/// available at a thread level.)
826-
pub fn buffer_cache() -> Arc<BufferCache> {
827-
// This cache is shared by all auxiliary threads in the runtime.
828-
// In particular, output connector threads use it to maintain their output buffers.
829-
830-
// FIXME: We may need a tunable strategy for aux threads. We cannot simply give each of them the
831-
// same cache as DBSP worker threads, as there can be dozens of aux threads (currently one per
832-
// output connector), which do not necessarily need a large cache. OTOH, sharing the same cache
833-
// across all of them may potentially cause performance issues.
834-
static AUXILIARY_CACHE: LazyLock<Arc<BufferCache>> =
835-
LazyLock::new(|| Arc::new(BufferCache::new(1024 * 1024 * 256)));
836-
837-
// Fast path, look up from TLS
838-
839-
if current_thread_type() == Some(ThreadType::Background) {
840-
// The TOKIO_BUFFER_CACHE variable should be set on task startup.
841-
return TOKIO_BUFFER_CACHE.get().clone();
842-
} else if let Some(buffer_cache) = BUFFER_CACHE.with(|bc| bc.borrow().clone()) {
843-
return buffer_cache;
844-
}
845-
846-
// Slow path for initializing the thread-local.
847-
if let Some(rt) = Runtime::runtime() {
848-
match current_thread_type() {
849-
None => {
850-
let buffer_cache = AUXILIARY_CACHE.clone();
851-
BUFFER_CACHE.set(Some(buffer_cache.clone()));
852-
buffer_cache
853-
}
854-
Some(ThreadType::Background) => {
855-
// The TOKIO_BUFFER_CACHE variable should be set on task startup.
856-
panic!("background thread should not call buffer_cache()");
857-
}
858-
Some(thread_type) => {
829+
///
830+
/// - If the thread is a background thread, but no buffer cache is set, this
831+
/// ordinarily indicates a bug. However, this can happen when a
832+
/// background task is dropped, because it's normal for task-local
833+
/// variables to be unavailable in `Drop` during task shutdown. The
834+
/// function returns `None` in this case (and only in this case).
835+
pub fn buffer_cache() -> Option<Arc<BufferCache>> {
836+
if let Some(buffer_cache) = BUFFER_CACHE.with(|bc| bc.borrow().clone()) {
837+
// Fast path common case for foreground threads.
838+
Some(buffer_cache)
839+
} else if let Ok(buffer_cache) = TOKIO_BUFFER_CACHE.try_get() {
840+
// Background tasks case.
841+
Some(buffer_cache)
842+
} else if let Some(rt) = Runtime::runtime()
843+
&& let Some(thread_type) = current_thread_type()
844+
{
845+
// Slow path for threads running in a [Runtime].
846+
match thread_type {
847+
ThreadType::Foreground => {
859848
let buffer_cache =
860849
rt.get_buffer_cache(Runtime::local_worker_offset(), thread_type);
861850
BUFFER_CACHE.set(Some(buffer_cache.clone()));
862-
buffer_cache
851+
Some(buffer_cache)
852+
}
853+
ThreadType::Background => {
854+
// We are in a background thread in a Tokio runtime but no
855+
// buffer cache is set in [TOKIO_BUFFER_CACHE]. This is
856+
// probably a bug, but it can happen when a background task
857+
// is canceled. It is the reason that this function returns
858+
// an `Option`.
859+
None
863860
}
864861
}
865862
} else {
866-
// TODO: We shouldn't create batches outside of a runtime. This should probably
867-
// be converted into a panic.
868-
AUXILIARY_CACHE.clone()
863+
// Fallback path for threads outside a [Runtime].
864+
//
865+
// This cache is shared by all auxiliary threads in the runtime. In
866+
// particular, output connector threads use it to maintain their
867+
// output buffers.
868+
//
869+
// FIXME: We may need a tunable strategy for aux threads. We cannot
870+
// simply give each of them the same cache as DBSP worker threads,
871+
// as there can be dozens of aux threads (currently one per output
872+
// connector), which do not necessarily need a large cache. OTOH,
873+
// sharing the same cache across all of them may potentially cause
874+
// performance issues.
875+
static AUXILIARY_CACHE: LazyLock<Arc<BufferCache>> =
876+
LazyLock::new(|| Arc::new(BufferCache::new(1024 * 1024 * 256)));
877+
878+
let buffer_cache = AUXILIARY_CACHE.clone();
879+
BUFFER_CACHE.set(Some(buffer_cache.clone()));
880+
Some(buffer_cache)
869881
}
870882
}
871883

crates/dbsp/src/storage/file/reader.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ where
547547

548548
fn new(file: &ImmutableFileRef, node: &TreeNode) -> Result<Arc<Self>, Error> {
549549
let start = Instant::now();
550-
let cache = (file.cache)();
550+
let cache = file.cache();
551551
#[allow(clippy::borrow_deref_ref)]
552552
let (access, entry) = match cache.get(&*file.file_handle, node.location) {
553553
Some(entry) => (
@@ -1030,7 +1030,7 @@ where
10301030

10311031
fn new(file: &ImmutableFileRef, node: &TreeNode) -> Result<Arc<Self>, Error> {
10321032
let start = Instant::now();
1033-
let cache = (file.cache)();
1033+
let cache = file.cache();
10341034
let first_row = node.rows.start;
10351035
#[allow(clippy::borrow_deref_ref)]
10361036
let (access, entry) = match cache.get(&*file.file_handle, node.location) {
@@ -1311,13 +1311,13 @@ impl FileTrailer {
13111311
})
13121312
}
13131313
fn new(
1314-
cache: fn() -> Arc<BufferCache>,
1314+
cache: fn() -> Option<Arc<BufferCache>>,
13151315
file_handle: &dyn FileReader,
13161316
location: BlockLocation,
13171317
stats: &AtomicCacheStats,
13181318
) -> Result<Arc<FileTrailer>, Error> {
13191319
let start = Instant::now();
1320-
let cache = cache();
1320+
let cache = cache().expect("Should have a buffer cache");
13211321
#[allow(clippy::borrow_deref_ref)]
13221322
let (access, entry) = match cache.get(&*file_handle, location) {
13231323
Some(entry) => {
@@ -1397,7 +1397,7 @@ impl Column {
13971397
/// Encapsulates storage and a file handle.
13981398
#[derive(SizeOf)]
13991399
struct ImmutableFileRef {
1400-
cache: fn() -> Arc<BufferCache>,
1400+
cache: fn() -> Option<Arc<BufferCache>>,
14011401
#[size_of(skip)]
14021402
file_handle: Arc<dyn FileReader>,
14031403
compression: Option<Compression>,
@@ -1412,15 +1412,17 @@ impl Debug for ImmutableFileRef {
14121412
}
14131413
impl Drop for ImmutableFileRef {
14141414
fn drop(&mut self) {
1415-
if Arc::strong_count(&self.file_handle) == 1 {
1416-
self.evict();
1415+
if Arc::strong_count(&self.file_handle) == 1
1416+
&& let Some(cache) = (self.cache)()
1417+
{
1418+
cache.evict(&*self.file_handle);
14171419
}
14181420
}
14191421
}
14201422

14211423
impl ImmutableFileRef {
14221424
fn new(
1423-
cache: fn() -> Arc<BufferCache>,
1425+
cache: fn() -> Option<Arc<BufferCache>>,
14241426
file_handle: Arc<dyn FileReader>,
14251427
compression: Option<Compression>,
14261428
stats: AtomicCacheStats,
@@ -1435,8 +1437,13 @@ impl ImmutableFileRef {
14351437
}
14361438
}
14371439

1440+
fn cache(&self) -> Arc<BufferCache> {
1441+
(self.cache)().expect("Should have a buffer cache")
1442+
}
1443+
1444+
#[cfg(test)]
14381445
pub fn evict(&self) {
1439-
(self.cache)().evict(&*self.file_handle);
1446+
self.cache().evict(&*self.file_handle);
14401447
}
14411448

14421449
pub fn read_block(&self, location: BlockLocation) -> Result<Arc<FBuf>, Error> {
@@ -1573,7 +1580,7 @@ where
15731580
/// Creates and returns a new `Reader` for `file`.
15741581
pub(crate) fn new(
15751582
factories: &[&AnyFactories],
1576-
cache: fn() -> Arc<BufferCache>,
1583+
cache: fn() -> Option<Arc<BufferCache>>,
15771584
file: Arc<dyn FileReader>,
15781585
) -> Result<Self, Error> {
15791586
let (reader, _membership_filter) = Self::new_with_filter(factories, cache, file, None)?;
@@ -1582,7 +1589,7 @@ where
15821589

15831590
pub(crate) fn new_with_filter(
15841591
factories: &[&AnyFactories],
1585-
cache: fn() -> Arc<BufferCache>,
1592+
cache: fn() -> Option<Arc<BufferCache>>,
15861593
file: Arc<dyn FileReader>,
15871594
membership_filter: Option<TrackingBloomFilter>,
15881595
) -> Result<(Self, Option<TrackingBloomFilter>), Error> {
@@ -1709,7 +1716,7 @@ where
17091716
/// Instantiates a reader given an existing path.
17101717
pub fn open(
17111718
factories: &[&AnyFactories],
1712-
cache: fn() -> Arc<BufferCache>,
1719+
cache: fn() -> Option<Arc<BufferCache>>,
17131720
storage_backend: &dyn StorageBackend,
17141721
path: &StoragePath,
17151722
) -> Result<Self, Error> {
@@ -1718,7 +1725,7 @@ where
17181725

17191726
pub(crate) fn open_with_filter(
17201727
factories: &[&AnyFactories],
1721-
cache: fn() -> Arc<BufferCache>,
1728+
cache: fn() -> Option<Arc<BufferCache>>,
17221729
storage_backend: &dyn StorageBackend,
17231730
path: &StoragePath,
17241731
) -> Result<(Self, Option<TrackingBloomFilter>), Error> {

crates/dbsp/src/storage/file/reader/bulk_rows.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ where
137137
let (sender, receiver) = channel();
138138
let mut this = Self {
139139
reader,
140-
cache: (reader.file.cache)(),
140+
cache: reader.file.cache(),
141141
factories: reader.columns[column].factories.factories(),
142142
column,
143143
row: 0,

crates/dbsp/src/storage/file/reader/fetch_indexed_zset.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ where
187187
let mut this = Self {
188188
keys,
189189
reader,
190-
cache: (reader.file.cache)(),
190+
cache: reader.file.cache(),
191191
factories,
192192
sender,
193193
receiver,

crates/dbsp/src/storage/file/reader/fetch_zset.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ where
6868
let mut this = Self {
6969
keys,
7070
reader,
71-
cache: (reader.file.cache)(),
71+
cache: reader.file.cache(),
7272
factories,
7373
sender,
7474
receiver,

crates/dbsp/src/storage/file/test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,11 @@ fn tup65_from_bits(bits: u128) -> Tup65OptString {
193193
)
194194
}
195195

196-
fn test_buffer_cache() -> Arc<BufferCache> {
196+
fn test_buffer_cache() -> Option<Arc<BufferCache>> {
197197
thread_local! {
198198
static BUFFER_CACHE: Arc<BufferCache> = Arc::new(BufferCache::new(1024 * 1024));
199199
}
200-
BUFFER_CACHE.with(|cache| cache.clone())
200+
Some(BUFFER_CACHE.with(|cache| cache.clone()))
201201
}
202202

203203
fn for_each_compression_type<F>(parameters: Parameters, f: F)

crates/dbsp/src/storage/file/writer.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,7 +1138,7 @@ impl BlockWriter {
11381138
/// all the same type. Thus, [`Writer1`] and [`Writer2`] exist for writing
11391139
/// 1-column and 2-column layer files, respectively, with added type safety.
11401140
struct Writer {
1141-
cache: fn() -> Arc<BufferCache>,
1141+
cache: fn() -> Option<Arc<BufferCache>>,
11421142
writer: BlockWriter,
11431143
bloom_filter: Option<TrackingBloomFilter>,
11441144
cws: Vec<ColumnWriter>,
@@ -1164,7 +1164,7 @@ impl Writer {
11641164

11651165
pub fn new(
11661166
factories: &[&AnyFactories],
1167-
cache: fn() -> Arc<BufferCache>,
1167+
cache: fn() -> Option<Arc<BufferCache>>,
11681168
storage_backend: &dyn StorageBackend,
11691169
parameters: Parameters,
11701170
n_columns: usize,
@@ -1193,7 +1193,10 @@ impl Writer {
11931193
let worker = format!("w{}-", Runtime::worker_index());
11941194
let writer = Self {
11951195
cache,
1196-
writer: BlockWriter::new(cache(), storage_backend.create_with_prefix(&worker.into())?),
1196+
writer: BlockWriter::new(
1197+
cache().expect("Should have a buffer cache"),
1198+
storage_backend.create_with_prefix(&worker.into())?,
1199+
),
11971200
bloom_filter,
11981201
cws,
11991202
finished_columns,
@@ -1377,7 +1380,7 @@ where
13771380
/// Creates a new writer with the given parameters.
13781381
pub fn new(
13791382
factories: &Factories<K0, A0>,
1380-
cache: fn() -> Arc<BufferCache>,
1383+
cache: fn() -> Option<Arc<BufferCache>>,
13811384
storage_backend: &dyn StorageBackend,
13821385
parameters: Parameters,
13831386
estimated_keys: usize,
@@ -1552,7 +1555,7 @@ where
15521555
pub fn new(
15531556
factories0: &Factories<K0, A0>,
15541557
factories1: &Factories<K1, A1>,
1555-
cache: fn() -> Arc<BufferCache>,
1558+
cache: fn() -> Option<Arc<BufferCache>>,
15561559
storage_backend: &dyn StorageBackend,
15571560
parameters: Parameters,
15581561
estimated_keys: usize,

0 commit comments

Comments (0)