Skip to content

Commit 7caffae

Browse files
committed
Merge pull request #347 from timstoop/connection_error_issue_346
Make reconnecting to a lost RabbitMQ work.
2 parents 1df81f5 + 2033a5f commit 7caffae

1 file changed

Lines changed: 18 additions & 10 deletions

File tree

beaver/transports/rabbitmq_transport.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,15 @@ def _publish_message(self):
7272
self._count = 0
7373
else:
7474
self._count += 1
75-
self._channel.basic_publish(
76-
exchange=self._rabbitmq_config['exchange'],
77-
routing_key=self._rabbitmq_config['key'],
78-
body=line,
79-
properties=pika.BasicProperties(
80-
content_type='text/json',
81-
delivery_mode=self._rabbitmq_config['delivery_mode']
82-
))
75+
if self._channel != None:
76+
self._channel.basic_publish(
77+
exchange=self._rabbitmq_config['exchange'],
78+
routing_key=self._rabbitmq_config['key'],
79+
body=line,
80+
properties=pika.BasicProperties(
81+
content_type='text/json',
82+
delivery_mode=self._rabbitmq_config['delivery_mode']
83+
))
8384
else:
8485
self._logger.debug("RabbitMQ transport queue is empty, sleeping for 1 second.")
8586
time.sleep(1)
@@ -93,15 +94,22 @@ def _on_connection_open_error(self,non_used_connection=None,error=None):
9394

9495
def _on_connection_closed(self, connection, reply_code, reply_text):
9596
self._channel = None
96-
if self._connection._closing:
97+
if hasattr(self._connection, '_closing'):
98+
closing = self._connection._closing
99+
elif hasattr(self._connection, 'is_closing'):
100+
closing = self._connection.is_closing
101+
else:
102+
raise NotImplementedError('Unsure how to check whether the connection is closing.')
103+
if closing:
97104
try:
98105
self._connection.ioloop.stop()
99106
except:
100107
pass
101108
else:
102109
self._logger.warning('RabbitMQ Connection closed, reopening in 1 seconds: (%s) %s',
103110
reply_code, reply_text)
104-
self._connection.add_timeout(1, self.reconnect)
111+
time.sleep(1)
112+
self.reconnect()
105113

106114
def reconnect(self):
107115
try:

0 commit comments

Comments
 (0)