Skip to content

Commit 456733a

Browse files
committed
[dbsp] Make FileReaders and FileWriters know their own paths.
Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent c9b9bd1 commit 456733a

21 files changed

Lines changed: 133 additions & 108 deletions

crates/dbsp/src/storage/backend.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -24,8 +24,7 @@ pub use feldera_storage::{
2424
block::{BlockLocation, InvalidBlockLocation},
2525
error::StorageError,
2626
file::FileId,
27-
file::HasFileId,
28-
FileReader, FileWriter, StorageBackend, StorageFileType, StoragePath, StoragePathPart,
27+
FileReader, FileRw, FileWriter, StorageBackend, StorageFileType, StoragePath, StoragePathPart,
2928
};
3029

3130
/// Extension added to files that are incomplete/being written to.

crates/dbsp/src/storage/backend/memory_impl.rs

Lines changed: 23 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -2,12 +2,11 @@
22
//!
33
//! This is useful for performance testing, not as part of a production system.
44
5-
use super::{
6-
BlockLocation, FileId, FileReader, FileWriter, HasFileId, StorageBackend, StorageError,
7-
};
5+
use super::{BlockLocation, FileId, FileReader, FileRw, FileWriter, StorageBackend, StorageError};
86
use crate::circuit::metrics::FILES_CREATED;
97
use crate::storage::buffer_cache::FBuf;
108
use feldera_storage::{StorageFileType, StoragePath};
9+
use std::fmt::Debug;
1110
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
1211
use std::{
1312
collections::HashMap,
@@ -22,10 +21,13 @@ struct MemoryFile {
2221
size: u64,
2322
}
2423

25-
impl HasFileId for MemoryFile {
24+
impl FileRw for MemoryFile {
2625
fn file_id(&self) -> FileId {
2726
self.file_id
2827
}
28+
fn path(&self) -> &StoragePath {
29+
&self.path
30+
}
2931
}
3032

3133
struct MemoryBackendInner {
@@ -80,10 +82,14 @@ impl MemoryWriter {
8082
}
8183
}
8284

83-
impl HasFileId for MemoryWriter {
85+
impl FileRw for MemoryWriter {
8486
fn file_id(&self) -> FileId {
8587
self.file.file_id
8688
}
89+
90+
fn path(&self) -> &StoragePath {
91+
&self.file.path
92+
}
8793
}
8894

8995
impl FileWriter for MemoryWriter {
@@ -101,8 +107,7 @@ impl FileWriter for MemoryWriter {
101107
Ok(data)
102108
}
103109

104-
fn complete(mut self: Box<Self>) -> Result<(Arc<dyn FileReader>, StoragePath), StorageError> {
105-
let path = self.file.path.clone();
110+
fn complete(mut self: Box<Self>) -> Result<Arc<dyn FileReader>, StorageError> {
106111
self.drop.size = 0;
107112
let file = Arc::new(self.file);
108113
self.backend
@@ -116,7 +121,7 @@ impl FileWriter for MemoryWriter {
116121
file,
117122
keep: AtomicBool::new(false),
118123
});
119-
Ok((reader, path))
124+
Ok(reader)
120125
}
121126
}
122127

@@ -137,10 +142,19 @@ struct MemoryReader {
137142
keep: AtomicBool,
138143
}
139144

140-
impl HasFileId for MemoryReader {
145+
impl Debug for MemoryReader {
146+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147+
write!(f, "MemoryReader({})", self.file.path)
148+
}
149+
}
150+
151+
impl FileRw for MemoryReader {
141152
fn file_id(&self) -> FileId {
142153
self.file.file_id
143154
}
155+
fn path(&self) -> &StoragePath {
156+
&self.file.path
157+
}
144158
}
145159

146160
impl FileReader for MemoryReader {

crates/dbsp/src/storage/backend/posixio_impl.rs

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! [StorageBackend] implementation using POSIX I/O.
22
33
use super::{
4-
BlockLocation, FileId, FileReader, FileWriter, HasFileId, StorageCacheFlags, StorageError,
4+
BlockLocation, FileId, FileReader, FileRw, FileWriter, StorageCacheFlags, StorageError,
55
MUTABLE_EXTENSION,
66
};
77
use crate::circuit::metrics::{FILES_CREATED, FILES_DELETED};
@@ -20,6 +20,7 @@ use feldera_types::config::{
2020
FileBackendConfig, StorageBackendConfig, StorageCacheConfig, StorageConfig,
2121
};
2222
use std::ffi::OsString;
23+
use std::fmt::Debug;
2324
use std::fs::{create_dir_all, DirEntry};
2425
use std::io::{ErrorKind, IoSlice, Write};
2526
use std::thread::sleep;
@@ -37,6 +38,7 @@ use std::{
3738
use tracing::warn;
3839

3940
pub(super) struct PosixReader {
41+
path: StoragePath,
4042
file: Arc<File>,
4143
file_id: FileId,
4244
drop: DeleteOnDrop,
@@ -48,15 +50,23 @@ pub(super) struct PosixReader {
4850
ioop_delay: Duration,
4951
}
5052

53+
impl Debug for PosixReader {
54+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55+
write!(f, "PosixReader({})", self.path)
56+
}
57+
}
58+
5159
impl PosixReader {
5260
fn new(
61+
path: StoragePath,
5362
file: Arc<File>,
5463
file_id: FileId,
5564
drop: DeleteOnDrop,
5665
async_threads: bool,
5766
ioop_delay: Duration,
5867
) -> Self {
5968
Self {
69+
path,
6070
file,
6171
file_id,
6272
drop,
@@ -65,7 +75,8 @@ impl PosixReader {
6575
}
6676
}
6777
fn open(
68-
path: PathBuf,
78+
path: StoragePath,
79+
file_name: PathBuf,
6980
cache: StorageCacheConfig,
7081
usage: Arc<AtomicI64>,
7182
async_threads: bool,
@@ -74,27 +85,31 @@ impl PosixReader {
7485
let file = OpenOptions::new()
7586
.read(true)
7687
.cache_flags(&cache)
77-
.open(&path)
78-
.map_err(|e| StorageError::stdio(e.kind(), "open", path.display()))?;
88+
.open(&file_name)
89+
.map_err(|e| StorageError::stdio(e.kind(), "open", file_name.display()))?;
7990
let size = file
8091
.metadata()
81-
.map_err(|e| StorageError::stdio(e.kind(), "fstat", path.display()))?
92+
.map_err(|e| StorageError::stdio(e.kind(), "fstat", file_name.display()))?
8293
.size();
8394

8495
Ok(Arc::new(Self::new(
96+
path,
8597
Arc::new(file),
8698
FileId::new(),
87-
DeleteOnDrop::new(path, true, size, usage),
99+
DeleteOnDrop::new(file_name, true, size, usage),
88100
async_threads,
89101
ioop_delay,
90102
)))
91103
}
92104
}
93105

94-
impl HasFileId for PosixReader {
106+
impl FileRw for PosixReader {
95107
fn file_id(&self) -> FileId {
96108
self.file_id
97109
}
110+
fn path(&self) -> &StoragePath {
111+
&self.path
112+
}
98113
}
99114

100115
impl FileReader for PosixReader {
@@ -219,10 +234,14 @@ struct PosixWriter {
219234
ioop_delay: Duration,
220235
}
221236

222-
impl HasFileId for PosixWriter {
237+
impl FileRw for PosixWriter {
223238
fn file_id(&self) -> FileId {
224239
self.file_id
225240
}
241+
242+
fn path(&self) -> &StoragePath {
243+
&self.name
244+
}
226245
}
227246

228247
impl FileWriter for PosixWriter {
@@ -232,7 +251,7 @@ impl FileWriter for PosixWriter {
232251
Ok(block)
233252
}
234253

235-
fn complete(mut self: Box<Self>) -> Result<(Arc<dyn FileReader>, StoragePath), StorageError> {
254+
fn complete(mut self: Box<Self>) -> Result<Arc<dyn FileReader>, StorageError> {
236255
if !self.buffers.is_empty() {
237256
self.flush()?;
238257
}
@@ -247,16 +266,14 @@ impl FileWriter for PosixWriter {
247266
fs::rename(&self.drop.path, &finalized_path)
248267
.map_err(|e| StorageError::stdio(e.kind(), "rename", self.drop.path.display()))?;
249268

250-
Ok((
251-
Arc::new(PosixReader::new(
252-
Arc::new(self.file),
253-
self.file_id,
254-
self.drop.with_path(finalized_path),
255-
self.async_threads,
256-
self.ioop_delay,
257-
)) as Arc<dyn FileReader>,
269+
Ok(Arc::new(PosixReader::new(
258270
self.name,
259-
))
271+
Arc::new(self.file),
272+
self.file_id,
273+
self.drop.with_path(finalized_path),
274+
self.async_threads,
275+
self.ioop_delay,
276+
)) as Arc<dyn FileReader>)
260277
})
261278
}
262279
}
@@ -452,6 +469,7 @@ impl StorageBackend for PosixBackend {
452469

453470
fn open(&self, name: &StoragePath) -> Result<Arc<dyn FileReader>, StorageError> {
454471
PosixReader::open(
472+
name.clone(),
455473
self.fs_path(name),
456474
self.cache,
457475
self.usage.clone(),

crates/dbsp/src/storage/backend/tests.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ pub(super) fn test_backend(
113113
writer.write_block(block).unwrap();
114114
}
115115

116-
let (reader, name) = writer.complete().unwrap();
116+
let reader = writer.complete().unwrap();
117+
let name = reader.path().clone();
117118
assert_eq!(backend.usage().load(Ordering::Relaxed), expected_size);
118119
test_read(reader.as_ref(), &data);
119120
if mark_for_checkpoint {

crates/dbsp/src/storage/file/reader.rs

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,7 +1271,6 @@ impl Column {
12711271

12721272
/// Encapsulates storage and a file handle.
12731273
struct ImmutableFileRef {
1274-
path: StoragePath,
12751274
cache: fn() -> Arc<BufferCache>,
12761275
file_handle: Arc<dyn FileReader>,
12771276
compression: Option<Compression>,
@@ -1280,9 +1279,7 @@ struct ImmutableFileRef {
12801279

12811280
impl Debug for ImmutableFileRef {
12821281
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
1283-
f.debug_struct("ImmutableFileRef")
1284-
.field("path", &self.path)
1285-
.finish()
1282+
write!(f, "ImmutableFileRef({:?})", &self.file_handle)
12861283
}
12871284
}
12881285
impl Drop for ImmutableFileRef {
@@ -1297,13 +1294,11 @@ impl ImmutableFileRef {
12971294
fn new(
12981295
cache: fn() -> Arc<BufferCache>,
12991296
file_handle: Arc<dyn FileReader>,
1300-
path: StoragePath,
13011297
compression: Option<Compression>,
13021298
stats: AtomicCacheStats,
13031299
) -> Self {
13041300
Self {
13051301
cache,
1306-
path,
13071302
file_handle,
13081303
compression,
13091304
stats,
@@ -1436,20 +1431,19 @@ where
14361431
/// Creates and returns a new `Reader` for `file`.
14371432
pub(crate) fn new(
14381433
factories: &[&AnyFactories],
1439-
path: StoragePath,
14401434
cache: fn() -> Arc<BufferCache>,
1441-
file_handle: Arc<dyn FileReader>,
1435+
file: Arc<dyn FileReader>,
14421436
bloom_filter: Option<BloomFilter>,
14431437
) -> Result<Self, Error> {
1444-
let file_size = file_handle.get_size()?;
1438+
let file_size = file.get_size()?;
14451439
if file_size < 512 || (file_size % 512) != 0 {
14461440
return Err(CorruptionError::InvalidFileSize(file_size).into());
14471441
}
14481442

14491443
let stats = AtomicCacheStats::default();
14501444
let file_trailer = FileTrailer::new(
14511445
cache,
1452-
&*file_handle,
1446+
&*file,
14531447
BlockLocation::new(file_size - 512, 512).unwrap(),
14541448
&stats,
14551449
)?;
@@ -1460,7 +1454,7 @@ where
14601454
}
14611455
2 => {
14621456
// Version before [fastbloom] crate was upgraded to one with an incompatible format.
1463-
warn!("{path}: reading old format storage file, performance may be reduced due to incompatible Bloom filters");
1457+
warn!("{}: reading old format storage file, performance may be reduced due to incompatible Bloom filters", file.path());
14641458
Some(false)
14651459
}
14661460
VERSION_NUMBER => Some(true),
@@ -1473,7 +1467,8 @@ where
14731467

14741468
if file_trailer.compatible_features != 0 {
14751469
info!(
1476-
"{path}: storage file uses unsupported compatible features {:#x}",
1470+
"{}: storage file uses unsupported compatible features {:#x}",
1471+
file.path(),
14771472
file_trailer.compatible_features
14781473
);
14791474
}
@@ -1519,7 +1514,7 @@ where
15191514
Some(bloom_filter) => Some(bloom_filter),
15201515
None if has_compatible_bloom_filter && file_trailer.filter_offset != 0 => Some(
15211516
FilterBlock::new(
1522-
&*file_handle,
1517+
&*file,
15231518
BlockLocation::new(
15241519
file_trailer.filter_offset,
15251520
file_trailer.filter_size as usize,
@@ -1534,7 +1529,7 @@ where
15341529
};
15351530

15361531
Ok(Self {
1537-
file: ImmutableFileRef::new(cache, file_handle, path, file_trailer.compression, stats),
1532+
file: ImmutableFileRef::new(cache, file, file_trailer.compression, stats),
15381533
columns,
15391534
bloom_filter,
15401535
_phantom: PhantomData,
@@ -1553,13 +1548,7 @@ where
15531548
storage_backend: &dyn StorageBackend,
15541549
path: &StoragePath,
15551550
) -> Result<Self, Error> {
1556-
Self::new(
1557-
factories,
1558-
path.clone(),
1559-
cache,
1560-
storage_backend.open(path)?,
1561-
None,
1562-
)
1551+
Self::new(factories, cache, storage_backend.open(path)?, None)
15631552
}
15641553

15651554
/// The number of columns in the layer file.
@@ -1580,8 +1569,8 @@ where
15801569
}
15811570

15821571
/// Returns the storage path for the underlying object.
1583-
pub fn path(&self) -> StoragePath {
1584-
self.file.path.clone()
1572+
pub fn path(&self) -> &StoragePath {
1573+
self.file.file_handle.path()
15851574
}
15861575

15871576
/// Returns the size of the underlying file in bytes.
@@ -1602,8 +1591,8 @@ where
16021591
}
16031592

16041593
/// Returns the `FileReader` embedded in this `Reader`.
1605-
pub fn file_handle(&self) -> &dyn FileReader {
1606-
&*self.file.file_handle
1594+
pub fn file_handle(&self) -> &Arc<dyn FileReader> {
1595+
&self.file.file_handle
16071596
}
16081597
}
16091598

0 commit comments

Comments
 (0)