@@ -248,12 +248,14 @@ class Connection : public ObjectWrap {
248248 bool connecting_;
249249 bool ioInitialized_;
250250 bool copyOutMode_;
251+ bool copyInMode_;
251252 Connection () : ObjectWrap ()
252253 {
253254 connection_ = NULL ;
254255 connecting_ = false ;
255256 ioInitialized_ = false ;
256257 copyOutMode_ = false ;
258+ copyInMode_ = false ;
257259 TRACE (" Initializing ev watchers" );
258260 read_watcher_.data = this ;
259261 write_watcher_.data = this ;
@@ -278,8 +280,13 @@ class Connection : public ObjectWrap {
278280 EndCopyFrom (const Arguments& args) {
279281 HandleScope scope;
280282 Connection *self = ObjectWrap::Unwrap<Connection>(args.This ());
283+ char * error_msg = NULL ;
284+ if (args[0 ]->IsString ()) {
285+ error_msg = MallocCString (args[0 ]);
286+ }
281287 // TODO handle errors in some way
282- self->EndCopyFrom ();
288+ self->EndCopyFrom (error_msg);
289+ free (error_msg);
283290 return Undefined ();
284291 }
285292
@@ -433,23 +440,19 @@ class Connection : public ObjectWrap {
433440 if (this ->copyOutMode_ ) {
434441 this ->HandleCopyOut ();
435442 }
436- if (PQisBusy (connection_) == 0 ) {
443+ if (! this -> copyInMode_ && ! this -> copyOutMode_ && PQisBusy (connection_) == 0 ) {
437444 PGresult *result;
438445 bool didHandleResult = false ;
439446 while ((result = PQgetResult (connection_))) {
440- if (PGRES_COPY_IN == PQresultStatus (result)) {
441- didHandleResult = false ;
442- Emit (" copyInResponse" );
443- PQclear (result);
447+ didHandleResult = HandleResult (result);
448+ PQclear (result);
449+ if (!didHandleResult) {
450+ // this means that we are in copy in or copy out mode
451+ // in this situation PQgetResult will return same
452+ // result untill all data will be read (copy out) or
453+ // until data end notification (copy in)
454+ // and because of this, we need to break cycle
444455 break ;
445- } else if (PGRES_COPY_OUT == PQresultStatus (result)) {
446- PQclear (result);
447- this ->copyOutMode_ = true ;
448- didHandleResult = this ->HandleCopyOut ();
449- } else {
450- HandleResult (result);
451- didHandleResult = true ;
452- PQclear (result);
453456 }
454457 }
455458 // might have fired from notification
@@ -479,52 +482,65 @@ class Connection : public ObjectWrap {
479482 }
480483 bool HandleCopyOut () {
481484 char * buffer = NULL ;
482- int copied = PQgetCopyData (connection_, &buffer, 1 );
483- if (copied > 0 ) {
484- Buffer * chunk = Buffer::New (buffer, copied);
485+ int copied;
486+ Buffer * chunk;
487+ copied = PQgetCopyData (connection_, &buffer, 1 );
488+ while (copied > 0 ) {
489+ chunk = Buffer::New (buffer, copied);
485490 Handle<Value> node_chunk = chunk->handle_ ;
486491 Emit (" copyData" , &node_chunk);
487492 PQfreemem (buffer);
488- // result was not handled copmpletely
489- return false ;
490- } else if (copied == 0 ) {
493+ copied = PQgetCopyData (connection_, &buffer, 1 );
494+ }
495+ if (copied == 0 ) {
491496 // wait for next read ready
492497 // result was not handled copmpletely
493498 return false ;
494499 } else if (copied == -1 ) {
495- PGresult *result;
496- // result is handled completely
497500 this ->copyOutMode_ = false ;
498- if (PQisBusy (connection_) == 0 && (result = PQgetResult (connection_))) {
499- HandleResult (result);
500- PQclear (result);
501- return true ;
502- } else {
503- return false ;
504- }
501+ return true ;
505502 } else if (copied == -2 ) {
506- // TODO error handling
507- // result is handled with error
508- HandleErrorResult (NULL );
503+ this ->copyOutMode_ = false ;
509504 return true ;
510505 }
511506 }
512- void HandleResult (PGresult* result)
507+ bool HandleResult (PGresult* result)
513508 {
514509 ExecStatusType status = PQresultStatus (result);
515510 switch (status) {
516511 case PGRES_TUPLES_OK:
517512 {
518513 HandleTuplesResult (result);
519514 EmitCommandMetaData (result);
515+ return true ;
520516 }
521517 break ;
522518 case PGRES_FATAL_ERROR:
523- HandleErrorResult (result);
519+ {
520+ HandleErrorResult (result);
521+ return true ;
522+ }
524523 break ;
525524 case PGRES_COMMAND_OK:
526525 case PGRES_EMPTY_QUERY:
527- EmitCommandMetaData (result);
526+ {
527+ EmitCommandMetaData (result);
528+ return true ;
529+ }
530+ break ;
531+ case PGRES_COPY_IN:
532+ {
533+ this ->copyInMode_ = true ;
534+ Emit (" copyInResponse" );
535+ return false ;
536+ }
537+ break ;
538+ case PGRES_COPY_OUT:
539+ {
540+ this ->copyOutMode_ = true ;
541+ Emit (" copyOutResponse" );
542+ return this ->HandleCopyOut ();
543+ }
528544 break ;
529545 default :
530546 printf (" YOU SHOULD NEVER SEE THIS! PLEASE OPEN AN ISSUE ON GITHUB! Unrecogized query status: %s\n " , PQresStatus (status));
@@ -772,8 +788,9 @@ class Connection : public ObjectWrap {
772788 void SendCopyFromChunk (Handle<Object> chunk) {
773789 PQputCopyData (connection_, Buffer::Data (chunk), Buffer::Length (chunk));
774790 }
775- void EndCopyFrom () {
776- PQputCopyEnd (connection_, NULL );
791+ void EndCopyFrom (char * error_msg) {
792+ PQputCopyEnd (connection_, error_msg);
793+ this ->copyInMode_ = false ;
777794 }
778795
779796};
0 commit comments