-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmisc.py
More file actions
461 lines (378 loc) · 17 KB
/
misc.py
File metadata and controls
461 lines (378 loc) · 17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
# -*- coding: utf-8 -*-
"""Miscellaneous constructs."""
__all__ = ["pack",
"namelambda",
"timer",
"getattrrec", "setattrrec",
"Popper", "CountingIterator",
"slurp",
"callsite_filename",
"safeissubclass",
"maybe_open", "redirect_stdin",
"UnionFilter",
"si_prefix"]
from collections.abc import Callable, Iterable, Iterator
import contextlib
from copy import copy
from functools import partial
import logging
import pathlib
from queue import Empty, Queue
import sys
from time import perf_counter
from typing import Any, IO, TypeVar
from types import FunctionType, LambdaType, TracebackType
F = TypeVar('F', bound=Callable)
from .regutil import register_decorator
def pack(*args: Any) -> tuple:
    """Pack the given positional arguments into a tuple.

    This is the inverse of tuple unpacking, expressed as a function:
    ``pack(a, b, c)`` gives the same result as the literal ``(a, b, c)``.

    The builtin constructor `tuple(...)` insists on an iterable argument,
    so it cannot be used where the items arrive as separate arguments;
    `pack` covers exactly that use case.

    See also:
        https://www.python.org/dev/peps/pep-0448/

    Examples. When args naturally arrive separately::

        myzip = lambda lol: map(pack, *lol)
        lol = ((1, 2), (3, 4), (5, 6))
        for z in myzip(lol):
            print(z)

    Eliminate an ugly trailing comma::

        @looped_over(zip((1, 2, 3), ('a', 'b', 'c')), acc=())
        def p(loop, item, acc):
            numb, lett = item
            return loop(acc + pack(f"{numb:d}{lett}"))
        assert p == ('1a', '2b', '3c')
    """
    # Varargs already arrive as a tuple; just hand it back.
    # Pretty much like in Lisps: (define (list . args) args)
    return args
@register_decorator(priority=5)  # priority lets unpythonic.syntax.sort_lambda_decorators order this
def namelambda(name: str) -> Callable[[F], F]:
    """Rename a function. Decorator.

    Primarily useful for giving a lambda a meaningful name. This matters
    for debugging when a lambda escapes as a closure and is only called
    much later: on a crash, the stack trace then shows the given name
    instead of the anonymous ``"<lambda>"``.

    For compatibility with ``unpythonic.syntax.util.sort_lambda_decorators``,
    this is a standard parametric decorator::

        foo = namelambda("foo")(lambda ...: ...)

    The outer call builds a *foo-renamer*; feeding a lambda to that renamer
    produces a lambda named *foo*.

    Some macros (``namedlambda``, ``let``, ``do``) use this internally, but
    it is also part of unpythonic's public API for use elsewhere.

    **CAUTION**: At function-definition time, the then-current names of the
    enclosing scopes get baked into the new function's ``__qualname__``.
    Renaming a function afterwards therefore does not update the dotted
    names of closures already defined *inside* it. Mostly relevant for
    nested lambdas::

        from unpythonic import namelambda, withself
        nested = namelambda("outer")(lambda: namelambda("inner")(withself(lambda self: self)))
        print(nested.__qualname__)    # "outer"
        print(nested().__qualname__)  # "<lambda>.<locals>.inner"

    Note the inner lambda does not see the outer's new name.
    """
    def rename(f: F) -> F:
        # Pass through anything that is not a plain function.
        # TODO: Can't raise TypeError; @fploop et al. do-it-now-and-replace-def-with-result
        # TODO: decorators need to do this.
        if not isinstance(f, (LambdaType, FunctionType)):
            return f
        renamed = copy(f)  # don't mutate the original function object
        # __name__ for tools like pydoc; __qualname__ for repr(); __code__.co_name for stack traces
        # https://stackoverflow.com/questions/40661758/name-of-a-python-function-in-a-stack-trace
        # https://stackoverflow.com/questions/16064409/how-to-create-a-code-object-in-python
        renamed.__name__ = name
        prefix, dot, _ = renamed.__qualname__.rpartition('.')
        renamed.__qualname__ = f"{prefix}.{name}" if dot else name
        renamed.__code__ = renamed.__code__.replace(co_name=name)
        return renamed
    return rename
