diff --git a/Lib/_py_warnings.py b/Lib/_py_warnings.py new file mode 100644 index 00000000000..55f8c069591 --- /dev/null +++ b/Lib/_py_warnings.py @@ -0,0 +1,869 @@ +"""Python part of the warnings subsystem.""" + +import sys +import _contextvars +import _thread + + +__all__ = ["warn", "warn_explicit", "showwarning", + "formatwarning", "filterwarnings", "simplefilter", + "resetwarnings", "catch_warnings", "deprecated"] + + +# Normally '_wm' is sys.modules['warnings'] but for unit tests it can be +# a different module. User code is allowed to reassign global attributes +# of the 'warnings' module, commonly 'filters' or 'showwarning'. So we +# need to lookup these global attributes dynamically on the '_wm' object, +# rather than binding them earlier. The code in this module consistently uses +# '_wm.' rather than using the globals of this module. If the +# '_warnings' C extension is in use, some globals are replaced by functions +# and variables defined in that extension. +_wm = None + + +def _set_module(module): + global _wm + _wm = module + + +# filters contains a sequence of filter 5-tuples +# The components of the 5-tuple are: +# - an action: error, ignore, always, all, default, module, or once +# - a compiled regex that must match the warning message +# - a class representing the warning category +# - a compiled regex that must match the module that is being warned +# - a line number for the line being warning, or 0 to mean any line +# If either if the compiled regexs are None, match anything. +filters = [] + + +defaultaction = "default" +onceregistry = {} +_lock = _thread.RLock() +_filters_version = 1 + + +# If true, catch_warnings() will use a context var to hold the modified +# filters list. Otherwise, catch_warnings() will operate on the 'filters' +# global of the warnings module. +_use_context = sys.flags.context_aware_warnings + + +class _Context: + def __init__(self, filters): + self._filters = filters + self.log = None # if set to a list, logging is enabled + + def copy(self): + context = _Context(self._filters[:]) + if self.log is not None: + context.log = self.log + return context + + def _record_warning(self, msg): + self.log.append(msg) + + +class _GlobalContext(_Context): + def __init__(self): + self.log = None + + @property + def _filters(self): + # Since there is quite a lot of code that assigns to + # warnings.filters, this needs to return the current value of + # the module global. + try: + return _wm.filters + except AttributeError: + # 'filters' global was deleted. Do we need to actually handle this case? + return [] + + +_global_context = _GlobalContext() + + +_warnings_context = _contextvars.ContextVar('warnings_context') + + +def _get_context(): + if not _use_context: + return _global_context + try: + return _wm._warnings_context.get() + except LookupError: + return _global_context + + +def _set_context(context): + assert _use_context + _wm._warnings_context.set(context) + + +def _new_context(): + assert _use_context + old_context = _wm._get_context() + new_context = old_context.copy() + _wm._set_context(new_context) + return old_context, new_context + + +def _get_filters(): + """Return the current list of filters. This is a non-public API used by + module functions and by the unit tests.""" + return _wm._get_context()._filters + + +def _filters_mutated_lock_held(): + _wm._filters_version += 1 + + +def showwarning(message, category, filename, lineno, file=None, line=None): + """Hook to write a warning to a file; replace if you like.""" + msg = _wm.WarningMessage(message, category, filename, lineno, file, line) + _wm._showwarnmsg_impl(msg) + + +def formatwarning(message, category, filename, lineno, line=None): + """Function to format a warning the standard way.""" + msg = _wm.WarningMessage(message, category, filename, lineno, None, line) + return _wm._formatwarnmsg_impl(msg) + + +def _showwarnmsg_impl(msg): + context = _wm._get_context() + if context.log is not None: + context._record_warning(msg) + return + file = msg.file + if file is None: + file = sys.stderr + if file is None: + # sys.stderr is None when run with pythonw.exe: + # warnings get lost + return + text = _wm._formatwarnmsg(msg) + try: + file.write(text) + except OSError: + # the file (probably stderr) is invalid - this warning gets lost. + pass + + +def _formatwarnmsg_impl(msg): + category = msg.category.__name__ + s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n" + + if msg.line is None: + try: + import linecache + line = linecache.getline(msg.filename, msg.lineno) + except Exception: + # When a warning is logged during Python shutdown, linecache + # and the import machinery don't work anymore + line = None + linecache = None + else: + line = msg.line + if line: + line = line.strip() + s += " %s\n" % line + + if msg.source is not None: + try: + import tracemalloc + # Logging a warning should not raise a new exception: + # catch Exception, not only ImportError and RecursionError. + except Exception: + # don't suggest to enable tracemalloc if it's not available + suggest_tracemalloc = False + tb = None + else: + try: + suggest_tracemalloc = not tracemalloc.is_tracing() + tb = tracemalloc.get_object_traceback(msg.source) + except Exception: + # When a warning is logged during Python shutdown, tracemalloc + # and the import machinery don't work anymore + suggest_tracemalloc = False + tb = None + + if tb is not None: + s += 'Object allocated at (most recent call last):\n' + for frame in tb: + s += (' File "%s", lineno %s\n' + % (frame.filename, frame.lineno)) + + try: + if linecache is not None: + line = linecache.getline(frame.filename, frame.lineno) + else: + line = None + except Exception: + line = None + if line: + line = line.strip() + s += ' %s\n' % line + elif suggest_tracemalloc: + s += (f'{category}: Enable tracemalloc to get the object ' + f'allocation traceback\n') + return s + + +# Keep a reference to check if the function was replaced +_showwarning_orig = showwarning + + +def _showwarnmsg(msg): + """Hook to write a warning to a file; replace if you like.""" + try: + sw = _wm.showwarning + except AttributeError: + pass + else: + if sw is not _showwarning_orig: + # warnings.showwarning() was replaced + if not callable(sw): + raise TypeError("warnings.showwarning() must be set to a " + "function or method") + + sw(msg.message, msg.category, msg.filename, msg.lineno, + msg.file, msg.line) + return + _wm._showwarnmsg_impl(msg) + + +# Keep a reference to check if the function was replaced +_formatwarning_orig = formatwarning + + +def _formatwarnmsg(msg): + """Function to format a warning the standard way.""" + try: + fw = _wm.formatwarning + except AttributeError: + pass + else: + if fw is not _formatwarning_orig: + # warnings.formatwarning() was replaced + return fw(msg.message, msg.category, + msg.filename, msg.lineno, msg.line) + return _wm._formatwarnmsg_impl(msg) + + +def filterwarnings(action, message="", category=Warning, module="", lineno=0, + append=False): + """Insert an entry into the list of warnings filters (at the front). + + 'action' -- one of "error", "ignore", "always", "all", "default", "module", + or "once" + 'message' -- a regex that the warning message must match + 'category' -- a class that the warning must be a subclass of + 'module' -- a regex that the module name must match + 'lineno' -- an integer line number, 0 matches all warnings + 'append' -- if true, append to the list of filters + """ + if action not in {"error", "ignore", "always", "all", "default", "module", "once"}: + raise ValueError(f"invalid action: {action!r}") + if not isinstance(message, str): + raise TypeError("message must be a string") + if not isinstance(category, type) or not issubclass(category, Warning): + raise TypeError("category must be a Warning subclass") + if not isinstance(module, str): + raise TypeError("module must be a string") + if not isinstance(lineno, int): + raise TypeError("lineno must be an int") + if lineno < 0: + raise ValueError("lineno must be an int >= 0") + + if message or module: + import re + + if message: + message = re.compile(message, re.I) + else: + message = None + if module: + module = re.compile(module) + else: + module = None + + _wm._add_filter(action, message, category, module, lineno, append=append) + + +def simplefilter(action, category=Warning, lineno=0, append=False): + """Insert a simple entry into the list of warnings filters (at the front). + + A simple filter matches all modules and messages. + 'action' -- one of "error", "ignore", "always", "all", "default", "module", + or "once" + 'category' -- a class that the warning must be a subclass of + 'lineno' -- an integer line number, 0 matches all warnings + 'append' -- if true, append to the list of filters + """ + if action not in {"error", "ignore", "always", "all", "default", "module", "once"}: + raise ValueError(f"invalid action: {action!r}") + if not isinstance(lineno, int): + raise TypeError("lineno must be an int") + if lineno < 0: + raise ValueError("lineno must be an int >= 0") + _wm._add_filter(action, None, category, None, lineno, append=append) + + +def _filters_mutated(): + # Even though this function is not part of the public API, it's used by + # a fair amount of user code. + with _wm._lock: + _wm._filters_mutated_lock_held() + + +def _add_filter(*item, append): + with _wm._lock: + filters = _wm._get_filters() + if not append: + # Remove possible duplicate filters, so new one will be placed + # in correct place. If append=True and duplicate exists, do nothing. + try: + filters.remove(item) + except ValueError: + pass + filters.insert(0, item) + else: + if item not in filters: + filters.append(item) + _wm._filters_mutated_lock_held() + + +def resetwarnings(): + """Clear the list of warning filters, so that no filters are active.""" + with _wm._lock: + del _wm._get_filters()[:] + _wm._filters_mutated_lock_held() + + +class _OptionError(Exception): + """Exception used by option processing helpers.""" + pass + + +# Helper to process -W options passed via sys.warnoptions +def _processoptions(args): + for arg in args: + try: + _wm._setoption(arg) + except _wm._OptionError as msg: + print("Invalid -W option ignored:", msg, file=sys.stderr) + + +# Helper for _processoptions() +def _setoption(arg): + parts = arg.split(':') + if len(parts) > 5: + raise _wm._OptionError("too many fields (max 5): %r" % (arg,)) + while len(parts) < 5: + parts.append('') + action, message, category, module, lineno = [s.strip() + for s in parts] + action = _wm._getaction(action) + category = _wm._getcategory(category) + if message or module: + import re + if message: + message = re.escape(message) + if module: + module = re.escape(module) + r'\z' + if lineno: + try: + lineno = int(lineno) + if lineno < 0: + raise ValueError + except (ValueError, OverflowError): + raise _wm._OptionError("invalid lineno %r" % (lineno,)) from None + else: + lineno = 0 + _wm.filterwarnings(action, message, category, module, lineno) + + +# Helper for _setoption() +def _getaction(action): + if not action: + return "default" + for a in ('default', 'always', 'all', 'ignore', 'module', 'once', 'error'): + if a.startswith(action): + return a + raise _wm._OptionError("invalid action: %r" % (action,)) + + +# Helper for _setoption() +def _getcategory(category): + if not category: + return Warning + if '.' not in category: + import builtins as m + klass = category + else: + module, _, klass = category.rpartition('.') + try: + m = __import__(module, None, None, [klass]) + except ImportError: + raise _wm._OptionError("invalid module name: %r" % (module,)) from None + try: + cat = getattr(m, klass) + except AttributeError: + raise _wm._OptionError("unknown warning category: %r" % (category,)) from None + if not issubclass(cat, Warning): + raise _wm._OptionError("invalid warning category: %r" % (category,)) + return cat + + +def _is_internal_filename(filename): + return 'importlib' in filename and '_bootstrap' in filename + + +def _is_filename_to_skip(filename, skip_file_prefixes): + return any(filename.startswith(prefix) for prefix in skip_file_prefixes) + + +def _is_internal_frame(frame): + """Signal whether the frame is an internal CPython implementation detail.""" + return _is_internal_filename(frame.f_code.co_filename) + + +def _next_external_frame(frame, skip_file_prefixes): + """Find the next frame that doesn't involve Python or user internals.""" + frame = frame.f_back + while frame is not None and ( + _is_internal_filename(filename := frame.f_code.co_filename) or + _is_filename_to_skip(filename, skip_file_prefixes)): + frame = frame.f_back + return frame + + +# Code typically replaced by _warnings +def warn(message, category=None, stacklevel=1, source=None, + *, skip_file_prefixes=()): + """Issue a warning, or maybe ignore it or raise an exception.""" + # Check if message is already a Warning object + if isinstance(message, Warning): + category = message.__class__ + # Check category argument + if category is None: + category = UserWarning + if not (isinstance(category, type) and issubclass(category, Warning)): + raise TypeError("category must be a Warning subclass, " + "not '{:s}'".format(type(category).__name__)) + if not isinstance(skip_file_prefixes, tuple): + # The C version demands a tuple for implementation performance. + raise TypeError('skip_file_prefixes must be a tuple of strs.') + if skip_file_prefixes: + stacklevel = max(2, stacklevel) + # Get context information + try: + if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)): + # If frame is too small to care or if the warning originated in + # internal code, then do not try to hide any frames. + frame = sys._getframe(stacklevel) + else: + frame = sys._getframe(1) + # Look for one frame less since the above line starts us off. + for x in range(stacklevel-1): + frame = _next_external_frame(frame, skip_file_prefixes) + if frame is None: + raise ValueError + except ValueError: + globals = sys.__dict__ + filename = "" + lineno = 0 + else: + globals = frame.f_globals + filename = frame.f_code.co_filename + lineno = frame.f_lineno + if '__name__' in globals: + module = globals['__name__'] + else: + module = "" + registry = globals.setdefault("__warningregistry__", {}) + _wm.warn_explicit( + message, + category, + filename, + lineno, + module, + registry, + globals, + source=source, + ) + + +def warn_explicit(message, category, filename, lineno, + module=None, registry=None, module_globals=None, + source=None): + lineno = int(lineno) + if module is None: + module = filename or "" + if module[-3:].lower() == ".py": + module = module[:-3] # XXX What about leading pathname? + if isinstance(message, Warning): + text = str(message) + category = message.__class__ + else: + text = message + message = category(message) + key = (text, category, lineno) + with _wm._lock: + if registry is None: + registry = {} + if registry.get('version', 0) != _wm._filters_version: + registry.clear() + registry['version'] = _wm._filters_version + # Quick test for common case + if registry.get(key): + return + # Search the filters + for item in _wm._get_filters(): + action, msg, cat, mod, ln = item + if ((msg is None or msg.match(text)) and + issubclass(category, cat) and + (mod is None or mod.match(module)) and + (ln == 0 or lineno == ln)): + break + else: + action = _wm.defaultaction + # Early exit actions + if action == "ignore": + return + + if action == "error": + raise message + # Other actions + if action == "once": + registry[key] = 1 + oncekey = (text, category) + if _wm.onceregistry.get(oncekey): + return + _wm.onceregistry[oncekey] = 1 + elif action in {"always", "all"}: + pass + elif action == "module": + registry[key] = 1 + altkey = (text, category, 0) + if registry.get(altkey): + return + registry[altkey] = 1 + elif action == "default": + registry[key] = 1 + else: + # Unrecognized actions are errors + raise RuntimeError( + "Unrecognized action (%r) in warnings.filters:\n %s" % + (action, item)) + + # Prime the linecache for formatting, in case the + # "file" is actually in a zipfile or something. + import linecache + linecache.getlines(filename, module_globals) + + # Print message and context + msg = _wm.WarningMessage(message, category, filename, lineno, source=source) + _wm._showwarnmsg(msg) + + +class WarningMessage(object): + + _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file", + "line", "source") + + def __init__(self, message, category, filename, lineno, file=None, + line=None, source=None): + self.message = message + self.category = category + self.filename = filename + self.lineno = lineno + self.file = file + self.line = line + self.source = source + self._category_name = category.__name__ if category else None + + def __str__(self): + return ("{message : %r, category : %r, filename : %r, lineno : %s, " + "line : %r}" % (self.message, self._category_name, + self.filename, self.lineno, self.line)) + + def __repr__(self): + return f'<{type(self).__qualname__} {self}>' + + +class catch_warnings(object): + + """A context manager that copies and restores the warnings filter upon + exiting the context. + + The 'record' argument specifies whether warnings should be captured by a + custom implementation of warnings.showwarning() and be appended to a list + returned by the context manager. Otherwise None is returned by the context + manager. The objects appended to the list are arguments whose attributes + mirror the arguments to showwarning(). + + The 'module' argument is to specify an alternative module to the module + named 'warnings' and imported under that name. This argument is only useful + when testing the warnings module itself. + + If the 'action' argument is not None, the remaining arguments are passed + to warnings.simplefilter() as if it were called immediately on entering the + context. + """ + + def __init__(self, *, record=False, module=None, + action=None, category=Warning, lineno=0, append=False): + """Specify whether to record warnings and if an alternative module + should be used other than sys.modules['warnings']. + + """ + self._record = record + self._module = sys.modules['warnings'] if module is None else module + self._entered = False + if action is None: + self._filter = None + else: + self._filter = (action, category, lineno, append) + + def __repr__(self): + args = [] + if self._record: + args.append("record=True") + if self._module is not sys.modules['warnings']: + args.append("module=%r" % self._module) + name = type(self).__name__ + return "%s(%s)" % (name, ", ".join(args)) + + def __enter__(self): + if self._entered: + raise RuntimeError("Cannot enter %r twice" % self) + self._entered = True + with _wm._lock: + if _use_context: + self._saved_context, context = self._module._new_context() + else: + context = None + self._filters = self._module.filters + self._module.filters = self._filters[:] + self._showwarning = self._module.showwarning + self._showwarnmsg_impl = self._module._showwarnmsg_impl + self._module._filters_mutated_lock_held() + if self._record: + if _use_context: + context.log = log = [] + else: + log = [] + self._module._showwarnmsg_impl = log.append + # Reset showwarning() to the default implementation to make sure + # that _showwarnmsg() calls _showwarnmsg_impl() + self._module.showwarning = self._module._showwarning_orig + else: + log = None + if self._filter is not None: + self._module.simplefilter(*self._filter) + return log + + def __exit__(self, *exc_info): + if not self._entered: + raise RuntimeError("Cannot exit %r without entering first" % self) + with _wm._lock: + if _use_context: + self._module._warnings_context.set(self._saved_context) + else: + self._module.filters = self._filters + self._module.showwarning = self._showwarning + self._module._showwarnmsg_impl = self._showwarnmsg_impl + self._module._filters_mutated_lock_held() + + +class deprecated: + """Indicate that a class, function or overload is deprecated. + + When this decorator is applied to an object, the type checker + will generate a diagnostic on usage of the deprecated object. + + Usage: + + @deprecated("Use B instead") + class A: + pass + + @deprecated("Use g instead") + def f(): + pass + + @overload + @deprecated("int support is deprecated") + def g(x: int) -> int: ... + @overload + def g(x: str) -> int: ... + + The warning specified by *category* will be emitted at runtime + on use of deprecated objects. For functions, that happens on calls; + for classes, on instantiation and on creation of subclasses. + If the *category* is ``None``, no warning is emitted at runtime. + The *stacklevel* determines where the + warning is emitted. If it is ``1`` (the default), the warning + is emitted at the direct caller of the deprecated object; if it + is higher, it is emitted further up the stack. + Static type checker behavior is not affected by the *category* + and *stacklevel* arguments. + + The deprecation message passed to the decorator is saved in the + ``__deprecated__`` attribute on the decorated object. + If applied to an overload, the decorator + must be after the ``@overload`` decorator for the attribute to + exist on the overload as returned by ``get_overloads()``. + + See PEP 702 for details. + + """ + def __init__( + self, + message: str, + /, + *, + category: type[Warning] | None = DeprecationWarning, + stacklevel: int = 1, + ) -> None: + if not isinstance(message, str): + raise TypeError( + f"Expected an object of type str for 'message', not {type(message).__name__!r}" + ) + self.message = message + self.category = category + self.stacklevel = stacklevel + + def __call__(self, arg, /): + # Make sure the inner functions created below don't + # retain a reference to self. + msg = self.message + category = self.category + stacklevel = self.stacklevel + if category is None: + arg.__deprecated__ = msg + return arg + elif isinstance(arg, type): + import functools + from types import MethodType + + original_new = arg.__new__ + + @functools.wraps(original_new) + def __new__(cls, /, *args, **kwargs): + if cls is arg: + _wm.warn(msg, category=category, stacklevel=stacklevel + 1) + if original_new is not object.__new__: + return original_new(cls, *args, **kwargs) + # Mirrors a similar check in object.__new__. + elif cls.__init__ is object.__init__ and (args or kwargs): + raise TypeError(f"{cls.__name__}() takes no arguments") + else: + return original_new(cls) + + arg.__new__ = staticmethod(__new__) + + if "__init_subclass__" in arg.__dict__: + # __init_subclass__ is directly present on the decorated class. + # Synthesize a wrapper that calls this method directly. + original_init_subclass = arg.__init_subclass__ + # We need slightly different behavior if __init_subclass__ + # is a bound method (likely if it was implemented in Python). + # Otherwise, it likely means it's a builtin such as + # object's implementation of __init_subclass__. + if isinstance(original_init_subclass, MethodType): + original_init_subclass = original_init_subclass.__func__ + + @functools.wraps(original_init_subclass) + def __init_subclass__(*args, **kwargs): + _wm.warn(msg, category=category, stacklevel=stacklevel + 1) + return original_init_subclass(*args, **kwargs) + else: + def __init_subclass__(cls, *args, **kwargs): + _wm.warn(msg, category=category, stacklevel=stacklevel + 1) + return super(arg, cls).__init_subclass__(*args, **kwargs) + + arg.__init_subclass__ = classmethod(__init_subclass__) + + arg.__deprecated__ = __new__.__deprecated__ = msg + __init_subclass__.__deprecated__ = msg + return arg + elif callable(arg): + import functools + import inspect + + @functools.wraps(arg) + def wrapper(*args, **kwargs): + _wm.warn(msg, category=category, stacklevel=stacklevel + 1) + return arg(*args, **kwargs) + + if inspect.iscoroutinefunction(arg): + wrapper = inspect.markcoroutinefunction(wrapper) + + arg.__deprecated__ = wrapper.__deprecated__ = msg + return wrapper + else: + raise TypeError( + "@deprecated decorator with non-None category must be applied to " + f"a class or callable, not {arg!r}" + ) + + +_DEPRECATED_MSG = "{name!r} is deprecated and slated for removal in Python {remove}" + + +def _deprecated(name, message=_DEPRECATED_MSG, *, remove, _version=sys.version_info): + """Warn that *name* is deprecated or should be removed. + + RuntimeError is raised if *remove* specifies a major/minor tuple older than + the current Python version or the same version but past the alpha. + + The *message* argument is formatted with *name* and *remove* as a Python + version tuple (e.g. (3, 11)). + + """ + remove_formatted = f"{remove[0]}.{remove[1]}" + if (_version[:2] > remove) or (_version[:2] == remove and _version[3] != "alpha"): + msg = f"{name!r} was slated for removal after Python {remove_formatted} alpha" + raise RuntimeError(msg) + else: + msg = message.format(name=name, remove=remove_formatted) + _wm.warn(msg, DeprecationWarning, stacklevel=3) + + +# Private utility function called by _PyErr_WarnUnawaitedCoroutine +def _warn_unawaited_coroutine(coro): + msg_lines = [ + f"coroutine '{coro.__qualname__}' was never awaited\n" + ] + if coro.cr_origin is not None: + import linecache, traceback + def extract(): + for filename, lineno, funcname in reversed(coro.cr_origin): + line = linecache.getline(filename, lineno) + yield (filename, lineno, funcname, line) + msg_lines.append("Coroutine created at (most recent call last)\n") + msg_lines += traceback.format_list(list(extract())) + msg = "".join(msg_lines).rstrip("\n") + # Passing source= here means that if the user happens to have tracemalloc + # enabled and tracking where the coroutine was created, the warning will + # contain that traceback. This does mean that if they have *both* + # coroutine origin tracking *and* tracemalloc enabled, they'll get two + # partially-redundant tracebacks. If we wanted to be clever we could + # probably detect this case and avoid it, but for now we don't bother. + _wm.warn( + msg, category=RuntimeWarning, stacklevel=2, source=coro + ) + + +def _setup_defaults(): + # Several warning categories are ignored by default in regular builds + if hasattr(sys, 'gettotalrefcount'): + return + _wm.filterwarnings("default", category=DeprecationWarning, module="__main__", append=1) + _wm.simplefilter("ignore", category=DeprecationWarning, append=1) + _wm.simplefilter("ignore", category=PendingDeprecationWarning, append=1) + _wm.simplefilter("ignore", category=ImportWarning, append=1) + _wm.simplefilter("ignore", category=ResourceWarning, append=1) diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py index b65e1291bbc..c3a23ffa666 100644 --- a/Lib/test/test_sys.py +++ b/Lib/test/test_sys.py @@ -882,6 +882,7 @@ def test_subinterp_intern_singleton(self): ''')) self.assertTrue(sys._is_interned(s)) + @unittest.expectedFailure # TODO: RUSTPYTHON needs update for context_aware_warnings def test_sys_flags(self): self.assertTrue(sys.flags) attrs = ("debug", diff --git a/Lib/test/test_warnings/__init__.py b/Lib/test/test_warnings/__init__.py index ae9a365e1c9..fb324294f47 100644 --- a/Lib/test/test_warnings/__init__.py +++ b/Lib/test/test_warnings/__init__.py @@ -24,10 +24,13 @@ from warnings import deprecated -py_warnings = import_helper.import_fresh_module('warnings', - blocked=['_warnings']) -c_warnings = import_helper.import_fresh_module('warnings', - fresh=['_warnings']) +py_warnings = import_helper.import_fresh_module('_py_warnings') +py_warnings._set_module(py_warnings) + +c_warnings = import_helper.import_fresh_module( + "warnings", fresh=["_warnings", "_py_warnings"] +) +c_warnings._set_module(c_warnings) @contextmanager def warnings_state(module): @@ -43,15 +46,21 @@ def warnings_state(module): except NameError: pass original_warnings = warning_tests.warnings - original_filters = module.filters - try: + if module._use_context: + saved_context, context = module._new_context() + else: + original_filters = module.filters module.filters = original_filters[:] + try: module.simplefilter("once") warning_tests.warnings = module yield finally: warning_tests.warnings = original_warnings - module.filters = original_filters + if module._use_context: + module._set_context(saved_context) + else: + module.filters = original_filters class TestWarning(Warning): @@ -92,8 +101,9 @@ class PublicAPITests(BaseTest): public API. """ + @unittest.expectedFailure # TODO: RUSTPYTHON 'PyPublicAPITests' object has no attribute 'assertHasAttr' def test_module_all_attribute(self): - self.assertTrue(hasattr(self.module, '__all__')) + self.assertHasAttr(self.module, '__all__') target_api = ["warn", "warn_explicit", "showwarning", "formatwarning", "filterwarnings", "simplefilter", "resetwarnings", "catch_warnings", "deprecated"] @@ -111,14 +121,14 @@ class FilterTests(BaseTest): """Testing the filtering functionality.""" def test_error(self): - with original_warnings.catch_warnings(module=self.module) as w: + with self.module.catch_warnings() as w: self.module.resetwarnings() self.module.filterwarnings("error", category=UserWarning) self.assertRaises(UserWarning, self.module.warn, "FilterTests.test_error") def test_error_after_default(self): - with original_warnings.catch_warnings(module=self.module) as w: + with self.module.catch_warnings() as w: self.module.resetwarnings() message = "FilterTests.test_ignore_after_default" def f(): @@ -136,8 +146,7 @@ def f(): self.assertRaises(UserWarning, f) def test_ignore(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() self.module.filterwarnings("ignore", category=UserWarning) self.module.warn("FilterTests.test_ignore", UserWarning) @@ -145,8 +154,7 @@ def test_ignore(self): self.assertEqual(list(__warningregistry__), ['version']) def test_ignore_after_default(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() message = "FilterTests.test_ignore_after_default" def f(): @@ -157,44 +165,43 @@ def f(): f() self.assertEqual(len(w), 1) - def test_always(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: - self.module.resetwarnings() - self.module.filterwarnings("always", category=UserWarning) - message = "FilterTests.test_always" - def f(): - self.module.warn(message, UserWarning) - f() - self.assertEqual(len(w), 1) - self.assertEqual(w[-1].message.args[0], message) - f() - self.assertEqual(len(w), 2) - self.assertEqual(w[-1].message.args[0], message) + def test_always_and_all(self): + for mode in {"always", "all"}: + with self.module.catch_warnings(record=True) as w: + self.module.resetwarnings() + self.module.filterwarnings(mode, category=UserWarning) + message = "FilterTests.test_always_and_all" + def f(): + self.module.warn(message, UserWarning) + f() + self.assertEqual(len(w), 1) + self.assertEqual(w[-1].message.args[0], message) + f() + self.assertEqual(len(w), 2) + self.assertEqual(w[-1].message.args[0], message) - def test_always_after_default(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: - self.module.resetwarnings() - message = "FilterTests.test_always_after_ignore" - def f(): - self.module.warn(message, UserWarning) - f() - self.assertEqual(len(w), 1) - self.assertEqual(w[-1].message.args[0], message) - f() - self.assertEqual(len(w), 1) - self.module.filterwarnings("always", category=UserWarning) - f() - self.assertEqual(len(w), 2) - self.assertEqual(w[-1].message.args[0], message) - f() - self.assertEqual(len(w), 3) - self.assertEqual(w[-1].message.args[0], message) + def test_always_and_all_after_default(self): + for mode in {"always", "all"}: + with self.module.catch_warnings(record=True) as w: + self.module.resetwarnings() + message = "FilterTests.test_always_and_all_after_ignore" + def f(): + self.module.warn(message, UserWarning) + f() + self.assertEqual(len(w), 1) + self.assertEqual(w[-1].message.args[0], message) + f() + self.assertEqual(len(w), 1) + self.module.filterwarnings(mode, category=UserWarning) + f() + self.assertEqual(len(w), 2) + self.assertEqual(w[-1].message.args[0], message) + f() + self.assertEqual(len(w), 3) + self.assertEqual(w[-1].message.args[0], message) def test_default(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() self.module.filterwarnings("default", category=UserWarning) message = UserWarning("FilterTests.test_default") @@ -209,8 +216,7 @@ def test_default(self): raise ValueError("loop variant unhandled") def test_module(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() self.module.filterwarnings("module", category=UserWarning) message = UserWarning("FilterTests.test_module") @@ -221,8 +227,7 @@ def test_module(self): self.assertEqual(len(w), 0) def test_once(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() self.module.filterwarnings("once", category=UserWarning) message = UserWarning("FilterTests.test_once") @@ -237,9 +242,88 @@ def test_once(self): 42) self.assertEqual(len(w), 0) + @unittest.expectedFailure # TODO: RUSTPYTHON re.PatternError: bad escape \z at position 15 + def test_filter_module(self): + MS_WINDOWS = (sys.platform == 'win32') + with self.module.catch_warnings(record=True) as w: + self.module.simplefilter('error') + self.module.filterwarnings('always', module=r'package\.module\z') + self.module.warn_explicit('msg', UserWarning, 'filename', 42, + module='package.module') + self.assertEqual(len(w), 1) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, '/path/to/package/module', 42) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, '/path/to/package/module.py', 42) + + with self.module.catch_warnings(record=True) as w: + self.module.simplefilter('error') + self.module.filterwarnings('always', module='package') + self.module.warn_explicit('msg', UserWarning, 'filename', 42, + module='package.module') + self.assertEqual(len(w), 1) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, 'filename', 42, + module='other.package.module') + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, '/path/to/otherpackage/module.py', 42) + + with self.module.catch_warnings(record=True) as w: + self.module.simplefilter('error') + self.module.filterwarnings('always', module=r'/path/to/package/module\z') + self.module.warn_explicit('msg', UserWarning, '/path/to/package/module', 42) + self.assertEqual(len(w), 1) + self.module.warn_explicit('msg', UserWarning, '/path/to/package/module.py', 42) + self.assertEqual(len(w), 2) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, '/PATH/TO/PACKAGE/MODULE', 42) + if MS_WINDOWS: + if self.module is py_warnings: + self.module.warn_explicit('msg', UserWarning, r'/path/to/package/module.PY', 42) + self.assertEqual(len(w), 3) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, r'/path/to/package/module/__init__.py', 42) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, r'/path/to/package/module.pyw', 42) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, r'\path\to\package\module', 42) + + with self.module.catch_warnings(record=True) as w: + self.module.simplefilter('error') + self.module.filterwarnings('always', module=r'/path/to/package/__init__\z') + self.module.warn_explicit('msg', UserWarning, '/path/to/package/__init__.py', 42) + self.assertEqual(len(w), 1) + self.module.warn_explicit('msg', UserWarning, '/path/to/package/__init__', 42) + self.assertEqual(len(w), 2) + + if MS_WINDOWS: + with self.module.catch_warnings(record=True) as w: + self.module.simplefilter('error') + self.module.filterwarnings('always', module=r'C:\\path\\to\\package\\module\z') + self.module.warn_explicit('msg', UserWarning, r'C:\path\to\package\module', 42) + self.assertEqual(len(w), 1) + self.module.warn_explicit('msg', UserWarning, r'C:\path\to\package\module.py', 42) + self.assertEqual(len(w), 2) + if self.module is py_warnings: + self.module.warn_explicit('msg', UserWarning, r'C:\path\to\package\module.PY', 42) + self.assertEqual(len(w), 3) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, r'C:\path\to\package\module.pyw', 42) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, r'C:\PATH\TO\PACKAGE\MODULE', 42) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, r'C:/path/to/package/module', 42) + with self.assertRaises(UserWarning): + self.module.warn_explicit('msg', UserWarning, r'C:\path\to\package\module\__init__.py', 42) + + with self.module.catch_warnings(record=True) as w: + self.module.simplefilter('error') + self.module.filterwarnings('always', module=r'\z') + self.module.warn_explicit('msg', UserWarning, '', 42) + self.assertEqual(len(w), 1) + def test_module_globals(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.simplefilter("always", UserWarning) # bpo-33509: module_globals=None must not crash @@ -259,15 +343,14 @@ def test_module_globals(self): self.assertEqual(len(w), 2) def test_inheritance(self): - with original_warnings.catch_warnings(module=self.module) as w: + with self.module.catch_warnings() as w: self.module.resetwarnings() self.module.filterwarnings("error", category=Warning) self.assertRaises(UserWarning, self.module.warn, "FilterTests.test_inheritance", UserWarning) def test_ordering(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() self.module.filterwarnings("ignore", category=UserWarning) self.module.filterwarnings("error", category=UserWarning, @@ -282,8 +365,7 @@ def test_ordering(self): def test_filterwarnings(self): # Test filterwarnings(). # Implicitly also tests resetwarnings(). - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.filterwarnings("error", "", Warning, "", 0) self.assertRaises(UserWarning, self.module.warn, 'convert to error') @@ -307,8 +389,7 @@ def test_filterwarnings(self): self.assertIs(w[-1].category, UserWarning) def test_message_matching(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.simplefilter("ignore", UserWarning) self.module.filterwarnings("error", "match", UserWarning) self.assertRaises(UserWarning, self.module.warn, "match") @@ -324,54 +405,52 @@ def match(self, a): L[:] = [] L = [("default",X(),UserWarning,X(),0) for i in range(2)] - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.filters = L self.module.warn_explicit(UserWarning("b"), None, "f.py", 42) self.assertEqual(str(w[-1].message), "b") def test_filterwarnings_duplicate_filters(self): - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(): self.module.resetwarnings() self.module.filterwarnings("error", category=UserWarning) - self.assertEqual(len(self.module.filters), 1) + self.assertEqual(len(self.module._get_filters()), 1) self.module.filterwarnings("ignore", category=UserWarning) self.module.filterwarnings("error", category=UserWarning) self.assertEqual( - len(self.module.filters), 2, + len(self.module._get_filters()), 2, "filterwarnings inserted duplicate filter" ) self.assertEqual( - self.module.filters[0][0], "error", + self.module._get_filters()[0][0], "error", "filterwarnings did not promote filter to " "the beginning of list" ) def test_simplefilter_duplicate_filters(self): - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(): self.module.resetwarnings() self.module.simplefilter("error", category=UserWarning) - self.assertEqual(len(self.module.filters), 1) + self.assertEqual(len(self.module._get_filters()), 1) self.module.simplefilter("ignore", category=UserWarning) self.module.simplefilter("error", category=UserWarning) self.assertEqual( - len(self.module.filters), 2, + len(self.module._get_filters()), 2, "simplefilter inserted duplicate filter" ) self.assertEqual( - self.module.filters[0][0], "error", + self.module._get_filters()[0][0], "error", "simplefilter did not promote filter to the beginning of list" ) def test_append_duplicate(self): - with original_warnings.catch_warnings(module=self.module, - record=True) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() self.module.simplefilter("ignore") self.module.simplefilter("error", append=True) self.module.simplefilter("ignore", append=True) self.module.warn("test_append_duplicate", category=UserWarning) - self.assertEqual(len(self.module.filters), 2, + self.assertEqual(len(self.module._get_filters()), 2, "simplefilter inserted duplicate filter" ) self.assertEqual(len(w), 0, @@ -401,19 +480,17 @@ def test_argument_validation(self): self.module.simplefilter('ignore', lineno=-1) def test_catchwarnings_with_simplefilter_ignore(self): - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(module=self.module): self.module.resetwarnings() self.module.simplefilter("error") - with self.module.catch_warnings( - module=self.module, action="ignore" - ): + with self.module.catch_warnings(action="ignore"): self.module.warn("This will be ignored") def test_catchwarnings_with_simplefilter_error(self): - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(): self.module.resetwarnings() with self.module.catch_warnings( - module=self.module, action="error", category=FutureWarning + action="error", category=FutureWarning ): with support.captured_stderr() as stderr: error_msg = "Other types of warnings are not errors" @@ -435,8 +512,7 @@ class WarnTests(BaseTest): """Test warnings.warn() and warnings.warn_explicit().""" def test_message(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.simplefilter("once") for i in range(4): text = 'multi %d' %i # Different text on each call. @@ -448,8 +524,7 @@ def test_message(self): def test_warn_nonstandard_types(self): # warn() should handle non-standard types without issue. for ob in (Warning, None, 42): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.simplefilter("once") self.module.warn(ob) # Don't directly compare objects since @@ -458,8 +533,7 @@ def test_warn_nonstandard_types(self): def test_filename(self): with warnings_state(self.module): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: warning_tests.inner("spam1") self.assertEqual(os.path.basename(w[-1].filename), "stacklevel.py") @@ -471,8 +545,7 @@ def test_stacklevel(self): # Test stacklevel argument # make sure all messages are different, so the warning won't be skipped with warnings_state(self.module): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: warning_tests.inner("spam3", stacklevel=1) self.assertEqual(os.path.basename(w[-1].filename), "stacklevel.py") @@ -494,23 +567,20 @@ def test_stacklevel(self): self.assertEqual(os.path.basename(w[-1].filename), "") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_stacklevel_import(self): # Issue #24305: With stacklevel=2, module-level warnings should work. import_helper.unload('test.test_warnings.data.import_warning') with warnings_state(self.module): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.simplefilter('always') - import test.test_warnings.data.import_warning + import test.test_warnings.data.import_warning # noqa: F401 self.assertEqual(len(w), 1) self.assertEqual(w[0].filename, __file__) def test_skip_file_prefixes(self): with warnings_state(self.module): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.simplefilter('always') # Warning never attributed to the data/ package. @@ -533,6 +603,16 @@ def test_skip_file_prefixes(self): warning_tests.package("prefix02", stacklevel=3) self.assertIn("unittest", w[-1].filename) + def test_skip_file_prefixes_file_path(self): + # see: gh-126209 + with warnings_state(self.module): + skipped = warning_tests.__file__ + with self.module.catch_warnings(record=True) as w: + warning_tests.outer("msg", skip_file_prefixes=(skipped,)) + + self.assertEqual(len(w), 1) + self.assertNotEqual(w[-1].filename, skipped) + def test_skip_file_prefixes_type_errors(self): with warnings_state(self.module): warn = warning_tests.warnings.warn @@ -548,23 +628,16 @@ def test_exec_filename(self): codeobj = compile(("import warnings\n" "warnings.warn('hello', UserWarning)"), filename, "exec") - with original_warnings.catch_warnings(record=True) as w: + with self.module.catch_warnings(record=True) as w: self.module.simplefilter("always", category=UserWarning) exec(codeobj) self.assertEqual(w[0].filename, filename) def test_warn_explicit_non_ascii_filename(self): - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() self.module.filterwarnings("always", category=UserWarning) - filenames = ["nonascii\xe9\u20ac"] - if not support.is_emscripten: - # JavaScript does not like surrogates. - # Invalid UTF-8 leading byte 0x80 encountered when - # deserializing a UTF-8 string in wasm memory to a JS - # string! - filenames.append("surrogate\udc80") + filenames = ["nonascii\xe9\u20ac", "surrogate\udc80"] for filename in filenames: try: os.fsencode(filename) @@ -625,7 +698,7 @@ class NonWarningSubclass: self.assertIn('category must be a Warning subclass, not ', str(cm.exception)) - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(): self.module.resetwarnings() self.module.filterwarnings('default') with self.assertWarns(MyWarningClass) as cm: @@ -641,7 +714,7 @@ class NonWarningSubclass: self.assertIsInstance(cm.warning, Warning) def check_module_globals(self, module_globals): - with original_warnings.catch_warnings(module=self.module, record=True) as w: + with self.module.catch_warnings(record=True) as w: self.module.filterwarnings('default') self.module.warn_explicit( 'eggs', UserWarning, 'bar', 1, @@ -654,7 +727,7 @@ def check_module_globals_error(self, module_globals, errmsg, errtype=ValueError) if self.module is py_warnings: self.check_module_globals(module_globals) return - with original_warnings.catch_warnings(module=self.module, record=True) as w: + with self.module.catch_warnings(record=True) as w: self.module.filterwarnings('always') with self.assertRaisesRegex(errtype, re.escape(errmsg)): self.module.warn_explicit( @@ -666,7 +739,7 @@ def check_module_globals_deprecated(self, module_globals, msg): if self.module is py_warnings: self.check_module_globals(module_globals) return - with original_warnings.catch_warnings(module=self.module, record=True) as w: + with self.module.catch_warnings(record=True) as w: self.module.filterwarnings('always') self.module.warn_explicit( 'eggs', UserWarning, 'bar', 1, @@ -734,62 +807,54 @@ def test_gh86298_no_loader_with_spec_loader_okay(self): class CWarnTests(WarnTests, unittest.TestCase): module = c_warnings - # TODO: RUSTPYTHON - @unittest.expectedFailure # As an early adopter, we sanity check the # test.import_helper.import_fresh_module utility function + @unittest.expectedFailure # TODO: RUSTPYTHON def test_accelerated(self): self.assertIsNot(original_warnings, self.module) - self.assertFalse(hasattr(self.module.warn, '__code__')) + self.assertNotHasAttr(self.module.warn, '__code__') - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_gh86298_no_loader_and_spec_is_none(self): - return super().test_gh86298_no_loader_and_spec_is_none() + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_gh86298_loader_and_spec_loader_disagree(self): + return super().test_gh86298_loader_and_spec_loader_disagree() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_gh86298_loader_is_none_and_spec_is_none(self): return super().test_gh86298_loader_is_none_and_spec_is_none() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_gh86298_loader_is_none_and_spec_loader_is_none(self): return super().test_gh86298_loader_is_none_and_spec_loader_is_none() - - # TODO: RUSTPYTHON - @unittest.expectedFailure + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_gh86298_no_loader_and_no_spec_loader(self): + return super().test_gh86298_no_loader_and_no_spec_loader() + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_gh86298_no_loader_and_spec_is_none(self): + return super().test_gh86298_no_loader_and_spec_is_none() + + @unittest.expectedFailure # TODO: RUSTPYTHON def test_gh86298_no_spec(self): return super().test_gh86298_no_spec() - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_gh86298_spec_is_none(self): - return super().test_gh86298_spec_is_none() - - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_gh86298_no_spec_loader(self): return super().test_gh86298_no_spec_loader() - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_gh86298_loader_and_spec_loader_disagree(self): - return super().test_gh86298_loader_and_spec_loader_disagree() - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_gh86298_no_loader_and_no_spec_loader(self): - return super().test_gh86298_no_loader_and_no_spec_loader() + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_gh86298_spec_is_none(self): + return super().test_gh86298_spec_is_none() class PyWarnTests(WarnTests, unittest.TestCase): module = py_warnings # As an early adopter, we sanity check the # test.import_helper.import_fresh_module utility function + @unittest.expectedFailure # TODO: RUSTPYTHON 'PyWarnTests' object has no attribute 'assertHasAttr' def test_pure_python(self): self.assertIsNot(original_warnings, self.module) - self.assertTrue(hasattr(self.module.warn, '__code__')) + self.assertHasAttr(self.module.warn, '__code__') class WCmdLineTests(BaseTest): @@ -797,7 +862,7 @@ class WCmdLineTests(BaseTest): def test_improper_input(self): # Uses the private _setoption() function to test the parsing # of command-line warning arguments - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(): self.assertRaises(self.module._OptionError, self.module._setoption, '1:2:3:4:5:6') self.assertRaises(self.module._OptionError, @@ -816,7 +881,7 @@ def test_improper_input(self): self.assertRaises(UserWarning, self.module.warn, 'convert to error') def test_import_from_module(self): - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(): self.module._setoption('ignore::Warning') with self.assertRaises(self.module._OptionError): self.module._setoption('ignore::TestWarning') @@ -857,11 +922,10 @@ class _WarningsTests(BaseTest, unittest.TestCase): module = c_warnings - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_filter(self): # Everything should function even if 'filters' is not in warnings. - with original_warnings.catch_warnings(module=self.module) as w: + with self.module.catch_warnings() as w: self.module.filterwarnings("error", "", Warning, "", 0) self.assertRaises(UserWarning, self.module.warn, 'convert to error') @@ -869,8 +933,7 @@ def test_filter(self): self.assertRaises(UserWarning, self.module.warn, 'convert to error') - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_onceregistry(self): # Replacing or removing the onceregistry should be okay. global __warningregistry__ @@ -878,8 +941,7 @@ def test_onceregistry(self): try: original_registry = self.module.onceregistry __warningregistry__ = {} - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() self.module.filterwarnings("once", category=UserWarning) self.module.warn_explicit(message, UserWarning, "file", 42) @@ -901,15 +963,13 @@ def test_onceregistry(self): finally: self.module.onceregistry = original_registry - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_default_action(self): # Replacing or removing defaultaction should be okay. message = UserWarning("defaultaction test") original = self.module.defaultaction try: - with original_warnings.catch_warnings(record=True, - module=self.module) as w: + with self.module.catch_warnings(record=True) as w: self.module.resetwarnings() registry = {} self.module.warn_explicit(message, UserWarning, "", 42, @@ -942,8 +1002,12 @@ def test_default_action(self): def test_showwarning_missing(self): # Test that showwarning() missing is okay. + if self.module._use_context: + # If _use_context is true, the warnings module does not + # override/restore showwarning() + return text = 'del showwarning test' - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(): self.module.filterwarnings("always", category=UserWarning) del self.module.showwarning with support.captured_output('stderr') as stream: @@ -951,12 +1015,11 @@ def test_showwarning_missing(self): result = stream.getvalue() self.assertIn(text, result) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_showwarnmsg_missing(self): # Test that _showwarnmsg() missing is okay. text = 'del _showwarnmsg test' - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(): self.module.filterwarnings("always", category=UserWarning) show = self.module._showwarnmsg @@ -970,50 +1033,55 @@ def test_showwarnmsg_missing(self): self.assertIn(text, result) def test_showwarning_not_callable(self): - with original_warnings.catch_warnings(module=self.module): - self.module.filterwarnings("always", category=UserWarning) - self.module.showwarning = print - with support.captured_output('stdout'): - self.module.warn('Warning!') - self.module.showwarning = 23 - self.assertRaises(TypeError, self.module.warn, "Warning!") + orig = self.module.showwarning + try: + with self.module.catch_warnings(): + self.module.filterwarnings("always", category=UserWarning) + self.module.showwarning = print + with support.captured_output('stdout'): + self.module.warn('Warning!') + self.module.showwarning = 23 + self.assertRaises(TypeError, self.module.warn, "Warning!") + finally: + self.module.showwarning = orig def test_show_warning_output(self): # With showwarning() missing, make sure that output is okay. - text = 'test show_warning' - with original_warnings.catch_warnings(module=self.module): - self.module.filterwarnings("always", category=UserWarning) - del self.module.showwarning - with support.captured_output('stderr') as stream: - warning_tests.inner(text) - result = stream.getvalue() - self.assertEqual(result.count('\n'), 2, - "Too many newlines in %r" % result) - first_line, second_line = result.split('\n', 1) - expected_file = os.path.splitext(warning_tests.__file__)[0] + '.py' - first_line_parts = first_line.rsplit(':', 3) - path, line, warning_class, message = first_line_parts - line = int(line) - self.assertEqual(expected_file, path) - self.assertEqual(warning_class, ' ' + UserWarning.__name__) - self.assertEqual(message, ' ' + text) - expected_line = ' ' + linecache.getline(path, line).strip() + '\n' - assert expected_line - self.assertEqual(second_line, expected_line) - - # TODO: RUSTPYTHON - @unittest.expectedFailure + orig = self.module.showwarning + try: + text = 'test show_warning' + with self.module.catch_warnings(): + self.module.filterwarnings("always", category=UserWarning) + del self.module.showwarning + with support.captured_output('stderr') as stream: + warning_tests.inner(text) + result = stream.getvalue() + self.assertEqual(result.count('\n'), 2, + "Too many newlines in %r" % result) + first_line, second_line = result.split('\n', 1) + expected_file = os.path.splitext(warning_tests.__file__)[0] + '.py' + first_line_parts = first_line.rsplit(':', 3) + path, line, warning_class, message = first_line_parts + line = int(line) + self.assertEqual(expected_file, path) + self.assertEqual(warning_class, ' ' + UserWarning.__name__) + self.assertEqual(message, ' ' + text) + expected_line = ' ' + linecache.getline(path, line).strip() + '\n' + assert expected_line + self.assertEqual(second_line, expected_line) + finally: + self.module.showwarning = orig + def test_filename_none(self): # issue #12467: race condition if a warning is emitted at shutdown globals_dict = globals() oldfile = globals_dict['__file__'] try: - catch = original_warnings.catch_warnings(record=True, - module=self.module) + catch = self.module.catch_warnings(record=True) with catch as w: self.module.filterwarnings("always", category=UserWarning) globals_dict['__file__'] = None - original_warnings.warn('test', UserWarning) + self.module.warn('test', UserWarning) self.assertTrue(len(w)) finally: globals_dict['__file__'] = oldfile @@ -1027,8 +1095,7 @@ def test_stderr_none(self): self.assertNotIn(b'Warning!', stderr) self.assertNotIn(b'Error', stderr) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_issue31285(self): # warn_explicit() should neither raise a SystemError nor cause an # assertion failure, in case the return value of get_source() has a @@ -1052,7 +1119,7 @@ def get_source(self, fullname): wmod = self.module - with original_warnings.catch_warnings(module=wmod): + with wmod.catch_warnings(): wmod.filterwarnings('default', category=UserWarning) linecache.clearcache() @@ -1079,7 +1146,7 @@ def test_issue31411(self): # warn_explicit() shouldn't raise a SystemError in case # warnings.onceregistry isn't a dictionary. wmod = self.module - with original_warnings.catch_warnings(module=wmod): + with wmod.catch_warnings(): wmod.filterwarnings('once') with support.swap_attr(wmod, 'onceregistry', None): with self.assertRaises(TypeError): @@ -1090,12 +1157,12 @@ def test_issue31416(self): # warn_explicit() shouldn't cause an assertion failure in case of a # bad warnings.filters or warnings.defaultaction. wmod = self.module - with original_warnings.catch_warnings(module=wmod): - wmod.filters = [(None, None, Warning, None, 0)] + with wmod.catch_warnings(): + wmod._get_filters()[:] = [(None, None, Warning, None, 0)] with self.assertRaises(TypeError): wmod.warn_explicit('foo', Warning, 'bar', 1) - wmod.filters = [] + wmod._get_filters()[:] = [] with support.swap_attr(wmod, 'defaultaction', None), \ self.assertRaises(TypeError): wmod.warn_explicit('foo', Warning, 'bar', 1) @@ -1104,7 +1171,7 @@ def test_issue31416(self): def test_issue31566(self): # warn() shouldn't cause an assertion failure in case of a bad # __name__ global. - with original_warnings.catch_warnings(module=self.module): + with self.module.catch_warnings(): self.module.filterwarnings('error', category=UserWarning) with support.swap_item(globals(), '__name__', b'foo'), \ support.swap_item(globals(), '__file__', None): @@ -1181,8 +1248,7 @@ class CWarningsDisplayTests(WarningsDisplayTests, unittest.TestCase): class PyWarningsDisplayTests(WarningsDisplayTests, unittest.TestCase): module = py_warnings - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_tracemalloc(self): self.addCleanup(os_helper.unlink, os_helper.TESTFN) @@ -1234,16 +1300,18 @@ class CatchWarningTests(BaseTest): """Test catch_warnings().""" def test_catch_warnings_restore(self): + if self.module._use_context: + return # test disabled if using context vars wmod = self.module orig_filters = wmod.filters orig_showwarning = wmod.showwarning # Ensure both showwarning and filters are restored when recording - with wmod.catch_warnings(module=wmod, record=True): + with wmod.catch_warnings(record=True): wmod.filters = wmod.showwarning = object() self.assertIs(wmod.filters, orig_filters) self.assertIs(wmod.showwarning, orig_showwarning) # Same test, but with recording disabled - with wmod.catch_warnings(module=wmod, record=False): + with wmod.catch_warnings(record=False): wmod.filters = wmod.showwarning = object() self.assertIs(wmod.filters, orig_filters) self.assertIs(wmod.showwarning, orig_showwarning) @@ -1251,7 +1319,7 @@ def test_catch_warnings_restore(self): def test_catch_warnings_recording(self): wmod = self.module # Ensure warnings are recorded when requested - with wmod.catch_warnings(module=wmod, record=True) as w: + with wmod.catch_warnings(record=True) as w: self.assertEqual(w, []) self.assertIs(type(w), list) wmod.simplefilter("always") @@ -1265,44 +1333,48 @@ def test_catch_warnings_recording(self): self.assertEqual(w, []) # Ensure warnings are not recorded when not requested orig_showwarning = wmod.showwarning - with wmod.catch_warnings(module=wmod, record=False) as w: + with wmod.catch_warnings(record=False) as w: self.assertIsNone(w) self.assertIs(wmod.showwarning, orig_showwarning) def test_catch_warnings_reentry_guard(self): wmod = self.module # Ensure catch_warnings is protected against incorrect usage - x = wmod.catch_warnings(module=wmod, record=True) + x = wmod.catch_warnings(record=True) self.assertRaises(RuntimeError, x.__exit__) with x: self.assertRaises(RuntimeError, x.__enter__) # Same test, but with recording disabled - x = wmod.catch_warnings(module=wmod, record=False) + x = wmod.catch_warnings(record=False) self.assertRaises(RuntimeError, x.__exit__) with x: self.assertRaises(RuntimeError, x.__enter__) def test_catch_warnings_defaults(self): wmod = self.module - orig_filters = wmod.filters + orig_filters = wmod._get_filters() orig_showwarning = wmod.showwarning # Ensure default behaviour is not to record warnings - with wmod.catch_warnings(module=wmod) as w: + with wmod.catch_warnings() as w: self.assertIsNone(w) self.assertIs(wmod.showwarning, orig_showwarning) - self.assertIsNot(wmod.filters, orig_filters) - self.assertIs(wmod.filters, orig_filters) + self.assertIsNot(wmod._get_filters(), orig_filters) + self.assertIs(wmod._get_filters(), orig_filters) if wmod is sys.modules['warnings']: # Ensure the default module is this one with wmod.catch_warnings() as w: self.assertIsNone(w) self.assertIs(wmod.showwarning, orig_showwarning) - self.assertIsNot(wmod.filters, orig_filters) - self.assertIs(wmod.filters, orig_filters) + self.assertIsNot(wmod._get_filters(), orig_filters) + self.assertIs(wmod._get_filters(), orig_filters) def test_record_override_showwarning_before(self): # Issue #28835: If warnings.showwarning() was overridden, make sure # that catch_warnings(record=True) overrides it again. + if self.module._use_context: + # If _use_context is true, the warnings module does not restore + # showwarning() + return text = "This is a warning" wmod = self.module my_log = [] @@ -1313,7 +1385,7 @@ def my_logger(message, category, filename, lineno, file=None, line=None): # Override warnings.showwarning() before calling catch_warnings() with support.swap_attr(wmod, 'showwarning', my_logger): - with wmod.catch_warnings(module=wmod, record=True) as log: + with wmod.catch_warnings(record=True) as log: self.assertIsNot(wmod.showwarning, my_logger) wmod.simplefilter("always") @@ -1328,6 +1400,10 @@ def my_logger(message, category, filename, lineno, file=None, line=None): def test_record_override_showwarning_inside(self): # Issue #28835: It is possible to override warnings.showwarning() # in the catch_warnings(record=True) context manager. + if self.module._use_context: + # If _use_context is true, the warnings module does not restore + # showwarning() + return text = "This is a warning" wmod = self.module my_log = [] @@ -1336,7 +1412,7 @@ def my_logger(message, category, filename, lineno, file=None, line=None): nonlocal my_log my_log.append(message) - with wmod.catch_warnings(module=wmod, record=True) as log: + with wmod.catch_warnings(record=True) as log: wmod.simplefilter("always") wmod.showwarning = my_logger wmod.warn(text) @@ -1385,8 +1461,7 @@ class PyCatchWarningTests(CatchWarningTests, unittest.TestCase): class EnvironmentVariableTests(BaseTest): - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_single_warning(self): rc, stdout, stderr = assert_python_ok("-c", "import sys; sys.stdout.write(str(sys.warnoptions))", @@ -1394,8 +1469,7 @@ def test_single_warning(self): PYTHONDEVMODE="") self.assertEqual(stdout, b"['ignore::DeprecationWarning']") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_comma_separated_warnings(self): rc, stdout, stderr = assert_python_ok("-c", "import sys; sys.stdout.write(str(sys.warnoptions))", @@ -1404,8 +1478,7 @@ def test_comma_separated_warnings(self): self.assertEqual(stdout, b"['ignore::DeprecationWarning', 'ignore::UnicodeWarning']") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @force_not_colorized def test_envvar_and_command_line(self): rc, stdout, stderr = assert_python_ok("-Wignore::UnicodeWarning", "-c", @@ -1415,8 +1488,7 @@ def test_envvar_and_command_line(self): self.assertEqual(stdout, b"['ignore::DeprecationWarning', 'ignore::UnicodeWarning']") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @force_not_colorized def test_conflicting_envvar_and_command_line(self): rc, stdout, stderr = assert_python_failure("-Werror::DeprecationWarning", "-c", @@ -1458,7 +1530,7 @@ def test_default_filter_configuration(self): code = "import sys; sys.modules.pop('warnings', None); sys.modules['_warnings'] = None; " else: code = "" - code += "import warnings; [print(f) for f in warnings.filters]" + code += "import warnings; [print(f) for f in warnings._get_filters()]" rc, stdout, stderr = assert_python_ok("-c", code, __isolated=True) stdout_lines = [line.strip() for line in stdout.splitlines()] @@ -1466,8 +1538,7 @@ def test_default_filter_configuration(self): self.assertEqual(stdout_lines, expected_output) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipUnless(sys.getfilesystemencoding() != 'ascii', 'requires non-ascii filesystemencoding') def test_nonascii(self): @@ -1481,18 +1552,24 @@ def test_nonascii(self): class CEnvironmentVariableTests(EnvironmentVariableTests, unittest.TestCase): module = c_warnings - - # TODO: RUSTPYTHON - @unittest.expectedFailure - def test_default_filter_configuration(self): - # XXX: RUSTPYHTON; remove the entire function when fixed - super().test_default_filter_configuration() - + @unittest.expectedFailure # TODO: RUSTPYTHON Lists differ + def test_default_filter_configuration(self): super().test_default_filter_configuration() # TODO: RUSTPYTHON class PyEnvironmentVariableTests(EnvironmentVariableTests, unittest.TestCase): module = py_warnings +class LocksTest(unittest.TestCase): + @support.cpython_only + @unittest.skipUnless(c_warnings, 'C module is required') + def test_release_lock_no_lock(self): + with self.assertRaisesRegex( + RuntimeError, + 'cannot release un-acquired lock', + ): + c_warnings._release_lock() + + class _DeprecatedTest(BaseTest, unittest.TestCase): """Test _deprecated().""" @@ -1547,8 +1624,7 @@ def test_issue_8766(self): class FinalizationTest(unittest.TestCase): - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_finalization(self): # Issue #19421: warnings.warn() should not crash # during Python finalization @@ -1566,8 +1642,7 @@ def __del__(self): self.assertEqual(err.decode().rstrip(), ':7: UserWarning: test') - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_late_resource_warning(self): # Issue #21925: Emitting a ResourceWarning late during the Python # shutdown must be logged. @@ -1578,15 +1653,178 @@ def test_late_resource_warning(self): # (_warnings will try to import it) code = "f = open(%a)" % __file__ rc, out, err = assert_python_ok("-Wd", "-c", code) - self.assertTrue(err.startswith(expected), ascii(err)) + self.assertStartsWith(err, expected) # import the warnings module code = "import warnings; f = open(%a)" % __file__ rc, out, err = assert_python_ok("-Wd", "-c", code) - self.assertTrue(err.startswith(expected), ascii(err)) + self.assertStartsWith(err, expected) + + +class AsyncTests(BaseTest): + """Verifies that the catch_warnings() context manager behaves + as expected when used inside async co-routines. This requires + that the context_aware_warnings flag is enabled, so that + the context manager uses a context variable. + """ + + def setUp(self): + super().setUp() + self.module.resetwarnings() + + @unittest.skipIf(not sys.flags.context_aware_warnings, + "requires context aware warnings") + def test_async_context(self): + import asyncio + + # Events to force the execution interleaving we want. + step_a1 = asyncio.Event() + step_a2 = asyncio.Event() + step_b1 = asyncio.Event() + step_b2 = asyncio.Event() + + async def run_a(): + with self.module.catch_warnings(record=True) as w: + await step_a1.wait() + # The warning emitted here should be caught be the enclosing + # context manager. + self.module.warn('run_a warning', UserWarning) + step_b1.set() + await step_a2.wait() + self.assertEqual(len(w), 1) + self.assertEqual(w[0].message.args[0], 'run_a warning') + step_b2.set() + + async def run_b(): + with self.module.catch_warnings(record=True) as w: + step_a1.set() + await step_b1.wait() + # The warning emitted here should be caught be the enclosing + # context manager. + self.module.warn('run_b warning', UserWarning) + step_a2.set() + await step_b2.wait() + self.assertEqual(len(w), 1) + self.assertEqual(w[0].message.args[0], 'run_b warning') + + async def run_tasks(): + await asyncio.gather(run_a(), run_b()) + asyncio.run(run_tasks()) -class DeprecatedTests(unittest.TestCase): + @unittest.skipIf(not sys.flags.context_aware_warnings, + "requires context aware warnings") + def test_async_task_inherit(self): + """Check that a new asyncio task inherits warnings context from the + coroutine that spawns it. + """ + import asyncio + + step1 = asyncio.Event() + step2 = asyncio.Event() + + async def run_child1(): + await step1.wait() + # This should be recorded by the run_parent() catch_warnings + # context. + self.module.warn('child warning', UserWarning) + step2.set() + + async def run_child2(): + # This establishes a new catch_warnings() context. The + # run_child1() task should still be using the context from + # run_parent() if context-aware warnings are enabled. + with self.module.catch_warnings(record=True) as w: + step1.set() + await step2.wait() + + async def run_parent(): + with self.module.catch_warnings(record=True) as w: + await asyncio.gather(run_child1(), run_child2()) + self.assertEqual(len(w), 1) + self.assertEqual(w[0].message.args[0], 'child warning') + + asyncio.run(run_parent()) + + +class CAsyncTests(AsyncTests, unittest.TestCase): + module = c_warnings + + +class PyAsyncTests(AsyncTests, unittest.TestCase): + module = py_warnings + + +class ThreadTests(BaseTest): + """Verifies that the catch_warnings() context manager behaves as + expected when used within threads. This requires that both the + context_aware_warnings flag and thread_inherit_context flags are enabled. + """ + + ENABLE_THREAD_TESTS = (sys.flags.context_aware_warnings and + sys.flags.thread_inherit_context) + + def setUp(self): + super().setUp() + self.module.resetwarnings() + + @unittest.skipIf(not ENABLE_THREAD_TESTS, + "requires thread-safe warnings flags") + def test_threaded_context(self): + import threading + + barrier = threading.Barrier(2, timeout=2) + + def run_a(): + with self.module.catch_warnings(record=True) as w: + barrier.wait() + # The warning emitted here should be caught be the enclosing + # context manager. + self.module.warn('run_a warning', UserWarning) + barrier.wait() + self.assertEqual(len(w), 1) + self.assertEqual(w[0].message.args[0], 'run_a warning') + # Should be caught be the catch_warnings() context manager of run_threads() + self.module.warn('main warning', UserWarning) + + def run_b(): + with self.module.catch_warnings(record=True) as w: + barrier.wait() + # The warning emitted here should be caught be the enclosing + # context manager. + barrier.wait() + self.module.warn('run_b warning', UserWarning) + self.assertEqual(len(w), 1) + self.assertEqual(w[0].message.args[0], 'run_b warning') + # Should be caught be the catch_warnings() context manager of run_threads() + self.module.warn('main warning', UserWarning) + + def run_threads(): + threads = [ + threading.Thread(target=run_a), + threading.Thread(target=run_b), + ] + with self.module.catch_warnings(record=True) as w: + for thread in threads: + thread.start() + for thread in threads: + thread.join() + self.assertEqual(len(w), 2) + self.assertEqual(w[0].message.args[0], 'main warning') + self.assertEqual(w[1].message.args[0], 'main warning') + + run_threads() + + +class CThreadTests(ThreadTests, unittest.TestCase): + module = c_warnings + + +class PyThreadTests(ThreadTests, unittest.TestCase): + module = py_warnings + + +class DeprecatedTests(PyPublicAPITests): def test_dunder_deprecated(self): @deprecated("A will go away soon") class A: @@ -1614,6 +1852,7 @@ def h(x): self.assertEqual(len(overloads), 2) self.assertEqual(overloads[0].__deprecated__, "no more ints") + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_class(self): @deprecated("A will go away soon") class A: @@ -1625,6 +1864,7 @@ class A: with self.assertRaises(TypeError): A(42) + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_class_with_init(self): @deprecated("HasInit will go away soon") class HasInit: @@ -1635,6 +1875,7 @@ def __init__(self, x): instance = HasInit(42) self.assertEqual(instance.x, 42) + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_class_with_new(self): has_new_called = False @@ -1653,6 +1894,7 @@ def __init__(self, x) -> None: self.assertEqual(instance.x, 42) self.assertTrue(has_new_called) + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_class_with_inherited_new(self): new_base_called = False @@ -1674,6 +1916,7 @@ class HasInheritedNew(NewBase): self.assertEqual(instance.x, 42) self.assertTrue(new_base_called) + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_class_with_new_but_no_init(self): new_called = False @@ -1691,6 +1934,7 @@ def __new__(cls, x): self.assertEqual(instance.x, 42) self.assertTrue(new_called) + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_mixin_class(self): @deprecated("Mixin will go away soon") class Mixin: @@ -1707,6 +1951,7 @@ class Child(Base, Mixin): instance = Child(42) self.assertEqual(instance.a, 42) + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_do_not_shadow_user_arguments(self): new_called = False new_called_cls = None @@ -1726,6 +1971,7 @@ class Foo(metaclass=MyMeta, cls='haha'): self.assertTrue(new_called) self.assertEqual(new_called_cls, 'haha') + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_existing_init_subclass(self): @deprecated("C will go away soon") class C: @@ -1742,6 +1988,7 @@ class D(C): self.assertTrue(D.inited) self.assertIsInstance(D(), D) # no deprecation + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_existing_init_subclass_in_base(self): class Base: def __init_subclass__(cls, x) -> None: @@ -1762,6 +2009,27 @@ class D(C, x=3): self.assertEqual(D.inited, 3) + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered + def test_existing_init_subclass_in_sibling_base(self): + @deprecated("A will go away soon") + class A: + pass + class B: + def __init_subclass__(cls, x): + super().__init_subclass__() + cls.inited = x + + with self.assertWarnsRegex(DeprecationWarning, "A will go away soon"): + class C(A, B, x=42): + pass + self.assertEqual(C.inited, 42) + + with self.assertWarnsRegex(DeprecationWarning, "A will go away soon"): + class D(B, A, x=42): + pass + self.assertEqual(D.inited, 42) + + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_init_subclass_has_correct_cls(self): init_subclass_saw = None @@ -1779,6 +2047,7 @@ class C(Base): self.assertIs(init_subclass_saw, C) + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_init_subclass_with_explicit_classmethod(self): init_subclass_saw = None @@ -1797,6 +2066,7 @@ class C(Base): self.assertIs(init_subclass_saw, C) + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_function(self): @deprecated("b will go away soon") def b(): @@ -1805,6 +2075,7 @@ def b(): with self.assertWarnsRegex(DeprecationWarning, "b will go away soon"): b() + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_method(self): class Capybara: @deprecated("x will go away soon") @@ -1815,6 +2086,7 @@ def x(self): with self.assertWarnsRegex(DeprecationWarning, "x will go away soon"): instance.x() + @unittest.expectedFailure # TODO: RUSTPYTHON DeprecationWarning not triggered def test_property(self): class Capybara: @property @@ -1842,6 +2114,7 @@ def no_more_setting(self, value): with self.assertWarnsRegex(DeprecationWarning, "no more setting"): instance.no_more_setting = 42 + @unittest.expectedFailure # TODO: RUSTPYTHON RuntimeWarning not triggered def test_category(self): @deprecated("c will go away soon", category=RuntimeWarning) def c(): diff --git a/Lib/test/test_warnings/data/stacklevel.py b/Lib/test/test_warnings/data/stacklevel.py index c6dd24733b3..fe36242d3d2 100644 --- a/Lib/test/test_warnings/data/stacklevel.py +++ b/Lib/test/test_warnings/data/stacklevel.py @@ -4,11 +4,13 @@ import warnings from test.test_warnings.data import package_helper -def outer(message, stacklevel=1): - inner(message, stacklevel) -def inner(message, stacklevel=1): - warnings.warn(message, stacklevel=stacklevel) +def outer(message, stacklevel=1, skip_file_prefixes=()): + inner(message, stacklevel, skip_file_prefixes) + +def inner(message, stacklevel=1, skip_file_prefixes=()): + warnings.warn(message, stacklevel=stacklevel, + skip_file_prefixes=skip_file_prefixes) def package(message, *, stacklevel): package_helper.inner_api(message, stacklevel=stacklevel, diff --git a/Lib/warnings.py b/Lib/warnings.py index f83aaf231ea..6759857d909 100644 --- a/Lib/warnings.py +++ b/Lib/warnings.py @@ -1,735 +1,99 @@ -"""Python part of the warnings subsystem.""" - import sys +__all__ = [ + "warn", + "warn_explicit", + "showwarning", + "formatwarning", + "filterwarnings", + "simplefilter", + "resetwarnings", + "catch_warnings", + "deprecated", +] + +from _py_warnings import ( + WarningMessage, + _DEPRECATED_MSG, + _OptionError, + _add_filter, + _deprecated, + _filters_mutated, + _filters_mutated_lock_held, + _filters_version, + _formatwarning_orig, + _formatwarnmsg, + _formatwarnmsg_impl, + _get_context, + _get_filters, + _getaction, + _getcategory, + _is_filename_to_skip, + _is_internal_filename, + _is_internal_frame, + _lock, + _new_context, + _next_external_frame, + _processoptions, + _set_context, + _set_module, + _setoption, + _setup_defaults, + _showwarning_orig, + _showwarnmsg, + _showwarnmsg_impl, + _use_context, + _warn_unawaited_coroutine, + _warnings_context, + catch_warnings, + defaultaction, + deprecated, + filters, + filterwarnings, + formatwarning, + onceregistry, + resetwarnings, + showwarning, + simplefilter, + warn, + warn_explicit, +) -__all__ = ["warn", "warn_explicit", "showwarning", - "formatwarning", "filterwarnings", "simplefilter", - "resetwarnings", "catch_warnings", "deprecated"] - -def showwarning(message, category, filename, lineno, file=None, line=None): - """Hook to write a warning to a file; replace if you like.""" - msg = WarningMessage(message, category, filename, lineno, file, line) - _showwarnmsg_impl(msg) - -def formatwarning(message, category, filename, lineno, line=None): - """Function to format a warning the standard way.""" - msg = WarningMessage(message, category, filename, lineno, None, line) - return _formatwarnmsg_impl(msg) - -def _showwarnmsg_impl(msg): - file = msg.file - if file is None: - file = sys.stderr - if file is None: - # sys.stderr is None when run with pythonw.exe: - # warnings get lost - return - text = _formatwarnmsg(msg) - try: - file.write(text) - except OSError: - # the file (probably stderr) is invalid - this warning gets lost. - pass - -def _formatwarnmsg_impl(msg): - category = msg.category.__name__ - s = f"{msg.filename}:{msg.lineno}: {category}: {msg.message}\n" - - if msg.line is None: - try: - import linecache - line = linecache.getline(msg.filename, msg.lineno) - except Exception: - # When a warning is logged during Python shutdown, linecache - # and the import machinery don't work anymore - line = None - linecache = None - else: - line = msg.line - if line: - line = line.strip() - s += " %s\n" % line - - if msg.source is not None: - try: - import tracemalloc - # Logging a warning should not raise a new exception: - # catch Exception, not only ImportError and RecursionError. - except Exception: - # don't suggest to enable tracemalloc if it's not available - suggest_tracemalloc = False - tb = None - else: - try: - suggest_tracemalloc = not tracemalloc.is_tracing() - tb = tracemalloc.get_object_traceback(msg.source) - except Exception: - # When a warning is logged during Python shutdown, tracemalloc - # and the import machinery don't work anymore - suggest_tracemalloc = False - tb = None - - if tb is not None: - s += 'Object allocated at (most recent call last):\n' - for frame in tb: - s += (' File "%s", lineno %s\n' - % (frame.filename, frame.lineno)) - - try: - if linecache is not None: - line = linecache.getline(frame.filename, frame.lineno) - else: - line = None - except Exception: - line = None - if line: - line = line.strip() - s += ' %s\n' % line - elif suggest_tracemalloc: - s += (f'{category}: Enable tracemalloc to get the object ' - f'allocation traceback\n') - return s - -# Keep a reference to check if the function was replaced -_showwarning_orig = showwarning - -def _showwarnmsg(msg): - """Hook to write a warning to a file; replace if you like.""" - try: - sw = showwarning - except NameError: - pass - else: - if sw is not _showwarning_orig: - # warnings.showwarning() was replaced - if not callable(sw): - raise TypeError("warnings.showwarning() must be set to a " - "function or method") - - sw(msg.message, msg.category, msg.filename, msg.lineno, - msg.file, msg.line) - return - _showwarnmsg_impl(msg) - -# Keep a reference to check if the function was replaced -_formatwarning_orig = formatwarning - -def _formatwarnmsg(msg): - """Function to format a warning the standard way.""" - try: - fw = formatwarning - except NameError: - pass - else: - if fw is not _formatwarning_orig: - # warnings.formatwarning() was replaced - return fw(msg.message, msg.category, - msg.filename, msg.lineno, msg.line) - return _formatwarnmsg_impl(msg) - -def filterwarnings(action, message="", category=Warning, module="", lineno=0, - append=False): - """Insert an entry into the list of warnings filters (at the front). - - 'action' -- one of "error", "ignore", "always", "default", "module", - or "once" - 'message' -- a regex that the warning message must match - 'category' -- a class that the warning must be a subclass of - 'module' -- a regex that the module name must match - 'lineno' -- an integer line number, 0 matches all warnings - 'append' -- if true, append to the list of filters - """ - if action not in {"error", "ignore", "always", "default", "module", "once"}: - raise ValueError(f"invalid action: {action!r}") - if not isinstance(message, str): - raise TypeError("message must be a string") - if not isinstance(category, type) or not issubclass(category, Warning): - raise TypeError("category must be a Warning subclass") - if not isinstance(module, str): - raise TypeError("module must be a string") - if not isinstance(lineno, int): - raise TypeError("lineno must be an int") - if lineno < 0: - raise ValueError("lineno must be an int >= 0") - - if message or module: - import re - - if message: - message = re.compile(message, re.I) - else: - message = None - if module: - module = re.compile(module) - else: - module = None - - _add_filter(action, message, category, module, lineno, append=append) - -def simplefilter(action, category=Warning, lineno=0, append=False): - """Insert a simple entry into the list of warnings filters (at the front). - - A simple filter matches all modules and messages. - 'action' -- one of "error", "ignore", "always", "default", "module", - or "once" - 'category' -- a class that the warning must be a subclass of - 'lineno' -- an integer line number, 0 matches all warnings - 'append' -- if true, append to the list of filters - """ - if action not in {"error", "ignore", "always", "default", "module", "once"}: - raise ValueError(f"invalid action: {action!r}") - if not isinstance(lineno, int): - raise TypeError("lineno must be an int") - if lineno < 0: - raise ValueError("lineno must be an int >= 0") - _add_filter(action, None, category, None, lineno, append=append) - -def _add_filter(*item, append): - # Remove possible duplicate filters, so new one will be placed - # in correct place. If append=True and duplicate exists, do nothing. - if not append: - try: - filters.remove(item) - except ValueError: - pass - filters.insert(0, item) - else: - if item not in filters: - filters.append(item) - _filters_mutated() - -def resetwarnings(): - """Clear the list of warning filters, so that no filters are active.""" - filters[:] = [] - _filters_mutated() - -class _OptionError(Exception): - """Exception used by option processing helpers.""" - pass - -# Helper to process -W options passed via sys.warnoptions -def _processoptions(args): - for arg in args: - try: - _setoption(arg) - except _OptionError as msg: - print("Invalid -W option ignored:", msg, file=sys.stderr) - -# Helper for _processoptions() -def _setoption(arg): - parts = arg.split(':') - if len(parts) > 5: - raise _OptionError("too many fields (max 5): %r" % (arg,)) - while len(parts) < 5: - parts.append('') - action, message, category, module, lineno = [s.strip() - for s in parts] - action = _getaction(action) - category = _getcategory(category) - if message or module: - import re - if message: - message = re.escape(message) - if module: - module = re.escape(module) + r'\Z' - if lineno: - try: - lineno = int(lineno) - if lineno < 0: - raise ValueError - except (ValueError, OverflowError): - raise _OptionError("invalid lineno %r" % (lineno,)) from None - else: - lineno = 0 - filterwarnings(action, message, category, module, lineno) - -# Helper for _setoption() -def _getaction(action): - if not action: - return "default" - if action == "all": return "always" # Alias - for a in ('default', 'always', 'ignore', 'module', 'once', 'error'): - if a.startswith(action): - return a - raise _OptionError("invalid action: %r" % (action,)) - -# Helper for _setoption() -def _getcategory(category): - if not category: - return Warning - if '.' not in category: - import builtins as m - klass = category - else: - module, _, klass = category.rpartition('.') - try: - m = __import__(module, None, None, [klass]) - except ImportError: - raise _OptionError("invalid module name: %r" % (module,)) from None - try: - cat = getattr(m, klass) - except AttributeError: - raise _OptionError("unknown warning category: %r" % (category,)) from None - if not issubclass(cat, Warning): - raise _OptionError("invalid warning category: %r" % (category,)) - return cat - - -def _is_internal_filename(filename): - return 'importlib' in filename and '_bootstrap' in filename - - -def _is_filename_to_skip(filename, skip_file_prefixes): - return any(filename.startswith(prefix) for prefix in skip_file_prefixes) - - -def _is_internal_frame(frame): - """Signal whether the frame is an internal CPython implementation detail.""" - return _is_internal_filename(frame.f_code.co_filename) - - -def _next_external_frame(frame, skip_file_prefixes): - """Find the next frame that doesn't involve Python or user internals.""" - frame = frame.f_back - while frame is not None and ( - _is_internal_filename(filename := frame.f_code.co_filename) or - _is_filename_to_skip(filename, skip_file_prefixes)): - frame = frame.f_back - return frame - - -# Code typically replaced by _warnings -def warn(message, category=None, stacklevel=1, source=None, - *, skip_file_prefixes=()): - """Issue a warning, or maybe ignore it or raise an exception.""" - # Check if message is already a Warning object - if isinstance(message, Warning): - category = message.__class__ - # Check category argument - if category is None: - category = UserWarning - if not (isinstance(category, type) and issubclass(category, Warning)): - raise TypeError("category must be a Warning subclass, " - "not '{:s}'".format(type(category).__name__)) - if not isinstance(skip_file_prefixes, tuple): - # The C version demands a tuple for implementation performance. - raise TypeError('skip_file_prefixes must be a tuple of strs.') - if skip_file_prefixes: - stacklevel = max(2, stacklevel) - # Get context information - try: - if stacklevel <= 1 or _is_internal_frame(sys._getframe(1)): - # If frame is too small to care or if the warning originated in - # internal code, then do not try to hide any frames. - frame = sys._getframe(stacklevel) - else: - frame = sys._getframe(1) - # Look for one frame less since the above line starts us off. - for x in range(stacklevel-1): - frame = _next_external_frame(frame, skip_file_prefixes) - if frame is None: - raise ValueError - except ValueError: - globals = sys.__dict__ - filename = "" - lineno = 0 - else: - globals = frame.f_globals - filename = frame.f_code.co_filename - lineno = frame.f_lineno - if '__name__' in globals: - module = globals['__name__'] - else: - module = "" - registry = globals.setdefault("__warningregistry__", {}) - warn_explicit(message, category, filename, lineno, module, registry, - globals, source) - -def warn_explicit(message, category, filename, lineno, - module=None, registry=None, module_globals=None, - source=None): - lineno = int(lineno) - if module is None: - module = filename or "" - if module[-3:].lower() == ".py": - module = module[:-3] # XXX What about leading pathname? - if registry is None: - registry = {} - if registry.get('version', 0) != _filters_version: - registry.clear() - registry['version'] = _filters_version - if isinstance(message, Warning): - text = str(message) - category = message.__class__ - else: - text = message - message = category(message) - key = (text, category, lineno) - # Quick test for common case - if registry.get(key): - return - # Search the filters - for item in filters: - action, msg, cat, mod, ln = item - if ((msg is None or msg.match(text)) and - issubclass(category, cat) and - (mod is None or mod.match(module)) and - (ln == 0 or lineno == ln)): - break - else: - action = defaultaction - # Early exit actions - if action == "ignore": - return - - # Prime the linecache for formatting, in case the - # "file" is actually in a zipfile or something. - import linecache - linecache.getlines(filename, module_globals) - - if action == "error": - raise message - # Other actions - if action == "once": - registry[key] = 1 - oncekey = (text, category) - if onceregistry.get(oncekey): - return - onceregistry[oncekey] = 1 - elif action == "always": - pass - elif action == "module": - registry[key] = 1 - altkey = (text, category, 0) - if registry.get(altkey): - return - registry[altkey] = 1 - elif action == "default": - registry[key] = 1 - else: - # Unrecognized actions are errors - raise RuntimeError( - "Unrecognized action (%r) in warnings.filters:\n %s" % - (action, item)) - # Print message and context - msg = WarningMessage(message, category, filename, lineno, source=source) - _showwarnmsg(msg) - - -class WarningMessage(object): - - _WARNING_DETAILS = ("message", "category", "filename", "lineno", "file", - "line", "source") - - def __init__(self, message, category, filename, lineno, file=None, - line=None, source=None): - self.message = message - self.category = category - self.filename = filename - self.lineno = lineno - self.file = file - self.line = line - self.source = source - self._category_name = category.__name__ if category else None - - def __str__(self): - return ("{message : %r, category : %r, filename : %r, lineno : %s, " - "line : %r}" % (self.message, self._category_name, - self.filename, self.lineno, self.line)) - - -class catch_warnings(object): - - """A context manager that copies and restores the warnings filter upon - exiting the context. - - The 'record' argument specifies whether warnings should be captured by a - custom implementation of warnings.showwarning() and be appended to a list - returned by the context manager. Otherwise None is returned by the context - manager. The objects appended to the list are arguments whose attributes - mirror the arguments to showwarning(). - - The 'module' argument is to specify an alternative module to the module - named 'warnings' and imported under that name. This argument is only useful - when testing the warnings module itself. - - If the 'action' argument is not None, the remaining arguments are passed - to warnings.simplefilter() as if it were called immediately on entering the - context. - """ - - def __init__(self, *, record=False, module=None, - action=None, category=Warning, lineno=0, append=False): - """Specify whether to record warnings and if an alternative module - should be used other than sys.modules['warnings']. - - For compatibility with Python 3.0, please consider all arguments to be - keyword-only. - - """ - self._record = record - self._module = sys.modules['warnings'] if module is None else module - self._entered = False - if action is None: - self._filter = None - else: - self._filter = (action, category, lineno, append) - - def __repr__(self): - args = [] - if self._record: - args.append("record=True") - if self._module is not sys.modules['warnings']: - args.append("module=%r" % self._module) - name = type(self).__name__ - return "%s(%s)" % (name, ", ".join(args)) - - def __enter__(self): - if self._entered: - raise RuntimeError("Cannot enter %r twice" % self) - self._entered = True - self._filters = self._module.filters - self._module.filters = self._filters[:] - self._module._filters_mutated() - self._showwarning = self._module.showwarning - self._showwarnmsg_impl = self._module._showwarnmsg_impl - if self._filter is not None: - simplefilter(*self._filter) - if self._record: - log = [] - self._module._showwarnmsg_impl = log.append - # Reset showwarning() to the default implementation to make sure - # that _showwarnmsg() calls _showwarnmsg_impl() - self._module.showwarning = self._module._showwarning_orig - return log - else: - return None - - def __exit__(self, *exc_info): - if not self._entered: - raise RuntimeError("Cannot exit %r without entering first" % self) - self._module.filters = self._filters - self._module._filters_mutated() - self._module.showwarning = self._showwarning - self._module._showwarnmsg_impl = self._showwarnmsg_impl - - -class deprecated: - """Indicate that a class, function or overload is deprecated. - - When this decorator is applied to an object, the type checker - will generate a diagnostic on usage of the deprecated object. - - Usage: - - @deprecated("Use B instead") - class A: - pass - - @deprecated("Use g instead") - def f(): - pass - - @overload - @deprecated("int support is deprecated") - def g(x: int) -> int: ... - @overload - def g(x: str) -> int: ... - - The warning specified by *category* will be emitted at runtime - on use of deprecated objects. For functions, that happens on calls; - for classes, on instantiation and on creation of subclasses. - If the *category* is ``None``, no warning is emitted at runtime. - The *stacklevel* determines where the - warning is emitted. If it is ``1`` (the default), the warning - is emitted at the direct caller of the deprecated object; if it - is higher, it is emitted further up the stack. - Static type checker behavior is not affected by the *category* - and *stacklevel* arguments. - - The deprecation message passed to the decorator is saved in the - ``__deprecated__`` attribute on the decorated object. - If applied to an overload, the decorator - must be after the ``@overload`` decorator for the attribute to - exist on the overload as returned by ``get_overloads()``. - - See PEP 702 for details. - - """ - def __init__( - self, - message: str, - /, - *, - category: type[Warning] | None = DeprecationWarning, - stacklevel: int = 1, - ) -> None: - if not isinstance(message, str): - raise TypeError( - f"Expected an object of type str for 'message', not {type(message).__name__!r}" - ) - self.message = message - self.category = category - self.stacklevel = stacklevel - - def __call__(self, arg, /): - # Make sure the inner functions created below don't - # retain a reference to self. - msg = self.message - category = self.category - stacklevel = self.stacklevel - if category is None: - arg.__deprecated__ = msg - return arg - elif isinstance(arg, type): - import functools - from types import MethodType - - original_new = arg.__new__ - - @functools.wraps(original_new) - def __new__(cls, /, *args, **kwargs): - if cls is arg: - warn(msg, category=category, stacklevel=stacklevel + 1) - if original_new is not object.__new__: - return original_new(cls, *args, **kwargs) - # Mirrors a similar check in object.__new__. - elif cls.__init__ is object.__init__ and (args or kwargs): - raise TypeError(f"{cls.__name__}() takes no arguments") - else: - return original_new(cls) - - arg.__new__ = staticmethod(__new__) - - original_init_subclass = arg.__init_subclass__ - # We need slightly different behavior if __init_subclass__ - # is a bound method (likely if it was implemented in Python) - if isinstance(original_init_subclass, MethodType): - original_init_subclass = original_init_subclass.__func__ - - @functools.wraps(original_init_subclass) - def __init_subclass__(*args, **kwargs): - warn(msg, category=category, stacklevel=stacklevel + 1) - return original_init_subclass(*args, **kwargs) - - arg.__init_subclass__ = classmethod(__init_subclass__) - # Or otherwise, which likely means it's a builtin such as - # object's implementation of __init_subclass__. - else: - @functools.wraps(original_init_subclass) - def __init_subclass__(*args, **kwargs): - warn(msg, category=category, stacklevel=stacklevel + 1) - return original_init_subclass(*args, **kwargs) - - arg.__init_subclass__ = __init_subclass__ - - arg.__deprecated__ = __new__.__deprecated__ = msg - __init_subclass__.__deprecated__ = msg - return arg - elif callable(arg): - import functools - import inspect - - @functools.wraps(arg) - def wrapper(*args, **kwargs): - warn(msg, category=category, stacklevel=stacklevel + 1) - return arg(*args, **kwargs) - - if inspect.iscoroutinefunction(arg): - wrapper = inspect.markcoroutinefunction(wrapper) - - arg.__deprecated__ = wrapper.__deprecated__ = msg - return wrapper - else: - raise TypeError( - "@deprecated decorator with non-None category must be applied to " - f"a class or callable, not {arg!r}" - ) - - -_DEPRECATED_MSG = "{name!r} is deprecated and slated for removal in Python {remove}" - -def _deprecated(name, message=_DEPRECATED_MSG, *, remove, _version=sys.version_info): - """Warn that *name* is deprecated or should be removed. - - RuntimeError is raised if *remove* specifies a major/minor tuple older than - the current Python version or the same version but past the alpha. - - The *message* argument is formatted with *name* and *remove* as a Python - version tuple (e.g. (3, 11)). - - """ - remove_formatted = f"{remove[0]}.{remove[1]}" - if (_version[:2] > remove) or (_version[:2] == remove and _version[3] != "alpha"): - msg = f"{name!r} was slated for removal after Python {remove_formatted} alpha" - raise RuntimeError(msg) - else: - msg = message.format(name=name, remove=remove_formatted) - warn(msg, DeprecationWarning, stacklevel=3) - - -# Private utility function called by _PyErr_WarnUnawaitedCoroutine -def _warn_unawaited_coroutine(coro): - msg_lines = [ - f"coroutine '{coro.__qualname__}' was never awaited\n" - ] - if coro.cr_origin is not None: - import linecache, traceback - def extract(): - for filename, lineno, funcname in reversed(coro.cr_origin): - line = linecache.getline(filename, lineno) - yield (filename, lineno, funcname, line) - msg_lines.append("Coroutine created at (most recent call last)\n") - msg_lines += traceback.format_list(list(extract())) - msg = "".join(msg_lines).rstrip("\n") - # Passing source= here means that if the user happens to have tracemalloc - # enabled and tracking where the coroutine was created, the warning will - # contain that traceback. This does mean that if they have *both* - # coroutine origin tracking *and* tracemalloc enabled, they'll get two - # partially-redundant tracebacks. If we wanted to be clever we could - # probably detect this case and avoid it, but for now we don't bother. - warn(msg, category=RuntimeWarning, stacklevel=2, source=coro) - - -# filters contains a sequence of filter 5-tuples -# The components of the 5-tuple are: -# - an action: error, ignore, always, default, module, or once -# - a compiled regex that must match the warning message -# - a class representing the warning category -# - a compiled regex that must match the module that is being warned -# - a line number for the line being warning, or 0 to mean any line -# If either if the compiled regexs are None, match anything. try: - from _warnings import (filters, _defaultaction, _onceregistry, - warn, warn_explicit, _filters_mutated) - defaultaction = _defaultaction - onceregistry = _onceregistry + # Try to use the C extension, this will replace some parts of the + # _py_warnings implementation imported above. + from _warnings import ( + _acquire_lock, + _defaultaction as defaultaction, + _filters_mutated_lock_held, + _onceregistry as onceregistry, + _release_lock, + _warnings_context, + filters, + warn, + warn_explicit, + ) + _warnings_defaults = True -except ImportError: - filters = [] - defaultaction = "default" - onceregistry = {} - _filters_version = 1 + class _Lock: + def __enter__(self): + _acquire_lock() + return self - def _filters_mutated(): - global _filters_version - _filters_version += 1 + def __exit__(self, *args): + _release_lock() + _lock = _Lock() +except ImportError: _warnings_defaults = False # Module initialization +_set_module(sys.modules[__name__]) _processoptions(sys.warnoptions) if not _warnings_defaults: - # Several warning categories are ignored by default in regular builds - if not hasattr(sys, 'gettotalrefcount'): - filterwarnings("default", category=DeprecationWarning, - module="__main__", append=1) - simplefilter("ignore", category=DeprecationWarning, append=1) - simplefilter("ignore", category=PendingDeprecationWarning, append=1) - simplefilter("ignore", category=ImportWarning, append=1) - simplefilter("ignore", category=ResourceWarning, append=1) + _setup_defaults() del _warnings_defaults +del _setup_defaults diff --git a/crates/vm/src/stdlib/sys.rs b/crates/vm/src/stdlib/sys.rs index 66a43427f83..4917e3e43c6 100644 --- a/crates/vm/src/stdlib/sys.rs +++ b/crates/vm/src/stdlib/sys.rs @@ -1303,6 +1303,8 @@ mod sys { warn_default_encoding: u8, /// -X thread_inherit_context, whether new threads inherit context from parent thread_inherit_context: bool, + /// -X context_aware_warnings, whether warnings are context aware + context_aware_warnings: bool, } impl FlagsData { @@ -1327,6 +1329,7 @@ mod sys { safe_path: settings.safe_path, warn_default_encoding: settings.warn_default_encoding as u8, thread_inherit_context: settings.thread_inherit_context, + context_aware_warnings: settings.context_aware_warnings, } } } diff --git a/crates/vm/src/vm/setting.rs b/crates/vm/src/vm/setting.rs index a7779156f8c..06cc35e933f 100644 --- a/crates/vm/src/vm/setting.rs +++ b/crates/vm/src/vm/setting.rs @@ -91,6 +91,9 @@ pub struct Settings { /// -X thread_inherit_context, whether new threads inherit context from parent pub thread_inherit_context: bool, + /// -X context_aware_warnings, whether warnings are context aware + pub context_aware_warnings: bool, + /// -i pub inspect: bool, @@ -194,6 +197,7 @@ impl Default for Settings { dev_mode: false, warn_default_encoding: false, thread_inherit_context: false, + context_aware_warnings: false, warnoptions: vec![], path_list: vec![], argv: vec![],