Skip to content

Commit 10fa3cf

Browse files
committed
parsing shell and iopub messages
1 parent 06ec418 commit 10fa3cf

File tree

10 files changed

+520
-109
lines changed

10 files changed

+520
-109
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,7 @@
798798
"named-js-regexp": "^1.3.1",
799799
"node-static": "^0.7.9",
800800
"prepend-file": "^1.3.0",
801+
"rx": "^4.1.0",
801802
"socket.io": "^1.4.8",
802803
"tmp": "0.0.29",
803804
"transformime": "^3.1.2",
@@ -815,6 +816,7 @@
815816
"@types/jquery": "^1.10.31",
816817
"@types/mocha": "^2.2.32",
817818
"@types/node": "^6.0.40",
819+
"@types/rx": "^2.5.33",
818820
"@types/socket.io": "^1.4.27",
819821
"@types/socket.io-client": "^1.4.27",
820822
"@types/uuid": "^3.3.27",

pythonFiles/PythonTools/ipythonServer.py

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777

7878
# End of the great "support IPython 2, 3, 4" strat
7979

80+
8081
def _debug_write(out):
8182
if DEBUG:
8283
sys.__stdout__.write(out)
@@ -140,7 +141,8 @@ def __init__(self, kernelUUID, socketConnection, send_lock, shell_channel, iopub
140141
self.shell_channel = shell_channel
141142
self.iopub_channel = iopub_channel
142143
self.send_lock = send_lock
143-
thread.start_new_thread(self.start_processing, ())
144+
thread.start_new_thread(self.start_processing, ('shell', 1))
145+
thread.start_new_thread(self.start_processing, ('io', 1))
144146

145147
def stop(self):
146148
self.is_stop_requested = True
@@ -156,7 +158,7 @@ def _populateErrorContents(self, sourceContents, targetContents):
156158
except AttributeError:
157159
pass
158160

159-
def start_processing(self):
161+
def start_processing(self, channel, *args):
160162
"""loop to read the io ports/messages"""
161163

162164
_debug_write('Started processing thread')
@@ -165,30 +167,32 @@ def start_processing(self):
165167
if self.check_for_exit_socket_loop():
166168
break
167169

168-
try:
169-
# We can ignore msgtype=execute_request
170-
171-
exe_result = self.shell_channel.get_shell_msg(timeout=1)
172-
# message can be JSON, but not always
173-
# (http://jupyter-client.readthedocs.io/en/latest/messaging.html)
174-
# assume for now that dates are the only crappy (non JSONable stuff sent)
175-
json_to_send = json.dumps(exe_result, default=str)
176-
with self.send_lock:
177-
_debug_write('shell_result')
178-
write_bytes(self.conn, iPythonSocketServer._SHEL)
179-
write_string(self.conn, json_to_send)
180-
except Empty:
181-
pass
182-
183-
try:
184-
msg = self.iopub_channel.get_iopub_msg(timeout=10)
185-
json_to_send = json.dumps(msg, default=str)
186-
with self.send_lock:
187-
_debug_write('iopub_msg')
188-
write_bytes(self.conn, iPythonSocketServer._IOPB)
189-
write_string(self.conn, json_to_send)
190-
except Empty:
191-
pass
170+
# message can be JSON, but not always
171+
# (http://jupyter-client.readthedocs.io/en/latest/messaging.html)
172+
# assume for now that dates are the only crappy (non JSONable
173+
# stuff sent)
174+
if channel == 'shell':
175+
try:
176+
exe_result = self.shell_channel.get_shell_msg(
177+
timeout=30)
178+
json_to_send = json.dumps(exe_result, default=str)
179+
with self.send_lock:
180+
_debug_write('shell_result')
181+
write_bytes(self.conn, iPythonSocketServer._SHEL)
182+
write_string(self.conn, json_to_send)
183+
except Empty:
184+
pass
185+
186+
if channel == 'io':
187+
try:
188+
msg = self.iopub_channel.get_iopub_msg(timeout=30)
189+
json_to_send = json.dumps(msg, default=str)
190+
with self.send_lock:
191+
_debug_write('iopub_msg')
192+
write_bytes(self.conn, iPythonSocketServer._IOPB)
193+
write_string(self.conn, json_to_send)
194+
except Empty:
195+
pass
192196

193197
except IPythonExitException:
194198
_debug_write('IPythonExitException')
@@ -294,7 +298,8 @@ def start_processing(self):
294298
except:
295299
commandName = utf_8.decode(inp)[0]
296300
try:
297-
commandName = ascii.Codec.encode(commandName)[0]
301+
commandName = ascii.Codec.encode(commandName)[
302+
0]
298303
except UnicodeEncodeError:
299304
pass
300305
self.replyWithError(commandName, id)
@@ -406,13 +411,15 @@ def _postStartKernel(self, kernelUUID):
406411
kernel_client.wait_for_ready()
407412
iopub = kernel_client
408413
shell = kernel_client
414+
# todo: get_stdin_msg
409415
except AttributeError:
410416
# Ipython 2.x
411417
# Based on https://github.com/paulgb/runipy/pull/49/files
412418
iopub = kernel_client.iopub_channel
413419
shell = kernel_client.shell_channel
414420
shell.get_shell_msg = shell.get_msg
415421
iopub.get_iopub_msg = iopub.get_msg
422+
# todo: get_stdin_msg
416423

417424
self.shell_channel = shell
418425
self.kernelMonitor = iPythonKernelResponseMonitor(

pythonFiles/PythonTools/jupyterTest.py

Lines changed: 46 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -149,69 +149,73 @@ def _execute_cell(code, shell, iopub, timeout=300):
149149

150150
# Execute input
151151
#shell.execute("10+20")
152-
shell.execute("import time\ntime.sleep(10)\nprint(112341234)")
153-
shell.execute("1+2")
152+
# msg_id = shell.execute("print(1)\nimport time\ntime.sleep(10)\nprint(11)")
153+
# print(msg_id)
154+
# msg_id = shell.execute("1+2")
155+
# print(msg_id)
156+
# msg_id = shell.execute("print(2)\nimport time\ntime.sleep(10)\nprint(22)")
157+
# print(msg_id)
158+
159+
msg_id = shell.execute("print(2)\nimport time\ntime.sleep(10)\nprint(x)")
160+
print(msg_id)
154161

155162
cell_outputs = list()
156163

157164
# Poll for iopub messages until no more messages are available
158165
while True:
159166
try:
160-
exe_result = shell.get_shell_msg(timeout=timeout)
167+
exe_result = shell.get_shell_msg(timeout=0.5)
161168
print('exe_result')
162169
print(exe_result)
163170
print('')
164-
if exe_result['content']['status'] == 'error':
165-
print('-----------------------crap---------------------')
166-
raise RuntimeError('Failed to execute cell due to error: {!r}'.format(
167-
str(exe_result['content']['evalue'])))
171+
# if exe_result['content']['status'] == 'error':
172+
# print('-----------------------crap---------------------')
173+
# raise RuntimeError('Failed to execute cell due to error: {!r}'.format(
174+
# str(exe_result['content']['evalue'])))
168175
except Empty:
169-
print('quue empty, try again')
170176
pass
171177

172-
print('\n\n-------------------------------------------------\ntrying\n----------------------------------\n')
173178
try:
174179
msg = iopub.get_iopub_msg(timeout=0.5)
175180
print('get_iopub_msg')
176181
print('msg')
177182
print(msg)
178183
print('')
179184
except Empty:
180-
print('get_iopub_msg is empty--------------------------------------')
181185
pass
182186

183-
msg_type = msg['msg_type']
184-
if msg_type in ('status', 'pyin', 'execute_input', 'execute_result'):
185-
continue
186-
187-
content = msg['content']
188-
node = NotebookNode(output_type=msg_type)
189-
190-
if msg_type == 'stream':
191-
node.stream = content['name']
192-
if 'text' in content:
193-
# v4 notebook format
194-
node.text = content['text']
195-
else:
196-
# v3 notebook format
197-
node.text = content['data']
198-
elif msg_type in ('display_data', 'pyout'):
199-
node['metadata'] = content['metadata']
200-
for mime, data in content['data'].items():
201-
attr = mime.split('/')[-1].lower()
202-
attr = attr.replace('+xml', '').replace('plain', 'text')
203-
setattr(node, attr, data)
204-
if msg_type == 'pyout':
205-
node.prompt_number = content['execution_count']
206-
elif msg_type == 'pyerr':
207-
node.ename = content['ename']
208-
node.evalue = content['evalue']
209-
node.traceback = content['traceback']
210-
else:
211-
raise RuntimeError('Unhandled iopub message of type: {}'.format(
212-
msg_type))
213-
214-
cell_outputs.append(node)
187+
# msg_type = msg['msg_type']
188+
# if msg_type in ('status', 'pyin', 'execute_input', 'execute_result'):
189+
# continue
190+
191+
# content = msg['content']
192+
# node = NotebookNode(output_type=msg_type)
193+
194+
# if msg_type == 'stream':
195+
# node.stream = content['name']
196+
# if 'text' in content:
197+
# # v4 notebook format
198+
# node.text = content['text']
199+
# else:
200+
# # v3 notebook format
201+
# node.text = content['data']
202+
# elif msg_type in ('display_data', 'pyout'):
203+
# node['metadata'] = content['metadata']
204+
# for mime, data in content['data'].items():
205+
# attr = mime.split('/')[-1].lower()
206+
# attr = attr.replace('+xml', '').replace('plain', 'text')
207+
# setattr(node, attr, data)
208+
# if msg_type == 'pyout':
209+
# node.prompt_number = content['execution_count']
210+
# elif msg_type == 'pyerr':
211+
# node.ename = content['ename']
212+
# node.evalue = content['evalue']
213+
# node.traceback = content['traceback']
214+
# else:
215+
# raise RuntimeError('Unhandled iopub message of type: {}'.format(
216+
# msg_type))
217+
218+
# cell_outputs.append(node)
215219

216220
return cell_outputs
217221

src/client/jupyter/contracts.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ export interface KernelEvents {
2222
onStatusChange: vscode.Event<[KernelspecMetadata, string]>;
2323
}
2424

25+
export interface ParsedIOMessage {
26+
data: { [key: string]: any } | string;
27+
type: string;
28+
stream: string;
29+
message?: string;
30+
}
31+
2532
// export interface JupyterMessage extends Object {
2633
// parent_header: any;
2734
// content: any;
@@ -37,7 +44,8 @@ export interface JupyterMessage {
3744
msg_id: string;
3845
msg_type: 'execute_request' | 'execute_reply';
3946
parent_header: {
40-
msg_id: string
47+
msg_id: string,
48+
msg_type: string
4149
};
4250
content: {
4351
// status = 'busy', sent back as soon as a request is received by the kernel (kind of an ACK)

src/client/jupyter/jupyter_client-Kernel.ts

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,16 @@
1-
import * as child_process from 'child_process';
2-
import * as path from 'path';
3-
import * as fs from 'fs';
41
import { Kernel } from './kernel';
5-
import * as vscode from 'vscode';
62
import { KernelspecMetadata, JupyterMessage } from './contracts';
7-
import { iPythonAdapter } from './jupyter_client/ipythonAdapter';
8-
import { SocketServer } from '../common/comms/socketServer';
9-
import { createDeferred } from '../common/helpers';
10-
import * as settings from '../common/configSettings';
113
import { IJupyterClient } from './jupyter_client/contracts';
12-
13-
const pythonSettings = settings.PythonSettings.getInstance();
4+
import { EventEmitter } from 'events';
145

156
export class JupyterClientKernel extends Kernel {
16-
private process: child_process.ChildProcess;
17-
private socketServer: SocketServer;
18-
private ipythonAdapter: iPythonAdapter;
19-
207
constructor(kernelSpec: KernelspecMetadata, language: string, private connection: any, private connectionFile: string, private kernelUUID: string, private jupyterClient: IJupyterClient) {
218
super(kernelSpec, language);
9+
((this.jupyterClient as any) as EventEmitter).on('status', status => {
10+
this.raiseOnStatusChange(status);
11+
});
2212
}
2313

24-
private shutdownPromise: Promise<any>;
2514
public dispose() {
2615
this.shutdown();
2716
super.dispose();
@@ -39,11 +28,9 @@ export class JupyterClientKernel extends Kernel {
3928
};
4029

4130
public execute(code: string, onResults: Function) {
42-
this.jupyterClient.runCode(code).then(() => {
43-
const y = '';
44-
}).catch(reason => {
45-
const x = '';
46-
})
31+
this.jupyterClient.runCodeEx(code, (data) => {
32+
onResults(data);
33+
});
4734
};
4835

4936
public executeWatch(code: string, onResults: Function) {

src/client/jupyter/jupyter_client/contracts.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
import { KernelspecMetadata, Kernelspec } from '../contracts';
1+
import { KernelspecMetadata, Kernelspec, ParsedIOMessage } from '../contracts';
22

33
export interface IJupyterClient {
44
getAllKernelSpecs(): Promise<{ [key: string]: Kernelspec }>;
55
startKernel(kernelSpec: KernelspecMetadata): Promise<[string, any, string]>;
66
shutdownkernel(kernelUUID: string): Promise<any>;
77
interruptKernel(kernelUUID: string): Promise<any>;
88
restartKernel(kernelUUID: string): Promise<any>;
9-
runCode(code: string): Promise<any>;
9+
runCode(code: string): Promise<string>;
10+
runCodeEx(code: string, onResults: Function): Promise<any>;
1011
}
1112

1213
export enum KernelCommand {

0 commit comments

Comments
 (0)