-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtest_excutil.py
More file actions
161 lines (141 loc) · 7.6 KB
/
test_excutil.py
File metadata and controls
161 lines (141 loc) · 7.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-
from ..syntax import macros, test, test_raises, error, warn, the # noqa: F401
from ..test.fixtures import session, testset, returns_normally
import threading
from time import sleep
import sys
from ..excutil import (raisef, tryf,
equip_with_traceback,
reraise_in, reraise,
async_raise)
from ..env import env
def runtests():
# raisef: raise an exception from an expression position
with testset("raisef (raise exception from an expression)"):
raise_instance = lambda: raisef(ValueError("all ok")) # the argument works the same as in `raise ...`
test_raises[ValueError, raise_instance()]
try:
raise_instance()
except ValueError as err:
test[err.__cause__ is None] # like plain `raise ...`, no cause set (default behavior)
# using the `cause` parameter, raisef can also perform a `raise ... from ...`
exc = TypeError("oof")
raise_instance = lambda: raisef(ValueError("all ok"), cause=exc)
test_raises[ValueError, raise_instance()]
try:
raise_instance()
except ValueError as err:
test[err.__cause__ is exc] # cause specified, like `raise ... from ...`
# can also raise an exception class (no instance)
test_raises[StopIteration, raisef(StopIteration)]
# tryf: handle an exception in an expression position
with testset("tryf (try/except/finally in an expression)"):
raise_instance = lambda: raisef(ValueError("all ok"))
raise_class = lambda: raisef(ValueError)
test[tryf(lambda: "hello") == "hello"]
test[tryf(lambda: "hello",
elsef=lambda: "there") == "there"]
test[tryf(lambda: raise_instance(),
(ValueError, lambda: "got a ValueError")) == "got a ValueError"]
test[tryf(lambda: raise_instance(),
(ValueError, lambda err: f"got a ValueError: '{err.args[0]}'")) == "got a ValueError: 'all ok'"]
test[tryf(lambda: raise_instance(),
((RuntimeError, ValueError), lambda err: f"got a RuntimeError or ValueError: '{err.args[0]}'")) == "got a RuntimeError or ValueError: 'all ok'"]
test[tryf(lambda: "hello",
(ValueError, lambda: "got a ValueError"),
elsef=lambda: "there") == "there"]
test[tryf(lambda: raisef(ValueError("oof")),
(TypeError, lambda: "got a TypeError"),
((TypeError, ValueError), lambda: "got a TypeError or a ValueError"),
(ValueError, lambda: "got a ValueError")) == "got a TypeError or a ValueError"]
e = env(finally_ran=False)
test[e.finally_ran is False]
test[tryf(lambda: "hello",
elsef=lambda: "there",
finallyf=lambda: e << ("finally_ran", True)) == "there"]
test[e.finally_ran is True]
test[tryf(lambda: raise_class(),
(ValueError, lambda: "ok")) == "ok"]
test[tryf(lambda: raise_class(),
((RuntimeError, ValueError), lambda: "ok")) == "ok"]
test_raises[TypeError, tryf(lambda: "hello",
(str, lambda: "got a string"))] # str is not an exception type
test_raises[TypeError, tryf(lambda: "hello",
((ValueError, str), lambda: "got a string"))] # same, in the tuple case
test_raises[TypeError, tryf(lambda: "hello",
("not a type at all!", lambda: "got a string"))]
with testset("equip_with_traceback"):
e = Exception("just testing")
try:
e = equip_with_traceback(e)
except NotImplementedError:
warn["equip_with_traceback only supported on Python 3.7+, skipping test."]
else:
# Can't do meaningful testing on the result, so just check it's there.
test[e.__traceback__ is not None]
test_raises[TypeError, equip_with_traceback("not an exception")]
with testset("reraise_in, reraise"):
class LibraryException(Exception):
pass
class MoreSophisticatedLibraryException(LibraryException):
pass
class UnrelatedException(Exception):
pass
class ApplicationException(Exception):
pass
test_raises[ApplicationException, reraise_in(lambda: raisef(LibraryException),
{LibraryException: ApplicationException})]
# subclasses
test_raises[ApplicationException, reraise_in(lambda: raisef(MoreSophisticatedLibraryException),
{LibraryException: ApplicationException})]
# tuple of types as input
test_raises[ApplicationException, reraise_in(lambda: raisef(UnrelatedException),
{(LibraryException, UnrelatedException):
ApplicationException})]
test[returns_normally(reraise_in(lambda: 42,
{LibraryException: ApplicationException}))]
with test_raises[ApplicationException]:
with reraise({LibraryException: ApplicationException}):
raise LibraryException
with test_raises[ApplicationException]:
with reraise({LibraryException: ApplicationException}):
raise MoreSophisticatedLibraryException
with test_raises[ApplicationException]:
with reraise({(LibraryException, UnrelatedException): ApplicationException}):
raise LibraryException
with test["should return normally"]:
with reraise({LibraryException: ApplicationException}):
42
# async_raise - evil ctypes hack to inject an asynchronous exception into another running thread
if sys.implementation.name != "cpython":
warn["async_raise only supported on CPython, skipping test."] # pragma: no cover
else:
with testset("async_raise (inject KeyboardInterrupt)"):
try:
# Test whether the Python we're running on provides ctypes. At least CPython and PyPy3 do.
# For PyPy3, the general rule is "if it imports, it should work", so let's go along with that.
import ctypes # noqa: F401
out = [] # box, really, but let's not depend on unpythonic.collections in this unrelated unit test module
def test_async_raise_worker():
try:
for j in range(10):
sleep(0.1)
except KeyboardInterrupt: # normally, KeyboardInterrupt is only raised in the main thread
pass
out.append(j)
t = threading.Thread(target=test_async_raise_worker)
t.start()
sleep(0.1) # make sure we're in the while loop
async_raise(t, KeyboardInterrupt)
t.join()
test[out[0] < 9] # terminated early due to the injected KeyboardInterrupt
except NotImplementedError: # pragma: no cover
error["async_raise not supported on this Python interpreter."]
test_raises[TypeError, async_raise(42, KeyboardInterrupt)] # not a thread
t = threading.Thread(target=lambda: None)
t.start()
t.join()
test_raises[ValueError, async_raise(t, KeyboardInterrupt)] # thread no longer running
if __name__ == '__main__': # pragma: no cover
with session(__file__):
runtests()