# executor.py -- forked from bugy/script-server
import logging
import re
import sys
from execution import process_popen, process_base
from model import model_helper
from model.model_helper import read_bool
from utils import file_utils, process_utils, os_utils
from utils.transliteration import transliterate
# Value handed to time_buffered() in ScriptExecutor.start() when batching process output
# (presumably milliseconds, going by the name).
TIME_BUFFER_MS = 100
# Module-level logger, registered under an explicit name rather than __name__.
LOGGER = logging.getLogger('script_server.ScriptExecutor')
# NOTE(review): not referenced anywhere in the visible part of this module -- presumably a
# switch used by tests; confirm before relying on or removing it.
mock_process = False
def create_process_wrapper(executor, command, working_directory, env_variables):
    """Create (but do not start) the process wrapper for the given executor.

    A PTY-based wrapper is used when the executor's config requires a terminal
    and the OS supports it; in every other case a POpen-based wrapper is used.

    :param executor: object whose ``config.requires_terminal`` selects PTY mode
    :param command: command to run, passed through to the wrapper
    :param working_directory: working directory, passed through to the wrapper
    :param env_variables: environment variables, passed through to the wrapper
    :return: a PtyProcessWrapper or a POpenProcessWrapper
    """
    use_pty = executor.config.requires_terminal

    if use_pty and not os_utils.is_pty_supported():
        LOGGER.warning(
            "Requested PTY mode, but it's not supported for this OS (" + sys.platform + '). Falling back to POpen')
        use_pty = False

    if not use_pty:
        return process_popen.POpenProcessWrapper(command, working_directory, env_variables)

    # Imported only on the PTY path, so the module is never loaded when PTY mode is unused.
    from execution import process_pty
    return process_pty.PtyProcessWrapper(command, working_directory, env_variables)
# Factory called by ScriptExecutor.start() to build the process wrapper. Kept as a module-level
# indirection -- presumably so that tests can substitute another creator; confirm against callers.
_process_creator = create_process_wrapper
def _normalize_working_dir(working_directory):
if working_directory is None:
return None
return file_utils.normalize_path(working_directory)
def _wrap_values(user_values, parameters):
    """Wrap raw user-supplied values into ``_Value`` holders, keyed by parameter name.

    :param user_values: dict mapping parameter name to the value submitted by the user
    :param parameters: iterable of parameter configs (``name``, ``constant``, ``default``,
        ``no_value`` and the mapping helpers are read from each)
    :return: dict mapping parameter name to ``_Value``
    """
    # NOTE(review): block nesting was reconstructed from an unindented copy of this file.
    # In particular the final ``else`` is attached to ``if name in user_values`` here -- verify.
    result = {}
    for parameter in parameters:
        name = parameter.name

        # Constant parameters ignore user input entirely and always use the configured default.
        if parameter.constant:
            value = parameter.default
            result[name] = _Value(None, value, value, parameter.value_to_str(value))
            continue

        if name in user_values:
            user_value = user_values[name]

            # Flag-style parameters: the user value is read as a boolean and used as the script arg.
            if parameter.no_value:
                bool_value = model_helper.read_bool(user_value)
                result[name] = _Value(user_value, bool_value, bool_value)
                continue

            # Regular parameters with a non-empty value are mapped to the script representation;
            # the secured form is kept separately in the _Value's secure_value.
            elif user_value:
                mapped_value = parameter.map_to_script(user_value)
                script_arg = parameter.to_script_args(mapped_value)
                secure_value = parameter.get_secured_value(script_arg)
                result[name] = _Value(user_value, mapped_value, script_arg, secure_value)
            # A supplied but falsy value for a regular parameter adds no entry at all.
        else:
            # Parameter was not supplied: keep an empty placeholder for it.
            result[name] = _Value(None, None, None)

    return result
