forked from Zipstack/unstract
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlog_utils.py
More file actions
147 lines (117 loc) · 4.43 KB
/
Copy pathlog_utils.py
File metadata and controls
147 lines (117 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""Shared log processing utilities for Unstract platform.
This module contains log processing utilities that can be used by both
backend Django services and worker processes for consistent log handling.
"""
import json
import logging
import os
from typing import Any
import redis
from unstract.core.constants import LogFieldName
from unstract.core.data_models import LogDataDTO
from unstract.workflow_execution.enums import LogType
logger = logging.getLogger(__name__)
def get_validated_log_data(json_data: Any) -> LogDataDTO | None:
    """Validate a raw log payload and convert it into a ``LogDataDTO``.

    The payload may be UTF-8 encoded bytes, a JSON string, or an already
    parsed dict. Bytes are decoded, strings are parsed as JSON, and the
    resulting dict is checked for the expected log type and the required
    fields. Undecodable bytes, invalid JSON and non-dict payloads are logged
    and yield ``None`` rather than raising.

    Args:
        json_data (Any): Log data as bytes, a JSON string, or a dict.

    Returns:
        LogDataDTO | None: A DTO built from the payload when it is a dict
            whose type equals ``LogType.LOG.value`` and whose execution id,
            organization id and timestamp are all truthy; ``None`` otherwise.
            The file execution id is passed through as-is and may be missing.
    """
    if isinstance(json_data, bytes):
        try:
            json_data = json_data.decode("utf-8")
        except UnicodeDecodeError:
            # Treat invalid UTF-8 like any other malformed payload instead of
            # letting the exception escape to the caller.
            logger.error(
                f"Error decoding bytes as UTF-8 while validating {json_data!r}"
            )
            return None

    if isinstance(json_data, str):
        try:
            # Parse the string as JSON
            json_data = json.loads(json_data)
        except json.JSONDecodeError:
            logger.error(f"Error decoding JSON data while validating {json_data}")
            return None

    if not isinstance(json_data, dict):
        logger.warning(f"Getting invalid data type while validating {json_data}")
        return None

    # Extract required fields from the JSON data
    execution_id = json_data.get(LogFieldName.EXECUTION_ID)
    organization_id = json_data.get(LogFieldName.ORGANIZATION_ID)
    timestamp = json_data.get(LogFieldName.TIMESTAMP)
    log_type = json_data.get(LogFieldName.TYPE)
    file_execution_id = json_data.get(LogFieldName.FILE_EXECUTION_ID)

    # Only entries of type LogType.LOG are persisted; other types are
    # skipped silently (no log line is emitted for them).
    if log_type != LogType.LOG.value:
        return None

    # All three identifiers must be present and truthy; file_execution_id is
    # intentionally not part of this check.
    if not all((execution_id, organization_id, timestamp)):
        logger.debug(f"Missing required fields while validating {json_data}")
        return None

    return LogDataDTO(
        execution_id=execution_id,
        file_execution_id=file_execution_id,
        organization_id=organization_id,
        timestamp=timestamp,
        log_type=log_type,
        data=json_data,
    )
def store_execution_log(
    data: dict[str, Any],
    redis_client: redis.Redis,
    log_queue_name: str,
    is_enabled: bool = True,
) -> None:
    """Store an execution log in a Redis queue with size protection.

    The payload is validated with ``get_validated_log_data``; payloads that
    do not validate are ignored. The queue length is capped at the value of
    the ``LOG_QUEUE_MAX_SIZE`` environment variable (default 10000). When the
    queue is already at or above the cap the new log is dropped and a warning
    is logged. A non-integer ``LOG_QUEUE_MAX_SIZE`` falls back to the default
    instead of discarding every log.

    Any exception raised while validating or talking to Redis is caught and
    logged, so this function does not raise for those failures.

    Args:
        data: Execution log data
        redis_client: Redis client instance
        log_queue_name: Name of the Redis queue to store logs
        is_enabled: Whether log storage is enabled; when False nothing is done
    """
    if not is_enabled:
        return

    try:
        log_data = get_validated_log_data(json_data=data)
        if not log_data:
            return

        # Get max queue size from environment (default: 10,000 logs)
        default_max_queue_size = 10000
        raw_max_queue_size = os.getenv(
            "LOG_QUEUE_MAX_SIZE", str(default_max_queue_size)
        )
        try:
            max_queue_size = int(raw_max_queue_size)
        except ValueError:
            # A misconfigured value must not silently disable log storage.
            logger.warning(
                f"Invalid LOG_QUEUE_MAX_SIZE '{raw_max_queue_size}', "
                f"falling back to {default_max_queue_size}"
            )
            max_queue_size = default_max_queue_size

        # Check if queue is at limit (O(1) operation). The length check and
        # the push below are separate Redis commands, so the cap is
        # approximate when several producers write concurrently.
        queue_length = redis_client.llen(log_queue_name)
        if queue_length >= max_queue_size:
            logger.warning(
                f"Log queue '{log_queue_name}' at capacity ({max_queue_size}), "
                "dropping current log - scheduler may be falling behind"
            )
            return

        # Add new log to end of queue (O(1) operation)
        redis_client.rpush(log_queue_name, log_data.to_json())
    except Exception as e:
        # Best-effort: log storage failures must not break the caller.
        logger.error(f"Error storing execution log: {e}")
def create_redis_client(
    host: str = "localhost",
    port: int = 6379,
    username: str | None = None,
    password: str | None = None,
    **kwargs,
) -> redis.Redis:
    """Build a ``redis.Redis`` client from the given connection settings.

    Args:
        host: Redis host
        port: Redis port
        username: Redis username (optional)
        password: Redis password (optional)
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``redis.Redis``

    Returns:
        Redis client constructed with ``decode_responses=False``
    """
    # Settings fixed by this factory; responses are kept as bytes for
    # consistency.
    connection_settings = {
        "host": host,
        "port": port,
        "username": username,
        "password": password,
        "decode_responses": False,
    }
    # Unpacking both mappings keeps the duplicate-keyword TypeError if a
    # caller passes one of the fixed settings again through kwargs.
    return redis.Redis(**connection_settings, **kwargs)