forked from rethinkdb/rethinkdb
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkload_runner.py
More file actions
194 lines (173 loc) · 7.76 KB
/
workload_runner.py
File metadata and controls
194 lines (173 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# Copyright 2010-2012 RethinkDB, all rights reserved.
import subprocess, os, time, string, signal
from vcoptparse import *
# TODO: Merge this file into `scenario_common.py`?
class MemcachedPorts(object):
    """Connection details for a server spoken to over the memcached protocol.

    Stores the host, the HTTP port and the memcached port, and can export
    them to a workload through environment variables.
    """

    def __init__(self, host, http_port, memcached_port):
        self.host, self.http_port, self.memcached_port = host, http_port, memcached_port

    def add_to_environ(self, env):
        """Write the connection details into the mapping `env`.

        `HOST` receives the host unchanged; `HTTP_PORT` and `PORT` receive
        the string forms of the HTTP port and the memcached port.
        """
        env["HOST"] = self.host
        for variable, port in (("HTTP_PORT", self.http_port), ("PORT", self.memcached_port)):
            env[variable] = str(port)
class RDBPorts(object):
    """Connection details for a server spoken to over the "rdb" protocol.

    Stores the host, the HTTP port, the rdb port, and the database and table
    names, and can export them to a workload through environment variables.
    """

    def __init__(self, host, http_port, rdb_port, db_name, table_name):
        self.host, self.http_port, self.rdb_port = host, http_port, rdb_port
        self.db_name, self.table_name = db_name, table_name

    def add_to_environ(self, env):
        """Write the connection details into the mapping `env`.

        Sets `HOST`, `HTTP_PORT`, `PORT`, `DB_NAME` and `TABLE_NAME`; the two
        ports are converted to strings, the other values are stored as given.
        """
        env["HOST"] = self.host
        for variable, port in (("HTTP_PORT", self.http_port), ("PORT", self.rdb_port)):
            env[variable] = str(port)
        for variable, name in (("DB_NAME", self.db_name), ("TABLE_NAME", self.table_name)):
            env[variable] = name
def run(protocol, command_line, ports, timeout):
    """Run a workload command to completion, enforcing a timeout.

    The command is started through the shell in its own process group, with
    the connection details from `ports` added to a copy of the current
    environment. It is polled until it exits or `timeout` seconds have
    elapsed. Whatever the outcome, SIGTERM is sent to the process group
    afterwards so that nothing started by the workload is left behind.

    Parameters:
        protocol: "rdb" or "memcached".
        command_line: shell command line of the workload.
        ports: `MemcachedPorts` for "memcached", `RDBPorts` for "rdb".
        timeout: number of seconds the workload is allowed to run.

    Returns None when the workload exits with status 0.

    Raises:
        RuntimeError: the workload exited with a non-zero status, or did not
            exit before the timeout.
        AssertionError: `protocol` is unknown or `ports` has the wrong type.
    """
    assert protocol in ["rdb", "memcached"]
    if protocol == "memcached":
        assert isinstance(ports, MemcachedPorts)
    else:
        assert isinstance(ports, RDBPorts)

    start_time = time.time()
    end_time = start_time + timeout
    print("Running %s workload %r..." % (protocol, command_line))

    # Set up environment
    new_environ = os.environ.copy()
    ports.add_to_environ(new_environ)

    # The child becomes the leader of a new process group so that the whole
    # group can be signalled with `os.killpg()` below.
    proc = subprocess.Popen(command_line, shell=True, env=new_environ,
                            preexec_fn=lambda: os.setpgid(0, 0))

    try:
        while True:
            # Poll *before* looking at the deadline, so that a workload which
            # exited during the final sleep is still seen as finished rather
            # than being reported as a timeout.
            result = proc.poll()
            if result is not None:
                break
            remaining = end_time - time.time()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(1, remaining))

        if result == 0:
            print("Done (%d seconds)" % (time.time() - start_time))
            return
        if result is not None:
            print("Failed (%d seconds)" % (time.time() - start_time))
            raise RuntimeError("workload '%s' failed with error code %d" % (command_line, result))
        print("Timed out (%d seconds)" % (time.time() - start_time))
    finally:
        # Best-effort cleanup: the group may already be gone, in which case
        # `killpg` raises OSError and there is nothing left to do.
        try:
            os.killpg(proc.pid, signal.SIGTERM)
        except OSError:
            pass
    raise RuntimeError("workload timed out before completion")
