1- use dbsp:: {
2- circuit:: checkpointer:: CheckpointMetadata ,
3- storage:: { backend:: StorageBackend , buffer_cache:: FBuf } ,
4- } ;
5- use feldera_types:: config:: { InputEndpointConfig , PipelineConfig } ;
6- use rmpv:: Value as RmpValue ;
7- use serde:: { Deserialize , Serialize } ;
8- use serde_json:: Value as JsonValue ;
91use std:: {
102 cmp:: Ordering ,
113 collections:: { HashMap , HashSet } ,
@@ -16,71 +8,15 @@ use std::{
168 thread:: { self , JoinHandle } ,
179} ;
1810
19- use crate :: { transport:: Step , util:: write_file_atomically, ControllerError } ;
20- pub use feldera_adapterlib:: errors:: metadata:: StepError ;
21-
22- /// Checkpoint for a pipeline.
23- #[ derive( Debug , Serialize , Deserialize ) ]
24- pub struct Checkpoint {
25- /// The circuit's checkpoint.
26- pub circuit : Option < CheckpointMetadata > ,
27-
28- /// Step number.
29- pub step : Step ,
30-
31- /// Pipeline configuration.
32- pub config : PipelineConfig ,
33-
34- /// Number of records processed.
35- pub processed_records : u64 ,
36-
37- /// Initial offsets for the input endpoints.
38- pub input_metadata : StepInputMetadata ,
39- }
40-
41- impl Checkpoint {
42- /// Reads a checkpoint in JSON format from `path`.
43- pub ( super ) fn read < P > ( storage : & dyn StorageBackend , path : P ) -> Result < Self , ControllerError >
44- where
45- P : AsRef < Path > ,
46- {
47- let path = path. as_ref ( ) ;
48- let data = storage. read ( path) . map_err ( |error| {
49- ControllerError :: storage_error (
50- format ! ( "{}: failed to read checkpoint" , path. display( ) ) ,
51- error,
52- )
53- } ) ?;
54- serde_json:: from_slice :: < Checkpoint > ( & data) . map_err ( |e| {
55- ControllerError :: CheckpointParseError {
56- error : e. to_string ( ) ,
57- }
58- } )
59- }
11+ use feldera_adapterlib:: { errors:: journal:: StepError , transport:: Step } ;
12+ use feldera_types:: config:: InputEndpointConfig ;
13+ use rmpv:: Value as RmpValue ;
14+ use serde:: { Deserialize , Serialize } ;
15+ use serde_json:: Value as JsonValue ;
6016
61- /// Writes this checkpoint in JSON format to `path`, atomically replacing
62- /// any file that was previously at `path`.
63- pub ( super ) fn write < P > (
64- & self ,
65- storage : & dyn StorageBackend ,
66- path : P ,
67- ) -> Result < ( ) , ControllerError >
68- where
69- P : AsRef < Path > ,
70- {
71- let path = path. as_ref ( ) ;
72- let mut content = FBuf :: with_capacity ( 4096 ) ;
73- serde_json:: to_writer ( & mut content, self ) . unwrap ( ) ;
74- storage. write ( path, content) . map_err ( |error| {
75- ControllerError :: storage_error (
76- format ! ( "{}: failed to write pipeline state" , path. display( ) ) ,
77- error,
78- )
79- } )
80- }
81- }
17+ use crate :: util:: write_file_atomically;
8218
83- pub struct BackgroundSync {
19+ struct BackgroundSync {
8420 join_handle : Option < JoinHandle < ( ) > > ,
8521 request_sender : Option < Sender < ( ) > > ,
8622 reply_receiver : Receiver < Result < ( ) , IoError > > ,
@@ -504,47 +440,13 @@ impl From<&StepMetadata> for StepInputChecksums {
504440 }
505441}
506442
507- /// Metadata for the input endpoints in a step.
508- ///
509- /// This is a subset of [StepMetadata] that is useful for seeking input
510- /// endpoints to a starting point.
511- #[ derive( Clone , Serialize , Deserialize , Debug , Default ) ]
512- pub struct StepInputMetadata (
513- /// Maps from an input endpoint name to its metadata.
514- pub HashMap < String , JsonValue > ,
515- ) ;
516-
517- impl From < StepMetadata > for StepInputMetadata {
518- fn from ( value : StepMetadata ) -> Self {
519- Self (
520- value
521- . input_logs
522- . into_iter ( )
523- . map ( |( name, log) | ( name, log. metadata ) )
524- . collect ( ) ,
525- )
526- }
527- }
528-
529- impl From < & StepMetadata > for StepInputMetadata {
530- fn from ( value : & StepMetadata ) -> Self {
531- Self (
532- value
533- . input_logs
534- . iter ( )
535- . map ( |( name, log) | ( name. clone ( ) , log. metadata . clone ( ) ) )
536- . collect ( ) ,
537- )
538- }
539- }
540-
541443#[ cfg( test) ]
542444mod tests {
543445 use std:: collections:: { HashMap , HashSet } ;
544446
545447 use tempfile:: TempDir ;
546448
547- use crate :: { controller:: metadata :: ReadResult , test:: init_test_logger} ;
449+ use crate :: { controller:: journal :: ReadResult , test:: init_test_logger} ;
548450
549451 use super :: { StepMetadata , StepReader , StepWriter } ;
550452
0 commit comments