@@ -48,6 +48,7 @@ def psycopg_copy(conn, query, args):
4848 f .seek (0 )
4949 cur = conn .cursor ()
5050 cur .copy_from (f , copy ['table' ], columns = copy ['columns' ])
51+ conn .commit ()
5152 return cur .rowcount
5253
5354
@@ -154,7 +155,9 @@ async def runner(args, connector, executor, copy_executor, is_async,
154155 if arg_format == 'python' :
155156 query = re .sub (r'\$\d+' , '%s' , query )
156157
157- if query .startswith ('COPY ' ):
158+ is_copy = query .startswith ('COPY ' )
159+
160+ if is_copy :
158161 if copy_executor is None :
159162 raise RuntimeError ('COPY is not supported for {}' .format (executor ))
160163 executor = copy_executor
@@ -213,51 +216,67 @@ async def _do_run(run_duration):
213216 await admin_conn .execute (setup )
214217
215218 try :
216- if args .warmup_time :
217- await _do_run (args .warmup_time )
218-
219- results , duration = await _do_run (args .duration )
220-
221- finally :
222- for conn in conns :
223- if is_async :
224- await conn .close ()
219+ try :
220+ if args .warmup_time :
221+ await _do_run (args .warmup_time )
222+
223+ results , duration = await _do_run (args .duration )
224+ finally :
225+ for conn in conns :
226+ if is_async :
227+ await conn .close ()
228+ else :
229+ conn .close ()
230+
231+ min_latency = float ('inf' )
232+ max_latency = 0.0
233+ queries = 0
234+ rows = 0
235+ latency_stats = None
236+
237+ for result in results :
238+ t_queries , t_rows , t_latency_stats , t_min_latency , t_max_latency = \
239+ result
240+ queries += t_queries
241+ rows += t_rows
242+ if latency_stats is None :
243+ latency_stats = t_latency_stats
225244 else :
226- conn .close ()
245+ latency_stats = np .add (latency_stats , t_latency_stats )
246+ if t_max_latency > max_latency :
247+ max_latency = t_max_latency
248+ if t_min_latency < min_latency :
249+ min_latency = t_min_latency
250+
251+ if is_copy :
252+ copyargs = query_args [- 1 ]
253+
254+ rowcount = await admin_conn .fetchval ('''
255+ SELECT
256+ count(*)
257+ FROM
258+ "{tabname}"
259+ ''' .format (tabname = copyargs ['table' ]))
260+
261+ print (rowcount , file = sys .stderr )
262+
263+ if rowcount < len (query_args [0 ]) * queries :
264+ raise RuntimeError (
265+ 'COPY did not insert the expected number of rows' )
266+
267+ data = {
268+ 'queries' : queries ,
269+ 'rows' : rows ,
270+ 'duration' : duration ,
271+ 'min_latency' : min_latency ,
272+ 'max_latency' : max_latency ,
273+ 'latency_stats' : latency_stats .tolist (),
274+ 'output_format' : args .output_format
275+ }
227276
277+ finally :
228278 if teardown :
229279 await admin_conn .execute (teardown )
230- await admin_conn .close ()
231-
232- min_latency = float ('inf' )
233- max_latency = 0.0
234- queries = 0
235- rows = 0
236- latency_stats = None
237-
238- for result in results :
239- t_queries , t_rows , t_latency_stats , t_min_latency , t_max_latency = \
240- result
241- queries += t_queries
242- rows += t_rows
243- if latency_stats is None :
244- latency_stats = t_latency_stats
245- else :
246- latency_stats = np .add (latency_stats , t_latency_stats )
247- if t_max_latency > max_latency :
248- max_latency = t_max_latency
249- if t_min_latency < min_latency :
250- min_latency = t_min_latency
251-
252- data = {
253- 'queries' : queries ,
254- 'rows' : rows ,
255- 'duration' : duration ,
256- 'min_latency' : min_latency ,
257- 'max_latency' : max_latency ,
258- 'latency_stats' : latency_stats .tolist (),
259- 'output_format' : args .output_format
260- }
261280
262281 print (json .dumps (data ))
263282
0 commit comments