@@ -259,7 +259,7 @@ impl Catalog {
259259 . unwrap ( ) ;
260260
261261 // Inputs are also outputs.
262- self . register_materialized_output_map_persistent (
262+ self . register_output_map_persistent (
263263 Self :: output_persistent_id ( & stream) . as_deref ( ) ,
264264 stream,
265265 schema,
@@ -506,6 +506,33 @@ impl Catalog {
506506 self . register_materialized_output_map_persistent ( None , stream, schema)
507507 }
508508
509+ pub fn register_output_map_persistent < K , KD , V , VD > (
510+ & mut self ,
511+ persistent_id : Option < & str > ,
512+ stream : Stream < RootCircuit , OrdIndexedZSet < K , V > > ,
513+ schema : & str ,
514+ ) where
515+ KD : for < ' de > DeserializeWithContext < ' de , SqlSerdeConfig >
516+ + SerializeWithContext < SqlSerdeConfig >
517+ + From < K >
518+ + Send
519+ + Sync
520+ + Debug
521+ + ' static ,
522+ VD : for < ' de > DeserializeWithContext < ' de , SqlSerdeConfig >
523+ + SerializeWithContext < SqlSerdeConfig >
524+ + From < V >
525+ + Default
526+ + Debug
527+ + Clone
528+ + Send
529+ + ' static ,
530+ K : DBData + Send + Sync + From < KD > + Default ,
531+ V : DBData + Send + Sync + From < VD > + Default ,
532+ {
533+ self . register_output_map_persistent_inner ( persistent_id, stream, schema, false )
534+ }
535+
509536 pub fn register_materialized_output_map_persistent < K , KD , V , VD > (
510537 & mut self ,
511538 persistent_id : Option < & str > ,
@@ -529,6 +556,34 @@ impl Catalog {
529556 + ' static ,
530557 K : DBData + Send + Sync + From < KD > + Default ,
531558 V : DBData + Send + Sync + From < VD > + Default ,
559+ {
560+ self . register_output_map_persistent_inner ( persistent_id, stream, schema, true )
561+ }
562+
563+ pub fn register_output_map_persistent_inner < K , KD , V , VD > (
564+ & mut self ,
565+ persistent_id : Option < & str > ,
566+ stream : Stream < RootCircuit , OrdIndexedZSet < K , V > > ,
567+ schema : & str ,
568+ materialized : bool ,
569+ ) where
570+ KD : for < ' de > DeserializeWithContext < ' de , SqlSerdeConfig >
571+ + SerializeWithContext < SqlSerdeConfig >
572+ + From < K >
573+ + Send
574+ + Sync
575+ + Debug
576+ + ' static ,
577+ VD : for < ' de > DeserializeWithContext < ' de , SqlSerdeConfig >
578+ + SerializeWithContext < SqlSerdeConfig >
579+ + From < V >
580+ + Default
581+ + Debug
582+ + Clone
583+ + Send
584+ + ' static ,
585+ K : DBData + Send + Sync + From < KD > + Default ,
586+ V : DBData + Send + Sync + From < VD > + Default ,
532587 {
533588 let schema: Relation = Self :: parse_relation_schema ( schema) . unwrap ( ) ;
534589 let name = schema. name . clone ( ) ;
@@ -546,16 +601,26 @@ impl Catalog {
546601 let ( delta_handle, delta_gid) = delta. accumulate_output_persistent_with_gid ( persistent_id) ;
547602 stream. circuit ( ) . set_mir_node_id ( & delta_gid, persistent_id) ;
548603
549- // `integrate_trace` below should return the existing integral created by the InputUpsert operator.
550- let ( integrate_handle, integral_gid) = stream
551- . integrate_trace ( )
552- . apply ( |s| TypedBatch :: < K , V , ZWeight , _ > :: new ( s. inner ( ) . ro_snapshot ( ) ) )
553- . output_persistent_with_gid (
554- persistent_id. map ( |id| format ! ( "{id}.integral" ) ) . as_deref ( ) ,
555- ) ;
556- stream
557- . circuit ( )
558- . set_mir_node_id ( & integral_gid, persistent_id) ;
604+ let integrate_handle = if materialized {
605+ // `integrate_trace` below should return the existing integral created by the InputUpsert operator.
606+ let ( integrate_handle, integral_gid) = stream
607+ . integrate_trace ( )
608+ . apply ( |s| TypedBatch :: < K , V , ZWeight , _ > :: new ( s. inner ( ) . ro_snapshot ( ) ) )
609+ . output_persistent_with_gid (
610+ persistent_id
611+ . map ( |id| format ! ( "{id}.output_integral" ) )
612+ . as_deref ( ) ,
613+ ) ;
614+ stream
615+ . circuit ( )
616+ . set_mir_node_id ( & integral_gid, persistent_id) ;
617+ Some (
618+ Arc :: new ( <SerCollectionHandleImpl < _ , KD , VD > >:: new ( integrate_handle) )
619+ as Arc < dyn SerBatchReaderHandle > ,
620+ )
621+ } else {
622+ None
623+ } ;
559624
560625 let handles = OutputCollectionHandles {
561626 key_schema : None ,
@@ -564,9 +629,7 @@ impl Catalog {
564629 delta_handle : Box :: new ( <SerCollectionHandleImpl < _ , VD , ( ) > >:: new ( delta_handle) )
565630 as Box < dyn SerBatchReaderHandle > ,
566631 integrate_handle_is_indexed : true ,
567- integrate_handle : Some ( Arc :: new ( <SerCollectionHandleImpl < _ , KD , VD > >:: new (
568- integrate_handle,
569- ) ) as Arc < dyn SerBatchReaderHandle > ) ,
632+ integrate_handle,
570633 } ;
571634
572635 self . register_output_batch_handles ( & name, handles) . unwrap ( ) ;
0 commit comments