diff --git a/beaver/tests/test_kinesis_transport.py b/beaver/tests/test_kinesis_transport.py new file mode 100644 index 00000000..dbea972e --- /dev/null +++ b/beaver/tests/test_kinesis_transport.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +import sys +if sys.version_info < (2, 7): + import unittest2 as unittest +else: + import unittest + +import mock +import tempfile +import logging + +import beaver +from beaver.config import BeaverConfig +from beaver.transports import create_transport +from beaver.unicode_dammit import unicode_dammit + +from fixtures import Fixture + +from moto import mock_kinesis +import boto.kinesis + +class KinesisTests(unittest.TestCase): + + @mock_kinesis + def _create_streams(self): + conn = boto.kinesis.connect_to_region("us-east-1") + conn.create_stream("stream1", 1) + conn.create_stream("stream2", 1) + + @classmethod + def setUpClass(cls): + cls.logger = logging.getLogger(__name__) + + empty_conf = tempfile.NamedTemporaryFile(delete=True) + cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name)) + cls.beaver_config.set('transport', 'kinesis') + cls.beaver_config.set('logstash_version', 1) + + output_file = Fixture.download_official_distribution() + Fixture.extract_distribution(output_file) + + @mock_kinesis + def test_kinesis_default_auth_profile(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_profile_name', None) + self.beaver_config.set('kinesis_aws_access_key', None) + self.beaver_config.set('kinesis_aws_secret_key', None) + self.beaver_config.set('kinesis_aws_stream', 'stream1') + + transport = create_transport(self.beaver_config, logger=self.logger) + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + transport.interrupt() + + @mock_kinesis + def test_kinesis_auth_profile(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_profile_name', 'beaver_stream') + self.beaver_config.set('kinesis_aws_access_key', None) + self.beaver_config.set('kinesis_aws_secret_key', None) + self.beaver_config.set('kinesis_aws_stream', 'stream1') + + transport = create_transport(self.beaver_config, logger=self.logger) + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + + @mock_kinesis + def test_kinesis_auth_key(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_profile_name', None) + self.beaver_config.set('kinesis_aws_access_key', 'beaver_test_key') + self.beaver_config.set('kinesis_aws_secret_key', 'beaver_test_secret') + self.beaver_config.set('kinesis_aws_stream', 'stream1') + + transport = create_transport(self.beaver_config, logger=self.logger) + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + transport.interrupt() + + @mock_kinesis + def test_kinesis_auth_account_id(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_stream_owner_acct_id', 'abc123') + self.beaver_config.set('kinesis_aws_profile_name', None) + self.beaver_config.set('kinesis_aws_access_key', 'beaver_test_key') + self.beaver_config.set('kinesis_aws_secret_key', 'beaver_test_secret') + self.beaver_config.set('kinesis_aws_stream', 'stream1') + + transport = create_transport(self.beaver_config, logger=self.logger) + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + transport.interrupt() + + @mock_kinesis + def test_kinesis_send_stream(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_stream', 'stream1') + self.beaver_config.set('kinesis_aws_profile_name', None) + self.beaver_config.set('kinesis_aws_access_key', None) + self.beaver_config.set('kinesis_aws_secret_key', None) + self.beaver_config.set('kinesis_bulk_lines', False) + + transport = create_transport(self.beaver_config, logger=self.logger) + mock_send_batch = mock.Mock() + transport._send_message_batch = mock_send_batch + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + + data = {} + lines = [] + n=500 + for i in range(n): + lines.append('log' + str(i) + '\n') + new_lines = [] + for line in lines: + message = unicode_dammit(line) + if len(message) == 0: + continue + new_lines.append(message) + data['lines'] = new_lines + data['fields'] = [] + self.assertTrue(transport.callback("test.log", **data)) + self.assertEqual(1, mock_send_batch.call_count) + + + @mock_kinesis + def test_kinesis_send_stream_with_record_count_cutoff(self): + self._create_streams() + self.beaver_config.set('kinesis_aws_stream', 'stream1') + self.beaver_config.set('kinesis_aws_profile_name', None) + self.beaver_config.set('kinesis_aws_access_key', None) + self.beaver_config.set('kinesis_aws_secret_key', None) + self.beaver_config.set('kinesis_bulk_lines', False) + + transport = create_transport(self.beaver_config, logger=self.logger) + mock_send_batch = mock.Mock() + transport._send_message_batch = mock_send_batch + + self.assertIsInstance(transport, beaver.transports.kinesis_transport.KinesisTransport) + + data = {} + lines = [] + n = 501 + for i in range(n): + lines.append('log' + str(i) + '\n') + new_lines = [] + for line in lines: + message = unicode_dammit(line) + if len(message) == 0: + continue + new_lines.append(message) + data['lines'] = new_lines + data['fields'] = [] + self.assertTrue(transport.callback("test.log", **data)) + self.assertEqual(2, mock_send_batch.call_count) diff --git a/beaver/transports/kinesis_transport.py b/beaver/transports/kinesis_transport.py index 46aff6ba..e790a40c 100644 --- a/beaver/transports/kinesis_transport.py +++ b/beaver/transports/kinesis_transport.py @@ -20,6 +20,9 @@ def __init__(self, beaver_config, logger=None): # self-imposed max batch size to minimize the number of records in a given call to Kinesis self._batch_size_max = beaver_config.get('kinesis_aws_batch_size_max', '512000') + # Kinesis Limit http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html#API_PutRecords_RequestSyntax + self._max_records_per_batch = 500 + try: if self._access_key is None and self._secret_key is None: self._connection = boto.kinesis.connect_to_region(self._region) @@ -55,7 +58,7 @@ def callback(self, filename, lines, **kwargs): continue # Check the self-enforced/declared batch size and flush before moving forward if we've eclipsed the max - if (len(message_batch) > 0) and ((message_batch_size + message_size) >= self._batch_size_max): + if len(message_batch) > 0 and ((message_batch_size + message_size) >= self._batch_size_max or len(message_batch) == self._max_records_per_batch): self._logger.debug('Flushing {0} messages to Kinesis stream {1} bytes'.format( len(message_batch), message_batch_size)) self._send_message_batch(message_batch)