Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix issue 46799
  • Loading branch information
tcl326 committed Feb 20, 2022
commit c0d4f5f263f5665ef78221d13d324bc3f6a645c0
106 changes: 52 additions & 54 deletions Lib/multiprocessing/shared_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,9 @@ class ShareableList:

# The shared memory area is organized as follows:
# - 8 bytes: number of items (N) as a 64-bit integer
# - (N + 1) * 8 bytes: offsets of each element from the start of the
# data area
# - (2 * N + 1) * 8 bytes: offset of each element from the start of the
# data area, interleaved with the `struct` format
# string for each element
# - K bytes: the data area storing item values (with encoding and size
# depending on their respective types)
# - N * 8 bytes: `struct` format string for each element
Expand Down Expand Up @@ -293,14 +294,25 @@ def _extract_recreation_code(value):
else:
return 3 # NoneType

@staticmethod
def _encode_if_string(value):
    """
    Return *value* as bytes when it is a str; pass anything else through.

    str values are encoded with the module-level `_encoding`; every other
    value is returned unchanged.
    """
    if isinstance(value, str):
        return value.encode(_encoding)
    return value

def __init__(self, sequence=None, *, name=None):
if name is None or sequence is not None:
sequence = sequence or ()
_formats = [
self._types_mapping[type(item)]
if not isinstance(item, (str, bytes))
else self._types_mapping[type(item)] % (
self._alignment * (len(item) // self._alignment + 1),
self._alignment
* max(
((len(self._encode_if_string(item)) - 1)
// self._alignment + 1),
1),
)
for item in sequence
]
Expand All @@ -310,17 +322,17 @@ def __init__(self, sequence=None, *, name=None):
# The offsets of each list element into the shared memory's
# data area (0 meaning the start of the data area, not the start
# of the shared memory area).
self._allocated_offsets = [0]
_allocated_offsets_and_formats = [0]
for fmt in _formats:
offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
self._allocated_offsets.append(offset)
_allocated_offsets_and_formats.append(fmt.encode(_encoding))
_allocated_offsets_and_formats.append(offset)
_recreation_codes = [
self._extract_recreation_code(item) for item in sequence
]
requested_size = struct.calcsize(
"q" + self._format_size_metainfo +
"q" + self._format_size_and_packing_metainfo +
"".join(_formats) +
self._format_packing_metainfo +
self._format_back_transform_codes
)

Expand All @@ -329,25 +341,19 @@ def __init__(self, sequence=None, *, name=None):
self.shm = SharedMemory(name)

if sequence is not None:
_enc = _encoding
self._data_size = _allocated_offsets_and_formats[-1]
struct.pack_into(
"q" + self._format_size_metainfo,
"q" + self._format_size_and_packing_metainfo,
self.shm.buf,
0,
self._list_len,
*(self._allocated_offsets)
*(_allocated_offsets_and_formats)
)
struct.pack_into(
"".join(_formats),
self.shm.buf,
self._offset_data_start,
*(v.encode(_enc) if isinstance(v, str) else v for v in sequence)
)
struct.pack_into(
self._format_packing_metainfo,
self.shm.buf,
self._offset_packing_formats,
*(v.encode(_enc) for v in _formats)
*(self._encode_if_string(v) for v in sequence)
)
struct.pack_into(
self._format_back_transform_codes,
Expand All @@ -358,29 +364,27 @@ def __init__(self, sequence=None, *, name=None):

else:
self._list_len = len(self) # Obtains size from offset 0 in buffer.
self._allocated_offsets = list(
struct.unpack_from(
self._format_size_metainfo,
self.shm.buf,
1 * 8
)
)
self._data_size = struct.unpack_from(
"q",
self.shm.buf,
(2 * self._list_len + 1) * 8
)[0]

def _get_packing_format(self, position):
def _get_offset_and_packing_format(self, position):
"Gets the packing format for a single value stored in the list."
position = position if position >= 0 else position + self._list_len
if (position >= self._list_len) or (self._list_len < 0):
raise IndexError("Requested position out of range.")

v = struct.unpack_from(
"8s",
offset, v = struct.unpack_from(
"q8s",
self.shm.buf,
self._offset_packing_formats + position * 8
)[0]
(2 * position + 1) * 8
)
fmt = v.rstrip(b'\x00')
fmt_as_str = fmt.decode(_encoding)

return fmt_as_str
return offset, fmt_as_str

def _get_back_transform(self, position):
"Gets the back transformation function for a single value."
Expand All @@ -407,7 +411,7 @@ def _set_packing_format_and_transform(self, position, fmt_as_str, value):
struct.pack_into(
"8s",
self.shm.buf,
self._offset_packing_formats + position * 8,
(2 * position + 2) * 8,
fmt_as_str.encode(_encoding)
)

Expand All @@ -422,9 +426,10 @@ def _set_packing_format_and_transform(self, position, fmt_as_str, value):
def __getitem__(self, position):
position = position if position >= 0 else position + self._list_len
try:
offset = self._offset_data_start + self._allocated_offsets[position]
item_offset, format = self._get_offset_and_packing_format(position)
offset = self._offset_data_start + item_offset
(v,) = struct.unpack_from(
self._get_packing_format(position),
format,
self.shm.buf,
offset
)
Expand All @@ -439,20 +444,22 @@ def __getitem__(self, position):
def __setitem__(self, position, value):
position = position if position >= 0 else position + self._list_len
try:
item_offset = self._allocated_offsets[position]
item_offset, current_format = self._get_offset_and_packing_format(position)
offset = self._offset_data_start + item_offset
current_format = self._get_packing_format(position)
except IndexError:
raise IndexError("assignment index out of range")

if not isinstance(value, (str, bytes)):
new_format = self._types_mapping[type(value)]
encoded_value = value
else:
allocated_length = self._allocated_offsets[position + 1] - item_offset
if position == self._list_len:
next_item_offset = self._data_size
else:
next_item_offset, _ = self._get_offset_and_packing_format(position + 1)
allocated_length = next_item_offset - item_offset

encoded_value = (value.encode(_encoding)
if isinstance(value, str) else value)
encoded_value = self._encode_if_string(value)
if len(encoded_value) > allocated_length:
raise ValueError("bytes/str item exceeds available storage")
if current_format[-1] == "s":
Expand Down Expand Up @@ -482,18 +489,13 @@ def __repr__(self):
def format(self):
"The struct packing format used by all currently stored items."
return "".join(
self._get_packing_format(i) for i in range(self._list_len)
self._get_offset_and_packing_format(i)[1] for i in range(self._list_len)
)

@property
def _format_size_metainfo(self):
"The struct packing format used for the items' storage offsets."
return "q" * (self._list_len + 1)

@property
def _format_packing_metainfo(self):
"The struct packing format used for the items' packing formats."
return "8s" * self._list_len
def _format_size_and_packing_metainfo(self):
"The struct packing format used for the items' storage offsets and packing formats."
return "q8s" * self._list_len + "q"

@property
def _format_back_transform_codes(self):
Expand All @@ -503,16 +505,12 @@ def _format_back_transform_codes(self):
@property
def _offset_data_start(self):
# - 8 bytes for the list length
# - (N + 1) * 8 bytes for the element offsets
return (self._list_len + 2) * 8

@property
def _offset_packing_formats(self):
return self._offset_data_start + self._allocated_offsets[-1]
# - (2 * N + 1) * 8 bytes for the element offsets and packing format
return (self._list_len * 2 + 2) * 8

@property
def _offset_back_transform_codes(self):
return self._offset_packing_formats + self._list_len * 8
return self._offset_data_start + self._data_size

def count(self, value):
"L.count(value) -> integer -- return number of occurrences of value."
Expand Down
4 changes: 2 additions & 2 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4102,9 +4102,9 @@ def test_shared_memory_ShareableList_basics(self):
sl[7] = 2

# Assign value without format change (str -> str)
current_format = sl._get_packing_format(0)
current_format = sl._get_offset_and_packing_format(0)[1]
sl[0] = 'howdy'
self.assertEqual(current_format, sl._get_packing_format(0))
self.assertEqual(current_format, sl._get_offset_and_packing_format(0)[1])

# Verify attributes are readable.
self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')
Expand Down
10 changes: 10 additions & 0 deletions Misc/NEWS.d/next/Library/2022-02-20-09-03-53.bpo-46799.BgnVIE.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Fix :class:`multiprocessing.shared_memory.ShareableList` memory bloat by
reading the offsets directly from the shared memory. Improve
:class:`multiprocessing.shared_memory.ShareableList` performance by merging
the area in shared memory dedicated to offsets and packing formats together.
This allows a single :func:`struct.unpack_from` call to retrieve both the
offset and the packing format of a single entry. Fix UnicodeDecodeError with
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small typo sinlge

multibyte utf8 characters in
:class:`multiprocessing.shared_memory.ShareableList` by allocating the
shared memory using the length of the utf8 encoded string rather than the
length of the string.