diff --git a/news/3 Code Health/11470.md b/news/3 Code Health/11470.md new file mode 100644 index 000000000000..68fb36f751b8 --- /dev/null +++ b/news/3 Code Health/11470.md @@ -0,0 +1 @@ +Remove IJMPConnection implementation while maintaining tests written for it. \ No newline at end of file diff --git a/src/client/datascience/raw-kernel/enchannel-zmq-backend-6/index.ts b/src/client/datascience/raw-kernel/enchannel-zmq-backend-6/index.ts deleted file mode 100644 index c421b9600ac2..000000000000 --- a/src/client/datascience/raw-kernel/enchannel-zmq-backend-6/index.ts +++ /dev/null @@ -1,286 +0,0 @@ -// This code was copied from https://github.com/nteract/enchannel-zmq-backend/blob/master/src/index.ts -// and modified to work with zeromq-beta-6 - -import type { Channels, JupyterMessage } from '@nteract/messaging'; -import * as wireProtocol from '@nteract/messaging/lib/wire-protocol'; -import * as Events from 'events'; -import * as rxjs from 'rxjs'; -import { map, publish, refCount } from 'rxjs/operators'; -import { v4 as uuid } from 'uuid'; -import * as zeromq from 'zeromq'; -import { traceError } from '../../../common/logger'; - -type ChannelName = 'iopub' | 'stdin' | 'shell' | 'control'; - -// tslint:disable: interface-name no-any -export interface JupyterConnectionInfo { - version: number; - iopub_port: number; - shell_port: number; - stdin_port: number; - control_port: number; - signature_scheme: 'hmac-sha256'; - hb_port: number; - ip: string; - key: string; - transport: 'tcp' | 'ipc'; -} - -interface HeaderFiller { - session: string; - username: string; -} - -/** - * Takes a Jupyter spec connection info object and channel and returns the - * string for a channel. Abstracts away tcp and ipc connection string - * formatting - * - * @param config Jupyter connection information - * @param channel Jupyter channel ("iopub", "shell", "control", "stdin") - * - * @returns The connection string - */ -export const formConnectionString = (config: JupyterConnectionInfo, channel: ChannelName) => { - const portDelimiter = config.transport === 'tcp' ? ':' : '-'; - const port = config[`${channel}_port` as keyof JupyterConnectionInfo]; - if (!port) { - throw new Error(`Port not found for channel "${channel}"`); - } - return `${config.transport}://${config.ip}${portDelimiter}${port}`; -}; - -/** - * Creates a socket for the given channel with ZMQ channel type given a config - * - * @param channel Jupyter channel ("iopub", "shell", "control", "stdin") - * @param identity UUID - * @param config Jupyter connection information - * - * @returns The new Jupyter ZMQ socket - */ -export async function createSubscriber( - channel: ChannelName, - config: JupyterConnectionInfo -): Promise { - const socket = new zeromq.Subscriber(); - - const url = formConnectionString(config, channel); - socket.connect(url); - return socket; -} - -/** - * Creates a socket for the given channel with ZMQ channel type given a config - * - * @param channel Jupyter channel ("iopub", "shell", "control", "stdin") - * @param identity UUID - * @param config Jupyter connection information - * - * @returns The new Jupyter ZMQ socket - */ -export async function createDealer( - channel: ChannelName, - identity: string, - config: JupyterConnectionInfo -): Promise { - // tslint:disable-next-line: no-require-imports - const socket = new zeromq.Dealer({ routingId: identity }); - - const url = formConnectionString(config, channel); - socket.connect(url); - return socket; -} - -export const getUsername = () => - process.env.LOGNAME || process.env.USER || process.env.LNAME || process.env.USERNAME || 'username'; // This is the fallback that the classic notebook uses - -interface Sockets { - shell: zeromq.Dealer; - control: zeromq.Dealer; - stdin: zeromq.Dealer; - iopub: zeromq.Subscriber; -} - -/** - * Sets up the sockets for each of the jupyter channels. - * - * @param config Jupyter connection information - * @param subscription The topic to filter the subscription to the iopub channel on - * @param identity UUID - * @param jmp A reference to the JMP Node module - * - * @returns Sockets for each Jupyter channel - */ -export const createSockets = async ( - config: JupyterConnectionInfo, - subscription: string = '', - identity = uuid() -): Promise => { - const [shell, control, stdin, iopub] = await Promise.all([ - createDealer('shell', identity, config), - createDealer('control', identity, config), - createDealer('stdin', identity, config), - createSubscriber('iopub', config) - ]); - - // NOTE: ZMQ PUB/SUB subscription (not an Rx subscription) - iopub.subscribe(subscription); - - return { - shell, - control, - stdin, - iopub - }; -}; - -class SocketEventEmitter extends Events.EventEmitter { - constructor(socket: zeromq.Dealer | zeromq.Subscriber) { - super(); - this.waitForReceive(socket); - } - - private waitForReceive(socket: zeromq.Dealer | zeromq.Subscriber) { - if (!socket.closed) { - // tslint:disable-next-line: no-floating-promises - socket - .receive() - .then((b) => { - this.emit('message', b); - setTimeout(this.waitForReceive.bind(this, socket), 0); - }) - .catch((exc) => { - traceError('Exception communicating with kernel:', exc); - }); - } - } -} - -/** - * Creates a multiplexed set of channels. - * - * @param sockets An object containing associations between channel types and 0MQ sockets - * @param header The session and username to place in kernel message headers - * @param jmp A reference to the JMP Node module - * - * @returns Creates an Observable for each channel connection that allows us - * to send and receive messages through the Jupyter protocol. - */ -export const createMainChannelFromSockets = ( - sockets: Sockets, - connectionInfo: JupyterConnectionInfo, - header: HeaderFiller = { - session: uuid(), - username: getUsername() - } -): Channels => { - // The mega subject that encapsulates all the sockets as one multiplexed - // stream - const outgoingMessages = rxjs.Subscriber.create( - async (message) => { - // There's always a chance that a bad message is sent, we'll ignore it - // instead of consuming it - if (!message || !message.channel) { - console.warn('message sent without a channel', message); - return; - } - const socket = (sockets as any)[message.channel]; - if (!socket) { - // If, for some reason, a message is sent on a channel we don't have - // a socket for, warn about it but don't bomb the stream - console.warn('channel not understood for message', message); - return; - } - try { - const jMessage: wireProtocol.RawJupyterMessage = { - // Fold in the setup header to ease usage of messages on channels - header: { ...message.header, ...header }, - parent_header: message.parent_header as any, - content: message.content, - metadata: message.metadata, - buffers: message.buffers as any, - idents: [] - }; - if ((socket as any).send !== undefined) { - await (socket as zeromq.Dealer).send( - wireProtocol.encode(jMessage, connectionInfo.key, connectionInfo.signature_scheme) - ); - } - } catch (err) { - traceError('Error sending message', err, message); - } - }, - undefined, // not bothering with sending errors on - () => { - // When the subject is completed / disposed, close all the event - // listeners and shutdown the socket - const closer = (closable: { close(): void }) => { - try { - closable.close(); - } catch (ex) { - traceError(`Error during socket shutdown`, ex); - } - }; - closer(sockets.control); - closer(sockets.iopub); - closer(sockets.shell); - closer(sockets.stdin); - } - ); - - // Messages from kernel on the sockets - const incomingMessages: rxjs.Observable = rxjs - .merge( - // Form an Observable with each socket - ...Object.keys(sockets).map((name) => { - // Wrap in something that will emit an event whenever a message is received. - const socketEmitter = new SocketEventEmitter((sockets as any)[name]); - return rxjs.fromEvent(socketEmitter, 'message').pipe( - map( - (body: any): JupyterMessage => { - const message = wireProtocol.decode( - body, - connectionInfo.key, - connectionInfo.signature_scheme - ) as any; - // Add on our channel property - message.channel = name; - return message; - } - ), - publish(), - refCount() - ); - }) - ) - .pipe(publish(), refCount()); - - return rxjs.Subject.create(outgoingMessages, incomingMessages); -}; - -/** - * Creates a multiplexed set of channels. - * - * @param config Jupyter connection information - * @param config.ip IP address of the kernel - * @param config.transport Transport, e.g. TCP - * @param config.signature_scheme Hashing scheme, e.g. hmac-sha256 - * @param config.iopub_port Port for iopub channel - * @param subscription subscribed topic; defaults to all - * @param identity UUID - * - * @returns Subject containing multiplexed channels - */ -export const createMainChannel = async ( - config: JupyterConnectionInfo, - subscription: string = '', - identity: string = uuid(), - header: HeaderFiller = { - session: uuid(), - username: getUsername() - } -): Promise => { - const sockets = await createSockets(config, subscription, identity); - return createMainChannelFromSockets(sockets, config, header); -}; diff --git a/src/client/datascience/raw-kernel/enchannelJMPConnection.ts b/src/client/datascience/raw-kernel/enchannelJMPConnection.ts deleted file mode 100644 index 94adc7401cbb..000000000000 --- a/src/client/datascience/raw-kernel/enchannelJMPConnection.ts +++ /dev/null @@ -1,42 +0,0 @@ -import type { KernelMessage } from '@jupyterlab/services'; -import type { Channels } from '@nteract/messaging'; -import { injectable } from 'inversify'; -import { noop } from '../../common/utils/misc'; -import { IJMPConnection, IJMPConnectionInfo } from '../types'; - -@injectable() -export class EnchannelJMPConnection implements IJMPConnection { - private mainChannel: Channels | undefined; - - public async connect(connectInfo: IJMPConnectionInfo): Promise { - // zmq may not load, so do it dynamically - // tslint:disable-next-line: no-require-imports - const enchannelZmq6 = (await require('./enchannel-zmq-backend-6/index')) as typeof import('./enchannel-zmq-backend-6/index'); - - // tslint:disable-next-line:no-any - this.mainChannel = await enchannelZmq6.createMainChannel(connectInfo as any); - } - public sendMessage(message: KernelMessage.IMessage): void { - if (this.mainChannel) { - // jupyterlab types and enchannel types seem to have small changes - // with how they are defined, just use an any cast for now, but they appear to be the - // same actual object - // tslint:disable-next-line:no-any - this.mainChannel.next(message as any); - } - } - // tslint:disable-next-line: no-any - public subscribe(handlerFunc: (message: KernelMessage.IMessage) => void, errorHandler?: (exc: any) => void) { - if (this.mainChannel) { - // tslint:disable-next-line:no-any - this.mainChannel.subscribe(handlerFunc as any, errorHandler ? errorHandler : noop); - } - } - - public dispose(): void { - if (this.mainChannel) { - this.mainChannel.unsubscribe(); - this.mainChannel = undefined; - } - } -} diff --git a/src/client/datascience/raw-kernel/rawKernel.ts b/src/client/datascience/raw-kernel/rawKernel.ts index 0737a15bbc4e..4bf25d54ed16 100644 --- a/src/client/datascience/raw-kernel/rawKernel.ts +++ b/src/client/datascience/raw-kernel/rawKernel.ts @@ -4,7 +4,6 @@ import type { Kernel, KernelMessage, ServerConnection } from '@jupyterlab/servic import * as uuid from 'uuid/v4'; import { isTestExecution } from '../../common/constants'; import { IDisposable } from '../../common/types'; -import { noop } from '../../common/utils/misc'; import { IKernelProcess } from '../kernel-launcher/types'; import { IWebSocketLike } from '../kernelSocketWrapper'; import { IKernelSocket } from '../types'; @@ -107,9 +106,7 @@ export class RawKernel implements Kernel.IKernel { public shutdown(): Promise { suppressShutdownErrors(this.realKernel); - return this.realKernel.shutdown().catch((_exc) => { - noop(); - }); + return this.realKernel.shutdown().then(() => this.kernelProcess.dispose()); } public getSpec(): Promise { return this.realKernel.getSpec(); diff --git a/src/client/datascience/serviceRegistry.ts b/src/client/datascience/serviceRegistry.ts index e67c0dff7484..423403491b09 100644 --- a/src/client/datascience/serviceRegistry.ts +++ b/src/client/datascience/serviceRegistry.ts @@ -81,7 +81,6 @@ import { PlotViewer } from './plotting/plotViewer'; import { PlotViewerProvider } from './plotting/plotViewerProvider'; import { PreWarmActivatedJupyterEnvironmentVariables } from './preWarmVariables'; import { ProgressReporter } from './progress/progressReporter'; -import { EnchannelJMPConnection } from './raw-kernel/enchannelJMPConnection'; import { RawNotebookProviderWrapper } from './raw-kernel/rawNotebookProviderWrapper'; import { StatusProvider } from './statusProvider'; import { ThemeFinder } from './themeFinder'; @@ -103,7 +102,6 @@ import { IInteractiveWindow, IInteractiveWindowListener, IInteractiveWindowProvider, - IJMPConnection, IJupyterCommandFactory, IJupyterDebugger, IJupyterExecution, @@ -209,7 +207,6 @@ export function registerTypes(serviceManager: IServiceManager) { serviceManager.addSingleton(NativeEditorSynchronizer, NativeEditorSynchronizer); serviceManager.addSingleton(INotebookProvider, NotebookProvider); serviceManager.addSingleton(IJupyterServerProvider, NotebookServerProvider); - serviceManager.add(IJMPConnection, EnchannelJMPConnection); serviceManager.addSingleton(IPyWidgetMessageDispatcherFactory, IPyWidgetMessageDispatcherFactory); serviceManager.addSingleton(IJupyterInterpreterDependencyManager, JupyterInterpreterSubCommandExecutionService); serviceManager.addSingleton(IJupyterSubCommandExecutionService, JupyterInterpreterSubCommandExecutionService); diff --git a/src/client/datascience/types.ts b/src/client/datascience/types.ts index db7fe4fe3bd4..da4d1daeec30 100644 --- a/src/client/datascience/types.ts +++ b/src/client/datascience/types.ts @@ -1119,29 +1119,6 @@ export type KernelSocketInformation = { readonly options: KernelSocketOptions; }; -// Connection info to connect to a kernel over JMP -export interface IJMPConnectionInfo { - version: number; - iopub_port: number; - shell_port: number; - stdin_port: number; - control_port: number; - signature_scheme: string; - hb_port: number; - ip: string; - key: string; - transport: string; -} - -export const IJMPConnection = Symbol('IJMPConnection'); -// A service to send and recieve messages over Jupyter messaging protocol -export interface IJMPConnection extends IDisposable { - connect(connectInfo: IJMPConnectionInfo): Promise; - sendMessage(message: KernelMessage.IMessage): void; - // tslint:disable-next-line: no-any - subscribe(handlerFunc: (message: KernelMessage.IMessage) => void, errorHandler?: (exc: any) => void): void; -} - export enum KernelInterpreterDependencyResponse { ok, cancel diff --git a/src/test/datascience/dataScienceIocContainer.ts b/src/test/datascience/dataScienceIocContainer.ts index 0e58d9be4a2f..116afa487f86 100644 --- a/src/test/datascience/dataScienceIocContainer.ts +++ b/src/test/datascience/dataScienceIocContainer.ts @@ -229,7 +229,6 @@ import { NotebookAndInteractiveWindowUsageTracker } from '../../client/datascien import { PlotViewer } from '../../client/datascience/plotting/plotViewer'; import { PlotViewerProvider } from '../../client/datascience/plotting/plotViewerProvider'; import { ProgressReporter } from '../../client/datascience/progress/progressReporter'; -import { EnchannelJMPConnection } from '../../client/datascience/raw-kernel/enchannelJMPConnection'; import { RawNotebookProviderWrapper } from '../../client/datascience/raw-kernel/rawNotebookProviderWrapper'; import { StatusProvider } from '../../client/datascience/statusProvider'; import { ThemeFinder } from '../../client/datascience/themeFinder'; @@ -251,7 +250,6 @@ import { IInteractiveWindow, IInteractiveWindowListener, IInteractiveWindowProvider, - IJMPConnection, IJupyterCommandFactory, IJupyterDebugger, IJupyterExecution, @@ -1100,7 +1098,6 @@ export class DataScienceIocContainer extends UnitTestIocContainer { this.serviceManager.addSingleton(KernelService, KernelService); this.serviceManager.addSingleton(IProcessServiceFactory, ProcessServiceFactory); this.serviceManager.addSingleton(IPythonExecutionFactory, PythonExecutionFactory); - this.serviceManager.add(IJMPConnection, EnchannelJMPConnection); // Make sure full interpreter services are available. registerInterpreterTypes(this.serviceManager); diff --git a/src/test/datascience/kernelLauncher.functional.test.ts b/src/test/datascience/kernelLauncher.functional.test.ts index e74fa379bc91..e7cfcbe0f070 100644 --- a/src/test/datascience/kernelLauncher.functional.test.ts +++ b/src/test/datascience/kernelLauncher.functional.test.ts @@ -5,7 +5,6 @@ import { assert } from 'chai'; import { KernelMessage } from '@jupyterlab/services'; -import { Observable } from 'rxjs'; import * as uuid from 'uuid/v4'; import { IFileSystem } from '../../client/common/platform/types'; import { IProcessServiceFactory, IPythonExecutionFactory } from '../../client/common/process/types'; @@ -13,11 +12,13 @@ import { createDeferred } from '../../client/common/utils/async'; import { JupyterZMQBinariesNotFoundError } from '../../client/datascience/jupyter/jupyterZMQBinariesNotFoundError'; import { KernelLauncher } from '../../client/datascience/kernel-launcher/kernelLauncher'; import { IKernelConnection, IKernelFinder } from '../../client/datascience/kernel-launcher/types'; -import { IJMPConnection, IJupyterKernelSpec } from '../../client/datascience/types'; +import { createRawKernel } from '../../client/datascience/raw-kernel/rawKernel'; +import { IJupyterKernelSpec } from '../../client/datascience/types'; import { PythonInterpreter } from '../../client/interpreter/contracts'; -import { PYTHON_PATH, sleep, waitForCondition } from '../common'; +import { sleep, waitForCondition } from '../common'; import { DataScienceIocContainer } from './dataScienceIocContainer'; import { MockKernelFinder } from './mockKernelFinder'; +import { requestExecute } from './raw-kernel/rawKernelTestHelpers'; suite('DataScience - Kernel Launcher', () => { let ioc: DataScienceIocContainer; @@ -34,10 +35,10 @@ suite('DataScience - Kernel Launcher', () => { const file = ioc.serviceContainer.get(IFileSystem); const processServiceFactory = ioc.serviceContainer.get(IProcessServiceFactory); kernelLauncher = new KernelLauncher(executionFactory, processServiceFactory, file); - + await ioc.activate(); pythonInterpreter = await ioc.getJupyterCapableInterpreter(); kernelSpec = { - argv: [PYTHON_PATH, '-m', 'ipykernel_launcher', '-f', '{connection_file}'], + argv: [pythonInterpreter!.path, '-m', 'ipykernel_launcher', '-f', '{connection_file}'], display_name: 'new kernel', language: 'python', name: 'newkernel', @@ -51,97 +52,33 @@ suite('DataScience - Kernel Launcher', () => { // tslint:disable-next-line: no-invalid-this this.skip(); } else { + let exitExpected = false; + const deferred = createDeferred(); const kernel = await kernelLauncher.launch(kernelSpec, undefined); - const exited = new Promise((resolve) => kernel.exited(() => resolve(true))); + kernel.exited(() => { + if (exitExpected) { + deferred.resolve(true); + } else { + deferred.reject(new Error('Kernel exited prematurely')); + } + }); assert.isOk(kernel.connection, 'Connection not found'); // It should not exit. await assert.isRejected( - waitForCondition(() => exited, 5_000, 'Timeout'), + waitForCondition(() => deferred.promise, 2_000, 'Timeout'), 'Timeout' ); // Upon disposing, we should get an exit event within 100ms or less. // If this happens, then we know a process existed. + exitExpected = true; await kernel.dispose(); - assert.isRejected( - waitForCondition(() => exited, 100, 'Timeout'), - 'Timeout' - ); + await deferred.promise; } }).timeout(10_000); - function createExecutionMessage(code: string, sessionId: string): KernelMessage.IExecuteRequestMsg { - return { - channel: 'shell', - content: { - code, - silent: false, - store_history: false - }, - header: { - date: Date.now().toString(), - msg_id: uuid(), - msg_type: 'execute_request', - session: sessionId, - username: 'user', - version: '5.1' - }, - parent_header: {}, - metadata: {} - }; - } - - function sendMessage( - enchannelConnection: IJMPConnection, - messageObservable: Observable, - message: KernelMessage.IMessage - ): Promise[]> { - const waiter = createDeferred[]>(); - const replies: KernelMessage.IMessage[] = []; - let expectedReplyType = 'status'; - switch (message.header.msg_type) { - case 'shutdown_request': - expectedReplyType = 'shutdown_reply'; - break; - - case 'execute_request': - expectedReplyType = 'execute_reply'; - break; - - case 'inspect_request': - expectedReplyType = 'inspect_reply'; - break; - default: - break; - } - let foundReply = false; - let foundIdle = false; - const subscr = messageObservable.subscribe((m) => { - replies.push(m); - if (m.header.msg_type === 'status') { - // tslint:disable-next-line: no-any - foundIdle = (m.content as any).execution_state === 'idle'; - } else if (m.header.msg_type === expectedReplyType) { - foundReply = true; - } - - if (m.header.msg_type === 'shutdown_reply') { - // Special case, status may never come after this. - waiter.resolve(replies); - } - if (!waiter.resolved && foundReply && foundIdle) { - waiter.resolve(replies); - } - }); - enchannelConnection.sendMessage(message); - return waiter.promise.then((m) => { - subscr.unsubscribe(); - return m; - }); - } - test('Launch with environment', async function () { if (!process.env.VSCODE_PYTHON_ROLLING || !pythonInterpreter) { // tslint:disable-next-line: no-invalid-this @@ -162,26 +99,12 @@ suite('DataScience - Kernel Launcher', () => { const kernel = await kernelLauncher.launch(spec, undefined); const exited = new Promise((resolve) => kernel.exited(() => resolve(true))); - // It should not exit. - await assert.isRejected( - waitForCondition(() => exited, 5_000, 'Timeout'), - 'Timeout' - ); - assert.isOk(kernel.connection, 'Connection not found'); // Send a request to print out the env vars - const sessionId = uuid(); - const enchannelConnection = ioc.get(IJMPConnection); - const messageObservable = new Observable((subscriber) => { - enchannelConnection.subscribe(subscriber.next.bind(subscriber)); - }); - await enchannelConnection.connect(kernel.connection); - const result = await sendMessage( - enchannelConnection, - messageObservable, - createExecutionMessage('import os\nprint(os.getenv("TEST_VAR"))', sessionId) - ); + const rawKernel = createRawKernel(kernel, uuid()); + + const result = await requestExecute(rawKernel, 'import os\nprint(os.getenv("TEST_VAR"))'); assert.ok(result, 'No result returned'); // Should have a stream output message const output = result.find((r) => r.header.msg_type === 'stream') as KernelMessage.IStreamMsg; diff --git a/src/test/datascience/raw-kernel/mockJMP.ts b/src/test/datascience/raw-kernel/mockJMP.ts deleted file mode 100644 index 949879c85375..000000000000 --- a/src/test/datascience/raw-kernel/mockJMP.ts +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. -import { KernelMessage } from '@jupyterlab/services'; -import { IJMPConnection, IJMPConnectionInfo } from '../../../client/datascience/types'; - -export class MockJMPConnection implements IJMPConnection { - public firstHeaderSeen: KernelMessage.IHeader | undefined; - public messagesSeen: KernelMessage.IMessage[] = []; - private callback: ((message: KernelMessage.IMessage) => void) | undefined; - - public async connect(_connectInfo: IJMPConnectionInfo): Promise { - return; - } - public sendMessage(message: KernelMessage.IMessage): void { - if (!this.firstHeaderSeen) { - this.firstHeaderSeen = message.header; - } - - this.messagesSeen.push(message); - - return; - } - public subscribe(handlerFunc: (message: KernelMessage.IMessage) => void): void { - this.callback = handlerFunc; - } - public dispose(): void { - return; - } - - // Send a kernel message back to the hander function - public messageBack(message: KernelMessage.IMessage) { - if (this.callback) { - this.callback(message); - } - } -} diff --git a/src/test/datascience/raw-kernel/rawKernel.functional.test.ts b/src/test/datascience/raw-kernel/rawKernel.functional.test.ts index df6b16b98b9b..38e7d2d722b1 100644 --- a/src/test/datascience/raw-kernel/rawKernel.functional.test.ts +++ b/src/test/datascience/raw-kernel/rawKernel.functional.test.ts @@ -1,29 +1,27 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. 'use strict'; -import { KernelMessage } from '@jupyterlab/services'; import { assert } from 'chai'; import * as fs from 'fs-extra'; import { noop } from 'jquery'; import * as os from 'os'; import * as path from 'path'; -import { Observable } from 'rxjs'; import * as uuid from 'uuid/v4'; import { IFileSystem } from '../../../client/common/platform/types'; import { IProcessServiceFactory, IPythonExecutionFactory } from '../../../client/common/process/types'; -import { createDeferred } from '../../../client/common/utils/async'; +import { createDeferred, sleep } from '../../../client/common/utils/async'; import { KernelProcess } from '../../../client/datascience/kernel-launcher/kernelProcess'; -import { IJMPConnection, IJupyterKernelSpec } from '../../../client/datascience/types'; +import { createRawKernel, RawKernel } from '../../../client/datascience/raw-kernel/rawKernel'; +import { IJupyterKernelSpec } from '../../../client/datascience/types'; import { IInterpreterService } from '../../../client/interpreter/contracts'; import { DataScienceIocContainer } from '../dataScienceIocContainer'; +import { requestExecute, requestInspect } from './rawKernelTestHelpers'; // tslint:disable:no-any no-multiline-string max-func-body-length no-console max-classes-per-file trailing-comma suite('DataScience raw kernel tests', () => { let ioc: DataScienceIocContainer; - let enchannelConnection: IJMPConnection; let connectionFile: string; - let messageObservable: Observable>; - let sessionId: string; + let rawKernel: RawKernel; const connectionInfo = { shell_port: 57718, iopub_port: 57719, @@ -46,7 +44,7 @@ suite('DataScience raw kernel tests', () => { // tslint:disable-next-line: no-invalid-this this.skip(); } else { - await connectToKernel(57718); + rawKernel = await connectToKernel(57718); } }); @@ -55,13 +53,12 @@ suite('DataScience raw kernel tests', () => { await ioc.dispose(); }); - async function connectToKernel(startPort: number) { + async function connectToKernel(startPort: number): Promise { connectionInfo.stdin_port = startPort; connectionInfo.shell_port = startPort + 1; connectionInfo.iopub_port = startPort + 2; connectionInfo.hb_port = startPort + 3; connectionInfo.control_port = startPort + 4; - enchannelConnection = ioc.get(IJMPConnection); // Find our jupyter interpreter const interpreter = await ioc @@ -94,14 +91,7 @@ suite('DataScience raw kernel tests', () => { undefined ); await kernelProcess.launch(); - - // Keep kernel alive while the tests are running. - kernelProcess.exited(() => enchannelConnection.dispose()); - sessionId = uuid(); - await enchannelConnection.connect(connectionInfo); - messageObservable = new Observable((subscriber) => { - enchannelConnection.subscribe(subscriber.next.bind(subscriber)); - }); + return createRawKernel(kernelProcess, uuid()); } async function disconnectFromKernel() { @@ -111,200 +101,72 @@ suite('DataScience raw kernel tests', () => { } catch { noop(); } - enchannelConnection.dispose(); - } - - function createShutdownMessage(): KernelMessage.IMessage<'shutdown_request'> { - return { - channel: 'control', - content: { - restart: false - }, - header: { - date: Date.now().toString(), - msg_id: uuid(), - msg_type: 'shutdown_request', - session: sessionId, - username: 'user', - version: '5.1' - }, - parent_header: {}, - metadata: {} - }; - } - - function createExecutionMessage(code: string): KernelMessage.IExecuteRequestMsg { - return { - channel: 'shell', - content: { - code, - silent: false, - store_history: false - }, - header: { - date: Date.now().toString(), - msg_id: uuid(), - msg_type: 'execute_request', - session: sessionId, - username: 'user', - version: '5.1' - }, - parent_header: {}, - metadata: {} - }; } - function createInspectMessage(code: string): KernelMessage.IInspectRequestMsg { - return { - channel: 'shell', - content: { - code, - cursor_pos: code.length, - detail_level: 1 - }, - header: { - date: Date.now().toString(), - msg_id: uuid(), - msg_type: 'inspect_request', - session: sessionId, - username: 'user', - version: '5.1' - }, - parent_header: {}, - metadata: {} - }; - } - - function sendMessage( - message: KernelMessage.IMessage - ): Promise[]> { - const waiter = createDeferred[]>(); - const replies: KernelMessage.IMessage[] = []; - let expectedReplyType = 'status'; - switch (message.header.msg_type) { - case 'shutdown_request': - expectedReplyType = 'shutdown_reply'; - break; - - case 'execute_request': - expectedReplyType = 'execute_reply'; - break; - - case 'inspect_request': - expectedReplyType = 'inspect_reply'; - break; - default: - break; - } - let foundReply = false; - let foundIdle = false; - const subscr = messageObservable.subscribe((m) => { - if ((m.parent_header as any).msg_id !== message.header.msg_id) { - return; - } - replies.push(m); - if (m.header.msg_type === 'status') { - foundIdle = (m.content as any).execution_state === 'idle'; - } else if (m.header.msg_type === expectedReplyType) { - foundReply = true; - } - - if (m.header.msg_type === 'shutdown_reply') { - // Special case, status may never come after this. - waiter.resolve(replies); - } - if (!waiter.resolved && foundReply && foundIdle) { - waiter.resolve(replies); - } - }); - enchannelConnection.sendMessage(message); - return waiter.promise.then((m) => { - subscr.unsubscribe(); - return m; - }); + async function shutdown(): Promise { + return rawKernel.shutdown(); } test('Basic connection', async () => { - const replies = await sendMessage(createShutdownMessage()); - assert.ok( - replies.find((r) => r.header.msg_type === 'shutdown_reply'), - 'Reply not sent for shutdown' - ); + let exited = false; + kernelProcess.exited(() => (exited = true)); + await shutdown(); + await sleep(500); // Give time for the shutdown to go across + assert.ok(exited, 'Kernel did not shutdown'); }); test('Basic request', async () => { - const replies = await sendMessage(createExecutionMessage('a=1\na')); + const replies = await requestExecute(rawKernel, 'a=1\na'); const executeResult = replies.find((r) => r.header.msg_type === 'execute_result'); assert.ok(executeResult, 'Result not found'); assert.equal((executeResult?.content as any).data['text/plain'], '1', 'Results were not computed'); }); test('Interrupt pending request', async () => { - const executionStarted = createDeferred(); - const kernelInterrupted = createDeferred(); + const executionStarted = createDeferred(); // If the interrupt doesn't work, then test will timeout as execution will sleep for `300s`. // Hence timeout is a test failure. - const longCellExecutionRequest = createExecutionMessage('import time\nfor i in range(300):\n time.sleep(1)'); - - const subscription = messageObservable.subscribe((m) => { - if ((m.parent_header as any).msg_id !== longCellExecutionRequest.header.msg_id) { - return; - } - switch (m.header.msg_type) { - case 'status': - if ((m as KernelMessage.IStatusMsg).content.execution_state === 'busy') { - executionStarted.resolve(); - } - break; - case 'execute_reply': { - // When interrupting a kernel we MUST get the `KeyboardInterrupt` error sent as output. - if ((m as KernelMessage.IErrorMsg).content.ename === 'KeyboardInterrupt') { - kernelInterrupted.resolve(); - } - break; - } - default: - } - }); - - // Execute a cell that will take a long time. - sendMessage(longCellExecutionRequest).catch(noop); + const longCellExecutionRequest = requestExecute( + rawKernel, + 'import time\nfor i in range(300):\n time.sleep(1)', + executionStarted + ); // Wait until the execution has started (cuz we cannot interrupt until exec has started). await executionStarted.promise; - await kernelProcess.interrupt(); + // Then throw the interrupt + await rawKernel.interrupt(); - // Upon successful interruptoin, the exception should be returned. - await kernelInterrupted.promise; - - subscription.unsubscribe(); + // Verify our results + const replies = await longCellExecutionRequest; + const executeResult = replies.find((r) => r.header.msg_type === 'execute_reply'); + assert.ok(executeResult, 'Result not found'); + assert.equal((executeResult?.content as any).ename, 'KeyboardInterrupt', 'Interrupt not found'); // Based on tests 2s is sufficient. Lets give 10s for CI and slow Windows machines. }).timeout(10_000); test('Multiple requests', async () => { - let replies = await sendMessage(createExecutionMessage('a=1\na')); + let replies = await requestExecute(rawKernel, 'a=1\na'); let executeResult = replies.find((r) => r.header.msg_type === 'execute_result'); assert.ok(executeResult, 'Result not found'); - replies = await sendMessage(createExecutionMessage('a=2\na')); + replies = await requestExecute(rawKernel, 'a=2\na'); executeResult = replies.find((r) => r.header.msg_type === 'execute_result'); assert.ok(executeResult, 'Result 2 not found'); assert.equal((executeResult?.content as any).data['text/plain'], '2', 'Results were not computed'); - replies = await sendMessage(createInspectMessage('a')); - const inspectResult = replies.find((r) => r.header.msg_type === 'inspect_reply'); - assert.ok(inspectResult, 'Inspect result not found'); - assert.ok((inspectResult?.content as any).data['text/plain'], 'Inspect reply was not computed'); + const json = await requestInspect(rawKernel, 'a'); + assert.ok(json, 'Inspect reply was not computed'); }); test('Startup and shutdown', async () => { - let replies = await sendMessage(createExecutionMessage('a=1\na')); + let replies = await requestExecute(rawKernel, 'a=1\na'); let executeResult = replies.find((r) => r.header.msg_type === 'execute_result'); assert.ok(executeResult, 'Result not found'); - await disconnectFromKernel(); - await connectToKernel(57418); - replies = await sendMessage(createExecutionMessage('a=1\na')); + await shutdown(); + rawKernel = await connectToKernel(57418); + replies = await requestExecute(rawKernel, 'a=1\na'); executeResult = replies.find((r) => r.header.msg_type === 'execute_result'); assert.ok(executeResult, 'Result not found'); }); diff --git a/src/test/datascience/raw-kernel/rawKernelTestHelpers.ts b/src/test/datascience/raw-kernel/rawKernelTestHelpers.ts new file mode 100644 index 000000000000..43d806423f6f --- /dev/null +++ b/src/test/datascience/raw-kernel/rawKernelTestHelpers.ts @@ -0,0 +1,73 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +'use strict'; +import { KernelMessage } from '@jupyterlab/services'; +import { JSONObject } from '@phosphor/coreutils'; +import { createDeferred, Deferred } from '../../../client/common/utils/async'; +import { RawKernel } from '../../../client/datascience/raw-kernel/rawKernel'; + +// tslint:disable: no-any +export async function requestExecute( + rawKernel: RawKernel, + code: string, + started?: Deferred +): Promise { + const waiter = createDeferred[]>(); + const requestContent = { + code, + silent: false, + store_history: false + }; + + const replies: KernelMessage.IMessage[] = []; + let foundReply = false; + let foundIdle = false; + const ioPubHandler = (m: KernelMessage.IIOPubMessage) => { + replies.push(m); + if (m.header.msg_type === 'status') { + foundIdle = (m.content as any).execution_state === 'idle'; + if (started && (m.content as any).execution_state === 'busy') { + started.resolve(); + } + } + if (!waiter.resolved && foundReply && foundIdle) { + waiter.resolve(replies); + } + }; + const shellHandler = (m: KernelMessage.IExecuteReplyMsg | KernelMessage.IExecuteRequestMsg) => { + replies.push(m); + if (m.header.msg_type === 'execute_reply') { + foundReply = true; + } + if (!waiter.resolved && foundReply && foundIdle) { + waiter.resolve(replies); + } + }; + const future = rawKernel.requestExecute(requestContent); + future.onIOPub = ioPubHandler; + future.onReply = shellHandler; + rawKernel.requestExecute(requestContent, true); + return waiter.promise.then((m) => { + return m; + }); +} + +export async function requestInspect(rawKernel: RawKernel, code: string): Promise { + // Create a deferred that will fire when the request completes + const deferred = createDeferred(); + + rawKernel + .requestInspect({ code, cursor_pos: 0, detail_level: 0 }) + .then((r) => { + if (r && r.content.status === 'ok') { + deferred.resolve(r.content.data); + } else { + deferred.resolve(undefined); + } + }) + .catch((ex) => { + deferred.reject(ex); + }); + + return deferred.promise; +}