Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Destroy any remaining queues.
  • Loading branch information
ericsnowcurrently committed Mar 4, 2024
commit 9496113df439c77692ac3d31a37afa7ae592ef5e
11 changes: 7 additions & 4 deletions Lib/test/test_interpreters/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import unittest
import time

from test.support import import_helper
from test.support import import_helper, Py_DEBUG
# Raise SkipTest if subinterpreters not supported.
_queues = import_helper.import_module('_xxinterpqueues')
from test.support import interpreters
Expand Down Expand Up @@ -49,16 +49,19 @@ def test_create_destroy(self):
_queues.destroy(qid)

def test_not_destroyed(self):
stdout, stderr = self.assert_python_failure(
# It should have cleaned up any remaining queues.
stdout, stderr = self.assert_python_ok(
'-c',
dedent(f"""
import {_queues.__name__} as _queues
_queues.create(2, 0)
"""),
)
# It should have aborted due to an assert.
self.assertEqual(stdout, '')
self.assertNotEqual(stderr, '')
if Py_DEBUG:
self.assertNotEqual(stderr, '')
else:
self.assertEqual(stderr, '')

def test_bind_release(self):
with self.subTest('typical'):
Expand Down
42 changes: 38 additions & 4 deletions Modules/_xxinterpqueuesmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ _queueitem_popped(_queueitem *item,


/* the queue */

typedef struct _queue {
Py_ssize_t num_waiters; // protected by global lock
PyThread_type_lock mutex;
Expand Down Expand Up @@ -500,6 +501,8 @@ _queue_clear(_queue *queue)
*queue = (_queue){0};
}

static void _queue_free(_queue *);

static void
_queue_kill_and_wait(_queue *queue)
{
Expand Down Expand Up @@ -732,6 +735,32 @@ _queuerefs_find(_queueref *first, int64_t qid, _queueref **pprev)
return ref;
}

static void
_queuerefs_clear(_queueref *head)
{
_queueref *next = head;
while (next != NULL) {
_queueref *ref = next;
next = ref->next;
int64_t qid = ref->qid;

#ifdef Py_DEBUG
fprintf(stderr, "queue %ld still exists\n", qid);
#endif
_queue *queue = ref->queue;
GLOBAL_FREE(ref);

_queue_kill_and_wait(queue);
#ifdef Py_DEBUG
if (queue->items.count > 0) {
fprintf(stderr, "queue %ld still holds %ld items\n",
qid, queue->items.count);
}
#endif
_queue_free(queue);
}
}


/* a collection of queues ***************************************************/

Expand All @@ -754,8 +783,15 @@ _queues_init(_queues *queues, PyThread_type_lock mutex)
static void
_queues_fini(_queues *queues)
{
assert(queues->count == 0);
assert(queues->head == NULL);
if (queues->count > 0) {
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
assert((queues->count == 0) != (queues->head != NULL));
_queueref *head = queues->head;
queues->head = NULL;
queues->count = 0;
PyThread_release_lock(queues->mutex);
_queuerefs_clear(head);
}
if (queues->mutex != NULL) {
PyThread_free_lock(queues->mutex);
queues->mutex = NULL;
Expand Down Expand Up @@ -887,8 +923,6 @@ _queues_incref(_queues *queues, int64_t qid)
return res;
}

static void _queue_free(_queue *);

static int
_queues_decref(_queues *queues, int64_t qid)
{
Expand Down