|
| 1 | +// Copyright (c) Microsoft Corporation. All rights reserved. |
| 2 | +// Licensed under the MIT License. |
| 3 | +import { Kernel, KernelMessage } from '@jupyterlab/services'; |
| 4 | +import { createDeferred, Deferred } from '../../common/utils/async'; |
| 5 | +import { noop } from '../../common/utils/misc'; |
| 6 | + |
| 7 | +/* |
| 8 | +RawFuture represents the IFuture interface that JupyterLab services returns from functions like executeRequest. |
| 9 | +It provides an interface for getting updates on the status of the request such as reply messages or io messages |
| 10 | +*/ |
| 11 | +export class RawFuture< |
| 12 | + REQUEST extends KernelMessage.IShellControlMessage, |
| 13 | + REPLY extends KernelMessage.IShellControlMessage |
| 14 | +> implements Kernel.IFuture<REQUEST, REPLY> { |
| 15 | + public isDisposed: boolean = false; |
| 16 | + public msg: REQUEST; |
| 17 | + |
| 18 | + private donePromise: Deferred<REPLY>; |
| 19 | + private stdIn: (msg: KernelMessage.IStdinMessage) => void | PromiseLike<void> = noop; |
| 20 | + private ioPub: (msg: KernelMessage.IIOPubMessage) => void | PromiseLike<void> = noop; |
| 21 | + private reply: (msg: REPLY) => void | PromiseLike<void> = noop; |
| 22 | + private replyMessage: REPLY | undefined; |
| 23 | + private disposeOnDone: boolean; |
| 24 | + private idleSeen: boolean; |
| 25 | + |
| 26 | + constructor(msg: REQUEST, disposeOnDone: boolean) { |
| 27 | + this.msg = msg; |
| 28 | + this.donePromise = createDeferred<REPLY>(); |
| 29 | + this.disposeOnDone = disposeOnDone; |
| 30 | + this.idleSeen = false; |
| 31 | + } |
| 32 | + |
| 33 | + get done(): Promise<REPLY | undefined> { |
| 34 | + return this.donePromise.promise; |
| 35 | + } |
| 36 | + |
| 37 | + // Message handlers that can be hooked up to for message notifications |
| 38 | + get onStdin(): (msg: KernelMessage.IStdinMessage) => void | PromiseLike<void> { |
| 39 | + return this.stdIn; |
| 40 | + } |
| 41 | + |
| 42 | + set onStdin(handler: (msg: KernelMessage.IStdinMessage) => void | PromiseLike<void>) { |
| 43 | + this.stdIn = handler; |
| 44 | + } |
| 45 | + |
| 46 | + get onIOPub(): (msg: KernelMessage.IIOPubMessage) => void | PromiseLike<void> { |
| 47 | + return this.ioPub; |
| 48 | + } |
| 49 | + |
| 50 | + set onIOPub(cb: (msg: KernelMessage.IIOPubMessage) => void | PromiseLike<void>) { |
| 51 | + this.ioPub = cb; |
| 52 | + } |
| 53 | + get onReply(): (msg: REPLY) => void | PromiseLike<void> { |
| 54 | + return this.reply; |
| 55 | + } |
| 56 | + |
| 57 | + set onReply(handler: (msg: REPLY) => void | PromiseLike<void>) { |
| 58 | + this.reply = handler; |
| 59 | + } |
| 60 | + |
| 61 | + // Handle a new message passed from the kernel |
| 62 | + public async handleMessage(message: KernelMessage.IMessage<KernelMessage.MessageType>): Promise<void> { |
| 63 | + switch (message.channel) { |
| 64 | + case 'stdin': |
| 65 | + await this.handleStdIn(message as KernelMessage.IStdinMessage); |
| 66 | + break; |
| 67 | + case 'iopub': |
| 68 | + await this.handleIOPub(message as KernelMessage.IIOPubMessage); |
| 69 | + break; |
| 70 | + case 'control': |
| 71 | + case 'shell': |
| 72 | + await this.handleShellControl(message as KernelMessage.IShellControlMessage); |
| 73 | + break; |
| 74 | + default: |
| 75 | + break; |
| 76 | + } |
| 77 | + } |
| 78 | + |
| 79 | + public dispose(): void { |
| 80 | + if (!this.isDisposed) { |
| 81 | + // First clear out our handlers |
| 82 | + this.stdIn = noop; |
| 83 | + this.ioPub = noop; |
| 84 | + this.reply = noop; |
| 85 | + |
| 86 | + // Reject our done promise |
| 87 | + this.donePromise.reject(new Error('Disposed Future')); |
| 88 | + this.isDisposed = true; |
| 89 | + } |
| 90 | + } |
| 91 | + |
| 92 | + // RAWKERNEL: Not Implemented |
| 93 | + public registerMessageHook(_hook: (msg: KernelMessage.IIOPubMessage) => boolean | PromiseLike<boolean>): void { |
| 94 | + throw new Error('Not yet implemented'); |
| 95 | + } |
| 96 | + public removeMessageHook(_hook: (msg: KernelMessage.IIOPubMessage) => boolean | PromiseLike<boolean>): void { |
| 97 | + throw new Error('Not yet implemented'); |
| 98 | + } |
| 99 | + public sendInputReply(_content: KernelMessage.IInputReplyMsg['content']): void { |
| 100 | + throw new Error('Not yet implemented'); |
| 101 | + } |
| 102 | + |
| 103 | + // Private Functions |
| 104 | + |
| 105 | + // Functions for handling specific message types |
| 106 | + private async handleStdIn(message: KernelMessage.IStdinMessage): Promise<void> { |
| 107 | + // Call our handler for stdin, might just be noop |
| 108 | + // RAWKERNEL: same channel type string != 'stdin' cast issue |
| 109 | + // tslint:disable-next-line:no-any |
| 110 | + await this.stdIn(message); |
| 111 | + } |
| 112 | + |
| 113 | + private async handleIOPub(message: KernelMessage.IIOPubMessage): Promise<void> { |
| 114 | + // RAWKERNEL: Check hooks process first? |
| 115 | + // tslint:disable-next-line:no-any |
| 116 | + await this.ioPub(message); |
| 117 | + |
| 118 | + // If we get an idle status message and a reply then we are done |
| 119 | + if (KernelMessage.isStatusMsg(message) && message.content.execution_state === 'idle') { |
| 120 | + this.idleSeen = true; |
| 121 | + |
| 122 | + if (this.replyMessage) { |
| 123 | + this.handleDone(); |
| 124 | + } |
| 125 | + } |
| 126 | + } |
| 127 | + |
| 128 | + private async handleShellControl(message: KernelMessage.IShellControlMessage): Promise<void> { |
| 129 | + if (message.channel === this.msg.channel && message.parent_header) { |
| 130 | + const parentHeader = message.parent_header as KernelMessage.IHeader; |
| 131 | + if (parentHeader.msg_id === this.msg.header.msg_id) { |
| 132 | + await this.handleReply(message as REPLY); |
| 133 | + } |
| 134 | + } |
| 135 | + } |
| 136 | + |
| 137 | + private async handleReply(message: REPLY): Promise<void> { |
| 138 | + await this.reply(message); |
| 139 | + |
| 140 | + this.replyMessage = message; |
| 141 | + |
| 142 | + // If we've gotten an idle status message we are done now |
| 143 | + if (this.idleSeen) { |
| 144 | + this.handleDone(); |
| 145 | + } |
| 146 | + } |
| 147 | + |
| 148 | + private handleDone(): void { |
| 149 | + this.donePromise.resolve(this.replyMessage); |
| 150 | + |
| 151 | + if (this.disposeOnDone) { |
| 152 | + this.dispose(); |
| 153 | + } |
| 154 | + } |
| 155 | +} |
0 commit comments