1919import uuid
2020
2121import zmq
22- from zmq .devices import ThreadDevice
22+ from zmq .devices import ThreadDevice , ThreadMonitoredQueue
2323from zmq .eventloop import ioloop , zmqstream
2424
2525from IPython .config .configurable import LoggingConfigurable
@@ -39,15 +39,20 @@ class Heart(object):
3939 You can specify the DEALER's IDENTITY via the optional heart_id argument."""
4040 device = None
4141 id = None
42- def __init__ (self , in_addr , out_addr , in_type = zmq .SUB , out_type = zmq .DEALER , heart_id = None ):
43- self .device = ThreadDevice (zmq .FORWARDER , in_type , out_type )
42+ def __init__ (self , in_addr , out_addr , mon_addr = None , in_type = zmq .SUB , out_type = zmq .DEALER , mon_type = zmq .PUB , heart_id = None ):
43+ if mon_addr is None :
44+ self .device = ThreadDevice (zmq .FORWARDER , in_type , out_type )
45+ else :
46+ self .device = ThreadMonitoredQueue (in_type , out_type , mon_type , in_prefix = b"" , out_prefix = b"" )
4447 # do not allow the device to share global Context.instance,
4548 # which is the default behavior in pyzmq > 2.1.10
4649 self .device .context_factory = zmq .Context
4750
4851 self .device .daemon = True
4952 self .device .connect_in (in_addr )
5053 self .device .connect_out (out_addr )
54+ if mon_addr is not None :
55+ self .device .connect_mon (mon_addr )
5156 if in_type == zmq .SUB :
5257 self .device .setsockopt_in (zmq .SUBSCRIBE , b"" )
5358 if heart_id is None :
@@ -122,7 +127,7 @@ def beat(self):
122127 map (self .handle_heart_failure , heartfailures )
123128 self .on_probation = missed_beats .intersection (self .hearts )
124129 self .responses = set ()
125- # print self.on_probation, self.hearts
130+ #print self.on_probation, self.hearts
126131 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
127132 self .pingstream .send (str_to_bytes (str (self .lifetime )))
128133 # flush stream to force immediate socket send
@@ -177,6 +182,8 @@ def handle_pong(self, msg):
177182 outstream = zmqstream .ZMQStream (pub , loop )
178183 instream = zmqstream .ZMQStream (router , loop )
179184
180- hb = HeartMonitor (loop , outstream , instream )
181-
185+ hb = HeartMonitor (loop = loop , pingstream = outstream , pongstream = instream )
186+ import logging
187+ hb .log .setLevel (logging .DEBUG )
188+ hb .start ()
182189 loop .start ()
0 commit comments