Skip to content
Next Next commit
Implement Windows SemLock in _multiprocessing module
Add SemLock class using Windows semaphore APIs (CreateSemaphoreW,
WaitForSingleObjectEx, ReleaseSemaphore) so test_multiprocessing
suites are no longer skipped with "lacks a functioning sem_open".
Also add sem_unlink as no-op and flags dict for Windows.
  • Loading branch information
youknowone committed Feb 23, 2026
commit e88eaf0b4373ac575a8dd13d22c4d8ef7adbbc37
348 changes: 347 additions & 1 deletion crates/stdlib/src/multiprocessing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,354 @@ pub(crate) use _multiprocessing::module_def;
#[cfg(windows)]
#[pymodule]
mod _multiprocessing {
use crate::vm::{PyResult, VirtualMachine, function::ArgBytesLike};
use crate::vm::{
Context, FromArgs, Py, PyPayload, PyRef, PyResult, VirtualMachine,
builtins::{PyDict, PyType, PyTypeRef},
function::{ArgBytesLike, FuncArgs, KwArgs},
types::Constructor,
};
use core::sync::atomic::{AtomicI32, AtomicU32, Ordering};
use windows_sys::Win32::Foundation::{
CloseHandle, HANDLE, INVALID_HANDLE_VALUE, WAIT_EVENT, WAIT_OBJECT_0,
};
use windows_sys::Win32::Networking::WinSock::{self, SOCKET};
use windows_sys::Win32::System::Threading::{
CreateSemaphoreW, GetCurrentThreadId, ReleaseSemaphore, WaitForSingleObjectEx,
};

const INFINITE: u32 = 0xFFFFFFFF;
const WAIT_TIMEOUT: WAIT_EVENT = 258; // 0x102
const WAIT_FAILED: WAIT_EVENT = 0xFFFFFFFF;
const ERROR_TOO_MANY_POSTS: u32 = 298;

// These match the values in Lib/multiprocessing/synchronize.py
const RECURSIVE_MUTEX: i32 = 0;
const SEMAPHORE: i32 = 1;

macro_rules! ismine {
($self:expr) => {
$self.count.load(Ordering::Acquire) > 0
&& $self.last_tid.load(Ordering::Acquire) == unsafe { GetCurrentThreadId() }
};
}

#[derive(FromArgs)]
struct SemLockNewArgs {
#[pyarg(positional)]
kind: i32,
#[pyarg(positional)]
value: i32,
#[pyarg(positional)]
maxvalue: i32,
#[pyarg(positional)]
name: String,
#[pyarg(positional)]
unlink: bool,
}

#[pyattr]
#[pyclass(name = "SemLock", module = "_multiprocessing")]
#[derive(Debug, PyPayload)]
struct SemLock {
handle: SemHandle,
kind: i32,
maxvalue: i32,
name: Option<String>,
last_tid: AtomicU32,
count: AtomicI32,
}

#[derive(Debug)]
struct SemHandle {
raw: HANDLE,
}

unsafe impl Send for SemHandle {}
unsafe impl Sync for SemHandle {}

impl SemHandle {
fn create(value: i32, maxvalue: i32, vm: &VirtualMachine) -> PyResult<Self> {
let handle =
unsafe { CreateSemaphoreW(core::ptr::null(), value, maxvalue, core::ptr::null()) };
if handle == 0 as HANDLE {
return Err(vm.new_last_os_error());
}
// Check ERROR_ALREADY_EXISTS
let last_err = unsafe { windows_sys::Win32::Foundation::GetLastError() };
if last_err != 0 {
unsafe { CloseHandle(handle) };
return Err(vm.new_last_os_error());
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
Ok(SemHandle { raw: handle })
}

#[inline]
fn as_raw(&self) -> HANDLE {
self.raw
}
}

impl Drop for SemHandle {
fn drop(&mut self) {
if self.raw != 0 as HANDLE && self.raw != INVALID_HANDLE_VALUE {
unsafe {
CloseHandle(self.raw);
}
}
}
}

/// _GetSemaphoreValue - get value of semaphore by briefly acquiring and releasing
fn get_semaphore_value(handle: HANDLE) -> Result<i32, ()> {
match unsafe { WaitForSingleObjectEx(handle, 0, 0) } {
WAIT_OBJECT_0 => {
let mut previous: i32 = 0;
if unsafe { ReleaseSemaphore(handle, 1, &mut previous) } == 0 {
return Err(());
}
Ok(previous + 1)
}
WAIT_TIMEOUT => Ok(0),
_ => Err(()),
}
}

#[pyclass(with(Constructor), flags(BASETYPE))]
impl SemLock {
#[pygetset]
fn handle(&self) -> isize {
self.handle.as_raw() as isize
}

#[pygetset]
fn kind(&self) -> i32 {
self.kind
}

#[pygetset]
fn maxvalue(&self) -> i32 {
self.maxvalue
}

#[pygetset]
fn name(&self) -> Option<String> {
self.name.clone()
}

#[pymethod]
fn acquire(&self, args: FuncArgs, vm: &VirtualMachine) -> PyResult<bool> {
let blocking: bool = args
.kwargs
.get("block")
.or_else(|| args.args.first())
.map(|o| o.clone().try_to_bool(vm))
.transpose()?
.unwrap_or(true);

let timeout_obj = args
.kwargs
.get("timeout")
.or_else(|| args.args.get(1))
.cloned();

// Calculate timeout in milliseconds
let full_msecs: u32 = if !blocking {
0
} else if timeout_obj.as_ref().is_none_or(|o| vm.is_none(o)) {
INFINITE
} else {
let timeout: f64 = timeout_obj.unwrap().try_float(vm)?.to_f64();
let timeout = timeout * 1000.0; // convert to ms
if timeout < 0.0 {
0
} else if timeout >= 0.5 * INFINITE as f64 {
return Err(vm.new_overflow_error("timeout is too large".to_owned()));
} else {
(timeout + 0.5) as u32
}
};

// Check whether we already own the lock
if self.kind == RECURSIVE_MUTEX && ismine!(self) {
self.count.fetch_add(1, Ordering::Release);
return Ok(true);
}

// Check whether we can acquire without blocking
if unsafe { WaitForSingleObjectEx(self.handle.as_raw(), 0, 0) } == WAIT_OBJECT_0 {
self.last_tid
.store(unsafe { GetCurrentThreadId() }, Ordering::Release);
self.count.fetch_add(1, Ordering::Release);
return Ok(true);
}

// Do the wait
let res = unsafe { WaitForSingleObjectEx(self.handle.as_raw(), full_msecs, 0) };

match res {
WAIT_TIMEOUT => Ok(false),
WAIT_OBJECT_0 => {
self.last_tid
.store(unsafe { GetCurrentThreadId() }, Ordering::Release);
self.count.fetch_add(1, Ordering::Release);
Ok(true)
}
WAIT_FAILED => Err(vm.new_last_os_error()),
_ => Err(vm.new_runtime_error(format!(
"WaitForSingleObject() gave unrecognized value {res}"
))),
}
}

#[pymethod]
fn release(&self, vm: &VirtualMachine) -> PyResult<()> {
if self.kind == RECURSIVE_MUTEX {
if !ismine!(self) {
return Err(vm.new_exception_msg(
vm.ctx.exceptions.assertion_error.to_owned(),
"attempt to release recursive lock not owned by thread".to_owned(),
));
}
if self.count.load(Ordering::Acquire) > 1 {
self.count.fetch_sub(1, Ordering::Release);
return Ok(());
}
}

if unsafe { ReleaseSemaphore(self.handle.as_raw(), 1, core::ptr::null_mut()) } == 0 {
let err = unsafe { windows_sys::Win32::Foundation::GetLastError() };
if err == ERROR_TOO_MANY_POSTS {
return Err(
vm.new_value_error("semaphore or lock released too many times".to_owned())
);
}
return Err(vm.new_last_os_error());
}

self.count.fetch_sub(1, Ordering::Release);
Ok(())
}

#[pymethod(name = "__enter__")]
fn enter(&self, vm: &VirtualMachine) -> PyResult<bool> {
self.acquire(
FuncArgs::new::<Vec<_>, KwArgs>(
vec![vm.ctx.new_bool(true).into()],
KwArgs::default(),
),
vm,
)
}

#[pymethod]
fn __exit__(&self, _args: FuncArgs, vm: &VirtualMachine) -> PyResult<()> {
self.release(vm)
}

#[pyclassmethod(name = "_rebuild")]
fn rebuild(
cls: PyTypeRef,
handle: isize,
kind: i32,
maxvalue: i32,
name: Option<String>,
vm: &VirtualMachine,
) -> PyResult {
// On Windows, _rebuild receives the handle directly (no sem_open)
let zelf = SemLock {
handle: SemHandle {
raw: handle as HANDLE,
},
kind,
maxvalue,
name,
last_tid: AtomicU32::new(0),
count: AtomicI32::new(0),
};
zelf.into_ref_with_type(vm, cls).map(Into::into)
}

#[pymethod]
fn _after_fork(&self) {
self.count.store(0, Ordering::Release);
self.last_tid.store(0, Ordering::Release);
}

#[pymethod]
fn __reduce__(&self, vm: &VirtualMachine) -> PyResult {
Err(vm.new_type_error("cannot pickle 'SemLock' object".to_owned()))
}

#[pymethod]
fn _count(&self) -> i32 {
self.count.load(Ordering::Acquire)
}

#[pymethod]
fn _is_mine(&self) -> bool {
ismine!(self)
}

#[pymethod]
fn _get_value(&self, vm: &VirtualMachine) -> PyResult<i32> {
get_semaphore_value(self.handle.as_raw()).map_err(|_| vm.new_last_os_error())
}

#[pymethod]
fn _is_zero(&self, vm: &VirtualMachine) -> PyResult<bool> {
let val =
get_semaphore_value(self.handle.as_raw()).map_err(|_| vm.new_last_os_error())?;
Ok(val == 0)
}

#[extend_class]
fn extend_class(ctx: &Context, class: &Py<PyType>) {
class.set_attr(
ctx.intern_str("RECURSIVE_MUTEX"),
ctx.new_int(RECURSIVE_MUTEX).into(),
);
class.set_attr(ctx.intern_str("SEMAPHORE"), ctx.new_int(SEMAPHORE).into());
class.set_attr(
ctx.intern_str("SEM_VALUE_MAX"),
ctx.new_int(i32::MAX).into(),
);
}
}

impl Constructor for SemLock {
type Args = SemLockNewArgs;

fn py_new(_cls: &Py<PyType>, args: Self::Args, vm: &VirtualMachine) -> PyResult<Self> {
if args.kind != RECURSIVE_MUTEX && args.kind != SEMAPHORE {
return Err(vm.new_value_error("unrecognized kind".to_owned()));
}
if args.value < 0 || args.value > args.maxvalue {
return Err(vm.new_value_error("invalid value".to_owned()));
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

let handle = SemHandle::create(args.value, args.maxvalue, vm)?;
let name = if args.unlink { None } else { Some(args.name) };

Ok(SemLock {
handle,
kind: args.kind,
maxvalue: args.maxvalue,
name,
last_tid: AtomicU32::new(0),
count: AtomicI32::new(0),
})
}
}

// On Windows, sem_unlink is a no-op
#[pyfunction]
fn sem_unlink(_name: String) {}

#[pyattr]
fn flags(vm: &VirtualMachine) -> PyRef<PyDict> {
// On Windows, no HAVE_SEM_OPEN / HAVE_SEM_TIMEDWAIT / HAVE_BROKEN_SEM_GETVALUE
vm.ctx.new_dict()
}

#[pyfunction]
fn closesocket(socket: usize, vm: &VirtualMachine) -> PyResult<()> {
Expand Down