forked from hugapi/hug
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathoutput_format.py
More file actions
351 lines (276 loc) · 12.8 KB
/
Copy pathoutput_format.py
File metadata and controls
351 lines (276 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
"""hug/output_format.py
Defines Hug's built-in output formatting methods
Copyright (C) 2016 Timothy Edmund Crosley
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
"""
from __future__ import absolute_import
import base64
import json as json_converter
import mimetypes
import os
import re
import tempfile
from datetime import date, datetime
from decimal import Decimal
from functools import wraps
from io import BytesIO
from operator import itemgetter
import falcon
from falcon import HTTP_NOT_FOUND
from hug import introspect
from hug.format import camelcase, content_type
# Image format names for which an ``<name>_image`` output format is generated further down in this module.
# Each generated handler is served with the content type ``image/<name>``; a '+' in the format name is replaced
# by '_' in the handler name (e.g. 'svg+xml' -> ``svg_xml_image``).
IMAGE_TYPES = ('png', 'jpg', 'bmp', 'eps', 'gif', 'im', 'jpeg', 'msp', 'pcx', 'ppm', 'spider', 'tiff', 'webp', 'xbm',
'cur', 'dcx', 'fli', 'flc', 'gbr', 'gd', 'ico', 'icns', 'imt', 'iptc', 'naa', 'mcidas', 'mpo', 'pcd',
'psd', 'sgi', 'tga', 'wal', 'xpm', 'svg', 'svg+xml')
# (format name, MIME type) pairs for which an ``<name>_video`` output format is generated further down.
VIDEO_TYPES = (('flv', 'video/x-flv'), ('mp4', 'video/mp4'), ('m3u8', 'application/x-mpegURL'), ('ts', 'video/MP2T'),
('3gp', 'video/3gpp'), ('mov', 'video/quicktime'), ('avi', 'video/x-msvideo'), ('wmv', 'video/x-ms-wmv'))
# Captures the value of a ``q=`` parameter (everything up to the next ';'); used by accept_quality().
RE_ACCEPT_QUALITY = re.compile("q=(?P<quality>[^;]+)")
# Registry mapping a type to the function that converts it for JSON output.
# Populated by the json_convert decorator and consulted by _json_converter.
json_converters = {}
# Factory for the buffer that the image/video handlers save into: a named temporary file when the
# UWSGI_ORIGINAL_PROC_NAME environment variable is set, an in-memory BytesIO otherwise.
# NOTE(review): presumably uWSGI needs a real file to serve the response - confirm.
stream = tempfile.NamedTemporaryFile if 'UWSGI_ORIGINAL_PROC_NAME' in os.environ else BytesIO
def _json_converter(item):
    """Convert an object the JSON encoder cannot handle natively into something it can.

    This function is passed as the ``default`` hook to ``json.dumps`` by the ``json`` output format.
    Conversions are attempted in this order:

    1. objects exposing ``__native_types__`` are replaced by the result of calling it
    2. types registered in ``json_converters`` use their registered converter
    3. dates / datetimes become ISO 8601 strings
    4. bytes are decoded as UTF-8, falling back to base64 when they are not valid UTF-8
    5. any other iterable becomes a list
    6. Decimals become strings

    Raises TypeError when none of the above applies.
    """
    if hasattr(item, '__native_types__'):
        return item.__native_types__()

    # Custom converters take precedence over every built-in conversion below
    for registered_type, convert in json_converters.items():
        if isinstance(item, registered_type):
            return convert(item)

    if isinstance(item, (date, datetime)):
        return item.isoformat()

    if isinstance(item, bytes):
        try:
            return item.decode('utf8')
        except UnicodeDecodeError:
            # The base64 result is itself bytes; the encoder hands it back to this hook,
            # where the (pure ASCII) payload then decodes cleanly in the branch above.
            return base64.b64encode(item)

    if hasattr(item, '__iter__'):
        return list(item)

    if isinstance(item, Decimal):
        return str(item)

    raise TypeError("Type not serializable")
def json_convert(*kinds):
    """Decorator factory: use the decorated function to convert the given types for JSON output.

    Every type in *kinds* is mapped to the decorated function in the module-wide ``json_converters``
    registry; the function itself is returned unchanged.

    NOTE: custom converters are always globally applied
    """
    def register_json_converter(function):
        for supported_type in kinds:
            json_converters[supported_type] = function
        return function

    return register_json_converter
@content_type('application/json')
def json(content, **kwargs):
    """JSON (Javascript Serialized Object Notation)"""
    # Anything file-like is returned as-is, without being serialized
    if hasattr(content, 'read'):
        return content

    # namedtuple-style objects (tuples exposing a non-empty ``_fields``) are emitted as
    # objects keyed by field name rather than as plain arrays
    field_names = isinstance(content, tuple) and getattr(content, '_fields', None)
    if field_names:
        content = {field: getattr(content, field) for field in content._fields}

    # Extra keyword arguments are forwarded untouched to json.dumps
    serialized = json_converter.dumps(content, default=_json_converter, **kwargs)
    return serialized.encode('utf8')
def on_valid(valid_content_type, on_invalid=json):
    """Decorator factory: render with the decorated formatter unless the content reports errors.

    The decorated formatter is registered under *valid_content_type*. When the content handed to it
    is a plain dict containing an 'errors' key, the *on_invalid* formatter is used instead and the
    response content type is switched to that formatter's content type.

    Either formatter only receives the optional 'request' / 'response' keyword arguments it accepts.
    """
    invalid_kwargs = introspect.generate_accepted_kwargs(on_invalid, 'request', 'response')
    invalid_takes_response = introspect.takes_all_arguments(on_invalid, 'response')

    def wrapper(function):
        valid_kwargs = introspect.generate_accepted_kwargs(function, 'request', 'response')
        valid_takes_response = introspect.takes_all_arguments(function, 'response')

        @content_type(valid_content_type)
        @wraps(function)
        def output_content(content, response, **kwargs):
            # Only an exact dict (not a subclass) carrying 'errors' is treated as invalid
            has_errors = type(content) is dict and 'errors' in content
            if has_errors:
                response.content_type = on_invalid.content_type
                formatter, filter_kwargs, wants_response = on_invalid, invalid_kwargs, invalid_takes_response
            else:
                formatter, filter_kwargs, wants_response = function, valid_kwargs, valid_takes_response

            if wants_response:
                kwargs['response'] = response
            return formatter(content, **filter_kwargs(kwargs))

        return output_content

    return wrapper
@content_type('text/plain')
def text(content):
    """Free form UTF-8 text"""
    # File-like content is returned untouched; everything else is stringified and UTF-8 encoded
    is_stream = hasattr(content, 'read')
    return content if is_stream else str(content).encode('utf8')
@content_type('text/html')
def html(content):
    """HTML (Hypertext Markup Language)"""
    # File-like content is returned untouched
    if hasattr(content, 'read'):
        return content

    # Objects exposing render() supply their own markup; anything else is stringified
    markup = content.render() if hasattr(content, 'render') else str(content)
    return markup.encode('utf8')
def _camelcase(dictionary):
    """Return a copy of the given content with every string dictionary key camelCased.

    Dictionaries are rebuilt recursively: string keys are passed through ``camelcase``, other keys are
    kept as they are. Lists are rebuilt element by element so that dictionaries nested inside lists
    (e.g. a list of records) get their keys converted as well. Any other value is returned unchanged.
    """
    if isinstance(dictionary, dict):
        return {(camelcase(key) if isinstance(key, str) else key): _camelcase(value)
                for key, value in dictionary.items()}

    if isinstance(dictionary, list):
        # Without this branch, keys of dicts held inside lists would be left un-converted
        return [_camelcase(element) for element in dictionary]

    return dictionary
@content_type('application/json')
def json_camelcase(content):
    """JSON (Javascript Serialized Object Notation) with all keys camelCased"""
    # Convert the keys first, then reuse the regular JSON formatter for serialization
    camelcased_content = _camelcase(content)
    return json(camelcased_content)
@content_type('application/json')
def pretty_json(content):
    """JSON (Javascript Serialized Object Notation) pretty printed and indented"""
    # Delegates to the regular JSON formatter with a 4-space indent. The separators are given
    # explicitly so the item separator is a bare ',' (no trailing space at the end of each line).
    return json(content, indent=4, separators=(',', ': '))
def image(image_format, doc=None):
    """Build an output format handler for the given image format.

    The handler is registered (through ``on_valid``) under the content type ``image/<image_format>``
    and documented with *doc*, or with a generic "<format> formatted image" description when *doc*
    is not given.
    """
    @on_valid('image/{0}'.format(image_format))
    def image_handler(data):
        # Already a readable stream: hand it back untouched
        if hasattr(data, 'read'):
            return data

        # Objects that can save themselves are written into a fresh buffer
        if hasattr(data, 'save'):
            output = stream()
            # Only pass the format along when save() is able to accept it
            accepts_format = (introspect.takes_all_arguments(data.save, 'format') or
                              introspect.takes_kwargs(data.save))
            save_options = {'format': image_format.upper()} if accepts_format else {}
            data.save(output, **save_options)
            output.seek(0)  # rewind so the buffer is read from the beginning
            return output

        if hasattr(data, 'render'):
            return data.render()

        # Otherwise treat the data as a path; the opened file is returned still open for the caller to read
        if os.path.isfile(data):
            return open(data, 'rb')

        return None

    image_handler.__doc__ = doc or "{0} formatted image".format(image_format)
    return image_handler
# Publish one module-level handler per supported image format, named ``<format>_image``
# ('+' is not valid in an identifier, so it is replaced by '_').
for image_type in IMAGE_TYPES:
    globals()[image_type.replace("+", "_") + '_image'] = image(image_type)
def video(video_type, video_mime, doc=None):
    """Build an output format handler for the given video format.

    The handler is registered (through ``on_valid``) under the content type *video_mime* and
    documented with *doc*, or with a generic "<type> formatted video" description when *doc*
    is not given.
    """
    @on_valid(video_mime)
    def video_handler(data):
        # Already a readable stream: hand it back untouched
        if hasattr(data, 'read'):
            return data

        # Objects that can save themselves are written into a fresh buffer
        if hasattr(data, 'save'):
            output = stream()
            data.save(output, format=video_type.upper())
            output.seek(0)  # rewind so the buffer is read from the beginning
            return output

        if hasattr(data, 'render'):
            return data.render()

        # Otherwise treat the data as a path; the opened file is returned still open for the caller to read
        if os.path.isfile(data):
            return open(data, 'rb')

        return None

    video_handler.__doc__ = doc or "{0} formatted video".format(video_type)
    return video_handler
# Publish one module-level handler per supported video format, named ``<type>_video``
for video_type, video_mime in VIDEO_TYPES:
    globals()[video_type + '_video'] = video(video_type, video_mime)
@on_valid('file/dynamic')
def file(data, response):
    """A dynamically retrieved file"""
    if hasattr(data, 'read'):
        # Open stream: use its name (if it has one) to guess the content type
        name = getattr(data, 'name', '')
    elif os.path.isfile(data):
        # Path on disk: the opened file is returned still open for the caller to read
        name = data
        data = open(data, 'rb')
    else:
        response.content_type = 'text/plain'
        response.status = HTTP_NOT_FOUND
        return 'File not found!'

    # Fall back to a generic binary type when the name gives no hint
    guessed_type = mimetypes.guess_type(name, None)[0]
    response.content_type = guessed_type or 'application/octet-stream'
    return data
def on_content_type(handlers, default=None, error='The requested content type does not match any of those allowed'):
    """Returns a content in a different format based on the clients provided content type,
    should pass in a dict with the following format:

        {'[content-type]': action,
         ...
        }

    Only the media type of the request's Content-Type is used for the lookup (parameters such as
    ``; charset=utf-8`` are ignored). When the request has no Content-Type, or none of the handlers
    matches, *default* is used; if there is no default either, ``falcon.HTTPNotAcceptable`` is raised
    with *error* as its message.
    """
    def output_type(data, request, response):
        # The content type is None when the client sent no Content-Type header
        requested_type = (request.content_type or '').split(';')[0].strip()
        handler = handlers.get(requested_type, default)
        if not handler:
            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type
        return handler(data)

    # Handlers without a docstring are listed by name instead of breaking the join
    output_type.__doc__ = 'Supports any of the following formats: {0}'.format(
        ', '.join(function.__doc__ or getattr(function, '__name__', repr(function))
                  for function in handlers.values()))
    output_type.content_type = ', '.join(handlers.keys())
    return output_type
def accept_quality(accept, default=1):
    """Separates out the quality score from the accepted content_type

    *accept* is a single entry of an Accept header, e.g. ``'text/html; q=0.8'``.
    Returns a ``(quality, content_type)`` tuple. The quality is *default* when the entry carries no
    ``q=`` parameter, or when the parameter's value is not a valid number.
    """
    quality = default
    if accept and ";" in accept:
        accept, parameters = accept.split(";", 1)
        quality_match = RE_ACCEPT_QUALITY.search(parameters)
        if quality_match:
            try:
                quality = float(quality_match.group('quality').strip())
            except ValueError:
                # The header is client supplied: a malformed q-value is ignored rather than
                # being allowed to abort the request
                quality = default
    return (quality, accept.strip())
def accept(handlers, default=None, error='The requested content type does not match any of those allowed'):
    """Returns a content in a different format based on the clients defined accepted content type,
    should pass in a dict with the following format:

        {'[content-type]': action,
         ...
        }

    The entries of the request's Accept header are tried from the highest quality score to the
    lowest and the first one that has a handler wins. A wildcard (``*/*``), whether it is the whole
    header or one of its entries, selects *default* or, when no default is given, the first handler.
    If nothing matches *default* is used; without a default ``falcon.HTTPNotAcceptable`` is raised
    with *error* as its message.
    """
    def output_type(data, request, response):
        accept_header = request.accept or ''
        # What to serve when the client accepts anything
        any_handler = default or handlers and next(iter(handlers.values()))

        # '*/*' is the standard "anything goes" value (and what is reported when no Accept header is sent)
        if accept_header in ('', '*', '/', '*/*'):
            handler = any_handler
        else:
            handler = default
            accepted = sorted((accept_quality(accept_type) for accept_type in accept_header.split(',')),
                              key=itemgetter(0))
            for quality, accepted_content_type in reversed(accepted):
                if accepted_content_type in handlers:
                    handler = handlers[accepted_content_type]
                    break
                if accepted_content_type == '*/*':
                    # Nothing more preferred matched and the client is fine with anything
                    handler = any_handler
                    break

        if not handler:
            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type
        return handler(data)

    # Handlers without a docstring are listed by name instead of breaking the join
    output_type.__doc__ = 'Supports any of the following formats: {0}'.format(
        ', '.join(function.__doc__ or getattr(function, '__name__', repr(function))
                  for function in handlers.values()))
    output_type.content_type = ', '.join(handlers.keys())
    return output_type
def suffix(handlers, default=None, error='The requested suffix does not match any of those allowed'):
    """Returns a content in a different format based on the suffix placed at the end of the URL route
    should pass in a dict with the following format:

        {'[suffix]': action,
         ...
        }

    The first suffix (in the dict's iteration order) that the request path ends with selects the
    handler. If none matches *default* is used; without a default ``falcon.HTTPNotAcceptable`` is
    raised with *error* as its message.
    """
    def output_type(data, request, response):
        path = request.path
        handler = next((suffix_handler for suffix_test, suffix_handler in handlers.items()
                        if path.endswith(suffix_test)), default)
        if not handler:
            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type
        return handler(data)

    # Handlers without a docstring are listed by name instead of breaking the join
    output_type.__doc__ = 'Supports any of the following formats: {0}'.format(
        ', '.join(function.__doc__ or getattr(function, '__name__', repr(function))
                  for function in handlers.values()))
    output_type.content_type = ', '.join(handlers.keys())
    return output_type
def prefix(handlers, default=None, error='The requested prefix does not match any of those allowed'):
    """Returns a content in a different format based on the prefix placed at the start of the URL route
    should pass in a dict with the following format:

        {'[prefix]': action,
         ...
        }

    The first prefix (in the dict's iteration order) that the request path starts with selects the
    handler. If none matches *default* is used; without a default ``falcon.HTTPNotAcceptable`` is
    raised with *error* as its message.
    """
    def output_type(data, request, response):
        path = request.path
        handler = next((prefix_handler for prefix_test, prefix_handler in handlers.items()
                        if path.startswith(prefix_test)), default)
        if not handler:
            raise falcon.HTTPNotAcceptable(error)

        response.content_type = handler.content_type
        return handler(data)

    # Handlers without a docstring are listed by name instead of breaking the join
    output_type.__doc__ = 'Supports any of the following formats: {0}'.format(
        ', '.join(function.__doc__ or getattr(function, '__name__', repr(function))
                  for function in handlers.values()))
    output_type.content_type = ', '.join(handlers.keys())
    return output_type