Skip to content

Commit 03b5144

Browse files
committed
docs: avro: document threads parameter
Renames the Avro encoder's `workers` parameter to `threads` and adds documentation for the `threads` parameter on the Avro format configuration. Default: 4. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
1 parent d0103ac commit 03b5144

6 files changed

Lines changed: 22 additions & 60 deletions

File tree

crates/adapterlib/src/catalog.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ pub trait SerBatchReader: 'static + Send + Sync {
283283

284284
fn keys_factory(&self) -> &'static dyn Factory<DynVec<DynData>>;
285285

286-
fn data_factory(&self) -> &'static dyn Factory<DynData>;
286+
fn key_factory(&self) -> &'static dyn Factory<DynData>;
287287

288288
fn sample_keys(&self, sample_size: usize, sample: &mut DynVec<DynData>);
289289

@@ -407,14 +407,14 @@ impl SplitCursorBuilder {
407407

408408
// Clone the actual key the cursor landed on.
409409
cursor.get_key().map(|s| {
410-
let mut key = batch.data_factory().default_box();
410+
let mut key = batch.key_factory().default_box();
411411
s.clone_to(key.as_mut());
412412
key
413413
})
414414
}?;
415415

416416
let end_key = end_bound.map(|e| {
417-
let mut key = batch.data_factory().default_box();
417+
let mut key = batch.key_factory().default_box();
418418
e.clone_to(key.as_mut());
419419
key
420420
});

crates/adapters/benches/avro_encoder.rs

Lines changed: 3 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
2+
use dbsp::OrdIndexedZSet;
23
use dbsp::utils::Tup2;
3-
use dbsp::{OrdIndexedZSet, OrdZSet};
44
use dbsp_adapters::format::avro::output::AvroEncoder;
55
use dbsp_adapters::static_compile::seroutput::SerBatchImpl;
66
use dbsp_adapters::{Encoder, OutputConsumer, SerBatch};
@@ -66,19 +66,6 @@ impl BenchTestStruct {
6666
properties: BTreeMap::new(),
6767
}
6868
}
69-
70-
fn avro_schema() -> &'static str {
71-
r#"{
72-
"type": "record",
73-
"name": "BenchTestStruct",
74-
"fields": [
75-
{ "name": "id", "type": "long" },
76-
{ "name": "b", "type": "boolean" },
77-
{ "name": "i", "type": ["null", "long"] },
78-
{ "name": "s", "type": "string" }
79-
]
80-
}"#
81-
}
8269
}
8370

8471
#[derive(
@@ -212,15 +199,9 @@ fn build_indexed_batch(data: &[BenchTestStruct]) -> Arc<dyn SerBatch> {
212199
))
213200
}
214201

