-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathbootstrap.py
More file actions
43 lines (35 loc) · 1.23 KB
/
bootstrap.py
File metadata and controls
43 lines (35 loc) · 1.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from typing import Any
from celery import Celery
from dependency_injector.containers import DynamicContainer
from dependency_injector.providers import Object
from faststream.broker.core.usecase import BrokerUsecase
from gateways.event import FastStreamRedisGateway
from pydantic import BaseModel, ConfigDict
from .celery import init_celery
from .config import AppConfig
from .di_container import Container
from .faststream import init_broker
from .logs import init_logger
from .storage import init_storage
class InitReference(BaseModel):
    """Typed bundle of the long-lived objects created at start-up.

    Holds the Celery application, the dependency-injection container and
    the FastStream broker so callers can receive all three from one value.
    """

    # The field types below are plain third-party classes rather than
    # pydantic models, so pydantic must be told to accept them as-is.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Celery application instance.
    celery_app: Celery
    # Dependency-injection container.
    di_container: DynamicContainer
    # FastStream broker; message/connection type parameters left open.
    faststream_broker: BrokerUsecase[Any, Any]
def application_init(app_config: AppConfig) -> InitReference:
    """Initialise the application's infrastructure and return handles to it.

    Steps, in order: build the DI container around ``app_config``, set up
    logging, set up storage, create the Celery app, create the FastStream
    broker, then override the container's ``BookEventGatewayInterface``
    provider with a ``FastStreamRedisGateway`` bound to that broker.

    Args:
        app_config: Application configuration; stored in the container and
            passed to the logger, Celery and broker initialisers.

    Returns:
        An ``InitReference`` carrying the Celery app, the DI container and
        the FastStream broker.
    """
    di_container = Container(config=Object(app_config))

    init_logger(app_config)
    init_storage()
    celery_app = init_celery(app_config)
    faststream_broker = init_broker(app_config)

    # TODO: temporary wiring -- this override should be declared directly
    # in the Container instead of being patched in here.
    gateway_provider = di_container.BookEventGatewayInterface
    gateway_provider.override(
        Object(FastStreamRedisGateway(broker=faststream_broker)),
    )

    return InitReference(
        celery_app=celery_app,
        di_container=di_container,
        faststream_broker=faststream_broker,
    )