forked from getsentry/sentry-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_monitor.py
More file actions
104 lines (73 loc) · 2.64 KB
/
test_monitor.py
File metadata and controls
104 lines (73 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import random
from sentry_sdk import Hub, start_transaction
from sentry_sdk.transport import Transport
try:
from unittest import mock # python 3.3 and above
except ImportError:
import mock # python < 3.3
class HealthyTestTransport(Transport):
    """Transport stub for tests.

    Sending an event or an envelope does nothing, and the transport
    always reports itself as healthy.
    """

    def _send_event(self, event):
        """Accept ``event`` and do nothing with it."""

    def _send_envelope(self, envelope):
        """Accept ``envelope`` and do nothing with it."""

    def is_healthy(self):
        """Report the transport as healthy, unconditionally."""
        return True
class UnhealthyTestTransport(HealthyTestTransport):
    """Variant of ``HealthyTestTransport`` whose health check always fails."""

    def is_healthy(self):
        """Report the transport as unhealthy, unconditionally."""
        return False
def test_no_monitor_if_disabled(sentry_init):
    """With ``enable_backpressure_handling=False`` the client has no monitor."""
    sentry_init(transport=HealthyTestTransport(), enable_backpressure_handling=False)

    client = Hub.current.client
    assert client.monitor is None
def test_monitor_if_enabled(sentry_init):
    """A client initialised with default options exposes a monitor.

    The monitor has no thread at first; after it has been queried, a
    thread named ``"sentry.monitor"`` is present.
    """
    sentry_init(transport=HealthyTestTransport())

    mon = Hub.current.client.monitor
    assert mon is not None
    # Nothing has queried the monitor yet, so no thread object exists.
    assert mon._thread is None

    # Keep these queries ahead of the thread assertions below: the test
    # expects the thread to exist only after this point (presumably one
    # of these accesses starts it -- see the monitor implementation).
    assert mon.is_healthy() is True
    assert mon.downsample_factor == 0

    worker = mon._thread
    assert worker is not None
    assert worker.name == "sentry.monitor"
def test_monitor_unhealthy(sentry_init):
    """Each ``run()`` against an unhealthy transport raises the downsample factor.

    The factor is expected to equal the number of runs so far, capped at 10.
    """
    sentry_init(transport=UnhealthyTestTransport())

    mon = Hub.current.client.monitor
    mon.interval = 0.1

    # Before any run() call the monitor still reports healthy.
    assert mon.is_healthy() is True

    # 15 runs in total: the factor follows the run count for the first
    # ten and is expected to stay at 10 for the remaining five.
    for runs_so_far in range(1, 16):
        mon.run()
        assert mon.is_healthy() is False
        assert mon.downsample_factor == min(runs_so_far, 10)
def test_transaction_uses_downsampled_rate(
    sentry_init, capture_client_reports, monkeypatch
):
    """A downsample factor of 1 halves the sample rate applied to a transaction.

    With ``traces_sample_rate=1.0`` and ``random.random`` pinned to 0.9,
    the transaction is expected to be unsampled with a ``sample_rate``
    of 0.5, and a ``("backpressure", "transaction")`` client report is
    expected to be captured.
    """
    sentry_init(traces_sample_rate=1.0, transport=UnhealthyTestTransport())

    reports = capture_client_reports()

    mon = Hub.current.client.monitor
    mon.interval = 0.1

    def _fixed_random():
        # 0.9 is above the expected downsampled rate of 0.5, so the
        # rng never lets the transaction be sampled.
        return 0.9

    monkeypatch.setattr(random, "random", _fixed_random)

    assert mon.is_healthy() is True
    mon.run()
    assert mon.is_healthy() is False
    assert mon.downsample_factor == 1

    with start_transaction(name="foobar") as txn:
        assert txn.sampled is False
        assert txn.sample_rate == 0.5

    # Checked only after the transaction context has exited.
    expected_reports = [("backpressure", "transaction")]
    assert reports == expected_reports
def test_monitor_no_thread_on_shutdown_no_errors(sentry_init):
    """``monitor.run()`` does not raise when thread creation fails.

    ``threading.Thread.start`` is patched to raise the ``RuntimeError``
    seen at interpreter shutdown; the monitor is expected to end up
    without a thread and without propagating the error.
    """
    sentry_init(transport=HealthyTestTransport())

    # Simulate an interpreter that is shutting down: any attempt to
    # start a thread raises.
    shutdown_error = RuntimeError("can't create new thread at interpreter shutdown")

    with mock.patch("threading.Thread.start", side_effect=shutdown_error):
        mon = Hub.current.client.monitor
        assert mon is not None
        assert mon._thread is None

        mon.run()

        assert mon._thread is None