Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ addons:
packages:
- build-essential
- libzmq-dev
install: "pip install -r requirements/zeromq.txt -r requirements/tests.txt --use-mirrors"
install: ./install-dependencies.sh
script:
nosetests --with-coverage --cover-package=beaver
after_success:
Expand Down
259 changes: 259 additions & 0 deletions beaver/tests/test_sqs_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
# -*- coding: utf-8 -*-
import sys
if sys.version_info < (2, 7):
import unittest2 as unittest
else:
import unittest

import mock
import tempfile
import logging

import beaver
from beaver.config import BeaverConfig
from beaver.transports import create_transport
from beaver.unicode_dammit import unicode_dammit

from fixtures import Fixture

from moto import mock_sqs
import boto.sqs

class SqsTests(unittest.TestCase):

@mock_sqs
def _create_queues(cls):
conn = boto.sqs.connect_to_region("us-east-1")
conn.create_queue("queue1")
conn.create_queue("queue2")

@classmethod
def setUpClass(cls):
cls.logger = logging.getLogger(__name__)

empty_conf = tempfile.NamedTemporaryFile(delete=True)
cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name))
cls.beaver_config.set('transport', 'sqs')
cls.beaver_config.set('logstash_version', 1)

output_file = Fixture.download_official_distribution()
Fixture.extract_distribution(output_file)

@mock_sqs
def test_sqs_default_auth_profile(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)
cls.beaver_config.set('sqs_aws_queue', 'queue1')

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
transport.interrupt()

@mock_sqs
def test_sqs_auth_profile(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_profile_name', 'beaver_queue')
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)
cls.beaver_config.set('sqs_aws_queue', 'queue1')

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)

@mock_sqs
def test_sqs_auth_key(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', 'beaver_test_key')
cls.beaver_config.set('sqs_aws_secret_key', 'beaver_test_secret')
cls.beaver_config.set('sqs_aws_queue', 'queue1')

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
transport.interrupt()

@mock_sqs
def test_sqs_auth_account_id(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_queue_owner_acct_id', 'abc123')
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', 'beaver_test_key')
cls.beaver_config.set('sqs_aws_secret_key', 'beaver_test_secret')
cls.beaver_config.set('sqs_aws_queue', 'queue1')

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
transport.interrupt()

@mock_sqs
def test_sqs_single_queue(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_queue', 'queue1')
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
transport.interrupt()

@mock_sqs
def test_sqs_single_queue_bulklines(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_queue', 'queue1')
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)
cls.beaver_config.set('sqs_bulk_lines', True)

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
transport.interrupt()

@mock_sqs
def test_sqs_multi_queue(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2')
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)
cls.beaver_config.set('sqs_bulk_lines', False)

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
transport.interrupt()

@mock_sqs
def test_sqs_multi_queue_bulklines(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2')
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)
cls.beaver_config.set('sqs_bulk_lines', True)

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)
transport.interrupt()

@mock_sqs
def test_sqs_send_single_queue(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_queue', 'queue1')
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)
cls.beaver_config.set('sqs_bulk_lines', False)

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)

data = {}
lines = []
n=100
for i in range(n):
lines.append('log' + str(i) + '\n')
new_lines = []
for line in lines:
message = unicode_dammit(line)
if len(message) == 0:
continue
new_lines.append(message)
data['lines'] = new_lines
data['fields'] = []
transport.callback("test.log", **data)

@mock_sqs
def test_sqs_send_multi_queue(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2')
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)
cls.beaver_config.set('sqs_bulk_lines', False)

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)

data = {}
lines = []
n=100
for i in range(n):
lines.append('log' + str(i) + '\n')
new_lines = []
for line in lines:
message = unicode_dammit(line)
if len(message) == 0:
continue
new_lines.append(message)
data['lines'] = new_lines
data['fields'] = []
transport.callback("test.log", **data)

@mock_sqs
def test_sqs_send_single_queue_bulklines(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_queue', 'queue1')
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)
cls.beaver_config.set('sqs_bulk_lines', True)

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)

data = {}
lines = []
n=100
for i in range(n):
lines.append('log' + str(i) + '\n')
new_lines = []
for line in lines:
message = unicode_dammit(line)
if len(message) == 0:
continue
new_lines.append(message)
data['lines'] = new_lines
data['fields'] = []
transport.callback("test.log", **data)

@mock_sqs
def test_sqs_send_multi_queue_bulklines(cls):
cls._create_queues()
cls.beaver_config.set('sqs_aws_queue', 'queue1,queue2')
cls.beaver_config.set('sqs_aws_profile_name', None)
cls.beaver_config.set('sqs_aws_access_key', None)
cls.beaver_config.set('sqs_aws_secret_key', None)
cls.beaver_config.set('sqs_bulk_lines', True)

transport = create_transport(cls.beaver_config, logger=cls.logger)

cls.assertIsInstance(transport, beaver.transports.sqs_transport.SqsTransport)

data = {}
lines = []
n=100
for i in range(n):
lines.append('log' + str(i) + '\n')
new_lines = []
for line in lines:
message = unicode_dammit(line)
if len(message) == 0:
continue
new_lines.append(message)
data['lines'] = new_lines
data['fields'] = []
transport.callback("test.log", **data)
8 changes: 4 additions & 4 deletions beaver/transports/sqs_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, beaver_config, logger=None):

self._queues = {}
for queue in self._queue:
self._logger.debug('Attempting to load SQS queue: {}'.format(queue))
self._logger.debug('Attempting to load SQS queue: {0}'.format(queue))
if self._queue_owner_acct_id is None:
self._queues[queue] = self._connection.get_queue(queue)
else:
Expand All @@ -47,7 +47,7 @@ def __init__(self, beaver_config, logger=None):
if self._queues[queue] is None:
raise TransportException('Unable to access queue with name {0}'.format(queue))

self._logger.debug('Successfully loaded SQS queue: {}'.format(queue))
self._logger.debug('Successfully loaded SQS queue: {0}'.format(queue))
except Exception, e:
raise TransportException(e.message)

Expand Down Expand Up @@ -129,13 +129,13 @@ def _send_message(self, msg):
def _send_message_batch(self, message_batch):
for queue in self._queues:
try:
self._logger.debug('Attempting to push batch message to SQS queue: {}'.format(queue))
self._logger.debug('Attempting to push batch message to SQS queue: {0}'.format(queue))
result = self._queues[queue].write_batch(message_batch)
if not result:
self._logger.error('Error occurred sending messages to SQS queue {0}. result: {1}'.format(
queue, result))
raise TransportException('Error occurred sending message to queue {0}'.format(queue))
self._logger.debug('Successfully pushed batch message to SQS queue: {}'.format(queue))
self._logger.debug('Successfully pushed batch message to SQS queue: {0}'.format(queue))
except Exception, e:
self._logger.exception('Exception occurred sending batch to SQS queue')
raise TransportException(e.message)
Expand Down
9 changes: 9 additions & 0 deletions install-dependencies.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash
pip install -r requirements/zeromq.txt -r requirements/tests.txt --use-mirrors

mkdir ~/.aws/
cat > ~/.aws/credentials << EOL
[beaver_queue]
aws_access_key_id = 111
aws_secret_access_key = 1111
EOL
1 change: 1 addition & 0 deletions requirements/tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ nose
six
unittest2
coveralls
moto