Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Check for self-join outside of the once flag
  • Loading branch information
mpage committed Feb 9, 2024
commit 9a61e2007351718eea85f4ca73a824442804aec7
33 changes: 33 additions & 0 deletions Include/internal/pycore_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ typedef struct {
uint8_t v;
} PyEvent;

// Check if the event is set without blocking. Returns 1 if the event is set or
// 0 otherwise.
PyAPI_FUNC(int) _PyEvent_IsSet(PyEvent *evt);

// Set the event and notify any waiting threads.
// Export for '_testinternalcapi' shared extension
PyAPI_FUNC(void) _PyEvent_Notify(PyEvent *evt);
Expand All @@ -148,6 +152,35 @@ PyAPI_FUNC(void) PyEvent_Wait(PyEvent *evt);
// and 0 if the timeout expired or thread was interrupted.
PyAPI_FUNC(int) PyEvent_WaitTimed(PyEvent *evt, _PyTime_t timeout_ns);

// A one-time event notification with reference counting.
typedef struct _PyEventRc {
PyEvent event;
Py_ssize_t refcount;
} _PyEventRc;

static inline _PyEventRc *
Comment thread
mpage marked this conversation as resolved.
Outdated
_PyEventRc_New(void)
{
_PyEventRc *erc = (_PyEventRc *)PyMem_RawCalloc(1, sizeof(_PyEventRc));
if (erc != NULL) {
erc->refcount = 1;
}
return erc;
}

static inline void
_PyEventRc_Incref(_PyEventRc *erc)
{
_Py_atomic_add_ssize(&erc->refcount, 1);
}

static inline void
_PyEventRc_Decref(_PyEventRc *erc)
{
if (_Py_atomic_add_ssize(&erc->refcount, -1) == 1) {
PyMem_RawFree(erc);
}
}

// _PyRawMutex implements a word-sized mutex that that does not depend on the
// parking lot API, and therefore can be used in the parking lot
Expand Down
55 changes: 55 additions & 0 deletions Lib/test/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,61 @@ def task():
# Subsequent calls to detach() should succeed
handle.detach()

def test_join_then_self_join(self):
# make sure we can't deadlock in the following scenario with
Comment thread
mpage marked this conversation as resolved.
# threads t0 and t1:
#
# - t0 joins t1
# - t1 self joins
def make_lock():
lock = thread.allocate_lock()
lock.acquire()
return lock

error = None
self_joiner_handle = None
self_joiner_started = make_lock()
self_joiner_barrier = make_lock()
def self_joiner():
nonlocal error

self_joiner_started.release()
self_joiner_barrier.acquire()

try:
self_joiner_handle.join()
except Exception as e:
error = e

joiner_started = make_lock()
def joiner():
joiner_started.release()
self_joiner_handle.join()

with threading_helper.wait_threads_exit():
self_joiner_handle = thread.start_joinable_thread(self_joiner)
# Wait for the self-joining thread to start
self_joiner_started.acquire()

# Start the thread that joins the self-joiner
joiner_handle = thread.start_joinable_thread(joiner)

# Wait for the joiner to start
joiner_started.acquire()

# Not great, but I don't think there's a deterministic way to do
# make sure that the self-joining thread has been joined.
time.sleep(0.1)

# Unblock the self-joiner
self_joiner_barrier.release()

self_joiner_handle.join()
joiner_handle.join()

with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
raise error


class Barrier:
def __init__(self, num_threads):
Expand Down
61 changes: 52 additions & 9 deletions Modules/_threadmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ typedef struct {
// and cleared post-fork. Does not need to be accessed atomically.
bool is_valid;
Comment thread
mpage marked this conversation as resolved.
Outdated

// Set immediately before `thread_run` returns to indicate that the OS
// thread is about to exit.
Comment thread
mpage marked this conversation as resolved.
Outdated
_PyEventRc *thread_is_exiting;

// State is set once by the first successful `join` or `detach` operation
// (or if the handle is invalidated).
ThreadHandleState state;
Comment thread
mpage marked this conversation as resolved.
Outdated
Expand All @@ -76,13 +80,22 @@ typedef struct {
static ThreadHandleObject*
new_thread_handle(thread_module_state* state)
{
_PyEventRc *event = _PyEventRc_New();
if (event == NULL) {
PyErr_NoMemory();
return NULL;
}

ThreadHandleObject* self = PyObject_New(ThreadHandleObject, state->thread_handle_type);
if (self == NULL) {
_PyEventRc_Decref(event);
return NULL;
}

self->ident = 0;
self->handle = 0;
self->is_valid = false;
self->thread_is_exiting = event;
self->once = (_PyOnceFlag){0};

HEAD_LOCK(&_PyRuntime);
Expand Down Expand Up @@ -121,6 +134,7 @@ ThreadHandle_dealloc(ThreadHandleObject *self)
self) == -1) {
PyErr_WriteUnraisable(tp);
Comment thread
pitrou marked this conversation as resolved.
Outdated
}
_PyEventRc_Decref(self->thread_is_exiting);
PyObject_Free(self);
Py_DECREF(tp);
}
Expand Down Expand Up @@ -199,12 +213,6 @@ ThreadHandle_detach(ThreadHandleObject *self, void* ignored)
static int
join_thread(ThreadHandleObject *handle)
{
if (handle->ident == PyThread_get_thread_ident_ex()) {
// PyThread_join_thread() would deadlock or error out.
PyErr_SetString(ThreadError, "Cannot join current thread");
return -1;
}

int err;
Py_BEGIN_ALLOW_THREADS
err = PyThread_join_thread(handle->handle);
Expand All @@ -224,6 +232,23 @@ ThreadHandle_join(ThreadHandleObject *self, void* ignored)
return invalid_handle_error();
}

