-
Notifications
You must be signed in to change notification settings - Fork 614
Expand file tree
/
Copy pathmonitor.py
More file actions
139 lines (111 loc) · 4.47 KB
/
monitor.py
File metadata and controls
139 lines (111 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import time
import weakref
from threading import Thread, Lock
import sentry_sdk
from sentry_sdk.utils import logger
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional
MAX_DOWNSAMPLE_FACTOR = 10
class Monitor:
"""
Performs health checks in a separate thread once every interval seconds
and updates the internal state. Other parts of the SDK only read this state
and act accordingly.
"""
name = "sentry.monitor"
_thread: "Optional[Thread]"
_thread_for_pid: "Optional[int]"
def __init__(
self, transport: "sentry_sdk.transport.Transport", interval: float = 10
) -> None:
self.transport: "sentry_sdk.transport.Transport" = transport
self.interval: float = interval
self._healthy = True
self._downsample_factor: int = 0
self._running = True
self._reset_thread_state()
# See https://github.com/getsentry/sentry-python/issues/6148.
# If os.fork() runs while another thread holds self._thread_lock,
# the child inherits the lock locked but the holding thread does
# not exist in the child, so the lock can never be released and
# _ensure_running deadlocks forever. Reinitialise the lock and
# cached thread/pid in the child so it starts clean regardless
# of inherited state. We bind via a WeakMethod so the
# permanently-registered fork handler does not pin this Monitor
# (and its Transport): register_at_fork has no unregister API.
# POSIX-only; Windows uses spawn.
if hasattr(os, "register_at_fork"):
weak_reset = weakref.WeakMethod(self._reset_thread_state)
def _reset_in_child() -> None:
method = weak_reset()
if method is not None:
method()
os.register_at_fork(after_in_child=_reset_in_child)
def _reset_thread_state(self) -> None:
self._thread = None
self._thread_lock = Lock()
self._thread_for_pid = None
def _ensure_running(self) -> None:
"""
Check that the monitor has an active thread to run in, or create one if not.
Note that this might fail (e.g. in Python 3.12 it's not possible to
spawn new threads at interpreter shutdown). In that case self._running
will be False after running this function.
"""
if self._thread_for_pid == os.getpid() and self._thread is not None:
return None
with self._thread_lock:
if self._thread_for_pid == os.getpid() and self._thread is not None:
return None
def _thread() -> None:
while self._running:
time.sleep(self.interval)
if self._running:
self.run()
thread = Thread(name=self.name, target=_thread)
thread.daemon = True
try:
thread.start()
except RuntimeError:
# Unfortunately at this point the interpreter is in a state that no
# longer allows us to spawn a thread and we have to bail.
self._running = False
return None
self._thread = thread
self._thread_for_pid = os.getpid()
return None
def run(self) -> None:
self.check_health()
self.set_downsample_factor()
def set_downsample_factor(self) -> None:
if self._healthy:
if self._downsample_factor > 0:
logger.debug(
"[Monitor] health check positive, reverting to normal sampling"
)
self._downsample_factor = 0
else:
if self.downsample_factor < MAX_DOWNSAMPLE_FACTOR:
self._downsample_factor += 1
logger.debug(
"[Monitor] health check negative, downsampling with a factor of %d",
self._downsample_factor,
)
def check_health(self) -> None:
"""
Perform the actual health checks,
currently only checks if the transport is rate-limited.
TODO: augment in the future with more checks.
"""
self._healthy = self.transport.is_healthy()
def is_healthy(self) -> bool:
self._ensure_running()
return self._healthy
@property
def downsample_factor(self) -> int:
self._ensure_running()
return self._downsample_factor
def kill(self) -> None:
self._running = False