Skip to content

Commit f01df9a

Browse files
committed
Easy profiling of benchmarks
1 parent 95537a7 commit f01df9a

6 files changed

Lines changed: 89 additions & 112 deletions

File tree

benchmarks/base.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
from cProfile import Profile
12
import logging
23
import os.path
34
import sys
5+
from threading import Thread
46
import time
57
from optparse import OptionParser
8+
69
from greplin import scales
710

811
dirname = os.path.dirname(os.path.abspath(__file__))
@@ -68,7 +71,7 @@ def teardown(hosts):
6871
session.execute("DROP KEYSPACE " + KEYSPACE)
6972

7073

71-
def benchmark(run_fn):
74+
def benchmark(thread_class):
7275
options, args = parse_options()
7376
for conn_class in options.supported_reactors:
7477
setup(options.hosts)
@@ -87,10 +90,24 @@ def benchmark(run_fn):
8790
""".format(table=TABLE))
8891
values = {'key': 'key', 'a': 'a', 'b': 'b'}
8992

93+
per_thread = options.num_ops / options.threads
94+
threads = []
95+
9096
log.debug("Beginning inserts...")
9197
start = time.time()
9298
try:
93-
run_fn(session, query, values, options.num_ops, options.threads)
99+
for i in range(options.threads):
100+
thread = thread_class(i, session, query, values, per_thread, options.profile)
101+
thread.daemon = True
102+
threads.append(thread)
103+
104+
for thread in threads:
105+
thread.start()
106+
107+
for thread in threads:
108+
while thread.is_alive():
109+
thread.join(timeout=0.5)
110+
94111
end = time.time()
95112
finally:
96113
teardown(options.hosts)
@@ -137,6 +154,9 @@ def parse_options():
137154
help='enable and print metrics for operations')
138155
parser.add_option('-l', '--log-level', default='info',
139156
help='logging level: debug, info, warning, or error')
157+
parser.add_option('-p', '--profile', action='store_true', dest='profile',
158+
help='Profile the run')
159+
140160
options, args = parser.parse_args()
141161

142162
options.hosts = options.hosts.split(',')
@@ -154,3 +174,24 @@ def parse_options():
154174
options.supported_reactors = supported_reactors
155175

156176
return options, args
177+
178+
179+
class BenchmarkThread(Thread):
180+
181+
def __init__(self, thread_num, session, query, values, num_queries, profile):
182+
Thread.__init__(self)
183+
self.thread_num = thread_num
184+
self.session = session
185+
self.query = query
186+
self.values = values
187+
self.num_queries = num_queries
188+
self.profiler = Profile() if profile else None
189+
190+
def start_profile(self):
191+
if self.profiler:
192+
self.profiler.enable()
193+
194+
def finish_profile(self):
195+
if self.profiler:
196+
self.profiler.disable()
197+
self.profiler.dump_stats('profile-%d' % self.thread_num)
Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,20 @@
1-
from base import benchmark
2-
3-
import logging
41
from itertools import count
5-
from threading import Event, Thread
2+
import logging
3+
from threading import Event
4+
5+
from base import benchmark, BenchmarkThread
66

77
log = logging.getLogger(__name__)
88

99
initial = object()
1010

11-
class Runner(Thread):
11+
class Runner(BenchmarkThread):
1212

13-
def __init__(self, session, query, values, num_queries, *args, **kwargs):
14-
self.session = session
15-
self.query = query
16-
self.values = values
17-
self.num_queries = num_queries
13+
def __init__(self, *args, **kwargs):
14+
BenchmarkThread.__init__(self, *args, **kwargs)
1815
self.num_started = count()
1916
self.num_finished = count()
2017
self.event = Event()
21-
Thread.__init__(self)
2218

2319
def handle_error(self, exc):
2420
log.error("Error on insert: %r", exc)
@@ -36,27 +32,15 @@ def insert_next(self, previous_result):
3632
future.add_callbacks(self.insert_next, self.handle_error)
3733

3834
def run(self):
35+
self.start_profile()
36+
3937
for i in range(120):
4038
self.insert_next(initial)
4139

4240
self.event.wait()
4341

44-
def execute(session, query, values, num_queries, num_threads):
45-
46-
per_thread = num_queries / num_threads
47-
threads = []
48-
for i in range(num_threads):
49-
thread = Runner(session, query, values, per_thread)
50-
thread.daemon = True
51-
threads.append(thread)
52-
53-
for thread in threads:
54-
thread.start()
55-
56-
for thread in threads:
57-
while thread.is_alive():
58-
thread.join(timeout=0.5)
42+
self.finish_profile()
5943

6044

6145
if __name__ == "__main__":
62-
benchmark(execute)
46+
benchmark(Runner)

benchmarks/future_batches.py

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
import logging
22
import Queue
3-
from threading import Thread
43

5-
from base import benchmark
4+
from base import benchmark, BenchmarkThread
65

76
log = logging.getLogger(__name__)
87

9-
def execute(session, query, values, num_queries, num_threads):
8+
class Runner(BenchmarkThread):
109

11-
per_thread = num_queries / num_threads
12-
13-
def run():
10+
def run(self):
1411
futures = Queue.Queue(maxsize=121)
1512

16-
for i in range(per_thread):
13+
self.start_profile()
14+
15+
for i in range(self.num_queries):
1716
if i > 0 and i % 120 == 0:
1817
# clear the existing queue
1918
while True:
@@ -22,7 +21,7 @@ def run():
2221
except Queue.Empty:
2322
break
2423

25-
future = session.execute_async(query, values)
24+
future = self.session.execute_async(self.query, self.values)
2625
futures.put_nowait(future)
2726

2827
while True:
@@ -31,19 +30,8 @@ def run():
3130
except Queue.Empty:
3231
break
3332

34-
threads = []
35-
for i in range(num_threads):
36-
thread = Thread(target=run)
37-
thread.daemon = True
38-
threads.append(thread)
39-
40-
for thread in threads:
41-
thread.start()
42-
43-
for thread in threads:
44-
while thread.is_alive():
45-
thread.join(timeout=0.5)
33+
self.finish_profile()
4634

4735

4836
if __name__ == "__main__":
49-
benchmark(execute)
37+
benchmark(Runner)

benchmarks/future_full_pipeline.py

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
11
import logging
22
import Queue
3-
from threading import Thread
43

5-
from base import benchmark
4+
from base import benchmark, BenchmarkThread
65

76
log = logging.getLogger(__name__)
87

9-
def execute(session, query, values, num_queries, num_threads):
8+
class Runner(BenchmarkThread):
109

11-
per_thread = num_queries / num_threads
12-
13-
def run():
10+
def run(self):
1411
futures = Queue.Queue(maxsize=121)
1512

16-
for i in range(per_thread):
13+
self.start_profile()
14+
15+
for i in range(self.num_queries):
1716
if i >= 120:
1817
old_future = futures.get_nowait()
1918
old_future.result()
2019

21-
future = session.execute_async(query, values)
20+
future = self.session.execute_async(self.query, self.values)
2221
futures.put_nowait(future)
2322

2423
while True:
@@ -27,19 +26,8 @@ def run():
2726
except Queue.Empty:
2827
break
2928

30-
threads = []
31-
for i in range(num_threads):
32-
thread = Thread(target=run)
33-
thread.daemon = True
34-
threads.append(thread)
35-
36-
for thread in threads:
37-
thread.start()
38-
39-
for thread in threads:
40-
while thread.is_alive():
41-
thread.join(timeout=0.5)
29+
self.finish_profile
4230

4331

4432
if __name__ == "__main__":
45-
benchmark(execute)
33+
benchmark(Runner)

benchmarks/future_full_throttle.py

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,25 @@
11
import logging
2-
from threading import Thread
32

4-
from base import benchmark
3+
from base import benchmark, BenchmarkThread
54

65
log = logging.getLogger(__name__)
76

8-
def execute(session, query, values, num_queries, num_threads):
7+
class Runner(BenchmarkThread):
98

10-
per_thread = num_queries / num_threads
11-
12-
def run():
9+
def run(self):
1310
futures = []
1411

15-
for i in range(per_thread):
16-
future = session.execute_async(query, values)
12+
self.start_profile()
13+
14+
for i in range(self.num_queries):
15+
future = self.session.execute_async(self.query, self.values)
1716
futures.append(future)
1817

1918
for future in futures:
2019
future.result()
2120

22-
threads = []
23-
for i in range(num_threads):
24-
thread = Thread(target=run)
25-
thread.daemon = True
26-
threads.append(thread)
27-
28-
for thread in threads:
29-
thread.start()
30-
31-
for thread in threads:
32-
while thread.is_alive():
33-
thread.join(timeout=0.5)
21+
self.finish_profile()
3422

3523

3624
if __name__ == "__main__":
37-
benchmark(execute)
25+
benchmark(Runner)

benchmarks/sync.py

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,15 @@
1-
from threading import Thread
1+
from base import benchmark, BenchmarkThread
22

3-
from base import benchmark
3+
class Runner(BenchmarkThread):
44

5-
def execute(session, query, values, num_queries, num_threads):
5+
def run(self):
6+
self.start_profile()
67

7-
per_thread = num_queries / num_threads
8+
for i in xrange(self.num_queries):
9+
self.session.execute(self.query, self.values)
810

9-
def run():
10-
for i in xrange(per_thread):
11-
session.execute(query, values)
11+
self.finish_profile()
1212

13-
threads = []
14-
for i in range(num_threads):
15-
thread = Thread(target=run)
16-
thread.daemon = True
17-
threads.append(thread)
18-
19-
for thread in threads:
20-
thread.start()
21-
22-
for thread in threads:
23-
while thread.is_alive():
24-
thread.join(timeout=0.5)
2513

2614
if __name__ == "__main__":
27-
benchmark(execute)
15+
benchmark(Runner)

0 commit comments

Comments
 (0)