Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
sqlite autocommit
  • Loading branch information
youknowone committed Jan 1, 2026
commit 8102e8997cd0f19fa3269f4ea71138fb128a666c
3 changes: 3 additions & 0 deletions Lib/dbm/gnu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Provide the _gdbm module as a dbm submodule."""

from _gdbm import *
3 changes: 3 additions & 0 deletions Lib/dbm/ndbm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""Provide the _dbm module as a dbm submodule."""

from _dbm import *
144 changes: 144 additions & 0 deletions Lib/dbm/sqlite3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import os
import sqlite3
from pathlib import Path
from contextlib import suppress, closing
from collections.abc import MutableMapping

BUILD_TABLE = """
CREATE TABLE IF NOT EXISTS Dict (
key BLOB UNIQUE NOT NULL,
value BLOB NOT NULL
)
"""
GET_SIZE = "SELECT COUNT (key) FROM Dict"
LOOKUP_KEY = "SELECT value FROM Dict WHERE key = CAST(? AS BLOB)"
STORE_KV = "REPLACE INTO Dict (key, value) VALUES (CAST(? AS BLOB), CAST(? AS BLOB))"
DELETE_KEY = "DELETE FROM Dict WHERE key = CAST(? AS BLOB)"
ITER_KEYS = "SELECT key FROM Dict"


class error(OSError):
pass


_ERR_CLOSED = "DBM object has already been closed"
_ERR_REINIT = "DBM object does not support reinitialization"


def _normalize_uri(path):
path = Path(path)
uri = path.absolute().as_uri()
while "//" in uri:
uri = uri.replace("//", "/")
return uri


class _Database(MutableMapping):

def __init__(self, path, /, *, flag, mode):
if hasattr(self, "_cx"):
raise error(_ERR_REINIT)

path = os.fsdecode(path)
match flag:
case "r":
flag = "ro"
case "w":
flag = "rw"
case "c":
flag = "rwc"
Path(path).touch(mode=mode, exist_ok=True)
case "n":
flag = "rwc"
Path(path).unlink(missing_ok=True)
Path(path).touch(mode=mode)
case _:
raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n', "
f"not {flag!r}")

# We use the URI format when opening the database.
uri = _normalize_uri(path)
uri = f"{uri}?mode={flag}"
if flag == "ro":
# Add immutable=1 to allow read-only SQLite access even if wal/shm missing
uri += "&immutable=1"

try:
self._cx = sqlite3.connect(uri, autocommit=True, uri=True)
except sqlite3.Error as exc:
raise error(str(exc))

if flag != "ro":
# This is an optimization only; it's ok if it fails.
with suppress(sqlite3.OperationalError):
self._cx.execute("PRAGMA journal_mode = wal")

if flag == "rwc":
self._execute(BUILD_TABLE)

def _execute(self, *args, **kwargs):
if not self._cx:
raise error(_ERR_CLOSED)
try:
return closing(self._cx.execute(*args, **kwargs))
except sqlite3.Error as exc:
raise error(str(exc))

def __len__(self):
with self._execute(GET_SIZE) as cu:
row = cu.fetchone()
return row[0]

def __getitem__(self, key):
with self._execute(LOOKUP_KEY, (key,)) as cu:
row = cu.fetchone()
if not row:
raise KeyError(key)
return row[0]

def __setitem__(self, key, value):
self._execute(STORE_KV, (key, value))

def __delitem__(self, key):
with self._execute(DELETE_KEY, (key,)) as cu:
if not cu.rowcount:
raise KeyError(key)

def __iter__(self):
try:
with self._execute(ITER_KEYS) as cu:
for row in cu:
yield row[0]
except sqlite3.Error as exc:
raise error(str(exc))

def close(self):
if self._cx:
self._cx.close()
self._cx = None

def keys(self):
return list(super().keys())

def __enter__(self):
return self

def __exit__(self, *args):
self.close()


