@@ -30,17 +30,20 @@ type Conn interface {
3030 Close () error
3131 Closed () bool
3232 Cleanup ()
33+ NextPacket () ([]byte , error )
3334
34- // ConnectionID is the connection id at greeting
35+ // ConnectionID is the connection id at greeting.
3536 ConnectionID () uint32
36- NextPacket () ([]byte , error )
3737
38- // Query gets the row iterator
38+ // Query get the row cursor.
3939 Query (sql string ) (Rows , error )
4040 Exec (sql string ) error
4141
42- // FetchAll fetchs all results
42+ // FetchAll fetchs all results.
4343 FetchAll (sql string , maxrows int ) (* sqltypes.Result , error )
44+
45+ // FetchAllWithFunc fetchs all results but the row cursor can be interrupted by the fn.
46+ FetchAllWithFunc (sql string , maxrows int , fn Func ) (* sqltypes.Result , error )
4447}
4548
4649type conn struct {
@@ -238,39 +241,52 @@ func (c *conn) Exec(sql string) error {
238241}
239242
240243func (c * conn ) FetchAll (sql string , maxrows int ) (* sqltypes.Result , error ) {
241- var r * sqltypes.Result
244+ return c .FetchAllWithFunc (sql , maxrows , func (rows Rows ) error { return nil })
245+ }
242246
243- rows , err := c .query (sqldb .COM_QUERY , sql )
244- if err != nil {
247+ // Func calls on every rows.Next.
248+ // If func returns error, the row.Next() is interrupted and the error is return.
249+ type Func func (rows Rows ) error
250+
251+ func (c * conn ) FetchAllWithFunc (sql string , maxrows int , fn Func ) (* sqltypes.Result , error ) {
252+ var err error
253+ var iRows Rows
254+ var qrRow []sqltypes.Value
255+ var qrRows [][]sqltypes.Value
256+
257+ if iRows , err = c .query (sqldb .COM_QUERY , sql ); err != nil {
245258 return nil , err
246259 }
247260
248- r = & sqltypes. Result {
249- Fields : rows . Fields (),
250- RowsAffected : rows . RowsAffected (),
251- InsertID : rows . LastInsertID (),
252- }
261+ for iRows . Next () {
262+ // callback check.
263+ if err = fn ( iRows ); err != nil {
264+ break
265+ }
253266
254- for rows . Next () {
255- if len (r . Rows ) == maxrows {
267+ // Max rows check.
268+ if len (qrRows ) == maxrows {
256269 break
257270 }
258- row , err := rows .RowValues ()
259- if err != nil {
271+ if qrRow , err = iRows .RowValues (); err != nil {
272+ c . Cleanup ()
260273 return nil , err
261274 }
262- r .Rows = append (r .Rows , row )
263- }
264- if len (r .Rows ) > 0 {
265- r .RowsAffected = uint64 (len (r .Rows ))
275+ qrRows = append (qrRows , qrRow )
266276 }
267277
268- // Check last error
269- if err := rows .Close (); err != nil {
278+ // Drain the results and check last error.
279+ if err := iRows .Close (); err != nil {
270280 c .Cleanup ()
271281 return nil , err
272282 }
273- return r , nil
283+ qr := & sqltypes.Result {
284+ Fields : iRows .Fields (),
285+ RowsAffected : uint64 (len (qrRows )),
286+ InsertID : iRows .LastInsertID (),
287+ Rows : qrRows ,
288+ }
289+ return qr , err
274290}
275291
276292// NextPacket used to get the next packet
0 commit comments