class ScriptExecutor:
    """Runs one script invocation and exposes its process and output streams.

    The executor is created from a script config and the user-supplied parameter
    values. Calling ``start()`` builds the command line and environment, launches
    the process through ``_process_creator`` and sets up two output streams: the
    raw one and a "protected" one in which values of secure parameters are masked.
    """

    def __init__(self, config, parameter_values):
        """Prepare the executor without starting any process.

        :param config: script config (``parameters``, ``working_directory`` and
            ``script_command`` are read here)
        :param parameter_values: dict mapping parameter name to the user-supplied value
        """
        self.config = config

        self._parameter_values = _wrap_values(parameter_values, config.parameters)
        self._working_directory = _normalize_working_dir(config.working_directory)

        self.script_base_command = process_utils.split_command(
            self.config.script_command,
            self._working_directory)

        # Must run after config and _parameter_values are set: both are read by the initializer.
        self.secure_replacements = self.__init_secure_replacements()

        # All three stay None until start() is called.
        self.process_wrapper = None  # type: process_base.ProcessWrapper
        self.raw_output_stream = None
        self.protected_output_stream = None

    def start(self):
        """Launch the script process and wire up the output streams.

        :raises Exception: if the executor has already been started
        """
        if self.process_wrapper is not None:
            raise Exception('Executor already started')

        parameter_values = self.get_script_parameter_values()

        script_args = build_command_args(parameter_values, self.config)
        command = self.script_base_command + script_args
        env_variables = _build_env_variables(parameter_values, self.config.parameters)

        process_wrapper = _process_creator(self, command, self._working_directory, env_variables)
        process_wrapper.start()

        # Stored only after a successful start, so a failed launch leaves the executor "not started".
        self.process_wrapper = process_wrapper

        output_stream = process_wrapper.output_stream.time_buffered(TIME_BUFFER_MS, _concat_output)
        self.raw_output_stream = output_stream.replay()

        if self.secure_replacements:
            self.protected_output_stream = output_stream \
                .map(self.__replace_secure_variables) \
                .replay()
        else:
            # Nothing to mask: both accessors share the very same stream object.
            self.protected_output_stream = self.raw_output_stream

    def __init_secure_replacements(self):
        """Build a ``{regex pattern: mask}`` dict for the values of secure parameters.

        Parameters that are not secure, have no wrapped value, or whose mapped
        value is empty are skipped, as are blank elements.

        :return: dict mapping a regex pattern string to ``model_helper.SECURE_MASK``
        """
        word_replacements = {}

        for parameter in self.config.parameters:
            if not parameter.secure:
                continue

            value = self._parameter_values.get(parameter.name)
            if value is None:
                continue

            mapped_value = value.mapped_script_value
            if model_helper.is_empty(mapped_value):
                continue

            if isinstance(mapped_value, list):
                elements = mapped_value
            else:
                elements = [mapped_value]

            for value_element in elements:
                element_string = str(value_element)
                if not element_string.strip():
                    continue

                # Match the value only when it is not glued to other word characters.
                # Raw strings keep "\w" a regex escape instead of an invalid string escape
                # (which triggers a DeprecationWarning/SyntaxWarning on modern Python).
                value_pattern = r'((?<!\w)|^)' + re.escape(element_string) + r'((?!\w)|$)'
                word_replacements[value_pattern] = model_helper.SECURE_MASK

        return word_replacements

    def __replace_secure_variables(self, output):
        """Return ``output`` with every configured secure pattern replaced by its mask."""
        result = output

        replacements = self.secure_replacements

        if replacements:
            for word, replacement in replacements.items():
                result = re.sub(word, replacement, result)

        return result

    def get_secure_command(self):
        """Return the full command line as one string, built from the secured parameter values."""
        audit_script_args = build_command_args(
            {name: v.get_secure_value() for name, v in self._parameter_values.items()},
            self.config)

        # Arguments may be non-string values, so stringify them before joining.
        audit_script_args = [str(a) for a in audit_script_args]

        command = self.script_base_command + audit_script_args
        return ' '.join(command)

    def get_anonymized_output_stream(self):
        """Return the output stream with secure values masked (None before ``start()``)."""
        return self.protected_output_stream

    def get_raw_output_stream(self):
        """Return the unmasked output stream (None before ``start()``)."""
        return self.raw_output_stream

    def get_process_id(self):
        """Return the process id reported by the process wrapper."""
        return self.process_wrapper.get_process_id()

    def get_return_code(self):
        """Return the return code reported by the process wrapper."""
        return self.process_wrapper.get_return_code()

    def is_finished(self):
        """Return whether the process wrapper reports the process as finished."""
        return self.process_wrapper.is_finished()

    def add_finish_listener(self, listener):
        """Register ``listener`` on the process wrapper's finish event."""
        self.process_wrapper.add_finish_listener(listener)

    def write_to_input(self, text):
        """Forward ``text`` to the process input; ignored with a warning once the process finished."""
        if self.process_wrapper.is_finished():
            LOGGER.warning('process already finished, ignoring input')
            return

        self.process_wrapper.write_to_input(text)

    def get_user_parameter_values(self):
        """Return ``{name: user value}`` for parameters that have a user value set."""
        return {name: value.user_value
                for name, value in self._parameter_values.items()
                if value.user_value is not None}

    def get_script_parameter_values(self):
        """Return ``{name: script argument}`` for every wrapped parameter value."""
        return {name: value.script_arg for name, value in self._parameter_values.items()}

    def kill(self):
        """Kill the process unless it has already finished."""
        if not self.process_wrapper.is_finished():
            self.process_wrapper.kill()

    def stop(self):
        """Stop the process unless it has already finished."""
        if not self.process_wrapper.is_finished():
            self.process_wrapper.stop()

    def cleanup(self):
        """Dispose both output streams and let the process wrapper clean up."""
        self.raw_output_stream.dispose()
        self.protected_output_stream.dispose()

        self.process_wrapper.cleanup()