class timer:
"""Simplistic context manager for performance-testing sections of code.
Example::
with timer() as tictoc:
for _ in range(int(1e7)):
pass
print(tictoc.dt) # elapsed time in seconds (float)
If only interested in printing the result::
with timer(p=True):
for _ in range(int(1e7)):
pass
"""
def __init__(self, p: bool = False) -> None:
"""p: if True, print the delta-t when done.
Regardless of ``p``, the result is always accessible as the ``dt``.
"""
self.p = p
def __enter__(self) -> "timer":
# `perf_counter`, not `monotonic`: the former is documented as "a
# clock with the highest available resolution to measure a short
# duration" and is backed by `QueryPerformanceCounter` (~100 ns) on
# Windows, whereas `monotonic` is backed there by the ~16 ms
# tick-counter and would record `dt = 0` for microsecond-scale
# blocks (e.g. a PyPy-JIT'd tight loop). Both are monotonic; we
# only give up the "comparable across processes" guarantee of
# `monotonic`, which `timer` does not need since it only measures
# a dynamic extent in wall-clock time within a single process.
self.t0 = perf_counter()
return self
def __exit__(self, exctype: type[BaseException] | None, excvalue: BaseException | None, traceback: TracebackType | None) -> None:
self.dt = perf_counter() - self.t0
if self.p:
print(self.dt)
def getattrrec(object: Any, name: str, *default: Any) -> Any:
    """Extract the underlying data from an onion of wrapper objects.

    Reads ``object.name``, then keeps reading ``.name`` off each result
    for as long as the attribute exists, and returns the final value.

    The ``default`` parameter acts as in ``getattr``.

    See also ``setattrrec``.
    """
    current = getattr(object, name, *default)
    while True:
        if not hasattr(current, name):
            return current  # innermost layer reached
        current = getattr(current, name, *default)
def setattrrec(object: Any, name: str, value: Any) -> None:
    """Inject data into the innermost layer in an onion of wrapper objects.

    See also ``getattrrec``.
    """
    target = object
    # Walk inward while the next layer down also has the attribute.
    while hasattr(target, name) and hasattr(getattr(target, name), name):
        target = getattr(target, name)
    setattr(target, name, value)
# TODO: move `Popper` to `unpythonic.it`?
class Popper:
    """Pop-while iterator.

    Consider this code::

        from collections import deque
        inp = deque(range(5))
        out = []
        while inp:
            x = inp.pop(0)
            out.append(x)
        assert inp == []
        assert out == list(range(5))

    ``Popper`` condenses the ``while`` and ``pop`` into a ``for``, while
    still letting the loop body mutate the input container in arbitrary
    ways — ``Popper`` never calls ``iter()`` on it::

        inp = deque(range(5))
        out = []
        for x in Popper(inp):
            out.append(x)
        assert inp == deque([])
        assert out == list(range(5))

        inp = deque(range(3))
        out = []
        for x in Popper(inp):
            out.append(x)
            if x < 10:
                inp.appendleft(x + 10)
        assert inp == deque([])
        assert out == [0, 10, 1, 11, 2, 12]

    (A real use case: split sequences of items, stored as lists in a deque,
    into shorter sequences where some condition is contiguously ``True`` or
    ``False``. When the condition changes state, just commit the current
    subsequence, and push the rest of that input sequence (still requiring
    analysis) back to the input deque, to be dealt with later.)

    **Notes**:

    - The container passed to ``Popper`` holds the **remaining** items.
    - Each iteration pops one element **from the left**.
    - Iteration stops once the container is empty.
    - Per-iteration cost by container type:
        - ``collections.deque``: ``O(1)``
        - ``list``: ``O(n)``

    Named after Karl Popper.
    """
    def __init__(self, seq: Iterable[Any]) -> None:
        """seq: input container. Must support either ``popleft()`` or ``pop(0)``.

        Fully duck-typed. At least ``collections.deque`` and any
        ``collections.abc.MutableSequence`` (including ``list``) are fine.
        """
        self.seq = seq
        # Bind the pop operation once, up front: prefer `popleft` (deque,
        # O(1)), fall back to `pop(0)` (list and friends, O(n)).
        if hasattr(seq, "popleft"):
            self._pop = seq.popleft
        else:
            self._pop = partial(seq.pop, 0)
    def __iter__(self) -> "Popper":
        return self
    def __next__(self) -> Any:
        # Truthiness of the container itself decides termination, so the
        # loop body may freely grow or shrink it between iterations.
        if not self.seq:
            raise StopIteration
        return self._pop()
