Skip to content

Commit c231515

Browse files
committed
Add benchmarks for more futures use cases
1 parent 9ac977b commit c231515

5 files changed

Lines changed: 81 additions & 3 deletions

benchmarks/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def teardown():
6565
def benchmark(run_fn):
6666
for conn_class in supported_reactors:
6767
setup()
68-
log.info("Testing %s" % (conn_class.__name__,))
68+
log.info("==== %s ====" % (conn_class.__name__,))
6969

7070
cluster = Cluster(['127.0.0.1'])
7171
cluster.connection_class = conn_class

benchmarks/single_thread_callback_full_pipeline.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
from base import benchmark
22

33
import logging
4-
from collections import deque
54
from itertools import count
65
from threading import Event
76

87
log = logging.getLogger(__name__)
98

10-
futures = deque()
119
initial = object()
1210

1311
def execute(session, query, values, num_queries):
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from base import benchmark
2+
3+
import logging
4+
import Queue
5+
6+
log = logging.getLogger(__name__)
7+
8+
def execute(session, query, values, num_queries):
9+
10+
futures = Queue.Queue(maxsize=121)
11+
12+
for i in range(num_queries):
13+
if i > 0 and i % 120 == 0:
14+
# clear the existing queue
15+
while True:
16+
try:
17+
futures.get_nowait().result()
18+
except Queue.Empty:
19+
break
20+
21+
future = session.execute_async(query, values)
22+
futures.put_nowait(future)
23+
24+
while True:
25+
try:
26+
futures.get_nowait().result()
27+
except Queue.Empty:
28+
break
29+
30+
31+
if __name__ == "__main__":
32+
benchmark(execute)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from base import benchmark
2+
3+
import logging
4+
import Queue
5+
6+
log = logging.getLogger(__name__)
7+
8+
def execute(session, query, values, num_queries):
9+
10+
futures = Queue.Queue(maxsize=121)
11+
12+
for i in range(num_queries):
13+
if i >= 120:
14+
old_future = futures.get_nowait()
15+
old_future.result()
16+
17+
future = session.execute_async(query, values)
18+
futures.put_nowait(future)
19+
20+
while True:
21+
try:
22+
futures.get_nowait().result()
23+
except Queue.Empty:
24+
break
25+
26+
27+
if __name__ == "__main__":
28+
benchmark(execute)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from base import benchmark
2+
3+
import logging
4+
5+
log = logging.getLogger(__name__)
6+
7+
def execute(session, query, values, num_queries):
8+
9+
futures = []
10+
11+
for i in range(num_queries):
12+
future = session.execute_async(query, values)
13+
futures.append(future)
14+
15+
for future in futures:
16+
future.result()
17+
18+
19+
if __name__ == "__main__":
20+
benchmark(execute)

0 commit comments

Comments
 (0)