Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions crates/adapters/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,19 @@ impl Controller {
self.inner.trace_snapshots.lock().await.get(&step).cloned()
}

/// Steps for which an ad-hoc-query snapshot is currently retained, in
/// ascending order. Used for diagnostics when a requested step's snapshot
/// has already been evicted.
pub async fn available_snapshot_steps(&self) -> Vec<Step> {
self.inner
.trace_snapshots
.lock()
.await
.keys()
.copied()
.collect()
}

pub async fn latest_consistent_snapshot(&self) -> Option<ConsistentSnapshot> {
self.inner
.trace_snapshots
Expand Down
87 changes: 77 additions & 10 deletions crates/adapters/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2856,12 +2856,27 @@ async fn coordination_adhoc_lease(

// Grab the snapshot for the step.
let controller = state.controller()?;
let snapshot = controller.consistent_snapshot(step).await.ok_or_else(|| {
PipelineError::AdHocQueryError {
error: format!("snapshot for step {step} is not available"),
df: None,
let snapshot = match controller.consistent_snapshot(step).await {
Some(snapshot) => snapshot,
None => {
// INSTRUMENTATION: the coordinator leased `step` (its `current_step`),
// but our snapshot for it has already been evicted. Log which steps we
// still retain so we can see how far `current_step` lagged this host.
let retained = controller.available_snapshot_steps().await;
error!(
"adhoc lease: snapshot for step {step} is not available; \
retained snapshot steps = {retained:?} (this host is at step \
{:?})",
retained.last()
);
return Err(PipelineError::AdHocQueryError {
error: format!(
"snapshot for step {step} is not available (retained steps: {retained:?})"
),
df: None,
});
}
})?;
};

// Add the snapshot to the table of leases.
state
Expand Down Expand Up @@ -2963,21 +2978,73 @@ async fn coordination_adhoc_scan(
// one. After the first batch, there is no way to properly report an error,
// so this at least allows us to report errors at the point they are most
// likely.
//
// Crucially, an error that occurs *after* the first batch can no longer
// change the HTTP status (200 OK and the initial bytes are already on the
// wire). All we can do is stop writing, which truncates the Arrow stream;
// the coordinator then observes a generic "Unexpected End of Stream" with no
// hint as to the real cause. To keep that cause from being lost, we log it
// here -- with table/step/worker context -- before the stream terminates.
let first_batch = match stream.next().await {
Some(Err(error)) => return Err(error.into()),
other => other.into_iter(),
};

let schema = stream.schema();
let response_stream = async_stream::try_stream! {
let mut writer = StreamWriter::try_new(Vec::new(), &schema)?;

// Diagnostic context, captured so it can be logged from inside the stream.
let table = scan.table.clone();
let step = scan.step;
let worker = scan.worker;

let response_stream = async_stream::stream! {
let mut writer = match StreamWriter::try_new(Vec::new(), &schema) {
Ok(writer) => writer,
Err(error) => {
error!(
"ad-hoc scan of {table} (step {step}, worker {worker}): failed to \
create the Arrow stream writer: {error}"
);
yield Err(error.into());
return;
}
};
let mut stream = futures_util::stream::iter(first_batch).chain(stream);
while let Some(batch) = stream.next().await {
writer.write(&batch?)?;
yield Bytes::copy_from_slice(writer.get_ref().as_slice());
let batch = match batch {
Ok(batch) => batch,
Err(error) => {
error!(
"ad-hoc scan of {table} (step {step}, worker {worker}): error \

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The worst case would be if the partial stream was interpreted as a complete answer. Is that possible?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seem unlikely, but I don't know if it's in theory possible. This doesn't make it worse I think. Also, this should only happen if there's a bug in the engine.

producing a record batch after streaming had already begun; the \
response will be truncated and the coordinator will report an \
incomplete Arrow stream: {error}"
);
yield Err(error.into());
return;
}
};
if let Err(error) = writer.write(&batch) {
error!(
"ad-hoc scan of {table} (step {step}, worker {worker}): error encoding \
a record batch into the Arrow stream: {error}"
);
yield Err(error.into());
return;
}
yield Ok(Bytes::copy_from_slice(writer.get_ref().as_slice()));
writer.get_mut().clear();
}
yield writer.into_inner()?.into();
match writer.into_inner() {
Ok(buffer) => yield Ok(Bytes::from(buffer)),
Err(error) => {
error!(
"ad-hoc scan of {table} (step {step}, worker {worker}): error \
finalizing the Arrow stream: {error}"
);
yield Err(error.into());
}
}
};
Ok(HttpResponseBuilder::new(StatusCode::OK).streaming::<_, PipelineError>(response_stream))
}
Expand Down
Loading