def open(filename, /, flag="r", mode=0o666):
"""Open a dbm.sqlite3 database and return the dbm object.

The 'filename' parameter is the name of the database file.

The optional 'flag' parameter can be one of ...:
'r' (default): open an existing database for read only access
'w': open an existing database for read/write access
'c': create a database if it does not exist; open for read/write access
'n': always create a new, empty database; open for read/write access

The optional 'mode' parameter is the Unix file access mode of the database;
only used when creating a new database. Default: 0o666.
"""
return _Database(filename, flag=flag, mode=mode)
222 changes: 222 additions & 0 deletions Lib/test/test_dbm_gnu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import os
import unittest
from test import support
from test.support import cpython_only, import_helper
from test.support.os_helper import (TESTFN, TESTFN_NONASCII, FakePath,
create_empty_file, temp_dir, unlink)

gdbm = import_helper.import_module("dbm.gnu") # skip if not supported

filename = TESTFN

class TestGdbm(unittest.TestCase):
@staticmethod
def setUpClass():
if support.verbose:
try:
from _gdbm import _GDBM_VERSION as version
except ImportError:
pass
else:
print(f"gdbm version: {version}")

def setUp(self):
self.g = None

def tearDown(self):
if self.g is not None:
self.g.close()
unlink(filename)

@cpython_only
def test_disallow_instantiation(self):
# Ensure that the type disallows instantiation (bpo-43916)
self.g = gdbm.open(filename, 'c')
support.check_disallow_instantiation(self, type(self.g))

def test_key_methods(self):
self.g = gdbm.open(filename, 'c')
self.assertEqual(self.g.keys(), [])
self.g['a'] = 'b'
self.g['12345678910'] = '019237410982340912840198242'
self.g[b'bytes'] = b'data'
key_set = set(self.g.keys())
self.assertEqual(key_set, set([b'a', b'bytes', b'12345678910']))
self.assertIn('a', self.g)
self.assertIn(b'a', self.g)
self.assertEqual(self.g[b'bytes'], b'data')
key = self.g.firstkey()
while key:
self.assertIn(key, key_set)
key_set.remove(key)
key = self.g.nextkey(key)
# get() and setdefault() work as in the dict interface
self.assertEqual(self.g.get(b'a'), b'b')
self.assertIsNone(self.g.get(b'xxx'))
self.assertEqual(self.g.get(b'xxx', b'foo'), b'foo')
with self.assertRaises(KeyError):
self.g['xxx']
self.assertEqual(self.g.setdefault(b'xxx', b'foo'), b'foo')
self.assertEqual(self.g[b'xxx'], b'foo')

def test_error_conditions(self):
# Try to open a non-existent database.
unlink(filename)
self.assertRaises(gdbm.error, gdbm.open, filename, 'r')
# Try to access a closed database.
self.g = gdbm.open(filename, 'c')
self.g.close()
self.assertRaises(gdbm.error, lambda: self.g['a'])
# try pass an invalid open flag
self.assertRaises(gdbm.error, lambda: gdbm.open(filename, 'rx').close())

def test_flags(self):
# Test the flag parameter open() by trying all supported flag modes.
all = set(gdbm.open_flags)
# Test standard flags (presumably "crwn").
modes = all - set('fsu')
for mode in sorted(modes): # put "c" mode first
self.g = gdbm.open(filename, mode)
self.g.close()

# Test additional flags (presumably "fsu").
flags = all - set('crwn')
for mode in modes:
for flag in flags:
self.g = gdbm.open(filename, mode + flag)
self.g.close()

def test_reorganize(self):
self.g = gdbm.open(filename, 'c')
size0 = os.path.getsize(filename)

# bpo-33901: on macOS with gdbm 1.15, an empty database uses 16 MiB
# and adding an entry of 10,000 B has no effect on the file size.
# Add size0 bytes to make sure that the file size changes.
value_size = max(size0, 10000)
self.g['x'] = 'x' * value_size
size1 = os.path.getsize(filename)
self.assertGreater(size1, size0)

del self.g['x']
# 'size' is supposed to be the same even after deleting an entry.
self.assertEqual(os.path.getsize(filename), size1)

self.g.reorganize()
size2 = os.path.getsize(filename)
self.assertLess(size2, size1)
self.assertGreaterEqual(size2, size0)

def test_context_manager(self):
with gdbm.open(filename, 'c') as db:
db["gdbm context manager"] = "context manager"

