Skip to content

Commit e322758

Browse files
author
Jamie Cressey
committed
Write to the SQS object not the dict when using sqs_bulk_lines flag
1 parent 20a97e0 commit e322758

1 file changed

Lines changed: 22 additions & 20 deletions

File tree

beaver/transports/sqs_transport.py

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def __init__(self, beaver_config, logger=None):
         self._profile = beaver_config.get('sqs_aws_profile_name')
         self._region = beaver_config.get('sqs_aws_region')
         self._queue_owner_acct_id = beaver_config.get('sqs_aws_queue_owner_acct_id')
-        self._queues = beaver_config.get('sqs_aws_queue').split(',')
+        self._queue = beaver_config.get('sqs_aws_queue').split(',')
         self._bulk_lines = beaver_config.get('sqs_bulk_lines')

         try:
@@ -35,16 +35,16 @@ def __init__(self, beaver_config, logger=None):
             self._logger.warn('Unable to connect to AWS - check your AWS credentials')
             raise TransportException('Unable to connect to AWS - check your AWS credentials')

-        self._queue = {}
-        for queue in self._queues:
+        self._queues = {}
+        for queue in self._queue:
             self._logger.debug('Attempting to load SQS queue: {}'.format(queue))
             if self._queue_owner_acct_id is None:
-                self._queue[queue] = self._connection.get_queue(queue)
+                self._queues[queue] = self._connection.get_queue(queue)
             else:
-                self._queue[queue] = self._connection.get_queue(queue,
+                self._queues[queue] = self._connection.get_queue(queue,
                     owner_acct_id=self._queue_owner_acct_id)

-            if self._queue[queue] is None:
+            if self._queues[queue] is None:
                 raise TransportException('Unable to access queue with name {0}'.format(queue))

             self._logger.debug('Successfully loaded SQS queue: {}'.format(queue))
@@ -85,6 +85,7 @@ def callback(self, filename, lines, **kwargs):
                     message_batch = ''
                     message_count = 0
                     message_batch_size = 0
+
                 # SQS can only handle up to 10 messages in batch send and it can not exceed 256KiB (see above)
                 elif (len(message_batch) > 0) and (((message_batch_size + message_size) >= message_batch_size_max) or (len(message_batch) == 10)):
                     self._logger.debug('Flushing {0} messages to SQS queue {1} bytes'.format(len(message_batch), message_batch_size))
@@ -111,24 +112,25 @@ def callback(self, filename, lines, **kwargs):
         return True

     def _send_message(self, msg):
-        try:
-            msg = '[{0}]'.format(msg.rstrip(','))
-            m = RawMessage()
-            m.set_body(msg)
-            result = self._queue.write(m)
-            if not result:
-                self._logger.error('Error occurred sending message to SQS queue {0}. result: {1}'.format(
-                    self._queue_name, result))
-                raise TransportException('Error occurred sending message to queue {0}'.format(self._queue_name))
-        except Exception, e:
-            self._logger.exception('Exception occurred sending message to SQS queue')
-            raise TransportException(e.message)
+        for queue in self._queues:
+            try:
+                msg = '[{0}]'.format(msg.rstrip(','))
+                m = RawMessage()
+                m.set_body(msg)
+                result = self._queues[queue].write(m)
+                if not result:
+                    self._logger.error('Error occurred sending message to SQS queue {0}. result: {1}'.format(
+                        self._queue_name, result))
+                    raise TransportException('Error occurred sending message to queue {0}'.format(self._queue_name))
+            except Exception, e:
+                self._logger.exception('Exception occurred sending message to SQS queue')
+                raise TransportException(e.message)

     def _send_message_batch(self, message_batch):
-        for queue in self._queue:
+        for queue in self._queues:
             try:
                 self._logger.debug('Attempting to push batch message to SQS queue: {}'.format(queue))
-                result = self._queue[queue].write_batch(message_batch)
+                result = self._queues[queue].write_batch(message_batch)
                 if not result:
                     self._logger.error('Error occurred sending messages to SQS queue {0}. result: {1}'.format(
                         queue, result))

0 commit comments

Comments
 (0)