Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Clean up the _PyThreadHandle code.
  • Loading branch information
ericsnowcurrently committed Mar 13, 2024
commit 23ca97bcc867785d78a1c97239b25722084057ae
256 changes: 154 additions & 102 deletions Modules/_threadmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#include "Python.h"
#include "pycore_interp.h" // _PyInterpreterState.threads.count
#include "pycore_lock.h"
#include "pycore_lock.h" // _PyEventRc
#include "pycore_moduleobject.h" // _PyModule_GetState()
#include "pycore_modsupport.h" // _PyArg_NoKeywords()
#include "pycore_pylifecycle.h"
Expand Down Expand Up @@ -47,9 +47,9 @@ get_thread_state(PyObject *module)
}


/**********************/
/* _ThreadHandle type */
/**********************/
/*************************/
/* _ThreadHandle objects */
/*************************/

// Handles transition from RUNNING to one of JOINED, DETACHED, or INVALID (post
// fork).
Expand All @@ -58,7 +58,12 @@ typedef enum {
THREAD_HANDLE_JOINED = 2,
THREAD_HANDLE_DETACHED = 3,
THREAD_HANDLE_INVALID = 4,
} ThreadHandleState;
} _PyThreadHandleState;


// ThreadHandleError is just an alias to PyExc_RuntimeError.
#define ThreadHandleError PyExc_RuntimeError


// A handle around an OS thread.
//
Expand All @@ -78,7 +83,9 @@ typedef struct {
PyThread_ident_t ident;
PyThread_handle_t handle;

// Holds a value from the `ThreadHandleState` enum.
// Holds the handle's simple state.
// The type is actually _PyThreadHandleState but we use int
// for the _Py_atomic API.
int state;

// Set immediately before `thread_run` returns to indicate that the OS
Expand All @@ -89,29 +96,48 @@ typedef struct {

// Serializes calls to `join`.
_PyOnceFlag once;
} ThreadHandleObject;
} thandleobject;


static inline int
get_thread_handle_state(ThreadHandleObject *handle)
get_thread_handle_state(thandleobject *hobj)
{
return _Py_atomic_load_int(&handle->state);
return _Py_atomic_load_int(&hobj->state);
}

static inline void
set_thread_handle_state(ThreadHandleObject *handle, ThreadHandleState state)
set_thread_handle_state(thandleobject *hobj, _PyThreadHandleState state)
{
_Py_atomic_store_int(&handle->state, state);
_Py_atomic_store_int(&hobj->state, state);
}

static int
join_thread(thandleobject *hobj)
{
assert(get_thread_handle_state(hobj) == THREAD_HANDLE_RUNNING);

int err;
Py_BEGIN_ALLOW_THREADS
err = PyThread_join_thread(hobj->handle);
Py_END_ALLOW_THREADS
if (err) {
PyErr_SetString(ThreadHandleError, "Failed joining thread");
return -1;
}
set_thread_handle_state(hobj, THREAD_HANDLE_JOINED);
return 0;
}

static ThreadHandleObject*
new_thread_handle(thread_module_state* state)

