@@ -243,7 +243,16 @@ impl std::io::Write for ChannelWriter {
243243 let bytes = Bytes :: copy_from_slice ( buf) ;
244244 let len = bytes. len ( ) ;
245245 let tx = self . tx . clone ( ) ;
246- let handle = TOKIO . spawn ( async move { tx. send ( bytes) . await } ) ;
246+ let handle = TOKIO . spawn ( async move {
247+ // Tests can force a deliberate scheduling reorder of consecutive
248+ // writes via `test_support::force_reorder_writes` to demonstrate
249+ // that the per-call-spawn pattern below loses the ordering the
250+ // sync `StreamWriter` requires. The delay is a no-op outside
251+ // tests and outside the forced-reorder window.
252+ #[ cfg( test) ]
253+ test_support:: maybe_reorder_delay ( ) . await ;
254+ tx. send ( bytes) . await
255+ } ) ;
247256 self . handles . push ( handle) ;
248257 Ok ( len)
249258 }
@@ -257,6 +266,41 @@ impl std::io::Write for ChannelWriter {
257266 }
258267}
259268
#[cfg(test)]
mod test_support {
    //! Test-only hooks that let a test force `ChannelWriter::write`'s spawned
    //! sends to be delivered out of order.
    //!
    //! The state lives in a process-wide static, so it is shared by every test
    //! running in the same test binary. Callers should arm it only for the
    //! duration of the writes they want to reorder and disarm it afterwards
    //! with `force_reorder_writes(0)`.

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;

    /// Number of upcoming `maybe_reorder_delay` calls that will still be
    /// delayed. Zero means the hook is disarmed and the delay is skipped.
    static REORDER_REMAINING: AtomicU64 = AtomicU64::new(0);

    /// Delay difference between two consecutive delayed calls, in milliseconds.
    const REORDER_STEP_MS: u64 = 5;

    /// Arms the hook so that the next `n` calls to [`maybe_reorder_delay`]
    /// sleep before returning; `n == 0` disarms it.
    ///
    /// A call that claims the budget value `r` sleeps for
    /// `(r - 1) * REORDER_STEP_MS` milliseconds, so earlier calls wait longer
    /// than later ones and later sends win the race into the receiver.
    pub(crate) fn force_reorder_writes(n: usize) {
        // Checked conversion instead of `as`; clamp on the (theoretical)
        // platform where `usize` is wider than 64 bits.
        let budget = u64::try_from(n).unwrap_or(u64::MAX);
        REORDER_REMAINING.store(budget, Ordering::SeqCst);
    }

