From 2aed5ceee7b8d6907bb0b9fdd3ba1c7f1ce7a676 Mon Sep 17 00:00:00 2001 From: Jose Diaz-Gonzalez Date: Tue, 14 Oct 2014 06:20:17 -0400 Subject: [PATCH] Remove old worker code in favor of the - now non-experimental - TailManager --- beaver/dispatcher/worker.py | 95 ----- beaver/utils.py | 1 - beaver/worker/worker.py | 691 ------------------------------------ bin/beaver | 11 +- 4 files changed, 2 insertions(+), 796 deletions(-) delete mode 100644 beaver/dispatcher/worker.py delete mode 100644 beaver/worker/worker.py diff --git a/beaver/dispatcher/worker.py b/beaver/dispatcher/worker.py deleted file mode 100644 index ed7b6848..00000000 --- a/beaver/dispatcher/worker.py +++ /dev/null @@ -1,95 +0,0 @@ -# -*- coding: utf-8 -*- -import multiprocessing -import Queue -import signal -import os -import time - -from beaver.config import BeaverConfig -from beaver.run_queue import run_queue -from beaver.ssh_tunnel import create_ssh_tunnel -from beaver.utils import setup_custom_logger, REOPEN_FILES -from beaver.worker.worker import Worker - -def run(args=None): - - logger = setup_custom_logger('beaver', args) - beaver_config = BeaverConfig(args, logger=logger) - # so the config file can override the logger - logger = setup_custom_logger('beaver', args, config=beaver_config) - - if beaver_config.get('logstash_version') not in [0, 1]: - raise LookupError("Invalid logstash_version. Set it to 0 or 1 in your config.") - - queue = multiprocessing.Queue(beaver_config.get('max_queue_size')) - - worker_proc = None - ssh_tunnel = create_ssh_tunnel(beaver_config, logger=logger) - - def cleanup(signalnum, frame): - if signalnum is not None: - sig_name = tuple((v) for v, k in signal.__dict__.iteritems() if k == signalnum)[0] - logger.info('{0} detected'.format(sig_name)) - logger.info('Shutting down. Please wait...') - else: - logger.info('Worker process cleanup in progress...') - - try: - queue.put_nowait(('exit', ())) - except Queue.Full: - pass - - if worker_proc is not None: - try: - worker_proc.terminate() - worker_proc.join() - except RuntimeError: - pass - - if ssh_tunnel is not None: - logger.info('Closing ssh tunnel...') - ssh_tunnel.close() - - if signalnum is not None: - logger.info('Shutdown complete.') - return os._exit(signalnum) - - signal.signal(signal.SIGTERM, cleanup) - signal.signal(signal.SIGINT, cleanup) - signal.signal(signal.SIGQUIT, cleanup) - - def create_queue_consumer(): - process_args = (queue, beaver_config, logger) - proc = multiprocessing.Process(target=run_queue, args=process_args) - - logger.info('Starting queue consumer') - proc.start() - return proc - - def create_queue_producer(): - worker = Worker(beaver_config, queue_consumer_function=create_queue_consumer, callback=queue.put, logger=logger) - worker.loop() - - while 1: - - try: - if REOPEN_FILES: - logger.debug('Detected non-linux platform. Files will be reopened for tailing') - - t = time.time() - while True: - if worker_proc is None or not worker_proc.is_alive(): - logger.info('Starting worker...') - t = time.time() - worker_proc = multiprocessing.Process(target=create_queue_producer) - worker_proc.start() - logger.info('Working...') - worker_proc.join(10) - - if beaver_config.get('refresh_worker_process'): - if beaver_config.get('refresh_worker_process') < time.time() - t: - logger.info('Worker has exceeded refresh limit. 
Terminating process...') - cleanup(None, None) - - except KeyboardInterrupt: - pass diff --git a/beaver/utils.py b/beaver/utils.py index 4f06a22e..bb622419 100644 --- a/beaver/utils.py +++ b/beaver/utils.py @@ -45,7 +45,6 @@ def parse_args(): parser.add_argument('-p', '--path', help='path to log files', default=None, dest='path') parser.add_argument('-P', '--pid', help='path to pid file', default=None, dest='pid') parser.add_argument('-t', '--transport', help='log transport method', dest='transport', default=None, choices=['kafka', 'mqtt', 'rabbitmq', 'redis', 'sns', 'sqs', 'kinesis', 'stdout', 'tcp', 'udp', 'zmq', 'http']) - parser.add_argument('-e', '--experimental', help='use experimental version of beaver', dest='experimental', default=False, action='store_true') parser.add_argument('-v', '--version', help='output version and quit', dest='version', default=False, action='store_true') parser.add_argument('--fqdn', help='use the machine\'s FQDN for source_host', dest='fqdn', default=False, action='store_true') diff --git a/beaver/worker/worker.py b/beaver/worker/worker.py deleted file mode 100644 index 54875629..00000000 --- a/beaver/worker/worker.py +++ /dev/null @@ -1,691 +0,0 @@ -# -*- coding: utf-8 -*- -import collections -import datetime -import errno -import gzip -import io -import os -import signal -import sqlite3 -import stat -import time -import threading - -from beaver.utils import IS_GZIPPED_FILE, REOPEN_FILES, eglob, multiline_merge -from beaver.unicode_dammit import ENCODINGS - - -class Worker(object): - """Looks for changes in all files of a directory. - This is useful for watching log file changes in real-time. - It also supports files rotation. - - Example: - - >>> def callback(filename, lines): - ... print filename, lines - ... - >>> l = Worker(args, callback, ["log", "txt"]) - >>> l.loop() - """ - - def __init__(self, beaver_config, queue_consumer_function, callback, logger=None): - """Arguments: - - (FileConfig) @file_config: - object containing file-related configuration - - (BeaverConfig) @beaver_config: - object containing global configuration - - (Logger) @logger - object containing a python logger - - (callable) @callback: - a function which is called every time a new line in a - file being watched is found; - this is called with "filename" and "lines" arguments. 
- """ - self._beaver_config = beaver_config - self._callback = callback - self._create_queue_consumer = queue_consumer_function - self._file_map = {} - self._folder = self._beaver_config.get('path') - self._last_file_mapping_update = {} - self._logger = logger - self._number_of_consumer_processes = int(self._beaver_config.get('number_of_consumer_processes')) - self._proc = [None] * self._number_of_consumer_processes - self._sincedb_path = self._beaver_config.get('sincedb_path') - self._update_time = None - self._running = True - - if not callable(self._callback): - raise RuntimeError("Callback for worker is not callable") - - self.update_files() - self._seek_to_end() - signal.signal(signal.SIGTERM, self.close) - - def __del__(self): - """Closes all files""" - self.close() - - def close(self, signalnum=None, frame=None): - self._running = False - """Closes all currently open file pointers""" - for id, data in self._file_map.iteritems(): - data['file'].close() - self._sincedb_update_position(data['file'], fid=id, force_update=True) - self._file_map.clear() - for n in range(0,self._number_of_consumer_processes): - if self._proc[n] is not None and self._proc[n].is_alive(): - self._logger.debug("Terminate Process: " + str(n)) - self._proc[n].terminate() - self._proc[n].join() - - - def listdir(self): - """List directory and filter files by extension. - You may want to override this to add extra logic or - globbling support. - """ - if self._folder is not None: - ls = os.listdir(self._folder) - return [x for x in ls if os.path.splitext(x)[1][1:] == "log"] - else: - return [] - - def create_queue_consumer_if_required(self, interval=5.0): - - for n in range(0,self._number_of_consumer_processes): - if not (self._proc[n] and self._proc[n].is_alive()): - self._logger.debug("creating consumer process: " + str(n)) - self._proc[n] = self._create_queue_consumer() - timer = threading.Timer(interval, self.create_queue_consumer_if_required) - timer.start() - - def loop(self, interval=0.1, async=False): - """Start the loop. - If async is True make one loop then return. - """ - self.create_queue_consumer_if_required() - - while self._running: - - t = time.time() - - if int(time.time()) - self._update_time > self._beaver_config.get('discover_interval'): - self.update_files() - - self._ensure_files_are_good(current_time=t) - - unwatch_list = [] - - for fid, data in self._file_map.iteritems(): - try: - self._run_pass(fid, data['file']) - except IOError, e: - if e.errno == errno.ESTALE: - unwatch_list.append(fid) - - self.unwatch_list(unwatch_list) - - if async: - return - - self._logger.debug("Iteration took {0:.6f}".format(time.time() - t)) - time.sleep(interval) - - def _run_pass(self, fid, file): - """Read lines from a file and performs a callback against them""" - while True: - try: - data = file.read(4096) - except IOError, e: - if e.errno == errno.ESTALE: - self.active = False - return False - - lines = self._buffer_extract(data=data, fid=fid) - - if not lines: - # Before returning, check if an event (maybe partial) is waiting for too long. - if self._file_map[fid]['current_event'] and time.time() - self._file_map[fid]['last_activity'] > 1: - event = '\n'.join(self._file_map[fid]['current_event']) - self._file_map[fid]['current_event'].clear() - self._callback_wrapper(filename=file.name, lines=[event]) - break - - self._file_map[fid]['last_activity'] = time.time() - - if self._file_map[fid]['multiline_regex_after'] or self._file_map[fid]['multiline_regex_before']: - # Multiline is enabled for this file. 
- events = multiline_merge( - lines, - self._file_map[fid]['current_event'], - self._file_map[fid]['multiline_regex_after'], - self._file_map[fid]['multiline_regex_before']) - else: - events = lines - - if events: - self._callback_wrapper(filename=file.name, lines=events) - - if self._sincedb_path: - current_line_count = len(lines) - self._sincedb_update_position(file, fid=fid, lines=current_line_count) - - self._sincedb_update_position(file, fid=fid) - - def _buffer_extract(self, data, fid): - """ - Extract takes an arbitrary string of input data and returns an array of - tokenized entities, provided there were any available to extract. This - makes for easy processing of datagrams using a pattern like: - - tokenizer.extract(data).map { |entity| Decode(entity) }.each do ...""" - # Extract token-delimited entities from the input string with the split command. - # There's a bit of craftiness here with the -1 parameter. Normally split would - # behave no differently regardless of if the token lies at the very end of the - # input buffer or not (i.e. a literal edge case) Specifying -1 forces split to - # return "" in this case, meaning that the last entry in the list represents a - # new segment of data where the token has not been encountered - entities = collections.deque(data.split(self._file_map[fid]['delimiter'], -1)) - - # Check to see if the buffer has exceeded capacity, if we're imposing a limit - if self._file_map[fid]['size_limit']: - if self._file_map[fid]['input_size'] + len(entities[0]) > self._file_map[fid]['size_limit']: - raise Exception('input buffer full') - self._file_map[fid]['input_size'] += len(entities[0]) - - # Move the first entry in the resulting array into the input buffer. It represents - # the last segment of a token-delimited entity unless it's the only entry in the list. - first_entry = entities.popleft() - if len(first_entry) > 0: - self._file_map[fid]['input'].append(first_entry) - - # If the resulting array from the split is empty, the token was not encountered - # (not even at the end of the buffer). Since we've encountered no token-delimited - # entities this go-around, return an empty array. - if len(entities) == 0: - return [] - - # At this point, we've hit a token, or potentially multiple tokens. Now we can bring - # together all the data we've buffered from earlier calls without hitting a token, - # and add it to our list of discovered entities. - entities.appendleft(''.join(self._file_map[fid]['input'])) - - # Now that we've hit a token, joined the input buffer and added it to the entities - # list, we can go ahead and clear the input buffer. All of the segments that were - # stored before the join can now be garbage collected. - self._file_map[fid]['input'].clear() - - # The last entity in the list is not token delimited, however, thanks to the -1 - # passed to split. It represents the beginning of a new list of as-yet-untokenized - # data, so we add it to the start of the list. - self._file_map[fid]['input'].append(entities.pop()) - - # Set the new input buffer size, provided we're keeping track - if self._file_map[fid]['size_limit']: - self._file_map[fid]['input_size'] = len(self._file_map[fid]['input'][0]) - - # Now we're left with the list of extracted token-delimited entities we wanted - # in the first place. Hooray! - return entities - - # Flush the contents of the input buffer, i.e. 
return the input buffer even though - # a token has not yet been encountered - def _buffer_flush(self, fid): - buf = ''.join(self._file_map[fid]['input']) - self._file_map[fid]['input'].clear - return buf - - # Is the buffer empty? - def _buffer_empty(self, fid): - return len(self._file_map[fid]['input']) > 0 - - def _seek_to_end(self): - unwatch_list = [] - - # The first time we run the script we move all file markers at EOF. - # In case of files created afterwards we don't do this. - for fid, data in self._file_map.iteritems(): - self._logger.debug("[{0}] - getting start position {1}".format(fid, data['file'].name)) - start_position = self._beaver_config.get_field('start_position', data['file'].name) - is_active = data['active'] - - if self._sincedb_path: - sincedb_start_position = self._sincedb_start_position(data['file'], fid=fid) - if sincedb_start_position: - start_position = sincedb_start_position - - if start_position == "beginning": - continue - - line_count = 0 - - if str(start_position).isdigit(): - self._logger.debug("[{0}] - going to start position {1} for {2}".format(fid, start_position, data['file'].name)) - start_position = int(start_position) - for encoding in ENCODINGS: - try: - line_count = 0 - while data['file'].readline(): - line_count += 1 - if line_count == start_position: - break - except UnicodeDecodeError: - self._logger.debug("[{0}] - UnicodeDecodeError raised for {1} with encoding {2}".format(fid, data['file'].name, data['encoding'])) - data['file'] = self.open(data['file'].name, encoding=encoding) - if not data['file']: - unwatch_list.append(fid) - is_active = False - break - - data['encoding'] = encoding - - if line_count != start_position: - self._logger.debug("[{0}] - file at different position than {1}, assuming manual truncate for {2}".format(fid, start_position, data['file'].name)) - data['file'].seek(0, os.SEEK_SET) - start_position == "beginning" - - if not is_active: - continue - - if start_position == "beginning": - continue - - if start_position == "end": - self._logger.debug("[{0}] - getting end position for {1}".format(fid, data['file'].name)) - for encoding in ENCODINGS: - try: - line_count = 0 - while data['file'].readline(): - line_count += 1 - break - except UnicodeDecodeError: - self._logger.debug("[{0}] - UnicodeDecodeError raised for {1} with encoding {2}".format(fid, data['file'].name, data['encoding'])) - data['file'] = self.open(data['file'].name, encoding=encoding) - if not data['file']: - unwatch_list.append(fid) - is_active = False - break - - data['encoding'] = encoding - - if not is_active: - continue - - current_position = data['file'].tell() - self._logger.debug("[{0}] - line count {1} for {2}".format(fid, line_count, data['file'].name)) - self._sincedb_update_position(data['file'], fid=fid, lines=line_count, force_update=True) - # Reset this, so line added processed just after this initialization - # will update the sincedb. Without this, if beaver run for less than - # sincedb_write_interval it will always re-process the last lines. - data['update_time'] = 0 - - tail_lines = self._beaver_config.get_field('tail_lines', data['file'].name) - tail_lines = int(tail_lines) - if tail_lines: - encoding = data['encoding'] - - lines = self.tail(data['file'].name, encoding=encoding, window=tail_lines, position=current_position) - if lines: - if self._file_map[fid]['multiline_regex_after'] or self._file_map[fid]['multiline_regex_before']: - # Multiline is enabled for this file. 
- events = multiline_merge( - lines, - self._file_map[fid]['current_event'], - self._file_map[fid]['multiline_regex_after'], - self._file_map[fid]['multiline_regex_before']) - else: - events = lines - self._callback_wrapper(filename=data['file'].name, lines=events) - - self.unwatch_list(unwatch_list) - - def _callback_wrapper(self, filename, lines): - now = datetime.datetime.utcnow() - timestamp = now.strftime("%Y-%m-%dT%H:%M:%S") + ".%03d" % (now.microsecond / 1000) + "Z" - self._callback(('callback', { - 'fields': self._beaver_config.get_field('fields', filename), - 'filename': filename, - 'format': self._beaver_config.get_field('format', filename), - 'ignore_empty': self._beaver_config.get_field('ignore_empty', filename), - 'lines': lines, - 'timestamp': timestamp, - 'tags': self._beaver_config.get_field('tags', filename), - 'type': self._beaver_config.get_field('type', filename), - })) - - def _sincedb_init(self): - """Initializes the sincedb schema in an sqlite db""" - if not self._sincedb_path: - return - - if not os.path.exists(self._sincedb_path): - self._logger.debug('Initializing sincedb sqlite schema') - conn = sqlite3.connect(self._sincedb_path, isolation_level=None) - conn.execute(""" - create table sincedb ( - fid text primary key, - filename text, - position integer default 1 - ); - """) - conn.close() - - def _sincedb_update_position(self, file, fid=None, lines=0, force_update=False): - """Retrieves the starting position from the sincedb sql db for a given file - Returns a boolean representing whether or not it updated the record - """ - if not self._sincedb_path: - return False - - if not fid: - fid = self.get_file_id(os.stat(file.name)) - - self._file_map[fid]['line'] = self._file_map[fid]['line'] + lines - old_count = self._file_map[fid]['line_in_sincedb'] - lines = self._file_map[fid]['line'] - - current_time = int(time.time()) - update_time = self._file_map[fid]['update_time'] - if not force_update: - sincedb_write_interval = self._beaver_config.get_field('sincedb_write_interval', file.name) - if update_time and current_time - update_time <= sincedb_write_interval: - return False - - if old_count == lines: - return False - - self._sincedb_init() - - self._file_map[fid]['update_time'] = current_time - - self._logger.debug("[{0}] - updating sincedb for logfile {1} from {2} to {3}".format(fid, file.name, old_count, lines)) - - conn = sqlite3.connect(self._sincedb_path, isolation_level=None) - cursor = conn.cursor() - query = "insert or ignore into sincedb (fid, filename) values (:fid, :filename);" - cursor.execute(query, { - 'fid': fid, - 'filename': file.name - }) - - query = "update sincedb set position = :position where fid = :fid and filename = :filename" - cursor.execute(query, { - 'fid': fid, - 'filename': file.name, - 'position': int(lines), - }) - conn.close() - - self._file_map[fid]['line_in_sincedb'] = lines - - return True - - def _sincedb_start_position(self, file, fid=None): - """Retrieves the starting position from the sincedb sql db - for a given file - """ - if not self._sincedb_path: - return None - - if not fid: - fid = self.get_file_id(os.stat(file.name)) - - self._sincedb_init() - conn = sqlite3.connect(self._sincedb_path, isolation_level=None) - cursor = conn.cursor() - cursor.execute("select position from sincedb where fid = :fid and filename = :filename", { - 'fid': fid, - 'filename': file.name - }) - - start_position = None - for row in cursor.fetchall(): - start_position, = row - - return start_position - - def update_files(self): - """Ensures all 
files are properly loaded. - Detects new files, file removals, file rotation, and truncation. - On non-linux platforms, it will also manually reload the file for tailing. - Note that this hack is necessary because EOF is cached on BSD systems. - """ - self._update_time = int(time.time()) - - ls = [] - files = [] - if len(self._beaver_config.get('globs')) > 0: - for name, exclude in self._beaver_config.get('globs').items(): - globbed = [os.path.realpath(filename) for filename in eglob(name, exclude)] - files.extend(globbed) - self._beaver_config.addglob(name, globbed) - self._callback(("addglob", (name, globbed))) - else: - for name in self.listdir(): - files.append(os.path.realpath(os.path.join(self._folder, name))) - - for absname in files: - try: - st = os.stat(absname) - except EnvironmentError, err: - if err.errno != errno.ENOENT: - raise - else: - if not stat.S_ISREG(st.st_mode): - continue - elif int(self._beaver_config.get('ignore_old_files')) > 0 and \ - datetime.datetime.fromtimestamp(st.st_mtime) < (datetime.datetime.today() - datetime.timedelta(days=int(self._beaver_config.get('ignore_old_files')))): - self._logger.debug('[{0}] - file {1} older then {2} day so ignoring it'.format(self.get_file_id(st), absname, self._beaver_config.get('ignore_old_files'))) - continue - fid = self.get_file_id(st) - ls.append((fid, absname)) - - # add new ones - for fid, fname in ls: - if fid not in self._file_map: - self.watch(fname) - - def _ensure_files_are_good(self, current_time): - """Every N seconds, ensures that the file we are tailing is the file we expect to be tailing""" - - # We cannot watch/unwatch in a single iteration - rewatch_list = [] - unwatch_list = [] - - # check existent files - for fid, data in self._file_map.iteritems(): - filename = data['file'].name - stat_interval = self._beaver_config.get_field('stat_interval', filename) - if filename in self._last_file_mapping_update and current_time - self._last_file_mapping_update[filename] <= stat_interval: - continue - - self._last_file_mapping_update[filename] = time.time() - - try: - st = os.stat(data['file'].name) - except EnvironmentError, err: - if err.errno == errno.ENOENT: - unwatch_list.append(fid) - else: - raise - else: - if fid != self.get_file_id(st): - self._logger.info("[{0}] - file rotated {1}".format(fid, data['file'].name)) - rewatch_list.append(fid) - elif data['file'].tell() > st.st_size: - if st.st_size == 0 and self._beaver_config.get_field('ignore_truncate', data['file'].name): - self._logger.info("[{0}] - file size is 0 {1}. ".format(fid, data['file'].name) + - "If you use another tool (i.e. logrotate) to truncate " + - "the file, your application may continue to write to " + - "the offset it last wrote later. 
In such a case, we'd " + - "better do nothing here") - continue - self._logger.info("[{0}] - file truncated {1}".format(fid, data['file'].name)) - rewatch_list.append(fid) - elif REOPEN_FILES: - self._logger.debug("[{0}] - file reloaded (non-linux) {1}".format(fid, data['file'].name)) - position = data['file'].tell() - fname = data['file'].name - data['file'].close() - file = self.open(fname, encoding=data['encoding']) - if file: - file.seek(position) - self._file_map[fid]['file'] = file - - self.unwatch_list(unwatch_list) - self.rewatch_list(rewatch_list) - - def rewatch_list(self, rewatch_list): - for fid in rewatch_list: - if fid not in self._file_map: - continue - - f = self._file_map[fid]['file'] - filename = f.name - self.unwatch(f, fid) - self.watch(filename) - - def unwatch_list(self, unwatch_list): - for fid in unwatch_list: - if fid not in self._file_map: - continue - - f = self._file_map[fid]['file'] - self.unwatch(f, fid) - - def unwatch(self, file, fid): - """file no longer exists; if it has been renamed - try to read it for the last time in case the - log rotator has written something in it. - """ - try: - if file: - self._run_pass(fid, file) - if self._file_map[fid]['current_event']: - event = '\n'.join(self._file_map[fid]['current_event']) - self._file_map[fid]['current_event'].clear() - self._callback_wrapper(filename=file.name, lines=[event]) - except IOError: - # Silently ignore any IOErrors -- file is gone - pass - - if file: - self._logger.info("[{0}] - un-watching logfile {1}".format(fid, file.name)) - else: - self._logger.info("[{0}] - un-watching logfile".format(fid)) - - self._file_map[fid]['file'].close() - del self._file_map[fid] - - def watch(self, fname): - """Opens a file for log tailing""" - try: - file = self.open(fname, encoding=self._beaver_config.get_field('encoding', fname)) - if file: - fid = self.get_file_id(os.stat(fname)) - except EnvironmentError, err: - if err.errno != errno.ENOENT: - raise - else: - if file: - self._logger.info("[{0}] - watching logfile {1}".format(fid, fname)) - self._file_map[fid] = { - 'current_event': collections.deque([]), - 'delimiter': self._beaver_config.get_field('delimiter', fname), - 'encoding': self._beaver_config.get_field('encoding', fname), - 'file': file, - 'input': collections.deque([]), - 'input_size': 0, - 'last_activity': time.time(), - 'line': 0, - 'line_in_sincedb': 0, - 'multiline_regex_after': self._beaver_config.get_field('multiline_regex_after', fname), - 'multiline_regex_before': self._beaver_config.get_field('multiline_regex_before', fname), - 'size_limit': self._beaver_config.get_field('size_limit', fname), - 'update_time': None, - 'active': True, - } - - def open(self, filename, encoding=None): - """Opens a file with the appropriate call""" - try: - if IS_GZIPPED_FILE.search(filename): - _file = gzip.open(filename, "rb") - else: - file_encoding = self._beaver_config.get_field('encoding', filename) - if encoding: - _file = io.open(filename, "r", encoding=encoding, errors='replace') - elif file_encoding: - _file = io.open(filename, "r", encoding=file_encoding, errors='replace') - else: - _file = io.open(filename, "r", errors='replace') - except IOError, e: - self._logger.warning(str(e)) - _file = None - - return _file - - def tail(self, fname, encoding, window, position=None): - """Read last N lines from file fname.""" - if window <= 0: - raise ValueError('invalid window %r' % window) - - encodings = ENCODINGS - if encoding: - encodings = [encoding] + ENCODINGS - - for enc in encodings: - try: - f = 
self.open(fname, encoding=enc) - if not f: - return [] - return self.tail_read(f, window, position=position) - except IOError, err: - if err.errno == errno.ENOENT: - return [] - raise - except UnicodeDecodeError: - pass - - @staticmethod - def get_file_id(st): - return "%xg%x" % (st.st_dev, st.st_ino) - - @classmethod - def tail_read(cls, f, window, position=None): - BUFSIZ = 1024 - # open() was overridden and file was opened in text - # mode; read() will return a string instead bytes. - encoded = getattr(f, 'encoding', False) - CR = '\n' if encoded else b'\n' - data = '' if encoded else b'' - f.seek(0, os.SEEK_END) - if position is None: - position = f.tell() - - block = -1 - exit = False - read = BUFSIZ - - while not exit: - step = (block * BUFSIZ) + position - if step < 0: - step = 0 - read = ((block + 1) * BUFSIZ) + position - exit = True - - f.seek(step, os.SEEK_SET) - newdata = f.read(read) - - data = newdata + data - if data.count(CR) > window: - break - else: - block -= 1 - - return data.splitlines()[-window:] diff --git a/bin/beaver b/bin/beaver index afd2511a..1eab3db4 100755 --- a/bin/beaver +++ b/bin/beaver @@ -1,7 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- from beaver.dispatcher.tail import run as tail_run -from beaver.dispatcher.worker import run as worker_run from beaver.pidfile import PidFile from beaver.utils import CAN_DAEMONIZE, parse_args, version @@ -20,12 +19,6 @@ if args.daemonize: context = daemon.DaemonContext(pidfile=PidFile(args.pid)) with context: - if args.experimental: - tail_run(args) - else: - worker_run(args) -else: - if args.experimental: tail_run(args) - else: - worker_run(args) +else: + tail_run(args)
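For reviewers, the net effect of the final bin/beaver hunk is that the removed --experimental flag no longer gates anything: every invocation, daemonized or not, now dispatches to the tail runner. A minimal sketch of the post-patch entry-point flow, paraphrased from the hunk rather than quoted verbatim (it assumes args comes from parse_args(); the CAN_DAEMONIZE/pid handling visible only as context lines is elided):

    # Sketch only: simplified control flow of bin/beaver after this patch.
    from beaver.dispatcher.tail import run as tail_run
    from beaver.pidfile import PidFile
    from beaver.utils import parse_args

    args = parse_args()  # assumption: bin/beaver builds args this way

    if args.daemonize:
        # Daemonized path: same dispatcher, just wrapped in a DaemonContext.
        import daemon
        with daemon.DaemonContext(pidfile=PidFile(args.pid)):
            tail_run(args)
    else:
        tail_run(args)

Either way there is now a single code path through the tail dispatcher, which is what allows beaver/dispatcher/worker.py and beaver/worker/worker.py to be deleted outright.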