Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -969,7 +969,7 @@ async def _sock_sendfile_native(self, sock, file, offset, count):
f"and file {file!r} combination")

async def _sock_sendfile_fallback(self, sock, file, offset, count):
if offset:
if hasattr(file, 'seek'):
file.seek(offset)
blocksize = (
min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
Expand Down Expand Up @@ -1286,7 +1286,6 @@ async def sendfile(self, transport, file, offset=0, count=None,
raise RuntimeError(
f"fallback is disabled and native sendfile is not "
f"supported for transport {transport!r}")

return await self._sendfile_fallback(transport, file,
offset, count)

Expand All @@ -1295,7 +1294,7 @@ async def _sendfile_native(self, transp, file, offset, count):
"sendfile syscall is not supported")

async def _sendfile_fallback(self, transp, file, offset, count):
if offset:
if hasattr(file, 'seek'):
file.seek(offset)
blocksize = min(count, 16384) if count else 16384
buf = bytearray(blocksize)
Expand Down
3 changes: 3 additions & 0 deletions Lib/asyncio/windows_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,9 @@ def sendfile(self, sock, file, offset, count):
ov = _overlapped.Overlapped(NULL)
offset_low = offset & 0xffff_ffff
offset_high = (offset >> 32) & 0xffff_ffff
# TransmitFile ignores OVERLAPPED.Offset for handles not opened with
# FILE_FLAG_OVERLAPPED, so seek the CRT file pointer to match.
file.seek(offset)
ov.TransmitFile(sock.fileno(),
msvcrt.get_osfhandle(file.fileno()),
offset_low, offset_high,
Expand Down
16 changes: 14 additions & 2 deletions Lib/test/libregrtest/logger.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import time
from typing import Callable

from test.support import MS_WINDOWS
from .results import TestResults
Expand All @@ -19,16 +20,27 @@ def __init__(self, results: TestResults, quiet: bool, pgo: bool):
self._results: TestResults = results
self._quiet: bool = quiet
self._pgo: bool = pgo
self.get_mem_usage: Callable[[], int | None] | None = None

def log(self, line: str = '') -> None:
empty = not line

# add the system load prefix: "load avg: 1.80 "
# Add the memory usage: "mem: 1 GiB "
if self.get_mem_usage is not None:
mem = self.get_mem_usage()
if mem:
mib = mem / (1024*1024)
if mib >= 1024:
line = f"mem: {mib / 1024:.1f} GiB {line}"
else:
line = f"mem: {mib:.1f} MiB {line}"

# Add the system load prefix: "load avg: 1.80 "
load_avg = self.get_load_avg()
if load_avg is not None:
line = f"load avg: {load_avg:.2f} {line}"

# add the timestamp prefix: "0:01:05 "
# Add the timestamp prefix: "0:01:05 "
log_time = time.perf_counter() - self.start_time

mins, secs = divmod(int(log_time), 60)
Expand Down
22 changes: 21 additions & 1 deletion Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .single import PROGRESS_MIN_TIME
from .utils import (
StrPath, TestName,
format_duration, print_warning, count, plural)
format_duration, print_warning, count, plural, get_process_memory_usage)
from .worker import create_worker_process, USE_PROCESS_GROUP

if MS_WINDOWS:
Expand Down Expand Up @@ -452,6 +452,12 @@ def wait_stopped(self, start_time: float) -> None:
print_warning(f"Failed to join {self} in {format_duration(dt)}")
break

def get_mem_usage(self):
popen = self._popen
if popen is None:
return
return get_process_memory_usage(popen.pid)


def get_running(workers: list[WorkerThread]) -> str | None:
running: list[str] = []
Expand All @@ -473,6 +479,7 @@ def __init__(self, num_workers: int, runtests: RunTests,
logger: Logger, results: TestResults) -> None:
self.num_workers = num_workers
self.runtests = runtests
self.logger = logger
self.log = logger.log
self.display_progress = logger.display_progress
self.results: TestResults = results
Expand Down Expand Up @@ -598,9 +605,21 @@ def _process_result(self, item: QueueOutput) -> TestResult:

return result

def get_mem_usage(self):
usage = 0
main_mem = get_process_memory_usage(os.getpid())
if main_mem:
usage += main_mem
for worker in self.workers:
worker_mem = worker.get_mem_usage()
if worker_mem:
usage += worker_mem
return usage

def run(self) -> None:
fail_fast = self.runtests.fail_fast
fail_env_changed = self.runtests.fail_env_changed
self.logger.get_mem_usage = self.get_mem_usage

self.start_workers()

Expand All @@ -625,3 +644,4 @@ def run(self) -> None:
# worker when we exit this function
self.pending.stop()
self.stop_workers()
self.logger.get_mem_usage = None
23 changes: 23 additions & 0 deletions Lib/test/libregrtest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,3 +752,26 @@ def display_title(title):
print(title)
print("#" * len(title))
print(flush=True)


def get_process_memory_usage(pid: int) -> int | None:
"""
Read the private memory in bytes from /proc/pid/smaps.
"""
try:
fp = open(f"/proc/{pid}/smaps", "rb")
except OSError:
return None

try:
total = 0
with fp:
for line in fp:
# Include both Private_Clean and Private_Dirty sections.
line = line.rstrip()
if line.startswith(b"Private_") and line.endswith(b'kB'):
parts = line.split()
total += int(parts[1]) * 1024
return total
except ProcessLookupError:
return None
26 changes: 26 additions & 0 deletions Lib/test/test_asyncio/test_sendfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,32 @@ def test_sock_sendfile_zero_size(self):
self.assertEqual(ret, 0)
self.assertEqual(self.file.tell(), 0)

def check_sock_sendfile_offset(self, data, offset, force_fallback=False):
sock, proto = self.prepare_socksendfile()
with tempfile.TemporaryFile() as f:
f.write(data)
f.flush()
self.assertEqual(f.tell(), len(data))

if force_fallback:
async def _sock_sendfile_fail(sock, file, offset, count):
raise asyncio.exceptions.SendfileNotAvailableError()
with support.swap_attr(self.loop, '_sock_sendfile_native', _sock_sendfile_fail):
ret = self.run_loop(self.loop.sock_sendfile(sock, f, offset, None))
else:
ret = self.run_loop(self.loop.sock_sendfile(sock, f, offset, None))
self.assertEqual(f.tell(), len(data))
sock.close()
self.run_loop(proto.wait_closed())
self.assertEqual(ret, len(data) - offset)

def test_sock_sendfile_offset(self):
data = b'abcdef'
for offset in (0, len(data) // 2, len(data)):
for force_fallback in (False, True):
with self.subTest(offset=offset, force_fallback=force_fallback):
self.check_sock_sendfile_offset(data, offset, force_fallback)

def test_sock_sendfile_mix_with_regular_send(self):
buf = b"mix_regular_send" * (4 * 1024) # 64 KiB
sock, proto = self.prepare_socksendfile()
Expand Down
6 changes: 5 additions & 1 deletion Lib/test/test_regrtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@

ROOT_DIR = os.path.join(os.path.dirname(__file__), '..', '..')
ROOT_DIR = os.path.abspath(os.path.normpath(ROOT_DIR))
LOG_PREFIX = r'[0-9]+:[0-9]+:[0-9]+ (?:load avg: [0-9]+\.[0-9]{2} )?'
LOG_PREFIX = (
r'[0-9]+:[0-9]+:[0-9]+ '
r'(?:load avg: [0-9]+\.[0-9]{2} )?'
r'(?:mem: [0-9]+\.[0-9] (?:MiB|GiB) )?'
)
RESULT_REGEX = (
'passed',
'failed',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
:mod:`asyncio`: ``sendfile()`` and ``sock_sendfile()`` event loop methods
now call ``file.seek(offset)`` if *file* has a ``seek()`` method,
even if *offset* is ``0`` (default value).
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
On Linux, regrtest now logs the total memory usage of all Python processes.
Read the private memory in ``/proc/pid/smaps``. Patch by Victor Stinner.
Loading