1212from concurrent import futures
1313import csv
1414import io
15+ import itertools
1516import json
1617import re
1718import sys
2728import psycopg2 .extras
2829
2930
31+ def _chunks (iterable , n ):
32+ i = 0
33+
34+ def _ctr (_ ):
35+ nonlocal i
36+ k = i // n
37+ i += 1
38+ return k
39+ for _ , g in itertools .groupby (iterable , _ctr ):
40+ yield g
41+
42+
3043def psycopg_connect (args ):
3144 conn = psycopg2 .connect (user = args .pguser , host = args .pghost ,
3245 port = args .pgport )
@@ -77,12 +90,16 @@ async def aiopg_execute(conn, query, args):
7790 return rv
7891
7992
93+ async def _aiopg_executemany (cursor , query , rows ):
94+ for batch in _chunks (rows , n = 100 ):
95+ sqls = [cursor .mogrify (query , args ) for args in batch ]
96+ await cursor .execute (b";" .join (sqls ))
97+ return len (rows )
98+
99+
80100async def aiopg_executemany (conn , query , rows ):
81101 cur = await conn .cursor (cursor_factory = psycopg2 .extras .DictCursor )
82- rv = 0
83- for args in rows :
84- await cur .execute (query , args )
85- rv += cur .rowcount
102+ rv = await _aiopg_executemany (cur , query , rows )
86103 cur .close ()
87104 return rv
88105
@@ -100,10 +117,7 @@ async def aiopg_tuples_execute(conn, query, args):
100117
101118async def aiopg_tuples_executemany (conn , query , rows ):
102119 cur = await conn .cursor ()
103- rv = 0
104- for args in rows :
105- await cur .execute (query , args )
106- rv += cur .rowcount
120+ rv = await _aiopg_executemany (cur , query , rows )
107121 cur .close ()
108122 return rv
109123
0 commit comments