@@ -125,6 +125,123 @@ struct ExceptionStack {
125125 stack : Vec < Option < PyBaseExceptionRef > > ,
126126}
127127
128+ /// Stop-the-world state for fork safety. Before `fork()`, the requester
129+ /// stops all other Python threads so they are not holding internal locks.
130+ #[ cfg( all( unix, feature = "threading" ) ) ]
131+ pub struct StopTheWorldState {
132+ /// Fast-path flag checked in the bytecode loop (like `_PY_EVAL_PLEASE_STOP_BIT`)
133+ pub ( crate ) requested : AtomicBool ,
134+ /// Ident of the thread that requested the stop (like `stw->requester`)
135+ requester : AtomicU64 ,
136+ /// Signaled by suspending threads when their state transitions to SUSPENDED
137+ notify_mutex : std:: sync:: Mutex < ( ) > ,
138+ notify_cv : std:: sync:: Condvar ,
139+ }
140+
141+ #[ cfg( all( unix, feature = "threading" ) ) ]
142+ impl StopTheWorldState {
143+ pub const fn new ( ) -> Self {
144+ Self {
145+ requested : AtomicBool :: new ( false ) ,
146+ requester : AtomicU64 :: new ( 0 ) ,
147+ notify_mutex : std:: sync:: Mutex :: new ( ( ) ) ,
148+ notify_cv : std:: sync:: Condvar :: new ( ) ,
149+ }
150+ }
151+
152+ /// Wake the stop-the-world requester (called by each thread that suspends).
153+ pub ( crate ) fn notify_suspended ( & self ) {
154+ // Just signal the condvar; the requester holds the mutex.
155+ self . notify_cv . notify_one ( ) ;
156+ }
157+
158+ /// Try to CAS detached threads directly to SUSPENDED and check whether
159+ /// all non-requester threads are now SUSPENDED.
160+ /// Like CPython's `park_detached_threads`.
161+ fn park_detached_threads ( & self , vm : & VirtualMachine ) -> bool {
162+ use thread:: { THREAD_ATTACHED , THREAD_DETACHED , THREAD_SUSPENDED } ;
163+ let requester = self . requester . load ( Ordering :: Relaxed ) ;
164+ let registry = vm. state . thread_frames . lock ( ) ;
165+ let mut all_suspended = true ;
166+ for ( & id, slot) in registry. iter ( ) {
167+ if id == requester {
168+ continue ;
169+ }
170+ let state = slot. state . load ( Ordering :: Relaxed ) ;
171+ if state == THREAD_DETACHED {
172+ // CAS DETACHED → SUSPENDED (park without thread cooperation)
173+ let _ = slot. state . compare_exchange (
174+ THREAD_DETACHED ,
175+ THREAD_SUSPENDED ,
176+ Ordering :: AcqRel ,
177+ Ordering :: Relaxed ,
178+ ) ;
179+ all_suspended = false ; // re-check on next poll
180+ } else if state == THREAD_ATTACHED {
181+ // Thread is in bytecode — it will see `requested` and self-suspend
182+ all_suspended = false ;
183+ }
184+ // THREAD_SUSPENDED → already parked
185+ }
186+ if all_suspended {
187+ // Verify once more after dropping the lock
188+ return true ;
189+ }
190+ all_suspended
191+ }
192+
193+ /// Stop all non-requester threads. Like CPython's `stop_the_world`.
194+ ///
195+ /// 1. Sets `requested`, marking the requester thread.
196+ /// 2. CAS detached threads to SUSPENDED.
197+ /// 3. Waits (polling with 1 ms condvar timeout) for attached threads
198+ /// to self-suspend in `check_signals`.
199+ pub fn stop_the_world ( & self , vm : & VirtualMachine ) {
200+ let requester_ident = crate :: stdlib:: thread:: get_ident ( ) ;
201+ self . requester . store ( requester_ident, Ordering :: Relaxed ) ;
202+ self . requested . store ( true , Ordering :: Release ) ;
203+
204+ let deadline = std:: time:: Instant :: now ( ) + std:: time:: Duration :: from_millis ( 500 ) ;
205+ loop {
206+ if self . park_detached_threads ( vm) {
207+ break ;
208+ }
209+ let remaining = deadline. saturating_duration_since ( std:: time:: Instant :: now ( ) ) ;
210+ if remaining. is_zero ( ) {
211+ break ;
212+ }
213+ // Wait up to 1 ms for a thread to notify us it suspended
214+ let wait = remaining. min ( std:: time:: Duration :: from_millis ( 1 ) ) ;
215+ let guard = self . notify_mutex . lock ( ) . unwrap ( ) ;
216+ let _ = self . notify_cv . wait_timeout ( guard, wait) ;
217+ }
218+ }
219+
220+ /// Resume all suspended threads. Like CPython's `start_the_world`.
221+ pub fn start_the_world ( & self , vm : & VirtualMachine ) {
222+ use thread:: { THREAD_DETACHED , THREAD_SUSPENDED } ;
223+ let requester = self . requester . load ( Ordering :: Relaxed ) ;
224+ let registry = vm. state . thread_frames . lock ( ) ;
225+ for ( & id, slot) in registry. iter ( ) {
226+ if id == requester {
227+ continue ;
228+ }
229+ if slot. state . load ( Ordering :: Relaxed ) == THREAD_SUSPENDED {
230+ slot. state . store ( THREAD_DETACHED , Ordering :: Release ) ;
231+ }
232+ }
233+ drop ( registry) ;
234+ self . requested . store ( false , Ordering :: Release ) ;
235+ self . requester . store ( 0 , Ordering :: Relaxed ) ;
236+ }
237+
238+ /// Reset after fork in the child (only one thread alive).
239+ pub fn reset_after_fork ( & self ) {
240+ self . requested . store ( false , Ordering :: Relaxed ) ;
241+ self . requester . store ( 0 , Ordering :: Relaxed ) ;
242+ }
243+ }
244+
128245pub struct PyGlobalState {
129246 pub config : PyConfig ,
130247 pub module_defs : BTreeMap < & ' static str , & ' static builtins:: PyModuleDef > ,
@@ -165,6 +282,9 @@ pub struct PyGlobalState {
165282 /// Incremented on every monitoring state change. Code objects compare their
166283 /// local version against this to decide whether re-instrumentation is needed.
167284 pub instrumentation_version : AtomicU64 ,
285+ /// Stop-the-world state for pre-fork thread suspension
286+ #[ cfg( all( unix, feature = "threading" ) ) ]
287+ pub stop_the_world : StopTheWorldState ,
168288}
169289
170290pub fn process_hash_secret_seed ( ) -> u32 {
@@ -1482,6 +1602,10 @@ impl VirtualMachine {
14821602 return Err ( self . new_exception ( self . ctx . exceptions . system_exit . to_owned ( ) , vec ! [ ] ) ) ;
14831603 }
14841604
1605+ // Suspend this thread if stop-the-world is in progress
1606+ #[ cfg( all( unix, feature = "threading" ) ) ]
1607+ thread:: suspend_if_needed ( & self . state . stop_the_world ) ;
1608+
14851609 #[ cfg( not( target_arch = "wasm32" ) ) ]
14861610 {
14871611 crate :: signal:: check_signals ( self )
0 commit comments