Skip to content

Commit 10899f6

Browse files
committed
[adapters][test] Reproduce arrow-ipc adhoc data corruption deterministically
Adds a unit test that drives StreamWriter through the existing ChannelWriter (the sync std::io::Write adapter that spawns one tokio task per write call) and verifies the receiver-side StreamReader can parse the byte stream back. With four record batches and a four-worker tokio runtime, the test fails reliably with the same errors reported in the issues: ParseError("Unable to get root as message: RangeOutOfBounds { range: 655360..655364, .. }") and IpcError("Expected schema message, found empty stream."). Refs: #3923 #3792 #4287 #4226 #5814
1 parent 47bb91a commit 10899f6

1 file changed

Lines changed: 155 additions & 1 deletion

File tree

crates/adapters/src/adhoc/executor.rs

Lines changed: 155 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,16 @@ impl std::io::Write for ChannelWriter {
243243
let bytes = Bytes::copy_from_slice(buf);
244244
let len = bytes.len();
245245
let tx = self.tx.clone();
246-
let handle = TOKIO.spawn(async move { tx.send(bytes).await });
246+
let handle = TOKIO.spawn(async move {
247+
// Tests can force a deliberate scheduling reorder of consecutive
248+
// writes via `test_support::force_reorder_writes` to demonstrate
249+
// that the per-call-spawn pattern below loses the ordering the
250+
// sync `StreamWriter` requires. The delay is a no-op outside
251+
// tests and outside the forced-reorder window.
252+
#[cfg(test)]
253+
test_support::maybe_reorder_delay().await;
254+
tx.send(bytes).await
255+
});
247256
self.handles.push(handle);
248257
Ok(len)
249258
}
@@ -257,6 +266,41 @@ impl std::io::Write for ChannelWriter {
257266
}
258267
}
259268

269+
#[cfg(test)]
mod test_support {
    //! Test-only hooks that let a test force `ChannelWriter` deliveries to
    //! land out of order.

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// Number of upcoming `maybe_reorder_delay` calls that still receive an
    /// artificial delay.
    ///
    /// A call that observes the value `r > 0` sleeps for
    /// `(r - 1) * REORDER_STEP_MS` milliseconds — i.e. earlier calls wait
    /// longer, so later writes win the race into the receiver and the byte
    /// stream comes out in reverse order. This lets a test reproduce the
    /// ordering hazard without flakiness.
    static REORDER_REMAINING: AtomicUsize = AtomicUsize::new(0);

    /// Difference, in milliseconds, between the delays of two consecutive
    /// delayed calls.
    const REORDER_STEP_MS: u64 = 5;

    /// Arms the reorder budget: the next `n` calls to `maybe_reorder_delay`
    /// are delayed. Passing `0` disarms it immediately.
    pub(crate) fn force_reorder_writes(n: usize) {
        REORDER_REMAINING.store(n, Ordering::SeqCst);
    }

