This repository was archived by the owner on Aug 26, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy path__init__.py
More file actions
231 lines (188 loc) · 7.11 KB
/
__init__.py
File metadata and controls
231 lines (188 loc) · 7.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
from collections import OrderedDict
from coreapi.codecs.base import BaseCodec
from coreapi.compat import force_bytes, string_types, urlparse
from coreapi.compat import COMPACT_SEPARATORS, VERBOSE_SEPARATORS
from coreapi.document import Document, Link, Array, Object, Field, Error
from coreapi.exceptions import ParseError
import json
import uritemplate
# Version identifier exposed by this module.
__version__ = "1.0.2"
def _get_string(item, key):
    """
    Look up `key` in `item` and return the value only if it is a string.

    Any missing or non-string value yields an empty string instead.
    """
    candidate = item.get(key)
    return candidate if isinstance(candidate, string_types) else ''
def _get_dict(item, key):
value = item.get(key)
if isinstance(value, dict):
return value
return {}
def _get_bool(item, key):
value = item.get(key)
if isinstance(value, bool):
return value
return False
def _is_array_containing_instance(value, datatype):
    """
    Report whether `value` is a non-empty Array whose first item is an
    instance of `datatype`.

    The result is only meant to be used for its truthiness: an empty
    Array is handed back as-is (falsy) rather than as a literal False.
    """
    if not isinstance(value, Array):
        return False
    if not value:
        # Empty array: return the (falsy) array itself.
        return value
    return isinstance(value[0], datatype)
def _is_map_containing_instance(value, datatype):
    """
    Report whether `value` is a non-empty Object whose first member is an
    instance of `datatype`.

    Returns a bool. Only the first member is inspected, mirroring
    `_is_array_containing_instance`.
    """
    if not isinstance(value, Object):
        return False
    # An Object is a mapping, so indexing with `0` would be a lookup of the
    # key 0 rather than "the first element". Walk the items instead.
    for _name, member in value.items():
        return isinstance(member, datatype)
    # Empty mapping.
    return False
def _document_to_primative(node, base_url=None):
    """
    Convert a Core API node into plain primitives laid out as HAL.

    `node` may be a Document, Link, Array, Object, Error or any other
    value; values of any other type are returned unchanged.

    `base_url` is the URL that relative document and link URLs are
    resolved against.

    Documents become an ordered mapping with a `_links` entry (always
    containing `self`), followed by plain data, followed by `_embedded`
    when there are nested documents.
    """
    if isinstance(node, Document):
        url = urlparse.urljoin(base_url, node.url)

        self_link = OrderedDict()
        self_link['href'] = url
        if node.title:
            self_link['title'] = node.title

        links = OrderedDict()
        links['self'] = self_link
        embedded = OrderedDict()
        data = OrderedDict()

        for key, value in node.items():
            if isinstance(value, Link):
                links[key] = _document_to_primative(value, base_url=url)
            elif _is_array_containing_instance(value, Link):
                links[key] = [
                    _document_to_primative(item, base_url=url)
                    for item in value
                    if isinstance(item, Link)
                ]
            elif _is_map_containing_instance(value, Link):
                # Distinct names inside the comprehension so the outer
                # `key` is not shadowed.
                links[key] = {
                    name: _document_to_primative(link, base_url=url)
                    for name, link in value.items()
                    if isinstance(link, Link)
                }
            elif isinstance(value, Document):
                embedded[key] = _document_to_primative(value, base_url=url)
            elif _is_array_containing_instance(value, Document):
                embedded[key] = [
                    _document_to_primative(item, base_url=url)
                    for item in value
                    if isinstance(item, Document)
                ]
            elif key not in ('_links', '_embedded'):
                # Plain data. The reserved HAL keys are never emitted as data.
                data[key] = _document_to_primative(value)

        ret = OrderedDict()
        ret['_links'] = links
        ret.update(data)
        if embedded:
            ret['_embedded'] = embedded
        return ret

    if isinstance(node, Link):
        url = urlparse.urljoin(base_url, node.url)
        ret = OrderedDict()
        ret['href'] = url
        path_fields = [
            field.name for field in node.fields if field.location == 'path'
        ]
        if path_fields:
            # Variables that the URL template already contains must not be
            # appended a second time (e.g. "/orders{?id}" would otherwise
            # become "/orders{?id}{?id}").
            already_templated = uritemplate.variables(url)
            missing = [
                name for name in path_fields if name not in already_templated
            ]
            if missing:
                # Continue an existing query with '&' rather than opening a
                # second one with '?'.
                operator = '&' if '?' in url else '?'
                ret['href'] += "{%s%s}" % (operator, ','.join(missing))
            ret['templated'] = True
        return ret

    if isinstance(node, Array):
        return [_document_to_primative(item) for item in node]

    if isinstance(node, Object):
        # Nested documents and links inside a plain object are dropped.
        return OrderedDict([
            (key, _document_to_primative(value))
            for key, value in node.items()
            if not isinstance(value, (Document, Link))
        ])

    if isinstance(node, Error):
        return OrderedDict([
            (key, _document_to_primative(value))
            for key, value in node.items()
        ])

    return node
