Skip to content

Commit c0f3468

Browse files
committed
gracefully ending commands in restart and inerrupt
1 parent 9217228 commit c0f3468

4 files changed

Lines changed: 76 additions & 4 deletions

File tree

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
export class KernelRestartedError extends Error {
2+
constructor() {
3+
super('Kernel has been restarted');
4+
this.message = 'Kernel has been restarted';
5+
}
6+
}
7+
export class KernelShutdownError extends Error {
8+
constructor() {
9+
super('Kernel has been shutdown');
10+
this.message = 'Kernel has been shutdown';
11+
}
12+
}

src/client/jupyter/jupyter_client/jupyterSocketClient.ts

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { KernelCommand } from './contracts';
99
import { JupyterMessage, ParsedIOMessage } from '../contracts';
1010
import { Helpers } from '../common/helpers';
1111
import * as Rx from 'rx';
12+
import { KernelRestartedError, KernelShutdownError } from '../common/errors';
1213

1314
export class JupyterSocketClient extends SocketCallbackHandler {
1415
constructor(socketServer: SocketServer) {
@@ -30,6 +31,13 @@ export class JupyterSocketClient extends SocketCallbackHandler {
3031
private pid: number;
3132
private guid: string;
3233

34+
public dispose() {
35+
try {
36+
this.SendRawCommand(Commands.ExitCommandBytes);
37+
}
38+
catch (ex) {
39+
}
40+
}
3341
protected handleHandshake(): boolean {
3442
if (typeof this.guid !== 'string') {
3543
this.guid = this.stream.readStringInTransaction();
@@ -121,17 +129,20 @@ export class JupyterSocketClient extends SocketCallbackHandler {
121129
public sendKernelCommand(kernelUUID: string, command: KernelCommand): Promise<any> {
122130
const [def, id] = this.createId<any>();
123131
let commandBytes: Buffer;
132+
let error;
124133
switch (command) {
125134
case KernelCommand.interrupt: {
126135
commandBytes = Commands.InterruptKernelBytes;
127136
break;
128137
}
129138
case KernelCommand.restart: {
130139
commandBytes = Commands.RestartKernelBytes;
140+
error = new KernelRestartedError();
131141
break;
132142
}
133143
case KernelCommand.shutdown: {
134144
commandBytes = Commands.ShutdownKernelBytes;
145+
error = new KernelShutdownError();
135146
break;
136147
}
137148
default: {
@@ -141,6 +152,25 @@ export class JupyterSocketClient extends SocketCallbackHandler {
141152
this.SendRawCommand(commandBytes);
142153
this.stream.WriteString(id);
143154
this.stream.WriteString(kernelUUID);
155+
156+
if (error) {
157+
// Throw errors for pending commands
158+
this.pendingCommands.forEach((pendingDef, key) => {
159+
if (id !== key) {
160+
this.pendingCommands.delete(id);
161+
pendingDef.reject(error);
162+
}
163+
});
164+
165+
this.msgSubject.forEach((subject, key) => {
166+
subject.onError(error);
167+
});
168+
169+
this.msgSubject.clear();
170+
this.unhandledMessages.clear();
171+
this.finalMessage.clear();
172+
}
173+
144174
return def.promise;
145175
}
146176
public onKernelCommandComplete() {
@@ -232,6 +262,8 @@ export class JupyterSocketClient extends SocketCallbackHandler {
232262
const status = message.content.status;
233263
let parsedMesage: ParsedIOMessage;
234264
switch (status) {
265+
case 'abort':
266+
case 'aborted':
235267
case 'error': {
236268
// http://jupyter-client.readthedocs.io/en/latest/messaging.html#request-reply
237269
if (msg_type !== 'complete_reply' && msg_type !== 'inspect_reply') {
@@ -262,6 +294,7 @@ export class JupyterSocketClient extends SocketCallbackHandler {
262294
// If th io message with status='idle' has been received, that means message execution is deemed complete
263295
if (info.ioStatusSent) {
264296
this.finalMessage.delete(msg_id);
297+
this.msgSubject.delete(msg_id);
265298
subject.onNext(parsedMesage);
266299
subject.onCompleted();
267300
}
@@ -297,15 +330,26 @@ export class JupyterSocketClient extends SocketCallbackHandler {
297330

298331
// Ok, if we have received a status of 'idle' this means the execution has completed
299332
if (msg_type === 'status' && message.content.execution_state === 'idle' && this.msgSubject.has(msg_id)) {
300-
// Wait for the shell message to come through
301-
setTimeout(() => {
302-
const subject = this.msgSubject.get(msg_id);
303-
this.msgSubject.delete(msg_id);
333+
let timesWaited = 0;
334+
const waitForFinalIOMessage = () => {
335+
timesWaited += 1;
336+
// The Shell message handler has processed the message
337+
if (!this.msgSubject.has(msg_id)) {
338+
return;
339+
}
304340
// Last message sent on shell channel (status='ok' or status='error')
305341
// and now we have a status message, this means the exection is deemed complete
306342
if (this.finalMessage.has(msg_id)) {
343+
const subject = this.msgSubject.get(msg_id);
307344
const info = this.finalMessage.get(msg_id);
345+
if (!info.shellMessage && timesWaited < 10) {
346+
setTimeout(() => {
347+
waitForFinalIOMessage();
348+
}, 10);
349+
return;
350+
}
308351
this.finalMessage.delete(msg_id);
352+
this.msgSubject.delete(msg_id);
309353
if (info.shellMessage) {
310354
subject.onNext(info.shellMessage);
311355
}
@@ -314,6 +358,11 @@ export class JupyterSocketClient extends SocketCallbackHandler {
314358
else {
315359
this.finalMessage.set(msg_id, { ioStatusSent: true });
316360
}
361+
};
362+
363+
// Wait for the shell message to come through
364+
setTimeout(() => {
365+
waitForFinalIOMessage();
317366
}, 10);
318367
}
319368

@@ -346,6 +395,9 @@ export class JupyterSocketClient extends SocketCallbackHandler {
346395
if (typeof trace !== 'string') {
347396
return;
348397
}
398+
if (cmd === 'exit') {
399+
return;
400+
}
349401
if (id.length > 0 && this.pendingCommands.has(id)) {
350402
const def = this.pendingCommands.get(id);
351403
this.pendingCommands.delete(id);

src/client/jupyter/kernel-manager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { PythonSettings } from '../common/configSettings';
88
import { formatErrorForLogging } from '../common/utils';
99
import { JupyterClientAdapter } from './jupyter_client/main';
1010
import { JupyterClientKernel } from './jupyter_client-Kernel';
11+
import { KernelRestartedError, KernelShutdownError } from './common/errors';
1112

1213
const pythonSettings = PythonSettings.getInstance();
1314

@@ -120,6 +121,9 @@ export class KernelManagerImpl extends EventEmitter {
120121
vscode.window.showWarningMessage(errorMessage);
121122
}
122123
}, reason => {
124+
if (reason instanceof KernelRestartedError || reason instanceof KernelShutdownError) {
125+
return resolve();
126+
}
123127
// It doesn't matter if startup code execution Failed
124128
// Possible they have placed some stuff that is invalid or we have some missing packages (e.g. matplot lib)
125129
this.outputChannel.appendLine(formatErrorForLogging(reason));

src/client/jupyter/main.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { Documentation } from '../common/constants';
1111
import * as telemetryHelper from '../common/telemetry';
1212
import * as telemetryContracts from '../common/telemetryContracts';
1313
import * as main from './jupyter_client/main';
14+
import { KernelRestartedError, KernelShutdownError } from './common/errors';
1415

1516
// Todo: Refactor the error handling and displaying of messages
1617

@@ -134,6 +135,9 @@ export class Jupyter extends vscode.Disposable {
134135
}
135136
responses.push(result.data);
136137
}, reason => {
138+
if (reason instanceof KernelRestartedError || reason instanceof KernelShutdownError) {
139+
return resolve([]);
140+
}
137141
reject(reason);
138142
}, () => {
139143
resolve(responses);

0 commit comments

Comments
 (0)