Skip to content

Commit a7ad534

Browse files
committed
Suspend Python threads before fork()
Implement CPython-style stop-the-world for pre-fork thread suspension. - Add per-thread state tracking (DETACHED/ATTACHED/SUSPENDED) in ThreadSlot, mirroring CPython's tstate->state. - enter_vm() transitions DETACHED→ATTACHED (CAS, blocks if SUSPENDED); exiting transitions ATTACHED→DETACHED. - StopTheWorldState (like _stoptheworld_state): - stop_the_world(): sets requested flag, CAS parks DETACHED threads, polls with 1ms condvar timeout for ATTACHED threads to self-suspend. - start_the_world(): sets all SUSPENDED→DETACHED, clears flag. - check_signals() calls suspend_if_needed() which transitions ATTACHED→SUSPENDED and waits until released. - os.fork() calls stop_the_world/start_the_world around libc::fork().
1 parent 04cf5da commit a7ad534

File tree

7 files changed

+280
-52
lines changed

7 files changed

+280
-52
lines changed

crates/common/src/lock.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,43 +68,46 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock,
6868

6969
// can add fn const_{mutex,rw_lock}() if necessary, but we probably won't need to
7070

71-
/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`.
71+
/// Reset a lock to its initial (unlocked) state by zeroing its bytes.
7272
///
73-
/// After `fork()`, locks held by dead parent threads would deadlock in the
74-
/// child. This writes `RawMutex::INIT` via the `Mutex::raw()` accessor,
75-
/// bypassing the normal unlock path which may interact with parking_lot's
76-
/// internal waiter queues.
73+
/// After `fork()`, any lock held by a now-dead thread would remain
74+
/// permanently locked. We zero the raw bytes (the unlocked state for all
75+
/// `parking_lot` raw lock types) instead of using the normal unlock path,
76+
/// which would interact with stale waiter queues.
7777
///
7878
/// # Safety
7979
///
8080
/// Must only be called from the single-threaded child process immediately
8181
/// after `fork()`, before any other thread is created.
82+
/// The type `T` must represent the unlocked state as all-zero bytes
83+
/// (true for `parking_lot::RawMutex`, `RawRwLock`, `RawReentrantMutex`, etc.).
8284
#[cfg(unix)]
83-
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
84-
// Use Mutex::raw() to access the underlying lock without layout assumptions.
85-
// parking_lot::RawMutex (AtomicU8) and RawCellMutex (Cell<bool>) both
86-
// represent the unlocked state as all-zero bytes.
85+
pub unsafe fn zero_reinit_after_fork<T>(lock: *const T) {
8786
unsafe {
88-
let raw = mutex.raw() as *const RawMutex as *mut u8;
89-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawMutex>());
87+
core::ptr::write_bytes(lock as *mut u8, 0, core::mem::size_of::<T>());
9088
}
9189
}
9290

93-
/// Reset a `PyRwLock` to its initial (unlocked) state after `fork()`.
91+
/// Reset a `PyMutex` after `fork()`. See [`zero_reinit_after_fork`].
9492
///
95-
/// Same rationale as [`reinit_mutex_after_fork`] — dead threads' read or
96-
/// write locks would cause permanent deadlock in the child.
93+
/// # Safety
94+
///
95+
/// Must only be called from the single-threaded child process immediately
96+
/// after `fork()`, before any other thread is created.
97+
#[cfg(unix)]
98+
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
99+
unsafe { zero_reinit_after_fork(mutex.raw()) }
100+
}
101+
102+
/// Reset a `PyRwLock` after `fork()`. See [`zero_reinit_after_fork`].
97103
///
98104
/// # Safety
99105
///
100106
/// Must only be called from the single-threaded child process immediately
101107
/// after `fork()`, before any other thread is created.
102108
#[cfg(unix)]
103109
pub unsafe fn reinit_rwlock_after_fork<T: ?Sized>(rwlock: &PyRwLock<T>) {
104-
unsafe {
105-
let raw = rwlock.raw() as *const RawRwLock as *mut u8;
106-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawRwLock>());
107-
}
110+
unsafe { zero_reinit_after_fork(rwlock.raw()) }
108111
}
109112

110113
/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`.