static PyObject *
_PyThreadHandle_NewObject(PyTypeObject *type)
{
_PyEventRc *event = _PyEventRc_New();
if (event == NULL) {
PyErr_NoMemory();
return NULL;
}
ThreadHandleObject* self = PyObject_New(ThreadHandleObject, state->thread_handle_type);
thandleobject *self = PyObject_New(thandleobject, type);
if (self == NULL) {
_PyEventRc_Decref(event);
return NULL;
Expand All @@ -126,95 +152,33 @@ new_thread_handle(thread_module_state* state)
llist_insert_tail(&_PyRuntime.threads.handles, &self->node);
HEAD_UNLOCK(&_PyRuntime);

return self;
return (PyObject *)self;
}

static void
ThreadHandle_dealloc(ThreadHandleObject *self)
{
PyObject *tp = (PyObject *) Py_TYPE(self);

// Remove ourself from the global list of handles
HEAD_LOCK(&_PyRuntime);
if (self->node.next != NULL) {
llist_remove(&self->node);
}
HEAD_UNLOCK(&_PyRuntime);

// It's safe to access state non-atomically:
// 1. This is the destructor; nothing else holds a reference.
// 2. The refcount going to zero is a "synchronizes-with" event;
// all changes from other threads are visible.
if (self->state == THREAD_HANDLE_RUNNING) {
// This is typically short so no need to release the GIL
if (PyThread_detach_thread(self->handle)) {
PyErr_SetString(ThreadError, "Failed detaching thread");
PyErr_WriteUnraisable(tp);
}
else {
self->state = THREAD_HANDLE_DETACHED;
}
}
_PyEventRc_Decref(self->thread_is_exiting);
PyObject_Free(self);
Py_DECREF(tp);
}

void
_PyThread_AfterFork(struct _pythread_runtime_state *state)
static void
_PyThreadHandle_SetStarted(PyObject *obj,
PyThread_handle_t handle, PyThread_ident_t ident)
{
// gh-115035: We mark ThreadHandles as not joinable early in the child's
// after-fork handler. We do this before calling any Python code to ensure
// that it happens before any ThreadHandles are deallocated, such as by a
// GC cycle.
PyThread_ident_t current = PyThread_get_thread_ident_ex();

struct llist_node *node;
llist_for_each_safe(node, &state->handles) {
ThreadHandleObject *hobj = llist_data(node, ThreadHandleObject, node);
if (hobj->ident == current) {
continue;
}

// Disallow calls to join() as they could crash. We are the only
// thread; it's safe to set this without an atomic.
hobj->state = THREAD_HANDLE_INVALID;
llist_remove(node);
}
thandleobject *hobj = (thandleobject *)obj;
assert(get_thread_handle_state(hobj) == THREAD_HANDLE_INVALID);
hobj->handle = handle;
hobj->ident = ident;
set_thread_handle_state(hobj, THREAD_HANDLE_RUNNING);
}

static PyObject *
ThreadHandle_repr(ThreadHandleObject *self)
{
return PyUnicode_FromFormat("<%s object: ident=%" PY_FORMAT_THREAD_IDENT_T ">",
Py_TYPE(self)->tp_name, self->ident);
}

static PyObject *
ThreadHandle_get_ident(ThreadHandleObject *self, void *ignored)
static _PyEventRc *
_PyThreadHandle_GetExitingEvent(PyObject *hobj)
{
return PyLong_FromUnsignedLongLong(self->ident);
return ((thandleobject *)hobj)->thread_is_exiting;
}

static int
join_thread(ThreadHandleObject *handle)
{
assert(get_thread_handle_state(handle) == THREAD_HANDLE_RUNNING);

int err;
Py_BEGIN_ALLOW_THREADS
err = PyThread_join_thread(handle->handle);
Py_END_ALLOW_THREADS
if (err) {
PyErr_SetString(ThreadError, "Failed joining thread");
return -1;
}
set_thread_handle_state(handle, THREAD_HANDLE_JOINED);
return 0;
}
/* _ThreadHandle instance methods */

static PyObject *
ThreadHandle_join(ThreadHandleObject *self, void* ignored)
ThreadHandle_join(thandleobject *self, void* ignored)
{
if (get_thread_handle_state(self) == THREAD_HANDLE_INVALID) {
PyErr_SetString(PyExc_ValueError,
Expand All @@ -235,7 +199,7 @@ ThreadHandle_join(ThreadHandleObject *self, void* ignored)
if (!_PyEvent_IsSet(&self->thread_is_exiting->event) &&
self->ident == PyThread_get_thread_ident_ex()) {
// PyThread_join_thread() would deadlock or error out.
PyErr_SetString(ThreadError, "Cannot join current thread");
PyErr_SetString(ThreadHandleError, "Cannot join current thread");
return NULL;
}

Expand All @@ -247,17 +211,70 @@ ThreadHandle_join(ThreadHandleObject *self, void* ignored)
Py_RETURN_NONE;
}

static PyGetSetDef ThreadHandle_getsetlist[] = {
{"ident", (getter)ThreadHandle_get_ident, NULL, NULL},
{0},
};

static PyMethodDef ThreadHandle_methods[] =
{
{"join", (PyCFunction)ThreadHandle_join, METH_NOARGS},
{0, 0}
};


/* _ThreadHandle instance properties */

static PyObject *
ThreadHandle_get_ident(thandleobject *self, void *ignored)
{
return PyLong_FromUnsignedLongLong(self->ident);
}


static PyGetSetDef ThreadHandle_getsetlist[] = {
{"ident", (getter)ThreadHandle_get_ident, NULL, NULL},
{0},
};


/* The _ThreadHandle class */

static void
ThreadHandle_dealloc(thandleobject *self)
{
PyObject *tp = (PyObject *) Py_TYPE(self);

// Remove ourself from the global list of handles
HEAD_LOCK(&_PyRuntime);
if (self->node.next != NULL) {
llist_remove(&self->node);
}
HEAD_UNLOCK(&_PyRuntime);

// It's safe to access state non-atomically:
// 1. This is the destructor; nothing else holds a reference.
// 2. The refcount going to zero is a "synchronizes-with" event;
// all changes from other threads are visible.
if (self->state == THREAD_HANDLE_RUNNING) {
// This is typically short so no need to release the GIL
if (PyThread_detach_thread(self->handle)) {
PyErr_SetString(ThreadHandleError, "Failed detaching thread");
PyErr_WriteUnraisable(tp);
}
else {
self->state = THREAD_HANDLE_DETACHED;
}
}
_PyEventRc_Decref(self->thread_is_exiting);
PyObject_Free(self);
Py_DECREF(tp);
}

static PyObject *
ThreadHandle_repr(thandleobject *self)
{
return PyUnicode_FromFormat("<%s object: ident=%" PY_FORMAT_THREAD_IDENT_T ">",
Py_TYPE(self)->tp_name, self->ident);
}


static PyType_Slot ThreadHandle_Type_slots[] = {
{Py_tp_dealloc, (destructor)ThreadHandle_dealloc},
{Py_tp_repr, (reprfunc)ThreadHandle_repr},
Expand All @@ -268,13 +285,46 @@ static PyType_Slot ThreadHandle_Type_slots[] = {

static PyType_Spec ThreadHandle_Type_spec = {
"_thread._ThreadHandle",
sizeof(ThreadHandleObject),
sizeof(thandleobject),
0,
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_DISALLOW_INSTANTIATION,
ThreadHandle_Type_slots,
};


PyTypeObject *
_PyThreadHandle_NewType(void)
{
return (PyTypeObject *)PyType_FromSpec(&ThreadHandle_Type_spec);
}


/* other API */

void
_PyThread_AfterFork(struct _pythread_runtime_state *state)
{
// gh-115035: We mark ThreadHandles as not joinable early in the child's
// after-fork handler. We do this before calling any Python code to ensure
// that it happens before any ThreadHandles are deallocated, such as by a
// GC cycle.
PyThread_ident_t current = PyThread_get_thread_ident_ex();

struct llist_node *node;
llist_for_each_safe(node, &state->handles) {
thandleobject *hobj = llist_data(node, thandleobject, node);
if (hobj->ident == current) {
continue;
}

// Disallow calls to join() as they could crash. We are the only
// thread; it's safe to set this without an atomic.
hobj->state = THREAD_HANDLE_INVALID;
llist_remove(node);
}
}


/********************/
/* thread execution */
/********************/
Expand Down Expand Up @@ -1785,26 +1835,28 @@ threadmod_start_joinable_thread(PyObject *module, PyObject *func)
return NULL;
}

ThreadHandleObject* hobj = new_thread_handle(state);
PyObject *hobj = _PyThreadHandle_NewObject(state->thread_handle_type);
if (hobj == NULL) {
return NULL;
}
struct bootstate *boot = thread_bootstate_new(
func, NULL, NULL, hobj->thread_is_exiting);
func, NULL, NULL, _PyThreadHandle_GetExitingEvent(hobj));
if (boot == NULL) {
Py_DECREF(hobj);
return NULL;
}
PyThread_ident_t ident = 0;
PyThread_handle_t handle = 0;
if (PyThread_start_joinable_thread(
thread_run, (void*) boot, &hobj->ident, &hobj->handle) < 0)
thread_run, (void*) boot, &ident, &handle) < 0)
{
PyErr_SetString(ThreadError, "can't start new thread");
thread_bootstate_free(boot);
Py_DECREF(hobj);
return NULL;
}
set_thread_handle_state(hobj, THREAD_HANDLE_RUNNING);
return (PyObject*) hobj;
_PyThreadHandle_SetStarted(hobj, handle, ident);
return hobj;
}

PyDoc_STRVAR(start_joinable_doc,
Expand Down Expand Up @@ -2055,7 +2107,7 @@ thread_module_exec(PyObject *module)
PyThread_init_thread();

// _ThreadHandle
state->thread_handle_type = (PyTypeObject *)PyType_FromSpec(&ThreadHandle_Type_spec);
state->thread_handle_type = _PyThreadHandle_NewType();
if (state->thread_handle_type == NULL) {
return -1;
}
Expand Down