forked from getsentry/sentry-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_contextvars.py
More file actions
41 lines (27 loc) · 794 Bytes
/
test_contextvars.py
File metadata and controls
41 lines (27 loc) · 794 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import pytest
import random
import time
@pytest.mark.forked
def test_leaks(maybe_monkeypatched_threading):
    """Check that a ContextVar value set in one thread stays stable there.

    Twenty threads each store their own random value in a shared
    ``ContextVar`` and then repeatedly read it back while yielding to the
    other threads. A thread records success only if every read returned
    the value it stored itself; the test passes when all threads did.

    The ``maybe_monkeypatched_threading`` fixture is not used directly in
    the body; presumably it optionally patches ``threading`` before this
    test runs -- confirm against the fixture definition.
    """
    import threading

    # The SDK has already decided upon gevent at import time, so the
    # ContextVar implementation has to be resolved again explicitly here.
    from sentry_sdk import utils

    _, ContextVar = utils._get_contextvars()  # noqa: N806

    leak_probe = ContextVar("test_contextvar_leaks")
    finished = []
    workers = []

    def worker():
        expected = int(random.random() * 1000)
        leak_probe.set(expected)

        checks_left = 100
        while checks_left > 0:
            # sleep(0) gives the other threads a chance to run between
            # consecutive reads of the variable.
            time.sleep(0)
            assert leak_probe.get(None) == expected
            checks_left -= 1

        # Reached only when none of the assertions above failed.
        finished.append(1)

    for _ in range(20):
        thread = threading.Thread(target=worker)
        thread.start()
        workers.append(thread)

    for thread in workers:
        thread.join()

    # A worker that hit a failed assertion never appended to `finished`.
    assert len(finished) == 20