-
Notifications
You must be signed in to change notification settings - Fork 6.4k
Expand file tree
/
Copy pathutils.py
More file actions
265 lines (221 loc) · 7.5 KB
/
utils.py
File metadata and controls
265 lines (221 loc) · 7.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import datetime
import re
import string
import ssl
from typing import Optional, MutableMapping, Tuple, Any, Union
from urllib.error import HTTPError, URLError
from urllib.request import urlopen, Request
import logging
log = logging.getLogger(__name__)
_SIZE_RE = re.compile(r'^\s*(\d+)\s*([KMGTP]?)\s*([iI]?[bB])?\s*$')
def datetime_now() -> datetime.datetime:
"""
Return the current local date and time.
:return: Returns an aware datetime object of the current date
and time.
"""
return datetime.datetime.now(tz=datetime.timezone.utc)
def datetime_to_str(dt: datetime.datetime) -> str:
"""
Convert a datetime object into a ISO 8601 string, e.g.
'2019-04-24T17:06:53.039991Z'.
:param dt: The datetime object to process.
:return: Return a string representing the date in
ISO 8601 (timezone=UTC).
"""
return dt.astimezone(tz=datetime.timezone.utc).strftime(
'%Y-%m-%dT%H:%M:%S.%fZ'
)
def str_to_datetime(string: str) -> datetime.datetime:
"""
Convert an ISO 8601 string into a datetime object.
The following formats are supported:
- 2020-03-03T09:21:43.636153304Z
- 2020-03-03T15:52:30.136257504-0600
- 2020-03-03T15:52:30.136257504
:param string: The string to parse.
:return: Returns an aware datetime object of the given date
and time string.
:raises: :exc:`~exceptions.ValueError` for an unknown
datetime string.
"""
fmts = [
'%Y-%m-%dT%H:%M:%S.%f',
'%Y-%m-%dT%H:%M:%S.%f%z',
]
# In *all* cases, the 9 digit second precision is too much for
# Python's strptime. Shorten it to 6 digits.
p = re.compile(r'(\.[\d]{6})[\d]*')
string = p.sub(r'\1', string)
# Replace trailing Z with -0000, since (on Python 3.6.8) it
# won't parse.
if string and string[-1] == 'Z':
string = string[:-1] + '-0000'
for fmt in fmts:
try:
dt = datetime.datetime.strptime(string, fmt)
# Make sure the datetime object is aware (timezone is set).
# If not, then assume the time is in UTC.
if dt.tzinfo is None:
dt = dt.replace(tzinfo=datetime.timezone.utc)
return dt
except ValueError:
pass
raise ValueError(
"Time data {} does not match one of the formats {}".format(
string, str(fmts)
)
)
def parse_timedelta(delta: str) -> Optional[datetime.timedelta]:
"""
Returns a timedelta object represents a duration, the difference
between two dates or times.
>>> parse_timedelta('foo')
>>> parse_timedelta('2d') == datetime.timedelta(days=2)
True
>>> parse_timedelta("4w") == datetime.timedelta(days=28)
True
>>> parse_timedelta("5s") == datetime.timedelta(seconds=5)
True
>>> parse_timedelta("-5s") == datetime.timedelta(days=-1, seconds=86395)
True
:param delta: The string to process, e.g. '2h', '10d', '30s'.
:return: The `datetime.timedelta` object or `None` in case of
a parsing error.
"""
parts = re.match(
r'(?P<seconds>-?\d+)s|'
r'(?P<minutes>-?\d+)m|'
r'(?P<hours>-?\d+)h|'
r'(?P<days>-?\d+)d|'
r'(?P<weeks>-?\d+)w$',
delta,
re.IGNORECASE,
)
if not parts:
return None
parts = parts.groupdict()
args = {name: int(param) for name, param in parts.items() if param}
return datetime.timedelta(**args)
def is_hex(s: str, strict: bool = True) -> bool:
"""Simple check that a string contains only hex chars"""
try:
int(s, 16)
except ValueError:
return False
# s is multiple chars, but we should catch a '+/-' prefix too.
if strict:
if s[0] not in string.hexdigits:
return False
return True
def http_req(
hostname: str = '',
port: str = '443',
method: Optional[str] = None,
headers: MutableMapping[str, str] = {},
data: Optional[str] = None,
endpoint: str = '/',
scheme: str = 'https',
ssl_verify: bool = False,
timeout: Optional[int] = None,
ssl_ctx: Optional[Any] = None,
) -> Tuple[Any, Any, Any]:
if not ssl_ctx:
ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
if not ssl_verify:
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
else:
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
url: str = f'{scheme}://{hostname}:{port}{endpoint}'
_data = bytes(data, 'ascii') if data else None
_headers = headers
if data and not method:
method = 'POST'
if not _headers.get('Content-Type') and method in ['POST', 'PATCH']:
_headers['Content-Type'] = 'application/json'
try:
req = Request(url, _data, _headers, method=method)
with urlopen(req, context=ssl_ctx, timeout=timeout) as response:
response_str = response.read()
response_headers = response.headers
response_code = response.code
return response_headers, response_str.decode(), response_code
except (HTTPError, URLError) as e:
log.error(e)
# handle error here if needed
raise
_TRUE_VALS = {'y', 'yes', 't', 'true', 'on', '1'}
_FALSE_VALS = {'n', 'no', 'f', 'false', 'off', '0'}
def strtobool(value: str) -> bool:
"""Convert a string to a boolean value.
Based on a simlilar function once available at distutils.util.strtobool.
"""
if value.lower() in _TRUE_VALS:
return True
if value.lower() in _FALSE_VALS:
return False
raise ValueError(f'invalid truth value {value!r}')
def size_to_bytes(v: Union[str, int]) -> int:
if isinstance(v, int):
return v
if isinstance(v, str):
m = _SIZE_RE.match(v)
if not m:
raise ValueError(
f'invalid size "{v}" (examples: 10737418240, 10G, 512M)'
)
num = int(m.group(1))
unit = m.group(2) or ''
unit = unit.upper()
mult = {
'': 1,
'K': 1024,
'M': 1024**2,
'G': 1024**3,
'T': 1024**4,
'P': 1024**5,
}[unit]
return num * mult
raise ValueError(f'invalid size type {type(v)} (expected int or str)')
def bytes_to_human(num: float, mode: str = 'decimal') -> str:
"""Convert a bytes value into it's human-readable form.
:param num: number, in bytes, to convert
:param mode: Either decimal (default) or binary to determine divisor
:returns: string representing the bytes value in a more readable format
"""
unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
divisor = 1000.0
yotta = 'YB'
if mode == 'binary':
unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
divisor = 1024.0
yotta = 'YiB'
for unit in unit_list:
if abs(num) < divisor:
return '%3.1f%s' % (num, unit)
num /= divisor
return '%.1f%s' % (num, yotta)
def with_units_to_int(v: str) -> int:
if not v:
return 0
if v.endswith('iB'):
v = v[:-2]
bytes_mult = 1024
elif v.endswith('B'):
v = v[:-1]
bytes_mult = 1000
mult = 1
if v[-1].upper() == 'K':
mult = bytes_mult
v = v[:-1]
elif v[-1].upper() == 'M':
mult = bytes_mult * bytes_mult
v = v[:-1]
elif v[-1].upper() == 'G':
mult = bytes_mult * bytes_mult * bytes_mult
v = v[:-1]
elif v[-1].upper() == 'T':
mult = bytes_mult * bytes_mult * bytes_mult * bytes_mult
v = v[:-1]
return int(float(v) * mult)