Skip to content

Commit 5642b5b

Browse files
committed
General code reorganization
Move both BeaverConfig and FileConfig into a single class Consolidated run_worker code with code in beaver binary file. This will create a clearer path for Exception handling, as it is now the responsibility of the calling class, allowing us to remove duplicative exception handling code. Added docstrings to many fuctions and methods Moved extra configuration and setup code to beaver.utils module. In many cases, code was added hastily before. Made many logger calls debug as opposed to info. The info level should be generally reserved for instances where files are watched, unwatched, or some change in the file state has occurred.
1 parent 4669abb commit 5642b5b

4 files changed

Lines changed: 355 additions & 225 deletions

File tree

Lines changed: 199 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
import socket
44
import warnings
55

6+
from beaver.utils import eglob
7+
68

79
class BeaverConfig():
810

9-
def __init__(self, args, logger):
11+
def __init__(self, args, file_config=None, logger=None):
1012
self._logger = logger
11-
self._logger.info('Processing beaver portion of config file %s' % args.config)
13+
self._logger.debug('Processing beaver portion of config file %s' % args.config)
1214

1315
self._beaver_defaults = {
1416
'rabbitmq_host': os.environ.get('RABBITMQ_HOST', 'localhost'),
@@ -53,6 +55,70 @@ def __init__(self, args, logger):
5355
for key in self._beaver_config:
5456
self._logger.debug("[CONFIG] '{0}' => '{1}'".format(key, self._beaver_config.get(key)))
5557

58+
if file_config is not None:
59+
self._update_files(file_config)
60+
61+
self._check_for_deprecated_usage()
62+
63+
def beaver_config(self):
64+
return self._beaver_config
65+
66+
def get(self, key, default=None):
67+
return self._beaver_config.get(key, default)
68+
69+
def set(self, key, value):
70+
self._beaver_config[key] = value
71+
72+
def use_ssh_tunnel(self):
73+
required = [
74+
'ssh_key_file',
75+
'ssh_tunnel',
76+
'ssh_tunnel_port',
77+
'ssh_remote_host',
78+
'ssh_remote_port',
79+
]
80+
81+
has = len(filter(lambda x: self.get(x) != None, required))
82+
if has > 0 and has != len(required):
83+
self._logger.warning("Missing {0} of {1} required config variables for ssh".format(len(required) - has, len(required)))
84+
85+
return has == len(required)
86+
87+
def _check_for_deprecated_usage(self):
88+
env_vars = [
89+
'RABBITMQ_HOST',
90+
'RABBITMQ_PORT',
91+
'RABBITMQ_VHOST',
92+
'RABBITMQ_USERNAME',
93+
'RABBITMQ_PASSWORD',
94+
'RABBITMQ_QUEUE',
95+
'RABBITMQ_EXCHANGE_TYPE',
96+
'RABBITMQ_EXCHANGE_DURABLE',
97+
'RABBITMQ_KEY',
98+
'RABBITMQ_EXCHANGE',
99+
'REDIS_URL',
100+
'REDIS_NAMESPACE',
101+
'UDP_HOST',
102+
'UDP_PORT',
103+
'ZEROMQ_ADDRESS',
104+
'BEAVER_FILES',
105+
'BEAVER_FORMAT',
106+
'BEAVER_MODE',
107+
'BEAVER_PATH',
108+
'BEAVER_TRANSPORT',
109+
]
110+
111+
deprecated_env_var_usage = []
112+
113+
for e in env_vars:
114+
v = os.environ.get(e, None)
115+
if v is not None:
116+
deprecated_env_var_usage.append(e)
117+
118+
if len(deprecated_env_var_usage) > 0:
119+
warnings.simplefilter('default')
120+
warnings.warn('ENV Variable support will be removed by version 20. Stop using: {0}'.format(", ".join(deprecated_env_var_usage)), DeprecationWarning)
121+
56122
def _parse_beaver_config(self, args):
57123
"""Parses the configuration file for beaver configuration info
58124
@@ -104,16 +170,7 @@ def _parse_beaver_config(self, args):
104170

105171
return config
106172

107-
def get(self, key, default=None):
108-
return self._beaver_config.get(key, default)
109-
110-
def set(self, key, value):
111-
self._beaver_config[key] = value
112-
113-
def beaver_config(self):
114-
return self._beaver_config
115-
116-
def update_files(self, config):
173+
def _update_files(self, config):
117174
globs = self.get('files', default=[])
118175
files = self.get('files', default=[])
119176

@@ -127,52 +184,139 @@ def update_files(self, config):
127184
self.set('globs', globs)
128185
self.set('files', files)
129186

130-
def use_ssh_tunnel(self):
131-
required = [
132-
'ssh_key_file',
133-
'ssh_tunnel',
134-
'ssh_tunnel_port',
135-
'ssh_remote_host',
136-
'ssh_remote_port',
137-
]
138187

139-
has = len(filter(lambda x: self.get(x) != None, required))
140-
if has > 0 and has != len(required):
141-
self._logger.warning("Missing {0} of {1} required config variables for ssh".format(len(required) - has, len(required)))
188+
class FileConfig():
189+
'''
190+
Parse a given INI-style config file using ConfigParser module.
191+
Stanza's names match file names, and properties are defaulted as in
192+
http://logstash.net/docs/1.1.1/inputs/file
142193
143-
return has == len(required)
194+
Config file example:
144195
145-
def check_for_deprecated_usage(self):
146-
env_vars = [
147-
'RABBITMQ_HOST',
148-
'RABBITMQ_PORT',
149-
'RABBITMQ_VHOST',
150-
'RABBITMQ_USERNAME',
151-
'RABBITMQ_PASSWORD',
152-
'RABBITMQ_QUEUE',
153-
'RABBITMQ_EXCHANGE_TYPE',
154-
'RABBITMQ_EXCHANGE_DURABLE',
155-
'RABBITMQ_KEY',
156-
'RABBITMQ_EXCHANGE',
157-
'REDIS_URL',
158-
'REDIS_NAMESPACE',
159-
'UDP_HOST',
160-
'UDP_PORT',
161-
'ZEROMQ_ADDRESS',
162-
'BEAVER_FILES',
163-
'BEAVER_FORMAT',
164-
'BEAVER_MODE',
165-
'BEAVER_PATH',
166-
'BEAVER_TRANSPORT',
167-
]
196+
[/var/log/syslog]
197+
type: syslog
198+
tags: sys,main
168199
169-
deprecated_env_var_usage = []
200+
[/var/log/auth]
201+
type: syslog
202+
;tags: auth,main
170203
171-
for e in env_vars:
172-
v = os.environ.get(e, None)
173-
if v is not None:
174-
deprecated_env_var_usage.append(e)
204+
[...]
205+
'''
175206

176-
if len(deprecated_env_var_usage) > 0:
177-
warnings.simplefilter('default')
178-
warnings.warn('ENV Variable support will be removed by version 20. Stop using: {0}'.format(", ".join(deprecated_env_var_usage)), DeprecationWarning)
207+
def __init__(self, args, logger=None):
208+
self._logger = logger
209+
self._logger.debug('Processing file portion of config file %s' % args.config)
210+
211+
self._defaults = {
212+
'add_field': '',
213+
'debug': '',
214+
'discover_interval': '15',
215+
'exclude': '',
216+
'format': '',
217+
'message_format': '',
218+
'sincedb_path': '',
219+
'sincedb_write_interval': '15',
220+
'stat_interval': '1',
221+
'tags': '',
222+
'type': ''
223+
}
224+
225+
self._configfile = args.config
226+
self._config = ConfigParser.ConfigParser(self._defaults)
227+
self._sanitize()
228+
self._files, self._globs = self._parse()
229+
self._default_config = self._gen_config(self._defaults)
230+
self._globbed = []
231+
232+
def get(self, field, filename):
233+
self._logger.debug("Retrieving {0} for {1}".format(field, filename))
234+
return self._files.get(os.path.realpath(filename), self._default_config)[field]
235+
236+
def addglob(self, globname, globbed):
237+
if globname not in self._globbed:
238+
self._logger.debug("Adding glob {0}".format(globname))
239+
config = self._globs.get(globname, self._defaults)
240+
config = self._gen_config(config)
241+
self._globs[globname] = config
242+
for key in config:
243+
self._logger.debug("Config: {0} => {1}".format(key, config[key]))
244+
else:
245+
config = self._globs.get(globname)
246+
247+
for filename in globbed:
248+
self._files[filename] = config
249+
self._globbed.append(globname)
250+
251+
def getfilepaths(self):
252+
return self._files.keys()
253+
254+
def getglobs(self):
255+
return self._globs.keys()
256+
257+
def _gen_config(self, config):
258+
fields = config.get('add_field', '')
259+
if type(fields) != dict:
260+
try:
261+
if type(fields) == str:
262+
fields = filter(None, fields.split(','))
263+
if len(fields) == 0:
264+
config['fields'] = {}
265+
elif (len(fields) % 2) == 1:
266+
raise Exception('Wrong number of values for add_field')
267+
else:
268+
fieldkeys = fields[0::2]
269+
fieldvalues = [[x] for x in fields[1::2]]
270+
config['fields'] = dict(zip(fieldkeys, fieldvalues))
271+
except TypeError:
272+
config['fields'] = {}
273+
274+
if 'add_field' in config:
275+
del config['add_field']
276+
277+
try:
278+
tags = config.get('tags', '')
279+
if type(tags) == str:
280+
tags = filter(None, tags.split(','))
281+
if len(tags) == 0:
282+
tags = []
283+
config['tags'] = tags
284+
except TypeError:
285+
config['tags'] = []
286+
287+
try:
288+
file_type = config.get('type', 'file')
289+
if not file_type:
290+
file_type = 'file'
291+
config['type'] = file_type
292+
except:
293+
config['type'] = "file"
294+
295+
return config
296+
297+
def _parse(self):
298+
glob_paths = {}
299+
files = {}
300+
for filename in self._config.sections():
301+
if not self._config.get(filename, 'type'):
302+
raise Exception('%s: missing mandatory config "type"' % filename)
303+
304+
config = dict((x[0], x[1]) for x in self._config.items(filename))
305+
glob_paths[filename] = config
306+
307+
globs = eglob(filename)
308+
if not globs:
309+
self._logger.debug('Skipping glob due to no files found: %s' % filename)
310+
continue
311+
312+
for globbed_file in globs:
313+
files[os.path.realpath(globbed_file)] = config
314+
315+
return files, glob_paths
316+
317+
def _sanitize(self):
318+
if len(self._config.read(self._configfile)) != 1:
319+
raise Exception('Could not parse config file "%s"' % self._configfile)
320+
321+
if self._config.has_section('beaver'):
322+
self._config.remove_section('beaver')

beaver/utils.py

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,44 @@
1+
import argparse
12
import glob
23
import itertools
34
import logging
5+
import platform
46
import re
7+
import sys
8+
9+
import beaver
510

611
logging.basicConfig()
7-
_magic_brackets = re.compile("({([^}]+)})")
12+
13+
MAGIC_BRACKETS = re.compile("({([^}]+)})")
14+
REOPEN_FILES = 'linux' not in platform.platform().lower()
15+
16+
17+
def parse_args():
18+
epilog_example = """
19+
Beaver provides an lightweight method for shipping local log
20+
files to Logstash. It does this using either redis, stdin,
21+
zeromq as the transport. This means you'll need a redis,
22+
stdin, zeromq input somewhere down the road to get the events.
23+
24+
Events are sent in logstash's json_event format. Options can
25+
also be set as environment variables.
26+
27+
Please see the readme for complete examples.
28+
"""
29+
parser = argparse.ArgumentParser(description='Beaver logfile shipper', epilog=epilog_example, formatter_class=argparse.RawDescriptionHelpFormatter)
30+
parser.add_argument('-c', '--configfile', help='ini config file path', dest='config', default='/dev/null')
31+
parser.add_argument('-d', '--debug', help='enable debug mode', dest='debug', default=False, action='store_true')
32+
parser.add_argument('-f', '--files', help='space-separated filelist to watch, can include globs (*.log). Overrides --path argument', dest='files', default=None, nargs='+')
33+
parser.add_argument('--format', help='format to use when sending to transport', default=None, dest='format', choices=['json', 'msgpack', 'string'])
34+
parser.add_argument('--hostname', help='manual hostname override for source_host', default=None, dest='hostname')
35+
parser.add_argument('-m', '--mode', help='bind or connect mode', dest='mode', default=None, choices=['bind', 'connect'])
36+
parser.add_argument('-p', '--path', help='path to log files', default=None, dest='path')
37+
parser.add_argument('-t', '--transport', help='log transport method', dest='transport', default=None, choices=['rabbitmq', 'redis', 'stdout', 'zmq', 'udp'])
38+
parser.add_argument('-v', '--version', help='output version and quit', dest='version', default=False, action='store_true')
39+
parser.add_argument('--fqdn', help="use the machine's FQDN for source_host", dest="fqdn", default=False, action='store_true')
40+
41+
return parser.parse_args()
842

943

1044
def setup_custom_logger(name, args=None, formatter=None):
@@ -20,27 +54,29 @@ def setup_custom_logger(name, args=None, formatter=None):
2054
handler.setFormatter(formatter)
2155
logger.addHandler(handler)
2256

23-
if args.debug:
57+
has_args = args is not None and type(args) == argparse.Namespace
58+
is_debug = has_args and args.debug == True
59+
60+
if is_debug:
2461
logger.setLevel(logging.DEBUG)
25-
logger.info('Debug level is on')
2662
if hasattr(logging, 'captureWarnings'):
27-
# New in Python 2.7
2863
logging.captureWarnings(True)
2964
else:
3065
logger.setLevel(logging.INFO)
31-
if not args.version:
32-
logger.info('Info level is on')
3366
if hasattr(logging, 'captureWarnings'):
34-
# New in Python 2.7
3567
logging.captureWarnings(False)
3668

69+
logger.debug('Logger level is {0}'.format(logging.getLevelName(logger.level)))
70+
3771
return logger
3872

3973

40-
def _replace_all(path, replacements):
41-
for j in replacements:
42-
path = path.replace(*j)
43-
return path
74+
def version(args):
75+
if args.version:
76+
formatter = logging.Formatter('%(message)s')
77+
logger = setup_custom_logger('beaver', args=args, formatter=formatter)
78+
logger.info('Beaver {0}'.format(beaver.__version__))
79+
sys.exit(0)
4480

4581

4682
def eglob(path):
@@ -65,11 +101,17 @@ def expand_paths(path):
65101
>>> expand_paths("")
66102
"""
67103
pr = itertools.product
68-
parts = _magic_brackets.findall(path)
104+
parts = MAGIC_BRACKETS.findall(path)
69105
if path == "":
70106
return
71107
elif not parts:
72108
return [path]
73109

74110
permutations = [[(p[0], i, 1) for i in p[1].split(",")] for p in parts]
75111
return [_replace_all(path, i) for i in pr(*permutations)]
112+
113+
114+
def _replace_all(path, replacements):
115+
for j in replacements:
116+
path = path.replace(*j)
117+
return path

0 commit comments

Comments
 (0)