Skip to content

Commit fe91d3e

Browse files
committed
Merge pull request python-beaver#367 from josegonzalez/sqs_tests
SQS tests
2 parents b394cce + 8b7c394 commit fe91d3e

5 files changed

Lines changed: 274 additions & 5 deletions

File tree

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ addons:
1111
packages:
1212
- build-essential
1313
- libzmq-dev
14-
install: "pip install -r requirements/zeromq.txt -r requirements/tests.txt --use-mirrors"
14+
install: ./install-dependencies.sh
1515
script:
1616
nosetests --with-coverage --cover-package=beaver
1717
after_success:

beaver/tests/test_sqs_transport.py

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
# -*- coding: utf-8 -*-
2+
import sys
3+
if sys.version_info < (2, 7):
4+
import unittest2 as unittest
5+
else:
6+
import unittest
7+
8+
import mock
9+
import tempfile
10+
import logging
11+
12+
import beaver
13+
from beaver.config import BeaverConfig
14+
from beaver.transports import create_transport
15+
from beaver.unicode_dammit import unicode_dammit
16+
17+
from fixtures import Fixture
18+
19+
from moto import mock_sqs
20+
import boto.sqs
21+
22+
class SqsTests(unittest.TestCase):
23+
24+
@mock_sqs
25+
def _create_queues(cls):
26+
conn = boto.sqs.connect_to_region("us-east-1")
27+
conn.create_queue("queue1")
28+
conn.create_queue("queue2")
29+
30+
@classmethod
31+
def setUpClass(cls):
32+
cls.logger = logging.getLogger(__name__)
33+
34+
empty_conf = tempfile.NamedTemporaryFile(delete=True)
35+
cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name))
36+
cls.beaver_config.set('transport', 'sqs')
37+
cls.beaver_config.set('logstash_version', 1)
38+
39+
output_file = Fixture.download_official_distribution()
40+
Fixture.extract_distribution(output_file)
41+
42+
@mock_sqs
43+
def test_sqs_default_auth_profile(cls):
44+
cls._create_queues()
45+
cls.beaver_config.set('sqs_aws_profile_name', None)
46+
cls.beaver_config.set('sqs_aws_access_key', None)
47+
cls.beaver_config.set('sqs_aws_secret_key', None)
48+
cls.beaver_config.set('sqs_aws_queue', 'queue1')
49+
50+
transport = create_transport(cls.beaver_config, logger=cls.logger)
51+
52+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
53+
transport.interrupt()
54+
55+
@mock_sqs
56+
def test_sqs_auth_profile(cls):
57+
cls._create_queues()
58+
cls.beaver_config.set('sqs_aws_profile_name', 'beaver_queue')
59+
cls.beaver_config.set('sqs_aws_access_key', None)
60+
cls.beaver_config.set('sqs_aws_secret_key', None)
61+
cls.beaver_config.set('sqs_aws_queue', 'queue1')
62+
63+
transport = create_transport(cls.beaver_config, logger=cls.logger)
64+
65+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
66+
67+
@mock_sqs
68+
def test_sqs_auth_key(cls):
69+
cls._create_queues()
70+
cls.beaver_config.set('sqs_aws_profile_name', None)
71+
cls.beaver_config.set('sqs_aws_access_key', 'beaver_test_key')
72+
cls.beaver_config.set('sqs_aws_secret_key', 'beaver_test_secret')
73+
cls.beaver_config.set('sqs_aws_queue', 'queue1')
74+
75+
transport = create_transport(cls.beaver_config, logger=cls.logger)
76+
77+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
78+
transport.interrupt()
79+
80+
@mock_sqs
81+
def test_sqs_auth_account_id(cls):
82+
cls._create_queues()
83+
cls.beaver_config.set('sqs_aws_queue_owner_acct_id', 'abc123')
84+
cls.beaver_config.set('sqs_aws_profile_name', None)
85+
cls.beaver_config.set('sqs_aws_access_key', 'beaver_test_key')
86+
cls.beaver_config.set('sqs_aws_secret_key', 'beaver_test_secret')
87+
cls.beaver_config.set('sqs_aws_queue', 'queue1')
88+
89+
transport = create_transport(cls.beaver_config, logger=cls.logger)
90+
91+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
92+
transport.interrupt()
93+
94+
@mock_sqs
95+
def test_sqs_single_queue(cls):
96+
cls._create_queues()
97+
cls.beaver_config.set('sqs_aws_queue', 'queue1')
98+
cls.beaver_config.set('sqs_aws_profile_name', None)
99+
cls.beaver_config.set('sqs_aws_access_key', None)
100+
cls.beaver_config.set('sqs_aws_secret_key', None)
101+
102+
transport = create_transport(cls.beaver_config, logger=cls.logger)
103+
104+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
105+
transport.interrupt()
106+
107+
@mock_sqs
108+
def test_sqs_single_queue_bulklines(cls):
109+
cls._create_queues()
110+
cls.beaver_config.set('sqs_aws_queue', 'queue1')
111+
cls.beaver_config.set('sqs_aws_profile_name', None)
112+
cls.beaver_config.set('sqs_aws_access_key', None)
113+
cls.beaver_config.set('sqs_aws_secret_key', None)
114+
cls.beaver_config.set('sqs_bulk_lines', True)
115+
116+
transport = create_transport(cls.beaver_config, logger=cls.logger)
117+
118+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
119+
transport.interrupt()
120+
121+
@mock_sqs
122+
def test_sqs_multi_queue(cls):
123+
cls._create_queues()
124+
cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2')
125+
cls.beaver_config.set('sqs_aws_profile_name', None)
126+
cls.beaver_config.set('sqs_aws_access_key', None)
127+
cls.beaver_config.set('sqs_aws_secret_key', None)
128+
cls.beaver_config.set('sqs_bulk_lines', False)
129+
130+
transport = create_transport(cls.beaver_config, logger=cls.logger)
131+
132+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
133+
transport.interrupt()
134+
135+
@mock_sqs
136+
def test_sqs_multi_queue_bulklines(cls):
137+
cls._create_queues()
138+
cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2')
139+
cls.beaver_config.set('sqs_aws_profile_name', None)
140+
cls.beaver_config.set('sqs_aws_access_key', None)
141+
cls.beaver_config.set('sqs_aws_secret_key', None)
142+
cls.beaver_config.set('sqs_bulk_lines', True)
143+
144+
transport = create_transport(cls.beaver_config, logger=cls.logger)
145+
146+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
147+
transport.interrupt()
148+
149+
@mock_sqs
150+
def test_sqs_send_single_queue(cls):
151+
cls._create_queues()
152+
cls.beaver_config.set('sqs_aws_queue', 'queue1')
153+
cls.beaver_config.set('sqs_aws_profile_name', None)
154+
cls.beaver_config.set('sqs_aws_access_key', None)
155+
cls.beaver_config.set('sqs_aws_secret_key', None)
156+
cls.beaver_config.set('sqs_bulk_lines', False)
157+
158+
transport = create_transport(cls.beaver_config, logger=cls.logger)
159+
160+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
161+
162+
data = {}
163+
lines = []
164+
n=100
165+
for i in range(n):
166+
lines.append('log' + str(i) + '\n')
167+
new_lines = []
168+
for line in lines:
169+
message = unicode_dammit(line)
170+
if len(message) == 0:
171+
continue
172+
new_lines.append(message)
173+
data['lines'] = new_lines
174+
data['fields'] = []
175+
transport.callback("test.log", **data)
176+
177+
@mock_sqs
178+
def test_sqs_send_multi_queue(cls):
179+
cls._create_queues()
180+
cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2')
181+
cls.beaver_config.set('sqs_aws_profile_name', None)
182+
cls.beaver_config.set('sqs_aws_access_key', None)
183+
cls.beaver_config.set('sqs_aws_secret_key', None)
184+
cls.beaver_config.set('sqs_bulk_lines', False)
185+
186+
transport = create_transport(cls.beaver_config, logger=cls.logger)
187+
188+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
189+
190+
data = {}
191+
lines = []
192+
n=100
193+
for i in range(n):
194+
lines.append('log' + str(i) + '\n')
195+
new_lines = []
196+
for line in lines:
197+
message = unicode_dammit(line)
198+
if len(message) == 0:
199+
continue
200+
new_lines.append(message)
201+
data['lines'] = new_lines
202+
data['fields'] = []
203+
transport.callback("test.log", **data)
204+
205+
@mock_sqs
206+
def test_sqs_send_single_queue_bulklines(cls):
207+
cls._create_queues()
208+
cls.beaver_config.set('sqs_aws_queue', 'queue1')
209+
cls.beaver_config.set('sqs_aws_profile_name', None)
210+
cls.beaver_config.set('sqs_aws_access_key', None)
211+
cls.beaver_config.set('sqs_aws_secret_key', None)
212+
cls.beaver_config.set('sqs_bulk_lines', True)
213+
214+
transport = create_transport(cls.beaver_config, logger=cls.logger)
215+
216+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
217+
218+
data = {}
219+
lines = []
220+
n=100
221+
for i in range(n):
222+
lines.append('log' + str(i) + '\n')
223+
new_lines = []
224+
for line in lines:
225+
message = unicode_dammit(line)
226+
if len(message) == 0:
227+
continue
228+
new_lines.append(message)
229+
data['lines'] = new_lines
230+
data['fields'] = []
231+
transport.callback("test.log", **data)
232+
233+
@mock_sqs
234+
def test_sqs_send_multi_queue_bulklines(cls):
235+
cls._create_queues()
236+
cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2')
237+
cls.beaver_config.set('sqs_aws_profile_name', None)
238+
cls.beaver_config.set('sqs_aws_access_key', None)
239+
cls.beaver_config.set('sqs_aws_secret_key', None)
240+
cls.beaver_config.set('sqs_bulk_lines', True)
241+
242+
transport = create_transport(cls.beaver_config, logger=cls.logger)
243+
244+
cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
245+
246+
data = {}
247+
lines = []
248+
n=100
249+
for i in range(n):
250+
lines.append('log' + str(i) + '\n')
251+
new_lines = []
252+
for line in lines:
253+
message = unicode_dammit(line)
254+
if len(message) == 0:
255+
continue
256+
new_lines.append(message)
257+
data['lines'] = new_lines
258+
data['fields'] = []
259+
transport.callback("test.log", **data)

