-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpypi_package_scan.py
More file actions
117 lines (90 loc) · 4.69 KB
/
pypi_package_scan.py
File metadata and controls
117 lines (90 loc) · 4.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
License GPLv3 or higher.
(C) 2025 Created by Maikel Mardjan - https://nocomplexity.com/
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Public API functions for Python Code Audit aka codeaudit on pypi.org
"""
import gzip
import json
import tarfile
import tempfile
import zlib
from importlib.metadata import PackageNotFoundError, version
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import Request, urlopen
# Version of the installed codeaudit distribution, used in the User-Agent.
# Fall back to a placeholder when no distribution metadata is available
# (e.g. running from a source checkout) so importing this module cannot fail.
try:
    CA_VERSION = version("codeaudit")
except PackageNotFoundError:
    CA_VERSION = "unknown"

# Default HTTP request headers used when downloading package sources.
NOCX_HEADERS = {
    "user-agent": f"Python Code Audit /{CA_VERSION} (https://github.com/nocomplexity/codeaudit)",
    "Accept": "text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8",
    # Only advertise encodings that get_package_source() can actually decode;
    # it raises ValueError for anything other than gzip or deflate, so "br"
    # (brotli) must not be offered to the server.
    "Accept-Encoding": "gzip, deflate",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}
def get_pypi_package_info(package_name, timeout=10):
    """Fetch the PyPI JSON API document for a package.

    The JSON response is needed to find the download URL of the sdist.

    Args:
        package_name: Name of the project on pypi.org.
        timeout: Seconds to wait on the network before giving up, so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The decoded JSON document (a dict) on success.
        ``False`` when PyPI answers with an HTTP error status; this is
        treated as "no package with this name".
        ``None`` when the request fails at the network level.
    """
    # Percent-encode the name so characters such as "/" or spaces cannot
    # change the request path.
    safe_name = quote(str(package_name), safe="")
    url = f"https://pypi.org/pypi/{safe_name}/json"
    try:
        with urlopen(url, timeout=timeout) as response:
            return json.load(response)
    except HTTPError:
        # urlopen does not return a response object for 4xx/5xx status codes;
        # it raises HTTPError instead. If a package is not found a 40x is sent
        # with the JSON body {"message": "Not Found"}. Error handling is kept
        # deliberately simple: every HTTP error is reported as "not found".
        return False  # No package with this name found on pypi.org!
    except OSError as e:
        # URLError, socket timeouts and connection resets are all OSError
        # subclasses; HTTPError (also a subclass) is handled above.
        print(f"Network error: {e}")
        return None
def get_pypi_download_info(package_name):
    """Look up the sdist download URL of a package's latest release.

    Uses the PyPI JSON API (https://docs.pypi.org/api/json/). The API result
    is a nested dict covering every published release, so the files of the
    latest version have to be selected first and then searched for a source
    distribution.

    Returns:
        ``False`` when no package data could be retrieved or the metadata
        carries no version string. Otherwise a dict with the keys
        ``"download_url"`` (URL of the first ``.tar.gz`` sdist, or ``None``
        when the release has none) and ``"release"`` (the latest version).
    """
    package_json = get_pypi_package_info(package_name)
    if not package_json:
        return False

    # The API metadata names the official "latest" version.
    release = package_json.get("info", {}).get("version")
    if not release:
        return False

    # Files published for exactly that version.
    release_files = package_json.get("releases", {}).get(release, [])

    # URLs of the source distributions (sdist) among those files.
    sdist_urls = (
        entry.get("url")
        for entry in release_files
        if entry.get("packagetype") == "sdist"
    )
    # PEP 527: only .tar.gz archives are extracted; older source formats are
    # not supported. The first match wins.
    tarball_url = next(
        (link for link in sdist_urls if link and link.endswith(".tar.gz")),
        None,
    )
    return {"download_url": tarball_url, "release": release}
def get_package_source(url, nocxheaders=NOCX_HEADERS, nocxtimeout=10):
    """Download a package source archive and extract it for SAST scanning.

    Make sure to clean up the temporary directory afterwards, e.g. with
    ``tmp_handle.cleanup()``, which deletes everything.

    Args:
        url: Download URL of a gzip-compressed tar archive (``.tar.gz``).
        nocxheaders: HTTP request headers; a falsy value sends no extra
            headers.
        nocxtimeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        ``(temp_dir, tmpdir_obj)`` on success: the path of the temporary
        directory holding ``package.tar.gz`` plus its extracted content, and
        the ``TemporaryDirectory`` handle so the caller controls its
        lifetime. On any failure the error is printed, a temporary directory
        that was already created is removed, and ``None`` is returned.
    """
    tmpdir_obj = None
    try:
        request = Request(url, headers=nocxheaders or {})
        with urlopen(request, timeout=nocxtimeout) as response:
            content = response.read()
            content_encoding = response.headers.get("Content-Encoding")

        # Content-coding names are compared case-insensitively.
        encoding = (content_encoding or "").strip().lower()
        if encoding == "gzip":
            content = gzip.decompress(content)
        elif encoding == "deflate":
            # "deflate" is normally zlib-wrapped, but some servers send a raw
            # deflate stream; accept both.
            try:
                content = zlib.decompress(content)
            except zlib.error:
                content = zlib.decompress(content, -zlib.MAX_WBITS)
        elif encoding not in ("", "identity"):
            raise ValueError(f"Unexpected content encoding: {content_encoding}")

        # Not used as a context manager on purpose: the directory must
        # outlive this function, so the handle is returned to the caller.
        tmpdir_obj = tempfile.TemporaryDirectory(prefix="codeaudit_")
        temp_dir = tmpdir_obj.name
        tar_path = f"{temp_dir}/package.tar.gz"
        with open(tar_path, "wb") as f:
            f.write(content)
        with tarfile.open(tar_path, "r:gz") as tar:
            tar.extractall(
                path=temp_dir, filter="data"
            )  # nosec Possible risks are mitigated as far as possible, see architecture notes.
        return temp_dir, tmpdir_obj  # return both so caller controls lifetime
    except Exception as e:
        # Best-effort API: report the problem instead of raising.
        print(e)
        if tmpdir_obj is not None:
            # Do not leave a half-written directory behind on failure.
            tmpdir_obj.cleanup()
        return None