Skip to content

Commit ea1b6d1

Browse files
committed
[adapters] Fix ad-hoc processing for single-worker output on multihost.
When an operator produces output only to a subset of workers, for a single host we created only as many partitions for ad-hoc processing as outputs. For multi-host output, if we used the same model, then the coordinator would have to inquire about the number of outputs on each host. Instead, it's easier to simply assume that there is one output per worker and produce an empty batch for the workers that produced no output. This commit implements that model. This fixes an out-of-range panic for multihost ad-hoc queries for output operators that have fewer outputs than workers. Signed-off-by: Ben Pfaff <blp@feldera.com>
1 parent 2e98d22 commit ea1b6d1

File tree

1 file changed

+15
-6
lines changed

1 file changed

+15
-6
lines changed

crates/adapters/src/adhoc/table.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ struct AdHocQueryExecution {
332332
indexed: bool,
333333
table_schema: Arc<Schema>,
334334
projected_schema: Arc<Schema>,
335-
readers: Option<Vec<Arc<dyn SerBatchReader>>>,
335+
readers: Vec<Arc<dyn SerBatchReader>>,
336336
projection: Option<Vec<usize>>,
337337
plan_properties: PlanProperties,
338338
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -367,7 +367,7 @@ impl AdHocQueryExecution {
367367
indexed,
368368
table_schema,
369369
projected_schema,
370-
readers,
370+
readers: readers.unwrap_or_default(),
371371
projection: projection.cloned(),
372372
plan_properties,
373373
children: vec![],
@@ -454,11 +454,10 @@ use `with ('materialized' = 'true')` for tables, or `create materialized view` f
454454
Ok(())
455455
}
456456

457-
if let Some(readers) = &self.readers {
457+
if let Some(batch_reader) = self.readers.get(partition).cloned() {
458458
let mut builder =
459459
RecordBatchReceiverStreamBuilder::new(self.projected_schema.clone(), 10);
460460
// Returns a single batch when the returned stream is polled
461-
let batch_reader = readers[partition].clone();
462461
let schema = self.table_schema.clone();
463462
let tx = builder.tx();
464463
let projection = self.projection.clone();
@@ -543,8 +542,18 @@ use `with ('materialized' = 'true')` for tables, or `create materialized view` f
543542
builder.build(),
544543
)))
545544
} else {
546-
// The case of no readers can happen if the table has never received any input &
547-
// the circuit has never stepped so the correct response is to send an empty batch
545+
// We did not find a reader. There are two causes:
546+
//
547+
// - There are no readers at all, because the table has never
548+
// received any input, and the circuit has never stepped.
549+
//
550+
// - There are some readers but not for every worker, because the
551+
// output operator only produces output for one worker. (It might
552+
// not even be our worker, it might be on a different host in
553+
// multihost.)
554+
//
555+
// In either case, the correct thing to do is to produce an empty
556+
// batch.
548557
let fut =
549558
futures::future::ready(Ok(RecordBatch::new_empty(self.projected_schema.clone())));
550559
let stream = futures::stream::once(fut);

0 commit comments

Comments
 (0)