Skip to content

Commit dccd6d2

Browse files
committed
[feature] adapters: avro: parallel encoders
Implements parallel encoders for the avro output encoder. Defines cursor types `SplitCursor<'_>` and `SplitCursorBuilder`. `SplitCursorBuilder` can be built from a list of bounds for a partition (typically created by the `partition_keys` method) and the index of the partition we want a cursor for. This builder type is required to be able to send cursors safely between threads. The `SplitCursor<'_>` type requires a reference to the batch, and therefore is not thread safe. To create a builder use the method: `SplitCursorBuilder::from_bounds(batch, bounds, idx, format)`; returns None if the `idx` partition cannot be created from the given bounds. `SplitCursor<'_>` can only be created by calling `SplitCursorBuilder::build()`. This cursor then is only valid for the current partition. Additionally, we create a `AvroParallelEncoder` type for each threadpool worker we want to run. This parallel encoder then sends the enocded batches back to the main thread via a channel. All encoding errors, if any, are gathered and returned when the `AvroEncoder::encode_indexed` method returns. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent 4356771 commit dccd6d2

File tree

5 files changed

+609
-170
lines changed

5 files changed

+609
-170
lines changed

crates/adapterlib/src/catalog.rs

Lines changed: 217 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use apache_avro::{
1414
};
1515
use arrow::record_batch::RecordBatch;
1616
use dbsp::circuit::NodeId;
17-
use dbsp::dynamic::{DynData, DynVec};
17+
use dbsp::dynamic::{ClonableTrait, DynData, DynVec, Factory};
1818
use dbsp::operator::StagedBuffers;
1919
use dyn_clone::DynClone;
2020
use feldera_sqllib::Variant;
@@ -281,6 +281,10 @@ pub trait SerBatchReader: 'static + Send + Sync {
281281

282282
fn snapshot(&self) -> Arc<dyn SerBatchReader>;
283283

284+
fn keys_factory(&self) -> &'static dyn Factory<DynVec<DynData>>;
285+
286+
fn data_factory(&self) -> &'static dyn Factory<DynData>;
287+
284288
fn sample_keys(&self, sample_size: usize, sample: &mut DynVec<DynData>);
285289

286290
fn partition_keys(&self, num_partitions: usize, bounds: &mut DynVec<DynData>);
@@ -354,6 +358,216 @@ pub trait SerTrace: SerBatchReader {
354358
fn as_batch_reader(&self) -> &dyn SerBatchReader;
355359
}
356360

