-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathutest.py
More file actions
177 lines (161 loc) · 5.5 KB
/
utest.py
File metadata and controls
177 lines (161 loc) · 5.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# -*- coding: utf-8 -*-
import logging
import time
from ucloud.client import Client
from ucloud.testing import op
from ucloud.testing.exc import ValueNotFoundError, CompareError, ValidateError
logger = logging.getLogger(__name__)
class Step(object):
    """ A single retryable step of a test scenario. """

    def __init__(
        self,
        invoker,
        max_retries=0,
        retry_interval=0,
        startup_delay=0,
        retry_for=(CompareError, ValueNotFoundError),
        fast_fail=False,
        validators=None,
        **kwargs
    ):
        """ Step is the test step in a test scenario

        :param invoker: callable invoked as ``invoker(client, variables)``;
            its return value is treated as the response of the step
        :param max_retries: how many extra attempts are made after the first
            one when a `retry_for` exception is raised or a validator fails
        :param retry_interval: seconds to sleep between two attempts,
            no sleep happens when it is falsy
        :param startup_delay: seconds to sleep once before the first attempt
        :param retry_for: exception class (or tuple of classes) that triggers
            a retry instead of failing immediately
        :param fast_fail: flag stored on the step as-is, it is not consulted
            by `Step.run` itself
        :param validators: callable taking the variables and returning an
            iterable of indexable rules; for each rule, item 1 is the object
            path resolved on the response, items 0 and 2 are handed to
            `op.check` around that value (presumably comparator and expected
            value -- confirm against `ucloud.testing.op`)
        :param kwargs: extra metadata kept in `extras`, the `action` key is
            used to fill the `Action` field of the response
        """
        self.invoker = invoker
        self.extras = kwargs
        self.startup_delay = startup_delay
        self.max_retries = max_retries
        self.retry_interval = retry_interval
        self.retry_for = retry_for
        self.fast_fail = fast_fail
        # without validators every response is accepted
        self.validators = validators if validators else (lambda _: [])

    def run(self, client, variables):
        """ execute the step, retrying on failure

        :param client: object passed through to the invoker
        :param variables: shared variables passed to the invoker and to the
            validators factory
        :return: the response completed by `set_default_response`
        :raises: the invoker's own `retry_for` exception, or `ValidateError`
            carrying the collected validator errors, once the last attempt
            has failed
        """
        if self.startup_delay:
            time.sleep(self.startup_delay)

        for attempt in range(self.max_retries + 1):
            try:
                raw = self.invoker(client, variables)
            except self.retry_for as exc:
                # the last attempt surfaces the invoker error unchanged
                if attempt == self.max_retries:
                    raise exc
                self._wait_before_retry()
                continue

            result = self.set_default_response(raw)
            failures = self._collect_failures(result, variables)
            if not failures:
                return result
            if attempt == self.max_retries:
                raise ValidateError(failures)
            self._wait_before_retry()

    def _wait_before_retry(self):
        """ sleep for `retry_interval` seconds when an interval is set """
        if self.retry_interval:
            time.sleep(self.retry_interval)

    def _collect_failures(self, result, variables):
        """ run every validator rule and return the `retry_for` errors """
        failures = []
        for rule in self.validators(variables):
            try:
                # rule items are read inside the try block, in order, so that
                # a malformed rule fails exactly where the check would
                op.check(rule[0], value_at_path(result, rule[1]), rule[2])
            except self.retry_for as exc:
                failures.append(exc)
        return failures

    def set_default_response(self, resp):
        """ return a copy of the response with default fields filled in

        :param resp: response object providing `copy` and `setdefault`
        :return: the copy, with `RetCode` defaulted to 0 and, when the step
            was created with an `action` extra, `Action` overwritten with
            `<action>Response`
        """
        patched = resp.copy()
        patched.setdefault("RetCode", 0)
        if "action" in self.extras:
            action_name = self.extras["action"]
            patched["Action"] = "{}Response".format(action_name)
        return patched
