Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
76a7d7b
Make Interpreter() idempotent.
ericsnowcurrently Nov 28, 2023
57a3c19
Interpreters are always isolated.
ericsnowcurrently Nov 28, 2023
747e542
interp.id is always int.
ericsnowcurrently Nov 28, 2023
0ed9fb0
Add InterpreterNotFoundError.
ericsnowcurrently Dec 1, 2023
98f9cf3
Stop using InterpreterID in _interpreters.
ericsnowcurrently Dec 1, 2023
0277878
Fix Interpreter.__repr__().
ericsnowcurrently Nov 28, 2023
59c8227
.run() -> .exec_sync()
ericsnowcurrently Nov 28, 2023
fa132a2
RunFailedError -> ExecFailure
ericsnowcurrently Nov 28, 2023
644efa5
Add Interpreter.run().
ericsnowcurrently Nov 28, 2023
abf2aa8
Make the interpreters module a package.
ericsnowcurrently Dec 1, 2023
8ca303f
Add interpreters.Queue.
ericsnowcurrently Nov 28, 2023
8b97373
Add memoryview XID with _xxsubinterpreters import.
ericsnowcurrently Dec 11, 2023
cc20f1f
Update CODEOWNERS.
ericsnowcurrently Dec 11, 2023
cb0a605
Ignore static builtin exception types.
ericsnowcurrently Dec 11, 2023
62a9bac
Make CODEOWNERS more specific.
ericsnowcurrently Dec 11, 2023
6c71018
Fix submodule names.
ericsnowcurrently Dec 11, 2023
13a44e3
Use interpreters.__getattr__() for submodule aliases.
ericsnowcurrently Dec 7, 2023
c9d15d2
Fix InterpreterIDTests.
ericsnowcurrently Dec 11, 2023
1c76c4a
LONG_LONG_MAX -> LLONG_MAX
ericsnowcurrently Dec 11, 2023
cfae15f
Fix Interpreter.run().
ericsnowcurrently Dec 11, 2023
3f98ce1
Use exec_sync() in test_sys.
ericsnowcurrently Dec 11, 2023
f8b1685
Fix test_capi.
ericsnowcurrently Dec 11, 2023
0b34a83
Fix test_importlib.
ericsnowcurrently Dec 12, 2023
68faf1a
Fix test_import.
ericsnowcurrently Dec 12, 2023
d161f76
Fix test_threading.
ericsnowcurrently Dec 12, 2023
778276f
Fix TestInterpreterRun.
ericsnowcurrently Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add interpreters.Queue.
  • Loading branch information
ericsnowcurrently committed Dec 4, 2023
commit 8ca303fe7fe79bdb161530ed4832625efb328113
7 changes: 5 additions & 2 deletions Lib/test/support/interpreters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
InterpreterError, InterpreterNotFoundError,
is_shareable,
)
from .channel import create as create_channel
from .queue import (
create as create_queue,
Queue, QueueEmpty, QueueFull,
)


__all__ = [
'get_current', 'get_main', 'create', 'list_all', 'is_shareable',
'Interpreter',
'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure',
'create_channel',
'create_queue', 'QueueEmpty', 'QueueFull',
]


Expand Down
156 changes: 156 additions & 0 deletions Lib/test/support/interpreters/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""Cross-interpreter Queues High Level Module."""

import queue
import time
import weakref
import _xxinterpchannels as _channels
import _xxinterpchannels as _queues

# aliases:
from _xxinterpchannels import (
ChannelError as QueueError,
ChannelNotFoundError as QueueNotFoundError,
)

__all__ = [
'create', 'list_all',
'Queue',
'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
]


def create(maxsize=0):
"""Return a new cross-interpreter queue.

The queue may be used to pass data safely between interpreters.
"""
# XXX honor maxsize
qid = _queues.create()
return Queue._with_maxsize(qid, maxsize)


def list_all():
"""Return a list of all open queues."""
return [Queue(qid)
for qid in _queues.list_all()]


class QueueEmpty(queue.Empty):
"""Raised from get_nowait() when the queue is empty.

It is also raised from get() if it times out.
"""


class QueueFull(queue.Full):
"""Raised from put_nowait() when the queue is full.

It is also raised from put() if it times out.
"""


_known_queues = weakref.WeakValueDictionary()

class Queue:
"""A cross-interpreter queue."""

@classmethod
def _with_maxsize(cls, id, maxsize):
if not isinstance(maxsize, int):
raise TypeError(f'maxsize must be an int, got {maxsize!r}')
elif maxsize < 0:
maxsize = 0
else:
maxsize = int(maxsize)
self = cls(id)
self._maxsize = maxsize
return self

def __new__(cls, id, /):
# There is only one instance for any given ID.
if isinstance(id, int):
id = _channels._channel_id(id, force=False)
elif not isinstance(id, _channels.ChannelID):
raise TypeError(f'id must be an int, got {id!r}')
key = int(id)
try:
self = _known_queues[key]
except KeyError:
self = super().__new__(cls)
self._id = id
self._maxsize = 0
_known_queues[key] = self
return self

def __repr__(self):
return f'{type(self).__name__}({self.id})'

def __hash__(self):
return hash(self._id)

@property
def id(self):
return int(self._id)

@property
def maxsize(self):
return self._maxsize

@property
def _info(self):
return _channels.get_info(self._id)

def empty(self):
return self._info.count == 0

def full(self):
if self._maxsize <= 0:
return False
return self._info.count >= self._maxsize

def qsize(self):
return self._info.count

def put(self, obj, timeout=None):
# XXX block if full
_channels.send(self._id, obj, blocking=False)

def put_nowait(self, obj):
# XXX raise QueueFull if full
return _channels.send(self._id, obj, blocking=False)

def get(self, timeout=None, *,
_sentinel=object(),
_delay=10 / 1000, # 10 milliseconds
):
"""Return the next object from the queue.

This blocks while the queue is empty.
"""
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
obj = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
if timeout is not None and time.time() >= end:
raise QueueEmpty
obj = _channels.recv(self._id, _sentinel)
return obj

def get_nowait(self, *, _sentinel=object()):
"""Return the next object from the channel.

If the queue is empty then raise QueueEmpty. Otherwise this
is the same as get().
"""
obj = _channels.recv(self._id, _sentinel)
if obj is _sentinel:
raise QueueEmpty
return obj


# XXX add this:
#_channels._register_queue_type(Queue)
Loading