// We want to perform this check outside of the `_PyOnceFlag` to prevent
// deadlock in the scenario where another thread joins us and we then
// attempt to join ourselves. However, it's not safe to check thread
// identity once the handle's os thread has finished. We may end up with
// the identity stored in the handle and erroneously think we are
// attempting to join ourselves.
Comment thread
mpage marked this conversation as resolved.
//
// To work around this, we set `thread_is_exiting` immediately before
// `thread_run` returns. We can be sure that we are not attempting to join
// ourselves if the handle's thread is about to exit.
if (!_PyEvent_IsSet(&self->thread_is_exiting->event) &&
self->ident == PyThread_get_thread_ident_ex()) {
// PyThread_join_thread() would deadlock or error out.
PyErr_SetString(ThreadError, "Cannot join current thread");
return NULL;
}

if (_PyOnceFlag_CallOnce(&self->once, (_Py_once_fn_t *)join_thread,
self) == -1) {
return NULL;
Expand Down Expand Up @@ -1276,6 +1301,7 @@ struct bootstate {
PyObject *func;
PyObject *args;
PyObject *kwargs;
_PyEventRc *thread_is_exiting;
};
Comment thread
mpage marked this conversation as resolved.


Expand All @@ -1287,6 +1313,9 @@ thread_bootstate_free(struct bootstate *boot, int decref)
Py_DECREF(boot->args);
Py_XDECREF(boot->kwargs);
}
if (boot->thread_is_exiting != NULL) {
_PyEventRc_Decref(boot->thread_is_exiting);
}
PyMem_RawFree(boot);
}

Expand All @@ -1297,6 +1326,10 @@ thread_run(void *boot_raw)
struct bootstate *boot = (struct bootstate *) boot_raw;
PyThreadState *tstate = boot->tstate;

// `thread_is_exiting` needs to be set after bootstate has been freed
_PyEventRc *thread_is_exiting = boot->thread_is_exiting;
boot->thread_is_exiting = NULL;

// gh-108987: If _thread.start_new_thread() is called before or while
// Python is being finalized, thread_run() can called *after*.
// _PyRuntimeState_SetFinalizing() is called. At this point, all Python
Expand Down Expand Up @@ -1341,6 +1374,11 @@ thread_run(void *boot_raw)
_PyThreadState_DeleteCurrent(tstate);

exit:
if (thread_is_exiting != NULL) {
_PyEvent_Notify(&thread_is_exiting->event);
_PyEventRc_Decref(thread_is_exiting);
}

// bpo-44434: Don't call explicitly PyThread_exit_thread(). On Linux with
// the glibc, pthread_exit() can abort the whole process if dlopen() fails
// to open the libgcc_s.so library (ex: EMFILE error).
Expand Down Expand Up @@ -1369,7 +1407,8 @@ static int
do_start_new_thread(thread_module_state* state,
PyObject *func, PyObject* args, PyObject* kwargs,
int joinable,
PyThread_ident_t* ident, PyThread_handle_t* handle)
PyThread_ident_t* ident, PyThread_handle_t* handle,
_PyEventRc *thread_is_exiting)
Comment thread
pitrou marked this conversation as resolved.
{
PyInterpreterState *interp = _PyInterpreterState_GET();
if (!_PyInterpreterState_HasFeature(interp, Py_RTFLAGS_THREADS)) {
Expand Down Expand Up @@ -1402,6 +1441,10 @@ do_start_new_thread(thread_module_state* state,
boot->func = Py_NewRef(func);
boot->args = Py_NewRef(args);
boot->kwargs = Py_XNewRef(kwargs);
boot->thread_is_exiting = thread_is_exiting;
if (thread_is_exiting != NULL) {
_PyEventRc_Incref(thread_is_exiting);
}

int err;
if (joinable) {
Expand Down Expand Up @@ -1453,7 +1496,7 @@ thread_PyThread_start_new_thread(PyObject *module, PyObject *fargs)
PyThread_ident_t ident = 0;
PyThread_handle_t handle;
if (do_start_new_thread(state, func, args, kwargs, /*joinable=*/ 0,
&ident, &handle)) {
&ident, &handle, NULL)) {
return NULL;
}
return PyLong_FromUnsignedLongLong(ident);
Expand Down Expand Up @@ -1497,7 +1540,7 @@ thread_PyThread_start_joinable_thread(PyObject *module, PyObject *func)
return NULL;
}
if (do_start_new_thread(state, func, args, /*kwargs=*/ NULL, /*joinable=*/ 1,
&hobj->ident, &hobj->handle)) {
&hobj->ident, &hobj->handle, hobj->thread_is_exiting)) {
Py_DECREF(args);
Py_DECREF(hobj);
return NULL;
Expand Down
7 changes: 7 additions & 0 deletions Python/lock.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ _PyRawMutex_UnlockSlow(_PyRawMutex *m)
}
}

int
_PyEvent_IsSet(PyEvent *evt)
{
uint8_t v = _Py_atomic_load_uint8(&evt->v);
return v == _Py_LOCKED;
}

void
_PyEvent_Notify(PyEvent *evt)
{
Expand Down