class Scenario(object):
    """ An ordered collection of steps sharing one variables store. """

    def __init__(self, id_):
        """
        :param id_: identifier of the scenario, only used in log messages
        """
        self.id = id_
        # shared store handed to every step
        self.variables = {}
        # CompareError instances collected while running the steps
        self.errors = []
        # Step objects in registration order
        self.steps = []

    def summary(self):
        """ log the outcome of the scenario to the module logger """
        logger.info("=" * 42)
        logger.info("TEST SET {}".format(self.id))
        logger.info("=" * 42)
        if self.errors:
            logger.info("-" * 42)
            logger.info("ERRORS")
            logger.info("-" * 42)
            for err in self.errors:
                logger.error(err)
            logger.info("Errors!")
        else:
            logger.info("Success!")

    def step(
        self,
        max_retries=0,
        retry_interval=0,
        startup_delay=0,
        retry_for=(CompareError, ValueNotFoundError),
        fast_fail=False,
        validators=None,
        **kwargs
    ):
        """ decorator registering a function as the next step of the scenario

        All arguments are forwarded unchanged to `Step`.

        :return: a decorator that appends the new `Step` to `steps` and
            returns the decorated function untouched
        """

        def deco(fn):
            step = Step(
                invoker=fn,
                max_retries=max_retries,
                retry_interval=retry_interval,
                startup_delay=startup_delay,
                retry_for=retry_for,
                fast_fail=fast_fail,
                validators=validators,
                **kwargs
            )
            self.steps.append(step)
            return fn

        return deco

    def run(self, client):
        """ run every registered step in order, then log the summary

        :param client: object passed to each step, its `logger` attribute is
            used for progress and error messages
        :raises CompareError: re-raised (after logging the summary) when it
            comes from a step whose `fast_fail` flag is set
        """
        for i, step in enumerate(self.steps):
            try:
                action = step.extras.get("action", "unknown")
                client.logger.info(
                    "running {} step {} {}".format(self.id, i + 1, action)
                )
                step.run(client, self.variables)
            except CompareError as e:
                # only CompareError is recorded here; an exception that is
                # not a CompareError propagates to the caller and the final
                # summary below is skipped
                self.errors.append(e)
                if step.fast_fail:
                    self.summary()
                    raise e
                client.logger.error(e)
        self.summary()

    def initial(self, variables=None):
        """ replace the shared variables store

        :param variables: mapping used as the new store; when omitted (None)
            a fresh empty dict is used, so steps never receive None as their
            variables. A mapping passed by the caller is stored as-is, even
            when it is empty.
        """
        self.variables = {} if variables is None else variables
def value_at_path(d, path):
    """ resolve a dotted object path inside nested dicts and lists

    :param d: dict or list of dict
    :param path: object path like `Data.1.UHostId`; dict keys are matched
        case-insensitively, list items are addressed by decimal index
    :return: the value found at the path; None when `d` is None or when a
        list is addressed with a non-numeric segment. Once a value that is
        neither a list nor a dict is reached, remaining segments are ignored
        and that value is returned.
    :raises ValueNotFoundError: when a list index is out of range, or when a
        dict has no such key or stores None under it
    """
    if d is None:
        return None

    segments = path.split(".")
    current = d
    for depth, segment in enumerate(segments, 1):
        if isinstance(current, list):
            if not segment.isdigit():
                return None
            position = int(segment)
            if position >= len(current):
                # report the prefix of the path that could not be resolved
                raise ValueNotFoundError(
                    "{} not found".format(".".join(segments[:depth]))
                )
            current = current[position]
        elif isinstance(current, dict):
            # fold the keys to lower case for a case-insensitive lookup
            folded = {}
            for name, value in current.items():
                folded[name.lower()] = value
            current = folded.get(segment.lower())
            if current is None:
                raise ValueNotFoundError(
                    "{} not found".format(".".join(segments[:depth]))
                )
    return current