# TODO: move `CountingIterator` to `unpythonic.it`?
class CountingIterator:
    """Iterator that counts how many elements it has yielded.

    Wraps the original iterator of `iterable`; simply use
    `CountingIterator(iterable)` where you would use `iter(iterable)`.
    The tally lives in the attribute `count`.

    Once the underlying iterator raises StopIteration, `count` no longer
    changes.
    """
    def __init__(self, iterable: Iterable[Any]) -> None:
        self._it = iter(iterable)
        self.count: int = 0
    def __iter__(self) -> "CountingIterator":
        return self
    def __next__(self) -> Any:
        value = next(self._it)  # let StopIteration propagate
        # Only reached when the underlying iterator actually yielded,
        # so exhaustion never bumps the tally.
        self.count += 1
        return value
def slurp(queue: Queue) -> list:
    """Slurp all items currently on a queue.Queue into a list.

    Retrieves items non-blockingly until the queue reports empty, and
    returns them as a list in their original order.

    **CAUTION**: This does **not** prevent new items being added to the queue
    afterwards, or indeed by another thread while the slurping is in progress.

    This is purely a convenience function to abstract away a common operation.
    """
    items = []
    while True:
        try:
            items.append(queue.get(block=False))
        except Empty:  # drained (at least momentarily)
            return items
# Call-helper function names to skip when looking for the user's call site.
_CALLSITE_TRANSPARENT = frozenset((
    "maybe_force_args",  # lazify
    "curried", "curry", "_currycall",  # autocurry
    "call", "callwith",  # manual use of misc utils
))
def callsite_filename() -> str:
    """Return the filename of the call site, as a string.

    Useful as a building block for debug utilities and similar.

    Skips over our own call-helpers (`call`, `callwith`, `curry` and
    friends, lazify's `maybe_force_args`), so the *user's* call site is
    reported. Works also in the REPL (where `__file__` is undefined).
    """
    # We walk via `sys._getframe` rather than `inspect.stack`. `inspect.stack`
    # calls `inspect.getframeinfo` for every frame on the way and reads source
    # context lines around `f_lineno`, which raises `TypeError` if any frame
    # in the walk has `f_lineno is None`. PyPy 3.11 / macOS / Windows hits
    # exactly that: at least one frame on the way out of a `test[]` macro
    # invocation reports `f_lineno = None`. Linux PyPy and CPython don't.
    # We never use line info here; only `f_code.co_filename`.
    frame = sys._getframe(1)  # 1: skip callsite_filename itself
    while frame is not None:
        code = frame.f_code
        if code.co_name not in _CALLSITE_TRANSPARENT:
            return code.co_filename
        frame = frame.f_back
    raise RuntimeError("callsite_filename: no eligible frame on the call stack")
def safeissubclass(cls: Any, cls_or_tuple: type | tuple[type, ...]) -> bool:
"""Like issubclass, but if `cls` is not a class, swallow the `TypeError` and return `False`."""
try:
return issubclass(cls, cls_or_tuple)
except TypeError: # "issubclass() arg 1 must be a class"
pass
return False
# --------------------------------------------------------------------------------
# I/O utilities
@contextlib.contextmanager
def maybe_open(filename: str | pathlib.Path | None,
mode: str,
fallback: IO,
**kwargs) -> Iterator[IO]:
"""Context manager: open a file, or use a fallback stream.
Adapter that lets you always syntactically write
``with maybe_open(...) as f:`` even when the target is a
standard stream like ``sys.stdin`` or ``sys.stdout``.
``filename``: path to open (``str`` or ``pathlib.Path``).
If ``None``, yield ``fallback`` instead.
``mode``: as in the builtin ``open``.
``fallback``: stream to use when ``filename is None``. Typical values
are ``sys.stdin`` (reading) and ``sys.stdout`` or
``sys.stderr`` (writing).
``**kwargs``: passed through to ``open``.
"""
if filename is not None:
with open(filename, mode, **kwargs) as f:
yield f
else:
yield fallback
class redirect_stdin(contextlib._RedirectStream):
    """Context manager that feeds ``sys.stdin`` from *target*.

    Completes the set: the standard library has
    `contextlib.redirect_stdout` (Python 3.4) and
    `contextlib.redirect_stderr` (Python 3.5), but no stdin counterpart.
    By reusing the same stdlib machinery, this behaves exactly like its
    siblings — including the per-instance stack that makes nested
    re-entry on the same instance work.

    Like its stdlib siblings, this swaps the global ``sys.stdin``, so it
    is **not** safe under concurrent use from multiple threads: parallel
    redirects from different threads will stomp on each other. For tests
    (the primary use case), single-threaded use is the norm.

    Example::

        from io import StringIO
        from unpythonic import redirect_stdin
        with redirect_stdin(StringIO("42\\n")):
            value = input()  # reads "42"
    """
    _stream = "stdin"
