@@ -7,21 +7,21 @@ var NativeQuery = require('./query');
77
88var Client = module . exports = function ( config ) {
99 EventEmitter . call ( this ) ;
10- if ( typeof config === 'string' ) {
11- this . connectionString = config ;
12- }
1310 this . native = new Native ( ) ;
1411 this . _queryQueue = [ ] ;
1512 this . _connected = false ;
1613
1714 //keep these on the object for legacy reasons
1815 //for the time being. TODO: deprecate all this jazz
19- var cp = new ConnectionParameters ( config ) ;
16+ var cp = this . connectionParameters = new ConnectionParameters ( config ) ;
2017 this . user = cp . user ;
2118 this . password = cp . password ;
2219 this . database = cp . database ;
2320 this . host = cp . host ;
2421 this . port = cp . port ;
22+
23+ //a hash to hold named queries
24+ this . namedQueries = { } ;
2525} ;
2626
2727util . inherits ( Client , EventEmitter ) ;
@@ -33,20 +33,41 @@ util.inherits(Client, EventEmitter);
3333//the client will emit an error event.
3434Client . prototype . connect = function ( cb ) {
3535 var self = this ;
36- this . native . connect ( this . connectionString , function ( err ) {
37- //error handling
38- if ( err ) {
39- if ( cb ) return cb ( err ) ;
40- return self . emit ( 'error' , err ) ;
41- }
4236
43- //set internal states to connected
44- self . _connected = true ;
45- self . emit ( 'connect' ) ;
46- self . _pulseQueryQueue ( true ) ;
37+ var onError = function ( err ) {
38+ if ( cb ) return cb ( err ) ;
39+ return self . emit ( 'error' , err ) ;
40+ } ;
4741
48- //possibly call the optional callback
49- if ( cb ) cb ( ) ;
42+ this . connectionParameters . getLibpqConnectionString ( function ( err , conString ) {
43+ if ( err ) return onError ( err ) ;
44+ self . native . connect ( conString , function ( err ) {
45+ if ( err ) return onError ( err ) ;
46+
47+ //set internal states to connected
48+ self . _connected = true ;
49+ self . emit ( 'connect' ) ;
50+ self . _pulseQueryQueue ( true ) ;
51+
52+ //handle connection errors from the native layer
53+ self . native . on ( 'error' , function ( err ) {
54+ //error will be handled by active query
55+ if ( self . _activeQuery && self . _activeQuery . state != 'end' ) {
56+ return ;
57+ }
58+ self . emit ( 'error' , err ) ;
59+ } ) ;
60+
61+ self . native . on ( 'notification' , function ( msg ) {
62+ self . emit ( 'notification' , {
63+ channel : msg . relname ,
64+ payload : msg . extra
65+ } ) ;
66+ } ) ;
67+
68+ //possibly call the optional callback
69+ if ( cb ) cb ( ) ;
70+ } ) ;
5071 } ) ;
5172} ;
5273
@@ -86,19 +107,27 @@ Client.prototype.query = function(config, values, callback) {
86107Client . prototype . end = function ( cb ) {
87108 var self = this ;
88109 this . native . end ( function ( ) {
110+ //send an error to the active query
111+ if ( self . _hasActiveQuery ( ) ) {
112+ var msg = 'Connection terminated' ;
113+ self . _queryQueue . length = 0 ;
114+ self . _activeQuery . handleError ( new Error ( msg ) ) ;
115+ }
89116 self . emit ( 'end' ) ;
90117 if ( cb ) cb ( ) ;
91118 } ) ;
92119} ;
93120
121+ Client . prototype . _hasActiveQuery = function ( ) {
122+ return this . _activeQuery && this . _activeQuery . state != 'error' && this . _activeQuery . state != 'end' ;
123+ } ;
124+
94125Client . prototype . _pulseQueryQueue = function ( initialConnection ) {
95126 if ( ! this . _connected ) {
96127 return ;
97128 }
98- if ( this . _activeQuery ) {
99- if ( this . _activeQuery . state != 'error' && this . _activeQuery . state != 'end' ) {
100- return ;
101- }
129+ if ( this . _hasActiveQuery ( ) ) {
130+ return ;
102131 }
103132 var query = this . _queryQueue . shift ( ) ;
104133 if ( ! query ) {
@@ -108,7 +137,7 @@ Client.prototype._pulseQueryQueue = function(initialConnection) {
108137 return ;
109138 }
110139 this . _activeQuery = query ;
111- query . submit ( ) ;
140+ query . submit ( this ) ;
112141 var self = this ;
113142 query . once ( '_done' , function ( ) {
114143 self . _pulseQueryQueue ( ) ;
0 commit comments