Skip to content

Commit a990b59

Browse files
Venkat SundaramVenkat Sundaram
authored andcommitted
Add support for Monasca Log
Monasca is the OpenStack project for MONitoring At SCAle. Monasca also includes a Logging API that supports centralized logging of logs. The monasca-log-api is a OpenStack based microservice and supports Keystone (Identity Management) based authentication. It provides a secure, reliable and multi-tenant based Logging as a service. The flow is as follows: 1. Using the monsaca log configuration properties, get a token from keystone 2. Cache this token for performance 3. Get the monasca-log-api url from the keystone token service catalog. If the monasca_log_hosts is specified in the config, that would be used instead of the keystone catalog. This config property is a list of host names (or IPs) separated by comma 4. If the hosts list is specified, select a random host and form an url using that 5. Use the log url thus arrived at and open a HTTP connection. If it fails, get another random host. Use retries as well 6. Build the monasca-log-api dimensions, token and content-type (HTTP headers) for the request 7. Send the request to the open HTTP connection (using connection pools to reuse connections where possible) 8. Batch the requests to avoid network overhead and improve throughput Added unit tests and refactored for better readability
1 parent 2075547 commit a990b59

8 files changed

Lines changed: 770 additions & 2 deletions

File tree

beaver/config.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def __init__(self, args, logger=None):
2020
'add_field_env': '',
2121
'debug': '0',
2222
'discover_interval': '15',
23+
'inter_pass_interval': '0.1',
2324
'encoding': 'utf_8',
2425

2526
# should be a python regex of files to remove
@@ -132,12 +133,16 @@ def __init__(self, args, logger=None):
132133
# consumer processes
133134
'number_of_consumer_processes': '1',
134135

136+
# bytes to read from a file at a time
137+
'bytes_to_read': '4096',
138+
135139
# interprocess queue max size before puts block
136140
'max_queue_size': '100',
137141

138142
# time in seconds before updating the file mapping
139143
'update_file_mapping_time': '', # deprecated
140144
'discover_interval': '15',
145+
'inter_pass_interval': '0.1',
141146

142147
# time in seconds from last command sent before a queue kills itself
143148
'queue_timeout': '60',
@@ -185,6 +190,7 @@ def __init__(self, args, logger=None):
185190

186191
# Ignore files older then n days, use 0 to disable
187192
'ignore_old_files': 0
193+
188194
}
189195

190196
self._configfile = args.config
@@ -331,6 +337,7 @@ def _main_parser(config):
331337
'kafka_batch_t',
332338
'kafka_ack_timeout',
333339
'number_of_consumer_processes',
340+
'bytes_to_read',
334341
'ignore_old_files'
335342
]
336343
for key in require_int:
@@ -340,6 +347,7 @@ def _main_parser(config):
340347
require_float = [
341348
'update_file_mapping_time',
342349
'discover_interval',
350+
'inter_pass_interval',
343351
]
344352

345353
for key in require_float:

beaver/dispatcher/tail.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ def create_queue_producer():
8080
callback=queue_put,
8181
logger=logger
8282
)
83-
manager.run()
83+
inter_pass_interval = beaver_config.get('inter_pass_interval')
84+
manager.run(interval=inter_pass_interval)
8485

