Skip to content

Commit d8dd2ab

Browse files
committed
The communication to the RGI is now asynchronous on the editor side.
1 parent 1ba5380 commit d8dd2ab

File tree

2 files changed

+54
-22
lines changed

2 files changed

+54
-22
lines changed

floppy/graph.py

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(self, painter=None):
4040
self.STOREDVALUES = {}
4141
self.connections = {}
4242
self.runner = None
43+
self.status = None
4344
self.reverseConnections = {}
4445
# self.statusLock = Lock()
4546
if painter:
@@ -102,13 +103,16 @@ def needsUpdate(self):
102103
"""
103104
if self.connected:
104105
# IDs = self.requestRemoteStatus()
105-
status = self.requestRemoteStatus()
106-
IDs = status['STATUS']['ran']
107-
self.currentlyRunning = status['STATUS']['running']
108-
self.currentReport = status['REPORT']
109-
if IDs:
110-
self.executedBuffer += IDs
111-
return True
106+
# status = self.requestRemoteStatus()
107+
self.requestRemoteStatus()
108+
status = self.status
109+
if status:
110+
IDs = status['STATUS']['ran']
111+
self.currentlyRunning = status['STATUS']['running']
112+
self.currentReport = status['REPORT']
113+
if IDs:
114+
self.executedBuffer += IDs
115+
return True
112116
if self._requestUpdate:
113117
self._requestUpdate = False
114118
return True
@@ -318,17 +322,20 @@ def runNodePar(self, node, cb=None, arg=None):
318322
# sendCommand('KILL')
319323
# del r
320324

325+
def print(self, message):
326+
print(message)
327+
321328
def updateRunner(self):
322329
"""
323330
Serializes the graph and sends it to the connected graph interpreter telling it to load the new data.
324331
:return:
325332
"""
326333
# self.executedBuffer = []
327-
print(self.rgiConnection.send('PAUSE'))
334+
self.rgiConnection.send('PAUSE', self.print)
328335
message = self.serialize()
329336
# msg = struct.pack('>I', len(message)) + message.encode('utf-8')
330337
# self.sendUpdate(data)
331-
print(self.rgiConnection.send('UPDATE'+message))
338+
self.rgiConnection.send('UPDATE'+message, self.print)
332339

333340
def push2Runner(self):
334341
"""
@@ -337,11 +344,11 @@ def push2Runner(self):
337344
"""
338345
self.executedBuffer = []
339346
self.STOREDVALUES = {}
340-
print(self.rgiConnection.send('PAUSE'))
347+
self.rgiConnection.send('PAUSE', self.print)
341348
message = self.serialize()
342349
# msg = struct.pack('>I', len(message)) + message.encode('utf-8')
343350
# self.sendUpdate(data)
344-
print(self.rgiConnection.send('PUSH'+message))
351+
self.rgiConnection.send('PUSH'+message, self.print)
345352

346353
def serialize(self):
347354
"""
@@ -377,7 +384,7 @@ def killRunner(self):
377384
"""
378385
if not self.slave:
379386
return
380-
print(self.rgiConnection.send('KILL'))
387+
self.rgiConnection.send('KILL', self.print)
381388
# sendCommand('KILL', self.cmdHost, self.cmdPort)
382389
# self.clientSocket.close()
383390
del self.runner
@@ -389,39 +396,42 @@ def pauseRunner(self):
389396
Send PAUSE command to the graph interpreter.
390397
:return:
391398
"""
392-
print(self.rgiConnection.send('PAUSE'))
399+
self.rgiConnection.send('PAUSE', self.print)
393400
# sendCommand('PAUSE', self.cmdHost, self.cmdPort)
394401

395402
def unpauseRunner(self):
396403
"""
397404
Send UNPAUSE command to the graph interpreter.
398405
:return:
399406
"""
400-
print(self.rgiConnection.send('UNPAUSE'))
407+
self.rgiConnection.send('UNPAUSE', self.print)
401408
# sendCommand('UNPAUSE', self.cmdHost, self.cmdPort)
402409

403410
def stepRunner(self):
404411
"""
405412
Send Step command to the graph interpreter causing it to execute one node and then reenter the PAUSED state.
406413
:return:
407414
"""
408-
print(self.rgiConnection.send('STEP'))
415+
self.rgiConnection.send('STEP', self.print)
409416

410417
def gotoRunner(self, nextID):
411418
"""
412419
Send GOTO<Node> command to the graph interpreter causing it to execute the node with the given ID next.
413420
:param nextID:
414421
:return:
415422
"""
416-
print(self.rgiConnection.send('GOTO1'))
423+
self.rgiConnection.send('GOTO1', self.print)
417424

418425
def dropGraph(self):
419-
print(self.rgiConnection.send('DROP'))
426+
self.rgiConnection.send('DROP', self.print)
427+
428+
def setStatus(self, status):
429+
self.status = json.loads(status[10:])
420430

421431
def requestRemoteStatus(self):
422432
if self.connected:
423433
try:
424-
status = self.rgiConnection.send('STATUS***{}'.format(self._requestReport))
434+
self.rgiConnection.send('STATUS***{}'.format(self._requestReport), self.setStatus)
425435
# status = json.loads(status[10:])
426436
except BrokenPipeError:
427437
self.connected = False
@@ -433,7 +443,8 @@ def requestRemoteStatus(self):
433443
self.connected = False
434444
return []
435445
else:
436-
return json.loads(status[10:])
446+
# return json.loads(status[10:])
447+
return []
437448
else:
438449
return []
439450

@@ -739,3 +750,5 @@ def run(self):
739750

740751

741752

753+
754+

floppy/runner.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -411,11 +411,27 @@ def receive(self):
411411
return data
412412

413413

414-
class RGIConnection(object):
414+
class RGIConnection(Thread):
415415
def __init__(self, verbose=True):
416+
super(RGIConnection, self).__init__()
417+
self.daemon = True
418+
self.cmdQueue = []
416419
self.socket = None
417420
self.host = None
418421
self.port = None
422+
self.alive = True
423+
self.start()
424+
425+
def run(self):
426+
super(RGIConnection, self).run()
427+
while self.alive:
428+
try:
429+
cmd = self.cmdQueue.pop(0)
430+
except IndexError:
431+
time.sleep(.1)
432+
else:
433+
answer = self._send(cmd[0])
434+
cmd[1](answer)
419435

420436
def connect(self, host, port, validate=True):
421437
self.host = host
@@ -424,7 +440,7 @@ def connect(self, host, port, validate=True):
424440
self.socket.settimeout(5.)
425441
self.socket.connect((host, port))
426442
if validate:
427-
print(self.send('READY?'))
443+
self.send('READY?', print)
428444

429445
def disconnect(self):
430446
self.socket.close()
@@ -434,7 +450,10 @@ def reconnect(self):
434450
time.sleep(.5)
435451
self.socket.connect((self.host, self.port))
436452

437-
def send(self, message):
453+
def send(self, message, target):
454+
self.cmdQueue.append((message, target))
455+
456+
def _send(self, message):
438457
# print('[REQUEST] ' + message)
439458
msg = struct.pack('>I', len(message)) + message.encode('utf-8')
440459
self.socket.sendall(msg)

0 commit comments

Comments
 (0)