@@ -9,6 +9,7 @@ import { KernelCommand } from './contracts';
99import { JupyterMessage , ParsedIOMessage } from '../contracts' ;
1010import { Helpers } from '../common/helpers' ;
1111import * as Rx from 'rx' ;
12+ import { KernelRestartedError , KernelShutdownError } from '../common/errors' ;
1213
1314export class JupyterSocketClient extends SocketCallbackHandler {
1415 constructor ( socketServer : SocketServer ) {
@@ -30,6 +31,13 @@ export class JupyterSocketClient extends SocketCallbackHandler {
3031 private pid : number ;
3132 private guid : string ;
3233
34+ public dispose ( ) {
35+ try {
36+ this . SendRawCommand ( Commands . ExitCommandBytes ) ;
37+ }
38+ catch ( ex ) {
39+ }
40+ }
3341 protected handleHandshake ( ) : boolean {
3442 if ( typeof this . guid !== 'string' ) {
3543 this . guid = this . stream . readStringInTransaction ( ) ;
@@ -121,17 +129,20 @@ export class JupyterSocketClient extends SocketCallbackHandler {
121129 public sendKernelCommand ( kernelUUID : string , command : KernelCommand ) : Promise < any > {
122130 const [ def , id ] = this . createId < any > ( ) ;
123131 let commandBytes : Buffer ;
132+ let error ;
124133 switch ( command ) {
125134 case KernelCommand . interrupt : {
126135 commandBytes = Commands . InterruptKernelBytes ;
127136 break ;
128137 }
129138 case KernelCommand . restart : {
130139 commandBytes = Commands . RestartKernelBytes ;
140+ error = new KernelRestartedError ( ) ;
131141 break ;
132142 }
133143 case KernelCommand . shutdown : {
134144 commandBytes = Commands . ShutdownKernelBytes ;
145+ error = new KernelShutdownError ( ) ;
135146 break ;
136147 }
137148 default : {
@@ -141,6 +152,25 @@ export class JupyterSocketClient extends SocketCallbackHandler {
141152 this . SendRawCommand ( commandBytes ) ;
142153 this . stream . WriteString ( id ) ;
143154 this . stream . WriteString ( kernelUUID ) ;
155+
156+ if ( error ) {
157+ // Throw errors for pending commands
158+ this . pendingCommands . forEach ( ( pendingDef , key ) => {
159+ if ( id !== key ) {
160+ this . pendingCommands . delete ( id ) ;
161+ pendingDef . reject ( error ) ;
162+ }
163+ } ) ;
164+
165+ this . msgSubject . forEach ( ( subject , key ) => {
166+ subject . onError ( error ) ;
167+ } ) ;
168+
169+ this . msgSubject . clear ( ) ;
170+ this . unhandledMessages . clear ( ) ;
171+ this . finalMessage . clear ( ) ;
172+ }
173+
144174 return def . promise ;
145175 }
146176 public onKernelCommandComplete ( ) {
@@ -232,6 +262,8 @@ export class JupyterSocketClient extends SocketCallbackHandler {
232262 const status = message . content . status ;
233263 let parsedMesage : ParsedIOMessage ;
234264 switch ( status ) {
265+ case 'abort' :
266+ case 'aborted' :
235267 case 'error' : {
236268 // http://jupyter-client.readthedocs.io/en/latest/messaging.html#request-reply
237269 if ( msg_type !== 'complete_reply' && msg_type !== 'inspect_reply' ) {
@@ -262,6 +294,7 @@ export class JupyterSocketClient extends SocketCallbackHandler {
262294 // If th io message with status='idle' has been received, that means message execution is deemed complete
263295 if ( info . ioStatusSent ) {
264296 this . finalMessage . delete ( msg_id ) ;
297+ this . msgSubject . delete ( msg_id ) ;
265298 subject . onNext ( parsedMesage ) ;
266299 subject . onCompleted ( ) ;
267300 }
@@ -297,15 +330,26 @@ export class JupyterSocketClient extends SocketCallbackHandler {
297330
298331 // Ok, if we have received a status of 'idle' this means the execution has completed
299332 if ( msg_type === 'status' && message . content . execution_state === 'idle' && this . msgSubject . has ( msg_id ) ) {
300- // Wait for the shell message to come through
301- setTimeout ( ( ) => {
302- const subject = this . msgSubject . get ( msg_id ) ;
303- this . msgSubject . delete ( msg_id ) ;
333+ let timesWaited = 0 ;
334+ const waitForFinalIOMessage = ( ) => {
335+ timesWaited += 1 ;
336+ // The Shell message handler has processed the message
337+ if ( ! this . msgSubject . has ( msg_id ) ) {
338+ return ;
339+ }
304340 // Last message sent on shell channel (status='ok' or status='error')
305341 // and now we have a status message, this means the exection is deemed complete
306342 if ( this . finalMessage . has ( msg_id ) ) {
343+ const subject = this . msgSubject . get ( msg_id ) ;
307344 const info = this . finalMessage . get ( msg_id ) ;
345+ if ( ! info . shellMessage && timesWaited < 10 ) {
346+ setTimeout ( ( ) => {
347+ waitForFinalIOMessage ( ) ;
348+ } , 10 ) ;
349+ return ;
350+ }
308351 this . finalMessage . delete ( msg_id ) ;
352+ this . msgSubject . delete ( msg_id ) ;
309353 if ( info . shellMessage ) {
310354 subject . onNext ( info . shellMessage ) ;
311355 }
@@ -314,6 +358,11 @@ export class JupyterSocketClient extends SocketCallbackHandler {
314358 else {
315359 this . finalMessage . set ( msg_id , { ioStatusSent : true } ) ;
316360 }
361+ } ;
362+
363+ // Wait for the shell message to come through
364+ setTimeout ( ( ) => {
365+ waitForFinalIOMessage ( ) ;
317366 } , 10 ) ;
318367 }
319368
@@ -346,6 +395,9 @@ export class JupyterSocketClient extends SocketCallbackHandler {
346395 if ( typeof trace !== 'string' ) {
347396 return ;
348397 }
398+ if ( cmd === 'exit' ) {
399+ return ;
400+ }
349401 if ( id . length > 0 && this . pendingCommands . has ( id ) ) {
350402 const def = this . pendingCommands . get ( id ) ;
351403 this . pendingCommands . delete ( id ) ;
0 commit comments