@@ -14,7 +14,7 @@ use apache_avro::{
1414} ;
1515use arrow:: record_batch:: RecordBatch ;
1616use dbsp:: circuit:: NodeId ;
17- use dbsp:: dynamic:: { DynData , DynVec } ;
17+ use dbsp:: dynamic:: { ClonableTrait , DynData , DynVec , Factory } ;
1818use dbsp:: operator:: StagedBuffers ;
1919use dyn_clone:: DynClone ;
2020use feldera_sqllib:: Variant ;
@@ -281,6 +281,10 @@ pub trait SerBatchReader: 'static + Send + Sync {
281281
282282 fn snapshot ( & self ) -> Arc < dyn SerBatchReader > ;
283283
284+ fn keys_factory ( & self ) -> & ' static dyn Factory < DynVec < DynData > > ;
285+
286+ fn data_factory ( & self ) -> & ' static dyn Factory < DynData > ;
287+
284288 fn sample_keys ( & self , sample_size : usize , sample : & mut DynVec < DynData > ) ;
285289
286290 fn partition_keys ( & self , num_partitions : usize , bounds : & mut DynVec < DynData > ) ;
@@ -354,6 +358,216 @@ pub trait SerTrace: SerBatchReader {
354358 fn as_batch_reader ( & self ) -> & dyn SerBatchReader ;
355359}
356360
361+ pub struct SplitCursorBuilder {
362+ batch : Arc < dyn SerBatchReader > ,
363+ start_key : Box < DynData > ,
364+ end_key : Option < Box < DynData > > ,
365+ format : RecordFormat ,
366+ }
367+
368+ impl SplitCursorBuilder {
369+ /// Create a [`SplitCursorBuilder`] for partition `index` given a batch,
370+ /// pre-computed partition `bounds` (as returned by
371+ /// [`SerBatchReader::partition_keys`]), and a record `format`.
372+ ///
373+ /// `bounds` contains `N-1` boundary keys for `N` partitions.
374+ /// Partition 0 spans from the start of the batch to `bounds[0]`,
375+ /// partition `i` spans from `bounds[i-1]` to `bounds[i]`, and the last
376+ /// partition spans from `bounds[N-2]` to the end of the batch.
377+ ///
378+ /// Returns `None` if the partition is empty (the cursor has no key at the
379+ /// start position).
380+ pub fn from_bounds (
381+ batch : Arc < dyn SerBatchReader > ,
382+ bounds : & DynVec < DynData > ,
383+ index : usize ,
384+ format : RecordFormat ,
385+ ) -> Option < Self > {
386+ let start_bound = if index == 0 {
387+ None
388+ } else if index <= bounds. len ( ) {
389+ Some ( bounds. index ( index - 1 ) . as_data ( ) )
390+ } else {
391+ None
392+ } ;
393+
394+ let end_bound = if index < bounds. len ( ) {
395+ Some ( bounds. index ( index) . as_data ( ) )
396+ } else {
397+ None
398+ } ;
399+
400+ let start_key = {
401+ let mut cursor = batch. cursor ( format. clone ( ) ) . unwrap ( ) ;
402+
403+ // Seek to start. If None, the cursor starts at the beginning.
404+ if let Some ( start_bound) = start_bound {
405+ cursor. seek_key_exact ( start_bound) ;
406+ }
407+
408+ // Clone the actual key the cursor landed on.
409+ cursor. get_key ( ) . map ( |s| {
410+ let mut key = batch. data_factory ( ) . default_box ( ) ;
411+ s. clone_to ( key. as_mut ( ) ) ;
412+ key
413+ } )
414+ } ?;
415+
416+ let end_key = end_bound. map ( |e| {
417+ let mut key = batch. data_factory ( ) . default_box ( ) ;
418+ e. clone_to ( key. as_mut ( ) ) ;
419+ key
420+ } ) ;
421+
422+ Some ( SplitCursorBuilder {
423+ batch,
424+ start_key,
425+ end_key,
426+ format,
427+ } )
428+ }
429+
430+ pub fn build < ' a > ( & ' a self ) -> SplitCursor < ' a > {
431+ let mut cursor = self . batch . cursor ( self . format . clone ( ) ) . unwrap ( ) ;
432+ cursor. seek_key_exact ( self . start_key . as_data ( ) ) ;
433+
434+ SplitCursor {
435+ cursor,
436+ start_key : self . start_key . clone ( ) ,
437+ end_key : self . end_key . clone ( ) ,
438+ }
439+ }
440+ }
441+
442+ pub struct SplitCursor < ' a > {
443+ cursor : Box < dyn SerCursor + ' a > ,
444+ start_key : Box < DynData > ,
445+ end_key : Option < Box < DynData > > ,
446+ }
447+
448+ impl SplitCursor < ' _ > {
449+ fn finished ( & self ) -> bool {
450+ if let Some ( ref end_key) = self . end_key
451+ && let Some ( current_key) = self . cursor . get_key ( )
452+ {
453+ return current_key >= end_key. as_data ( ) ;
454+ }
455+
456+ false
457+ }
458+ }
459+
460+ impl SerCursor for SplitCursor < ' _ > {
461+ fn key_valid ( & self ) -> bool {
462+ self . cursor . key_valid ( ) && !self . finished ( )
463+ }
464+
465+ fn val_valid ( & self ) -> bool {
466+ self . cursor . val_valid ( )
467+ }
468+
469+ fn key ( & self ) -> & DynData {
470+ self . cursor . key ( )
471+ }
472+
473+ fn get_key ( & self ) -> Option < & DynData > {
474+ if !self . key_valid ( ) {
475+ return None ;
476+ }
477+
478+ self . cursor . get_key ( )
479+ }
480+
481+ fn serialize_key ( & mut self , dst : & mut Vec < u8 > ) -> AnyResult < ( ) > {
482+ self . cursor . serialize_key ( dst)
483+ }
484+
485+ fn key_to_json ( & mut self ) -> AnyResult < serde_json:: Value > {
486+ self . cursor . key_to_json ( )
487+ }
488+
489+ fn serialize_key_fields (
490+ & mut self ,
491+ fields : & HashSet < String > ,
492+ dst : & mut Vec < u8 > ,
493+ ) -> AnyResult < ( ) > {
494+ self . cursor . serialize_key_fields ( fields, dst)
495+ }
496+
497+ fn serialize_key_to_arrow ( & mut self , dst : & mut ArrayBuilder ) -> AnyResult < ( ) > {
498+ self . cursor . serialize_key_to_arrow ( dst)
499+ }
500+
501+ fn serialize_key_to_arrow_with_metadata (
502+ & mut self ,
503+ metadata : & dyn erased_serde:: Serialize ,
504+ dst : & mut ArrayBuilder ,
505+ ) -> AnyResult < ( ) > {
506+ self . cursor
507+ . serialize_key_to_arrow_with_metadata ( metadata, dst)
508+ }
509+
510+ fn serialize_val_to_arrow ( & mut self , dst : & mut ArrayBuilder ) -> AnyResult < ( ) > {
511+ self . cursor . serialize_val_to_arrow ( dst)
512+ }
513+
514+ fn serialize_val_to_arrow_with_metadata (
515+ & mut self ,
516+ metadata : & dyn erased_serde:: Serialize ,
517+ dst : & mut ArrayBuilder ,
518+ ) -> AnyResult < ( ) > {
519+ self . cursor
520+ . serialize_val_to_arrow_with_metadata ( metadata, dst)
521+ }
522+
523+ #[ cfg( feature = "with-avro" ) ]
524+ fn key_to_avro ( & mut self , schema : & AvroSchema , refs : & NamesRef < ' _ > ) -> AnyResult < AvroValue > {
525+ self . cursor . key_to_avro ( schema, refs)
526+ }
527+
528+ fn serialize_key_weight ( & mut self , dst : & mut Vec < u8 > ) -> AnyResult < ( ) > {
529+ self . cursor . serialize_key_weight ( dst)
530+ }
531+
532+ fn serialize_val ( & mut self , dst : & mut Vec < u8 > ) -> AnyResult < ( ) > {
533+ self . cursor . serialize_val ( dst)
534+ }
535+
536+ fn val_to_json ( & mut self ) -> AnyResult < serde_json:: Value > {
537+ self . cursor . val_to_json ( )
538+ }
539+
540+ #[ cfg( feature = "with-avro" ) ]
541+ fn val_to_avro ( & mut self , schema : & AvroSchema , refs : & NamesRef < ' _ > ) -> AnyResult < AvroValue > {
542+ self . cursor . val_to_avro ( schema, refs)
543+ }
544+
545+ fn weight ( & mut self ) -> i64 {
546+ self . cursor . weight ( )
547+ }
548+
549+ fn step_key ( & mut self ) {
550+ self . cursor . step_key ( ) ;
551+ }
552+
553+ fn step_val ( & mut self ) {
554+ self . cursor . step_val ( ) ;
555+ }
556+
557+ fn rewind_keys ( & mut self ) {
558+ self . cursor . rewind_keys ( ) ;
559+ self . cursor . seek_key_exact ( self . start_key . as_data ( ) ) ;
560+ }
561+
562+ fn rewind_vals ( & mut self ) {
563+ self . cursor . rewind_vals ( ) ;
564+ }
565+
566+ fn seek_key_exact ( & mut self , key : & DynData ) -> bool {
567+ self . cursor . seek_key_exact ( key)
568+ }
569+ }
570+
357571/// Cursor that allows serializing the contents of a type-erased batch.
358572///
359573/// This is a wrapper around the DBSP `Cursor` trait that yields keys and values
@@ -471,11 +685,11 @@ pub trait SerBatchReaderHandle: Send + Sync + DynClone {
471685 fn num_nonempty_mailboxes ( & self ) -> usize ;
472686
473687 /// Like [`OutputHandle::take_from_worker`](`dbsp::OutputHandle::take_from_worker`),
474- /// but returns output batch as a [`SyncSerBatchReader `] trait object.
688+ /// but returns output batch as a [`SerBatchReader `] trait object.
475689 fn take_from_worker ( & self , worker : usize ) -> Option < Box < dyn SerBatchReader > > ;
476690
477691 /// Like [`OutputHandle::take_from_all`](`dbsp::OutputHandle::take_from_all`),
478- /// but returns output batches as [`SyncSerBatchReader `] trait objects.
692+ /// but returns output batches as [`SerBatchReader `] trait objects.
479693 fn take_from_all ( & self ) -> Vec < Arc < dyn SerBatchReader > > ;
480694
481695 /// Concatenate outputs from all workers into a single batch reader.
0 commit comments