@@ -31,6 +31,7 @@ import (
3131 "github.com/CovenantSQL/CovenantSQL/rpc"
3232 "github.com/CovenantSQL/CovenantSQL/types"
3333 "github.com/CovenantSQL/CovenantSQL/utils/log"
34+ "github.com/CovenantSQL/CovenantSQL/utils/trace"
3435 "github.com/pkg/errors"
3536)
3637
@@ -150,7 +151,7 @@ ackWorkerLoop:
150151 oneTime .Do (func () {
151152 pc = rpc .NewPersistentCaller (c .pCaller .TargetID )
152153 })
153- if err = ack .Sign (c .parent .privKey , false ); err != nil {
154+ if err = ack .Sign (c .parent .privKey ); err != nil {
154155 log .WithField ("target" , pc .TargetID ).WithError (err ).Error ("failed to sign ack" )
155156 continue
156157 }
@@ -164,7 +165,7 @@ ackWorkerLoop:
164165 }
165166
166167 if pc != nil {
167- pc .CloseStream ()
168+ pc .Close ()
168169 }
169170
170171 log .Debug ("ack worker quiting" )
@@ -173,7 +174,7 @@ ackWorkerLoop:
173174func (c * pconn ) close () error {
174175 c .stopAckWorkers ()
175176 if c .pCaller != nil {
176- c .pCaller .CloseStream ()
177+ c .pCaller .Close ()
177178 }
178179 return nil
179180}
@@ -237,6 +238,8 @@ func (c *conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, e
237238
238239// ExecContext implements the driver.ExecerContext.ExecContext method.
239240func (c * conn ) ExecContext (ctx context.Context , query string , args []driver.NamedValue ) (result driver.Result , err error ) {
241+ defer trace .StartRegion (ctx , "dbExec" ).End ()
242+
240243 if atomic .LoadInt32 (& c .closed ) != 0 {
241244 err = driver .ErrBadConn
242245 return
@@ -246,7 +249,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
246249 sq := convertQuery (query , args )
247250
248251 var affectedRows , lastInsertID int64
249- if affectedRows , lastInsertID , _ , err = c .addQuery (types .WriteQuery , sq ); err != nil {
252+ if affectedRows , lastInsertID , _ , err = c .addQuery (ctx , types .WriteQuery , sq ); err != nil {
250253 return
251254 }
252255
@@ -260,14 +263,16 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
260263
261264// QueryContext implements the driver.QueryerContext.QueryContext method.
262265func (c * conn ) QueryContext (ctx context.Context , query string , args []driver.NamedValue ) (rows driver.Rows , err error ) {
266+ defer trace .StartRegion (ctx , "dbQuery" ).End ()
267+
263268 if atomic .LoadInt32 (& c .closed ) != 0 {
264269 err = driver .ErrBadConn
265270 return
266271 }
267272
268273 // TODO(xq262144): make use of the ctx argument
269274 sq := convertQuery (query , args )
270- _ , _ , rows , err = c .addQuery (types .ReadQuery , sq )
275+ _ , _ , rows , err = c .addQuery (ctx , types .ReadQuery , sq )
271276
272277 return
273278}
@@ -289,7 +294,7 @@ func (c *conn) Commit() (err error) {
289294
290295 if len (c .queries ) > 0 {
291296 // send query
292- if _ , _ , _ , err = c .sendQuery (types .WriteQuery , c .queries ); err != nil {
297+ if _ , _ , _ , err = c .sendQuery (context . Background (), types .WriteQuery , c .queries ); err != nil {
293298 return
294299 }
295300 }
@@ -319,7 +324,7 @@ func (c *conn) Rollback() error {
319324 return nil
320325}
321326
322- func (c * conn ) addQuery (queryType types.QueryType , query * types.Query ) (affectedRows int64 , lastInsertID int64 , rows driver.Rows , err error ) {
327+ func (c * conn ) addQuery (ctx context. Context , queryType types.QueryType , query * types.Query ) (affectedRows int64 , lastInsertID int64 , rows driver.Rows , err error ) {
323328 if c .inTransaction {
324329 // check query type, enqueue query
325330 if queryType == types .ReadQuery {
@@ -344,10 +349,10 @@ func (c *conn) addQuery(queryType types.QueryType, query *types.Query) (affected
344349 "args" : query .Args ,
345350 }).Debug ("execute query" )
346351
347- return c .sendQuery (queryType , []types.Query {* query })
352+ return c .sendQuery (ctx , queryType , []types.Query {* query })
348353}
349354
350- func (c * conn ) sendQuery (queryType types.QueryType , queries []types.Query ) (affectedRows int64 , lastInsertID int64 , rows driver.Rows , err error ) {
355+ func (c * conn ) sendQuery (ctx context. Context , queryType types.QueryType , queries []types.Query ) (affectedRows int64 , lastInsertID int64 , rows driver.Rows , err error ) {
351356 var uc * pconn // peer connection used to execute the queries
352357
353358 uc = c .leader
@@ -399,11 +404,6 @@ func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affe
399404 if err = uc .pCaller .Call (route .DBSQuery .String (), req , & response ); err != nil {
400405 return
401406 }
402-
403- // verify response
404- if err = response .Verify (); err != nil {
405- return
406- }
407407 rows = newRows (& response )
408408
409409 if queryType == types .WriteQuery {
@@ -412,15 +412,19 @@ func (c *conn) sendQuery(queryType types.QueryType, queries []types.Query) (affe
412412 }
413413
414414 // build ack
415- uc .ackCh <- & types.Ack {
416- Header : types.SignedAckHeader {
417- AckHeader : types.AckHeader {
418- Response : response .Header ,
419- NodeID : c .localNodeID ,
420- Timestamp : getLocalTime (),
415+ func () {
416+ defer trace .StartRegion (ctx , "ackEnqueue" ).End ()
417+ uc .ackCh <- & types.Ack {
418+ Header : types.SignedAckHeader {
419+ AckHeader : types.AckHeader {
420+ Response : response .Header .ResponseHeader ,
421+ ResponseHash : response .Header .Hash (),
422+ NodeID : c .localNodeID ,
423+ Timestamp : getLocalTime (),
424+ },
421425 },
422- },
423- }
426+ }
427+ }()
424428
425429 return
426430}
0 commit comments