-
-
Notifications
You must be signed in to change notification settings - Fork 146
Expand file tree
/
Copy pathasync_reduce.py
More file actions
50 lines (37 loc) · 1.64 KB
/
async_reduce.py
File metadata and controls
50 lines (37 loc) · 1.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
"""Reduce awaitable values"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, TypeVar, cast
from .is_awaitable import is_awaitable as default_is_awaitable
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Collection
from typing import TypeGuard
from .awaitable_or_value import AwaitableOrValue
__all__ = ["async_reduce"]
T = TypeVar("T")
U = TypeVar("U")
def async_reduce(
callback: Callable[[U, T], AwaitableOrValue[U]],
values: Collection[T],
initial_value: AwaitableOrValue[U],
is_awaitable: Callable[[Any], TypeGuard[Awaitable]] = default_is_awaitable,
) -> AwaitableOrValue[U]:
"""Reduce the given potentially awaitable values using a callback function.
Similar to functools.reduce(), however the reducing callback may return
an awaitable, in which case reduction will continue after each promise resolves.
If the callback does not return an awaitable, then this function will also not
return an awaitable.
"""
accumulator: AwaitableOrValue[U] = initial_value
for value in values:
if is_awaitable(accumulator):
async def async_callback(
current_accumulator: Awaitable[U], current_value: T
) -> U:
result: AwaitableOrValue[U] = callback(
await current_accumulator, current_value
)
return await result if is_awaitable(result) else result # type: ignore
accumulator = async_callback(cast("Awaitable[U]", accumulator), value)
else:
accumulator = callback(cast("U", accumulator), value)
return accumulator