Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Updated mailbox library
  • Loading branch information
terryluan12 committed Dec 31, 2025
commit 24f318a99bc0cba707b24ceddcb8654790761bcf
96 changes: 84 additions & 12 deletions Lib/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,56 @@ def get_file(self, key):
f = open(os.path.join(self._path, self._lookup(key)), 'rb')
return _ProxyFile(f)

def get_info(self, key):
"""Get the keyed message's "info" as a string."""
subpath = self._lookup(key)
if self.colon in subpath:
return subpath.split(self.colon)[-1]
return ''

def set_info(self, key, info: str):
"""Set the keyed message's "info" string."""
if not isinstance(info, str):
raise TypeError(f'info must be a string: {type(info)}')
old_subpath = self._lookup(key)
new_subpath = old_subpath.split(self.colon)[0]
if info:
new_subpath += self.colon + info
if new_subpath == old_subpath:
return
old_path = os.path.join(self._path, old_subpath)
new_path = os.path.join(self._path, new_subpath)
os.rename(old_path, new_path)
self._toc[key] = new_subpath

def get_flags(self, key):
"""Return as a string the standard flags that are set on the keyed message."""
info = self.get_info(key)
if info.startswith('2,'):
return info[2:]
return ''

def set_flags(self, key, flags: str):
"""Set the given flags and unset all others on the keyed message."""
if not isinstance(flags, str):
raise TypeError(f'flags must be a string: {type(flags)}')
# TODO: check if flags are valid standard flag characters?
self.set_info(key, '2,' + ''.join(sorted(set(flags))))

def add_flag(self, key, flag: str):
"""Set the given flag(s) without changing others on the keyed message."""
if not isinstance(flag, str):
raise TypeError(f'flag must be a string: {type(flag)}')
# TODO: check that flag is a valid standard flag character?
self.set_flags(key, ''.join(set(self.get_flags(key)) | set(flag)))

def remove_flag(self, key, flag: str):
"""Unset the given string flag(s) without changing others on the keyed message."""
if not isinstance(flag, str):
raise TypeError(f'flag must be a string: {type(flag)}')
if self.get_flags(key):
self.set_flags(key, ''.join(set(self.get_flags(key)) - set(flag)))

def iterkeys(self):
"""Return an iterator over keys."""
self._refresh()
Expand Down Expand Up @@ -540,6 +590,8 @@ def _refresh(self):
for subdir in self._toc_mtimes:
path = self._paths[subdir]
for entry in os.listdir(path):
if entry.startswith('.'):
continue
p = os.path.join(path, entry)
if os.path.isdir(p):
continue
Expand Down Expand Up @@ -698,9 +750,13 @@ def flush(self):
_sync_close(new_file)
# self._file is about to get replaced, so no need to sync.
self._file.close()
# Make sure the new file's mode is the same as the old file's
mode = os.stat(self._path).st_mode
os.chmod(new_file.name, mode)
# Make sure the new file's mode and owner are the same as the old file's
info = os.stat(self._path)
os.chmod(new_file.name, info.st_mode)
try:
os.chown(new_file.name, info.st_uid, info.st_gid)
except (AttributeError, OSError):
pass
try:
os.rename(new_file.name, self._path)
except FileExistsError:
Expand Down Expand Up @@ -778,10 +834,11 @@ def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
from_line = self._file.readline().replace(linesep, b'')
from_line = self._file.readline().replace(linesep, b'').decode('ascii')
string = self._file.read(stop - self._file.tell())
msg = self._message_factory(string.replace(linesep, b'\n'))
msg.set_from(from_line[5:].decode('ascii'))
msg.set_unixfrom(from_line)
msg.set_from(from_line[5:])
return msg

def get_string(self, key, from_=False):
Expand Down Expand Up @@ -1089,10 +1146,24 @@ def __len__(self):
"""Return a count of messages in the mailbox."""
return len(list(self.iterkeys()))

def _open_mh_sequences_file(self, text):
mode = '' if text else 'b'
kwargs = {'encoding': 'ASCII'} if text else {}
path = os.path.join(self._path, '.mh_sequences')
while True:
try:
return open(path, 'r+' + mode, **kwargs)
except FileNotFoundError:
pass
try:
return open(path, 'x+' + mode, **kwargs)
except FileExistsError:
pass

def lock(self):
"""Lock the mailbox."""
if not self._locked:
self._file = open(os.path.join(self._path, '.mh_sequences'), 'rb+')
self._file = self._open_mh_sequences_file(text=False)
_lock_file(self._file)
self._locked = True

Expand Down Expand Up @@ -1146,7 +1217,11 @@ def remove_folder(self, folder):
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
with open(os.path.join(self._path, '.mh_sequences'), 'r', encoding='ASCII') as f:
try:
f = open(os.path.join(self._path, '.mh_sequences'), 'r', encoding='ASCII')
except FileNotFoundError:
return results
with f:
all_keys = set(self.keys())
for line in f:
try:
Expand All @@ -1169,7 +1244,7 @@ def get_sequences(self):

def set_sequences(self, sequences):
"""Set sequences using the given name-to-key-list dictionary."""
f = open(os.path.join(self._path, '.mh_sequences'), 'r+', encoding='ASCII')
f = self._open_mh_sequences_file(text=True)
try:
os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC))
for name, keys in sequences.items():
Expand Down Expand Up @@ -1956,10 +2031,7 @@ def readlines(self, sizehint=None):

def __iter__(self):
"""Iterate over lines."""
while True:
line = self.readline()
if not line:
return
while line := self.readline():
yield line

def tell(self):
Expand Down