    /// Sleeps for the delay assigned to this call, or returns immediately
    /// when the budget is exhausted.
    ///
    /// The budget is only decremented while it is positive, so it rests at
    /// zero once used up instead of drifting further with every later write.
    pub(crate) async fn maybe_reorder_delay() {
        let claimed = REORDER_REMAINING.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |left| {
            left.checked_sub(1)
        });
        // `Ok` carries the value seen before the decrement (always >= 1);
        // `Err` means the budget was already zero.
        let remaining = match claimed {
            Ok(previous) => previous,
            Err(_) => return,
        };
        // `usize` to `u64` is a lossless widening on every supported target.
        let steps = (remaining - 1) as u64;
        let delay_ms = steps.saturating_mul(REORDER_STEP_MS);
        tokio::time::sleep(Duration::from_millis(delay_ms)).await;
    }
}
303+
260304
pub(crate) fn stream_arrow_query(
261305
df: DataFrame,
262306
) -> impl Stream<Item = Result<Bytes, DataFusionError>> {
@@ -505,4 +549,114 @@ mod tests {
505549
.unwrap();
506550
assert_ne!(h_empty, hash_batches(&schema, &[one]));
507551
}
552+
553+
/// Encodes a few record batches through the production `ChannelWriter` +
554+
/// `StreamWriter` plumbing (the same plumbing `stream_arrow_query` used to
555+
/// rely on) and tries to decode the byte stream the receiver collected.
556+
///
557+
/// The helper is shared by two tests: one runs without reordering and
558+
/// confirms the encoder/decoder pair is otherwise correct; the other
559+
/// activates `test_support::force_reorder_writes` to make every adjacent
560+
/// pair of writes land out of order and demonstrates that the resulting
561+
/// Arrow IPC stream is corrupted. The corruption mode reproduces the
562+
/// non-deterministic failures reported against PR #4226 / issue #4287.
563+
async fn round_trip_through_channel_writer(
564+
force_reorder: bool,
565+
) -> Result<Vec<RecordBatch>, arrow::error::ArrowError> {
566+
use arrow::ipc::reader::StreamReader;
567+
568+
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
569+
let batches: Vec<RecordBatch> = (0..4)
570+
.map(|i| {
571+
RecordBatch::try_new(
572+
schema.clone(),
573+
vec![Arc::new(Int32Array::from(vec![i, i + 1, i + 2, i + 3]))],
574+
)
575+
.unwrap()
576+
})
577+
.collect();
578+
579+
let (tx, mut rx) = mpsc::channel::<Bytes>(1024);
580+
// `try_new` writes the schema, plus each `write(&batch)` causes
581+
// ~6 sequential `Write::write` calls (continuation marker, length,
582+
// flatbuf, padding, body, padding). With four batches that's roughly
583+
// 1 (schema) + 4 * 6 = 25 writes; we budget a comfortable upper bound.
584+
if force_reorder {
585+
test_support::force_reorder_writes(64);
586+
}
587+
588+
let mut channel_writer = ChannelWriter::new(tx);
589+
{
590+
let mut writer = StreamWriter::try_new(&mut channel_writer, &schema).unwrap();
591+
for batch in &batches {
592+
writer.write(batch).unwrap();
593+
}
594+
writer.finish().unwrap();
595+
}
596+
let handles = std::mem::take(&mut channel_writer.handles);
597+
drop(channel_writer);
598+
for h in handles {
599+
// Awaiting the handles guarantees every spawned `tx.send` has
600+
// either delivered its bytes or returned. The race we care about
601+
// is the *order* in which deliveries land, not whether they land.
602+
let _ = h.await;
603+
}
604+
605+
let mut buf = Vec::new();
606+
while let Some(bytes) = rx.recv().await {
607+
buf.extend_from_slice(&bytes);
608+
}
609+
610+
let reader = StreamReader::try_new(buf.as_slice(), None)?;
611+
reader.collect::<Result<Vec<_>, _>>()
612+
}
613+
614+
/// Baseline for the reordering test: with no forced reordering the round
/// trip must succeed and yield the four input batches unchanged. If this
/// test ever fails, the producer or the helper changed — the failure is not
/// evidence about the ordering bug itself.
#[test]
fn channel_writer_roundtrip_baseline() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap();

    let round_trip = runtime.block_on(round_trip_through_channel_writer(false));
    let decoded = round_trip.expect("baseline round trip should not corrupt the stream");
    assert_eq!(decoded.len(), 4);

    // Batch `k` was built from the ids `k, k + 1, k + 2, k + 3`.
    for (first_id, batch) in (0_i32..).zip(&decoded) {
        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(
            ids.values(),
            &[first_id, first_id + 1, first_id + 2, first_id + 3]
        );
    }
}
639+
640+
/// Reproduces the ordering bug. `ChannelWriter::write` spawns one tokio task
/// per `std::io::Write::write` call, and those tasks race to hand their
/// bytes to the mpsc receiver. Once the deliveries land out of order the
/// Arrow IPC framing is invalid on the receiving end, so decoding must fail.
///
/// The same symptom (`Invalid flatbuffers message` / `negative metadata
/// length` / "bytes moved from the middle to the end") was observed in
/// production in issue #4287 and against PR #4226's earlier attempt to use
/// Arrow IPC from the Python SDK.
#[test]
fn channel_writer_corrupts_stream_when_writes_are_reordered() {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(4).enable_all();
    let runtime = builder.build().unwrap();

    // `true` arms the forced-reorder delays inside the helper.
    let result = runtime.block_on(round_trip_through_channel_writer(true));
    assert!(
        result.is_err(),
        "expected the StreamReader to reject a stream whose writes the \
         ChannelWriter delivered out of order, but it accepted: {result:?}"
    );
}
508662
}

0 commit comments

Comments
 (0)