-
Notifications
You must be signed in to change notification settings - Fork 111
Expand file tree
/
Copy pathcatalog.rs
More file actions
74 lines (62 loc) · 2.21 KB
/
catalog.rs
File metadata and controls
74 lines (62 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use feldera_adapterlib::errors::controller::ControllerError;
use feldera_types::program_schema::SqlIdentifier;
use std::collections::BTreeMap;
pub use feldera_adapterlib::catalog::*;
/// Circuit catalog implementation.
///
/// Holds the input and output handles registered for a circuit, each kept in
/// a map keyed by SQL identifier.
pub struct Catalog {
/// Input collection handles, keyed by the name found in each handle's schema.
input_collection_handles: BTreeMap<SqlIdentifier, InputCollectionHandle>,
/// Output collection handles, keyed by the name supplied at registration.
output_batch_handles: BTreeMap<SqlIdentifier, OutputCollectionHandles>,
}
impl Default for Catalog {
fn default() -> Self {
Self::new()
}
}
impl Catalog {
    /// Creates an empty catalog with no registered input or output handles.
    pub fn new() -> Self {
        Self {
            input_collection_handles: BTreeMap::new(),
            output_batch_handles: BTreeMap::new(),
        }
    }

    /// Registers an input collection handle under the name stored in its schema.
    ///
    /// # Errors
    ///
    /// Returns the error built by `ControllerError::duplicate_input_stream` if a
    /// handle is already registered under the same name. The existing entry is
    /// left untouched in that case.
    pub fn register_input_collection_handle(
        &mut self,
        handle: InputCollectionHandle,
    ) -> Result<(), ControllerError> {
        // The entry API performs the duplicate check and the insertion with a
        // single map lookup instead of `contains_key` followed by `insert`.
        match self.input_collection_handles.entry(handle.schema.name.clone()) {
            std::collections::btree_map::Entry::Occupied(_) => Err(
                ControllerError::duplicate_input_stream(&handle.schema.name.sql_name()),
            ),
            std::collections::btree_map::Entry::Vacant(slot) => {
                slot.insert(handle);
                Ok(())
            }
        }
    }

    /// Registers a set of output collection handles under `name`.
    ///
    /// # Errors
    ///
    /// Returns the error built by `ControllerError::duplicate_output_stream` if
    /// handles are already registered under `name`. The existing entry is left
    /// untouched in that case.
    pub fn register_output_batch_handles(
        &mut self,
        name: &SqlIdentifier,
        handles: OutputCollectionHandles,
    ) -> Result<(), ControllerError> {
        // Single lookup: either report the duplicate or fill the vacant slot.
        match self.output_batch_handles.entry(name.clone()) {
            std::collections::btree_map::Entry::Occupied(_) => {
                Err(ControllerError::duplicate_output_stream(&name.sql_name()))
            }
            std::collections::btree_map::Entry::Vacant(slot) => {
                slot.insert(handles);
                Ok(())
            }
        }
    }
}
impl CircuitCatalog for Catalog {
    /// Returns the input collection handle registered under `name`, or `None`
    /// if no input is registered under that name.
    fn input_collection_handle(&self, name: &SqlIdentifier) -> Option<&InputCollectionHandle> {
        let inputs = &self.input_collection_handles;
        inputs.get(name)
    }

    /// Returns the output handles registered under `name`, or `None` if no
    /// output is registered under that name.
    fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles> {
        let outputs = &self.output_batch_handles;
        outputs.get(name)
    }

    /// Mutable counterpart of `output_handles`: returns a mutable reference to
    /// the output handles registered under `name`, if any.
    fn output_handles_mut(&mut self, name: &SqlIdentifier) -> Option<&mut OutputCollectionHandles> {
        let outputs = &mut self.output_batch_handles;
        outputs.get_mut(name)
    }

    /// Returns a boxed iterator over every registered `(name, output handles)`
    /// pair, in the key order of the underlying `BTreeMap`.
    fn output_iter(
        &self,
    ) -> Box<dyn Iterator<Item = (&SqlIdentifier, &OutputCollectionHandles)> + '_> {
        let entries = self.output_batch_handles.iter();
        Box::new(entries)
    }
}