Skip to content

Commit 62f881d

Browse files
committed
[dbsp] Make DevTweaks public so it will get documented.
Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent a1001ee commit 62f881d

File tree

35 files changed

+871
-369
lines changed

35 files changed

+871
-369
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/adapters/src/controller.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use dbsp::circuit::metrics::{
5858
DBSP_STEP_LATENCY_MICROSECONDS, FILES_CREATED, FILES_DELETED, TOTAL_LATE_RECORDS,
5959
};
6060
use dbsp::circuit::tokio::TOKIO;
61-
use dbsp::circuit::{CheckpointCommitter, CircuitStorageConfig, DevTweaks, Mode};
61+
use dbsp::circuit::{CheckpointCommitter, CircuitStorageConfig, Mode};
6262
use dbsp::samply::{MARKER_BYTES, Markers, SamplySpan};
6363
use dbsp::storage::backend::{StorageBackend, StoragePath};
6464
use dbsp::utils::process_rss_bytes;
@@ -159,8 +159,7 @@ pub use feldera_types::config::{
159159
RuntimeConfig, TransportConfig,
160160
};
161161
use feldera_types::config::{
162-
DEFAULT_MAX_WORKER_BATCH_SIZE, FileBackendConfig, FtConfig, FtModel, OutputBufferConfig,
163-
StorageBackendConfig, SyncConfig,
162+
DEFAULT_MAX_WORKER_BATCH_SIZE, DevTweaks, FileBackendConfig, FtConfig, FtModel, OutputBufferConfig, StorageBackendConfig, SyncConfig
164163
};
165164
use feldera_types::constants::{STATE_FILE, STEPS_FILE};
166165
use feldera_types::format::json::{JsonFlavor, JsonParserConfig, JsonUpdateFormat};
@@ -4400,7 +4399,10 @@ impl ControllerInit {
44004399
pipeline_config: &PipelineConfig,
44014400
storage: Option<CircuitStorageConfig>,
44024401
) -> Result<CircuitConfig, ControllerError> {
4403-
let dev_tweaks = DevTweaks::from_config(&pipeline_config.global.dev_tweaks);
4402+
let dev_tweaks = pipeline_config.global.dev_tweaks.clone();
4403+
if dev_tweaks != DevTweaks::default() {
4404+
info!("using non-default `dev_tweaks`: {dev_tweaks:#?}")
4405+
}
44044406

44054407
let mut max_rss_mb = pipeline_config.global.max_rss_mb;
44064408

crates/adapters/src/server.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -625,13 +625,7 @@ pub fn run_server(
625625

626626
// Install stack overflow handler early, before creating the controller and parsing DevTweaks.
627627
#[cfg(target_family = "unix")]
628-
if config
629-
.global
630-
.dev_tweaks
631-
.get("stack_overflow_backtrace")
632-
.cloned()
633-
== Some(serde_json::Value::Bool(true))
634-
{
628+
if config.global.dev_tweaks.stack_overflow_backtrace() {
635629
unsafe {
636630
use crate::server::stack_overflow_backtrace::enable_stack_overflow_backtrace_with_limit;
637631

crates/adapters/src/util.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ pub(crate) fn run_in_posix_runtime<F>(
718718
F: FnOnce() + Send + 'static,
719719
{
720720
use dbsp::Runtime;
721-
use dbsp::circuit::{CircuitConfig, CircuitStorageConfig, DevTweaks, Layout, Mode};
721+
use dbsp::circuit::{CircuitConfig, CircuitStorageConfig, Layout, Mode};
722722
use feldera_types::config::{StorageCacheConfig, StorageConfig, StorageOptions};
723723
use std::sync::{Arc, Mutex};
724724

@@ -742,7 +742,7 @@ pub(crate) fn run_in_posix_runtime<F>(
742742
)
743743
.expect("failed to configure storage"),
744744
),
745-
dev_tweaks: DevTweaks::default(),
745+
dev_tweaks: Default::default(),
746746
};
747747

748748
let test_fn: Arc<Mutex<Option<F>>> = Arc::new(Mutex::new(Some(test_fn)));

crates/buffer-cache/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ description = "Weighted in-memory buffer caches with LRU and S3-FIFO eviction"
1313
[dependencies]
1414
crossbeam-utils = { workspace = true }
1515
enum-map = { workspace = true }
16+
feldera-types = { workspace = true }
1617
quick_cache = { workspace = true }
1718
serde = { workspace = true, features = ["derive"] }
1819
tracing = { workspace = true }

crates/buffer-cache/src/builder.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,12 @@
11
use crate::ThreadType;
22
use crate::{CacheEntry, LruCache, S3FifoCache, SharedBufferCache};
33
use enum_map::{Enum, EnumMap};
4-
use serde::{Deserialize, Serialize};
4+
use feldera_types::config::dev_tweaks::{BufferCacheAllocationStrategy, BufferCacheStrategy};
55
use std::fmt::Debug;
66
use std::hash::{BuildHasher, Hash, RandomState};
77
use std::marker::PhantomData;
88
use tracing::warn;
99

10-
/// Selects which eviction strategy backs a cache instance.
11-
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
12-
#[serde(rename_all = "snake_case")]
13-
pub enum BufferCacheStrategy {
14-
/// Use the sharded S3-FIFO cache backed by `quick_cache`.
15-
#[default]
16-
S3Fifo,
17-
18-
/// Use the mutex-protected weighted LRU cache.
19-
Lru,
20-
}
21-
22-
/// Controls how caches are shared across a foreground/background worker pair.
23-
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
24-
#[serde(rename_all = "snake_case")]
25-
pub enum BufferCacheAllocationStrategy {
26-
/// Share one cache across a foreground/background worker pair.
27-
#[default]
28-
SharedPerWorkerPair,
29-
30-
/// Create a separate cache for each foreground/background thread.
31-
PerThread,
32-
33-
/// Share one cache across all foreground/background threads.
34-
Global,
35-
}
36-
3710
/// Builds the cache layout used by DBSP runtime worker pairs.
3811
pub struct BufferCacheBuilder<K, V, S = RandomState> {
3912
/// Eviction strategy used for newly constructed caches.

crates/buffer-cache/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ mod thread_type;
88
use std::any::Any;
99
use std::sync::Arc;
1010

11-
pub use builder::{BufferCacheAllocationStrategy, BufferCacheBuilder, BufferCacheStrategy};
11+
pub use builder::BufferCacheBuilder;
12+
use feldera_types::config::dev_tweaks::BufferCacheStrategy;
1213
pub use lru::LruCache;
1314
pub use s3_fifo::S3FifoCache;
1415
pub use thread_type::ThreadType;

crates/buffer-cache/src/lru.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::{BufferCache, BufferCacheStrategy, CacheEntry};
1+
use feldera_types::config::dev_tweaks::BufferCacheStrategy;
2+
3+
use crate::{BufferCache, CacheEntry};
24
use std::any::Any;
35
use std::collections::BTreeMap;
46
use std::fmt::Debug;

crates/buffer-cache/src/s3_fifo.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use crate::{BufferCache, BufferCacheStrategy, CacheEntry};
2-
use quick_cache::{OptionsBuilder, Weighter, sync::Cache as QuickCache};
1+
use crate::{BufferCache, CacheEntry};
2+
use feldera_types::config::dev_tweaks::BufferCacheStrategy;
3+
use quick_cache::{sync::Cache as QuickCache, OptionsBuilder, Weighter};
34
use std::any::Any;
45
use std::hash::{BuildHasher, Hash, RandomState};
56

crates/buffer-cache/src/tests/builder.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use crate::{
2-
BufferCacheAllocationStrategy, BufferCacheBuilder, BufferCacheStrategy, CacheEntry, ThreadType,
3-
};
1+
use feldera_types::config::dev_tweaks::{BufferCacheAllocationStrategy, BufferCacheStrategy};
2+
3+
use crate::{BufferCacheBuilder, CacheEntry, ThreadType};
44
use std::sync::Arc;
55

66
#[derive(Clone)]

0 commit comments

Comments
 (0)