def build_command_args(param_values, config):
    """Build the list of script arguments from parameter values.

    Parameters are processed in the order of ``config.parameters``; parameters
    missing from ``param_values`` or having a falsy value contribute nothing.

    :param param_values: dict mapping parameter name to the value to pass
    :param config: script config; only ``config.parameters`` is read
    :return: list of command-line arguments
    """
    # NOTE(review): block nesting was reconstructed from an unindented copy of this file;
    # verify the if/else pairing below against the repository version.
    result = []

    for parameter in config.parameters:
        name = parameter.name

        # Flag text for the parameter; '' keeps the string concatenations below valid when unset.
        option_name = parameter.param
        if parameter.param is None:
            option_name = ''

        if name in param_values:
            value = param_values[name]

            if parameter.no_value:
                # Flag-only parameter: emitted only for a literal True and a non-empty flag.
                if value is True and option_name:
                    result.append(option_name)

            elif value:
                def add_param_and_value(parameter_sh, value_sh):
                    # Appends the flag (when set) as its own argument, then the value(s) after it.
                    if parameter_sh.param:
                        result.append(parameter_sh.param)
                    if isinstance(value_sh, list):
                        result.extend(value_sh)
                    else:
                        result.append(value_sh)

                if isinstance(value, list):
                    if parameter.same_arg_param:
                        # Every list element is emitted individually.
                        for val in value:
                            if parameter.repeat_param:
                                # Flag and element as two separate arguments, per element.
                                add_param_and_value(parameter, val)
                            else:
                                # Flag and element glued into a single argument, per element.
                                result.append(option_name + str(val))
                    else:
                        if parameter.repeat_param:
                            # Flag once, followed by all elements as separate arguments.
                            add_param_and_value(parameter, value)
                        else:
                            # Flag glued to the first element; remaining elements follow as-is.
                            result.append(option_name + str(value[0]))
                            if len(value) > 1:
                                result.extend(value[1:])
                else:
                    if parameter.repeat_param:
                        # Flag and value as two separate arguments.
                        add_param_and_value(parameter, value)
                    else:
                        # Flag and value glued into a single argument.
                        result.append(option_name + str(value))

    return result
def _to_env_name(key):
    """Derive an environment variable name from a parameter name.

    The key is transliterated, every run of characters outside ``[0-9a-zA-Z_]``
    becomes a single underscore, and the upper-cased result is prefixed with
    ``PARAM_``. Returns None when nothing but a single underscore is left.
    """
    sanitized = re.sub('[^0-9a-zA-Z_]+', '_', transliterate(key))

    return None if sanitized == '_' else 'PARAM_' + sanitized.upper()
def _build_env_variables(parameter_values, parameters):
    """Build the environment variables that expose parameter values to the script.

    :param parameter_values: dict mapping parameter name to its script value
    :param parameters: parameter configs (``name``, ``env_var`` and ``no_value`` are read)
    :return: dict mapping environment variable name to a string value
    """
    # NOTE(review): block nesting was reconstructed from an unindented copy of this file;
    # in particular the ``continue`` in the no_value branch is placed at the outer ``if`` level.
    result = {}
    # Variable names that were claimed by more than one parameter and are therefore dropped.
    excluded = []
    for param_name, value in parameter_values.items():
        # List and missing values are never exported.
        if isinstance(value, list) or (value is None):
            continue

        # Skip names that match no parameter config, or more than one.
        found_parameters = [p for p in parameters if p.name == param_name]
        if len(found_parameters) != 1:
            continue

        parameter = found_parameters[0]

        # An explicitly configured variable name wins; otherwise derive one from the parameter name.
        env_var = parameter.env_var
        if env_var is None:
            env_var = _to_env_name(param_name)

        if (not env_var) or (env_var in excluded):
            continue

        # Name collision: drop the value stored earlier and exclude the name from now on.
        if env_var in result:
            excluded.append(env_var)
            del result[env_var]
            continue

        if parameter.no_value:
            # Flags are exported only when set, always as the literal string 'true'.
            if (value is not None) and (read_bool(value) == True):
                result[env_var] = 'true'
            continue

        result[env_var] = str(value)

    return result
def _concat_output(output_chunks):
if not output_chunks:
return output_chunks
return [''.join(output_chunks)]
class _Value:
def __init__(self, user_value, mapped_script_value, script_arg, secure_value=None):
self.user_value = user_value
self.mapped_script_value = mapped_script_value
self.script_arg = script_arg
self.secure_value = secure_value
def get_secure_value(self):
if self.secure_value is not None:
return self.secure_value
return self.script_arg
def __str__(self) -> str:
if self.secure_value is not None:
return str(self.secure_value)
return str(self.script_arg)