    /// Atomically claims one slot of the budget.
    ///
    /// Returns the budget value *before* the decrement, or `None` when the
    /// budget is already exhausted. The counter never drops below zero.
    fn take_reorder_slot() -> Option<u64> {
        REORDER_REMAINING
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |left| {
                left.checked_sub(1)
            })
            .ok()
    }

    /// Delay applied to the call that claimed budget value `remaining`.
    fn reorder_delay(remaining: u64) -> Duration {
        // Saturating arithmetic: an absurdly large budget must not overflow.
        Duration::from_millis(remaining.saturating_sub(1).saturating_mul(REORDER_STEP_MS))
    }

    /// Sleeps for this call's share of the reorder budget, if any is left.
    ///
    /// Returns immediately, without yielding, when the hook is disarmed.
    pub(crate) async fn maybe_reorder_delay() {
        if let Some(remaining) = take_reorder_slot() {
            tokio::time::sleep(reorder_delay(remaining)).await;
        }
    }
}
303+
260304pub ( crate ) fn stream_arrow_query (
261305 df : DataFrame ,
262306) -> impl Stream < Item = Result < Bytes , DataFusionError > > {
@@ -505,4 +549,114 @@ mod tests {
505549 . unwrap ( ) ;
506550 assert_ne ! ( h_empty, hash_batches( & schema, & [ one] ) ) ;
507551 }
552+
553+ /// Encodes a few record batches through the production `ChannelWriter` +
554+ /// `StreamWriter` plumbing (the same plumbing `stream_arrow_query` used to
555+ /// rely on) and tries to decode the byte stream the receiver collected.
556+ ///
557+ /// The helper is shared by two tests: one runs without reordering and
558+ /// confirms the encoder/decoder pair is otherwise correct; the other
559+ /// activates `test_support::force_reorder_writes` to make every adjacent
560+ /// pair of writes land out of order and demonstrates that the resulting
561+ /// Arrow IPC stream is corrupted. The corruption mode reproduces the
562+ /// non-deterministic failures reported against PR #4226 / issue #4287.
563+ async fn round_trip_through_channel_writer (
564+ force_reorder : bool ,
565+ ) -> Result < Vec < RecordBatch > , arrow:: error:: ArrowError > {
566+ use arrow:: ipc:: reader:: StreamReader ;
567+
568+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "id" , DataType :: Int32 , false ) ] ) ) ;
569+ let batches: Vec < RecordBatch > = ( 0 ..4 )
570+ . map ( |i| {
571+ RecordBatch :: try_new (
572+ schema. clone ( ) ,
573+ vec ! [ Arc :: new( Int32Array :: from( vec![ i, i + 1 , i + 2 , i + 3 ] ) ) ] ,
574+ )
575+ . unwrap ( )
576+ } )
577+ . collect ( ) ;
578+
579+ let ( tx, mut rx) = mpsc:: channel :: < Bytes > ( 1024 ) ;
580+ // `try_new` writes the schema, plus each `write(&batch)` causes
581+ // ~6 sequential `Write::write` calls (continuation marker, length,
582+ // flatbuf, padding, body, padding). With four batches that's roughly
583+ // 1 (schema) + 4 * 6 = 25 writes; we budget a comfortable upper bound.
584+ if force_reorder {
585+ test_support:: force_reorder_writes ( 64 ) ;
586+ }
587+
588+ let mut channel_writer = ChannelWriter :: new ( tx) ;
589+ {
590+ let mut writer = StreamWriter :: try_new ( & mut channel_writer, & schema) . unwrap ( ) ;
591+ for batch in & batches {
592+ writer. write ( batch) . unwrap ( ) ;
593+ }
594+ writer. finish ( ) . unwrap ( ) ;
595+ }
596+ let handles = std:: mem:: take ( & mut channel_writer. handles ) ;
597+ drop ( channel_writer) ;
598+ for h in handles {
599+ // Awaiting the handles guarantees every spawned `tx.send` has
600+ // either delivered its bytes or returned. The race we care about
601+ // is the *order* in which deliveries land, not whether they land.
602+ let _ = h. await ;
603+ }
604+
605+ let mut buf = Vec :: new ( ) ;
606+ while let Some ( bytes) = rx. recv ( ) . await {
607+ buf. extend_from_slice ( & bytes) ;
608+ }
609+
610+ let reader = StreamReader :: try_new ( buf. as_slice ( ) , None ) ?;
611+ reader. collect :: < Result < Vec < _ > , _ > > ( )
612+ }
613+
/// Baseline for the reordering test: with no forced reordering the round
/// trip must succeed and return the four input batches unchanged. It
/// establishes that the encoder/decoder pair in the shared helper is correct
/// on its own, so a rejected stream in the reordering test can be attributed
/// to delivery order rather than to the helper.
#[test]
fn channel_writer_roundtrip_baseline() {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(2).enable_all();
    let runtime = builder.build().unwrap();

    let round_tripped = runtime
        .block_on(round_trip_through_channel_writer(false))
        .expect("baseline round trip should not corrupt the stream");
    assert_eq!(round_tripped.len(), 4);

    // Batch `k` was built from the values `k, k + 1, k + 2, k + 3`.
    for (first, batch) in (0_i32..).zip(round_tripped.iter()) {
        let ids = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(ids.values(), &[first, first + 1, first + 2, first + 3]);
    }
}
639+
/// Reproduces the ordering bug: `ChannelWriter::write` spawns one tokio task
/// per `std::io::Write::write` call, and those tasks race to deliver their
/// bytes to the mpsc receiver. Once the deliveries land out of order the
/// Arrow IPC framing is invalid on the receiving end, so the reader must
/// reject the stream. The same symptom (`Invalid flatbuffers message` /
/// `negative metadata length` / "bytes moved from the middle to the end")
/// was observed in production in issue #4287 and against PR #4226's earlier
/// attempt to use Arrow IPC from the Python SDK.
#[test]
fn channel_writer_corrupts_stream_when_writes_are_reordered() {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(4).enable_all();
    let runtime = builder.build().unwrap();

    // `true` arms the forced-reorder hook inside the shared helper.
    let result = runtime.block_on(round_trip_through_channel_writer(true));
    assert!(
        result.is_err(),
        "expected the StreamReader to reject a stream whose writes the \
         ChannelWriter delivered out of order, but it accepted: {result:?}"
    );
}
508662}
0 commit comments