crates/vm/src/stdlib/imp.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,7 @@ mod lock {
4747
pub(crate) unsafe fn reinit_after_fork() {
4848
if IMP_LOCK.is_locked() && !IMP_LOCK.is_owned_by_current_thread() {
4949
// Held by a dead thread — reset to unlocked.
50-
// Same pattern as RLock::_at_fork_reinit in thread.rs.
51-
unsafe {
52-
let old: &crossbeam_utils::atomic::AtomicCell<RawRMutex> =
53-
core::mem::transmute(&IMP_LOCK);
54-
old.swap(RawRMutex::INIT);
55-
}
50+
unsafe { rustpython_common::lock::zero_reinit_after_fork(&IMP_LOCK) };
5651
}
5752
}
5853
}

crates/vm/src/stdlib/posix.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -908,14 +908,22 @@ pub mod module {
908908
fn fork(vm: &VirtualMachine) -> i32 {
909909
warn_if_multi_threaded("fork", vm);
910910

911-
let pid: i32;
912911
py_os_before_fork(vm);
913-
unsafe {
914-
pid = libc::fork();
915-
}
912+
913+
// Like CPython's PyOS_BeforeFork: stop all other Python threads
914+
// so they are at safe points, not holding internal Rust/C locks.
915+
#[cfg(feature = "threading")]
916+
vm.state.stop_the_world.stop_the_world(vm);
917+
918+
let pid = unsafe { libc::fork() };
916919
if pid == 0 {
920+
#[cfg(feature = "threading")]
921+
vm.state.stop_the_world.reset_after_fork();
917922
py_os_after_fork_child(vm);
918923
} else {
924+
// Like CPython's PyOS_AfterFork_Parent: resume all threads
925+
#[cfg(feature = "threading")]
926+
vm.state.stop_the_world.start_the_world(vm);
919927
py_os_after_fork_parent(vm);
920928
}
921929
pid

crates/vm/src/stdlib/thread.rs

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ pub(crate) mod _thread {
2323
sync::{Arc, Weak},
2424
};
2525
use core::{cell::RefCell, time::Duration};
26-
use crossbeam_utils::atomic::AtomicCell;
2726
use parking_lot::{
2827
RawMutex, RawThreadId,
2928
lock_api::{RawMutex as RawMutexT, RawMutexTimed, RawReentrantMutex},
@@ -152,15 +151,9 @@ pub(crate) mod _thread {
152151

153152
#[pymethod]
154153
fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> {
155-
// Reset the mutex to unlocked by directly writing the INIT value.
156-
// Do NOT call unlock() here — after fork(), unlock_slow() would
157-
// try to unpark stale waiters from dead parent threads.
158-
let new_mut = RawMutex::INIT;
159-
unsafe {
160-
let old_mutex: &AtomicCell<RawMutex> = core::mem::transmute(&self.mu);
161-
old_mutex.swap(new_mut);
162-
}
163-
154+
// Overwrite lock state to unlocked. Do NOT call unlock() here —
155+
// after fork(), unlock_slow() would try to unpark stale waiters.
156+
unsafe { rustpython_common::lock::zero_reinit_after_fork(&self.mu) };
164157
Ok(())
165158
}
166159

@@ -252,16 +245,10 @@ pub(crate) mod _thread {
252245

253246
#[pymethod]
254247
fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> {
255-
// Reset the reentrant mutex to unlocked by directly writing INIT.
256-
// Do NOT call unlock() — after fork(), the slow path would try
257-
// to unpark stale waiters from dead parent threads.
248+
// Overwrite lock state to unlocked. Do NOT call unlock() here —
249+
// after fork(), unlock_slow() would try to unpark stale waiters.
258250
self.count.store(0, core::sync::atomic::Ordering::Relaxed);
259-
let new_mut = RawRMutex::INIT;
260-
unsafe {
261-
let old_mutex: &AtomicCell<RawRMutex> = core::mem::transmute(&self.mu);
262-
old_mutex.swap(new_mut);
263-
}
264-
251+
unsafe { rustpython_common::lock::zero_reinit_after_fork(&self.mu) };
265252
Ok(())
266253
}
267254

@@ -1021,10 +1008,7 @@ pub(crate) mod _thread {
10211008
/// Reset a parking_lot::Mutex to unlocked state after fork.
10221009
#[cfg(unix)]
10231010
fn reinit_parking_lot_mutex<T: ?Sized>(mutex: &parking_lot::Mutex<T>) {
1024-
unsafe {
1025-
let raw = mutex.raw() as *const parking_lot::RawMutex as *mut u8;
1026-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<parking_lot::RawMutex>());
1027-
}
1011+
unsafe { rustpython_common::lock::zero_reinit_after_fork(mutex.raw()) };
10281012
}
10291013

10301014
// Thread handle state enum

crates/vm/src/vm/interpreter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#[cfg(all(unix, feature = "threading"))]
2+
use super::StopTheWorldState;
13
use super::{Context, PyConfig, PyGlobalState, VirtualMachine, setting::Settings, thread};
24
use crate::{
35
PyResult, builtins, common::rc::PyRc, frozen::FrozenModule, getpath, py_freeze, stdlib::atexit,
@@ -124,6 +126,8 @@ where
124126
monitoring: PyMutex::default(),
125127
monitoring_events: AtomicCell::new(0),
126128
instrumentation_version: AtomicU64::new(0),
129+
#[cfg(all(unix, feature = "threading"))]
130+
stop_the_world: StopTheWorldState::new(),
127131
});
128132

129133
// Create VM with the global state

crates/vm/src/vm/mod.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,124 @@ struct ExceptionStack {
125125
stack: Vec<Option<PyBaseExceptionRef>>,
126126
}
127127

128+
/// Stop-the-world state — mirrors `_stoptheworld_state` in CPython's
129+
/// `pycore_interp_structs.h`. Before `fork()`, the requester stops all other
130+
/// Python threads so they are not holding internal Rust/C locks.
131+
#[cfg(all(unix, feature = "threading"))]
132+
pub struct StopTheWorldState {
133+
/// Fast-path flag checked in the bytecode loop (like `_PY_EVAL_PLEASE_STOP_BIT`)
134+
pub(crate) requested: AtomicBool,
135+
/// Ident of the thread that requested the stop (like `stw->requester`)
136+
requester: AtomicU64,
137+
/// Signaled by suspending threads when their state transitions to SUSPENDED
138+
notify_mutex: std::sync::Mutex<()>,
139+
notify_cv: std::sync::Condvar,
140+
}
141+
142+
#[cfg(all(unix, feature = "threading"))]
143+
impl StopTheWorldState {
144+
pub const fn new() -> Self {
145+
Self {
146+
requested: AtomicBool::new(false),
147+
requester: AtomicU64::new(0),
148+
notify_mutex: std::sync::Mutex::new(()),
149+
notify_cv: std::sync::Condvar::new(),
150+
}
151+
}
152+
153+
/// Wake the stop-the-world requester (called by each thread that suspends).
154+
pub(crate) fn notify_suspended(&self) {
155+
// Just signal the condvar; the requester holds the mutex.
156+
self.notify_cv.notify_one();
157+
}
158+
159+
/// Try to CAS detached threads directly to SUSPENDED and check whether
160+
/// all non-requester threads are now SUSPENDED.
161+
/// Like CPython's `park_detached_threads`.
162+
fn park_detached_threads(&self, vm: &VirtualMachine) -> bool {
163+
use thread::{THREAD_ATTACHED, THREAD_DETACHED, THREAD_SUSPENDED};
164+
let requester = self.requester.load(Ordering::Relaxed);
165+
let registry = vm.state.thread_frames.lock();
166+
let mut all_suspended = true;
167+
for (&id, slot) in registry.iter() {
168+
if id == requester {
169+
continue;
170+
}
171+
let state = slot.state.load(Ordering::Relaxed);
172+
if state == THREAD_DETACHED {
173+
// CAS DETACHED → SUSPENDED (park without thread cooperation)
174+
let _ = slot.state.compare_exchange(
175+
THREAD_DETACHED,
176+
THREAD_SUSPENDED,
177+
Ordering::AcqRel,
178+
Ordering::Relaxed,
179+
);
180+
all_suspended = false; // re-check on next poll
181+
} else if state == THREAD_ATTACHED {
182+
// Thread is in bytecode — it will see `requested` and self-suspend
183+
all_suspended = false;
184+
}
185+
// THREAD_SUSPENDED → already parked
186+
}
187+
if all_suspended {
188+
// Verify once more after dropping the lock
189+
return true;
190+
}
191+
all_suspended
192+
}
193+
194+
/// Stop all non-requester threads. Like CPython's `stop_the_world`.
195+
///
196+
/// 1. Sets `requested`, marking the requester thread.
197+
/// 2. CAS detached threads to SUSPENDED.
198+
/// 3. Waits (polling with 1 ms condvar timeout) for attached threads
199+
/// to self-suspend in `check_signals`.
200+
pub fn stop_the_world(&self, vm: &VirtualMachine) {
201+
let requester_ident = crate::stdlib::thread::get_ident();
202+
self.requester.store(requester_ident, Ordering::Relaxed);
203+
self.requested.store(true, Ordering::Release);
204+
205+
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
206+
loop {
207+
if self.park_detached_threads(vm) {
208+
break;
209+
}
210+
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
211+
if remaining.is_zero() {
212+
break;
213+
}
214+
// Wait up to 1 ms for a thread to notify us it suspended
215+
let wait = remaining.min(std::time::Duration::from_millis(1));
216+
let guard = self.notify_mutex.lock().unwrap();
217+
let _ = self.notify_cv.wait_timeout(guard, wait);
218+
}
219+
}
220+
221+
/// Resume all suspended threads. Like CPython's `start_the_world`.
222+
pub fn start_the_world(&self, vm: &VirtualMachine) {
223+
use thread::{THREAD_DETACHED, THREAD_SUSPENDED};
224+
let requester = self.requester.load(Ordering::Relaxed);
225+
let registry = vm.state.thread_frames.lock();
226+
for (&id, slot) in registry.iter() {
227+
if id == requester {
228+
continue;
229+
}
230+
if slot.state.load(Ordering::Relaxed) == THREAD_SUSPENDED {
231+
slot.state.store(THREAD_DETACHED, Ordering::Release);
232+
}
233+
}
234+
drop(registry);
235+
self.requested.store(false, Ordering::Release);
236+
self.requester.store(0, Ordering::Relaxed);
237+
}
238+
239+
/// Reset after fork in the child (only one thread alive).
240+
pub fn reset_after_fork(&self) {
241+
self.requested.store(false, Ordering::Relaxed);
242+
self.requester.store(0, Ordering::Relaxed);
243+
}
244+
}
245+
128246
pub struct PyGlobalState {
129247
pub config: PyConfig,
130248
pub module_defs: BTreeMap<&'static str, &'static builtins::PyModuleDef>,
@@ -165,6 +283,9 @@ pub struct PyGlobalState {
165283
/// Incremented on every monitoring state change. Code objects compare their
166284
/// local version against this to decide whether re-instrumentation is needed.
167285
pub instrumentation_version: AtomicU64,
286+
/// Stop-the-world state for pre-fork thread suspension
287+
#[cfg(all(unix, feature = "threading"))]
288+
pub stop_the_world: StopTheWorldState,
168289
}
169290

170291
pub fn process_hash_secret_seed() -> u32 {
@@ -1482,6 +1603,10 @@ impl VirtualMachine {
14821603
return Err(self.new_exception(self.ctx.exceptions.system_exit.to_owned(), vec![]));
14831604
}
14841605

1606+
// Suspend this thread if stop-the-world is in progress
1607+
#[cfg(all(unix, feature = "threading"))]
1608+
thread::suspend_if_needed(&self.state.stop_the_world);
1609+
14851610
#[cfg(not(target_arch = "wasm32"))]
14861611
{
14871612
crate::signal::check_signals(self)

0 commit comments

Comments
 (0)