-
-
Notifications
You must be signed in to change notification settings - Fork 214
Expand file tree
/
Copy pathtyping.py
More file actions
48 lines (34 loc) · 1.28 KB
/
typing.py
File metadata and controls
48 lines (34 loc) · 1.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from collections.abc import Sequence
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Literal, Optional, Protocol, Union
# Names exported by a star-import of this module.
__all__ = (
    # option value aliases
    'OptionType',
    'WeekdayOptionType',
    'WEEKDAYS',
    'SecondsTimedelta',
    # callable protocols
    'WorkerCoroutine',
    'StartupShutdown',
    # worker settings alias
    'WorkerSettingsType',
)
if TYPE_CHECKING:
    # Only evaluated by static type checkers: at runtime TYPE_CHECKING is False,
    # so neither module is imported here. The names are referenced elsewhere in
    # this module as quoted (string) annotations.
    from .cron import CronJob
    from .worker import Function
# An option value: ``None``, a single ``int``, or a ``set`` of ``int``.
OptionType = Union[None, set[int], int]

# Weekday abbreviations, ordered Monday through Sunday.
WEEKDAYS = ('mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun')

# Everything ``OptionType`` accepts, plus any one of the weekday abbreviations.
WeekdayOptionType = Union[
    OptionType,
    Literal['mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun'],
]

# A duration given as a number or a ``timedelta``.
# NOTE(review): the name suggests bare numbers are seconds - confirm against callers.
SecondsTimedelta = Union[int, float, timedelta]
class WorkerCoroutine(Protocol):
    """Structural type for an async callable that takes a context dict plus arbitrary arguments.

    A conforming object exposes a ``__qualname__`` string and can be awaited as
    ``await obj(ctx, *args, **kwargs)``.
    """

    __qualname__: str

    async def __call__(self, ctx: dict[Any, Any], *args: Any, **kwargs: Any) -> Any:  # pragma: no cover
        """Signature stub only; the protocol itself provides no behaviour."""
        pass
class StartupShutdown(Protocol):
    """Structural type for an async callable that takes only a context dict.

    A conforming object exposes a ``__qualname__`` string and can be awaited as
    ``await obj(ctx)``.
    """

    __qualname__: str

    async def __call__(self, ctx: dict[Any, Any]) -> Any:  # pragma: no cover
        """Signature stub only; the protocol itself provides no behaviour."""
        pass
class WorkerSettingsBase(Protocol):
    """Structural type describing attributes of a worker settings class.

    Only ``functions`` has no default; the remaining attributes declared here
    default to ``None``. The list is not exhaustive (see the trailing comment).
    """

    # Callables matching WorkerCoroutine, or ``Function`` objects (type-checking-only import).
    functions: Sequence[Union[WorkerCoroutine, 'Function']]
    # Optional sequence of ``CronJob`` objects (type-checking-only import).
    cron_jobs: Optional[Sequence['CronJob']] = None
    # Optional callables matching the StartupShutdown protocol.
    on_startup: Optional[StartupShutdown] = None
    on_shutdown: Optional[StartupShutdown] = None
    # and many more...
# Worker settings are either a plain ``dict`` keyed by string, or a class
# (the class object itself, not an instance) matching ``WorkerSettingsBase``.
WorkerSettingsType = Union[
    dict[str, Any],
    type[WorkerSettingsBase],
]