class ContinuousWorkload(object):
    """A workload command that keeps running until it is explicitly stopped.

    Typical use is as a context manager: `start()` launches the command,
    `check()` verifies that it is still alive, `stop()` interrupts it with
    SIGINT and waits for it to exit, and leaving the `with` block sends
    SIGTERM to the process group if the workload is still marked as running.
    """

    def __init__(self, command_line, protocol, ports):
        """Remember what to run; nothing is started yet.

        `protocol` must be "rdb" or "memcached" and `ports` must be the
        matching `RDBPorts` / `MemcachedPorts` instance.
        """
        assert protocol in ["rdb", "memcached"]
        if protocol == "memcached":
            assert isinstance(ports, MemcachedPorts)
        else:
            assert isinstance(ports, RDBPorts)
        self.command_line = command_line
        self.protocol = protocol
        self.ports = ports
        self.running = False

    def __enter__(self):
        return self

    def start(self):
        """Launch the workload in its own process group and check it is alive."""
        assert not self.running
        print("Starting %s workload %r..." % (self.protocol, self.command_line))

        # Set up environment
        new_environ = os.environ.copy()
        self.ports.add_to_environ(new_environ)

        # New process group, so the whole workload can be signalled at once.
        self.proc = subprocess.Popen(self.command_line, shell=True, env=new_environ,
                                     preexec_fn=lambda: os.setpgid(0, 0))

        self.running = True

        self.check()

    def check(self):
        """Raise RuntimeError if the workload has exited on its own."""
        assert self.running
        result = self.proc.poll()
        if result is not None:
            self.running = False
            raise RuntimeError("workload '%s' stopped prematurely with error code %d" % (self.command_line, result))

    def stop(self):
        """Interrupt the workload with SIGINT and wait for it to exit.

        Raises RuntimeError if the workload had already stopped, if it exits
        with a status other than 0 or death-by-SIGINT, or if it is still
        running once the grace period is over. In the last case `running`
        stays True so that `__exit__` still sends SIGTERM.
        """
        self.check()
        print("Stopping %r..." % self.command_line)
        os.killpg(self.proc.pid, signal.SIGINT)
        shutdown_grace_period = 10   # seconds
        end_time = time.time() + shutdown_grace_period
        while True:
            # Poll before consulting the deadline: a workload that exits
            # during the last sleep must not be reported as hung.
            result = self.proc.poll()
            if result is not None:
                break
            remaining = end_time - time.time()
            if remaining <= 0:
                raise RuntimeError("workload '%s' failed to terminate within %d seconds of SIGINT" % (self.command_line, shutdown_grace_period))
            # Never sleep past the end of the grace period.
            time.sleep(min(1, remaining))

        self.running = False
        # Popen reports death by signal N as return code -N.
        if result == 0 or result == -signal.SIGINT:
            print("OK")
        else:
            raise RuntimeError("workload '%s' failed when interrupted with error code %d" % (self.command_line, result))

    def __exit__(self, exc = None, ty = None, tb = None):
        # Best-effort cleanup of a workload that was never stopped cleanly;
        # the process group may already be gone, hence the ignored OSError.
        if self.running:
            try:
                os.killpg(self.proc.pid, signal.SIGTERM)
            except OSError:
                pass
