Skip to content

Commit 057758f

Browse files
committed
dbsp_adapters: Specify max step size as per-thread value for Nexmark input.
This is a little easier to dimension than the maximum total step size, at least if the number of generator threads matches the number of worker threads, as `run-nexmark.sh` does.

Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 277f64e commit 057758f

File tree

6 files changed: 24 additions and 18 deletions.

benchmark/feldera-sql/benchmarks/nexmark/table.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ CREATE TABLE person (
2020
"options": {{
2121
"events": {events},
2222
"threads": {cores},
23-
"batch_size_per_thread": {batchsize},
24-
"max_step_size": {batchsize}
23+
"batch_size_per_thread": 1000,
24+
"max_step_size_per_thread": 10000
2525
}}
2626
}}
2727
}}

crates/adapters/src/transport/nexmark.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ impl InputReader for InputGenerator {
119119
let consumable_batches = self.inner.consumable_batches.load(Ordering::Acquire);
120120
let options = self.inner.options.get().unwrap();
121121
let max_batches = options
122-
.max_step_size
123-
.div_ceil(options.batch_size_per_thread * options.threads as u64);
122+
.max_step_size_per_thread
123+
.div_ceil(options.batch_size_per_thread);
124124

125125
let mut total = 0;
126126
for _ in 0..max_batches {

crates/feldera-types/src/transport/nexmark.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ pub struct NexmarkInputOptions {
5555
/// batches of size `threads × batch_size_per_thread`.
5656
pub batch_size_per_thread: u64,
5757

58-
/// Maximum number of events to submit in a single step. This should be a
59-
/// multiple of `batch_size`.
58+
/// Maximum number of events to submit in a single step, per thread.
59+
///
60+
/// This should really be per worker thread, not per generator thread, but
61+
/// the connector does not know how many worker threads there are.
6062
///
6163
/// This stands in for `max_batch_size` from the connector configuration
6264
/// because it must be a constant across all three of the nexmark tables.
63-
pub max_step_size: u64,
65+
pub max_step_size_per_thread: u64,
6466
}
6567

6668
impl Default for NexmarkInputOptions {
@@ -69,7 +71,7 @@ impl Default for NexmarkInputOptions {
6971
events: 100_000_000,
7072
threads: 4,
7173
batch_size_per_thread: 1_000,
72-
max_step_size: 4_000_000,
74+
max_step_size_per_thread: 10_000,
7375
}
7476
}
7577
}

openapi.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3058,11 +3058,11 @@
30583058
"default": 100000000,
30593059
"minimum": 0
30603060
},
3061-
"max_step_size": {
3061+
"max_step_size_per_thread": {
30623062
"type": "integer",
30633063
"format": "int64",
3064-
"description": "Maximum number of events to submit in a single step. This should be a\nmultiple of `batch_size`.\n\nThis stands in for `max_batch_size` from the connector configuration\nbecause it must be a constant across all three of the nexmark tables.",
3065-
"default": 4000000,
3064+
"description": "Maximum number of events to submit in a single step, per thread.\n\nThis should really be per worker thread, not per generator thread, but\nthe connector does not know how many worker threads there are.\n\nThis stands in for `max_batch_size` from the connector configuration\nbecause it must be a constant across all three of the nexmark tables.",
3065+
"default": 10000,
30663066
"minimum": 0
30673067
},
30683068
"threads": {

web-console/src/lib/services/manager/schemas.gen.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1361,15 +1361,17 @@ batches of size \`threads × batch_size_per_thread\`.`,
13611361
default: 100000000,
13621362
minimum: 0
13631363
},
1364-
max_step_size: {
1364+
max_step_size_per_thread: {
13651365
type: 'integer',
13661366
format: 'int64',
1367-
description: `Maximum number of events to submit in a single step. This should be a
1368-
multiple of \`batch_size\`.
1367+
description: `Maximum number of events to submit in a single step, per thread.
1368+
1369+
This should really be per worker thread, not per generator thread, but
1370+
the connector does not know how many worker threads there are.
13691371
13701372
This stands in for \`max_batch_size\` from the connector configuration
13711373
because it must be a constant across all three of the nexmark tables.`,
1372-
default: 4000000,
1374+
default: 10000,
13731375
minimum: 0
13741376
},
13751377
threads: {

web-console/src/lib/services/manager/types.gen.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -950,13 +950,15 @@ export type NexmarkInputOptions = {
950950
*/
951951
events?: number
952952
/**
953-
* Maximum number of events to submit in a single step. This should be a
954-
* multiple of `batch_size`.
953+
* Maximum number of events to submit in a single step, per thread.
954+
*
955+
* This should really be per worker thread, not per generator thread, but
956+
* the connector does not know how many worker threads there are.
955957
*
956958
* This stands in for `max_batch_size` from the connector configuration
957959
* because it must be a constant across all three of the nexmark tables.
958960
*/
959-
max_step_size?: number
961+
max_step_size_per_thread?: number
960962
/**
961963
* Number of event generator threads.
962964
*

0 commit comments

Comments
 (0)