def _parse_link(data, base_url=None):
    """
    Build a Link from a HAL link object.

    The `href` is resolved against `base_url`. When the link object is
    flagged as `templated`, every variable found in the resulting URL
    template becomes a Field with location 'path'; otherwise the link is
    created with `fields=None`.
    """
    href = urlparse.urljoin(base_url, _get_string(data, 'href'))
    fields = None
    if _get_bool(data, 'templated'):
        fields = [
            Field(name, location='path')
            for name in uritemplate.variables(href)
        ]
    return Link(url=href, fields=fields)
def _map_to_coreapi_key(key):
# HAL uses 'rel' values to index links and nested resources.
if key.startswith('http://') or key.startswith('https://'):
# Fully qualified URL - just use last portion of the path.
return urlparse.urlsplit(key).path.split('/')[-1]
elif ':' in key:
# A curried 'rel' value. Use the named portion.
return key.split(':', 1)[1]
# A reserved 'rel' value, such as "next".
return key
def _parse_document(data, base_url=None):
    """
    Build a Document from a decoded HAL resource object.

    `data` is the decoded JSON object for the resource. `base_url` is
    the URL that the `self` link and the other links are resolved
    against; embedded resources are resolved against this document's own
    resolved URL.

    Content is assembled in three passes - links, then embedded
    resources, then plain data - so a later pass overwrites an earlier
    one if they produce the same key. Entries inside link or embedded
    arrays that are not JSON objects are ignored.
    """
    links = _get_dict(data, '_links')
    embedded = _get_dict(data, '_embedded')

    self_link = _get_dict(links, 'self')
    url = urlparse.urljoin(base_url, _get_string(self_link, 'href'))
    title = _get_string(self_link, 'title')

    content = {}

    # Links.
    for rel, value in links.items():
        if rel in ('self', 'curies'):
            continue
        key = _map_to_coreapi_key(rel)
        if isinstance(value, list):
            # Skip malformed entries instead of failing on `.get` / `in`.
            items = [item for item in value if isinstance(item, dict)]
            if items and 'name' in items[0]:
                # Lists of named links should be represented inside a map.
                content[key] = {
                    item['name']: _parse_link(item, base_url)
                    for item in items
                    if 'name' in item
                }
            else:
                # Lists of unnamed links should be represented inside a list.
                content[key] = [_parse_link(item, base_url) for item in items]
        elif isinstance(value, dict):
            # Single link instance.
            content[key] = _parse_link(value, base_url)

    # Embedded resources.
    for rel, value in embedded.items():
        key = _map_to_coreapi_key(rel)
        if isinstance(value, list):
            content[key] = [
                _parse_document(item, base_url=url)
                for item in value
                if isinstance(item, dict)
            ]
        elif isinstance(value, dict):
            content[key] = _parse_document(value, base_url=url)

    # Data.
    for key, value in data.items():
        if key not in ('_embedded', '_links'):
            content[key] = value

    return Document(url=url, title=title, content=content)
class HALCodec(BaseCodec):
    """
    Codec that encodes and decodes documents as "application/hal+json".
    """
    media_type = "application/hal+json"
    supports = ['encoding', 'decoding']

    def dump(self, document, indent=False, **kwargs):
        """
        Takes a document and returns a bytestring.

        When `indent` is true the JSON is written with a four space
        indent and verbose separators, otherwise in compact form.
        """
        options = {
            'ensure_ascii': False,
            'indent': 4 if indent else None,
            'separators': VERBOSE_SEPARATORS if indent else COMPACT_SEPARATORS,
        }
        data = _document_to_primative(document)
        return force_bytes(json.dumps(data, **options))

    def load(self, bytes, **kwargs):
        """
        Takes a bytestring and returns a document.

        An optional `base_url` keyword argument is used to resolve
        relative URLs. Raises ParseError if the content is not valid
        JSON or if the top level JSON value is not an object.
        """
        base_url = kwargs.get('base_url', None)

        try:
            data = json.loads(bytes.decode('utf-8'))
        except ValueError as exc:
            raise ParseError('Malformed JSON. %s' % exc)

        # Validate before parsing: the parser assumes a mapping and would
        # otherwise fail with an AttributeError on e.g. a top level list.
        if not isinstance(data, dict):
            raise ParseError('Top level node must be a document.')

        return _parse_document(data, base_url)