@@ -22,7 +22,8 @@ def __init__(self, beaver_config, queue_consumer_function, callback, logger=None
2222 self ._create_queue_consumer = queue_consumer_function
2323 self ._discover_interval = beaver_config .get ('discover_interval' , 15 )
2424 self ._log_template = "[TailManager] - {0}"
25- self ._proc = None
25+ self ._number_of_consumer_processes = int (self ._beaver_config .get ('number_of_consumer_processes' ))
26+ self ._proc = [None ] * self ._number_of_consumer_processes
2627 self ._tails = {}
2728 self ._update_time = None
2829
@@ -52,8 +53,10 @@ def watch(self, paths=[]):
5253 self ._tails [tail .fid ()] = tail
5354
5455 def create_queue_consumer_if_required (self , interval = 5.0 ):
55- if not (self ._proc and self ._proc .is_alive ()):
56- self ._proc = self ._create_queue_consumer ()
56+ for n in range (0 ,self ._number_of_consumer_processes ):
57+ if not (self ._proc [n ] and self ._proc [n ].is_alive ()):
58+ self ._logger .debug ("creating consumer process: " + str (n ))
59+ self ._proc [n ] = self ._create_queue_consumer ()
5760 timer = threading .Timer (interval , self .create_queue_consumer_if_required )
5861 timer .start ()
5962
@@ -128,9 +131,11 @@ def close(self, signalnum=None, frame=None):
128131 self ._active = False
129132 for fid in self ._tails :
130133 self ._tails [fid ].close ()
131- if self ._proc is not None and self ._proc .is_alive ():
132- self ._proc .terminate ()
133- self ._proc .join ()
134+ for n in range (0 ,self ._number_of_consumer_processes ):
135+ if self ._proc [n ] is not None and self ._proc [n ].is_alive ():
136+ self ._logger .debug ("Terminate Process: " + str (n ))
137+ self ._proc [n ].terminate ()
138+ self ._proc [n ].join ()
134139
135140 @staticmethod
136141 def get_file_id (st ):
0 commit comments