361+
pub struct SplitCursorBuilder {
362+
batch: Arc<dyn SerBatchReader>,
363+
start_key: Box<DynData>,
364+
end_key: Option<Box<DynData>>,
365+
format: RecordFormat,
366+
}
367+
368+
impl SplitCursorBuilder {
369+
/// Create a [`SplitCursorBuilder`] for partition `index` given a batch,
370+
/// pre-computed partition `bounds` (as returned by
371+
/// [`SerBatchReader::partition_keys`]), and a record `format`.
372+
///
373+
/// `bounds` contains `N-1` boundary keys for `N` partitions.
374+
/// Partition 0 spans from the start of the batch to `bounds[0]`,
375+
/// partition `i` spans from `bounds[i-1]` to `bounds[i]`, and the last
376+
/// partition spans from `bounds[N-2]` to the end of the batch.
377+
///
378+
/// Returns `None` if the partition is empty (the cursor has no key at the
379+
/// start position).
380+
pub fn from_bounds(
381+
batch: Arc<dyn SerBatchReader>,
382+
bounds: &DynVec<DynData>,
383+
index: usize,
384+
format: RecordFormat,
385+
) -> Option<Self> {
386+
let start_bound = if index == 0 {
387+
None
388+
} else if index <= bounds.len() {
389+
Some(bounds.index(index - 1).as_data())
390+
} else {
391+
None
392+
};
393+
394+
let end_bound = if index < bounds.len() {
395+
Some(bounds.index(index).as_data())
396+
} else {
397+
None
398+
};
399+
400+
let start_key = {
401+
let mut cursor = batch.cursor(format.clone()).unwrap();
402+
403+
// Seek to start. If None, the cursor starts at the beginning.
404+
if let Some(start_bound) = start_bound {
405+
cursor.seek_key_exact(start_bound);
406+
}
407+
408+
// Clone the actual key the cursor landed on.
409+
cursor.get_key().map(|s| {
410+
let mut key = batch.data_factory().default_box();
411+
s.clone_to(key.as_mut());
412+
key
413+
})
414+
}?;
415+
416+
let end_key = end_bound.map(|e| {
417+
let mut key = batch.data_factory().default_box();
418+
e.clone_to(key.as_mut());
419+
key
420+
});
421+
422+
Some(SplitCursorBuilder {
423+
batch,
424+
start_key,
425+
end_key,
426+
format,
427+
})
428+
}
429+
430+
pub fn build<'a>(&'a self) -> SplitCursor<'a> {
431+
let mut cursor = self.batch.cursor(self.format.clone()).unwrap();
432+
cursor.seek_key_exact(self.start_key.as_data());
433+
434+
SplitCursor {
435+
cursor,
436+
start_key: self.start_key.clone(),
437+
end_key: self.end_key.clone(),
438+
}
439+
}
440+
}
441+
442+
pub struct SplitCursor<'a> {
443+
cursor: Box<dyn SerCursor + 'a>,
444+
start_key: Box<DynData>,
445+
end_key: Option<Box<DynData>>,
446+
}
447+
448+
impl SplitCursor<'_> {
449+
fn finished(&self) -> bool {
450+
if let Some(ref end_key) = self.end_key
451+
&& let Some(current_key) = self.cursor.get_key()
452+
{
453+
return current_key >= end_key.as_data();
454+
}
455+
456+
false
457+
}
458+
}
459+
460+
impl SerCursor for SplitCursor<'_> {
461+
fn key_valid(&self) -> bool {
462+
self.cursor.key_valid() && !self.finished()
463+
}
464+
465+
fn val_valid(&self) -> bool {
466+
self.cursor.val_valid()
467+
}
468+
469+
fn key(&self) -> &DynData {
470+
self.cursor.key()
471+
}
472+
473+
fn get_key(&self) -> Option<&DynData> {
474+
if !self.key_valid() {
475+
return None;
476+
}
477+
478+
self.cursor.get_key()
479+
}
480+
481+
fn serialize_key(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
482+
self.cursor.serialize_key(dst)
483+
}
484+
485+
fn key_to_json(&mut self) -> AnyResult<serde_json::Value> {
486+
self.cursor.key_to_json()
487+
}
488+
489+
fn serialize_key_fields(
490+
&mut self,
491+
fields: &HashSet<String>,
492+
dst: &mut Vec<u8>,
493+
) -> AnyResult<()> {
494+
self.cursor.serialize_key_fields(fields, dst)
495+
}
496+
497+
fn serialize_key_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
498+
self.cursor.serialize_key_to_arrow(dst)
499+
}
500+
501+
fn serialize_key_to_arrow_with_metadata(
502+
&mut self,
503+
metadata: &dyn erased_serde::Serialize,
504+
dst: &mut ArrayBuilder,
505+
) -> AnyResult<()> {
506+
self.cursor
507+
.serialize_key_to_arrow_with_metadata(metadata, dst)
508+
}
509+
510+
fn serialize_val_to_arrow(&mut self, dst: &mut ArrayBuilder) -> AnyResult<()> {
511+
self.cursor.serialize_val_to_arrow(dst)
512+
}
513+
514+
fn serialize_val_to_arrow_with_metadata(
515+
&mut self,
516+
metadata: &dyn erased_serde::Serialize,
517+
dst: &mut ArrayBuilder,
518+
) -> AnyResult<()> {
519+
self.cursor
520+
.serialize_val_to_arrow_with_metadata(metadata, dst)
521+
}
522+
523+
#[cfg(feature = "with-avro")]
524+
fn key_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
525+
self.cursor.key_to_avro(schema, refs)
526+
}
527+
528+
fn serialize_key_weight(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
529+
self.cursor.serialize_key_weight(dst)
530+
}
531+
532+
fn serialize_val(&mut self, dst: &mut Vec<u8>) -> AnyResult<()> {
533+
self.cursor.serialize_val(dst)
534+
}
535+
536+
fn val_to_json(&mut self) -> AnyResult<serde_json::Value> {
537+
self.cursor.val_to_json()
538+
}
539+
540+
#[cfg(feature = "with-avro")]
541+
fn val_to_avro(&mut self, schema: &AvroSchema, refs: &NamesRef<'_>) -> AnyResult<AvroValue> {
542+
self.cursor.val_to_avro(schema, refs)
543+
}
544+
545+
fn weight(&mut self) -> i64 {
546+
self.cursor.weight()
547+
}
548+
549+
fn step_key(&mut self) {
550+
self.cursor.step_key();
551+
}
552+
553+
fn step_val(&mut self) {
554+
self.cursor.step_val();
555+
}
556+
557+
fn rewind_keys(&mut self) {
558+
self.cursor.rewind_keys();
559+
self.cursor.seek_key_exact(self.start_key.as_data());
560+
}
561+
562+
fn rewind_vals(&mut self) {
563+
self.cursor.rewind_vals();
564+
}
565+
566+
fn seek_key_exact(&mut self, key: &DynData) -> bool {
567+
self.cursor.seek_key_exact(key)
568+
}
569+
}
570+
357571
/// Cursor that allows serializing the contents of a type-erased batch.
358572
///
359573
/// This is a wrapper around the DBSP `Cursor` trait that yields keys and values
@@ -471,11 +685,11 @@ pub trait SerBatchReaderHandle: Send + Sync + DynClone {
471685
fn num_nonempty_mailboxes(&self) -> usize;
472686

473687
/// Like [`OutputHandle::take_from_worker`](`dbsp::OutputHandle::take_from_worker`),
474-
/// but returns output batch as a [`SyncSerBatchReader`] trait object.
688+
/// but returns output batch as a [`SerBatchReader`] trait object.
475689
fn take_from_worker(&self, worker: usize) -> Option<Box<dyn SerBatchReader>>;
476690

477691
/// Like [`OutputHandle::take_from_all`](`dbsp::OutputHandle::take_from_all`),
478-
/// but returns output batches as [`SyncSerBatchReader`] trait objects.
692+
/// but returns output batches as [`SerBatchReader`] trait objects.
479693
fn take_from_all(&self) -> Vec<Arc<dyn SerBatchReader>>;
480694

481695
/// Concatenate outputs from all workers into a single batch reader.

0 commit comments

Comments
 (0)