def prepare_option_parser_for_split_or_continuous_workload(op, allow_between = False):
    """Register the workload-related command line flags on the parser `op`.

    Flags registered, in this order:
      --workload-during   one or more workloads that run continuously while
                          other stuff is happening (may be given repeatedly)
      --extra-before, --extra-between, --extra-after
                          how many seconds to sit while the continuous
                          workloads are running (default 10)
      --workload-before / --timeout-before
      --workload-between / --timeout-between   (only if `allow_between`)
      --workload-after / --timeout-after
                          a workload to run at that point in the scenario and
                          the timeout to enforce for it (default 600)
    """
    op["workload-during"] = ValueFlag("--workload-during", converter=str, default=[], combiner=append_combiner)
    for phase in ("before", "between", "after"):
        op["extra-" + phase] = IntFlag("--extra-" + phase, 10)

    # The "between" workload only exists for scenarios that ask for it.
    if allow_between:
        phases = ("before", "between", "after")
    else:
        phases = ("before", "after")
    for phase in phases:
        op["workload-" + phase] = StringFlag("--workload-" + phase, None)
        op["timeout-" + phase] = IntFlag("--timeout-" + phase, 600)
class SplitOrContinuousWorkload(object):
    """Drive the workloads selected by the parsed options `opts`.

    `opts` is indexed by the keys registered in
    `prepare_option_parser_for_split_or_continuous_workload()`. The one-shot
    workloads ("before", "between", "after") run to completion at the matching
    `run_*()` call; the "during" workloads are started in `run_before()`,
    kept running across the scenario and stopped in `run_after()`.
    """

    def __init__(self, opts, protocol, ports):
        self.opts = opts
        self.protocol = protocol
        self.ports = ports

    def __enter__(self):
        # One ContinuousWorkload per `--workload-during`; they are entered
        # here but only started in run_before().
        self.continuous_workloads = []
        for command_line in self.opts["workload-during"]:
            workload = ContinuousWorkload(command_line, self.protocol, self.ports)
            workload.__enter__()
            self.continuous_workloads.append(workload)
        return self

    def _spin_continuous_workloads(self, seconds):
        """Sleep for `seconds` seconds, checking the workloads every second."""
        during = self.opts["workload-during"]
        assert during
        if seconds != 0:
            names = " and ".join(repr(x) for x in during)
            print("Letting %s run for %d seconds..." % (names, seconds))
            for _ in xrange(seconds):
                time.sleep(1)
                self.check()

    def run_before(self):
        """Run the "before" workload, then start the continuous workloads."""
        before = self.opts["workload-before"]
        if before is not None:
            run(self.protocol, before, self.ports, self.opts["timeout-before"])
        if self.opts["workload-during"]:
            for workload in self.continuous_workloads:
                workload.start()
            self._spin_continuous_workloads(self.opts["extra-before"])

    def check(self):
        """Check every continuous workload; each raises if it has stopped."""
        for workload in self.continuous_workloads:
            workload.check()

    def run_between(self):
        """Run the "between" workload while the continuous workloads go on."""
        self.check()
        assert "workload-between" in self.opts, "pass allow_between=True to prepare_option_parser_for_split_or_continuous_workload()"
        between = self.opts["workload-between"]
        if between is not None:
            run(self.protocol, between, self.ports, self.opts["timeout-between"])
        if self.opts["workload-during"]:
            self._spin_continuous_workloads(self.opts["extra-between"])

    def run_after(self):
        """Stop the continuous workloads, then run the "after" workload."""
        if self.opts["workload-during"]:
            self._spin_continuous_workloads(self.opts["extra-after"])
            for workload in self.continuous_workloads:
                workload.stop()
        after = self.opts["workload-after"]
        if after is not None:
            run(self.protocol, after, self.ports, self.opts["timeout-after"])

    def __exit__(self, exc = None, ty = None, tb = None):
        if self.opts["workload-during"] is None:
            return
        for workload in self.continuous_workloads:
            workload.__exit__()