Skip to content

Commit c7de8f0

Browse files
committed
Suspend Python threads before fork()
Implement CPython-style stop-the-world for pre-fork thread suspension. - Add per-thread state tracking (DETACHED/ATTACHED/SUSPENDED) in ThreadSlot, mirroring CPython's tstate->state. - enter_vm() transitions DETACHED→ATTACHED (CAS, blocks if SUSPENDED); exiting transitions ATTACHED→DETACHED. - StopTheWorldState (like _stoptheworld_state): - stop_the_world(): sets requested flag, CAS parks DETACHED threads, polls with 1ms condvar timeout for ATTACHED threads to self-suspend. - start_the_world(): sets all SUSPENDED→DETACHED, clears flag. - check_signals() calls suspend_if_needed() which transitions ATTACHED→SUSPENDED and waits until released. - os.fork() calls stop_the_world/start_the_world around libc::fork().
1 parent 04cf5da commit c7de8f0

7 files changed

Lines changed: 280 additions & 53 deletions

File tree

crates/common/src/lock.rs

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -68,43 +68,45 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock,
6868

6969
// can add fn const_{mutex,rw_lock}() if necessary, but we probably won't need to
7070

71-
/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`.
71+
/// Reset a lock to its initial (unlocked) state by zeroing its bytes.
7272
///
73-
/// After `fork()`, locks held by dead parent threads would deadlock in the
74-
/// child. This writes `RawMutex::INIT` via the `Mutex::raw()` accessor,
75-
/// bypassing the normal unlock path which may interact with parking_lot's
76-
/// internal waiter queues.
73+
/// After `fork()`, any lock held by a now-dead thread would remain
74+
/// permanently locked. We zero the raw bytes (the unlocked state for all
75+
/// `parking_lot` raw lock types) instead of using the normal unlock path,
76+
/// which would interact with stale waiter queues.
7777
///
7878
/// # Safety
7979
///
8080
/// Must only be called from the single-threaded child process immediately
8181
/// after `fork()`, before any other thread is created.
82-
#[cfg(unix)]
83-
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
84-
// Use Mutex::raw() to access the underlying lock without layout assumptions.
85-
// parking_lot::RawMutex (AtomicU8) and RawCellMutex (Cell<bool>) both
86-
// represent the unlocked state as all-zero bytes.
82+
/// The type `T` must represent the unlocked state as all-zero bytes
83+
/// (true for `parking_lot::RawMutex`, `RawRwLock`, `RawReentrantMutex`, etc.).
84+
pub unsafe fn zero_reinit_after_fork<T>(lock: *const T) {
8785
unsafe {
88-
let raw = mutex.raw() as *const RawMutex as *mut u8;
89-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawMutex>());
86+
core::ptr::write_bytes(lock as *mut u8, 0, core::mem::size_of::<T>());
9087
}
9188
}
9289

93-
/// Reset a `PyRwLock` to its initial (unlocked) state after `fork()`.
90+
/// Reset a `PyMutex` after `fork()`. See [`zero_reinit_after_fork`].
91+
///
92+
/// # Safety
9493
///
95-
/// Same rationale as [`reinit_mutex_after_fork`] — dead threads' read or
96-
/// write locks would cause permanent deadlock in the child.
94+
/// Must only be called from the single-threaded child process immediately
95+
/// after `fork()`, before any other thread is created.
96+
#[cfg(unix)]
97+
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
98+
unsafe { zero_reinit_after_fork(mutex.raw()) }
99+
}
100+
101+
/// Reset a `PyRwLock` after `fork()`. See [`zero_reinit_after_fork`].
97102
///
98103
/// # Safety
99104
///
100105
/// Must only be called from the single-threaded child process immediately
101106
/// after `fork()`, before any other thread is created.
102107
#[cfg(unix)]
103108
pub unsafe fn reinit_rwlock_after_fork<T: ?Sized>(rwlock: &PyRwLock<T>) {
104-
unsafe {
105-
let raw = rwlock.raw() as *const RawRwLock as *mut u8;
106-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<RawRwLock>());
107-
}
109+
unsafe { zero_reinit_after_fork(rwlock.raw()) }
108110
}
109111

110112
/// Reset a `PyThreadMutex` to its initial (unlocked, unowned) state after `fork()`.

crates/vm/src/stdlib/imp.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,7 @@ mod lock {
4747
pub(crate) unsafe fn reinit_after_fork() {
4848
if IMP_LOCK.is_locked() && !IMP_LOCK.is_owned_by_current_thread() {
4949
// Held by a dead thread — reset to unlocked.
50-
// Same pattern as RLock::_at_fork_reinit in thread.rs.
51-
unsafe {
52-
let old: &crossbeam_utils::atomic::AtomicCell<RawRMutex> =
53-
core::mem::transmute(&IMP_LOCK);
54-
old.swap(RawRMutex::INIT);
55-
}
50+
unsafe { rustpython_common::lock::zero_reinit_after_fork(&IMP_LOCK) };
5651
}
5752
}
5853
}

crates/vm/src/stdlib/posix.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -908,14 +908,22 @@ pub mod module {
908908
fn fork(vm: &VirtualMachine) -> i32 {
909909
warn_if_multi_threaded("fork", vm);
910910

911-
let pid: i32;
912911
py_os_before_fork(vm);
913-
unsafe {
914-
pid = libc::fork();
915-
}
912+
913+
// Like CPython's PyOS_BeforeFork: stop all other Python threads
914+
// so they are at safe points, not holding internal Rust/C locks.
915+
#[cfg(feature = "threading")]
916+
vm.state.stop_the_world.stop_the_world(vm);
917+
918+
let pid = unsafe { libc::fork() };
916919
if pid == 0 {
920+
#[cfg(feature = "threading")]
921+
vm.state.stop_the_world.reset_after_fork();
917922
py_os_after_fork_child(vm);
918923
} else {
924+
// Like CPython's PyOS_AfterFork_Parent: resume all threads
925+
#[cfg(feature = "threading")]
926+
vm.state.stop_the_world.start_the_world(vm);
919927
py_os_after_fork_parent(vm);
920928
}
921929
pid

crates/vm/src/stdlib/thread.rs

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ pub(crate) mod _thread {
2323
sync::{Arc, Weak},
2424
};
2525
use core::{cell::RefCell, time::Duration};
26-
use crossbeam_utils::atomic::AtomicCell;
2726
use parking_lot::{
2827
RawMutex, RawThreadId,
2928
lock_api::{RawMutex as RawMutexT, RawMutexTimed, RawReentrantMutex},
@@ -150,17 +149,12 @@ pub(crate) mod _thread {
150149
Ok(())
151150
}
152151

152+
#[cfg(unix)]
153153
#[pymethod]
154154
fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> {
155-
// Reset the mutex to unlocked by directly writing the INIT value.
156-
// Do NOT call unlock() here — after fork(), unlock_slow() would
157-
// try to unpark stale waiters from dead parent threads.
158-
let new_mut = RawMutex::INIT;
159-
unsafe {
160-
let old_mutex: &AtomicCell<RawMutex> = core::mem::transmute(&self.mu);
161-
old_mutex.swap(new_mut);
162-
}
163-
155+
// Overwrite lock state to unlocked. Do NOT call unlock() here —
156+
// after fork(), unlock_slow() would try to unpark stale waiters.
157+
unsafe { rustpython_common::lock::zero_reinit_after_fork(&self.mu) };
164158
Ok(())
165159
}
166160

@@ -250,18 +244,13 @@ pub(crate) mod _thread {
250244
Ok(())
251245
}
252246

247+
#[cfg(unix)]
253248
#[pymethod]
254249
fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> {
255-
// Reset the reentrant mutex to unlocked by directly writing INIT.
256-
// Do NOT call unlock() — after fork(), the slow path would try
257-
// to unpark stale waiters from dead parent threads.
250+
// Overwrite lock state to unlocked. Do NOT call unlock() here —
251+
// after fork(), unlock_slow() would try to unpark stale waiters.
258252
self.count.store(0, core::sync::atomic::Ordering::Relaxed);
259-
let new_mut = RawRMutex::INIT;
260-
unsafe {
261-
let old_mutex: &AtomicCell<RawRMutex> = core::mem::transmute(&self.mu);
262-
old_mutex.swap(new_mut);
263-
}
264-
253+
unsafe { rustpython_common::lock::zero_reinit_after_fork(&self.mu) };
265254
Ok(())
266255
}
267256

@@ -1021,10 +1010,7 @@ pub(crate) mod _thread {
10211010
/// Reset a parking_lot::Mutex to unlocked state after fork.
10221011
#[cfg(unix)]
10231012
fn reinit_parking_lot_mutex<T: ?Sized>(mutex: &parking_lot::Mutex<T>) {
1024-
unsafe {
1025-
let raw = mutex.raw() as *const parking_lot::RawMutex as *mut u8;
1026-
core::ptr::write_bytes(raw, 0, core::mem::size_of::<parking_lot::RawMutex>());
1027-
}
1013+
unsafe { rustpython_common::lock::zero_reinit_after_fork(mutex.raw()) };
10281014
}
10291015

10301016
// Thread handle state enum

crates/vm/src/vm/interpreter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#[cfg(all(unix, feature = "threading"))]
2+
use super::StopTheWorldState;
13
use super::{Context, PyConfig, PyGlobalState, VirtualMachine, setting::Settings, thread};
24
use crate::{
35
PyResult, builtins, common::rc::PyRc, frozen::FrozenModule, getpath, py_freeze, stdlib::atexit,
@@ -124,6 +126,8 @@ where
124126
monitoring: PyMutex::default(),
125127
monitoring_events: AtomicCell::new(0),
126128
instrumentation_version: AtomicU64::new(0),
129+
#[cfg(all(unix, feature = "threading"))]
130+
stop_the_world: StopTheWorldState::new(),
127131
});
128132

129133
// Create VM with the global state

crates/vm/src/vm/mod.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,123 @@ struct ExceptionStack {
125125
stack: Vec<Option<PyBaseExceptionRef>>,
126126
}
127127

128+
/// Stop-the-world state for fork safety. Before `fork()`, the requester
129+
/// stops all other Python threads so they are not holding internal locks.
130+
#[cfg(all(unix, feature = "threading"))]
131+
pub struct StopTheWorldState {
132+
/// Fast-path flag checked in the bytecode loop (like `_PY_EVAL_PLEASE_STOP_BIT`)
133+
pub(crate) requested: AtomicBool,
134+
/// Ident of the thread that requested the stop (like `stw->requester`)
135+
requester: AtomicU64,
136+
/// Signaled by suspending threads when their state transitions to SUSPENDED
137+
notify_mutex: std::sync::Mutex<()>,
138+
notify_cv: std::sync::Condvar,
139+
}
140+
141+
#[cfg(all(unix, feature = "threading"))]
142+
impl StopTheWorldState {
143+
pub const fn new() -> Self {
144+
Self {
145+
requested: AtomicBool::new(false),
146+
requester: AtomicU64::new(0),
147+
notify_mutex: std::sync::Mutex::new(()),
148+
notify_cv: std::sync::Condvar::new(),
149+
}
150+
}
151+
152+
/// Wake the stop-the-world requester (called by each thread that suspends).
153+
pub(crate) fn notify_suspended(&self) {
154+
// Just signal the condvar; the requester holds the mutex.
155+
self.notify_cv.notify_one();
156+
}
157+
158+
/// Try to CAS detached threads directly to SUSPENDED and check whether
159+
/// all non-requester threads are now SUSPENDED.
160+
/// Like CPython's `park_detached_threads`.
161+
fn park_detached_threads(&self, vm: &VirtualMachine) -> bool {
162+
use thread::{THREAD_ATTACHED, THREAD_DETACHED, THREAD_SUSPENDED};
163+
let requester = self.requester.load(Ordering::Relaxed);
164+
let registry = vm.state.thread_frames.lock();
165+
let mut all_suspended = true;
166+
for (&id, slot) in registry.iter() {
167+
if id == requester {
168+
continue;
169+
}
170+
let state = slot.state.load(Ordering::Relaxed);
171+
if state == THREAD_DETACHED {
172+
// CAS DETACHED → SUSPENDED (park without thread cooperation)
173+
let _ = slot.state.compare_exchange(
174+
THREAD_DETACHED,
175+
THREAD_SUSPENDED,
176+
Ordering::AcqRel,
177+
Ordering::Relaxed,
178+
);
179+
all_suspended = false; // re-check on next poll
180+
} else if state == THREAD_ATTACHED {
181+
// Thread is in bytecode — it will see `requested` and self-suspend
182+
all_suspended = false;
183+
}
184+
// THREAD_SUSPENDED → already parked
185+
}
186+
if all_suspended {
187+
// Verify once more after dropping the lock
188+
return true;
189+
}
190+
all_suspended
191+
}
192+
193+
/// Stop all non-requester threads. Like CPython's `stop_the_world`.
194+
///
195+
/// 1. Sets `requested`, marking the requester thread.
196+
/// 2. CAS detached threads to SUSPENDED.
197+
/// 3. Waits (polling with 1 ms condvar timeout) for attached threads
198+
/// to self-suspend in `check_signals`.
199+
pub fn stop_the_world(&self, vm: &VirtualMachine) {
200+
let requester_ident = crate::stdlib::thread::get_ident();
201+
self.requester.store(requester_ident, Ordering::Relaxed);
202+
self.requested.store(true, Ordering::Release);
203+
204+
let deadline = std::time::Instant::now() + std::time::Duration::from_millis(500);
205+
loop {
206+
if self.park_detached_threads(vm) {
207+
break;
208+
}
209+
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
210+
if remaining.is_zero() {
211+
break;
212+
}
213+
// Wait up to 1 ms for a thread to notify us it suspended
214+
let wait = remaining.min(std::time::Duration::from_millis(1));
215+
let guard = self.notify_mutex.lock().unwrap();
216+
let _ = self.notify_cv.wait_timeout(guard, wait);
217+
}
218+
}
219+
220+
/// Resume all suspended threads. Like CPython's `start_the_world`.
221+
pub fn start_the_world(&self, vm: &VirtualMachine) {
222+
use thread::{THREAD_DETACHED, THREAD_SUSPENDED};
223+
let requester = self.requester.load(Ordering::Relaxed);
224+
let registry = vm.state.thread_frames.lock();
225+
for (&id, slot) in registry.iter() {
226+
if id == requester {
227+
continue;
228+
}
229+
if slot.state.load(Ordering::Relaxed) == THREAD_SUSPENDED {
230+
slot.state.store(THREAD_DETACHED, Ordering::Release);
231+
}
232+
}
233+
drop(registry);
234+
self.requested.store(false, Ordering::Release);
235+
self.requester.store(0, Ordering::Relaxed);
236+
}
237+
238+
/// Reset after fork in the child (only one thread alive).
239+
pub fn reset_after_fork(&self) {
240+
self.requested.store(false, Ordering::Relaxed);
241+
self.requester.store(0, Ordering::Relaxed);
242+
}
243+
}
244+
128245
pub struct PyGlobalState {
129246
pub config: PyConfig,
130247
pub module_defs: BTreeMap<&'static str, &'static builtins::PyModuleDef>,
@@ -165,6 +282,9 @@ pub struct PyGlobalState {
165282
/// Incremented on every monitoring state change. Code objects compare their
166283
/// local version against this to decide whether re-instrumentation is needed.
167284
pub instrumentation_version: AtomicU64,
285+
/// Stop-the-world state for pre-fork thread suspension
286+
#[cfg(all(unix, feature = "threading"))]
287+
pub stop_the_world: StopTheWorldState,
168288
}
169289

170290
pub fn process_hash_secret_seed() -> u32 {
@@ -1482,6 +1602,10 @@ impl VirtualMachine {
14821602
return Err(self.new_exception(self.ctx.exceptions.system_exit.to_owned(), vec![]));
14831603
}
14841604

1605+
// Suspend this thread if stop-the-world is in progress
1606+
#[cfg(all(unix, feature = "threading"))]
1607+
thread::suspend_if_needed(&self.state.stop_the_world);
1608+
14851609
#[cfg(not(target_arch = "wasm32"))]
14861610
{
14871611
crate::signal::check_signals(self)

0 commit comments

Comments
 (0)