-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathsandbox.py
More file actions
421 lines (346 loc) · 15 KB
/
sandbox.py
File metadata and controls
421 lines (346 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
import os, sys, re, ast
from os.path import dirname, basename, abspath, relpath, isdir
from functools import partial
from inspect import getfullargspec
from collections import namedtuple
from PyObjCTools import AppHelper
from ..lib.io import MovieExportSession, ImageExportSession
from .common import stacktrace, coredump, uncoded
from plotdevice import util, context, gfx, Halted, DeviceError
# only the Sandbox class is exported by `from <this module> import *`
__all__ = ['Sandbox']
# result of a sandboxed call: `ok` is True, False, or the string 'HALTED';
# `output` is the list of writes captured from stdout/stderr during the call
Outcome = namedtuple('Outcome', ['ok', 'output'])
# a single captured write: `isErr` is True when it was sent to the stderr pipe,
# `data` is whatever was passed to write()/writelines()
Output = namedtuple('Output', ['isErr', 'data'])
class Metadata(object):
    """Fixed-slot record of the runtime options that govern a script run."""
    __slots__ = 'args', 'virtualenv', 'first', 'frame', 'last', 'loop'

    def __init__(self, **opts):
        """Store each keyword argument as an attribute of the same name."""
        for name, value in opts.items():
            setattr(self, name, value)

    def update(self, changes):
        """Copy the entries of `changes` onto the matching slots.

        Keys that can't be set as attributes (i.e., aren't one of the slots)
        are skipped rather than raising.
        """
        for name, value in changes.items():
            try:
                setattr(self, name, value)
            except AttributeError:
                # not one of our slots; leave it out
                continue

    def items(self):
        """Yield a (name, value) pair for every slot, in declaration order."""
        for name in self.__slots__:
            yield name, getattr(self, name)
class Delegate(object):
    """No-op sandbox delegate that will be used by default if a delegate isn't specified"""

    def exportFrame(self, status, canvas=None):
        """Accept a per-frame notification (and optional canvas) and do nothing."""
        return None

    def exportStatus(self, status):
        """Accept an export status notification and do nothing."""
        return None

    def exportProgress(self, written, total):
        """Accept a written/total progress notification and do nothing."""
        return None
class ScriptDoctor(ast.NodeVisitor):
    """Parse a script's source and determine whether the script is an animation.

    After construction, `is_animated` is True when the module's top level
    defines a function named `draw`. Definitions nested inside other
    statements (classes, functions, conditionals) are not considered.
    Source that can't be parsed leaves the flag False.
    """

    def __init__(self, src):
        self.is_animated = False
        try:
            tree = ast.parse(src)
        except (SyntaxError, ValueError):
            # Broken source isn't this class's problem to report. Depending on
            # the python version, embedded null bytes are signalled with
            # ValueError rather than SyntaxError, so swallow both.
            return
        self.visit(tree)

    def visit_Module(self, node):
        # descend exactly one level: into the module's top-level statements
        ast.NodeVisitor.generic_visit(self, node)

    def visit_FunctionDef(self, node):
        if node.name == 'draw':
            self.is_animated = True

    def generic_visit(self, node):
        # don't recurse into any other node type
        pass
class Sandbox(object):
    """Compiles a script and runs it (or its setup/draw/stop routines) in a boxed environment.

    The sandbox owns the canvas and context that the script draws into,
    byte-compiles the source on demand, captures whatever is written to
    stdout/stderr while script code runs, and steps through image/movie
    exports one frame at a time.
    """

    def __init__(self, delegate=None):
        """Set up the canvas/context pair and the default runtime options.

        Args:
            delegate: object whose exportFrame, exportStatus & exportProgress
                      methods receive notifications; a no-op Delegate is used
                      when omitted.
        """
        self._meta = None       # runtime opts for the script
        self._path = None       # file path to the active script
        self._source = None     # unicode contents of script
        self._code = None       # byte-compiled source
        self._anim = None       # persistent dict passed to animation functions in script
        self.canvas = None      # can be handed off to views or exporters to access the image
        self.context = None     # quartz playground
        self.namespace = {}     # a reference to the script's namespace (managed by self.context)
        self.crashed = False    # flag whether the script exited abnormally
        self.live = False       # whether to keep the output pipe open between runs
        self.session = None     # the image/movie export session (if any)
        self.delegate = None    # object with exportFrame and exportProgress methods

        # set up the graphics plumbing
        self.canvas = context.Canvas()
        self.context = context.Context(self.canvas, self.namespace)
        self.delegate = delegate or Delegate()

        # control params used during exports and console-based runs
        self._meta = Metadata(args=[], virtualenv=None,     # environment
                              first=1, frame=1, last=None,  # runtime
                              loop=False)                   # export opts

    # .script
    def _get_path(self):
        """Path to the current python script (r/w)"""
        return self._path
    def _set_path(self, pth):
        if pth==self._path: return
        self._path = pth
    path = property(_get_path, _set_path)

    # .source
    def _get_source(self):
        """Contents of the current python script (r/w)"""
        return self._source
    def _set_source(self, src):
        if src==self._source: return
        self._source = src
        self._code = None  # invalidate the compiled code so the next run recompiles
        if ScriptDoctor(self._source).is_animated:
            self._anim = util.adict()
    source = property(_get_source, _set_source)

    # .state
    def _set_state(self, ui_state):
        """Update the mouse and keyboard globals in the namespace (w)"""
        # Update keyboard/mouse event globals
        self.namespace.update(ui_state)
    state = property(fset=_set_state)

    # .metadata
    def _get_meta(self):
        """Runtime parameters corresponding to the console.py command line switches (r/w)"""
        return dict(self._meta.items())
    def _set_meta(self, metadict):
        self._meta.update(metadict)
        # 'live' isn't one of the Metadata slots so it's tracked on the sandbox itself
        self.live = metadict.get('live', self.live)
    metadata = property(_get_meta, _set_meta)

    # .params
    def _get_params(self):
        """Script variables currently being displayed in the UI (r/w)"""
        return self.context._params
    def _set_params(self, new_params):
        # mutate in place so the context keeps holding the same dict object
        self.context._params.clear()
        self.context._params.update(new_params)
    params = property(_get_params, _set_params)

    @property
    def vars(self):
        """Script variables added through the var() method in the last run (r)"""
        return self.context._vars

    @property
    def speed(self):
        """Frames per second if an animation, None if not (r)"""
        return self.canvas.speed

    @property
    def animated(self):
        """Whether the script has multiple frames (r)"""
        return self._anim is not None or callable(self.namespace.get('draw',None))

    @property
    def tty(self):
        """Whether the script's output is being redirected to a pipe (r)"""
        return getattr(self.delegate, 'graphicsView', None) is None

    def _preflight(self):
        """Set up a namespace for the script and prepare it for rendering

        Returns:
            An Outcome; `ok` is only falsy if the source needed compiling
            and the compilation failed.
        """
        # Initialize the namespace
        self.context._resetEnvironment()

        # add a pathname (if the script exists on the filesystem)
        if self._path:
            self.namespace['__file__'] = self._path

        # start off with all systems nominal (fingers crossed)
        self.crashed = False
        result = Outcome(True, [])
        self._meta.frame = self._meta.first
        self._anim = None

        # if our .source attr has been changed since the last run, compile it now
        if not self._code:
            def compileScript():
                # _source is already unicode so strip out the `encoding:` comment
                src = uncoded(self._source)
                fname = self._path or "<Untitled>"
                self._code = compile(src, fname, "exec")
            # going through call() means a failed compile comes back as a
            # not-ok Outcome carrying the captured error output
            result = self.call(compileScript)

        return result

    def run(self, method=None, cmyk=False):
        """Clear the context and run either the entire script or a specific method.

        Args:
            method: None to execute the script's top level, otherwise the
                    name of a routine in the script ('setup', 'draw', ...).
            cmyk:   selects the context's output mode; only consulted on the
                    initial (method=None) pass.

        Returns:
            The Outcome of the preflight (if it failed) or of the call.
        """
        # if this is the initial pass, reset the namespace and canvas state
        if method is None:
            check = self._preflight() # compile the script
            self.context._outputmode = 'cmyk' if cmyk else 'rgb'
            if not check.ok:
                return check

        # Clear the canvas
        if method != 'setup':
            self.canvas.clear()

        # Reset the context state (and bind the .gfx objects as a side-effect)
        # NOTE(review): assumed to run on every pass, not only when the canvas
        # is cleared - confirm against the upstream file's indentation
        self.context._resetContext()

        # Set the frame/pagenum
        self.namespace['PAGENUM'] = self.namespace['FRAME'] = self._meta.frame

        # Run the specified method (or script's top-level if None)
        result = self.call(method)

        # (non-animation scripts are now complete (as are anims that just crashed))
        if self.animated and result.ok:
            # animations require special bookkeeping depending on which routine is being run
            if method is None:
                # we're in the initial pass through the script so flag the run as ongoing
                self._anim = util.adict()

                # default to 30fps if speed() wasn't called in the script
                if self.speed is None:
                    self.canvas.speed = 30

                # determine which of the script's routines accept an argument
                for routine in 'setup','draw','stop':
                    func = self.namespace.get(routine)
                    # replace each such routine with a partial application passing
                    # the dict. this means we can .call() it without any explicit args
                    if callable(func) and getfullargspec(func).args:
                        self.namespace[routine] = partial(self.namespace[routine], self._anim)

            elif method=='draw':
                # tick the frame ahead after each draw call
                self._meta.frame+=1
                # wrap around to the first frame once we pass the last (when looping)
                if self._meta.last and self._meta.frame > self._meta.last and self._meta.loop:
                    self._meta.frame = self._meta.first

        return result

    def call(self, method=None):
        """
        Runs the given method in a boxed environment.

        Boxed environments:
         - Have their current directory set to the directory of the file
         - Have their argument set to the filename
         - Have their outputs redirect to an output stream.

        Args:
            method: None to exec the compiled script, the name of a callable
                    in the script's namespace, or a callable to invoke
                    directly. A name that isn't defined in the namespace is
                    skipped and reported as a success.

        Returns:
            A namedtuple containing two fields:
              - "ok": True if the run was successful, False if it raised an
                      exception, or the string 'HALTED' if it raised Halted
              - "output": The list of Output tuples written to stdout/stderr
        """
        # default to running the script itself if a method (e.g., compile) isn't specified.
        if not method:
            def execScript():
                exec(self._code, self.namespace)
            method = execScript
        elif callable(self.namespace.get(method, None)):
            method = self.namespace[method]
        elif not callable(method):
            # silently skip over undefined methods (convenient if a script lacks 'setup' or 'draw')
            return Outcome(True, [])

        # find the script name and directory (or substitute placeholder values)
        if not self._path:
            scriptDir = os.getenv("HOME")
            scriptName = "<untitled>"
        else:
            scriptName = self._path
            # resolve to an absolute directory: a bare filename would otherwise
            # yield '' (which chdir rejects) and a relative directory added to
            # sys.path would stop pointing at the script once we chdir into it
            scriptDir = dirname(abspath(scriptName))

        # save the external runtime environment
        pipes = sys.stdout, sys.stderr
        cwd = os.getcwd()
        argv = sys.argv
        syspath = list(sys.path)
        output = StdIO()

        try:
            # set up environment for script. this happens inside the try block
            # so that a failure partway through (e.g., a missing directory)
            # can't leave the interpreter's streams, argv, or path hijacked
            sys.argv = [scriptName] + self._meta.args
            sys.stdout, sys.stderr = output.pipes
            if self._meta.virtualenv:
                sys.path.insert(0, self._meta.virtualenv)
            sys.path.insert(0, scriptDir)
            os.chdir(scriptDir)

            try:
                # run the code object we were passed
                method()
            except Halted:
                return Outcome('HALTED', output.data)
            except:
                # deliberately catch everything the script can throw, then
                # print the stacktrace (into the captured stderr) and quit
                sys.stderr.write(self.die())
                return Outcome(False, output.data)
        finally:
            # restore the environment
            sys.stdout, sys.stderr = pipes
            os.chdir(cwd)
            sys.path = syspath
            sys.argv = argv
        return Outcome(True, output.data)

    def stop(self):
        """Called when an animated run is halted (voluntarily or otherwise)"""
        # print "stopping at", self._meta.frame-1, "of", self._meta.last
        result = Outcome(True, [])
        # don't run the script's stop() routine if the script has already crashed
        if not self.crashed:
            result = self.call("stop")
        return result

    def die(self):
        """Triggered by self.call() if the script raised an exception or by
        the ScriptController if the view bombed during canvas.draw()

        Records the coredump() result in self.crashed and returns the
        stacktrace() for the current script.
        """
        self.crashed = coredump(self._path, self._source)
        return stacktrace(self._path, self._source)

    def export(self, kind, fname, opts):
        """Export graphics and animations to image and movie files.

        args:
            kind - 'image' or 'movie'
            fname - path to outputfile
            opts - dictionary with required keys:
                     first, last, format
                   for a movie export also include:
                     bitrate, fps, loop
                   and for an image sequence:
                     cmyk, single

        Note that a missing 'format' key is filled in on `opts` itself, using
        the lowercased text after the last '.' in fname.
        """
        # pull off the file extension and use that as the format
        opts.setdefault('format', fname.lower().rsplit('.',1)[-1])

        # set the in/out frames for the export
        self._meta.first, self._meta.last = opts['first'], opts['last']

        # compile & evaluate the script once
        firstpass = self.run(cmyk=opts.get('cmyk',False))
        self.delegate.exportFrame(firstpass, canvas=None)
        if not firstpass.ok:
            return

        # call the script's setup() routine and pass the output along to the delegate
        if self.animated:
            setup = self.run("setup")
            self.delegate.exportFrame(setup)
            if not setup.ok:
                return

        # set up an export manager and attach the delegate's callbacks
        ExportSession = ImageExportSession if kind=='image' else MovieExportSession
        self.session = ExportSession(fname, **opts)
        self.session.on(progress=self.delegate.exportProgress,
                        status=self.delegate.exportStatus,
                        complete=self._exportComplete)

        # start looping through frames, calling draw() and adding the canvas
        # to the export-session on each iteration
        self._exportFrame()

    def _exportFrame(self):
        """Render one frame into the export session and schedule the next.

        When the session reports no further frames, the script's stop()
        routine is called and the session is told it's done.
        """
        if self.session.next():
            # step to the proper FRAME value
            self._meta.frame = self.session.next()

            if self.animated:
                # run the draw() function if it exists
                result = self.run("draw", cmyk=self.session.cmyk)
            else:
                # if not, we've already exec'd the module & setup() so use the canvas as-is
                result = Outcome(True, StdIO().data)

            # let the delegate draw to the screen
            self.delegate.exportFrame(result, self.canvas)

            # pass the frame content to the file-writer
            if result.ok:
                self.session.add(self.canvas)

            # know when to fold 'em
            if result.ok in (False, 'HALTED'):
                self.session.cancel()

            # give the runloop a chance to collect events between frames
            AppHelper.callLater(0.001, self._exportFrame)
        else:
            # we've drawn the final frame in the export
            result = self.call("stop")
            self.delegate.exportFrame(result, canvas=None)
            self.session.done()

    def _exportComplete(self):
        """Session callback (registered as `complete`); drops the finished session."""
        self.session = None

    def _cleanup(self):
        """Drop the reference to the delegate."""
        # self.session = None
        self.delegate = None
class StdIO(object):
    """In-memory collector that stands in for stdout and stderr.

    Everything written to either of the `pipes` is appended, in order, to the
    `data` list as an Output tuple flagged with the stream it arrived on.
    """

    class OutputFile(object):
        """Write-only file-like object that forwards tagged writes to a StdIO."""

        def __init__(self, stream, streamname):
            self._stream = stream
            # report the conventional descriptor number: 1 for 'stdout', 2 otherwise
            self.fileno = lambda: 1 if streamname=='stdout' else 2

        def write(self, data):
            self._stream.write(Output(self.fileno()==2, data))

        def writelines(self, lines):
            # the lines are concatenated and recorded as a single write
            self._stream.write(Output(self.fileno()==2, ''.join(lines)))

        def writable(self):
            return True

        def readable(self, data=None):
            # `data` is ignored; it's kept (now optional) so existing callers
            # still work while the usual no-argument stream.readable() call
            # no longer raises a TypeError
            return False

        def isatty(self):
            return False

        def flush(self):
            # nothing is buffered, so there's nothing to flush
            pass

    def __init__(self):
        self.data = [] # the list of (isErr, txt) tuples .write calls go to

    @property
    def pipes(self):
        """A fresh (stdout, stderr) pair of OutputFile objects feeding this collector"""
        return self.OutputFile(self, 'stdout'), self.OutputFile(self, 'stderr')

    def write(self, output):
        self.data.append(output)