with gdbm.open(filename, 'r') as db:
self.assertEqual(list(db.keys()), [b"gdbm context manager"])

with self.assertRaises(gdbm.error) as cm:
db.keys()
self.assertEqual(str(cm.exception),
"GDBM object has already been closed")

def test_bool_empty(self):
with gdbm.open(filename, 'c') as db:
self.assertFalse(bool(db))

def test_bool_not_empty(self):
with gdbm.open(filename, 'c') as db:
db['a'] = 'b'
self.assertTrue(bool(db))

def test_bool_on_closed_db_raises(self):
with gdbm.open(filename, 'c') as db:
db['a'] = 'b'
self.assertRaises(gdbm.error, bool, db)

def test_bytes(self):
with gdbm.open(filename, 'c') as db:
db[b'bytes key \xbd'] = b'bytes value \xbd'
with gdbm.open(filename, 'r') as db:
self.assertEqual(list(db.keys()), [b'bytes key \xbd'])
self.assertTrue(b'bytes key \xbd' in db)
self.assertEqual(db[b'bytes key \xbd'], b'bytes value \xbd')

def test_unicode(self):
with gdbm.open(filename, 'c') as db:
db['Unicode key \U0001f40d'] = 'Unicode value \U0001f40d'
with gdbm.open(filename, 'r') as db:
self.assertEqual(list(db.keys()), ['Unicode key \U0001f40d'.encode()])
self.assertTrue('Unicode key \U0001f40d'.encode() in db)
self.assertTrue('Unicode key \U0001f40d' in db)
self.assertEqual(db['Unicode key \U0001f40d'.encode()],
'Unicode value \U0001f40d'.encode())
self.assertEqual(db['Unicode key \U0001f40d'],
'Unicode value \U0001f40d'.encode())

def test_write_readonly_file(self):
with gdbm.open(filename, 'c') as db:
db[b'bytes key'] = b'bytes value'
with gdbm.open(filename, 'r') as db:
with self.assertRaises(gdbm.error):
del db[b'not exist key']
with self.assertRaises(gdbm.error):
del db[b'bytes key']
with self.assertRaises(gdbm.error):
db[b'not exist key'] = b'not exist value'

@unittest.skipUnless(TESTFN_NONASCII,
'requires OS support of non-ASCII encodings')
def test_nonascii_filename(self):
filename = TESTFN_NONASCII
self.addCleanup(unlink, filename)
with gdbm.open(filename, 'c') as db:
db[b'key'] = b'value'
self.assertTrue(os.path.exists(filename))
with gdbm.open(filename, 'r') as db:
self.assertEqual(list(db.keys()), [b'key'])
self.assertTrue(b'key' in db)
self.assertEqual(db[b'key'], b'value')

def test_nonexisting_file(self):
nonexisting_file = 'nonexisting-file'
with self.assertRaises(gdbm.error) as cm:
gdbm.open(nonexisting_file)
self.assertIn(nonexisting_file, str(cm.exception))
self.assertEqual(cm.exception.filename, nonexisting_file)

def test_open_with_pathlib_path(self):
gdbm.open(FakePath(filename), "c").close()

def test_open_with_bytes_path(self):
gdbm.open(os.fsencode(filename), "c").close()

def test_open_with_pathlib_bytes_path(self):
gdbm.open(FakePath(os.fsencode(filename)), "c").close()

def test_clear(self):
kvs = [('foo', 'bar'), ('1234', '5678')]
with gdbm.open(filename, 'c') as db:
for k, v in kvs:
db[k] = v
self.assertIn(k, db)
self.assertEqual(len(db), len(kvs))

db.clear()
for k, v in kvs:
self.assertNotIn(k, db)
self.assertEqual(len(db), 0)

@support.run_with_locale(
'LC_ALL',
'fr_FR.iso88591', 'ja_JP.sjis', 'zh_CN.gbk',
'fr_FR.utf8', 'en_US.utf8',
'',
)
def test_localized_error(self):
with temp_dir() as d:
create_empty_file(os.path.join(d, 'test'))
self.assertRaises(gdbm.error, gdbm.open, filename, 'r')


if __name__ == '__main__':
unittest.main()
Loading