Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,87 @@ async def execute():

self.assertIsNone(self.loop.run_until_complete(execute()))

def test_create_subprocess_env_shell(self) -> None:
async def main() -> None:
cmd = f'''{sys.executable} -c "import os, sys; sys.stdout.write(os.getenv('FOO'))"'''
env = {"FOO": 'bar'}
proc = await asyncio.create_subprocess_shell(
cmd, env=env, stdout=subprocess.PIPE
)
stdout, _ = await proc.communicate()
self.assertEqual(stdout, b"bar")
self.assertEqual(proc.returncode, 0)
task = asyncio.create_task(proc.wait())
await asyncio.sleep(0)
self.assertEqual(task.result(), proc.returncode)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not await task instead of sleep(0) and task.result()? At this point we already know the process has exited, since the returncode is 0 on the previous line.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point we already know the process has exited, since the returncode is 0 on the previous line.

Yes, awaiting can take more than one cycle but we are checking that wait will be done immediately so using sleep(0) trick.


self.loop.run_until_complete(main())

def test_create_subprocess_env_exec(self) -> None:
async def main() -> None:
cmd = [sys.executable, "-c",
"import os, sys; sys.stdout.write(os.getenv('FOO'))"]
env = {"FOO": 'bar'}
proc = await asyncio.create_subprocess_exec(
*cmd, env=env, stdout=subprocess.PIPE
)
stdout, _ = await proc.communicate()
self.assertEqual(stdout, b"bar")
self.assertEqual(proc.returncode, 0)
task = asyncio.create_task(proc.wait())
await asyncio.sleep(0)
self.assertEqual(task.result(), proc.returncode)

self.loop.run_until_complete(main())
Comment thread
kumaraditya303 marked this conversation as resolved.
Outdated

def test_subprocess_concurrent_wait(self) -> None:
async def main() -> None:
proc = await asyncio.create_subprocess_exec(
*PROGRAM_CAT,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
stdout, _ = await proc.communicate(b'some data')
self.assertEqual(stdout, b"some data")
self.assertEqual(proc.returncode, 0)
self.assertEqual(await asyncio.gather(*[proc.wait() for _ in range(10)]),
[proc.returncode] * 10)

self.loop.run_until_complete(main())

def test_subprocess_consistent_callbacks(self):
Comment thread
kumaraditya303 marked this conversation as resolved.
events = []
class Protocol(asyncio.SubprocessProtocol):
Comment thread
kumaraditya303 marked this conversation as resolved.
Outdated
def __init__(self, exit_future: asyncio.Future) -> None:
self.exit_future = exit_future

def pipe_data_received(self, fd, data) -> None:
events.append(('pipe_data_received', fd, data))

def pipe_connection_lost(self, fd, exc) -> None:
events.append('pipe_connection_lost')

def process_exited(self) -> None:
events.append('process_exited')
self.exit_future.set_result(True)

async def main() -> None:
loop = asyncio.get_running_loop()
exit_future = asyncio.Future()
code = 'import sys; sys.stdout.write("stdout"); sys.stderr.write("stderr")'
transport, _ = await loop.subprocess_exec(lambda: Protocol(exit_future), sys.executable, '-c', code,
stdin=None)
await exit_future
transport.close()
self.assertEqual(len(events), 5, events)
self.assertEqual(events[0], ('pipe_data_received', 1, b'stdout'))
self.assertEqual(events[1], ('pipe_data_received', 2, b'stderr'))
self.assertEqual(events[2], 'pipe_connection_lost')
self.assertEqual(events[3], 'pipe_connection_lost')
self.assertEqual(events[4], 'process_exited')
Comment thread
kumaraditya303 marked this conversation as resolved.
Outdated

self.loop.run_until_complete(main())


if sys.platform != 'win32':
# Unix
Expand Down