Skip to content

Commit d58ae8e

Browse files
committed
Isolate connection logic. Provide proper reconnect support.
1 parent ad5d84a commit d58ae8e

1 file changed

Lines changed: 33 additions & 18 deletions

File tree

beaver/transports/rabbitmq_transport.py

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,38 +10,50 @@ class RabbitmqTransport(BaseTransport):
1010
def __init__(self, beaver_config, logger=None):
1111
super(RabbitmqTransport, self).__init__(beaver_config, logger=logger)
1212

13-
self._rabbitmq_key = beaver_config.get('rabbitmq_key')
14-
self._rabbitmq_exchange = beaver_config.get('rabbitmq_exchange')
13+
self._rabbitmq_config = {}
14+
config_to_store = [
15+
'key', 'exchange', 'username', 'password', 'host', 'port', 'vhost',
16+
'queue', 'queue_durable', 'ha_queue', 'exchange_type', 'exchange_durable'
17+
]
18+
19+
for key in config_to_store:
20+
self._rabbitmq_config[key] = beaver_config.get('rabbitmq_' + key)
21+
22+
self._connection = None
23+
self._channel = None
24+
self._connect()
25+
26+
def _connect(self):
1527

1628
# Setup RabbitMQ connection
1729
credentials = pika.PlainCredentials(
18-
beaver_config.get('rabbitmq_username'),
19-
beaver_config.get('rabbitmq_password')
30+
self._rabbitmq_config['username'],
31+
self._rabbitmq_config['password']
2032
)
2133
parameters = pika.connection.ConnectionParameters(
2234
credentials=credentials,
23-
host=beaver_config.get('rabbitmq_host'),
24-
port=beaver_config.get('rabbitmq_port'),
25-
virtual_host=beaver_config.get('rabbitmq_vhost')
35+
host=self._rabbitmq_config['host'],
36+
port=self._rabbitmq_config['port'],
37+
virtual_host=self._rabbitmq_config['vhost']
2638
)
2739
self._connection = pika.adapters.BlockingConnection(parameters)
2840
self._channel = self._connection.channel()
2941

3042
# Declare RabbitMQ queue and bindings
3143
self._channel.queue_declare(
32-
queue=beaver_config.get('rabbitmq_queue'),
33-
durable=beaver_config.get('rabbitmq_queue_durable'),
34-
arguments={'x-ha-policy': 'all'} if beaver_config.get('rabbitmq_ha_queue') else {}
44+
queue=self._rabbitmq_config['queue'],
45+
durable=self._rabbitmq_config['queue_durable'],
46+
arguments={'x-ha-policy': 'all'} if self._rabbitmq_config['ha_queue'] else {}
3547
)
3648
self._channel.exchange_declare(
37-
exchange=self._rabbitmq_exchange,
38-
type=beaver_config.get('rabbitmq_exchange_type'),
39-
durable=beaver_config.get('rabbitmq_exchange_durable')
49+
exchange=self._rabbitmq_config['exchange'],
50+
exchange_type=self._rabbitmq_config['exchange_type'],
51+
durable=self._rabbitmq_config['exchange_durable']
4052
)
4153
self._channel.queue_bind(
42-
exchange=self._rabbitmq_exchange,
43-
queue=beaver_config.get('rabbitmq_queue'),
44-
routing_key=self._rabbitmq_key
54+
exchange=self._rabbitmq_config['exchange'],
55+
queue=self._rabbitmq_config['queue'],
56+
routing_key=self._rabbitmq_config['key']
4557
)
4658

4759
def callback(self, filename, lines, **kwargs):
@@ -55,8 +67,8 @@ def callback(self, filename, lines, **kwargs):
5567
with warnings.catch_warnings():
5668
warnings.simplefilter('error')
5769
self._channel.basic_publish(
58-
exchange=self._rabbitmq_exchange,
59-
routing_key=self._rabbitmq_key,
70+
exchange=self._rabbitmq_config['exchange'],
71+
routing_key=self._rabbitmq_config['key'],
6072
body=self.format(filename, line, timestamp, **kwargs),
6173
properties=pika.BasicProperties(
6274
content_type='text/json',
@@ -77,5 +89,8 @@ def interrupt(self):
7789
if self._connection:
7890
self._connection.close()
7991

92+
def reconnect(self):
93+
self._connect()
94+
8095
def unhandled(self):
8196
return True

0 commit comments

Comments
 (0)