@@ -72,14 +72,15 @@ def _publish_message(self):
7272 self ._count = 0
7373 else :
7474 self ._count += 1
75- self ._channel .basic_publish (
76- exchange = self ._rabbitmq_config ['exchange' ],
77- routing_key = self ._rabbitmq_config ['key' ],
78- body = line ,
79- properties = pika .BasicProperties (
80- content_type = 'text/json' ,
81- delivery_mode = self ._rabbitmq_config ['delivery_mode' ]
82- ))
75+ if self ._channel != None :
76+ self ._channel .basic_publish (
77+ exchange = self ._rabbitmq_config ['exchange' ],
78+ routing_key = self ._rabbitmq_config ['key' ],
79+ body = line ,
80+ properties = pika .BasicProperties (
81+ content_type = 'text/json' ,
82+ delivery_mode = self ._rabbitmq_config ['delivery_mode' ]
83+ ))
8384 else :
8485 self ._logger .debug ("RabbitMQ transport queue is empty, sleeping for 1 second." )
8586 time .sleep (1 )
@@ -93,15 +94,22 @@ def _on_connection_open_error(self,non_used_connection=None,error=None):
9394
9495 def _on_connection_closed (self , connection , reply_code , reply_text ):
9596 self ._channel = None
96- if self ._connection ._closing :
97+ if hasattr (self ._connection , '_closing' ):
98+ closing = self ._connection ._closing
99+ elif hasattr (self ._connection , 'is_closing' ):
100+ closing = self ._connection .is_closing
101+ else :
102+ raise NotImplementedError ('Unsure how to check whether the connection is closing.' )
103+ if closing :
97104 try :
98105 self ._connection .ioloop .stop ()
99106 except :
100107 pass
101108 else :
102109 self ._logger .warning ('RabbitMQ Connection closed, reopening in 1 seconds: (%s) %s' ,
103110 reply_code , reply_text )
104- self ._connection .add_timeout (1 , self .reconnect )
111+ time .sleep (1 )
112+ self .reconnect ()
105113
106114 def reconnect (self ):
107115 try :
0 commit comments