forked from DonJayamanne/pythonVSCode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelpers.py
More file actions
350 lines (293 loc) · 12.6 KB
/
helpers.py
File metadata and controls
350 lines (293 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import contextlib
import io
import json
import os
import pathlib
import socket
import subprocess
import sys
import tempfile
import threading
from typing import Any, Dict, List, Optional, Tuple
import uuid
if sys.platform == "win32":
from namedpipe import NPopen
script_dir = pathlib.Path(__file__).parent.parent.parent
script_dir_child = pathlib.Path(__file__).parent.parent
sys.path.append(os.fspath(script_dir))
sys.path.append(os.fspath(script_dir_child))
sys.path.append(os.fspath(script_dir / "lib" / "python"))
print("sys add path", script_dir)
TEST_DATA_PATH = pathlib.Path(__file__).parent / ".data"
CONTENT_LENGTH: str = "Content-Length:"
CONTENT_TYPE: str = "Content-Type:"
@contextlib.contextmanager
def text_to_python_file(text_file_path: pathlib.Path):
"""Convert a text file to a python file and yield the python file path."""
python_file = None
try:
contents = text_file_path.read_text(encoding="utf-8")
python_file = text_file_path.with_suffix(".py")
python_file.write_text(contents, encoding="utf-8")
yield python_file
finally:
if python_file:
os.unlink(os.fspath(python_file))
@contextlib.contextmanager
def create_symlink(root: pathlib.Path, target_ext: str, destination_ext: str):
destination = None
try:
destination = root / destination_ext
target = root / target_ext
if destination and destination.exists():
print("destination already exists", destination)
try:
destination.symlink_to(target)
except Exception as e:
print("error occurred when attempting to create a symlink", e)
yield target, destination
finally:
if destination and destination.exists():
destination.unlink()
print("destination unlinked", destination)
def process_data_received(data: str) -> List[Dict[str, Any]]:
"""Process the all JSON data which comes from the server. After listen is finished, this function will be called.
Here the data must be split into individual JSON messages and then parsed.
This function also:
- Checks that the jsonrpc value is 2.0
- Checks that the last JSON message contains the `eot` token.
"""
json_messages = []
remaining = data
while remaining:
json_data, remaining = parse_rpc_message(remaining)
# here json_data is a single rpc payload, now check its jsonrpc 2 and save the param data
if "params" not in json_data or "jsonrpc" not in json_data:
raise ValueError("Invalid JSON-RPC message received, missing params or jsonrpc key")
elif json_data["jsonrpc"] != "2.0":
raise ValueError("Invalid JSON-RPC version received, not version 2.0")
else:
json_messages.append(json_data["params"])
last_json = json_messages.pop(-1)
if "eot" not in last_json:
raise ValueError("Last JSON messages does not contain 'eot' as its last payload.")
return json_messages # return the list of json messages, only the params part without the EOT token
def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]:
"""Process the JSON data which comes from the server.
A single rpc payload is in the format:
content-length: #LEN# \r\ncontent-type: application/json\r\n\r\n{"jsonrpc": "2.0", "params": ENTIRE_DATA}
with EOT params: "params": {"command_type": "discovery", "eot": true}
returns:
json_data: A single rpc payload of JSON data from the server.
remaining: The remaining data after the JSON data."""
str_stream: io.StringIO = io.StringIO(data)
length: int = 0
while True:
line: str = str_stream.readline()
if CONTENT_LENGTH.lower() in line.lower():
length = int(line[len(CONTENT_LENGTH) :])
line: str = str_stream.readline()
if CONTENT_TYPE.lower() not in line.lower():
raise ValueError("Header does not contain Content-Type")
line = str_stream.readline()
if line not in ["\r\n", "\n"]:
raise ValueError("Header does not contain space to separate header and body")
# if it passes all these checks then it has the right headers
break
if not line or line.isspace():
raise ValueError("Header does not contain Content-Length")
while True: # keep reading until the number of bytes is the CONTENT_LENGTH
line: str = str_stream.readline(length)
try:
# try to parse the json, if successful it is single payload so return with remaining data
json_data: dict[str, str] = json.loads(line)
return json_data, str_stream.read()
except json.JSONDecodeError:
print("json decode error")
def _listen_on_pipe_new(listener, result: List[str], completed: threading.Event):
"""Listen on the named pipe or Unix domain socket for JSON data from the server.
Created as a separate function for clarity in threading context.
"""
# Windows design
if sys.platform == "win32":
all_data: list = []
stream = listener.wait()
while True:
# Read data from collection
close = stream.closed
if close:
break
data = stream.readlines()
if not data:
if completed.is_set():
break # Exit loop if completed event is set
else:
try:
# Attempt to accept another connection if the current one closes unexpectedly
print("attempt another connection")
except socket.timeout:
# On timeout, append all collected data to result and return
# result.append("".join(all_data))
return
data_decoded = "".join(data)
all_data.append(data_decoded)
# Append all collected data to result array
result.append("".join(all_data))
else: # Unix design
connection, _ = listener.socket.accept()
listener.socket.settimeout(1)
all_data: list = []
while True:
# Reading from connection
data: bytes = connection.recv(1024 * 1024)
if not data:
if completed.is_set():
break # Exit loop if completed event is set
else:
try:
# Attempt to accept another connection if the current one closes unexpectedly
connection, _ = listener.socket.accept()
except socket.timeout:
# On timeout, append all collected data to result and return
result.append("".join(all_data))
return
all_data.append(data.decode("utf-8"))
# Append all collected data to result array
result.append("".join(all_data))
def _run_test_code(proc_args: List[str], proc_env, proc_cwd: str, completed: threading.Event):
result = subprocess.run(proc_args, env=proc_env, cwd=proc_cwd)
completed.set()
return result
def runner(args: List[str]) -> Optional[List[Dict[str, Any]]]:
"""Run the pytest discovery and return the JSON data from the server."""
print("\n Running python test subprocess with cwd set to: ", TEST_DATA_PATH)
return runner_with_cwd(args, TEST_DATA_PATH)
def runner_with_cwd(args: List[str], path: pathlib.Path) -> Optional[List[Dict[str, Any]]]:
"""Run the pytest discovery and return the JSON data from the server."""
process_args: List[str] = [
sys.executable,
"-m",
"pytest",
"-p",
"vscode_pytest",
"-s",
] + args
# Generate pipe name, pipe name specific per OS type.
pipe_name = generate_random_pipe_name("pytest-discovery-test")
# Windows design
if sys.platform == "win32":
with NPopen("r+t", name=pipe_name, bufsize=0) as pipe:
# Update the environment with the pipe name and PYTHONPATH.
env = os.environ.copy()
env.update(
{
"TEST_RUN_PIPE": pipe.path,
"PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent),
}
)
completed = threading.Event()
result = [] # result is a string array to store the data during threading
t1: threading.Thread = threading.Thread(
target=_listen_on_pipe_new, args=(pipe, result, completed)
)
t1.start()
t2 = threading.Thread(
target=_run_test_code,
args=(process_args, env, path, completed),
)
t2.start()
t1.join()
t2.join()
return process_data_received(result[0]) if result else None
else: # Unix design
# Update the environment with the pipe name and PYTHONPATH.
env = os.environ.copy()
env.update(
{
"TEST_RUN_PIPE": pipe_name,
"PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent),
}
)
server = UnixPipeServer(pipe_name)
server.start()
completed = threading.Event()
result = [] # result is a string array to store the data during threading
t1: threading.Thread = threading.Thread(
target=_listen_on_pipe_new, args=(server, result, completed)
)
t1.start()
t2 = threading.Thread(
target=_run_test_code,
args=(process_args, env, path, completed),
)
t2.start()
t1.join()
t2.join()
return process_data_received(result[0]) if result else None
def find_test_line_number(test_name: str, test_file_path) -> str:
"""Function which finds the correct line number for a test by looking for the "test_marker--[test_name]" string.
The test_name is split on the "[" character to remove the parameterization information.
Args:
test_name: The name of the test to find the line number for, will be unique per file.
test_file_path: The path to the test file where the test is located.
"""
test_file_unique_id: str = "test_marker--" + test_name.split("[")[0]
with open(test_file_path) as f:
for i, line in enumerate(f):
if test_file_unique_id in line:
return str(i + 1)
error_str: str = f"Test {test_name!r} not found on any line in {test_file_path}"
raise ValueError(error_str)
def get_absolute_test_id(test_id: str, testPath: pathlib.Path) -> str:
"""Get the absolute test id by joining the testPath with the test_id."""
split_id = test_id.split("::")[1:]
absolute_test_id = "::".join([str(testPath), *split_id])
return absolute_test_id
def generate_random_pipe_name(prefix=""):
# Generate a random suffix using UUID4, ensuring uniqueness.
random_suffix = uuid.uuid4().hex[:10]
# Default prefix if not provided.
if not prefix:
prefix = "python-ext-rpc"
# For Windows, named pipes have a specific naming convention.
if sys.platform == "win32":
return f"\\\\.\\pipe\\{prefix}-{random_suffix}-sock"
# For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory.
xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR")
if xdg_runtime_dir:
return os.path.join(xdg_runtime_dir, f"{prefix}-{random_suffix}.sock")
else:
return os.path.join(tempfile.gettempdir(), f"{prefix}-{random_suffix}.sock")
class UnixPipeServer:
def __init__(self, name):
self.name = name
self.is_windows = sys.platform == "win32"
if self.is_windows:
raise NotImplementedError(
"This class is only intended for Unix-like systems, not Windows."
)
else:
# For Unix-like systems, use a Unix domain socket.
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Ensure the socket does not already exist
try:
os.unlink(self.name)
except OSError:
if os.path.exists(self.name):
raise
def start(self):
if self.is_windows:
raise NotImplementedError(
"This class is only intended for Unix-like systems, not Windows."
)
else:
# Bind the socket to the address and listen for incoming connections.
self.socket.bind(self.name)
self.socket.listen(1)
print(f"Server listening on {self.name}")
def stop(self):
# Clean up the server socket.
self.socket.close()
print("Server stopped.")