-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfile.py
More file actions
160 lines (139 loc) · 4.95 KB
/
file.py
File metadata and controls
160 lines (139 loc) · 4.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# SPDX-License-Identifier: MIT
# Copyright (c) 2019 Intel Corporation
import os
import io
import abc
import bz2
import gzip
import lzma
import errno
import zipfile
from contextlib import contextmanager
import pathlib
from ..base import config
from .source import BaseSource
from ..util.entrypoint import entrypoint
@config
class FileSourceConfig:
filename: str
tag: str = "untagged"
readwrite: bool = False
allowempty: bool = False
mkdirs: bool = False
@entrypoint("file")
class FileSource(BaseSource):
"""
FileSource reads and write from a file on open / close.
"""
CONFIG = FileSourceConfig
READMODE: str = "r"
WRITEMODE: str = "w"
READMODE_COMPRESSED: str = "rt"
WRITEMODE_COMPRESSED: str = "wt"
def __init__(self, config):
super().__init__(config)
if isinstance(getattr(self.config, "filename", None), str):
with self.config.no_enforce_immutable():
self.config.filename = pathlib.Path(self.config.filename)
async def __aenter__(self) -> "BaseSourceContext":
await self._open()
return self
async def __aexit__(self, exc_type, exc_value, traceback):
await self._close()
async def _empty_file_init(self):
return {}
async def _open(self):
# Create directories for default source if not exists
filepath = pathlib.Path(self.config.filename)
if not filepath.parent.is_dir() and self.config.mkdirs:
filepath.parent.mkdir(parents=True)
if not os.path.exists(self.config.filename) or os.path.isdir(
self.config.filename
):
if self.config.allowempty:
self.logger.debug(
("%r is not a file, " % (self.config.filename,))
+ "initializing memory to empty dict"
)
self.mem = await self._empty_file_init()
return
else:
raise FileNotFoundError(
errno.ENOENT,
os.strerror(errno.ENOENT),
self.config.filename,
)
if self.config.filename.suffix == ".gz":
opener = gzip.open(self.config.filename, self.READMODE_COMPRESSED)
elif self.config.filename.suffix == ".bz2":
opener = bz2.open(self.config.filename, self.READMODE_COMPRESSED)
elif (
self.config.filename.suffix == ".xz"
or self.config.filename.suffix == ".lzma"
):
opener = lzma.open(self.config.filename, self.READMODE_COMPRESSED)
elif self.config.filename.suffix == ".zip":
opener = self.zip_opener_helper()
else:
opener = open(self.config.filename, self.READMODE)
with opener as fd:
await self.load_fd(fd)
async def _close(self):
if self.config.readwrite:
if self.config.filename.suffix == ".gz":
close = gzip.open(
self.config.filename, self.WRITEMODE_COMPRESSED
)
elif self.config.filename.suffix == ".bz2":
close = bz2.open(
self.config.filename, self.WRITEMODE_COMPRESSED
)
elif (
self.config.filename.suffix == ".xz"
or self.config.filename.suffix == ".lzma"
):
close = lzma.open(
self.config.filename, self.WRITEMODE_COMPRESSED
)
elif self.config.filename.suffix == ".zip":
close = self.zip_closer_helper()
else:
close = open(self.config.filename, self.WRITEMODE, newline="")
with close as fd:
await self.dump_fd(fd)
@contextmanager
def zip_opener_helper(self):
with zipfile.ZipFile(self.config.filename) as archive:
with archive.open(
self.__class__.__qualname__, mode=self.READMODE
) as zip_fd:
with io.TextIOWrapper(zip_fd, write_through=True) as fd:
yield fd
@contextmanager
def zip_closer_helper(self):
with zipfile.ZipFile(
self.config.filename, self.WRITEMODE, compression=zipfile.ZIP_BZIP2
) as archive:
with archive.open(
self.__class__.__qualname__,
mode=self.WRITEMODE,
force_zip64=True,
) as zip_fd:
with io.TextIOWrapper(zip_fd, write_through=True) as fd:
yield fd
@abc.abstractmethod
async def load_fd(self, fd):
pass # pragma: no cover
@abc.abstractmethod
async def dump_fd(self, fd):
pass # pragma: no cover
@config
class BinaryFileSourceConfig(FileSourceConfig):
pass
@entrypoint("binaryfile")
class BinaryFileSource(FileSource):
CONFIG = BinaryFileSourceConfig
READMODE: str = "rb"
WRITEMODE: str = "wb"
READMODE_COMPRESSED: str = "rb"
WRITEMODE_COMPRESSED: str = "wb"