beaver/transports/sqs_transport.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def __init__(self, beaver_config, logger=None):
3737

3838
self._queues = {}
3939
for queue in self._queue:
40-
self._logger.debug('Attempting to load SQS queue: {}'.format(queue))
40+
self._logger.debug('Attempting to load SQS queue: {0}'.format(queue))
4141
if self._queue_owner_acct_id is None:
4242
self._queues[queue] = self._connection.get_queue(queue)
4343
else:
@@ -47,7 +47,7 @@ def __init__(self, beaver_config, logger=None):
4747
if self._queues[queue] is None:
4848
raise TransportException('Unable to access queue with name {0}'.format(queue))
4949

50-
self._logger.debug('Successfully loaded SQS queue: {}'.format(queue))
50+
self._logger.debug('Successfully loaded SQS queue: {0}'.format(queue))
5151
except Exception, e:
5252
raise TransportException(e.message)
5353

@@ -129,13 +129,13 @@ def _send_message(self, msg):
129129
def _send_message_batch(self, message_batch):
130130
for queue in self._queues:
131131
try:
132-
self._logger.debug('Attempting to push batch message to SQS queue: {}'.format(queue))
132+
self._logger.debug('Attempting to push batch message to SQS queue: {0}'.format(queue))
133133
result = self._queues[queue].write_batch(message_batch)
134134
if not result:
135135
self._logger.error('Error occurred sending messages to SQS queue {0}. result: {1}'.format(
136136
queue, result))
137137
raise TransportException('Error occurred sending message to queue {0}'.format(queue))
138-
self._logger.debug('Successfully pushed batch message to SQS queue: {}'.format(queue))
138+
self._logger.debug('Successfully pushed batch message to SQS queue: {0}'.format(queue))
139139
except Exception, e:
140140
self._logger.exception('Exception occurred sending batch to SQS queue')
141141
raise TransportException(e.message)

install-dependencies.sh

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#!/usr/bin/env bash
2+
pip install -r requirements/zeromq.txt -r requirements/tests.txt --use-mirrors
3+
4+
mkdir ~/.aws/
5+
cat > ~/.aws/credentials << EOL
6+
[beaver_queue]
7+
aws_access_key_id = 111
8+
aws_secret_access_key = 1111
9+
EOL

requirements/tests.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ nose
55
six
66
unittest2
77
coveralls
8+
moto

0 commit comments

Comments
 (0)