@@ -10,38 +10,50 @@ class RabbitmqTransport(BaseTransport):
1010 def __init__ (self , beaver_config , logger = None ):
1111 super (RabbitmqTransport , self ).__init__ (beaver_config , logger = logger )
1212
13- self ._rabbitmq_key = beaver_config .get ('rabbitmq_key' )
14- self ._rabbitmq_exchange = beaver_config .get ('rabbitmq_exchange' )
13+ self ._rabbitmq_config = {}
14+ config_to_store = [
15+ 'key' , 'exchange' , 'username' , 'password' , 'host' , 'port' , 'vhost' ,
16+ 'queue' , 'queue_durable' , 'ha_queue' , 'exchange_type' , 'exchange_durable'
17+ ]
18+
19+ for key in config_to_store :
20+ self ._rabbitmq_config [key ] = beaver_config .get ('rabbitmq_' + key )
21+
22+ self ._connection = None
23+ self ._channel = None
24+ self ._connect ()
25+
26+ def _connect (self ):
1527
1628 # Setup RabbitMQ connection
1729 credentials = pika .PlainCredentials (
18- beaver_config . get ( 'rabbitmq_username' ) ,
19- beaver_config . get ( 'rabbitmq_password' )
30+ self . _rabbitmq_config [ 'username' ] ,
31+ self . _rabbitmq_config [ 'password' ]
2032 )
2133 parameters = pika .connection .ConnectionParameters (
2234 credentials = credentials ,
23- host = beaver_config . get ( 'rabbitmq_host' ) ,
24- port = beaver_config . get ( 'rabbitmq_port' ) ,
25- virtual_host = beaver_config . get ( 'rabbitmq_vhost' )
35+ host = self . _rabbitmq_config [ 'host' ] ,
36+ port = self . _rabbitmq_config [ 'port' ] ,
37+ virtual_host = self . _rabbitmq_config [ 'vhost' ]
2638 )
2739 self ._connection = pika .adapters .BlockingConnection (parameters )
2840 self ._channel = self ._connection .channel ()
2941
3042 # Declare RabbitMQ queue and bindings
3143 self ._channel .queue_declare (
32- queue = beaver_config . get ( 'rabbitmq_queue' ) ,
33- durable = beaver_config . get ( 'rabbitmq_queue_durable' ) ,
34- arguments = {'x-ha-policy' : 'all' } if beaver_config . get ( 'rabbitmq_ha_queue' ) else {}
44+ queue = self . _rabbitmq_config [ 'queue' ] ,
45+ durable = self . _rabbitmq_config [ 'queue_durable' ] ,
46+ arguments = {'x-ha-policy' : 'all' } if self . _rabbitmq_config [ 'ha_queue' ] else {}
3547 )
3648 self ._channel .exchange_declare (
37- exchange = self ._rabbitmq_exchange ,
38- type = beaver_config . get ( 'rabbitmq_exchange_type' ) ,
39- durable = beaver_config . get ( 'rabbitmq_exchange_durable' )
49+ exchange = self ._rabbitmq_config [ 'exchange' ] ,
50+ exchange_type = self . _rabbitmq_config [ 'exchange_type' ] ,
51+ durable = self . _rabbitmq_config [ 'exchange_durable' ]
4052 )
4153 self ._channel .queue_bind (
42- exchange = self ._rabbitmq_exchange ,
43- queue = beaver_config . get ( 'rabbitmq_queue' ) ,
44- routing_key = self ._rabbitmq_key
54+ exchange = self ._rabbitmq_config [ 'exchange' ] ,
55+ queue = self . _rabbitmq_config [ 'queue' ] ,
56+ routing_key = self ._rabbitmq_config [ 'key' ]
4557 )
4658
4759 def callback (self , filename , lines , ** kwargs ):
@@ -55,8 +67,8 @@ def callback(self, filename, lines, **kwargs):
5567 with warnings .catch_warnings ():
5668 warnings .simplefilter ('error' )
5769 self ._channel .basic_publish (
58- exchange = self ._rabbitmq_exchange ,
59- routing_key = self ._rabbitmq_key ,
70+ exchange = self ._rabbitmq_config [ 'exchange' ] ,
71+ routing_key = self ._rabbitmq_config [ 'key' ] ,
6072 body = self .format (filename , line , timestamp , ** kwargs ),
6173 properties = pika .BasicProperties (
6274 content_type = 'text/json' ,
@@ -77,5 +89,8 @@ def interrupt(self):
7789 if self ._connection :
7890 self ._connection .close ()
7991
92+ def reconnect (self ):
93+ self ._connect ()
94+
8095 def unhandled (self ):
8196 return True
0 commit comments