215-
fn build_plain_batch(data: &[BenchTestStruct]) -> Arc<dyn SerBatch> {
216-
let tuples: Vec<_> = data.iter().map(|v| Tup2(v.clone(), 1i64)).collect();
217-
let zset = OrdZSet::from_keys((), tuples);
218-
Arc::new(<SerBatchImpl<_, BenchTestStruct, ()>>::new(zset))
219-
}
220-
221-
fn create_indexed_encoder(workers: usize) -> AvroEncoder {
202+
fn create_indexed_encoder(threads: usize) -> AvroEncoder {
222203
let config = AvroEncoderConfig {
223-
workers,
204+
threads,
224205
update_format: AvroUpdateFormat::Raw,
225206
skip_schema_id: true,
226207
..Default::default()
@@ -237,26 +218,6 @@ fn create_indexed_encoder(workers: usize) -> AvroEncoder {
237218
.unwrap()
238219
}
239220

240-
fn create_plain_encoder(workers: usize) -> AvroEncoder {
241-
let config = AvroEncoderConfig {
242-
workers,
243-
schema: Some(BenchTestStruct::avro_schema().to_string()),
244-
update_format: AvroUpdateFormat::Raw,
245-
skip_schema_id: true,
246-
..Default::default()
247-
};
248-
let consumer = BenchOutputConsumer::new();
249-
AvroEncoder::create(
250-
"bench_endpoint",
251-
&None,
252-
&BenchTestStruct::relation_schema(),
253-
Box::new(consumer),
254-
config,
255-
None,
256-
)
257-
.unwrap()
258-
}
259-
260221
// ---------------------------------------------------------------------------
261222
// Benchmarks
262223
// ---------------------------------------------------------------------------

crates/adapters/src/format/avro/output.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ impl OutputFormat for AvroOutputFormat {
106106
_ => None,
107107
};
108108

109-
if avro_config.workers == 0 {
109+
if avro_config.threads == 0 {
110110
return Err(ControllerError::invalid_encoder_configuration(
111111
endpoint_name,
112-
"the 'workers' configuration must be greater than zero",
112+
"the 'threads' configuration must be greater than zero",
113113
));
114114
}
115115

@@ -394,8 +394,8 @@ pub struct AvroEncoder {
394394
/// The thread pool for parallel encoding.
395395
pool: Option<ThreadPool>,
396396

397-
/// The number of workers to run in parallel.
398-
workers: usize,
397+
/// The number of worker threads to run in parallel.
398+
threads: usize,
399399
}
400400

401401
/// `true` - this config will create messages with key and value components.
@@ -715,7 +715,7 @@ Consider defining an index with `CREATE INDEX` and setting `index` field in conn
715715
cdc_field: config.cdc_field,
716716
value_avro_schema_with_cdc,
717717
pool: None,
718-
workers: config.workers,
718+
threads: config.threads,
719719
})
720720
}
721721

@@ -812,12 +812,12 @@ Consider defining an index with `CREATE INDEX` and setting `index` field in conn
812812
fn encode_indexed(&mut self, batch: Arc<dyn SerBatchReader>) -> AnyResult<()> {
813813
let pool = self
814814
.pool
815-
.get_or_insert_with(|| ThreadPool::new(self.workers));
815+
.get_or_insert_with(|| ThreadPool::new(self.threads));
816816

817-
let (tx, rx) = crossbeam::channel::bounded(self.workers);
817+
let (tx, rx) = crossbeam::channel::bounded(self.threads);
818818

819819
let mut bounds = batch.keys_factory().default_box();
820-
batch.partition_keys(self.workers, &mut *bounds);
820+
batch.partition_keys(self.threads, &mut *bounds);
821821

822822
for i in 0..=bounds.len() {
823823
let Some(cursor_builder) =

crates/adapters/src/static_compile/seroutput.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ where
476476
self.batch.inner().factories().keys_factory()
477477
}
478478

479-
fn data_factory(&self) -> &'static dyn Factory<DynData> {
479+
fn key_factory(&self) -> &'static dyn Factory<DynData> {
480480
self.batch.inner().factories().key_factory()
481481
}
482482

crates/feldera-types/src/format/avro.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,13 @@ pub struct AvroEncoderConfig {
250250
#[serde(flatten)]
251251
pub registry_config: AvroSchemaRegistryConfig,
252252

253-
/// The number of workers to use during encoding.
253+
/// The number of threads to use during encoding.
254254
///
255255
/// Avro encoder supports encoding multiple records in parallel. This configuration specifies
256-
/// the number of workers to run in parallel.
256+
/// the number of threads to run in parallel.
257257
/// Default: 4
258-
#[serde(default = "default_encoder_workers")]
259-
pub workers: usize,
258+
#[serde(default = "default_encoder_threads")]
259+
pub threads: usize,
260260
}
261261

262262
impl Default for AvroEncoderConfig {
@@ -270,11 +270,11 @@ impl Default for AvroEncoderConfig {
270270
subject_name_strategy: Default::default(),
271271
skip_schema_id: Default::default(),
272272
registry_config: Default::default(),
273-
workers: default_encoder_workers(),
273+
threads: default_encoder_threads(),
274274
}
275275
}
276276
}
277277

278-
fn default_encoder_workers() -> usize {
278+
fn default_encoder_threads() -> usize {
279279
4
280280
}

docs.feldera.com/docs/formats/avro.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ either `registry_urls` or `schema` properties must be specified.
112112
| `registry_username` | string | | Username used to authenticate with the registry. Requires `registry_urls` to be set. This option is mutually exclusive with token-based authentication (see `registry_authorization_token`).|
113113
| `registry_password` | string | | Password used to authenticate with the registry. Requires `registry_urls` to be set.|
114114
| `registry_authorization_token`| string | | Token used to authenticate with the registry. Requires `registry_urls` to be set. This option is mutually exclusive with password-based authentication (see `registry_username` and `registry_password`).|
115+
| `threads` | integer | `4` | Number of parallel worker threads used to encode messages. Must be greater than zero. **Only supported with [indexed outputs](/connectors/unique_keys).** |
115116

116117
### Examples
117118

0 commit comments

Comments
 (0)