Skip to content

Commit f59c3f9

Browse files
committed
[adapters] Separate journal and checkpoint code in the controller.
These are related, but distinct, and it now seems worthwhile to separate them. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 6011a60 commit f59c3f9

File tree

9 files changed

+129
-125
lines changed

9 files changed

+129
-125
lines changed

crates/adapterlib/src/errors/controller.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use dbsp::{storage::backend::StorageError, Error as DbspError};
1515
use feldera_types::error::{DetailedError, ErrorResponse};
1616
use serde::{ser::SerializeStruct, Serialize, Serializer};
1717

18-
use super::metadata::StepError;
18+
use super::journal::StepError;
1919
use crate::{format::ParseError, transport::Step, DbspDetailedError};
2020

2121
/// Controller configuration error.
@@ -576,7 +576,7 @@ pub enum ControllerError {
576576
/// from a checkpoint.
577577
RestoreInProgress,
578578

579-
/// Error in steps metadata.
579+
/// Error in journal metadata.
580580
StepError(StepError),
581581

582582
/// Unexpected step number.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
pub mod controller;
2-
pub mod metadata;
2+
pub mod journal;

crates/adapterlib/src/utils/datafusion.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::errors::metadata::ControllerError;
1+
use crate::errors::journal::ControllerError;
22
use anyhow::{anyhow, Error as AnyError};
33
use arrow::util::pretty::pretty_format_batches;
44
use datafusion::common::arrow::array::{AsArray, RecordBatch};

crates/adapters/src/adhoc/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
1616
use datafusion::execution::SendableRecordBatchStream;
1717
use datafusion::execution::SessionStateBuilder;
1818
use datafusion::prelude::*;
19-
use feldera_adapterlib::errors::metadata::ControllerError;
19+
use feldera_adapterlib::errors::journal::ControllerError;
2020
use feldera_types::config::PipelineConfig;
2121
use feldera_types::query::{AdHocResultFormat, AdhocQueryArgs};
2222
use futures_util::future::{BoxFuture, FutureExt};
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use dbsp::{
2+
circuit::checkpointer::CheckpointMetadata,
3+
storage::{backend::StorageBackend, buffer_cache::FBuf},
4+
};
5+
use feldera_types::config::PipelineConfig;
6+
use serde::{Deserialize, Serialize};
7+
use serde_json::Value as JsonValue;
8+
use std::{collections::HashMap, path::Path};
9+
10+
use crate::{controller::journal::StepMetadata, transport::Step, ControllerError};
11+
12+
/// Initial offsets for the input endpoints in a [Checkpoint].
13+
///
14+
/// This is a subset of [StepMetadata] that is useful for seeking input
15+
/// endpoints to a starting point.
16+
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
17+
pub struct CheckpointOffsets(
18+
/// Maps from an input endpoint name to its metadata.
19+
pub HashMap<String, JsonValue>,
20+
);
21+
22+
impl From<StepMetadata> for CheckpointOffsets {
23+
fn from(value: StepMetadata) -> Self {
24+
Self(
25+
value
26+
.input_logs
27+
.into_iter()
28+
.map(|(name, log)| (name, log.metadata))
29+
.collect(),
30+
)
31+
}
32+
}
33+
34+
impl From<&StepMetadata> for CheckpointOffsets {
35+
fn from(value: &StepMetadata) -> Self {
36+
Self(
37+
value
38+
.input_logs
39+
.iter()
40+
.map(|(name, log)| (name.clone(), log.metadata.clone()))
41+
.collect(),
42+
)
43+
}
44+
}
45+
46+
/// Checkpoint for a pipeline.
47+
#[derive(Debug, Serialize, Deserialize)]
48+
pub struct Checkpoint {
49+
/// The circuit's checkpoint.
50+
pub circuit: Option<CheckpointMetadata>,
51+
52+
/// Step number.
53+
pub step: Step,
54+
55+
/// Pipeline configuration.
56+
pub config: PipelineConfig,
57+
58+
/// Number of records processed.
59+
pub processed_records: u64,
60+
61+
/// Initial offsets for the input endpoints.
62+
pub input_metadata: CheckpointOffsets,
63+
}
64+
65+
impl Checkpoint {
66+
/// Reads a checkpoint in JSON format from `path`.
67+
pub(super) fn read<P>(storage: &dyn StorageBackend, path: P) -> Result<Self, ControllerError>
68+
where
69+
P: AsRef<Path>,
70+
{
71+
let path = path.as_ref();
72+
let data = storage.read(path).map_err(|error| {
73+
ControllerError::storage_error(
74+
format!("{}: failed to read checkpoint", path.display()),
75+
error,
76+
)
77+
})?;
78+
serde_json::from_slice::<Checkpoint>(&data).map_err(|e| {
79+
ControllerError::CheckpointParseError {
80+
error: e.to_string(),
81+
}
82+
})
83+
}
84+
85+
/// Writes this checkpoint in JSON format to `path`, atomically replacing
86+
/// any file that was previously at `path`.
87+
pub(super) fn write<P>(
88+
&self,
89+
storage: &dyn StorageBackend,
90+
path: P,
91+
) -> Result<(), ControllerError>
92+
where
93+
P: AsRef<Path>,
94+
{
95+
let path = path.as_ref();
96+
let mut content = FBuf::with_capacity(4096);
97+
serde_json::to_writer(&mut content, self).unwrap();
98+
storage.write(path, content).map_err(|error| {
99+
ControllerError::storage_error(
100+
format!("{}: failed to write pipeline state", path.display()),
101+
error,
102+
)
103+
})
104+
}
105+
}

crates/adapters/src/controller/metadata.rs renamed to crates/adapters/src/controller/journal.rs

Lines changed: 8 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,3 @@
1-
use dbsp::{
2-
circuit::checkpointer::CheckpointMetadata,
3-
storage::{backend::StorageBackend, buffer_cache::FBuf},
4-
};
5-
use feldera_types::config::{InputEndpointConfig, PipelineConfig};
6-
use rmpv::Value as RmpValue;
7-
use serde::{Deserialize, Serialize};
8-
use serde_json::Value as JsonValue;
91
use std::{
102
cmp::Ordering,
113
collections::{HashMap, HashSet},
@@ -16,71 +8,15 @@ use std::{
168
thread::{self, JoinHandle},
179
};
1810

19-
use crate::{transport::Step, util::write_file_atomically, ControllerError};
20-
pub use feldera_adapterlib::errors::metadata::StepError;
21-
22-
/// Checkpoint for a pipeline.
23-
#[derive(Debug, Serialize, Deserialize)]
24-
pub struct Checkpoint {
25-
/// The circuit's checkpoint.
26-
pub circuit: Option<CheckpointMetadata>,
27-
28-
/// Step number.
29-
pub step: Step,
30-
31-
/// Pipeline configuration.
32-
pub config: PipelineConfig,
33-
34-
/// Number of records processed.
35-
pub processed_records: u64,
36-
37-
/// Initial offsets for the input endpoints.
38-
pub input_metadata: StepInputMetadata,
39-
}
40-
41-
impl Checkpoint {
42-
/// Reads a checkpoint in JSON format from `path`.
43-
pub(super) fn read<P>(storage: &dyn StorageBackend, path: P) -> Result<Self, ControllerError>
44-
where
45-
P: AsRef<Path>,
46-
{
47-
let path = path.as_ref();
48-
let data = storage.read(path).map_err(|error| {
49-
ControllerError::storage_error(
50-
format!("{}: failed to read checkpoint", path.display()),
51-
error,
52-
)
53-
})?;
54-
serde_json::from_slice::<Checkpoint>(&data).map_err(|e| {
55-
ControllerError::CheckpointParseError {
56-
error: e.to_string(),
57-
}
58-
})
59-
}
11+
use feldera_adapterlib::{errors::journal::StepError, transport::Step};
12+
use feldera_types::config::InputEndpointConfig;
13+
use rmpv::Value as RmpValue;
14+
use serde::{Deserialize, Serialize};
15+
use serde_json::Value as JsonValue;
6016

61-
/// Writes this checkpoint in JSON format to `path`, atomically replacing
62-
/// any file that was previously at `path`.
63-
pub(super) fn write<P>(
64-
&self,
65-
storage: &dyn StorageBackend,
66-
path: P,
67-
) -> Result<(), ControllerError>
68-
where
69-
P: AsRef<Path>,
70-
{
71-
let path = path.as_ref();
72-
let mut content = FBuf::with_capacity(4096);
73-
serde_json::to_writer(&mut content, self).unwrap();
74-
storage.write(path, content).map_err(|error| {
75-
ControllerError::storage_error(
76-
format!("{}: failed to write pipeline state", path.display()),
77-
error,
78-
)
79-
})
80-
}
81-
}
17+
use crate::util::write_file_atomically;
8218

83-
pub struct BackgroundSync {
19+
struct BackgroundSync {
8420
join_handle: Option<JoinHandle<()>>,
8521
request_sender: Option<Sender<()>>,
8622
reply_receiver: Receiver<Result<(), IoError>>,
@@ -504,47 +440,13 @@ impl From<&StepMetadata> for StepInputChecksums {
504440
}
505441
}
506442

507-
/// Metadata for the input endpoints in a step.
508-
///
509-
/// This is a subset of [StepMetadata] that is useful for seeking input
510-
/// endpoints to a starting point.
511-
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
512-
pub struct StepInputMetadata(
513-
/// Maps from an input endpoint name to its metadata.
514-
pub HashMap<String, JsonValue>,
515-
);
516-
517-
impl From<StepMetadata> for StepInputMetadata {
518-
fn from(value: StepMetadata) -> Self {
519-
Self(
520-
value
521-
.input_logs
522-
.into_iter()
523-
.map(|(name, log)| (name, log.metadata))
524-
.collect(),
525-
)
526-
}
527-
}
528-
529-
impl From<&StepMetadata> for StepInputMetadata {
530-
fn from(value: &StepMetadata) -> Self {
531-
Self(
532-
value
533-
.input_logs
534-
.iter()
535-
.map(|(name, log)| (name.clone(), log.metadata.clone()))
536-
.collect(),
537-
)
538-
}
539-
}
540-
541443
#[cfg(test)]
542444
mod tests {
543445
use std::collections::{HashMap, HashSet};
544446

545447
use tempfile::TempDir;
546448

547-
use crate::{controller::metadata::ReadResult, test::init_test_logger};
449+
use crate::{controller::journal::ReadResult, test::init_test_logger};
548450

549451
use super::{StepMetadata, StepReader, StepWriter};
550452

crates/adapters/src/controller/mod.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818
//! buffered via `InputConsumer::buffered`.
1919
2020
use crate::catalog::OutputCollectionHandles;
21-
use crate::controller::metadata::{
22-
InputChecksums, ReadResult, StepInputChecksums, StepInputMetadata,
23-
};
24-
use crate::controller::stats::{CanSuspend, StepResults};
21+
use crate::controller::checkpoint::CheckpointOffsets;
2522
use crate::create_integrated_output_endpoint;
2623
use crate::transport::Step;
2724
use crate::transport::{input_transport_config_to_endpoint, output_transport_config_to_endpoint};
@@ -33,6 +30,7 @@ use crate::{
3330
use anyhow::Error as AnyError;
3431
use arrow::datatypes::Schema;
3532
use atomic::Atomic;
33+
use checkpoint::Checkpoint;
3634
use crossbeam::{
3735
queue::SegQueue,
3836
sync::{Parker, ShardedLock, Unparker},
@@ -51,15 +49,13 @@ use feldera_types::format::json::JsonLines;
5149
use governor::DefaultDirectRateLimiter;
5250
use governor::Quota;
5351
use governor::RateLimiter;
54-
use metadata::Checkpoint;
55-
use metadata::InputLog;
56-
use metadata::StepMetadata;
57-
use metadata::StepRw;
52+
use journal::{InputChecksums, InputLog, ReadResult, StepInputChecksums, StepMetadata, StepRw};
5853
use metrics_exporter_prometheus::PrometheusHandle;
5954
use metrics_util::debugging::Snapshotter;
6055
use nonzero_ext::nonzero;
6156
use rmpv::Value as RmpValue;
6257
use serde_json::Value as JsonValue;
58+
use stats::{CanSuspend, StepResults};
6359
use std::borrow::Cow;
6460
use std::collections::HashMap;
6561
use std::collections::HashSet;
@@ -85,8 +81,9 @@ use tokio::sync::Mutex as TokioMutex;
8581
use tracing::{debug, error, info, trace, warn};
8682
use validate::validate_config;
8783

84+
mod checkpoint;
8885
mod error;
89-
mod metadata;
86+
mod journal;
9087
mod stats;
9188
mod validate;
9289

@@ -553,7 +550,7 @@ struct CircuitThread {
553550
/// starting point for reading data for `step`.
554551
///
555552
/// This is only `None` if `step` is 0.
556-
input_metadata: Option<StepInputMetadata>,
553+
input_metadata: Option<CheckpointOffsets>,
557554
}
558555

559556
impl CircuitThread {
@@ -700,7 +697,7 @@ impl CircuitThread {
700697
else {
701698
return Ok(false);
702699
};
703-
self.input_metadata = Some(StepInputMetadata(
700+
self.input_metadata = Some(CheckpointOffsets(
704701
step_metadata
705702
.iter()
706703
.map(|(name, log)| (name.clone(), log.metadata.clone()))
@@ -1108,7 +1105,7 @@ impl FtState {
11081105
step: 0,
11091106
config,
11101107
processed_records: 0,
1111-
input_metadata: StepInputMetadata::default(),
1108+
input_metadata: CheckpointOffsets::default(),
11121109
};
11131110
checkpoint.write(&*storage, &state_path)?;
11141111

@@ -1386,7 +1383,7 @@ struct ControllerInit {
13861383
/// Metadata for seeking to input endpoint initial positions.
13871384
///
13881385
/// This is `Some` iff we read a checkpoint.
1389-
input_metadata: Option<StepInputMetadata>,
1386+
input_metadata: Option<CheckpointOffsets>,
13901387
}
13911388

13921389
fn storage_path(config: &PipelineConfig) -> Option<&Path> {

crates/iceberg/src/input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use datafusion::{
88
use dbsp::circuit::tokio::TOKIO;
99
use feldera_adapterlib::{
1010
catalog::{ArrowStream, InputCollectionHandle},
11-
errors::metadata::ControllerError,
11+
errors::journal::ControllerError,
1212
format::ParseError,
1313
transport::{
1414
InputConsumer, InputEndpoint, InputQueue, InputReader, InputReaderCommand,

0 commit comments

Comments
 (0)