Skip to content

Commit 7865507

Browse files
committed
Add verification of the number of copied records
1 parent db1b1e2 commit 7865507

2 files changed

Lines changed: 62 additions & 43 deletions

File tree

_python/pgbench_python.py

Lines changed: 61 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def psycopg_copy(conn, query, args):
4848
f.seek(0)
4949
cur = conn.cursor()
5050
cur.copy_from(f, copy['table'], columns=copy['columns'])
51+
conn.commit()
5152
return cur.rowcount
5253

5354

@@ -154,7 +155,9 @@ async def runner(args, connector, executor, copy_executor, is_async,
154155
if arg_format == 'python':
155156
query = re.sub(r'\$\d+', '%s', query)
156157

157-
if query.startswith('COPY '):
158+
is_copy = query.startswith('COPY ')
159+
160+
if is_copy:
158161
if copy_executor is None:
159162
raise RuntimeError('COPY is not supported for {}'.format(executor))
160163
executor = copy_executor
@@ -213,51 +216,67 @@ async def _do_run(run_duration):
213216
await admin_conn.execute(setup)
214217

215218
try:
216-
if args.warmup_time:
217-
await _do_run(args.warmup_time)
218-
219-
results, duration = await _do_run(args.duration)
220-
221-
finally:
222-
for conn in conns:
223-
if is_async:
224-
await conn.close()
219+
try:
220+
if args.warmup_time:
221+
await _do_run(args.warmup_time)
222+
223+
results, duration = await _do_run(args.duration)
224+
finally:
225+
for conn in conns:
226+
if is_async:
227+
await conn.close()
228+
else:
229+
conn.close()
230+
231+
min_latency = float('inf')
232+
max_latency = 0.0
233+
queries = 0
234+
rows = 0
235+
latency_stats = None
236+
237+
for result in results:
238+
t_queries, t_rows, t_latency_stats, t_min_latency, t_max_latency =\
239+
result
240+
queries += t_queries
241+
rows += t_rows
242+
if latency_stats is None:
243+
latency_stats = t_latency_stats
225244
else:
226-
conn.close()
245+
latency_stats = np.add(latency_stats, t_latency_stats)
246+
if t_max_latency > max_latency:
247+
max_latency = t_max_latency
248+
if t_min_latency < min_latency:
249+
min_latency = t_min_latency
250+
251+
if is_copy:
252+
copyargs = query_args[-1]
253+
254+
rowcount = await admin_conn.fetchval('''
255+
SELECT
256+
count(*)
257+
FROM
258+
"{tabname}"
259+
'''.format(tabname=copyargs['table']))
260+
261+
print(rowcount, file=sys.stderr)
262+
263+
if rowcount < len(query_args[0]) * queries:
264+
raise RuntimeError(
265+
'COPY did not insert the expected number of rows')
266+
267+
data = {
268+
'queries': queries,
269+
'rows': rows,
270+
'duration': duration,
271+
'min_latency': min_latency,
272+
'max_latency': max_latency,
273+
'latency_stats': latency_stats.tolist(),
274+
'output_format': args.output_format
275+
}
227276

277+
finally:
228278
if teardown:
229279
await admin_conn.execute(teardown)
230-
await admin_conn.close()
231-
232-
min_latency = float('inf')
233-
max_latency = 0.0
234-
queries = 0
235-
rows = 0
236-
latency_stats = None
237-
238-
for result in results:
239-
t_queries, t_rows, t_latency_stats, t_min_latency, t_max_latency = \
240-
result
241-
queries += t_queries
242-
rows += t_rows
243-
if latency_stats is None:
244-
latency_stats = t_latency_stats
245-
else:
246-
latency_stats = np.add(latency_stats, t_latency_stats)
247-
if t_max_latency > max_latency:
248-
max_latency = t_max_latency
249-
if t_min_latency < min_latency:
250-
min_latency = t_min_latency
251-
252-
data = {
253-
'queries': queries,
254-
'rows': rows,
255-
'duration': duration,
256-
'min_latency': min_latency,
257-
'max_latency': max_latency,
258-
'latency_stats': latency_stats.tolist(),
259-
'output_format': args.output_format
260-
}
261280

262281
print(json.dumps(data))
263282

pgbench

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ def format_report(data, target_file):
324324
'''<tr class="benchmark">
325325
<td>{name}</td>
326326
{empty_tds}
327-
</tr>'''.format(name=bname, empty_tds='<td></td>' * vc)
327+
</tr>'''.format(name=bname, empty_tds='<td></td>' * (vc + 1))
328328
)
329329

330330
for metric, metric_data in entry['benchmarks'].items():

0 commit comments

Comments (0)