Skip to content

Commit 7ce015b

Browse files
authored
Merge pull request #431 from aliegha/add-arbitrary-rabbitmq-args
Add arbitrary rabbitmq args
2 parents d093e6f + 37dd76a commit 7ce015b

3 files changed

Lines changed: 25 additions & 2 deletions

File tree

beaver/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ def __init__(self, args, logger=None):
8282
'rabbitmq_exchange_durable': os.environ.get('RABBITMQ_EXCHANGE_DURABLE', '0'),
8383
'rabbitmq_queue_durable': os.environ.get('RABBITMQ_QUEUE_DURABLE', '0'),
8484
'rabbitmq_ha_queue': os.environ.get('RABBITMQ_HA_QUEUE', '0'),
85+
'rabbitmq_arguments': os.environ.get('RABBITMQ_ARGUMENTS', {}),
8586
'rabbitmq_key': os.environ.get('RABBITMQ_KEY', 'logstash-key'),
8687
'rabbitmq_exchange': os.environ.get('RABBITMQ_EXCHANGE', 'logstash-exchange'),
8788
'rabbitmq_timeout': '1',
@@ -248,6 +249,7 @@ def use_ssh_tunnel(self):
248249

249250
def _check_for_deprecated_usage(self):
250251
env_vars = [
252+
'RABBITMQ_ARGUMENTS'
251253
'RABBITMQ_HOST',
252254
'RABBITMQ_PORT',
253255
'RABBITMQ_VHOST',

beaver/transports/rabbitmq_transport.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@ def __init__(self, beaver_config, logger=None):
1818
config_to_store = [
1919
'key', 'exchange', 'username', 'password', 'host', 'port', 'vhost',
2020
'queue', 'queue_durable', 'ha_queue', 'exchange_type', 'exchange_durable',
21-
'ssl', 'ssl_key', 'ssl_cert', 'ssl_cacert', 'timeout', 'delivery_mode'
21+
'ssl', 'ssl_key', 'ssl_cert', 'ssl_cacert', 'timeout', 'delivery_mode', 'arguments'
2222
]
2323

2424
for key in config_to_store:
2525
self._rabbitmq_config[key] = beaver_config.get('rabbitmq_' + key)
26+
if self._rabbitmq_config['arguments']:
27+
self._rabbitmq_config['arguments'] = self.get_rabbitmq_args()
28+
if self._rabbitmq_config['ha_queue']:
29+
self._rabbitmq_config['arguments']['x-ha-policy'] = 'all'
30+
31+
2632

2733
self._connection = None
2834
self._channel = None
@@ -33,6 +39,20 @@ def __init__(self, beaver_config, logger=None):
3339
self._is_valid = True
3440
self._connect()
3541

42+
def get_rabbitmq_args(self):
43+
res = {}
44+
args = self._rabbitmq_config['arguments'].split(',')
45+
for x in args:
46+
k, v = x.split(':')
47+
try:
48+
# convert str to int if not a str
49+
v = int(v)
50+
except ValueError:
51+
pass # is a str, not an int
52+
res[k] = v
53+
return res
54+
55+
3656
def _on_connection_open(self,connection):
3757
self._logger.debug("RabbitMQ: Connection Created")
3858
self._channel = connection.channel(self._on_channel_open)
@@ -49,7 +69,7 @@ def _on_exchange_declareok(self,unused):
4969
self._channel.queue_declare(self._on_queue_declareok,
5070
queue=self._rabbitmq_config['queue'],
5171
durable=self._rabbitmq_config['queue_durable'],
52-
arguments={'x-ha-policy': 'all'} if self._rabbitmq_config['ha_queue'] else {})
72+
arguments=self._rabbitmq_config['arguments'])
5373

5474
def _on_queue_declareok(self,unused):
5575
self._logger.debug("RabbitMQ: Queue Declared")

docs/user/usage.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ Beaver can optionally get data from a ``configfile`` using the ``-c`` flag. This
5858
* mqtt_keepalive: Default ``60``. mqtt keepalive ping
5959
* mqtt_topic: Default ``/logstash``. Topic to publish to
6060
* number_of_consumer_processes: Default ``1``. Number of parallel consumer processes that read and process messages from the beaver queue.
61+
* rabbitmq_arguments: Defaults ``{}``. RabbitMQ arguments comma separated, colon separated key value pairs. i.e ``rabbitmq_arguments: x-max-length:750000,x-max-length-bytes:1073741824``
6162
* rabbitmq_host: Defaults ``localhost``. Host for RabbitMQ
6263
* rabbitmq_port: Defaults ``5672``. Port for RabbitMQ
6364
* rabbitmq_ssl: Defaults ``0``. Connect using SSL/TLS

0 commit comments

Comments
 (0)