Skip to content

Commit 398478b

Browse files
committed
Fixed sincedb_write_interval (Bugs #229).
1 parent d9de1d8 commit 398478b

2 files changed

Lines changed: 28 additions & 18 deletions

File tree

beaver/worker/tail.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def __init__(self, filename, callback, position="end", logger=None, beaver_confi
2828
self._last_sincedb_write = None
2929
self._last_file_mapping_update = None
3030
self._line_count = 0
31+
self._line_count_sincedb = 0
3132
self._log_template = '[' + self._filename + '] - {0}'
3233

3334
self._sincedb_path = beaver_config.get('sincedb_path')
@@ -229,7 +230,6 @@ def _ensure_file_is_good(self, current_time):
229230

230231
def _run_pass(self):
231232
"""Read lines from a file and performs a callback against them"""
232-
line_count = 0
233233
while True:
234234
try:
235235
data = self._file.read(4096)
@@ -265,11 +265,9 @@ def _run_pass(self):
265265

266266
if self._sincedb_path:
267267
current_line_count = len(lines)
268-
if not self._sincedb_update_position(lines=current_line_count):
269-
line_count += current_line_count
268+
self._sincedb_update_position(lines=current_line_count)
270269

271-
if line_count > 0:
272-
self._sincedb_update_position(lines=line_count, force_update=True)
270+
self._sincedb_update_position()
273271

274272
def _callback_wrapper(self, lines):
275273
now = datetime.datetime.utcnow()
@@ -328,6 +326,10 @@ def _seek_to_end(self):
328326
self._log_debug('line count {0}'.format(line_count))
329327
self._log_debug('current position {0}'.format(current_position))
330328
self._sincedb_update_position(lines=line_count, force_update=True)
329+
# Reset this, so line added processed just after this initialization
330+
# will update the sincedb. Without this, if beaver run for less than
331+
# sincedb_write_interval it will always re-process the last lines.
332+
self._last_sincedb_write = 0
331333

332334
if self._tail_lines:
333335
self._log_debug('tailing {0} lines'.format(self._tail_lines))
@@ -397,20 +399,21 @@ def _sincedb_update_position(self, lines=0, force_update=False):
397399
if not self._sincedb_path:
398400
return False
399401

402+
self._line_count = self._line_count + lines
403+
old_count = self._line_count_sincedb
404+
lines = self._line_count
405+
400406
current_time = int(time.time())
401407
if not force_update:
402408
if self._last_sincedb_write and current_time - self._last_sincedb_write <= self._sincedb_write_interval:
403409
return False
404410

405-
if lines == 0:
411+
if old_count == lines:
406412
return False
407413

408414
self._sincedb_init()
409415

410-
old_count = self._line_count
411416
self._last_sincedb_write = current_time
412-
self._line_count = old_count + lines
413-
lines = self._line_count
414417

415418
self._log_debug('updating sincedb to {0}'.format(lines))
416419

@@ -430,6 +433,8 @@ def _sincedb_update_position(self, lines=0, force_update=False):
430433
})
431434
conn.close()
432435

436+
self._line_count_sincedb = lines
437+
433438
return True
434439

435440
def _sincedb_start_position(self):

beaver/worker/worker.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ def loop(self, interval=0.1, async=False):
132132

133133
def _run_pass(self, fid, file):
134134
"""Read lines from a file and performs a callback against them"""
135-
line_count = 0
136135
while True:
137136
try:
138137
data = file.read(4096)
@@ -168,11 +167,9 @@ def _run_pass(self, fid, file):
168167

169168
if self._sincedb_path:
170169
current_line_count = len(lines)
171-
if not self._sincedb_update_position(file, fid=fid, lines=current_line_count):
172-
line_count += current_line_count
170+
self._sincedb_update_position(file, fid=fid, lines=current_line_count)
173171

174-
if line_count > 0:
175-
self._sincedb_update_position(file, fid=fid, lines=line_count, force_update=True)
172+
self._sincedb_update_position(file, fid=fid)
176173

177174
def _buffer_extract(self, data, fid):
178175
"""
@@ -316,6 +313,10 @@ def _seek_to_end(self):
316313
current_position = data['file'].tell()
317314
self._logger.debug("[{0}] - line count {1} for {2}".format(fid, line_count, data['file'].name))
318315
self._sincedb_update_position(data['file'], fid=fid, lines=line_count, force_update=True)
316+
# Reset this, so line added processed just after this initialization
317+
# will update the sincedb. Without this, if beaver run for less than
318+
# sincedb_write_interval it will always re-process the last lines.
319+
data['update_time'] = 0
319320

320321
tail_lines = self._beaver_config.get_field('tail_lines', data['file'].name)
321322
tail_lines = int(tail_lines)
@@ -378,22 +379,23 @@ def _sincedb_update_position(self, file, fid=None, lines=0, force_update=False):
378379
if not fid:
379380
fid = self.get_file_id(os.stat(file.name))
380381

382+
self._file_map[fid]['line'] = self._file_map[fid]['line'] + lines
383+
old_count = self._file_map[fid]['line_in_sincedb']
384+
lines = self._file_map[fid]['line']
385+
381386
current_time = int(time.time())
382387
update_time = self._file_map[fid]['update_time']
383388
if not force_update:
384389
sincedb_write_interval = self._beaver_config.get_field('sincedb_write_interval', file.name)
385390
if update_time and current_time - update_time <= sincedb_write_interval:
386391
return False
387392

388-
if lines == 0:
393+
if old_count == lines:
389394
return False
390395

391396
self._sincedb_init()
392397

393-
old_count = self._file_map[fid]['line']
394398
self._file_map[fid]['update_time'] = current_time
395-
self._file_map[fid]['line'] = old_count + lines
396-
lines = self._file_map[fid]['line']
397399

398400
self._logger.debug("[{0}] - updating sincedb for logfile {1} from {2} to {3}".format(fid, file.name, old_count, lines))
399401

@@ -413,6 +415,8 @@ def _sincedb_update_position(self, file, fid=None, lines=0, force_update=False):
413415
})
414416
conn.close()
415417

418+
self._file_map[fid]['line_in_sincedb'] = lines
419+
416420
return True
417421

418422
def _sincedb_start_position(self, file, fid=None):
@@ -589,6 +593,7 @@ def watch(self, fname):
589593
'input_size': 0,
590594
'last_activity': time.time(),
591595
'line': 0,
596+
'line_in_sincedb': 0,
592597
'multiline_regex_after': self._beaver_config.get_field('multiline_regex_after', fname),
593598
'multiline_regex_before': self._beaver_config.get_field('multiline_regex_before', fname),
594599
'size_limit': self._beaver_config.get_field('size_limit', fname),

0 commit comments

Comments
 (0)