-
Notifications
You must be signed in to change notification settings - Fork 45
Expand file tree
/
Copy pathwriter_text.py
More file actions
454 lines (363 loc) · 15.2 KB
/
writer_text.py
File metadata and controls
454 lines (363 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at:
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
# OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the
# License.
"""Implementations of Ion Text writers."""
import base64
import math
import re
from functools import partial
from datetime import datetime
from decimal import Decimal
from io import BytesIO
from amazon.ion.symbols import SymbolToken
from . import symbols
from .util import coroutine, unicode_iter
from .core import DataEvent, Transition, IonEventType, IonType, TIMESTAMP_PRECISION_FIELD, TimestampPrecision, \
_ZERO_DELTA, TIMESTAMP_FRACTION_PRECISION_FIELD, MICROSECOND_PRECISION, TIMESTAMP_FRACTIONAL_SECONDS_FIELD, \
Timestamp, DECIMAL_ZERO
from .writer import partial_transition, writer_trampoline, serialize_scalar, validate_scalar_value, \
illegal_state_null, NOOP_WRITER_EVENT
from .writer import WriteEventType
# Pre-built write event holding the encoded text Ion version marker; the raw
# writer emits it for VERSION_MARKER events at the top level.
_IVM_WRITER_EVENT = DataEvent(WriteEventType.COMPLETE, symbols.TEXT_ION_1_0.encode())
# Text of the untyped null followed by every typed null. Handed to
# ``serialize_scalar`` as its ``null_table``.
# NOTE(review): presumably indexed by Ion type, so the order would matter --
# confirm against ``writer.serialize_scalar`` before reordering.
_NULL_TYPE_NAMES = [
    b'null',
    b'null.bool',
    b'null.int',
    b'null.float',
    b'null.decimal',
    b'null.timestamp',
    b'null.symbol',
    b'null.string',
    b'null.clob',
    b'null.blob',
    b'null.list',
    b'null.sexp',
    b'null.struct',
]
def _serialize_bool(ion_event):
if ion_event.value:
return b'true'
else:
return b'false'
def _serialize_scalar_from_string_representation_factory(type_name, types, str_func=str):
    """Creates a scalar serializer built on a string-conversion function.

    Args:
        type_name (str): The name of the Ion type; used to name the returned function.
        types (Union[Sequence[type],type]): The Python type(s) the event value is validated against.
        str_func (Optional[Callable]): Converts the validated value to text, defaults to ``str``.

    Returns:
        function: Takes an Ion event and returns its value as UTF-8 encoded Ion text bytes.
    """
    def serialize(ion_event):
        scalar = ion_event.value
        validate_scalar_value(scalar, types)
        rendered = str_func(scalar)
        # UTF-8 is the only encoding supported at time of writing (Dec 2022).
        return rendered.encode("utf-8")

    serialize.__name__ = '_serialize_' + type_name
    return serialize
# Serializer for INT events: validates that the value is an ``int`` and writes ``str(value)``.
_serialize_int = _serialize_scalar_from_string_representation_factory(
    'int', int
)
_EXPONENT_PAT = re.compile(r'[eE]')
# TODO Make this cleaner.
def _float_str(val):
if math.isnan(val):
return 'nan'
if math.isinf(val):
if val > 0:
return '+inf'
else:
return '-inf'
text = repr(val)
if _EXPONENT_PAT.search(text) is None:
text += 'e0'
return text
# Serializer for FLOAT events: validates that the value is a ``float`` and renders it
# with ``_float_str``.
_serialize_float = _serialize_scalar_from_string_representation_factory(
    'float', float,
    str_func=_float_str
)
# TODO Make this cleaner.
def _decimal_str(val):
text = str(val)
new_text = _EXPONENT_PAT.sub('d', text)
if text == new_text and text.find('.') == -1:
new_text += 'd0'
return new_text
# Serializer for DECIMAL events: validates that the value is a ``Decimal`` and renders it
# with ``_decimal_str``.
_serialize_decimal = _serialize_scalar_from_string_representation_factory(
    'decimal', Decimal,
    str_func=_decimal_str
)
def _bytes_utc_offset(dt):
offset = dt.utcoffset()
if offset is None:
return '-00:00'
elif offset == _ZERO_DELTA:
return 'Z'
offset_str = dt.strftime('%z')
offset_str = offset_str[:3] + ':' + offset_str[3:]
return offset_str
def _bytes_datetime(dt):
    """Renders a ``datetime`` (or ``Timestamp``) as timestamp text at its declared precision.

    The precision is read from the attribute named by ``TIMESTAMP_PRECISION_FIELD`` and
    defaults to ``TimestampPrecision.SECOND`` when the attribute is absent. Output stops
    at the first component the precision does not include. Despite the name, the result
    is a ``str``.

    Args:
        dt (datetime): The value to render.

    Returns:
        str: The timestamp text.
    """
    # Keep the caller's object: ``dt`` may be replaced below, and the precision and
    # fractional-seconds attributes are read from the original.
    original_dt = dt
    precision = getattr(original_dt, TIMESTAMP_PRECISION_FIELD, TimestampPrecision.SECOND)
    if dt.year < 1900:
        # In some Python interpreter versions, strftime inexplicably does not support pre-1900 years.
        # This unfortunate ugliness compensates for that.
        year = str(dt.year)
        # Left-pad the year with zeroes to four digits.
        year = ('0' * (4 - len(year))) + year
        dt = dt.replace(year=2008)  # Note: this fake year must be a leap year.
    else:
        year = dt.strftime('%Y')
    tz_string = year
    if precision.includes_month:
        tz_string += dt.strftime('-%m')
    else:
        # Year precision: year followed by 'T'.
        return tz_string + 'T'
    if precision.includes_day:
        tz_string += dt.strftime('-%dT')
    else:
        # Month precision: year-month followed by 'T'.
        return tz_string + 'T'
    if precision.includes_minute:
        tz_string += dt.strftime('%H:%M')
    else:
        # Day precision: the 'T' was already appended with the day; no offset is written.
        return tz_string
    if precision.includes_second:
        tz_string += dt.strftime(':%S')
    else:
        # Minute precision: offset follows directly after HH:MM.
        return tz_string + _bytes_utc_offset(dt)
    if isinstance(original_dt, Timestamp):
        fractional_seconds = getattr(original_dt, TIMESTAMP_FRACTIONAL_SECONDS_FIELD, None)
        if fractional_seconds is not None:
            _, digits, exponent = fractional_seconds.as_tuple()
            # Skip the fraction only when it is zero with a non-negative exponent; a zero with a
            # negative exponent still writes its digits.
            if not (fractional_seconds == DECIMAL_ZERO and exponent >= 0):
                # Zeroes needed between the decimal point and the first coefficient digit.
                leading_zeroes = -exponent - len(digits)
                tz_string += '.'
                if leading_zeroes > 0:
                    tz_string += '0' * leading_zeroes
                tz_string += ''.join(str(x) for x in digits)
    else:
        # This must be a normal datetime, which always has a range-validated microsecond value.
        tz_string += '.' + dt.strftime('%f')
    return tz_string + _bytes_utc_offset(dt)
# Serializer for TIMESTAMP events: validates that the value is a ``datetime`` and renders it
# with ``_bytes_datetime``.
_serialize_timestamp = _serialize_scalar_from_string_representation_factory(
    'timestamp',
    datetime,
    str_func=_bytes_datetime
)
_PRINTABLE_ASCII_START = 0x20
_PRINTABLE_ASCII_END = 0x7E
def _is_printable_ascii(code_point):
return code_point >= _PRINTABLE_ASCII_START and code_point <= _PRINTABLE_ASCII_END
_SERIALIZE_COMMON_ESCAPE_MAP = {
b'\n'[0]: br'\n',
b'\r'[0]: br'\r',
b'\t'[0]: br'\t',
}
_2B_ESCAPE_MAX = 0xFF
_4B_ESCAPE_MAX = 0xFFFF
def _escape(code_point):
escape = _SERIALIZE_COMMON_ESCAPE_MAP.get(code_point, None)
if escape is not None:
return escape
if code_point <= _2B_ESCAPE_MAX:
return (u'\\x%02x' % code_point).encode()
if code_point <= _4B_ESCAPE_MAX:
return (u'\\u%04x' % code_point).encode()
return (u'\\U%08x' % code_point).encode()
def _bytes_text(code_point_iter, quote, prefix=b'', suffix=b''):
quote_code_point = None if len(quote) == 0 else quote[0]
with BytesIO() as buf:
buf.write(prefix)
buf.write(quote)
for code_point in code_point_iter:
if code_point == quote_code_point:
buf.write(b'\\' + quote)
elif code_point == b'\\'[0]:
buf.write(b'\\\\')
elif _is_printable_ascii(code_point):
buf.write(bytes((code_point,)))
else:
buf.write(_escape(code_point))
buf.write(quote)
buf.write(suffix)
return buf.getvalue()
_SINGLE_QUOTE = b"'"
_DOUBLE_QUOTE = b'"'
# all typed nulls (such as null.int) and the +inf, and -inf keywords are covered by this regex
_UNQUOTED_SYMBOL_REGEX = re.compile(r'\A[a-zA-Z$_][a-zA-Z0-9$_]*\Z')
_ADDITIONAL_SYMBOLS_REQUIRING_QUOTES = set(['nan', 'null', 'false', 'true'])
def _symbol_needs_quotes(text):
return text in _ADDITIONAL_SYMBOLS_REQUIRING_QUOTES or _UNQUOTED_SYMBOL_REGEX.search(text) is None
def _serialize_symbol_value(value, suffix=b''):
    """Serializes a symbol (token or plain text) to Ion text bytes.

    Args:
        value: Either an object with ``text`` and ``sid`` attributes or the symbol text itself.
        suffix (bytes): Appended after the symbol (used for field-name and annotation delimiters).

    Returns:
        bytes: ``$<sid>`` when the token's text is ``None``, otherwise the text, single-quoted
            and escaped when ``_symbol_needs_quotes`` requires it, followed by ``suffix``.
    """
    # TODO Support not quoting operators in s-expressions: https://amazon-ion.github.io/ion-docs/docs/symbols.html
    try:
        text = value.text
        if text is None:
            # Token without text: write it by symbol ID.
            return (u'$%d' % value.sid).encode() + suffix
    except AttributeError:
        # No ``text`` attribute (or no ``sid`` on a text-less token): treat the value as the text.
        text = value
    # NOTE(review): ``type(SymbolToken)`` is the metaclass of SymbolToken, not SymbolToken
    # itself -- confirm whether ``SymbolToken`` (or just ``str``) was intended here.
    validate_scalar_value(text, (str, type(SymbolToken)))
    quote = _SINGLE_QUOTE if _symbol_needs_quotes(text) else b''
    return _bytes_text(unicode_iter(text), quote, suffix=suffix)
def _serialize_symbol(ion_event):
    """Serializes the symbol held in ``ion_event.value`` to Ion text bytes."""
    symbol = ion_event.value
    return _serialize_symbol_value(symbol)
def _serialize_string(ion_event):
    """Serializes a ``str`` event value as a double-quoted, escaped Ion string.

    Raises whatever ``validate_scalar_value`` raises when the value is not a ``str``.
    """
    # TODO Support multi-line strings.
    text = ion_event.value
    validate_scalar_value(text, str)
    code_points = unicode_iter(text)
    return _bytes_text(code_points, _DOUBLE_QUOTE)
# Delimiters that wrap clob and blob content.
_LOB_START = b'{{'
_LOB_END = b'}}'


def _serialize_clob(ion_event):
    """Serializes the event value as a clob: ``{{"..."}}`` with escaped content.

    The value is iterated directly, so each item it yields is treated as one code point.
    """
    byte_values = iter(ion_event.value)
    return _bytes_text(byte_values, _DOUBLE_QUOTE, prefix=_LOB_START, suffix=_LOB_END)
def _serialize_blob(ion_event):
    """Serializes the event value as a blob: its base64 encoding wrapped in ``{{`` and ``}}``."""
    encoded = base64.b64encode(ion_event.value)
    return b''.join((_LOB_START, encoded, _LOB_END))
# Dispatch table from scalar Ion type to the function that serializes a non-null value of
# that type. NULL maps to ``illegal_state_null``.
# NOTE(review): null values are presumably resolved through ``null_table`` inside
# ``serialize_scalar`` before this table is consulted -- confirm in ``writer.py``.
_SERIALIZE_SCALAR_JUMP_TABLE = {
    IonType.NULL: illegal_state_null,
    IonType.BOOL: _serialize_bool,
    IonType.INT: _serialize_int,
    IonType.FLOAT: _serialize_float,
    IonType.DECIMAL: _serialize_decimal,
    IonType.TIMESTAMP: _serialize_timestamp,
    IonType.SYMBOL: _serialize_symbol,
    IonType.STRING: _serialize_string,
    IonType.CLOB: _serialize_clob,
    IonType.BLOB: _serialize_blob,
}
# Text scalar serializer: the shared ``serialize_scalar`` bound to the text tables above.
_serialize_scalar = partial(serialize_scalar, jump_table=_SERIALIZE_SCALAR_JUMP_TABLE, null_table=_NULL_TYPE_NAMES)
# Written after a struct field name and after each annotation, respectively.
_FIELD_NAME_DELIMITER = b':'
_ANNOTATION_DELIMITER = b'::'


def _serialize_field_name(ion_event):
    """Serializes the event's field name as a symbol followed by ``:``."""
    field_name = ion_event.field_name
    return _serialize_symbol_value(field_name, suffix=_FIELD_NAME_DELIMITER)
def _serialize_annotation_value(annotation):
    """Serializes one annotation as a symbol followed by ``::``."""
    serialized = _serialize_symbol_value(annotation, suffix=_ANNOTATION_DELIMITER)
    return serialized
def _serialize_container_factory(suffix, container_map):
"""Returns a function that serializes container start/end.
Args:
suffix (str): The suffix to name the function with.
container_map (Dictionary[core.IonType, bytes]): The
Returns:
function: The closure for serialization.
"""
def serialize(ion_event):
if not ion_event.ion_type.is_container:
raise TypeError('Expected container type')
return container_map[ion_event.ion_type]
serialize.__name__ = '_serialize_container_' + suffix
return serialize
# Opening token for each container type.
_CONTAINER_START_MAP = {
    IonType.STRUCT: b'{',
    IonType.LIST: b'[',
    IonType.SEXP: b'(',
}
# Closing token for each container type.
_CONTAINER_END_MAP = {
    IonType.STRUCT: b'}',
    IonType.LIST: b']',
    IonType.SEXP: b')',
}
# Separator written between values of a container when not pretty printing.
_CONTAINER_DELIMITER_MAP_NORMAL = {
    IonType.STRUCT: b',',
    IonType.LIST: b',',
    IonType.SEXP: b' ',
}
# Separator written between values of a container when pretty printing.
_CONTAINER_DELIMITER_MAP_PRETTY = {
    IonType.STRUCT: b',',
    IonType.LIST: b',',
    IonType.SEXP: b'',  # we use newlines when pretty printing
}
# Serializers bound to the tables above. Each takes a container event and returns the
# bytes mapped to its Ion type.
_serialize_container_start = _serialize_container_factory('start', _CONTAINER_START_MAP)
_serialize_container_end = _serialize_container_factory('end', _CONTAINER_END_MAP)
# Both delimiter serializers are created with the same name suffix ('delimiter').
_serialize_container_delimiter_normal = _serialize_container_factory('delimiter', _CONTAINER_DELIMITER_MAP_NORMAL)
_serialize_container_delimiter_pretty = _serialize_container_factory('delimiter', _CONTAINER_DELIMITER_MAP_PRETTY)
@coroutine
def _raw_writer_coroutine(depth=0, container_event=None, whence=None, indent=None, trailing_commas=False):
    """Co-routine that turns Ion events into text write events for one nesting level.

    Each CONTAINER_START event creates a new instance of this co-routine at ``depth + 1``
    and makes it the delegate. The matching CONTAINER_END hands control back to ``whence``.

    Args:
        depth (int): Nesting depth of this instance; ``0`` is the top level.
        container_event: The event that opened the container this instance writes into,
            or ``None`` at the top level.
        whence: The co-routine to delegate back to when the container ends.
        indent (Optional[bytes]): Indentation unit. Any value other than ``None`` enables
            pretty printing.
        trailing_commas (bool): When true and ``indent`` is non-empty, the container
            delimiter is also written before an event that ends the container.

    Yields:
        Intermediate pieces (delimiters, newlines, indentation, field names, annotations)
        via ``partial_transition``, then a ``Transition`` carrying the write event for the
        value itself and the co-routine to run next.

    Raises:
        TypeError: If the event type is not valid at the current position.
    """
    pretty = indent is not None
    serialize_container_delimiter = \
        _serialize_container_delimiter_pretty if pretty else _serialize_container_delimiter_normal
    # True once this instance has handled at least one event.
    has_written_values = False
    transition = None
    while True:
        ion_event, self = (yield transition)
        # By default the next event comes back to this same co-routine.
        delegate = self
        # NOTE(review): this tests the truthiness of ``indent`` rather than ``pretty``, so an
        # empty indent pretty-prints without trailing commas -- confirm that is intended.
        if has_written_values and ((indent and trailing_commas) or not ion_event.event_type.ends_container):
            # TODO This will always emit a delimiter for containers--should make it not do that.
            # Write the delimiter for the next value.
            if depth == 0:
                # if we are pretty printing, we'll insert a newline between top-level containers
                delimiter = b'' if pretty else b' '
            else:
                delimiter = serialize_container_delimiter(container_event)
            if len(delimiter) > 0:
                yield partial_transition(delimiter, self)
        if pretty and (has_written_values or container_event is not None) and not ion_event.event_type is IonEventType.STREAM_END:
            # Start a new line, then indent. A container's closing token is indented one
            # level less than the container's values.
            yield partial_transition(b'\n', self)
            indent_depth = depth - (1 if ion_event.event_type is IonEventType.CONTAINER_END else 0)
            if indent_depth > 0:
                yield partial_transition(indent * indent_depth, self)
        if depth > 0 \
                and container_event.ion_type is IonType.STRUCT \
                and ion_event.event_type.begins_value:
            # Write the field name.
            yield partial_transition(_serialize_field_name(ion_event), self)
            if pretty:
                # separate the field name and the field value
                yield partial_transition(b' ', self)
        if ion_event.event_type.begins_value:
            # Write the annotations.
            for annotation in ion_event.annotations:
                yield partial_transition(_serialize_annotation_value(annotation), self)
        if ion_event.event_type is IonEventType.CONTAINER_START:
            writer_event = DataEvent(WriteEventType.NEEDS_INPUT, _serialize_container_start(ion_event))
            # Hand subsequent events to a nested instance that returns to this one.
            delegate = _raw_writer_coroutine(depth + 1, ion_event, self, indent=indent,
                                             trailing_commas=trailing_commas)
        elif depth == 0:
            # Serialize at the top-level.
            if ion_event.event_type is IonEventType.STREAM_END:
                writer_event = NOOP_WRITER_EVENT
            elif ion_event.event_type is IonEventType.VERSION_MARKER:
                writer_event = _IVM_WRITER_EVENT
            elif ion_event.event_type is IonEventType.SCALAR:
                writer_event = DataEvent(WriteEventType.COMPLETE, _serialize_scalar(ion_event))
            else:
                raise TypeError('Invalid event: %s' % ion_event)
        else:
            # Serialize within a container.
            if ion_event.event_type is IonEventType.SCALAR:
                writer_event = DataEvent(WriteEventType.NEEDS_INPUT, _serialize_scalar(ion_event))
            elif ion_event.event_type is IonEventType.CONTAINER_END:
                # Closing a depth-1 container completes a top-level value.
                write_type = WriteEventType.COMPLETE if depth == 1 else WriteEventType.NEEDS_INPUT
                writer_event = DataEvent(write_type, _serialize_container_end(container_event))
                delegate = whence
            else:
                raise TypeError('Invalid event: %s' % ion_event)
        has_written_values = True
        transition = Transition(writer_event, delegate)
# TODO Add options for text formatting.
def raw_writer(indent=None, trailing_commas=False):
    """Returns a raw text writer co-routine.

    Args:
        indent (Optional[str]): The indentation unit used for pretty printing, or ``None``
            to disable pretty printing. Must consist only of ASCII whitespace (space,
            tab, newline, carriage return, form feed, vertical tab); other Unicode
            whitespace is rejected because it is not whitespace in Ion text.
        trailing_commas (bool): Passed through to the underlying co-routine. When true
            and ``indent`` is non-empty, a delimiter is also written after the last
            value of a container.

    Yields:
        DataEvent: serialization events to write out

        Receives :class:`amazon.ion.core.IonEvent` or ``None`` when the co-routine yields
        ``HAS_PENDING`` :class:`WriteEventType` events.

    Raises:
        ValueError: If ``indent`` is neither ``None`` nor an all-whitespace string.
    """
    if indent is None:
        indent_bytes = None
    elif isinstance(indent, str) and re.fullmatch(r'\s*', indent, re.ASCII) is not None:
        # This assumes an encoding of UTF-8, which is the only one supported at time
        # of writing (Dec 2022).
        indent_bytes = indent.encode("UTF-8")
    else:
        raise ValueError('The indent parameter must either be None or a string containing only whitespace')
    return writer_trampoline(_raw_writer_coroutine(indent=indent_bytes, trailing_commas=trailing_commas))
# TODO Determine if we need to do anything special for non-raw writer. Validation?
# For now the text writer is simply an alias of the raw writer.
text_writer = raw_writer