Skip to content

Commit 965285c

Browse files
haihuang-mlchemelnucfin
authored and committed
YCSB-like benchmarker. (googleapis#4539)
* YCSB-like benchmarker. * All comments addressed * 2nd batch for comments. * 3rd pass for comments * 4th pass for comments, part 1 * 4th pass for comments, part 2 * License header for bin/ycsb
1 parent e9e906b commit 965285c

File tree

2 files changed

+285
-0
lines changed

2 files changed

+285
-0
lines changed

spanner/benchmark/bin/ycsb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/bin/bash
2+
3+
# Copyright 2017 Google LLC
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
16+
# A YCSB-like executable that can be integrated into PerfKitBenchmarker.
17+
18+
Dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
19+
20+
python ${DIR}/../ycsb.py "${@:1}"

spanner/benchmark/ycsb.py

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
# Copyright 2017 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
# limitations under the License.
13+
14+
"""The YCSB client in Python.
15+
16+
Usage:
17+
18+
# Set up instance and load data into database.
19+
20+
# Set up environment variables. You should use your own credentials and gcloud
21+
# project.
22+
$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
23+
$ export GCLOUD_PROJECT=gcloud-project-name
24+
25+
# Run the benchmark.
26+
$ python spanner/benchmark/ycsb.py run cloud_spanner -P pkb/workloada \
27+
-p table=usertable -p cloudspanner.instance=ycsb-542756a4 \
28+
-p recordcount=5000 -p operationcount=100 -p cloudspanner.database=ycsb \
29+
-p num_worker=1
30+
31+
# To make a package so it can work with PerfKitBenchmarker.
32+
$ cd spanner; tar -cvzf ycsb-python.0.0.5.tar.gz benchmark/*
33+
34+
"""
35+
36+
from google.cloud import spanner
37+
38+
import argparse
39+
import numpy
40+
import random
41+
import string
42+
import threading
43+
import timeit
44+
45+
46+
OPERATIONS = ['readproportion', 'updateproportion', 'scanproportion',
47+
'insertproportion']
48+
NUM_FIELD = 10
49+
50+
51+
def parse_options():
52+
"""Parses options."""
53+
parser = argparse.ArgumentParser()
54+
parser.add_argument('command', help='The YCSB command.')
55+
parser.add_argument('benchmark', help='The YCSB benchmark.')
56+
parser.add_argument('-P', '--workload', action='store', dest='workload',
57+
default='', help='The path to a YCSB workload file.')
58+
parser.add_argument('-p', '--parameter', action='append', dest='parameters',
59+
default=[], help='The key=value pair of parameter.')
60+
parser.add_argument('-b', '--num_bucket', action='store', type=int,
61+
dest='num_bucket', default=1000,
62+
help='The number of buckets in output.')
63+
64+
args = parser.parse_args()
65+
66+
parameters = {}
67+
parameters['command'] = args.command
68+
parameters['num_bucket'] = args.num_bucket
69+
70+
for parameter in args.parameters:
71+
parts = parameter.strip().split('=')
72+
parameters[parts[0]] = parts[1]
73+
74+
with open(args.workload, 'r') as f:
75+
for line in f.readlines():
76+
parts = line.split('=')
77+
key = parts[0].strip()
78+
if key in OPERATIONS:
79+
parameters[key] = parts[1].strip()
80+
81+
return parameters
82+
83+
84+
def open_database(parameters):
85+
"""Opens a database specified by the parameters from parse_options()."""
86+
spanner_client = spanner.Client()
87+
instance_id = parameters['cloudspanner.instance']
88+
instance = spanner_client.instance(instance_id)
89+
database_id = parameters['cloudspanner.database']
90+
pool = spanner.BurstyPool(int(parameters['num_worker']))
91+
database = instance.database(database_id, pool=pool)
92+
93+
return database
94+
95+
96+
def load_keys(database, parameters):
97+
"""Loads keys from database."""
98+
keys = []
99+
results = database.execute_sql(
100+
'SELECT u.id FROM %s u' % parameters['table'])
101+
102+
for row in results:
103+
keys.append(row[0])
104+
105+
return keys
106+
107+
108+
def read(database, table, key):
109+
"""Does a single read operation."""
110+
with database.snapshot() as snapshot:
111+
result = snapshot.execute_sql('SELECT u.* FROM %s u WHERE u.id="%s"' %
112+
(table, key))
113+
for row in result:
114+
key = row[0]
115+
for i in range(NUM_FIELD):
116+
field = row[i + 1]
117+
118+
119+
def update(database, table, key):
120+
"""Does a single update operation."""
121+
field = random.randrange(10)
122+
value = ''.join(random.choice(string.printable) for i in range(100))
123+
with database.batch() as batch:
124+
batch.update(table=table, columns=('id', 'field%d' % field),
125+
values=[(key, value)])
126+
127+
128+
def do_operation(database, keys, table, operation, latencies_ms):
129+
"""Does a single operation and records latency."""
130+
key = random.choice(keys)
131+
start = timeit.default_timer()
132+
if operation == 'read':
133+
read(database, table, key)
134+
elif operation == 'update':
135+
update(database, table, key)
136+
else:
137+
raise ValueError('Unknown operation: %s' % operation)
138+
end = timeit.default_timer()
139+
latencies_ms[operation].append((end - start) * 1000)
140+
141+
142+
def aggregate_metrics(latencies_ms, duration_ms, num_bucket):
143+
"""Aggregates metrics."""
144+
overall_op_count = 0
145+
op_counts = {operation : len(latency) for operation,
146+
latency in latencies_ms.iteritems()}
147+
overall_op_count = sum([op_count for op_count in op_counts.itervalues()])
148+
149+
print '[OVERALL], RunTime(ms), %f' % duration_ms
150+
print '[OVERALL], Throughput(ops/sec), %f' % (float(overall_op_count) /
151+
duration_ms * 1000.0)
152+
153+
for operation in op_counts.keys():
154+
operation_upper = operation.upper()
155+
print '[%s], Operations, %d' % (operation_upper, op_counts[operation])
156+
print '[%s], AverageLatency(us), %f' % (
157+
operation_upper, numpy.average(latencies_ms[operation]) * 1000.0)
158+
print '[%s], LatencyVariance(us), %f' % (
159+
operation_upper, numpy.var(latencies_ms[operation]) * 1000.0)
160+
print '[%s], MinLatency(us), %f' % (
161+
operation_upper, min(latencies_ms[operation]) * 1000.0)
162+
print '[%s], MaxLatency(us), %f' % (
163+
operation_upper, max(latencies_ms[operation]) * 1000.0)
164+
print '[%s], 95thPercentileLatency(us), %f' % (
165+
operation_upper,
166+
numpy.percentile(latencies_ms[operation], 95.0) * 1000.0)
167+
print '[%s], 99thPercentileLatency(us), %f' % (
168+
operation_upper,
169+
numpy.percentile(latencies_ms[operation], 99.0) * 1000.0)
170+
print '[%s], 99.9thPercentileLatency(us), %f' % (
171+
operation_upper,
172+
numpy.percentile(latencies_ms[operation], 99.9) * 1000.0)
173+
print '[%s], Return=OK, %d' % (operation_upper, op_counts[operation])
174+
latency_array = numpy.array(latencies_ms[operation])
175+
for j in range(num_bucket):
176+
print '[%s], %d, %d' % (
177+
operation_upper, j,
178+
((j <= latency_array) & (latency_array < (j + 1))).sum())
179+
print '[%s], >%d, %d' % (operation_upper, num_bucket,
180+
(num_bucket <= latency_array).sum())
181+
182+
183+
class WorkloadThread(threading.Thread):
184+
"""A single thread running workload."""
185+
186+
def __init__(self, database, keys, parameters, total_weight, weights,
187+
operations):
188+
threading.Thread.__init__(self)
189+
self._database = database
190+
self._keys = keys
191+
self._parameters = parameters
192+
self._total_weight = total_weight
193+
self._weights = weights
194+
self._operations = operations
195+
self._latencies_ms = {}
196+
for operation in self._operations:
197+
self._latencies_ms[operation] = []
198+
199+
def run(self):
200+
"""Run a single thread of the workload."""
201+
i = 0
202+
operation_count = int(self._parameters['operationcount'])
203+
while i < operation_count:
204+
i += 1
205+
weight = random.uniform(0, self._total_weight)
206+
for j in range(len(self._weights)):
207+
if weight <= self._weights[j]:
208+
do_operation(self._database, self._keys,
209+
self._parameters['table'],
210+
self._operations[j], self._latencies_ms)
211+
break
212+
213+
def latencies_ms(self):
214+
"""Returns the latencies."""
215+
return self._latencies_ms
216+
217+
218+
def run_workload(database, keys, parameters):
219+
"""Runs workload against the database."""
220+
total_weight = 0.0
221+
weights = []
222+
operations = []
223+
latencies_ms = {}
224+
for operation in OPERATIONS:
225+
weight = float(parameters[operation])
226+
if weight <= 0.0:
227+
continue
228+
total_weight += weight
229+
op_code = operation.split('proportion')[0]
230+
operations.append(op_code)
231+
weights.append(total_weight)
232+
latencies_ms[op_code] = []
233+
234+
threads = []
235+
start = timeit.default_timer()
236+
for i in range(int(parameters['num_worker'])):
237+
thread = WorkloadThread(database, keys, parameters, total_weight,
238+
weights, operations)
239+
thread.start()
240+
threads.append(thread)
241+
242+
for thread in threads:
243+
thread.join()
244+
end = timeit.default_timer()
245+
246+
for thread in threads:
247+
thread_latencies_ms = thread.latencies_ms()
248+
for key in latencies_ms.keys():
249+
latencies_ms[key].extend(thread_latencies_ms[key])
250+
251+
aggregate_metrics(latencies_ms, (end - start) * 1000.0,
252+
parameters['num_bucket'])
253+
254+
255+
if __name__ == '__main__':
256+
parameters = parse_options()
257+
if parameters['command'] == 'run':
258+
if 'cloudspanner.channels' in parameters:
259+
assert parameters['cloudspanner.channels'] == 1, (
260+
'Python doesn\'t support channels > 1.')
261+
database = open_database(parameters)
262+
keys = load_keys(database, parameters)
263+
run_workload(database, keys, parameters)
264+
else:
265+
raise ValueError('Unknown command %s.' % parameters['command'])

0 commit comments

Comments
 (0)