diff --git a/beaver/transports/rabbitmq_transport.py b/beaver/transports/rabbitmq_transport.py index 589b5ff6..c575e1a2 100644 --- a/beaver/transports/rabbitmq_transport.py +++ b/beaver/transports/rabbitmq_transport.py @@ -72,14 +72,15 @@ def _publish_message(self): self._count = 0 else: self._count += 1 - self._channel.basic_publish( - exchange=self._rabbitmq_config['exchange'], - routing_key=self._rabbitmq_config['key'], - body=line, - properties=pika.BasicProperties( - content_type='text/json', - delivery_mode=self._rabbitmq_config['delivery_mode'] - )) + if self._channel != None: + self._channel.basic_publish( + exchange=self._rabbitmq_config['exchange'], + routing_key=self._rabbitmq_config['key'], + body=line, + properties=pika.BasicProperties( + content_type='text/json', + delivery_mode=self._rabbitmq_config['delivery_mode'] + )) else: self._logger.debug("RabbitMQ transport queue is empty, sleeping for 1 second.") time.sleep(1) @@ -93,7 +94,13 @@ def _on_connection_open_error(self,non_used_connection=None,error=None): def _on_connection_closed(self, connection, reply_code, reply_text): self._channel = None - if self._connection._closing: + if hasattr(self._connection, '_closing'): + closing = self._connection._closing + elif hasattr(self._connection, 'is_closing'): + closing = self._connection.is_closing + else: + raise NotImplementedError('Unsure how to check whether the connection is closing.') + if closing: try: self._connection.ioloop.stop() except: @@ -101,7 +108,8 @@ def _on_connection_closed(self, connection, reply_code, reply_text): else: self._logger.warning('RabbitMQ Connection closed, reopening in 1 seconds: (%s) %s', reply_code, reply_text) - self._connection.add_timeout(1, self.reconnect) + time.sleep(1) + self.reconnect() def reconnect(self): try: