Skip to content

Commit 3841d1a

Browse files
committed
Add SqlIdentifier type to handle case rules for field/table names.
Fixes #2374. Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
1 parent f310222 commit 3841d1a

File tree

17 files changed

+629
-452
lines changed

17 files changed

+629
-452
lines changed

Cargo.lock

Lines changed: 224 additions & 194 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/adapters/src/catalog.rs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ use apache_avro::{types::Value as AvroValue, Schema as AvroSchema};
1010
use arrow::record_batch::RecordBatch;
1111
use dbsp::{utils::Tup2, InputHandle};
1212
use feldera_types::format::json::JsonFlavor;
13-
use feldera_types::program_schema::canonical_identifier;
14-
use feldera_types::program_schema::Relation;
13+
use feldera_types::program_schema::{Relation, SqlIdentifier};
1514
use feldera_types::query::OutputQuery;
1615
use feldera_types::serde_with_context::SqlSerdeConfig;
1716
use feldera_types::serialize_struct;
@@ -496,15 +495,21 @@ impl<'a> SerCursor for CursorWithPolarity<'a> {
496495
/// A catalog of input and output stream handles of a circuit.
497496
pub trait CircuitCatalog: Send {
498497
/// Look up an input stream handle by name.
499-
fn input_collection_handle(&self, name: &str) -> Option<&InputCollectionHandle>;
498+
fn input_collection_handle(&self, name: &SqlIdentifier) -> Option<&InputCollectionHandle>;
500499

501-
fn output_iter(&self) -> Box<dyn Iterator<Item = (&String, &OutputCollectionHandles)> + '_>;
500+
fn output_iter(
501+
&self,
502+
) -> Box<dyn Iterator<Item = (&SqlIdentifier, &OutputCollectionHandles)> + '_>;
502503

503504
/// Look up output stream handles by name.
504-
fn output_handles(&self, name: &str) -> Option<&OutputCollectionHandles>;
505+
fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles>;
505506

506507
/// Look up output query handles by stream name and query type.
507-
fn output_query_handles(&self, name: &str, query: OutputQuery) -> Option<OutputQueryHandles> {
508+
fn output_query_handles(
509+
&self,
510+
name: &SqlIdentifier,
511+
query: OutputQuery,
512+
) -> Option<OutputQueryHandles> {
508513
self.output_handles(name).map(|handles| match query {
509514
OutputQuery::Table => OutputQueryHandles {
510515
schema: handles.schema.clone(),
@@ -536,8 +541,8 @@ pub trait CircuitCatalog: Send {
536541

537542
/// Circuit catalog implementation.
538543
pub struct Catalog {
539-
input_collection_handles: BTreeMap<String, InputCollectionHandle>,
540-
output_batch_handles: BTreeMap<String, OutputCollectionHandles>,
544+
input_collection_handles: BTreeMap<SqlIdentifier, InputCollectionHandle>,
545+
output_batch_handles: BTreeMap<SqlIdentifier, OutputCollectionHandles>,
541546
}
542547

543548
impl Default for Catalog {
@@ -558,11 +563,11 @@ impl Catalog {
558563
&mut self,
559564
handle: InputCollectionHandle,
560565
) -> Result<(), ControllerError> {
561-
let name = handle.schema.name();
562-
if self.input_collection_handles.contains_key(&name) {
563-
return Err(ControllerError::duplicate_input_stream(&name));
566+
let name = &handle.schema.name;
567+
if self.input_collection_handles.contains_key(name) {
568+
return Err(ControllerError::duplicate_input_stream(&name.sql_name()));
564569
}
565-
self.input_collection_handles.insert(name, handle);
570+
self.input_collection_handles.insert(name.clone(), handle);
566571

567572
Ok(())
568573
}
@@ -571,29 +576,30 @@ impl Catalog {
571576
&mut self,
572577
handles: OutputCollectionHandles,
573578
) -> Result<(), ControllerError> {
574-
let name = handles.schema.name();
575-
if self.output_batch_handles.contains_key(&name) {
576-
return Err(ControllerError::duplicate_output_stream(&name));
579+
let name = &handles.schema.name;
580+
if self.output_batch_handles.contains_key(name) {
581+
return Err(ControllerError::duplicate_output_stream(&name.sql_name()));
577582
}
578-
self.output_batch_handles.insert(name, handles);
583+
self.output_batch_handles.insert(name.clone(), handles);
579584

580585
Ok(())
581586
}
582587
}
583588

584589
impl CircuitCatalog for Catalog {
585590
/// Look up an input stream handle by name.
586-
fn input_collection_handle(&self, name: &str) -> Option<&InputCollectionHandle> {
587-
self.input_collection_handles
588-
.get(&canonical_identifier(name))
591+
fn input_collection_handle(&self, name: &SqlIdentifier) -> Option<&InputCollectionHandle> {
592+
self.input_collection_handles.get(name)
589593
}
590594

591595
/// Look up output stream handles by name.
592-
fn output_handles(&self, name: &str) -> Option<&OutputCollectionHandles> {
593-
self.output_batch_handles.get(&canonical_identifier(name))
596+
fn output_handles(&self, name: &SqlIdentifier) -> Option<&OutputCollectionHandles> {
597+
self.output_batch_handles.get(name)
594598
}
595599

596-
fn output_iter(&self) -> Box<dyn Iterator<Item = (&String, &OutputCollectionHandles)> + '_> {
600+
fn output_iter(
601+
&self,
602+
) -> Box<dyn Iterator<Item = (&SqlIdentifier, &OutputCollectionHandles)> + '_> {
597603
Box::new(self.output_batch_handles.iter())
598604
}
599605
}

crates/adapters/src/controller/mod.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ pub use feldera_types::config::{
9595
RuntimeConfig, TransportConfig,
9696
};
9797
use feldera_types::format::json::{JsonFlavor, JsonParserConfig, JsonUpdateFormat};
98-
use feldera_types::program_schema::canonical_identifier;
98+
use feldera_types::program_schema::{canonical_identifier, SqlIdentifier};
9999
pub use stats::{ControllerStatus, InputEndpointStatus, OutputEndpointStatus};
100100

101101
/// Maximal number of concurrent API connections per circuit
@@ -1129,11 +1129,8 @@ impl ControllerInner {
11291129
// Sync feldera catalog with datafusion catalog
11301130
for (name, clh) in catalog.output_iter() {
11311131
if clh.integrate_handle.is_some() {
1132-
debug!("registering datafusion table: name={}", clh.schema.name());
1133-
let typ = if catalog
1134-
.input_collection_handle(&clh.schema.name())
1135-
.is_some()
1136-
{
1132+
debug!("registering datafusion table: name={}", clh.schema.name);
1133+
let typ = if catalog.input_collection_handle(&clh.schema.name).is_some() {
11371134
TableType::Base
11381135
} else {
11391136
TableType::View
@@ -1142,15 +1139,15 @@ impl ControllerInner {
11421139
let arrow_fields = relation_to_arrow_fields(&clh.schema.fields, false);
11431140
let adhoc_tbl = Arc::new(AdHocTable::new(
11441141
typ,
1145-
clh.schema.name().to_string(),
1142+
clh.schema.name.to_string(),
11461143
Arc::new(Schema::new(arrow_fields)),
11471144
self.trace_snapshot.clone(),
11481145
));
11491146

11501147
// This should never fail (we're not registering the same table twice).
11511148
let r = self
11521149
.session_ctxt
1153-
.register_table(clh.schema.name().to_string(), adhoc_tbl)
1150+
.register_table(clh.schema.name.sql_name(), adhoc_tbl)
11541151
.expect("table registration failed");
11551152
assert!(r.is_none(), "table {name} already registered");
11561153
}
@@ -1228,7 +1225,7 @@ impl ControllerInner {
12281225

12291226
let catalog = self.catalog.lock().unwrap();
12301227
let input_handle = catalog
1231-
.input_collection_handle(&endpoint_config.stream)
1228+
.input_collection_handle(&SqlIdentifier::from(&endpoint_config.stream))
12321229
.ok_or_else(|| {
12331230
ControllerError::unknown_input_stream(endpoint_name, &endpoint_config.stream)
12341231
})?;
@@ -1412,7 +1409,10 @@ impl ControllerInner {
14121409
.catalog
14131410
.lock()
14141411
.unwrap()
1415-
.output_query_handles(&endpoint_config.stream, endpoint_config.query)
1412+
.output_query_handles(
1413+
&SqlIdentifier::from(&endpoint_config.stream),
1414+
endpoint_config.query,
1415+
)
14161416
.ok_or_else(|| {
14171417
ControllerError::unknown_output_stream(endpoint_name, &endpoint_config.stream)
14181418
})?;

crates/adapters/src/format/json/output.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -547,8 +547,7 @@ mod test {
547547
Box::new(consumer),
548548
config,
549549
&Relation::new(
550-
"TestStruct",
551-
false,
550+
"TestStruct".into(),
552551
TestStruct::schema(),
553552
false,
554553
BTreeMap::new(),
@@ -746,8 +745,7 @@ mod test {
746745
Box::new(consumer),
747746
config,
748747
&Relation::new(
749-
"TestStruct",
750-
false,
748+
"TestStruct".into(),
751749
TestStruct::schema(),
752750
false,
753751
BTreeMap::new(),
@@ -780,8 +778,7 @@ mod test {
780778
"TestStruct",
781779
&serde_yaml::to_value(&config).unwrap(),
782780
&Relation::new(
783-
"TestStruct",
784-
false,
781+
"TestStruct".into(),
785782
TestStruct::schema(),
786783
false,
787784
BTreeMap::new(),

crates/adapters/src/format/json/schema.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,7 @@ pub fn build_key_schema(config: &JsonEncoderConfig, schema: &Relation) -> Option
5151
/// types can have additional parameters, e.g., scale and precision for
5252
/// decimals.
5353
mod kafka_connect_json_converter {
54-
use feldera_types::program_schema::{
55-
canonical_identifier, ColumnType, Field, Relation, SqlType,
56-
};
54+
use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType};
5755
use serde::{Deserialize, Serialize};
5856
use std::collections::BTreeMap;
5957

@@ -176,12 +174,7 @@ mod kafka_connect_json_converter {
176174
let mut fields = Vec::new();
177175

178176
for field in schema.fields.iter() {
179-
if key_fields.is_none()
180-
|| key_fields
181-
.unwrap()
182-
.iter()
183-
.any(|f| canonical_identifier(f) == field.name())
184-
{
177+
if key_fields.is_none() || key_fields.unwrap().iter().any(|f| field.name == f) {
185178
fields.push(field_schema(field))
186179
}
187180
}
@@ -194,7 +187,7 @@ mod kafka_connect_json_converter {
194187

195188
fn field_schema(schema: &Field) -> JsonField {
196189
JsonField {
197-
field: schema.name.clone(),
190+
field: schema.name.name().clone(),
198191
schema: type_schema(&schema.columntype),
199192
}
200193
}

crates/adapters/src/format/parquet/test.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ format:
8585
// Send the data through the mock pipeline
8686
let (endpoint, consumer, zset) = mock_input_pipeline::<TestStruct2, TestStruct2>(
8787
serde_yaml::from_str(&config_str).unwrap(),
88-
Relation::new("test", false, TestStruct2::schema(), false, BTreeMap::new()),
88+
Relation::new("test".into(), TestStruct2::schema(), false, BTreeMap::new()),
8989
)
9090
.unwrap();
9191
sleep(Duration::from_millis(10));
@@ -119,8 +119,7 @@ fn parquet_output() {
119119
Box::new(consumer),
120120
config,
121121
Relation::new(
122-
"TestStruct2",
123-
false,
122+
"TestStruct2".into(),
124123
TestStruct2::schema(),
125124
false,
126125
BTreeMap::new(),

crates/adapters/src/server/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion::error::DataFusionError;
2727
use dbsp::circuit::CircuitConfig;
2828
use dbsp::operator::sample::MAX_QUANTILES;
2929
use env_logger::Env;
30+
use feldera_types::program_schema::SqlIdentifier;
3031
use feldera_types::{format::json::JsonFlavor, transport::http::EgressMode};
3132
use feldera_types::{
3233
query::{AdHocQueryFormat, AdhocQueryArgs, OutputQuery},
@@ -1032,7 +1033,7 @@ async fn output_endpoint(
10321033
.catalog()
10331034
.lock()
10341035
.unwrap()
1035-
.output_handles(&config.stream)
1036+
.output_handles(&SqlIdentifier::from(config.stream))
10361037
// The following `unwrap` is safe because `table_name` was previously
10371038
// validated by `add_output_endpoint`.
10381039
.unwrap()
@@ -1057,7 +1058,7 @@ async fn output_endpoint(
10571058
.catalog()
10581059
.lock()
10591060
.unwrap()
1060-
.output_handles(&config.stream)
1061+
.output_handles(&SqlIdentifier::from(config.stream))
10611062
.unwrap()
10621063
.num_quantiles_handle
10631064
.as_ref()

crates/adapters/src/static_compile/catalog.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -710,13 +710,15 @@ mod test {
710710
})
711711
.unwrap();
712712

713-
let input_map_handle = catalog.input_collection_handle("iNpUt_map").unwrap();
713+
let input_map_handle = catalog
714+
.input_collection_handle(&("iNpUt_map".into()))
715+
.unwrap();
714716
let mut input_stream_handle = input_map_handle
715717
.handle
716718
.configure_deserializer(RECORD_FORMAT.clone())
717719
.unwrap();
718720

719-
let output_stream_handles = catalog.output_handles("Input_map").unwrap();
721+
let output_stream_handles = catalog.output_handles(&("Input_map".into())).unwrap();
720722

721723
// Step 1: insert a couple of values.
722724

0 commit comments

Comments (0)