From c03010972558b20f8d0c5ea13dd81c5d90130826 Mon Sep 17 00:00:00 2001 From: eroberts Date: Sun, 14 Oct 2018 17:12:18 -0600 Subject: [PATCH 1/2] Add arbitrary RabbitMQ arguments - Updated transport to parse arguments -Updated config.py to add system property - Updated user usage documentation - Confirmed no change in functionality if args are not passed --- beaver/config.py | 2 ++ beaver/transports/rabbitmq_transport.py | 24 ++++++++++++++++++++++-- docs/user/usage.rst | 1 + 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/beaver/config.py b/beaver/config.py index a246dc14..ec7b6421 100644 --- a/beaver/config.py +++ b/beaver/config.py @@ -82,6 +82,7 @@ def __init__(self, args, logger=None): 'rabbitmq_exchange_durable': os.environ.get('RABBITMQ_EXCHANGE_DURABLE', '0'), 'rabbitmq_queue_durable': os.environ.get('RABBITMQ_QUEUE_DURABLE', '0'), 'rabbitmq_ha_queue': os.environ.get('RABBITMQ_HA_QUEUE', '0'), + 'rabbitmq_arguments': os.environ.get('RABBIT_ARGUMENTS', {}), 'rabbitmq_key': os.environ.get('RABBITMQ_KEY', 'logstash-key'), 'rabbitmq_exchange': os.environ.get('RABBITMQ_EXCHANGE', 'logstash-exchange'), 'rabbitmq_timeout': '1', @@ -248,6 +249,7 @@ def use_ssh_tunnel(self): def _check_for_deprecated_usage(self): env_vars = [ + 'RABBITMQ_ARGUMENTS' 'RABBITMQ_HOST', 'RABBITMQ_PORT', 'RABBITMQ_VHOST', diff --git a/beaver/transports/rabbitmq_transport.py b/beaver/transports/rabbitmq_transport.py index 46d10b84..8992fb81 100644 --- a/beaver/transports/rabbitmq_transport.py +++ b/beaver/transports/rabbitmq_transport.py @@ -18,11 +18,17 @@ def __init__(self, beaver_config, logger=None): config_to_store = [ 'key', 'exchange', 'username', 'password', 'host', 'port', 'vhost', 'queue', 'queue_durable', 'ha_queue', 'exchange_type', 'exchange_durable', - 'ssl', 'ssl_key', 'ssl_cert', 'ssl_cacert', 'timeout', 'delivery_mode' + 'ssl', 'ssl_key', 'ssl_cert', 'ssl_cacert', 'timeout', 'delivery_mode', 'arguments' ] for key in config_to_store: self._rabbitmq_config[key] = beaver_config.get('rabbitmq_' + key) + if self._rabbitmq_config['arguments']: + self._rabbitmq_config['arguments'] = self.get_rabbitmq_args() + if self._rabbitmq_config['ha_queue']: + self._rabbitmq_config['arguments']['x-ha-policy'] = 'all' + + self._connection = None self._channel = None @@ -33,6 +39,20 @@ def __init__(self, beaver_config, logger=None): self._is_valid = True self._connect() + def get_rabbitmq_args(self): + res = {} + args = self._rabbitmq_config['arguments'].split(',') + for x in args: + k, v = x.split(':') + try: + # convert str to int if not a str + v = int(v) + except ValueError: + pass # is a str, not an int + res[k] = v + return res + + def _on_connection_open(self,connection): self._logger.debug("RabbitMQ: Connection Created") self._channel = connection.channel(self._on_channel_open) @@ -49,7 +69,7 @@ def _on_exchange_declareok(self,unused): self._channel.queue_declare(self._on_queue_declareok, queue=self._rabbitmq_config['queue'], durable=self._rabbitmq_config['queue_durable'], - arguments={'x-ha-policy': 'all'} if self._rabbitmq_config['ha_queue'] else {}) + arguments=self._rabbitmq_config['arguments']) def _on_queue_declareok(self,unused): self._logger.debug("RabbitMQ: Queue Declared") diff --git a/docs/user/usage.rst b/docs/user/usage.rst index 6c55ff36..a56efdf8 100644 --- a/docs/user/usage.rst +++ b/docs/user/usage.rst @@ -58,6 +58,7 @@ Beaver can optionally get data from a ``configfile`` using the ``-c`` flag. This * mqtt_keepalive: Default ``60``. mqtt keepalive ping * mqtt_topic: Default ``/logstash``. Topic to publish to * number_of_consumer_processes: Default ``1``. Number of parallel consumer processes that read and process messages from the beaver queue. +* rabbitmq_arguments: Defaults ``{}``. RabbitMQ arguments comma separated, colon separated key value pairs. i.e ``rabbitmq_arguments: x-max-length:750000,x-max-length-bytes:1073741824`` * rabbitmq_host: Defaults ``localhost``. Host for RabbitMQ * rabbitmq_port: Defaults ``5672``. Port for RabbitMQ * rabbitmq_ssl: Defaults ``0``. Connect using SSL/TLS From 37dd76ac761dc3e64e0ef5bb51c3ac302a306a4d Mon Sep 17 00:00:00 2001 From: eroberts Date: Sun, 14 Oct 2018 17:29:52 -0600 Subject: [PATCH 2/2] Fix typo in RABBITMQ_ARGUMENT system property --- beaver/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beaver/config.py b/beaver/config.py index ec7b6421..14092c53 100644 --- a/beaver/config.py +++ b/beaver/config.py @@ -82,7 +82,7 @@ def __init__(self, args, logger=None): 'rabbitmq_exchange_durable': os.environ.get('RABBITMQ_EXCHANGE_DURABLE', '0'), 'rabbitmq_queue_durable': os.environ.get('RABBITMQ_QUEUE_DURABLE', '0'), 'rabbitmq_ha_queue': os.environ.get('RABBITMQ_HA_QUEUE', '0'), - 'rabbitmq_arguments': os.environ.get('RABBIT_ARGUMENTS', {}), + 'rabbitmq_arguments': os.environ.get('RABBITMQ_ARGUMENTS', {}), 'rabbitmq_key': os.environ.get('RABBITMQ_KEY', 'logstash-key'), 'rabbitmq_exchange': os.environ.get('RABBITMQ_EXCHANGE', 'logstash-exchange'), 'rabbitmq_timeout': '1',