-
Notifications
You must be signed in to change notification settings - Fork 30
Expand file tree
/
Copy pathkernel.py
More file actions
165 lines (139 loc) · 5.02 KB
/
kernel.py
File metadata and controls
165 lines (139 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# -*- coding: utf-8 -*-
"""Basic functionality of CircuitPython kernel."""
import ast
import re
import time
import logging
from ipykernel.kernelbase import Kernel
from .board import connect
from .version import __version__
# Module-level logger, named after this module, used by the kernel for its
# debug messages (board connection, raw output/error text, shutdown).
KERNEL_LOGGER = logging.getLogger(__name__)
class CircuitPyKernel(Kernel):
    """CircuitPython kernel implementation.

    A wrapper kernel that sends each cell's source code to a board over the
    connection object returned by ``connect()`` and relays the board's
    output and error text back to the frontend.
    """

    protocol_version = '4.5.2'
    implementation = 'circuitpython_kernel'
    implementation_version = __version__
    language_info = {
        'name': 'python',
        'version': '3',
        'mimetype': 'text/x-python',
        'file_extension': '.py',
        'pygments_lexer': 'python3',
        'codemirror_mode': {'name': 'python', 'version': 3},
    }
    banner = "CircuitPython"
    help_links = [
        {
            'text': 'CircuitPython kernel',
            'url': 'https://github.com/adafruit/circuitpython_kernel',
        }
    ]

    def __init__(self, **kwargs):
        """Initialise the base kernel and open the board connection.

        Parameters
        ----------
        **kwargs
            Forwarded unchanged to ``Kernel.__init__``.
        """
        super().__init__(**kwargs)
        KERNEL_LOGGER.debug('Opening CircuitPython board connection..')
        # Connection object used by every other method (write / read_all /
        # close are the only operations this class calls on it).
        self.serial = connect()

    def run_code(self, code):
        """Run a code snippet on the board and collect its output.

        The code is written to the connection followed by a Control-D byte.
        The response is then polled until it ends with ``b'\\x04>'``. A
        well-formed response has the layout
        ``b'OK' + <output> + b'\\x04' + <error> + b'\\x04>'``.

        Parameters
        ----------
        code : str
            Code to be executed.

        Returns
        -------
        out : str
            Decoded output text from the code run.
        err : str
            Decoded error text from the code run.

        Raises
        ------
        AssertionError
            If the response does not start with ``b'OK'``.
        """
        # Send code to board
        self.serial.write(code.encode('utf-8') + b'\x04')  # code and Control-D
        # Set up a bytearray to hold the result from the code run
        result = bytearray()
        # NOTE(review): there is no timeout here; the loop polls every 0.1 s
        # until the terminator arrives, however long the code runs.
        while not result.endswith(b'\x04>'):  # Control-D followed by prompt
            time.sleep(0.1)
            result.extend(self.serial.read_all())
        # Raised explicitly rather than via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is kept so any
        # existing handler still works.
        if not result.startswith(b'OK'):
            raise AssertionError(
                'Unexpected response from board: {!r}'.format(bytes(result))
            )
        # Drop the leading b'OK' and the trailing b'\x04>', then split the
        # remainder into output and error at the first Control-D.
        out, err = result[2:-2].split(b'\x04', 1)
        return out.decode('utf-8', 'replace'), err.decode('utf-8', 'replace')

    def do_execute(self, code, silent, store_history=True,
                   user_expressions=None, allow_stdin=False):
        """Execute a user's code cell.

        Parameters
        ----------
        code : str
            Code, one or more lines, to be executed.
        silent : bool
            True, signals kernel to execute code quietly, and output is not
            displayed.
        store_history : bool
            Whether to record code in history and increase execution count.
            If silent is True, this is implicitly false. Not used by this
            implementation.
        user_expressions : dict, optional
            Mapping of names to expressions to evaluate after code is run.
            Not used by this implementation.
        allow_stdin : bool
            Whether the frontend can provide input on request. Not used by
            this implementation.

        Returns
        -------
        dict
            Execution results. The status is always ``'ok'``; error text
            from the board is only forwarded on the ``stderr`` stream.
        """
        out, err = self.run_code(code)
        KERNEL_LOGGER.debug('Output: %s', out)
        KERNEL_LOGGER.debug("Error %s", err)
        if not silent:
            out_content = {'name': 'stdout', 'text': out}
            err_content = {'name': 'stderr', 'text': err}
            # Only publish a stream message when there is text to show.
            if out:
                self.send_response(self.iopub_socket, 'stream', out_content)
            if err:
                self.send_response(self.iopub_socket, 'stream', err_content)
        return {
            'status': 'ok',
            'execution_count': self.execution_count,
            'payload': [],
            'user_expressions': {},
        }

    def _eval(self, expr):
        """Evaluate an expression on the board and return it as a literal.

        The expression is wrapped in ``print(...)`` and run on the board;
        the printed text is parsed locally with ``ast.literal_eval`` so that
        nothing coming back from the board is ever executed here.

        Parameters
        ----------
        expr : str
            Expression source to evaluate on the board.

        Returns
        -------
        object
            The Python literal parsed from the board's output.

        Raises
        ------
        ValueError, SyntaxError
            If the output is not a valid Python literal (for example when
            the expression failed on the board and nothing was printed).
        """
        out, err = self.run_code('print({})'.format(expr))
        KERNEL_LOGGER.debug('Output: %s', out)
        KERNEL_LOGGER.debug('Error %s', err)
        return ast.literal_eval(out)

    def do_shutdown(self, restart):
        """Handle the kernel shutting down.

        Parameters
        ----------
        restart : bool
            Whether the shutdown is part of a restart; echoed in the reply.

        Returns
        -------
        dict
            Content for the shutdown reply, in the same shape the base
            ``Kernel.do_shutdown`` returns.
        """
        KERNEL_LOGGER.debug('Shutting down CircuitPython Board Connection..')
        # NOTE(review): presumably carriage return + Control-B to leave the
        # mode set up by connect() -- confirm against the board module.
        self.serial.write(b'\r\x02')
        KERNEL_LOGGER.debug('closing serial connection..')
        self.serial.close()
        return {'status': 'ok', 'restart': restart}

    def do_complete(self, code, cursor_pos):
        """Support code completion.

        The dotted name ending at the cursor is split into an object part
        and a partial attribute name. Candidates come from ``dir(obj)`` (or
        a bare ``dir()`` when there is no dot) evaluated on the board, and
        are filtered by the partial name.

        Parameters
        ----------
        code : str
            Cell source being edited.
        cursor_pos : int
            Cursor offset within ``code``.

        Returns
        -------
        dict
            Completion reply with ``matches``, ``cursor_start``,
            ``cursor_end``, ``metadata`` and ``status`` keys. ``matches``
            is empty when no candidates could be obtained.
        """
        no_matches = {
            'matches': [],
            'cursor_start': cursor_pos,
            'cursor_end': cursor_pos,
            'metadata': {},
            'status': 'ok',
        }
        code = code[:cursor_pos]
        match = re.search(r'(\w+\.)*(\w+)?$', code)
        if not match:
            return no_matches

        prefix = match.group()
        if '.' in prefix:
            # Split on the LAST dot only, so chains such as ``a.b.c`` yield
            # obj='a.b' and prefix='c' instead of failing to unpack.
            obj, prefix = prefix.rsplit('.', 1)
            expr = 'dir({})'.format(obj)
        else:
            expr = 'dir()'

        try:
            names = self._eval(expr)
        except (ValueError, TypeError, SyntaxError):
            # The board printed nothing parseable (e.g. the object does not
            # exist there); offer no completions instead of failing the
            # whole completion request.
            KERNEL_LOGGER.debug('Completion lookup failed for %s', expr)
            return no_matches

        matches = [n for n in names if n.startswith(prefix)]
        return {
            'matches': matches,
            'cursor_start': cursor_pos - len(prefix),
            'cursor_end': cursor_pos,
            'metadata': {},
            'status': 'ok',
        }