-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy path__init__.py
More file actions
244 lines (194 loc) · 8.22 KB
/
__init__.py
File metadata and controls
244 lines (194 loc) · 8.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from collections.abc import Iterable, MutableMapping, Sequence
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from typing_extensions import Self
ENVIRON_HTTP_HEADER_FMT = 'http_{}'
STANDARD_HEADER = 'openstack-api-version'
class Version(collections.namedtuple('Version', 'major minor')):
"""A namedtuple containing major and minor values.
Since it is a tuple, it is automatically comparable.
"""
max_version: tuple[int, int]
min_version: tuple[int, int]
def __new__(cls, major: int, minor: int) -> 'Self':
"""Add min and max version attributes to the tuple."""
self = super().__new__(cls, major, minor)
self.max_version = (-1, 0)
self.min_version = (-1, 0)
return self
def __str__(self) -> str:
return f'{self.major}.{self.minor}'
def matches(
self,
min_version: tuple[int, int] | None = None,
max_version: tuple[int, int] | None = None,
) -> bool:
"""Is this version within min_version and max_version."""
# NOTE(cdent): min_version and max_version are expected
# to be set by the code that is creating the Version, if
# they are known.
if min_version is None:
min_version = self.min_version
if max_version is None:
max_version = self.max_version
return min_version <= self <= max_version
def get_version(
headers: Iterable[tuple[str, str]] | MutableMapping[str, str],
service_type: str,
legacy_headers: Iterable[str] | None = None,
) -> str | None:
"""Parse a microversion out of headers
If headers is not a dict we assume is an iterator of tuple-like headers,
which we will fold into a dict.
The flow is that we first look for the new standard singular header:
* ``openstack-api-version: <service> <version>``
If that's not present we fall back to the headers listed in
``legacy_headers``. These often look like this:
* ``openstack-<service>-api-version: <version>``
* ``openstack-<legacy>-api-version: <version>``
* ``x-openstack-<legacy>-api-version: <version>``
Folded headers are joined by ``,``.
:param headers: The headers of a request, dict or list
:param service_type: The service type being looked for in the headers
:param legacy_headers: Other headers to look at for a version
:returns: a version string or "latest"
:raises: ValueError
"""
folded_headers = fold_headers(headers)
version = check_standard_header(folded_headers, service_type)
if version:
return version
if legacy_headers:
version = check_legacy_headers(folded_headers, legacy_headers)
return version
return None
def check_legacy_headers(
headers: MutableMapping[str, str], legacy_headers: Iterable[str]
) -> str | None:
"""Gather values from old headers."""
for legacy_header in legacy_headers:
try:
value = _extract_header_value(headers, legacy_header.lower())
return value.split(',')[-1].strip()
except KeyError:
pass
return None
def check_standard_header(
headers: MutableMapping[str, str], service_type: str
) -> str | None:
"""Parse the standard header to get value for service."""
try:
header = _extract_header_value(headers, STANDARD_HEADER)
for header_value in reversed(header.split(',')):
try:
service, version = header_value.strip().split(None, 1)
if service.lower() == service_type.lower():
return version.strip()
except ValueError:
pass
except (KeyError, ValueError):
return None
return None
# we accept Any even though we know this will be a list of 2-item tuples or a
# dict, in order to avoid reworking logic
def fold_headers(headers: Any) -> MutableMapping[str, str]:
"""Turn a list of headers into a folded dict."""
# If it behaves like a dict, return it. Webob uses objects which
# are not dicts, but behave like them.
try:
return dict((k.lower(), v) for k, v in headers.items())
except AttributeError:
pass
header_dict = collections.defaultdict(list)
for header, value in headers:
header_dict[header.lower()].append(value.strip())
folded_headers = {}
for header, value in header_dict.items():
folded_headers[header] = ','.join(value)
return folded_headers
def headers_from_wsgi_environ(
environ: MutableMapping[str, str],
) -> dict[str, str]:
"""Extract all the HTTP_ keys and values from environ to a new dict.
Note that this does not change the keys in any way in the returned dict.
Nor is the incoming environ modified.
:param environ: A PEP 3333 compliant WSGI environ dict.
"""
return {key: environ[key] for key in environ if key.startswith('HTTP_')}
def _extract_header_value(
headers: MutableMapping[str, str], header_name: str
) -> str:
"""Get the value of a header.
The provided headers is a dict. If a key doesn't exist for ``header_name``,
try using the WSGI environ form of the name.
:raises: KeyError if neither key is found.
"""
try:
value = headers[header_name]
except KeyError:
wsgi_header_name = ENVIRON_HTTP_HEADER_FMT.format(
header_name.replace('-', '_')
)
value = headers[wsgi_header_name]
return value
def parse_version_string(version_string: str) -> Version:
"""Turn a version string into a Version
:param version_string: A string of two numerals, X.Y.
:returns: a Version
:raises: TypeError
"""
try:
# The combination of int and a limited split with the
# named tuple means that this incantation will raise
# ValueError, TypeError or AttributeError when the incoming
# data is poorly formed but will, however, naturally adapt to
# extraneous whitespace.
return Version(*(int(value) for value in version_string.split('.', 1)))
except (ValueError, TypeError, AttributeError) as exc:
raise TypeError(f'invalid version string: {version_string}; {exc}')
def extract_version(
headers: Iterable[tuple[str, str]] | MutableMapping[str, str],
service_type: str,
versions_list: Sequence[str],
) -> Version:
"""Extract the microversion from the headers.
There may be multiple headers and some which don't match our service.
If no version is found then the extracted version is the minimum available
version.
:param headers: Request headers as dict list or WSGI environ
:param service_type: The service type as a string
:param versions_list: List of all possible microversions as strings,
sorted from earliest to latest version.
:returns: a :class:`~Version` with the optional ``min_version`` and
``max_version`` attributes set.
:raises: ValueError
"""
found_version = get_version(headers, service_type=service_type)
min_version_string = versions_list[0]
max_version_string = versions_list[-1]
# If there was no version found in the headers, choose the minimum
# available version.
version_string = found_version or min_version_string
if version_string == 'latest':
version_string = max_version_string
request_version = parse_version_string(version_string)
request_version.max_version = parse_version_string(max_version_string)
request_version.min_version = parse_version_string(min_version_string)
# We need a version that is in versions_list. This gives us the option
# to administratively disable a version if we really need to.
if str(request_version) in versions_list:
return request_version
raise ValueError(f'Unacceptable version header: {version_string}')