forked from Zipstack/unstract
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker.py
More file actions
70 lines (56 loc) · 2.28 KB
/
worker.py
File metadata and controls
70 lines (56 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
"""File Processing Worker
Celery worker for document processing and file handling.
Handles file uploads, text extraction, and processing workflows.
"""
from shared.enums.worker_enums import WorkerType
from shared.infrastructure.config.builder import WorkerBuilder
from shared.infrastructure.config.registry import WorkerRegistry
from shared.infrastructure.logging import WorkerLogger
# Set up the worker at import time for the FILE_PROCESSING worker type:
# first configure the worker logger, then build the Celery app together with
# its configuration object. `app` is used below to declare tasks and `config`
# is read by the health check and by the healthcheck task.
logger = WorkerLogger.setup(WorkerType.FILE_PROCESSING)
app, config = WorkerBuilder.build_celery_app(WorkerType.FILE_PROCESSING)
def check_file_processing_health():
    """Build the health report for the file processing worker.

    The check asks for the singleton API client using the module-level
    ``config`` and classifies the worker from the outcome.

    Returns:
        A ``HealthCheckResult`` named ``"file_processing_health"`` whose
        status is:

        * ``HealthStatus.HEALTHY`` when a client object is obtained
          (i.e. the returned client is not ``None``);
        * ``HealthStatus.DEGRADED`` when the client is ``None``;
        * ``HealthStatus.DEGRADED`` when anything inside the check raises,
          in which case the error text is included in the message and details.

    This function does not propagate exceptions raised while obtaining the
    client or building a result; they are converted into a DEGRADED result.
    """
    # Imported at call time rather than at module import.
    from shared.infrastructure.monitoring.health import HealthCheckResult, HealthStatus

    check_name = "file_processing_health"

    try:
        from shared.utils.api_client_singleton import get_singleton_api_client

        # Guard clause: no client object means the worker is only partially usable.
        if get_singleton_api_client(config) is None:
            return HealthCheckResult(
                name=check_name,
                status=HealthStatus.DEGRADED,
                message="File processing worker partially functional",
                details={"api_client": "unhealthy"},
            )

        healthy_details = {
            "worker_type": "file_processing",
            "api_client": "healthy",
            "queues": ["file_processing", "api_file_processing"],
        }
        return HealthCheckResult(
            name=check_name,
            status=HealthStatus.HEALTHY,
            message="File processing worker is healthy",
            details=healthy_details,
        )
    except Exception as err:
        # Best-effort probe: report the failure instead of raising to the caller.
        return HealthCheckResult(
            name=check_name,
            status=HealthStatus.DEGRADED,
            message=f"Health check failed: {err}",
            details={"error": str(err)},
        )
# Register the custom health check with the worker registry at import time,
# under the name "file_processing_health" for the FILE_PROCESSING worker type.
WorkerRegistry.register_health_check(
WorkerType.FILE_PROCESSING, "file_processing_health", check_file_processing_health
)
@app.task(bind=True)
def healthcheck(self):
    """Return a static liveness payload for monitoring systems.

    Registered as a bound task, so ``self`` is the task instance and the
    current task id is read from ``self.request.id``.

    Returns:
        A dict with the keys ``status`` (always ``"healthy"``),
        ``worker_type`` (always ``"file_processing"``), ``task_id`` and
        ``worker_name``. The worker name comes from the module-level
        ``config`` when it is truthy, otherwise the fallback
        ``"file-processing-worker"`` is used.
    """
    current_task_id = self.request.id

    if config:
        reported_name = config.worker_name
    else:
        # No usable configuration object: fall back to the default name.
        reported_name = "file-processing-worker"

    payload = {
        "status": "healthy",
        "worker_type": "file_processing",
        "task_id": current_task_id,
        "worker_name": reported_name,
    }
    return payload