@@ -17,7 +17,7 @@ def __init__(self, beaver_config, logger=None):
1717 self ._profile = beaver_config .get ('sqs_aws_profile_name' )
1818 self ._region = beaver_config .get ('sqs_aws_region' )
1919 self ._queue_owner_acct_id = beaver_config .get ('sqs_aws_queue_owner_acct_id' )
20- self ._queues = beaver_config .get ('sqs_aws_queue' ).split (',' )
20+ self ._queue = beaver_config .get ('sqs_aws_queue' ).split (',' )
2121 self ._bulk_lines = beaver_config .get ('sqs_bulk_lines' )
2222
2323 try :
@@ -35,16 +35,16 @@ def __init__(self, beaver_config, logger=None):
3535 self ._logger .warn ('Unable to connect to AWS - check your AWS credentials' )
3636 raise TransportException ('Unable to connect to AWS - check your AWS credentials' )
3737
38- self ._queue = {}
39- for queue in self ._queues :
38+ self ._queues = {}
39+ for queue in self ._queue :
4040 self ._logger .debug ('Attempting to load SQS queue: {}' .format (queue ))
4141 if self ._queue_owner_acct_id is None :
42- self ._queue [queue ] = self ._connection .get_queue (queue )
42+ self ._queues [queue ] = self ._connection .get_queue (queue )
4343 else :
44- self ._queue [queue ] = self ._connection .get_queue (queue ,
44+ self ._queues [queue ] = self ._connection .get_queue (queue ,
4545 owner_acct_id = self ._queue_owner_acct_id )
4646
47- if self ._queue [queue ] is None :
47+ if self ._queues [queue ] is None :
4848 raise TransportException ('Unable to access queue with name {0}' .format (queue ))
4949
5050 self ._logger .debug ('Successfully loaded SQS queue: {}' .format (queue ))
@@ -85,6 +85,7 @@ def callback(self, filename, lines, **kwargs):
8585 message_batch = ''
8686 message_count = 0
8787 message_batch_size = 0
88+
8889 # SQS can only handle up to 10 messages in batch send and it can not exceed 256KiB (see above)
8990 elif (len (message_batch ) > 0 ) and (((message_batch_size + message_size ) >= message_batch_size_max ) or (len (message_batch ) == 10 )):
9091 self ._logger .debug ('Flushing {0} messages to SQS queue {1} bytes' .format (len (message_batch ), message_batch_size ))
@@ -111,24 +112,25 @@ def callback(self, filename, lines, **kwargs):
111112 return True
112113
113114 def _send_message (self , msg ):
114- try :
115- msg = '[{0}]' .format (msg .rstrip (',' ))
116- m = RawMessage ()
117- m .set_body (msg )
118- result = self ._queue .write (m )
119- if not result :
120- self ._logger .error ('Error occurred sending message to SQS queue {0}. result: {1}' .format (
121- self ._queue_name , result ))
122- raise TransportException ('Error occurred sending message to queue {0}' .format (self ._queue_name ))
123- except Exception , e :
124- self ._logger .exception ('Exception occurred sending message to SQS queue' )
125- raise TransportException (e .message )
115+ for queue in self ._queues :
116+ try :
117+ msg = '[{0}]' .format (msg .rstrip (',' ))
118+ m = RawMessage ()
119+ m .set_body (msg )
120+ result = self ._queues [queue ].write (m )
121+ if not result :
122+ self ._logger .error ('Error occurred sending message to SQS queue {0}. result: {1}' .format (
123+ self ._queue_name , result ))
124+ raise TransportException ('Error occurred sending message to queue {0}' .format (self ._queue_name ))
125+ except Exception , e :
126+ self ._logger .exception ('Exception occurred sending message to SQS queue' )
127+ raise TransportException (e .message )
126128
127129 def _send_message_batch (self , message_batch ):
128- for queue in self ._queue :
130+ for queue in self ._queues :
129131 try :
130132 self ._logger .debug ('Attempting to push batch message to SQS queue: {}' .format (queue ))
131- result = self ._queue [queue ].write_batch (message_batch )
133+ result = self ._queues [queue ].write_batch (message_batch )
132134 if not result :
133135 self ._logger .error ('Error occurred sending messages to SQS queue {0}. result: {1}' .format (
134136 queue , result ))
0 commit comments