Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implement Unix SemLock and expose sem_unlink
Co-authored-by: youknowone <69878+youknowone@users.noreply.github.com>
  • Loading branch information
Copilot and youknowone committed Dec 25, 2025
commit 5a6800857bcb66cc5658e46a94a6fbeca7006095
129 changes: 69 additions & 60 deletions crates/stdlib/src/multiprocessing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ mod _multiprocessing {
#[pymodule]
mod _multiprocessing {
use crate::vm::{
Context, FromArgs, Py, PyBaseExceptionRef, PyPayload, PyRef, PyResult, VirtualMachine,
builtins::PyTypeRef,
function::OptionalArg,
Context, FromArgs, Py, PyPayload, PyResult, VirtualMachine,
builtins::{PyBaseExceptionRef, PyType, PyTypeRef},
function::{FuncArgs, OptionalArg},
types::Constructor,
};
use libc::sem_t;
Expand All @@ -58,19 +58,21 @@ mod _multiprocessing {
time::Duration,
};

const RECURSIVE_MUTEX: i32 = 0;
const RECURSIVE_MUTEX_KIND: i32 = 0;
const SEMAPHORE_KIND: i32 = 1;
const SEM_VALUE_MAX_CONST: i32 = 32_767;

#[derive(FromArgs)]
struct SemLockArgs {
#[pyarg(positional_only)]
#[pyarg(positional)]
kind: i32,
#[pyarg(positional_only)]
#[pyarg(positional)]
value: i32,
#[pyarg(positional_only)]
#[pyarg(positional)]
maxvalue: i32,
#[pyarg(positional_only)]
#[pyarg(positional)]
name: String,
#[pyarg(positional_only)]
#[pyarg(positional)]
unlink: bool,
}

Expand Down Expand Up @@ -111,12 +113,7 @@ mod _multiprocessing {
) -> PyResult<(Self, Option<String>)> {
let cname = semaphore_name(vm, name)?;
let raw = unsafe {
libc::sem_open(
cname.as_ptr(),
libc::O_CREAT | libc::O_EXCL,
0o600,
value,
)
libc::sem_open(cname.as_ptr(), libc::O_CREAT | libc::O_EXCL, 0o600, value)
};
if raw == libc::SEM_FAILED {
let err = Errno::last();
Expand Down Expand Up @@ -158,37 +155,8 @@ mod _multiprocessing {
}
}

#[extend_class]
fn extend_class(ctx: &Context, class: &'static Py<crate::vm::builtins::PyType>) {
class.set_attr(
"SEM_VALUE_MAX",
ctx.new_int(libc::SEM_VALUE_MAX),
ctx,
);
}

#[pyclass(with(Constructor))]
impl SemLock {
#[pyslot]
fn slot_new(cls: PyTypeRef, args: SemLockArgs, vm: &VirtualMachine) -> PyResult {
if args.value < 0 || args.value > args.maxvalue {
return Err(vm.new_value_error("semaphore or lock value out of range".to_owned()));
}
let value = u32::try_from(args.value).map_err(|_| {
vm.new_value_error("semaphore or lock value out of range".to_owned())
})?;
let (handle, name) = SemHandle::create(&args.name, value, args.unlink, vm)?;
let zelf = SemLock {
handle,
kind: args.kind,
maxvalue: args.maxvalue,
name,
owner: AtomicU64::new(0),
count: AtomicUsize::new(0),
};
zelf.into_ref_with_type(vm, cls).map(Into::into)
}

#[pygetset]
fn handle(&self) -> isize {
self.handle.as_ptr() as isize
Expand Down Expand Up @@ -223,9 +191,7 @@ mod _multiprocessing {
}

let tid = current_thread_id();
if self.kind == RECURSIVE_MUTEX
&& self.owner.load(Ordering::Acquire) == tid
{
if self.kind == RECURSIVE_MUTEX_KIND && self.owner.load(Ordering::Acquire) == tid {
self.count.fetch_add(1, Ordering::Relaxed);
return Ok(true);
}
Expand Down Expand Up @@ -254,16 +220,17 @@ mod _multiprocessing {
#[pymethod]
fn release(&self, vm: &VirtualMachine) -> PyResult<()> {
let tid = current_thread_id();
if self.kind == RECURSIVE_MUTEX && self.owner.load(Ordering::Acquire) != tid {
if self.kind == RECURSIVE_MUTEX_KIND && self.owner.load(Ordering::Acquire) != tid {
return Err(vm.new_value_error("cannot release un-acquired lock".to_owned()));
}

if self.owner.load(Ordering::Acquire) == tid {
let owner_tid = self.owner.load(Ordering::Acquire);
if owner_tid == tid {
let current = self.count.load(Ordering::Acquire);
if current == 0 {
return Err(vm.new_value_error("cannot release un-acquired lock".to_owned()));
}
if self.kind == RECURSIVE_MUTEX && current > 1 {
if self.kind == RECURSIVE_MUTEX_KIND && current > 1 {
self.count.store(current - 1, Ordering::Release);
return Ok(());
}
Expand All @@ -272,6 +239,11 @@ mod _multiprocessing {
if new_val == 0 {
self.owner.store(0, Ordering::Release);
}
} else if self.kind != RECURSIVE_MUTEX_KIND {
// releasing semaphore or non-recursive lock from another thread;
// drop ownership information.
self.owner.store(0, Ordering::Release);
self.count.store(0, Ordering::Release);
}

let res = unsafe { libc::sem_post(self.handle.as_ptr()) };
Expand All @@ -282,8 +254,7 @@ mod _multiprocessing {
Ok(())
}

#[pymethod]
#[pyclass(name = "__enter__")]
#[pymethod(name = "__enter__")]
fn enter(&self, vm: &VirtualMachine) -> PyResult<bool> {
self.acquire(
AcquireArgs {
Expand All @@ -295,14 +266,15 @@ mod _multiprocessing {
}

#[pymethod]
fn __exit__(&self, _args: crate::vm::function::FuncArgs, vm: &VirtualMachine) -> PyResult<()> {
fn __exit__(&self, _args: FuncArgs, vm: &VirtualMachine) -> PyResult<()> {
self.release(vm)
}

#[pyclassmethod]
#[pymethod(name = "_rebuild")]
fn rebuild(
cls: PyTypeRef,
handle: isize,
_handle: isize,
kind: i32,
maxvalue: i32,
name: Option<String>,
Expand All @@ -320,8 +292,6 @@ mod _multiprocessing {
owner: AtomicU64::new(0),
count: AtomicUsize::new(0),
};
// handle is unused but kept for compatibility
let _ = handle;
zelf.into_ref_with_type(vm, cls).map(Into::into)
}

Expand Down Expand Up @@ -362,6 +332,24 @@ mod _multiprocessing {
}
}

#[extend_class]
fn extend_class(ctx: &Context, class: &Py<PyType>) {
class.set_attr(
ctx.interned_str("RECURSIVE_MUTEX")
.expect("intern RECURSIVE_MUTEX"),
ctx.new_int(RECURSIVE_MUTEX_KIND).into(),
);
class.set_attr(
ctx.interned_str("SEMAPHORE").expect("intern SEMAPHORE"),
ctx.new_int(SEMAPHORE_KIND).into(),
);
class.set_attr(
ctx.interned_str("SEM_VALUE_MAX")
.expect("intern SEM_VALUE_MAX"),
ctx.new_int(SEM_VALUE_MAX_CONST).into(),
);
}

fn wait(&self, vm: &VirtualMachine) -> PyResult<()> {
loop {
let res = unsafe { libc::sem_wait(self.handle.as_ptr()) };
Expand Down Expand Up @@ -390,8 +378,7 @@ mod _multiprocessing {

fn wait_timeout(&self, duration: Duration, vm: &VirtualMachine) -> PyResult<bool> {
let mut ts = current_timespec(vm)?;
let nsec_total =
ts.tv_nsec as i64 + i64::from(duration.subsec_nanos());
let nsec_total = ts.tv_nsec as i64 + i64::from(duration.subsec_nanos());
ts.tv_sec = ts
.tv_sec
.saturating_add(duration.as_secs() as libc::time_t + nsec_total / 1_000_000_000);
Expand All @@ -411,6 +398,28 @@ mod _multiprocessing {
}
}

impl Constructor for SemLock {
type Args = SemLockArgs;

fn py_new(_cls: &Py<PyType>, args: Self::Args, vm: &VirtualMachine) -> PyResult<Self> {
if args.value < 0 || args.value > args.maxvalue {
return Err(vm.new_value_error("semaphore or lock value out of range".to_owned()));
}
let value = u32::try_from(args.value).map_err(|_| {
vm.new_value_error("semaphore or lock value out of range".to_owned())
})?;
let (handle, name) = SemHandle::create(&args.name, value, args.unlink, vm)?;
Ok(SemLock {
handle,
kind: args.kind,
maxvalue: args.maxvalue,
name,
owner: AtomicU64::new(0),
count: AtomicUsize::new(0),
})
}
}

#[pyfunction]
fn sem_unlink(name: String, vm: &VirtualMachine) -> PyResult<()> {
let cname = semaphore_name(vm, &name)?;
Expand Down Expand Up @@ -461,11 +470,11 @@ mod _multiprocessing {
};
let text = msg.unwrap_or_else(|| err.desc().to_owned());
vm.new_os_subtype_error(exc_type, Some(err as i32), text)
.into()
.upcast()
}

fn current_thread_id() -> u64 {
std::thread::current().id().as_u64().get()
unsafe { libc::pthread_self() as u64 }
}
}

Expand Down