From 7a7424975fc5b773ae8c9ee03b88628f497a20a1 Mon Sep 17 00:00:00 2001 From: Tim Stoop Date: Fri, 11 Sep 2015 19:27:57 +0200 Subject: [PATCH 1/2] Make reconnecting to a lost RabbitMQ work. --- beaver/transports/rabbitmq_transport.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/beaver/transports/rabbitmq_transport.py b/beaver/transports/rabbitmq_transport.py index 589b5ff6..51fd8ed7 100644 --- a/beaver/transports/rabbitmq_transport.py +++ b/beaver/transports/rabbitmq_transport.py @@ -72,14 +72,15 @@ def _publish_message(self): self._count = 0 else: self._count += 1 - self._channel.basic_publish( - exchange=self._rabbitmq_config['exchange'], - routing_key=self._rabbitmq_config['key'], - body=line, - properties=pika.BasicProperties( - content_type='text/json', - delivery_mode=self._rabbitmq_config['delivery_mode'] - )) + if self._channel != None: + self._channel.basic_publish( + exchange=self._rabbitmq_config['exchange'], + routing_key=self._rabbitmq_config['key'], + body=line, + properties=pika.BasicProperties( + content_type='text/json', + delivery_mode=self._rabbitmq_config['delivery_mode'] + )) else: self._logger.debug("RabbitMQ transport queue is empty, sleeping for 1 second.") time.sleep(1) @@ -93,7 +94,7 @@ def _on_connection_open_error(self,non_used_connection=None,error=None): def _on_connection_closed(self, connection, reply_code, reply_text): self._channel = None - if self._connection._closing: + if self._connection.is_closing: try: self._connection.ioloop.stop() except: @@ -101,7 +102,8 @@ def _on_connection_closed(self, connection, reply_code, reply_text): else: self._logger.warning('RabbitMQ Connection closed, reopening in 1 seconds: (%s) %s', reply_code, reply_text) - self._connection.add_timeout(1, self.reconnect) + time.sleep(1) + self.reconnect() def reconnect(self): try: From 2033a5f31d7711496b12f3c4556d5c64e25f5fd0 Mon Sep 17 00:00:00 2001 From: Tim Stoop Date: Fri, 11 Sep 2015 19:51:40 +0200 Subject: [PATCH 2/2] Support both older and newer pika. --- beaver/transports/rabbitmq_transport.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/beaver/transports/rabbitmq_transport.py b/beaver/transports/rabbitmq_transport.py index 51fd8ed7..c575e1a2 100644 --- a/beaver/transports/rabbitmq_transport.py +++ b/beaver/transports/rabbitmq_transport.py @@ -94,7 +94,13 @@ def _on_connection_open_error(self,non_used_connection=None,error=None): def _on_connection_closed(self, connection, reply_code, reply_text): self._channel = None - if self._connection.is_closing: + if hasattr(self._connection, '_closing'): + closing = self._connection._closing + elif hasattr(self._connection, 'is_closing'): + closing = self._connection.is_closing + else: + raise NotImplementedError('Unsure how to check whether the connection is closing.') + if closing: try: self._connection.ioloop.stop() except: