forked from bugy/script-server
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess_base.py
More file actions
113 lines (83 loc) · 3.12 KB
/
Copy pathprocess_base.py
File metadata and controls
113 lines (83 loc) · 3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import abc
import os
import signal
import subprocess
import threading
import logging
from react.observable import ReplayObservable
from utils import os_utils
LOGGER = logging.getLogger('script_server.process_base')
class ProcessWrapper(metaclass=abc.ABCMeta):
def __init__(self, command, working_directory, env_variables):
self.process = None
self.working_directory = working_directory
self.command = command
self.env_variables = env_variables
self.finish_listeners = []
# output_stream is guaranteed to close not earlier than process exit
self.output_stream = ReplayObservable()
self.notify_finish_thread = None
def start(self):
self.start_execution(self.command, self.working_directory)
read_output_thread = threading.Thread(target=self.pipe_process_output)
read_output_thread.start()
self.notify_finish_thread = threading.Thread(target=self.notify_finished)
self.notify_finish_thread.start()
@abc.abstractmethod
def pipe_process_output(self):
pass
@abc.abstractmethod
def start_execution(self, command, working_directory):
pass
@abc.abstractmethod
def write_to_input(self, value):
pass
@abc.abstractmethod
def wait_finish(self):
pass
def get_process_id(self):
return self.process.pid
def is_finished(self):
return self.process.poll() is not None
def get_return_code(self):
return self.process.returncode
def _write_script_output(self, text):
self.output_stream.push(text)
def stop(self):
if not self.is_finished():
if not os_utils.is_win():
group_id = os.getpgid(self.get_process_id())
os.killpg(group_id, signal.SIGTERM)
class KillChildren(object):
def finished(self):
try:
os.killpg(group_id, signal.SIGKILL)
except ProcessLookupError:
# probably there are no children left
pass
self.add_finish_listener(KillChildren())
else:
self.process.terminate()
self._write_script_output('\n>> STOPPED BY USER\n')
def kill(self):
if not self.is_finished():
if not os_utils.is_win():
group_id = os.getpgid(self.get_process_id())
os.killpg(group_id, signal.SIGKILL)
self._write_script_output('\n>> KILLED\n')
else:
subprocess.Popen("taskkill /F /T /PID " + self.get_process_id())
def add_finish_listener(self, listener):
if self.is_finished():
listener.finished()
return
self.finish_listeners.append(listener)
def notify_finished(self):
self.wait_finish()
for listener in self.finish_listeners:
try:
listener.finished()
except:
LOGGER.exception('Failed to notify listener: ' + str(listener))
def cleanup(self):
self.output_stream.dispose()