forked from Ekopalypse/NppPythonScripts
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathio_handler.py
More file actions
325 lines (269 loc) · 10.9 KB
/
io_handler.py
File metadata and controls
325 lines (269 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
'''
Responsible for starting, stopping and communicating with the respective LSP servers
by monitoring the input/output channels
Sends a NEW_LSP_MESSAGE_RECEIVED event when receiving new messages from LSP servers
'''
import os
import threading
import subprocess
import queue
import socket
import select
import time
import logging
log = logging.info
class TCP_OBJECT:
    '''
    Launches an LSP server process and connects to it over TCP.

    Keys read from ``proc_config``:
        executable  path of the server binary (required)
        args        optional list of command line arguments
        tcpretries  optional number of additional connection attempts
                    (defaults to 0)
        port        optional TCP port (defaults to 2087)

    Only ``start`` is implemented; ``stop``, ``send_to`` and ``read_from``
    raise NotImplementedError.
    '''

    def __init__(self, proc_config):
        self.config = proc_config

    def _start_tcp_client(self, port, max_tcp_retry=0, ip=None):
        '''
        Try to connect to ``ip:port`` and return the connected socket.

        One attempt is always made, followed by up to ``max_tcp_retry``
        further attempts spaced one second apart. ``ip`` defaults to
        'localhost'. Returns None when every attempt failed.
        '''
        if ip is None:
            ip = 'localhost'

        attempts_left = max_tcp_retry
        while attempts_left >= 0:
            try:
                return socket.create_connection((ip, port))
            except Exception as e:  # pylint: disable=W0703
                log(f'{e}')
            attempts_left -= 1
            if attempts_left >= 0:
                # give the server time to open its port, but do not
                # wait after the final attempt
                time.sleep(1)
        return None

    def start(self):
        '''
        Start the server process and connect to it.

        Returns a ``(process, socket)`` tuple, or ``(None, None)`` when the
        process could not be started or no connection could be established.
        In the failure case an already started process is killed.
        '''
        executable = self.config['executable']
        # build a fresh list so repeated starts do not keep prepending the
        # executable to the list stored in the configuration
        args = [executable, *(self.config.get('args') or [])]
        max_tcp_retries = self.config.get('tcpretries', 0)
        port = self.config.get('port', 2087)

        # STARTUPINFO only exists on Windows; elsewhere Popen must get None
        si = None
        if hasattr(subprocess, 'STARTUPINFO'):
            si = subprocess.STARTUPINFO()
            si.dwFlags = subprocess.STARTF_USESTDHANDLES
            # wShowWindow stays at its default - presumably to keep the
            # server's console window hidden
            si.dwFlags |= subprocess.STARTF_USESHOWWINDOW

        process = None
        _socket = None
        try:
            process = subprocess.Popen(args,
                                       startupinfo=si,
                                       # None (inherit) when the executable has no directory part
                                       cwd=os.path.dirname(executable) or None,
                                       close_fds=False)
            _socket = self._start_tcp_client(port, max_tcp_retries)
            if _socket is None:
                log('failed to establish a connection - going to stop lsp process')
                process.kill()
                process = None
        except Exception as e:  # pylint: disable=W0703
            log(f'{e}')
            if process is not None:
                # do not leave an orphaned server behind
                process.kill()
            process = None
            _socket = None
        return process, _socket

    def stop(self):
        ''' Not implemented for TCP servers. '''
        raise NotImplementedError

    def send_to(self, message):
        ''' Not implemented for TCP servers. '''
        raise NotImplementedError

    def read_from(self):
        ''' Not implemented for TCP servers. '''
        raise NotImplementedError
class PIPE_OBJECT:
    '''
    Launches an LSP server process that is talked to via stdin/stdout pipes.

    Keys read from ``proc_config``:
        executable  path of the server binary (required)
        args        optional list of command line arguments
        env         optional iterable of 'NAME=value' strings that are added
                    to a copy of the current environment

    ``stop`` and ``read_from`` raise NotImplementedError.
    '''

    def __init__(self, proc_config):
        log('PIPE_OBJECT')
        self.config = proc_config
        self.process = None

    def start(self):
        '''
        Start the server process with piped stdin and stdout.

        Returns ``(self, None)`` on success and ``(None, None)`` when the
        process could not be started, so callers can test the first element.
        '''
        log(f'{self.config["executable"]}')
        executable = self.config['executable']
        # build a fresh list so repeated starts do not keep prepending the
        # executable to the list stored in the configuration
        args = [executable, *(self.config.get('args') or [])]

        _env = os.environ.copy()
        for var in self.config.get('env', '') or ():
            k, sep, v = var.partition('=')
            if not sep:
                log(f'ignoring malformed env entry: {var}')
                continue
            _env[k] = v

        # STARTUPINFO only exists on Windows; elsewhere Popen must get None
        si = None
        if hasattr(subprocess, 'STARTUPINFO'):
            si = subprocess.STARTUPINFO()
            # wShowWindow stays at its default - presumably to keep the
            # server's console window hidden
            si.dwFlags |= subprocess.STARTF_USESHOWWINDOW

        try:
            self.process = subprocess.Popen(args,
                                            stdin=subprocess.PIPE,
                                            stdout=subprocess.PIPE,
                                            startupinfo=si,
                                            # None (inherit) when the executable has no directory part
                                            cwd=os.path.dirname(executable) or None,
                                            close_fds=False,
                                            env=_env)
            log(f'{self.process.pid}')
        except Exception as e:  # pylint: disable=W0703
            log(f'{e}')
            self.process = None
            return None, None
        return self, None

    def stop(self):
        ''' Not implemented. '''
        raise NotImplementedError

    def send_to(self, message):
        ''' Write ``message`` UTF-8 encoded to the server's stdin and flush. '''
        payload = message.encode('UTF-8')
        log(f"{payload}")
        self.process.stdin.write(payload)
        self.process.stdin.flush()

    def read_from(self):
        ''' Not implemented. '''
        raise NotImplementedError
class PROCESS_MONITOR(threading.Thread):
    '''
    Thread that reads framed LSP messages from a server and passes the
    decoded body of every message to ``callback``.

    A message consists of header lines, an empty line and a body whose size
    in bytes is given by the ``Content-Length`` header.

    Args:
        queue_obj: stored as ``self.queue``; not used by this class itself.
        com_obj: communication object; ``run`` only reads from PIPE_OBJECT.
        callback: called with each message body as ``str``.
        ready_event: optional event that is set as soon as reading starts.
    '''

    def __init__(self, queue_obj=None, com_obj=None, callback=None, ready_event=None):
        log('PROCESS_MONITOR')
        super().__init__()
        # checked before each message; set to False to stop after the
        # read that is currently blocking
        self.keep_reading = True
        self.queue = queue_obj
        self.com_obj = com_obj
        self.callback = callback
        self.ready = ready_event

    def _signal_ready(self):
        ''' Set the ready event, if one was supplied. '''
        if self.ready is not None:
            self.ready.set()

    def _read_message(self, stream):
        '''
        Read one framed message from the binary ``stream``.

        Returns the body as bytes, or None when the stream ended before a
        complete message was read.
        '''
        content_length = None
        while True:
            line = stream.readline()
            if not line:
                return None  # end of stream
            header = line.strip()
            if not header:
                if content_length is None:
                    continue  # stray blank line, no header seen yet
                break  # blank line terminates the header block
            name, sep, value = header.partition(b':')
            if sep and name.strip().lower() == b'content-length':
                try:
                    content_length = int(value.decode('ascii'))
                except ValueError:
                    log(f'invalid Content-Length header: {header}')
                    content_length = None
                else:
                    if content_length < 0:
                        log(f'invalid Content-Length header: {header}')
                        content_length = None
            elif not sep:
                log(f'Content header without length !! ???? {header}')
            # any other header (e.g. Content-Type) is ignored

        # read() may return fewer bytes than requested, so collect the
        # body until it is complete
        chunks = []
        missing = content_length
        while missing > 0:
            chunk = stream.read(missing)
            if not chunk:
                return None  # stream ended in the middle of a body
            chunks.append(chunk)
            missing -= len(chunk)
        return b''.join(chunks)

    def _dispatch_messages(self, stream):
        ''' Read messages from ``stream`` until it ends or reading is stopped. '''
        while self.keep_reading:
            body = self._read_message(stream)
            if body is None:
                log('end of stream reached')
                break
            try:
                # decode the bytes counted by Content-Length as a whole
                message = body.decode('utf-8')
            except UnicodeDecodeError as e:
                log(f'dropping undecodable message: {e}')
                continue
            log(f'callback: {message}')
            self.callback(message)

    def enqueue_io_messsage(self, out):
        '''
        Read messages from the binary file object ``out`` (the server's
        stdout) and pass each body to the callback. Returns when the stream
        ends or ``keep_reading`` has been set to False.
        '''
        log(f'{out}')
        self._signal_ready()
        self._dispatch_messages(out)

    def enqueue_tcp_messsage(self, _socket, queue):
        '''
        Read messages from the connected ``_socket`` and pass each body to
        the callback. The socket is closed before returning. ``queue`` is
        accepted for compatibility and not used.
        '''
        self._signal_ready()
        if _socket is None:
            log('no socket to read from')
            return
        try:
            # a buffered binary file view lets both transports share the
            # same framing code
            with _socket.makefile('rb') as stream:
                self._dispatch_messages(stream)
        except Exception as e:  # pylint: disable=W0703
            log(f'{e}')
        finally:
            log('going to close socket')
            _socket.close()

    def run(self):
        ''' Thread entry point: read from the pipe based server, if any. '''
        log('process monitor started')
        if isinstance(self.com_obj, PIPE_OBJECT):
            self.enqueue_io_messsage(self.com_obj.process.stdout)
        # NOTE: reading from a TCP socket via enqueue_tcp_messsage is not
        # wired up here
        else:
            log(f'unknown object:{self.com_obj}')
class COMMUNICATION_MANAGER:
    '''
    Starts LSP servers on demand and forwards client messages to the server
    that was selected last.

    Args:
        lsp_server_configs: mapping of language -> server configuration.
        on_receive_callback: handed to every PROCESS_MONITOR as its callback.
    '''

    def __init__(self, lsp_server_configs, on_receive_callback):
        log('communication manager')
        self.available_servers = lsp_server_configs
        # language -> (process monitor thread, communication object)
        self.running_servers = dict()
        self.callback = on_receive_callback
        self.com_obj = None
        self.current_queue = None
        self.max_queue_wait_time = 2.0
        # while True, send() stores messages in the backlog; this class only
        # ever resets the flag, so it is presumably set by the client
        self.waiting_for_initialize_result = False
        self.backlog = []

    def start_process(self, proc_config):
        '''
        Start the server described by ``proc_config``.

        ``proc_config['pipe'] == 'io'`` selects PIPE_OBJECT, anything else
        TCP_OBJECT. Returns whatever the object's ``start`` returns.
        '''
        log(f'{proc_config}')
        if proc_config['pipe'] == 'io':
            launcher = PIPE_OBJECT(proc_config)
        else:
            launcher = TCP_OBJECT(proc_config)
        process, _socket = launcher.start()
        return process, _socket

    def send(self, lspmessage):
        ''' Called by client on various notepad++ and scintilla events '''
        if self.waiting_for_initialize_result:
            self.backlog.append(lspmessage)
            return
        self.com_obj.send_to(lspmessage)

    def send_initialized(self, lspmessage):
        '''
        Called by client after initialize result has been received.

        Sends ``lspmessage``, stops backlogging and then sends the messages
        collected in the backlog in the order they were queued.
        '''
        self.com_obj.send_to(lspmessage)
        self.waiting_for_initialize_result = False
        if self.backlog:
            log(f'backlog message:{self.backlog}')
        while self.backlog:
            self.com_obj.send_to(self.backlog.pop(0))

    def already_initialized(self, language):
        '''
        Make the server for ``language`` the current one.

        Returns True when a server for ``language`` is already registered.
        Otherwise the server is started, a daemon PROCESS_MONITOR thread is
        created for it and False is returned; False is also returned when
        the start failed.
        '''
        log(f'{language}')
        if language in self.running_servers:
            monitor, com_obj = self.running_servers[language]
            self.com_obj = com_obj
            self.current_queue = monitor.queue
            return True

        obj, _socket = self.start_process(self.available_servers[language])
        if obj:
            self.com_obj = _socket if _socket else obj
            self.current_queue = queue.Queue()
            ready = threading.Event()
            process_monitor = PROCESS_MONITOR(self.current_queue, self.com_obj, self.callback, ready)
            # the daemon attribute replaces the deprecated setDaemon()
            process_monitor.daemon = True
            start = time.time()
            process_monitor.start()
            # wait at most two seconds for the monitor to signal readiness
            ready.wait(2)
            log(f'thread start took {time.time() - start}')
            log(f'self.com_obj:{type(self.com_obj)}')
            self.running_servers[language] = (process_monitor, self.com_obj)
        return False

    def stop_monitoring_thread(self, language):
        '''
        Ask a monitor thread to stop reading.

        Despite its name, ``language`` is indexed like an entry yielded by
        ``running_monitoring_threads``: ``language[0]`` is the monitor thread
        and ``language[1]`` the communication object, which becomes the
        current one.
        '''
        log(f'{language}')
        self.com_obj = language[1]
        language[0].keep_reading = False

    def running_monitoring_threads(self):
        ''' Yield the (monitor thread, communication object) entry of every running server. '''
        for language, entry in self.running_servers.items():
            log(f'{language=}')
            yield entry