@@ -125,6 +125,124 @@ struct ExceptionStack {
125125 stack : Vec < Option < PyBaseExceptionRef > > ,
126126}
127127
128+ /// Stop-the-world state — mirrors `_stoptheworld_state` in CPython's
129+ /// `pycore_interp_structs.h`. Before `fork()`, the requester stops all other
130+ /// Python threads so they are not holding internal Rust/C locks.
131+ #[ cfg( all( unix, feature = "threading" ) ) ]
132+ pub struct StopTheWorldState {
133+ /// Fast-path flag checked in the bytecode loop (like `_PY_EVAL_PLEASE_STOP_BIT`)
134+ pub ( crate ) requested : AtomicBool ,
135+ /// Ident of the thread that requested the stop (like `stw->requester`)
136+ requester : AtomicU64 ,
137+ /// Signaled by suspending threads when their state transitions to SUSPENDED
138+ notify_mutex : std:: sync:: Mutex < ( ) > ,
139+ notify_cv : std:: sync:: Condvar ,
140+ }
141+
142+ #[ cfg( all( unix, feature = "threading" ) ) ]
143+ impl StopTheWorldState {
144+ pub const fn new ( ) -> Self {
145+ Self {
146+ requested : AtomicBool :: new ( false ) ,
147+ requester : AtomicU64 :: new ( 0 ) ,
148+ notify_mutex : std:: sync:: Mutex :: new ( ( ) ) ,
149+ notify_cv : std:: sync:: Condvar :: new ( ) ,
150+ }
151+ }
152+
153+ /// Wake the stop-the-world requester (called by each thread that suspends).
154+ pub ( crate ) fn notify_suspended ( & self ) {
155+ // Just signal the condvar; the requester holds the mutex.
156+ self . notify_cv . notify_one ( ) ;
157+ }
158+
159+ /// Try to CAS detached threads directly to SUSPENDED and check whether
160+ /// all non-requester threads are now SUSPENDED.
161+ /// Like CPython's `park_detached_threads`.
162+ fn park_detached_threads ( & self , vm : & VirtualMachine ) -> bool {
163+ use thread:: { THREAD_ATTACHED , THREAD_DETACHED , THREAD_SUSPENDED } ;
164+ let requester = self . requester . load ( Ordering :: Relaxed ) ;
165+ let registry = vm. state . thread_frames . lock ( ) ;
166+ let mut all_suspended = true ;
167+ for ( & id, slot) in registry. iter ( ) {
168+ if id == requester {
169+ continue ;
170+ }
171+ let state = slot. state . load ( Ordering :: Relaxed ) ;
172+ if state == THREAD_DETACHED {
173+ // CAS DETACHED → SUSPENDED (park without thread cooperation)
174+ let _ = slot. state . compare_exchange (
175+ THREAD_DETACHED ,
176+ THREAD_SUSPENDED ,
177+ Ordering :: AcqRel ,
178+ Ordering :: Relaxed ,
179+ ) ;
180+ all_suspended = false ; // re-check on next poll
181+ } else if state == THREAD_ATTACHED {
182+ // Thread is in bytecode — it will see `requested` and self-suspend
183+ all_suspended = false ;
184+ }
185+ // THREAD_SUSPENDED → already parked
186+ }
187+ if all_suspended {
188+ // Verify once more after dropping the lock
189+ return true ;
190+ }
191+ all_suspended
192+ }
193+
194+ /// Stop all non-requester threads. Like CPython's `stop_the_world`.
195+ ///
196+ /// 1. Sets `requested`, marking the requester thread.
197+ /// 2. CAS detached threads to SUSPENDED.
198+ /// 3. Waits (polling with 1 ms condvar timeout) for attached threads
199+ /// to self-suspend in `check_signals`.
200+ pub fn stop_the_world ( & self , vm : & VirtualMachine ) {
201+ let requester_ident = crate :: stdlib:: thread:: get_ident ( ) ;
202+ self . requester . store ( requester_ident, Ordering :: Relaxed ) ;
203+ self . requested . store ( true , Ordering :: Release ) ;
204+
205+ let deadline = std:: time:: Instant :: now ( ) + std:: time:: Duration :: from_millis ( 500 ) ;
206+ loop {
207+ if self . park_detached_threads ( vm) {
208+ break ;
209+ }
210+ let remaining = deadline. saturating_duration_since ( std:: time:: Instant :: now ( ) ) ;
211+ if remaining. is_zero ( ) {
212+ break ;
213+ }
214+ // Wait up to 1 ms for a thread to notify us it suspended
215+ let wait = remaining. min ( std:: time:: Duration :: from_millis ( 1 ) ) ;
216+ let guard = self . notify_mutex . lock ( ) . unwrap ( ) ;
217+ let _ = self . notify_cv . wait_timeout ( guard, wait) ;
218+ }
219+ }
220+
221+ /// Resume all suspended threads. Like CPython's `start_the_world`.
222+ pub fn start_the_world ( & self , vm : & VirtualMachine ) {
223+ use thread:: { THREAD_DETACHED , THREAD_SUSPENDED } ;
224+ let requester = self . requester . load ( Ordering :: Relaxed ) ;
225+ let registry = vm. state . thread_frames . lock ( ) ;
226+ for ( & id, slot) in registry. iter ( ) {
227+ if id == requester {
228+ continue ;
229+ }
230+ if slot. state . load ( Ordering :: Relaxed ) == THREAD_SUSPENDED {
231+ slot. state . store ( THREAD_DETACHED , Ordering :: Release ) ;
232+ }
233+ }
234+ drop ( registry) ;
235+ self . requested . store ( false , Ordering :: Release ) ;
236+ self . requester . store ( 0 , Ordering :: Relaxed ) ;
237+ }
238+
239+ /// Reset after fork in the child (only one thread alive).
240+ pub fn reset_after_fork ( & self ) {
241+ self . requested . store ( false , Ordering :: Relaxed ) ;
242+ self . requester . store ( 0 , Ordering :: Relaxed ) ;
243+ }
244+ }
245+
128246pub struct PyGlobalState {
129247 pub config : PyConfig ,
130248 pub module_defs : BTreeMap < & ' static str , & ' static builtins:: PyModuleDef > ,
@@ -165,6 +283,9 @@ pub struct PyGlobalState {
165283 /// Incremented on every monitoring state change. Code objects compare their
166284 /// local version against this to decide whether re-instrumentation is needed.
167285 pub instrumentation_version : AtomicU64 ,
286+ /// Stop-the-world state for pre-fork thread suspension
287+ #[ cfg( all( unix, feature = "threading" ) ) ]
288+ pub stop_the_world : StopTheWorldState ,
168289}
169290
170291pub fn process_hash_secret_seed ( ) -> u32 {
@@ -1482,6 +1603,10 @@ impl VirtualMachine {
14821603 return Err ( self . new_exception ( self . ctx . exceptions . system_exit . to_owned ( ) , vec ! [ ] ) ) ;
14831604 }
14841605
1606+ // Suspend this thread if stop-the-world is in progress
1607+ #[ cfg( all( unix, feature = "threading" ) ) ]
1608+ thread:: suspend_if_needed ( & self . state . stop_the_world ) ;
1609+
14851610 #[ cfg( not( target_arch = "wasm32" ) ) ]
14861611 {
14871612 crate :: signal:: check_signals ( self )
0 commit comments