Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Set a default fmt on Queue objects.
  • Loading branch information
ericsnowcurrently committed Feb 27, 2024
commit 1e6b6d2e851469c695b96845335ddafb684aac3b
40 changes: 27 additions & 13 deletions Lib/test/support/interpreters/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,40 +32,46 @@ class QueueFull(_queues.QueueFull, queue.Full):
"""


def create(maxsize=0):
_SHARED_ONLY = 0
_PICKLED = 1

def create(maxsize=0, *, sharedonly=False):
"""Return a new cross-interpreter queue.

The queue may be used to pass data safely between interpreters.

"sharedonly" sets the default for Queue.put() and Queue.put_nowait().
"""
qid = _queues.create(maxsize)
return Queue(qid)
fmt = _SHARED_ONLY if sharedonly else _PICKLED
qid = _queues.create(maxsize, fmt)
return Queue(qid, _fmt=fmt)


def list_all():
"""Return a list of all open queues."""
return [Queue(qid)
for qid in _queues.list_all()]

return [Queue(qid, _fmt=fmt)
for qid, fmt in _queues.list_all()]

_SHARED_ONLY = 0
_PICKLED = 1

_known_queues = weakref.WeakValueDictionary()

class Queue:
"""A cross-interpreter queue."""

def __new__(cls, id, /):
def __new__(cls, id, /, *, _fmt=None):
# There is only one instance for any given ID.
if isinstance(id, int):
id = int(id)
else:
raise TypeError(f'id must be an int, got {id!r}')
if _fmt is None:
_fmt = _queues.get_default_fmt(id)
try:
self = _known_queues[id]
except KeyError:
self = super().__new__(cls)
self._id = id
self._fmt = _fmt
_known_queues[id] = self
_queues.bind(id)
return self
Expand Down Expand Up @@ -108,18 +114,22 @@ def qsize(self):
return _queues.get_count(self._id)

def put(self, obj, timeout=None, *,
sharedonly=False,
sharedonly=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.

This blocks while the queue is full.

If "sharedonly" is true then the object must be "shareable".
It will be passed through the queue efficiently. Otherwise
It will be passed through the queue efficiently. If false then
all objects are supported, at the expense of worse performance.
If None (the default) then it uses the queue's default.
"""
fmt = _SHARED_ONLY if sharedonly else _PICKLED
if sharedonly is None:
fmt = self._fmt
else:
fmt = _SHARED_ONLY if sharedonly else _PICKLED
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
Expand All @@ -138,7 +148,11 @@ def put(self, obj, timeout=None, *,
else:
break

def put_nowait(self, obj, *, sharedonly=False):
def put_nowait(self, obj, *, sharedonly=None):
if sharedonly is None:
fmt = self._fmt
else:
fmt = _SHARED_ONLY if sharedonly else _PICKLED
fmt = _SHARED_ONLY if sharedonly else _PICKLED
if fmt is _PICKLED:
obj = pickle.dumps(obj)
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/test_interpreters/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,34 @@ def test_get_nowait(self):
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()

def test_put_get_default_sharedonly(self):
expected = list(range(20))
queue = queues.create(sharedonly=True)
for i in range(20):
queue.put(i)
actual = [queue.get() for _ in range(20)]

self.assertEqual(actual, expected)

obj = [1, 2, 3] # lists are not shareable
with self.assertRaises(interpreters.NotShareableError):
queue.put(obj)

def test_put_get_default_not_sharedonly(self):
expected = list(range(20))
queue = queues.create(sharedonly=False)
for i in range(20):
queue.put(i)
actual = [queue.get() for _ in range(20)]

self.assertEqual(actual, expected)

obj = [1, 2, 3] # lists are not shareable
queue.put(obj)
obj2 = queue.get()
self.assertEqual(obj, obj2)
self.assertIsNot(obj, obj2)

def test_put_get_same_interpreter(self):
interp = interpreters.create()
interp.exec_sync(dedent("""
Expand Down
74 changes: 57 additions & 17 deletions Modules/_xxinterpqueuesmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,11 @@ typedef struct _queue {
_queueitem *first;
_queueitem *last;
} items;
int fmt;
} _queue;

static int
_queue_init(_queue *queue, Py_ssize_t maxsize)
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
{
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
Expand All @@ -415,6 +416,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize)
.items = {
.maxsize = maxsize,
},
.fmt = fmt,
};
return 0;
}
Expand Down Expand Up @@ -851,18 +853,26 @@ _queues_decref(_queues *queues, int64_t qid)
PyThread_release_lock(queues->mutex);
}

static int64_t *
struct queue_id_and_fmt {
int64_t id;
int fmt;
};

static struct queue_id_and_fmt *
_queues_list_all(_queues *queues, int64_t *count)
{
int64_t *qids = NULL;
struct queue_id_and_fmt *qids = NULL;
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(queues->count));
struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt,
(Py_ssize_t)(queues->count));
if (ids == NULL) {
goto done;
}
_queueref *ref = queues->head;
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i] = ref->qid;
ids[i].id = ref->qid;
assert(ref->queue != NULL);
ids[i].fmt = ref->queue->fmt;
}
*count = queues->count;

Expand Down Expand Up @@ -898,13 +908,13 @@ _queue_free(_queue *queue)

// Create a new queue.
static int64_t
queue_create(_queues *queues, Py_ssize_t maxsize)
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt)
{
_queue *queue = GLOBAL_MALLOC(_queue);
if (queue == NULL) {
return ERR_QUEUE_ALLOC;
}
int err = _queue_init(queue, maxsize);
int err = _queue_init(queue, maxsize, fmt);
if (err < 0) {
GLOBAL_FREE(queue);
return (int64_t)err;
Expand Down Expand Up @@ -1275,14 +1285,15 @@ qidarg_converter(PyObject *arg, void *ptr)
static PyObject *
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"maxsize", NULL};
Py_ssize_t maxsize = -1;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|n:create", kwlist,
&maxsize)) {
static char *kwlist[] = {"maxsize", "fmt", NULL};
Py_ssize_t maxsize;
int fmt;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist,
&maxsize, &fmt)) {
return NULL;
}

int64_t qid = queue_create(&_globals.queues, maxsize);
int64_t qid = queue_create(&_globals.queues, maxsize, fmt);
if (qid < 0) {
(void)handle_queue_error((int)qid, self, qid);
return NULL;
Expand Down Expand Up @@ -1337,7 +1348,7 @@ static PyObject *
queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{
int64_t count = 0;
int64_t *qids = _queues_list_all(&_globals.queues, &count);
struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count);
if (qids == NULL) {
if (count == 0) {
return PyList_New(0);
Expand All @@ -1348,14 +1359,14 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
if (ids == NULL) {
goto finally;
}
int64_t *cur = qids;
struct queue_id_and_fmt *cur = qids;
for (int64_t i=0; i < count; cur++, i++) {
PyObject *qidobj = PyLong_FromLongLong(*cur);
if (qidobj == NULL) {
PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt);
if (item == NULL) {
Py_SETREF(ids, NULL);
break;
}
PyList_SET_ITEM(ids, (Py_ssize_t)i, qidobj);
PyList_SET_ITEM(ids, (Py_ssize_t)i, item);
}

finally:
Expand Down Expand Up @@ -1512,6 +1523,33 @@ PyDoc_STRVAR(queuesmod_get_maxsize_doc,
\n\
Return the maximum number of items in the queue.");

static PyObject *
queuesmod_get_default_fmt(PyObject *self, PyObject *args, PyObject *kwds)
{
static char *kwlist[] = {"qid", NULL};
qidarg_converter_data qidarg;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
"O&:get_default_fmt", kwlist,
qidarg_converter, &qidarg)) {
return NULL;
}
int64_t qid = qidarg.id;

_queue *queue = NULL;
int err = _queues_lookup(&_globals.queues, qid, &queue);
if (handle_queue_error(err, self, qid)) {
return NULL;
}
int fmt = queue->fmt;
_queue_unmark_waiter(queue, _globals.queues.mutex);
return PyLong_FromLong(fmt);
}

PyDoc_STRVAR(queuesmod_get_default_fmt_doc,
"get_default_fmt(qid)\n\
\n\
Return the default format to use for the queue.");

static PyObject *
queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds)
{
Expand Down Expand Up @@ -1606,6 +1644,8 @@ static PyMethodDef module_functions[] = {
METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc},
{"get_maxsize", _PyCFunction_CAST(queuesmod_get_maxsize),
METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc},
{"get_default_fmt", _PyCFunction_CAST(queuesmod_get_default_fmt),
METH_VARARGS | METH_KEYWORDS, queuesmod_get_default_fmt_doc},
{"is_full", _PyCFunction_CAST(queuesmod_is_full),
METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc},
{"get_count", _PyCFunction_CAST(queuesmod_get_count),
Expand Down