Skip to content

Commit 0803a46

Browse files
committed
Make heartmonitor.Heart monitorable
If a mon_addr is given, send any ping to this zmq socket. This can be used to monitor the heartbeat on the engine side to check if the cluster controller is still active. Also correct the code on __main__ to work (for testing purpose). Signed-off-by: Jan Schulz <jasc@gmx.net>
1 parent 144f08a commit 0803a46

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

IPython/parallel/controller/heartmonitor.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import uuid
2020

2121
import zmq
22-
from zmq.devices import ThreadDevice
22+
from zmq.devices import ThreadDevice, ThreadMonitoredQueue
2323
from zmq.eventloop import ioloop, zmqstream
2424

2525
from IPython.config.configurable import LoggingConfigurable
@@ -39,15 +39,20 @@ class Heart(object):
3939
You can specify the DEALER's IDENTITY via the optional heart_id argument."""
4040
device=None
4141
id=None
42-
def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
43-
self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
42+
def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
43+
if mon_addr is None:
44+
self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
45+
else:
46+
self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
4447
# do not allow the device to share global Context.instance,
4548
# which is the default behavior in pyzmq > 2.1.10
4649
self.device.context_factory = zmq.Context
4750

4851
self.device.daemon=True
4952
self.device.connect_in(in_addr)
5053
self.device.connect_out(out_addr)
54+
if mon_addr is not None:
55+
self.device.connect_mon(mon_addr)
5156
if in_type == zmq.SUB:
5257
self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
5358
if heart_id is None:
@@ -122,7 +127,7 @@ def beat(self):
122127
map(self.handle_heart_failure, heartfailures)
123128
self.on_probation = missed_beats.intersection(self.hearts)
124129
self.responses = set()
125-
# print self.on_probation, self.hearts
130+
#print self.on_probation, self.hearts
126131
# self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
127132
self.pingstream.send(str_to_bytes(str(self.lifetime)))
128133
# flush stream to force immediate socket send
@@ -177,6 +182,8 @@ def handle_pong(self, msg):
177182
outstream = zmqstream.ZMQStream(pub, loop)
178183
instream = zmqstream.ZMQStream(router, loop)
179184

180-
hb = HeartMonitor(loop, outstream, instream)
181-
185+
hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
186+
import logging
187+
hb.log.setLevel(logging.DEBUG)
188+
hb.start()
182189
loop.start()

0 commit comments

Comments
 (0)