forked from astropy/astropy
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore.py
More file actions
418 lines (337 loc) · 14.1 KB
/
core.py
File metadata and controls
418 lines (337 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import os
import sys
from collections import OrderedDict
from .base import IORegistryError, _UnifiedIORegistryBase
__all__ = ["UnifiedIORegistry", "UnifiedInputRegistry", "UnifiedOutputRegistry"]
PATH_TYPES = (str, os.PathLike) # TODO! include bytes
def _expand_user_in_args(args):
# Conservatively attempt to apply `os.path.expanduser` to the first
# argument, which can be either a path or the contents of a table.
if len(args) and isinstance(args[0], PATH_TYPES):
ex_user = os.path.expanduser(args[0])
if ex_user != args[0] and os.path.exists(os.path.dirname(ex_user)):
args = (ex_user,) + args[1:]
return args
# -----------------------------------------------------------------------------
class UnifiedInputRegistry(_UnifiedIORegistryBase):
"""Read-only Unified Registry.
.. versionadded:: 5.0
Examples
--------
First let's start by creating a read-only registry.
.. code-block:: python
>>> from astropy.io.registry import UnifiedInputRegistry
>>> read_reg = UnifiedInputRegistry()
There is nothing in this registry. Let's make a reader for the
:class:`~astropy.table.Table` class::
from astropy.table import Table
def my_table_reader(filename, some_option=1):
# Read in the table by any means necessary
return table # should be an instance of Table
Such a function can then be registered with the I/O registry::
read_reg.register_reader('my-table-format', Table, my_table_reader)
Note that we CANNOT then read in a table with::
d = Table.read('my_table_file.mtf', format='my-table-format')
Why? because ``Table.read`` uses Astropy's default global registry and this
is a separate registry.
Instead we can read by the read method on the registry::
d = read_reg.read(Table, 'my_table_file.mtf', format='my-table-format')
"""
def __init__(self):
super().__init__() # set _identifiers
self._readers = OrderedDict()
self._registries["read"] = {"attr": "_readers", "column": "Read"}
self._registries_order = ("read", "identify")
# =========================================================================
# Read methods
def register_reader(
self, data_format, data_class, function, force=False, priority=0
):
"""
Register a reader function.
Parameters
----------
data_format : str
The data format identifier. This is the string that will be used to
specify the data type when reading.
data_class : class
The class of the object that the reader produces.
function : function
The function to read in a data object.
force : bool, optional
Whether to override any existing function if already present.
Default is ``False``.
priority : int, optional
The priority of the reader, used to compare possible formats when
trying to determine the best reader to use. Higher priorities are
preferred over lower priorities, with the default priority being 0
(negative numbers are allowed though).
"""
if (data_format, data_class) not in self._readers or force:
self._readers[(data_format, data_class)] = function, priority
else:
raise IORegistryError(
f"Reader for format '{data_format}' and class '{data_class.__name__}'"
" is already defined"
)
if data_class not in self._delayed_docs_classes:
self._update__doc__(data_class, "read")
def unregister_reader(self, data_format, data_class):
"""
Unregister a reader function.
Parameters
----------
data_format : str
The data format identifier.
data_class : class
The class of the object that the reader produces.
"""
if (data_format, data_class) in self._readers:
self._readers.pop((data_format, data_class))
else:
raise IORegistryError(
f"No reader defined for format '{data_format}' and class"
f" '{data_class.__name__}'"
)
if data_class not in self._delayed_docs_classes:
self._update__doc__(data_class, "read")
def get_reader(self, data_format, data_class):
"""Get reader for ``data_format``.
Parameters
----------
data_format : str
The data format identifier. This is the string that is used to
specify the data type when reading/writing.
data_class : class
The class of the object that can be written.
Returns
-------
reader : callable
The registered reader function for this format and class.
"""
readers = [(fmt, cls) for fmt, cls in self._readers if fmt == data_format]
for reader_format, reader_class in readers:
if self._is_best_match(data_class, reader_class, readers):
return self._readers[(reader_format, reader_class)][0]
format_table_str = self._get_format_table_str(data_class, "Read")
raise IORegistryError(
f"No reader defined for format '{data_format}' and class"
f" '{data_class.__name__}'.\n\nThe available formats"
f" are:\n\n{format_table_str}"
)
def read(self, cls, *args, format=None, cache=False, **kwargs):
"""
Read in data.
Parameters
----------
cls : class
*args
The arguments passed to this method depend on the format.
format : str or None
cache : bool
Whether to cache the results of reading in the data.
**kwargs
The arguments passed to this method depend on the format.
Returns
-------
object or None
The output of the registered reader.
"""
ctx = None
try:
# Expand a tilde-prefixed path if present in args[0]
args = _expand_user_in_args(args)
if format is None:
path = None
fileobj = None
if len(args):
if isinstance(args[0], PATH_TYPES) and not os.path.isdir(args[0]):
from astropy.utils.data import get_readable_fileobj
# path might be a os.PathLike object
if isinstance(args[0], os.PathLike):
args = (os.fspath(args[0]),) + args[1:]
path = args[0]
try:
ctx = get_readable_fileobj(
args[0], encoding="binary", cache=cache
)
fileobj = ctx.__enter__()
except OSError:
raise
except Exception:
fileobj = None
else:
args = [fileobj] + list(args[1:])
elif hasattr(args[0], "read"):
path = None
fileobj = args[0]
format = self._get_valid_format(
"read", cls, path, fileobj, args, kwargs
)
reader = self.get_reader(format, cls)
data = reader(*args, **kwargs)
if not isinstance(data, cls):
# User has read with a subclass where only the parent class is
# registered. This returns the parent class, so try coercing
# to desired subclass.
try:
data = cls(data)
except Exception:
raise TypeError(
f"could not convert reader output to {cls.__name__} class."
)
finally:
if ctx is not None:
ctx.__exit__(*sys.exc_info())
return data
# -----------------------------------------------------------------------------
class UnifiedOutputRegistry(_UnifiedIORegistryBase):
"""Write-only Registry.
.. versionadded:: 5.0
"""
def __init__(self):
super().__init__()
self._writers = OrderedDict()
self._registries["write"] = {"attr": "_writers", "column": "Write"}
self._registries_order = ("write", "identify")
# =========================================================================
# Write Methods
def register_writer(
self, data_format, data_class, function, force=False, priority=0
):
"""
Register a table writer function.
Parameters
----------
data_format : str
The data format identifier. This is the string that will be used to
specify the data type when writing.
data_class : class
The class of the object that can be written.
function : function
The function to write out a data object.
force : bool, optional
Whether to override any existing function if already present.
Default is ``False``.
priority : int, optional
The priority of the writer, used to compare possible formats when trying
to determine the best writer to use. Higher priorities are preferred
over lower priorities, with the default priority being 0 (negative
numbers are allowed though).
"""
if not (data_format, data_class) in self._writers or force: # noqa: E713
self._writers[(data_format, data_class)] = function, priority
else:
raise IORegistryError(
f"Writer for format '{data_format}' and class '{data_class.__name__}'"
" is already defined"
)
if data_class not in self._delayed_docs_classes:
self._update__doc__(data_class, "write")
def unregister_writer(self, data_format, data_class):
"""
Unregister a writer function.
Parameters
----------
data_format : str
The data format identifier.
data_class : class
The class of the object that can be written.
"""
if (data_format, data_class) in self._writers:
self._writers.pop((data_format, data_class))
else:
raise IORegistryError(
f"No writer defined for format '{data_format}' and class"
f" '{data_class.__name__}'"
)
if data_class not in self._delayed_docs_classes:
self._update__doc__(data_class, "write")
def get_writer(self, data_format, data_class):
"""Get writer for ``data_format``.
Parameters
----------
data_format : str
The data format identifier. This is the string that is used to
specify the data type when reading/writing.
data_class : class
The class of the object that can be written.
Returns
-------
writer : callable
The registered writer function for this format and class.
"""
writers = [(fmt, cls) for fmt, cls in self._writers if fmt == data_format]
for writer_format, writer_class in writers:
if self._is_best_match(data_class, writer_class, writers):
return self._writers[(writer_format, writer_class)][0]
format_table_str = self._get_format_table_str(data_class, "Write")
raise IORegistryError(
f"No writer defined for format '{data_format}' and class"
f" '{data_class.__name__}'.\n\nThe available formats"
f" are:\n\n{format_table_str}"
)
def write(self, data, *args, format=None, **kwargs):
"""
Write out data.
Parameters
----------
data : object
The data to write.
*args
The arguments passed to this method depend on the format.
format : str or None
**kwargs
The arguments passed to this method depend on the format.
Returns
-------
object or None
The output of the registered writer. Most often `None`.
.. versionadded:: 4.3
"""
# Expand a tilde-prefixed path if present in args[0]
args = _expand_user_in_args(args)
if format is None:
path = None
fileobj = None
if len(args):
if isinstance(args[0], PATH_TYPES):
# path might be a os.PathLike object
if isinstance(args[0], os.PathLike):
args = (os.fspath(args[0]),) + args[1:]
path = args[0]
fileobj = None
elif hasattr(args[0], "read"):
path = None
fileobj = args[0]
format = self._get_valid_format(
"write", data.__class__, path, fileobj, args, kwargs
)
writer = self.get_writer(format, data.__class__)
return writer(data, *args, **kwargs)
# -----------------------------------------------------------------------------
class UnifiedIORegistry(UnifiedInputRegistry, UnifiedOutputRegistry):
"""Unified I/O Registry.
.. versionadded:: 5.0
"""
def __init__(self):
super().__init__()
self._registries_order = ("read", "write", "identify")
def get_formats(self, data_class=None, readwrite=None):
"""
Get the list of registered I/O formats as a `~astropy.table.Table`.
Parameters
----------
data_class : class, optional
Filter readers/writer to match data class (default = all classes).
readwrite : str or None, optional
Search only for readers (``"Read"``) or writers (``"Write"``).
If None search for both. Default is None.
.. versionadded:: 1.3
Returns
-------
format_table : :class:`~astropy.table.Table`
Table of available I/O formats.
"""
return super().get_formats(data_class, readwrite)