8586
while 1:
8687

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
# -*- coding: utf-8 -*-
2+
import sys
3+
if sys.version_info < (2, 7):
4+
import unittest2 as unittest
5+
else:
6+
import unittest
7+
8+
import socket
9+
import mock
10+
import tempfile
11+
import logging
12+
import httpretty
13+
14+
import beaver
15+
from beaver.config import BeaverConfig
16+
from beaver.transports import create_transport
17+
from beaver.transports.exception import TransportException
18+
19+
from beaver.unicode_dammit import unicode_dammit
20+
21+
from fixtures import Fixture, ZookeeperFixture, KafkaFixture
22+
23+
from beaver.transports.monascalog_transport import MonascalogTransport
24+
from beaver.transports.monascalog_transport import MonascaLogRecord
25+
from beaver.transports.monascalog_transport import MonascaLogger
26+
27+
28+
logging.basicConfig(loglevel=logging.DEBUG)
29+
logger = logging.getLogger("test")
30+
31+
class MonascaLogRecordTest(unittest.TestCase):
32+
33+
@classmethod
34+
def SetUpClass(cls):
35+
pass
36+
37+
@classmethod
38+
def TearDownClass(cls):
39+
pass
40+
41+
def test_plain_text_format(self):
42+
# test a simple plain text log
43+
record = MonascaLogRecord("simple log line")
44+
self.assertEqual(record.is_json(), False)
45+
46+
def test_json_format(self):
47+
# test a simple json log line
48+
record = MonascaLogRecord('{"message": "simple log line", "type": "keystone"}',
49+
is_json=True,
50+
logger=logger)
51+
self.assertEqual(record.is_json(), True)
52+
53+
# get the json payload and make sure it is formatted in a way log api
54+
# expects it
55+
json_line = record.to_json()
56+
self.assertIn("dimensions", json_line.keys())
57+
self.assertIn("type", json_line["dimensions"].keys())
58+
self.assertEqual(json_line["message"], "simple log line")
59+
self.assertEqual(record.is_valid(), True)
60+
61+
62+
class MonascaLoggerTest(unittest.TestCase):
63+
64+
@classmethod
65+
def SetUpClass(cls):
66+
pass
67+
68+
@classmethod
69+
def TearDownClass(cls):
70+
pass
71+
72+
def test_construction(self):
73+
m_logger = MonascaLogger(logger=logger)
74+
self.assertIsInstance(m_logger, MonascaLogger)
75+
self.assertEqual(m_logger._enable_batching, True)
76+
77+
# using localhost to prevent proxy issues with httpretty
78+
#LOG_URL = "http://www.logapi.com:5607/v3.0/logs"
79+
LOG_URL = "http://localhost:5607/v3.0/logs"
80+
81+
class MonascalogTests(unittest.TestCase):
82+
83+
@classmethod
84+
def setUpClass(cls):
85+
cls.logger = logging.getLogger(__name__)
86+
87+
empty_conf = tempfile.NamedTemporaryFile(delete=True)
88+
cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name))
89+
cls.server_host = "localhost"
90+
cls.server_port = 5607
91+
cls.keystone_auth_url = "https://www.keystone.com:5000/v3.0/auth/tokens"
92+
cls.keystone_user = "logger"
93+
cls.keystone_password = "logger"
94+
cls.keystone_project_name = "logger"
95+
cls.keystone_domain_name = "logger"
96+
cls.log_requests = []
97+
cls.hostname = socket.gethostname()
98+
99+
@classmethod
100+
def tearDownClass(cls):
101+
pass
102+
103+
@httpretty.activate
104+
@mock.patch('beaver.transports.monascalog_transport.KeystoneClientHelper.get_token_and_log_url', return_value=("1234",LOG_URL))
105+
def test_monascalog(cls, token_mock):
106+
107+
# dynamic callback to verify the log messages sent by the transport
108+
def request_callback(request, uri, headers):
109+
cls.log_requests.append(request.parsed_body)
110+
return (204, headers, "created")
111+
112+
# fake the first get call that is used for checking connection
113+
httpretty.register_uri(httpretty.GET, LOG_URL, status=405, body="Method Not Allowed")
114+
httpretty.register_uri(httpretty.POST, LOG_URL, status=204, body="Created")
115+
cls.beaver_config.set('transport', 'monascalog')
116+
cls.beaver_config.set('logstash_version', 1)
117+
#cls.beaver_config.set('monascalog_hosts', "{}:{}".format(cls.server_host, cls.server_port))
118+
cls.beaver_config.set('monascalog_max_retries', 3)
119+
cls.beaver_config.set('monascalog_auth_url', cls.keystone_auth_url)
120+
cls.beaver_config.set('monascalog_user_name', cls.keystone_user)
121+
cls.beaver_config.set('monascalog_password', cls.keystone_password)
122+
cls.beaver_config.set('monascalog_project_name', cls.keystone_project_name)
123+
cls.beaver_config.set('monascalog_domain_name', cls.keystone_domain_name)
124+
125+
transport = create_transport(cls.beaver_config, logger=cls.logger)
126+
127+
cls.assertIsInstance(transport, beaver.transports.monascalog_transport.MonascalogTransport)
128+
cls.assertEqual(transport.valid(), True)
129+
130+
data = {}
131+
lines = []
132+
n=100
133+
for i in range(n):
134+
lines.append('log' + str(i) + '\n')
135+
new_lines = []
136+
for line in lines:
137+
message = unicode_dammit(line)
138+
if len(message) == 0:
139+
continue
140+
new_lines.append(message)
141+
data['lines'] = new_lines
142+
data['fields'] = []
143+
cls.assertEqual(transport.callback("test.log", **data), True)
144+
145+
# Fake a log api failure
146+
httpretty.reset()
147+
httpretty.register_uri(httpretty.POST, LOG_URL, status=500, body="Internal Server Error")
148+
cls.assertRaises(TransportException, transport.callback, "test.log", **data)
149+
150+
# simulate a single failure followed by success, to test if retry works
151+
httpretty.reset()
152+
httpretty.register_uri(httpretty.POST, LOG_URL, responses=[
153+
httpretty.Response(status=503, body="Service Unavailable"),
154+
httpretty.Response(status=204, body="Created")
155+
])
156+
cls.assertEqual(transport.callback("test.log", **data), True)
157+
158+
# next, test if the logs made it to the server
159+
httpretty.reset()
160+
httpretty.register_uri(httpretty.POST, LOG_URL, body=request_callback)
161+
# clear logs from previous tests
162+
del cls.log_requests[:]
163+
cls.assertEqual(transport.callback("test.log", **data), True)
164+
cls._consume_messages(n)
165+
166+
# repeat same test, but with batching turned off
167+
cls.beaver_config.set('monascalog_enable_batching', False)
168+
httpretty.reset()
169+
httpretty.register_uri(httpretty.POST, LOG_URL, body=request_callback)
170+
# fake the first get call that is used for checking connection
171+
httpretty.register_uri(httpretty.GET, LOG_URL, status=405, body="Method Not Allowed")
172+
transport = create_transport(cls.beaver_config, logger=cls.logger)
173+
# clear logs from previous tests
174+
del cls.log_requests[:]
175+
cls.assertEqual(transport.callback("test.log", **data), True)
176+
cls._consume_messages(n, batching=False)
177+
transport.interrupt()
178+
179+
def _consume_messages(cls, number_of_messages, batching=True):
180+
messages = cls.log_requests
181+
for message in messages:
182+
cls.assertIn("dimensions", message)
183+
cls.assertIn("logs", message)
184+
cls.assertIsInstance(message["dimensions"], dict)
185+
dims = message["dimensions"]
186+
cls.assertEqual(dims["file"], "test.log")
187+
cls.assertEqual(dims["host"], cls.hostname)
188+
cls.assertIsInstance(message["logs"], list)
189+
for log in message["logs"]:
190+
cls.assertIn("message", log)
191+
if not log["message"]:
192+
cls.fail("Log message is empty")
193+
#print(message)
194+
#print('\n')
195+
196+
if not batching:
197+
cls.assertEqual(len(messages), number_of_messages)
198+
else:
199+
msg = messages[0]
200+
cls.assertEqual(len(msg["logs"]), number_of_messages)

0 commit comments

Comments
 (0)