Skip to content

Commit 35f3afc

Browse files
author
Tom Kregenbild
committed
#323 - add the ability to increase the number of Queue consumers by creating additional processes while running with --experimental flag
1 parent 45385c9 commit 35f3afc

1 file changed

Lines changed: 11 additions & 6 deletions

File tree

beaver/worker/tail_manager.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ def __init__(self, beaver_config, queue_consumer_function, callback, logger=None
2222
self._create_queue_consumer = queue_consumer_function
2323
self._discover_interval = beaver_config.get('discover_interval', 15)
2424
self._log_template = "[TailManager] - {0}"
25-
self._proc = None
25+
self._number_of_consumer_processes = int(self._beaver_config.get('number_of_consumer_processes'))
26+
self._proc = [None] * self._number_of_consumer_processes
2627
self._tails = {}
2728
self._update_time = None
2829

@@ -52,8 +53,10 @@ def watch(self, paths=[]):
5253
self._tails[tail.fid()] = tail
5354

5455
def create_queue_consumer_if_required(self, interval=5.0):
55-
if not (self._proc and self._proc.is_alive()):
56-
self._proc = self._create_queue_consumer()
56+
for n in range(0,self._number_of_consumer_processes):
57+
if not (self._proc[n] and self._proc[n].is_alive()):
58+
self._logger.debug("creating consumer process: " + str(n))
59+
self._proc[n] = self._create_queue_consumer()
5760
timer = threading.Timer(interval, self.create_queue_consumer_if_required)
5861
timer.start()
5962

@@ -128,9 +131,11 @@ def close(self, signalnum=None, frame=None):
128131
self._active = False
129132
for fid in self._tails:
130133
self._tails[fid].close()
131-
if self._proc is not None and self._proc.is_alive():
132-
self._proc.terminate()
133-
self._proc.join()
134+
for n in range(0,self._number_of_consumer_processes):
135+
if self._proc[n] is not None and self._proc[n].is_alive():
136+
self._logger.debug("Terminate Process: " + str(n))
137+
self._proc[n].terminate()
138+
self._proc[n].join()
134139

135140
@staticmethod
136141
def get_file_id(st):

0 commit comments

Comments
 (0)