# --------------------------------------------------------------------------------
# Logging utilities
class UnionFilter(logging.Filter):
    """A ``logging.Filter`` that matches if *any* sub-filter matches.

    The standard library provides ``logging.Filter`` for a single logger-name
    prefix, but no OR combinator. ``UnionFilter`` fills the gap::

        import logging
        from unpythonic import UnionFilter
        for handler in logging.root.handlers:
            handler.addFilter(UnionFilter(logging.Filter("myapp.core"),
                                          logging.Filter("myapp.io")))
    """
    def __init__(self, *filters: logging.Filter) -> None:
        # Deliberately no super().__init__(): the base class's name-prefix
        # matching state is unused, since `filter` is fully overridden.
        self.filters = filters
    def filter(self, record: logging.LogRecord) -> bool:
        # OR semantics: the record passes if at least one sub-filter lets
        # it through. With no sub-filters, nothing passes.
        for subfilter in self.filters:
            if subfilter.filter(record):
                return True
        return False
# --------------------------------------------------------------------------------
# Number formatting
def si_prefix(number: int | float, precision: int = 2, binary: bool = False) -> str:
"""Format a number with an SI decimal or IEC binary prefix.
Returns a string like ``"1.50 k"``, ``"23.40 M"``, ``"500.00 m"``
(milli), or ``"42.00"`` (no prefix for magnitudes in [1, base)).
``number``: the value to format (``int`` or ``float``).
``precision``: decimal places (default 2).
``binary``: if ``True``, use IEC binary prefixes (Ki, Mi, Gi, ...)
with base 1024 instead of SI decimal prefixes with
base 1000. Sub-unity binary prefixes (mi, µi, ni, ...)
follow the same convention.
Negative numbers and zero are handled correctly.
In decimal mode (the default), both positive prefixes (k through Q)
and negative prefixes (m through q) are supported. The micro prefix
is ``µ`` (U+00B5 MICRO SIGN).
Examples::
si_prefix(1500) # "1.50 k"
si_prefix(2_500_000) # "2.50 M"
si_prefix(0.0015) # "1.50 m"
si_prefix(0.0000025) # "2.50 µ"
si_prefix(-1500) # "-1.50 k"
si_prefix(42) # "42.00"
si_prefix(1536, binary=True) # "1.50 Ki"
si_prefix(2_621_440, binary=True) # "2.50 Mi"
si_prefix(0.5, binary=True) # "512.00 mi"
"""
if binary:
base = 1024
large = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi', 'Ri', 'Qi')
small = ('mi', 'µi', 'ni', 'pi', 'fi', 'ai', 'zi', 'yi', 'ri', 'qi')
else:
base = 1000
large = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y', 'R', 'Q')
small = ('m', 'µ', 'n', 'p', 'f', 'a', 'z', 'y', 'r', 'q')
if number == 0:
return f"{0:.{precision}f}"
sign = -1 if number < 0 else 1
magnitude = abs(number)
if magnitude >= 1:
for prefix in large:
if magnitude < base:
value = sign * magnitude
if prefix:
return f"{value:.{precision}f} {prefix}"
return f"{value:.{precision}f}"
magnitude /= base
value = sign * magnitude
return f"{value:.{precision}f} {large[-1]}"
else:
for prefix in small:
magnitude *= base
if magnitude >= 1:
value = sign * magnitude
return f"{value:.{precision}f} {prefix}"
value = sign * magnitude
return f"{value:.{precision}f} {small[-1]}"