@@ -18,11 +18,17 @@ def __init__(self, beaver_config, logger=None):
1818 config_to_store = [
1919 'key' , 'exchange' , 'username' , 'password' , 'host' , 'port' , 'vhost' ,
2020 'queue' , 'queue_durable' , 'ha_queue' , 'exchange_type' , 'exchange_durable' ,
21- 'ssl' , 'ssl_key' , 'ssl_cert' , 'ssl_cacert' , 'timeout' , 'delivery_mode'
21+ 'ssl' , 'ssl_key' , 'ssl_cert' , 'ssl_cacert' , 'timeout' , 'delivery_mode' , 'arguments'
2222 ]
2323
2424 for key in config_to_store :
2525 self ._rabbitmq_config [key ] = beaver_config .get ('rabbitmq_' + key )
26+ if self ._rabbitmq_config ['arguments' ]:
27+ self ._rabbitmq_config ['arguments' ] = self .get_rabbitmq_args ()
28+ if self ._rabbitmq_config ['ha_queue' ]:
29+ self ._rabbitmq_config ['arguments' ]['x-ha-policy' ] = 'all'
30+
31+
2632
2733 self ._connection = None
2834 self ._channel = None
@@ -33,6 +39,20 @@ def __init__(self, beaver_config, logger=None):
3339 self ._is_valid = True
3440 self ._connect ()
3541
42+ def get_rabbitmq_args (self ):
43+ res = {}
44+ args = self ._rabbitmq_config ['arguments' ].split (',' )
45+ for x in args :
46+ k , v = x .split (':' )
47+ try :
48+ # convert str to int if not a str
49+ v = int (v )
50+ except ValueError :
51+ pass # is a str, not an int
52+ res [k ] = v
53+ return res
54+
55+
3656 def _on_connection_open (self ,connection ):
3757 self ._logger .debug ("RabbitMQ: Connection Created" )
3858 self ._channel = connection .channel (self ._on_channel_open )
@@ -49,7 +69,7 @@ def _on_exchange_declareok(self,unused):
4969 self ._channel .queue_declare (self ._on_queue_declareok ,
5070 queue = self ._rabbitmq_config ['queue' ],
5171 durable = self ._rabbitmq_config ['queue_durable' ],
52- arguments = { 'x-ha-policy' : 'all' } if self ._rabbitmq_config ['ha_queue' ] else {} )
72+ arguments = self ._rabbitmq_config ['arguments' ] )
5373
5474 def _on_queue_declareok (self ,unused ):
5575 self ._logger .debug ("RabbitMQ: Queue Declared" )
0 commit comments