2626LOG_FILE_PATTERN = '/tmp/kclipy.*.log'
2727DEFAULT_DDB_LEASE_TABLE_SUFFIX = '-app'
2828
29+ # define Java class names
30+ MULTI_LANG_DAEMON_CLASS = 'com.atlassian.KinesisStarter'
31+
2932# set up log levels
3033logging .SEVERE = 60
3134logging .FATAL = 70
@@ -82,15 +85,17 @@ def run_processor(log_file=None, processor_func=None):
8285
8386class KinesisProcessorThread (ShellCommandThread ):
8487 def __init__ (self , params ):
85- multi_lang_daemon_class = 'com.atlassian.KinesisStarter'
8688 props_file = params ['properties_file' ]
8789 env_vars = params ['env_vars' ]
8890 cmd = kclipy_helper .get_kcl_app_command ('java' ,
89- multi_lang_daemon_class , props_file )
91+ MULTI_LANG_DAEMON_CLASS , props_file )
9092 if not params ['log_file' ]:
9193 params ['log_file' ] = '%s.log' % props_file
9294 TMP_FILES .append (params ['log_file' ])
93- ShellCommandThread .__init__ (self , cmd , outfile = params ['log_file' ], env_vars = env_vars )
95+ # print(cmd)
96+ env = aws_stack .get_environment ()
97+ quiet = env .region == REGION_LOCAL
98+ ShellCommandThread .__init__ (self , cmd , outfile = params ['log_file' ], env_vars = env_vars , quiet = quiet )
9499
95100 @staticmethod
96101 def start_consumer (kinesis_stream ):
@@ -105,42 +110,53 @@ def __init__(self, params):
105110 self .running = True
106111 self .buffer = []
107112 self .params = params
108- self .prefix = params .get ('log_prefix' ) or 'LOG: '
109113 # number of lines that make up a single log entry
110114 self .buffer_size = 2
111- self .log_level = params .get ('level' ) or DEFAULT_KCL_LOG_LEVEL
112- # regular expression to filter the printed output
113- levels = OutputReaderThread .get_log_level_names (self .log_level )
114- self .filter_regex = r'.*(%s):.*' % ('|' .join (levels ))
115+ # determine log level
116+ self .log_level = params .get ('level' )
117+ if self .log_level is None :
118+ self .log_level = DEFAULT_KCL_LOG_LEVEL
119+ if self .log_level > 0 :
120+ levels = OutputReaderThread .get_log_level_names (self .log_level )
121+ # regular expression to filter the printed output
122+ self .filter_regex = r'.*(%s):.*' % ('|' .join (levels ))
123+ # create prefix and logger
124+ self .prefix = params .get ('log_prefix' ) or 'LOG'
125+ self .logger = logging .getLogger (self .prefix )
126+ self .logger .severe = self .logger .critical
127+ self .logger .fatal = self .logger .critical
128+ self .logger .setLevel (self .log_level )
115129
116130 @classmethod
117131 def get_log_level_names (cls , min_level ):
118132 return [logging .getLevelName (l ) for l in LOG_LEVELS if l >= min_level ]
119133
120- @classmethod
121- def get_logger_for_level_in_log_line (cls , line , level ):
122- for level in LOG_LEVELS :
123- level_name = logging .getLevelName (level )
124- if re .match (r'.*(%s):.*' % level , line ):
125- return LOGGER .__dict__ [level .lower ()]
134+ def get_logger_for_level_in_log_line (self , line ):
135+ level = self .log_level
136+ for l in LOG_LEVELS :
137+ if l >= level :
138+ level_name = logging .getLevelName (l )
139+ if re .match (r'.*(%s):.*' % level_name , line ):
140+ return getattr (self .logger , level_name .lower ())
126141 return None
127142
128143 def start_reading (self , params ):
129144 for line in tail ("-n" , 0 , "-f" , params ['file' ], _iter = True ):
130145 if not self .running :
131146 return
132- line = line .replace ('\n ' , '' )
133- self .buffer .append (line )
134- if len (self .buffer ) >= self .buffer_size :
135- logger_func = None
136- for line in self .buffer :
137- if re .match (self .filter_regex , line ):
138- logger_func = OutputReaderThread .get_logger_for_level_in_log_line (line , self .log_level )
139- break
140- if logger_func :
141- for buffered_line in self .buffer :
142- logger_func ('%s%s' % (self .prefix , buffered_line ))
143- self .buffer = []
147+ if self .log_level > 0 :
148+ line = line .replace ('\n ' , '' )
149+ self .buffer .append (line )
150+ if len (self .buffer ) >= self .buffer_size :
151+ logger_func = None
152+ for line in self .buffer :
153+ if re .match (self .filter_regex , line ):
154+ logger_func = self .get_logger_for_level_in_log_line (line )
155+ break
156+ if logger_func :
157+ for buffered_line in self .buffer :
158+ logger_func (buffered_line )
159+ self .buffer = []
144160
145161 def stop (self , quiet = True ):
146162 self .running = False
@@ -239,14 +255,18 @@ def start_kcl_client_process(stream_name, listener_script, log_file=None, env=No
239255 'AWS_ACCESS_KEY_ID' , 'AWS_SECRET_ACCESS_KEY' , 'AWS_SESSION_TOKEN' ]:
240256 if var_name in os .environ and var_name not in env_vars :
241257 env_vars [var_name ] = os .environ [var_name ]
258+ if env .region == REGION_LOCAL :
259+ # need to disable CBOR protocol, enforce use of plain JSON,
260+ # see https://github.com/mhart/kinesalite/issues/31
261+ env_vars ['AWS_CBOR_DISABLE' ] = 'true'
242262 if kcl_log_level :
243263 if not log_file :
244264 log_file = LOG_FILE_PATTERN .replace ('*' , short_uid ())
245265 TMP_FILES .append (log_file )
246266 run ('touch %s' % log_file )
247267 # start log output reader thread which will read the KCL log
248268 # file and print each line to stdout of this process...
249- reader_thread = OutputReaderThread ({'file' : log_file , 'level' : kcl_log_level , 'log_prefix' : 'KCL: ' })
269+ reader_thread = OutputReaderThread ({'file' : log_file , 'level' : kcl_log_level , 'log_prefix' : 'KCL' })
250270 reader_thread .start ()
251271
252272 # construct stream info
0 commit comments