Skip to content
Prev Previous commit
Next Next commit
Add basic tests for send-wait.
  • Loading branch information
ericsnowcurrently committed Oct 17, 2023
commit 09fccc6c8e275c5897fec829848d650383ee1c64
223 changes: 157 additions & 66 deletions Lib/test/test__xxinterpchannels.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,33 +564,62 @@ def test_channel_list_interpreters_closed_send_end(self):
with self.assertRaises(channels.ChannelClosedError):
channels.list_interpreters(cid, send=False)

####################
def test_allowed_types(self):
cid = channels.create()
objects = [
None,
'spam',
b'spam',
42,
]
for obj in objects:
with self.subTest(obj):
channels.send(cid, obj, blocking=False)
got = channels.recv(cid)

def test_send_closed_while_waiting(self):
obj = b'spam'
self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
# XXX Check the following?
#self.assertIsNot(got, obj)
# XXX What about between interpreters?

def test_run_string_arg_unresolved(self):
cid = channels.create()
def f():
# sleep() isn't a great for this, but definitely simple.
time.sleep(1)
channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
with self.assertRaises(channels.ChannelClosedError):
channels.send(cid, obj, blocking=True)
t.join()
interp = interpreters.create()

def test_send_buffer_closed_while_waiting(self):
obj = bytearray(b'spam')
out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(cid.end)
_channels.send(cid, b'spam', blocking=False)
"""),
dict(cid=cid.send))
obj = channels.recv(cid)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')

# XXX For now there is no high-level channel into which the
# sent channel ID can be converted...
# Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self):
cid = channels.create()
def f():
# sleep() isn't a great for this, but definitely simple.
time.sleep(1)
channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
with self.assertRaises(channels.ChannelClosedError):
channels.send_buffer(cid, obj, blocking=True)
t.join()
cid = channels._channel_id(cid, _resolve=True)
interp = interpreters.create()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(chan.id.end)
_channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
obj = channels.recv(cid)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')

#-------------------
# send/recv

def test_send_recv_main(self):
cid = channels.create()
Expand Down Expand Up @@ -731,6 +760,9 @@ def test_recv_sending_interp_destroyed(self):
channels.recv(cid2)
del cid2

#-------------------
# send_buffer

def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
cid = channels.create()
Expand All @@ -746,60 +778,119 @@ def test_send_buffer(self):
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)

def test_allowed_types(self):
#-------------------
# send with waiting

def build_send_waiter(self, obj, *, buffer=False):
# We want a long enough sleep that send() actually has to wait.

if buffer:
send = channels.send_buffer
else:
send = channels.send

cid = channels.create()
objects = [
None,
'spam',
b'spam',
42,
]
for obj in objects:
with self.subTest(obj):
channels.send(cid, obj, blocking=False)
got = channels.recv(cid)
try:
started = time.monotonic()
send(cid, obj, blocking=False)
stopped = time.monotonic()
channels.recv(cid)
finally:
channels.destroy(cid)
delay = stopped - started # seconds
delay *= 3

self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
# XXX Check the following?
#self.assertIsNot(got, obj)
# XXX What about between interpreters?
def wait():
time.sleep(delay)
return wait

def test_run_string_arg_unresolved(self):
def test_send_blocking_waiting(self):
received = None
obj = b'spam'
wait = self.build_send_waiter(obj)
cid = channels.create()
interp = interpreters.create()
def f():
nonlocal received
wait()
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send(cid, obj, blocking=True)
t.join()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(cid.end)
_channels.send(cid, b'spam', blocking=False)
"""),
dict(cid=cid.send))
obj = channels.recv(cid)
self.assertEqual(received, obj)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
def test_send_buffer_blocking_waiting(self):
received = None
obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
cid = channels.create()
def f():
nonlocal received
wait()
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send_buffer(cid, obj, blocking=True)
t.join()

# XXX For now there is no high-level channel into which the
# sent channel ID can be converted...
# Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self):
self.assertEqual(received, obj)

def test_send_blocking_no_wait(self):
received = None
obj = b'spam'
cid = channels.create()
cid = channels._channel_id(cid, _resolve=True)
interp = interpreters.create()
def f():
nonlocal received
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send(cid, obj, blocking=True)
t.join()

out = _run_output(interp, dedent("""
import _xxinterpchannels as _channels
print(chan.id.end)
_channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
obj = channels.recv(cid)
self.assertEqual(received, obj)

self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
def test_send_buffer_blocking_no_wait(self):
received = None
obj = bytearray(b'spam')
cid = channels.create()
def f():
nonlocal received
received = recv_wait(cid)
t = threading.Thread(target=f)
t.start()
channels.send_buffer(cid, obj, blocking=True)
t.join()

self.assertEqual(received, obj)

def test_send_closed_while_waiting(self):
obj = b'spam'
wait = self.build_send_waiter(obj)
cid = channels.create()
def f():
wait()
channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
with self.assertRaises(channels.ChannelClosedError):
channels.send(cid, obj, blocking=True)
t.join()

def test_send_buffer_closed_while_waiting(self):
obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
cid = channels.create()
def f():
wait()
channels.close(cid, force=True)
t = threading.Thread(target=f)
t.start()
with self.assertRaises(channels.ChannelClosedError):
channels.send_buffer(cid, obj, blocking=True)
t.join()

#-------------------
# close

def test_close_single_user(self):
Expand Down