@@ -10,8 +10,7 @@ use apache_avro::{types::Value as AvroValue, Schema as AvroSchema};
1010use arrow:: record_batch:: RecordBatch ;
1111use dbsp:: { utils:: Tup2 , InputHandle } ;
1212use feldera_types:: format:: json:: JsonFlavor ;
13- use feldera_types:: program_schema:: canonical_identifier;
14- use feldera_types:: program_schema:: Relation ;
13+ use feldera_types:: program_schema:: { Relation , SqlIdentifier } ;
1514use feldera_types:: query:: OutputQuery ;
1615use feldera_types:: serde_with_context:: SqlSerdeConfig ;
1716use feldera_types:: serialize_struct;
@@ -496,15 +495,21 @@ impl<'a> SerCursor for CursorWithPolarity<'a> {
496495/// A catalog of input and output stream handles of a circuit.
497496pub trait CircuitCatalog : Send {
498497 /// Look up an input stream handle by name.
499- fn input_collection_handle ( & self , name : & str ) -> Option < & InputCollectionHandle > ;
498+ fn input_collection_handle ( & self , name : & SqlIdentifier ) -> Option < & InputCollectionHandle > ;
500499
501- fn output_iter ( & self ) -> Box < dyn Iterator < Item = ( & String , & OutputCollectionHandles ) > + ' _ > ;
500+ fn output_iter (
501+ & self ,
502+ ) -> Box < dyn Iterator < Item = ( & SqlIdentifier , & OutputCollectionHandles ) > + ' _ > ;
502503
503504 /// Look up output stream handles by name.
504- fn output_handles ( & self , name : & str ) -> Option < & OutputCollectionHandles > ;
505+ fn output_handles ( & self , name : & SqlIdentifier ) -> Option < & OutputCollectionHandles > ;
505506
506507 /// Look up output query handles by stream name and query type.
507- fn output_query_handles ( & self , name : & str , query : OutputQuery ) -> Option < OutputQueryHandles > {
508+ fn output_query_handles (
509+ & self ,
510+ name : & SqlIdentifier ,
511+ query : OutputQuery ,
512+ ) -> Option < OutputQueryHandles > {
508513 self . output_handles ( name) . map ( |handles| match query {
509514 OutputQuery :: Table => OutputQueryHandles {
510515 schema : handles. schema . clone ( ) ,
@@ -536,8 +541,8 @@ pub trait CircuitCatalog: Send {
536541
537542/// Circuit catalog implementation.
538543pub struct Catalog {
539- input_collection_handles : BTreeMap < String , InputCollectionHandle > ,
540- output_batch_handles : BTreeMap < String , OutputCollectionHandles > ,
544+ input_collection_handles : BTreeMap < SqlIdentifier , InputCollectionHandle > ,
545+ output_batch_handles : BTreeMap < SqlIdentifier , OutputCollectionHandles > ,
541546}
542547
543548impl Default for Catalog {
@@ -558,11 +563,11 @@ impl Catalog {
558563 & mut self ,
559564 handle : InputCollectionHandle ,
560565 ) -> Result < ( ) , ControllerError > {
561- let name = handle. schema . name ( ) ;
562- if self . input_collection_handles . contains_key ( & name) {
563- return Err ( ControllerError :: duplicate_input_stream ( & name) ) ;
566+ let name = & handle. schema . name ;
567+ if self . input_collection_handles . contains_key ( name) {
568+ return Err ( ControllerError :: duplicate_input_stream ( & name. sql_name ( ) ) ) ;
564569 }
565- self . input_collection_handles . insert ( name, handle) ;
570+ self . input_collection_handles . insert ( name. clone ( ) , handle) ;
566571
567572 Ok ( ( ) )
568573 }
@@ -571,29 +576,30 @@ impl Catalog {
571576 & mut self ,
572577 handles : OutputCollectionHandles ,
573578 ) -> Result < ( ) , ControllerError > {
574- let name = handles. schema . name ( ) ;
575- if self . output_batch_handles . contains_key ( & name) {
576- return Err ( ControllerError :: duplicate_output_stream ( & name) ) ;
579+ let name = & handles. schema . name ;
580+ if self . output_batch_handles . contains_key ( name) {
581+ return Err ( ControllerError :: duplicate_output_stream ( & name. sql_name ( ) ) ) ;
577582 }
578- self . output_batch_handles . insert ( name, handles) ;
583+ self . output_batch_handles . insert ( name. clone ( ) , handles) ;
579584
580585 Ok ( ( ) )
581586 }
582587}
583588
584589impl CircuitCatalog for Catalog {
585590 /// Look up an input stream handle by name.
586- fn input_collection_handle ( & self , name : & str ) -> Option < & InputCollectionHandle > {
587- self . input_collection_handles
588- . get ( & canonical_identifier ( name) )
591+ fn input_collection_handle ( & self , name : & SqlIdentifier ) -> Option < & InputCollectionHandle > {
592+ self . input_collection_handles . get ( name)
589593 }
590594
591595 /// Look up output stream handles by name.
592- fn output_handles ( & self , name : & str ) -> Option < & OutputCollectionHandles > {
593- self . output_batch_handles . get ( & canonical_identifier ( name) )
596+ fn output_handles ( & self , name : & SqlIdentifier ) -> Option < & OutputCollectionHandles > {
597+ self . output_batch_handles . get ( name)
594598 }
595599
596- fn output_iter ( & self ) -> Box < dyn Iterator < Item = ( & String , & OutputCollectionHandles ) > + ' _ > {
600+ fn output_iter (
601+ & self ,
602+ ) -> Box < dyn Iterator < Item = ( & SqlIdentifier , & OutputCollectionHandles ) > + ' _ > {
597603 Box :: new ( self . output_batch_handles . iter ( ) )
598604 }
599605}
0 commit comments