The official Python SDK for ToonDB — a high-performance embedded document database with HNSW vector search, built-in multi-tenancy, and SQL support.
- ✅ Key-Value Store — Simple
get()/put()/delete()operations - ✅ Path-Native API — Hierarchical keys like
users/alice/email - ✅ Namespace Isolation — Type-safe multi-tenancy with
NamespaceandCollection - ✅ Hybrid Search — Vector + BM25 keyword search with RRF fusion
- ✅ ContextQuery Builder — Token-aware LLM context retrieval with budgeting
- ✅ Multi-Vector Documents — Chunk-level embeddings with aggregation
- ✅ Prefix Scanning — Fast
scan_prefix()for safe tenant-scoped iteration - ✅ ACID Transactions — Full snapshot isolation with automatic commit/abort
- ✅ Vector Search — HNSW with bulk API (~1,600 vec/s ingestion)
- ✅ SQL Support — Full DDL/DML with CREATE, INSERT, SELECT, UPDATE, DELETE
- ✅ Enhanced Error Taxonomy — ErrorCode enum with remediation hints
- ✅ CLI Tools —
toondb-server,toondb-bulk,toondb-grpc-servercommands - ✅ Dual Mode — Embedded (FFI) or IPC (multi-process)
- ✅ Zero Compilation — Pre-built binaries for Linux/macOS/Windows
pip install toondb-clientImport Note: The package is installed as
toondb-clientbut imported astoondb:from toondb import Database # Correct
Pre-built binaries included for:
- Linux x86_64 and aarch64 (glibc ≥ 2.17)
- macOS Intel and Apple Silicon (universal2)
- Windows x64
No Rust toolchain required!
ℹ️ About the Binaries: This Python SDK packages pre-compiled binaries from the main ToonDB repository. Each wheel contains platform-specific executables (
toondb-bulk,toondb-server,toondb-grpc-server) and native FFI libraries. See RELEASE.md for details on the release process.
Build lightweight graph structures on top of ToonDB's KV storage for agent memory:
from toondb import Database, GraphOverlay
db = Database.open("./agent_db")
graph = GraphOverlay(db, namespace="agent_memory")
# Add nodes (entities, concepts, events)
graph.add_node("user_alice", "person", {"name": "Alice", "role": "developer"})
graph.add_node("conv_123", "conversation", {"topic": "ToonDB features"})
graph.add_node("action_456", "action", {"type": "code_commit", "status": "success"})
# Add edges (relationships, causality, references)
graph.add_edge("user_alice", "started", "conv_123", {"timestamp": "2026-01-05"})
graph.add_edge("conv_123", "triggered", "action_456", {"reason": "user request"})
# Retrieve nodes and edges
node = graph.get_node("user_alice")
edges = graph.get_edges("user_alice", edge_type="started")
# Graph traversal
visited = graph.bfs_traversal("user_alice", max_depth=3) # BFS from Alice
path = graph.shortest_path("user_alice", "action_456") # Find connection
# Get neighbors
neighbors = graph.get_neighbors("conv_123", direction="both")
# Extract subgraph
subgraph = graph.get_subgraph(["user_alice", "conv_123", "action_456"])Use Cases:
- Agent conversation history with causal chains
- Entity relationship tracking across sessions
- Action dependency graphs for planning
- Knowledge graph construction
Enforce safety policies on agent operations with pre/post triggers:
from toondb import Database, PolicyEngine, PolicyAction
db = Database.open("./agent_data")
policy = PolicyEngine(db)
# Block writes to system keys from agents
@policy.before_write("system/*")
def block_system_writes(key, value, context):
if context.get("agent_id"):
return PolicyAction.DENY
return PolicyAction.ALLOW
# Redact sensitive data on read
@policy.after_read("users/*/email")
def redact_emails(key, value, context):
if context.get("redact_pii"):
return b"[REDACTED]"
return value
# Rate limit writes per agent
policy.add_rate_limit("write", max_per_minute=100, scope="agent_id")
# Enable audit logging
policy.enable_audit()
# Use policy-wrapped operations
policy.put(b"users/alice", b"data", context={"agent_id": "agent_001"})Route tool calls to specialized agents with automatic failover:
from toondb import Database, ToolDispatcher, ToolCategory, RoutingStrategy
db = Database.open("./agent_data")
dispatcher = ToolDispatcher(db)
# Register agents with capabilities
dispatcher.register_local_agent(
"code_agent",
capabilities=[ToolCategory.CODE, ToolCategory.GIT],
handler=lambda tool, args: {"result": f"Processed {tool}"},
)
dispatcher.register_remote_agent(
"search_agent",
capabilities=[ToolCategory.SEARCH],
endpoint="http://localhost:8001/invoke",
)
# Register tools
dispatcher.register_tool(
name="search_code",
description="Search codebase",
category=ToolCategory.CODE,
)
# Invoke with automatic routing (priority, round-robin, fastest, etc.)
result = dispatcher.invoke("search_code", {"query": "auth"}, session_id="sess_001")
print(f"Routed to: {result.agent_id}, Success: {result.success}")Lightweight graph layer for agent memory relationships:
from toondb import Database, GraphOverlay, TraversalOrder
db = Database.open("./agent_data")
graph = GraphOverlay(db)
# Add nodes (entities, concepts, events)
graph.add_node("user:alice", node_type="user", properties={"role": "admin"})
graph.add_node("project:toondb", node_type="project", properties={"status": "active"})
# Add relationships
graph.add_edge("user:alice", "project:toondb", edge_type="owns", properties={"since": "2024"})
# Traverse graph (BFS/DFS)
related = graph.bfs("user:alice", max_depth=2, edge_filter=lambda e: e.edge_type == "owns")
# Find shortest path
path = graph.shortest_path("user:alice", "project:toondb")Single entry point with auto-detection:
import toondb
# Auto-detects embedded mode from path
db = toondb.connect("./my_database")
# Auto-detects IPC mode from socket
db = toondb.connect("/tmp/toondb.sock")
# Auto-detects gRPC mode from host:port
db = toondb.connect("localhost:50051")
# Explicit mode
db = toondb.connect("./data", mode="embedded", config={"sync_mode": "full"})Logical database namespaces for true multi-tenancy without key prefixing:
from toondb import Database, CollectionConfig, DistanceMetric
db = Database.open("./my_database")
# Create isolated namespace for tenant
ns = db.create_namespace(
"tenant_acme",
display_name="Acme Corporation",
labels={"tier": "enterprise"}
)
# Create collection with immutable config
collection = ns.create_collection(
CollectionConfig(
name="documents",
dimension=384,
metric=DistanceMetric.COSINE,
enable_hybrid_search=True,
content_field="text"
)
)Combine dense vectors (HNSW) with sparse BM25 text search:
# Insert documents with text and vectors
collection.insert(
id="doc_1",
vector=[0.1] * 384,
metadata={"title": "Guide", "text": "ToonDB is fast"},
content="ToonDB is a fast database"
)
# Hybrid search (vector + keyword)
results = collection.hybrid_search(
vector=query_embedding,
text_query="fast database",
k=10,
alpha=0.7, # 70% vector, 30% keyword weight
rrf_fusion=True # Reciprocal Rank Fusion
)Store multiple embeddings per document (e.g., title + content):
# Insert document with multiple vectors
collection.insert_multi_vector(
id="article_1",
vectors={
"title": title_embedding, # List[float] of dim 384
"abstract": abstract_embedding, # List[float] of dim 384
"content": content_embedding # List[float] of dim 384
},
metadata={"title": "Deep Learning Survey"}
)
# Search with aggregation strategy
results = collection.multi_vector_search(
query_vectors={
"title": query_title_embedding,
"content": query_content_embedding
},
k=10,
aggregation="max_pooling" # or "mean_pooling", "weighted_sum"
)Optimize retrieval for LLM context windows:
from toondb import ContextQuery, DeduplicationStrategy
# Build context with token budgeting
context = (
ContextQuery(collection)
.add_vector_query(query_embedding, weight=0.7)
.add_keyword_query("machine learning", weight=0.3)
.with_token_budget(4000) # Stay within model limit
.with_deduplication(DeduplicationStrategy.SEMANTIC)
.execute()
)
# Format for LLM
prompt = f"Context: {context.as_markdown()}\\n\\nQuestion: {user_question}"from toondb import Database
# Open database (creates if doesn't exist)
with Database.open("./my_database") as db:
# Simple key-value
db.put(b"user:123", b'{"name":"Alice","age":30}')
value = db.get(b"user:123")
print(value.decode())
# Output: {"name":"Alice","age":30}# Start server (globally available after pip install)
toondb-server --db ./my_database
# Check status
toondb-server status --db ./my_database
# Output: [Server] Running (PID: 12345)
# Stop server
toondb-server stop --db ./my_databasefrom toondb import IpcClient
# Connect to running server
client = IpcClient.connect("./my_database/toondb.sock")
client.put(b"key", b"value")
value = client.get(b"key")
print(value.decode())
# Output: valuefrom toondb import Database
with Database.open("./sql_db") as db:
# Create users table
db.execute_sql("""
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE,
age INTEGER
)
""")
# Insert data
db.execute_sql("""
INSERT INTO users (id, name, email, age)
VALUES (1, 'Alice', 'alice@example.com', 30)
""")
db.execute_sql("""
INSERT INTO users (id, name, email, age)
VALUES (2, 'Bob', 'bob@example.com', 25)
""")Output:
Table 'users' created
2 rows inserted
# Select all users - Returns SQLQueryResult object
result = db.execute_sql("SELECT * FROM users")
print(f"Found {len(result.rows)} users (affected: {result.rows_affected})")
for row in result.rows:
print(row)
# Output:
# Found 2 users (affected: 0)
# {'id': 1, 'name': 'Alice', 'email': 'alice@example.com', 'age': 30}
# {'id': 2, 'name': 'Bob', 'email': 'bob@example.com', 'age': 25}
# WHERE clause
result = db.execute_sql("SELECT name, age FROM users WHERE age > 26")
for row in result.rows:
print(f"{row['name']}: {row['age']} years old")
# Output:
# Alice: 30 years oldImportant: execute_sql() returns a SQLQueryResult object with:
.rows- List of dictionaries (for SELECT queries).columns- List of column names.rows_affected- Number of rows modified (for INSERT/UPDATE/DELETE)
# Update
update_result = db.execute_sql("UPDATE users SET age = 31 WHERE name = 'Alice'")
print(f"Updated {update_result.rows_affected} rows")
# Delete
delete_result = db.execute_sql("DELETE FROM users WHERE age < 26")
print(f"Deleted {delete_result.rows_affected} rows")
# Verify
result = db.execute_sql("SELECT name, age FROM users ORDER BY age")
for row in result.rows:
print(row)
# Output:
# Updated 1 rows
# Deleted 1 rows
# {'name': 'Alice', 'age': 31}# Aggregates
result = db.execute_sql("SELECT COUNT(*) as total FROM users")
print(f"Total users: {result.rows[0]['total']}")
result = db.execute_sql("SELECT AVG(age) as avg_age FROM users")
print(f"Average age: {result.rows[0]['avg_age']}")
# Complex WHERE with AND/OR
result = db.execute_sql("""
SELECT name, age FROM users
WHERE age > 25 AND (name = 'Alice' OR name = 'Bob')
ORDER BY age DESC
""")
for row in result.rows:
print(f"{row['name']}: {row['age']} years old")Note: JOIN operations are not yet supported in the current version. They are planned for a future release.
ToonDB provides scan_batched() for efficient large-scale scans with dramatically reduced FFI overhead.
For 10,000 results with 500ns FFI overhead per call:
scan(): 10,000 FFI calls = 5ms overheadscan_batched(): 10 FFI calls = 5µs overhead (1000x faster!)
from toondb import Database
with Database.open("./my_database") as db:
# Regular scan - fine for small datasets
for key, value in db.scan_prefix(b"user:"):
print(key, value)
# Batched scan - optimized for large datasets
# Fetches 1000 results per FFI call instead of 1
for key, value in db.scan_batched(start=b"user:", end=b"user:\xff", batch_size=1000):
print(key, value)- ✅ Scanning thousands or millions of records
- ✅ Data export or migration operations
- ✅ Full table scans in analytics workloads
- ✅ Batch processing pipelines
Output:
Alice bought Laptop for $999.99
Bob bought Keyboard for $75.0
# GROUP BY with aggregations
result = db.execute_sql("""
SELECT users.name, COUNT(*) as order_count, SUM(orders.amount) as total
FROM users
JOIN orders ON users.id = orders.user_id
GROUP BY users.name
ORDER BY total DESC
""")
for row in result.rows:
print(f"{row['name']}: {row['order_count']} orders, ${row['total']} total")Output:
Alice: 2 orders, $1024.99 total
Bob: 1 orders, $75.0 total
# Put
db.put(b"key", b"value")
# Get
value = db.get(b"key")
if value:
print(value.decode())
else:
print("Key not found")
# Delete
db.delete(b"key")Output:
value
Key not found (after delete)
# Hierarchical data storage
db.put_path("users/alice/email", b"alice@example.com")
db.put_path("users/alice/age", b"30")
db.put_path("users/bob/email", b"bob@example.com")
# Retrieve by path
email = db.get_path("users/alice/email")
print(f"Alice's email: {email.decode()}")Output:
Alice's email: alice@example.com
The most efficient way to iterate keys with a common prefix:
# Insert multi-tenant data
db.put(b"tenants/acme/users/1", b'{"name":"Alice"}')
db.put(b"tenants/acme/users/2", b'{"name":"Bob"}')
db.put(b"tenants/acme/orders/1", b'{"total":100}')
db.put(b"tenants/globex/users/1", b'{"name":"Charlie"}')
# Scan only ACME Corp data (tenant isolation)
results = list(db.scan(b"tenants/acme/", b"tenants/acme;"))
print(f"ACME Corp has {len(results)} items:")
for key, value in results:
print(f" {key.decode()}: {value.decode()}")Output:
ACME Corp has 3 items:
tenants/acme/orders/1: {"total":100}
tenants/acme/users/1: {"name":"Alice"}
tenants/acme/users/2: {"name":"Bob"}
Why use scan():
- Fast: O(|prefix|) performance
- Isolated: Perfect for multi-tenant apps
- Efficient: Binary-safe iteration
# Context manager handles commit/abort
with db.transaction() as txn:
txn.put(b"account:1:balance", b"1000")
txn.put(b"account:2:balance", b"500")
# Commits on success, aborts on exceptionOutput:
✅ Transaction committed
txn = db.begin_transaction()
try:
txn.put(b"key1", b"value1")
txn.put(b"key2", b"value2")
# Scan within transaction
for key, value in txn.scan(b"key", b"key~"):
print(f"{key.decode()}: {value.decode()}")
txn.commit()
except Exception as e:
txn.abort()
raiseOutput:
key1: value1
key2: value2
✅ Transaction committed
Returns results in TOON format (token-optimized for LLMs):
# Insert structured data
db.put(b"products/laptop", b'{"name":"Laptop","price":999,"stock":5}')
db.put(b"products/mouse", b'{"name":"Mouse","price":25,"stock":20}')
# Query with column selection
results = db.query("products/") \
.select(["name", "price"]) \
.limit(10) \
.to_list()
for key, value in results:
print(f"{key.decode()}: {value.decode()}")Output (TOON Format):
products/laptop: result[1]{name,price}:Laptop,999
products/mouse: result[1]{name,price}:Mouse,25
Other query methods:
first = db.query("products/").first() # Get first result
count = db.query("products/").count() # Count results
exists = db.query("products/").exists() # Check existencefrom toondb.bulk import bulk_build_index, bulk_query_index
import numpy as np
# Generate embeddings (10K × 768D)
embeddings = np.random.randn(10000, 768).astype(np.float32)
# Build HNSW index at ~1,600 vec/s
stats = bulk_build_index(
embeddings,
output="my_index.hnsw",
m=16,
ef_construction=100,
metric="cosine"
)
print(f"Built {stats.vectors} vectors at {stats.rate:.0f} vec/s")Output:
Built 10000 vectors at 1598 vec/s
Index size: 45.2 MB
# Single query vector
query = np.random.randn(768).astype(np.float32)
results = bulk_query_index(
index="my_index.hnsw",
query=query,
k=10,
ef_search=64
)
print(f"Top {len(results)} nearest neighbors:")
for i, neighbor in enumerate(results):
print(f"{i+1}. ID: {neighbor.id}, Distance: {neighbor.distance:.4f}")Output:
Top 10 nearest neighbors:
1. ID: 3421, Distance: 0.1234
2. ID: 7892, Distance: 0.1456
3. ID: 1205, Distance: 0.1678
...
Performance Comparison:
| Method | Throughput | Use Case |
|---|---|---|
| Python FFI | ~130 vec/s | Small datasets |
| Bulk API | ~1,600 vec/s | Large-scale ingestion |
Three CLI tools are globally available after pip install toondb-client:
Multi-process database access via Unix domain sockets.
# Start server
toondb-server --db ./my_database
# Check status
toondb-server status --db ./my_database
# Output: [Server] Running (PID: 12345)
# Stop server
toondb-server stop --db ./my_databaseOptions:
| Option | Default | Description |
|---|---|---|
--db PATH |
./toondb_data |
Database directory |
--socket PATH |
<db>/toondb.sock |
Unix socket path |
--max-clients N |
100 |
Max concurrent connections |
--timeout-ms MS |
30000 |
Connection timeout |
--log-level |
info |
trace/debug/info/warn/error |
Wire Protocol (14 Operations):
| OpCode | Operation | Description |
|---|---|---|
0x01 |
PUT | Store key-value |
0x02 |
GET | Retrieve value |
0x03 |
DELETE | Delete key |
0x04 |
BEGIN_TXN | Start transaction |
0x05 |
COMMIT_TXN | Commit transaction |
0x06 |
ABORT_TXN | Abort transaction |
0x07 |
QUERY | Execute query (TOON format) |
0x08 |
CREATE_TABLE | Create table schema |
0x09 |
PUT_PATH | Store at path |
0x0A |
GET_PATH | Get by path |
0x0B |
SCAN | Scan key range |
0x0C |
CHECKPOINT | Force durability |
0x0D |
STATS | Get server statistics |
0x0E |
PING | Health check |
High-performance vector operations (~1,600 vec/s).
# Build HNSW index
toondb-bulk build-index \
--input embeddings.npy \
--output index.hnsw \
--dimension 768
# Query k-NN
toondb-bulk query --index index.hnsw --query vec.raw --k 10
# Get index metadata
toondb-bulk info --index index.hnsw
# Convert formats
toondb-bulk convert --input vec.npy --output vec.raw --to-format raw_f32 --dimension 768gRPC server for remote vector search.
# Start gRPC server
toondb-grpc-server --host 0.0.0.0 --port 50051
# Check status
toondb-grpc-server status --port 50051gRPC Service Methods:
| Method | Description |
|---|---|
CreateIndex |
Create HNSW index (dimension, metric, M, ef_construction) |
DropIndex |
Delete an index |
InsertBatch |
Batch vector insertion (flat format) |
InsertStream |
Stream vectors for insertion |
Search |
Single k-NN query |
SearchBatch |
Batch k-NN queries |
GetStats |
Index statistics (num_vectors, layers, connections) |
HealthCheck |
Server health + version |
export TOONDB_SERVER_PATH=/path/to/toondb-server
export TOONDB_BULK_PATH=/path/to/toondb-bulk
export TOONDB_GRPC_SERVER_PATH=/path/to/toondb-grpc-serverfrom toondb import Database
import json
def main():
with Database.open("./saas_db") as db:
# Create SQL schema
db.execute_sql("""
CREATE TABLE IF NOT EXISTS tenants (
id INTEGER PRIMARY KEY,
name TEXT,
created_at TEXT
)
""")
# Insert tenants
db.execute_sql("INSERT INTO tenants VALUES (1, 'ACME Corp', '2026-01-01')")
db.execute_sql("INSERT INTO tenants VALUES (2, 'Globex Inc', '2026-01-01')")
# Store tenant-specific K-V data
db.put(b"tenants/1/users/alice", b'{"role":"admin","email":"alice@acme.com"}')
db.put(b"tenants/1/users/bob", b'{"role":"user","email":"bob@acme.com"}')
db.put(b"tenants/2/users/charlie", b'{"role":"admin","email":"charlie@globex.com"}')
# Query SQL
tenants = db.execute_sql("SELECT * FROM tenants ORDER BY name")
for tenant in tenants:
tenant_id = tenant['id']
tenant_name = tenant['name']
# Scan tenant-specific data (isolation)
prefix = f"tenants/{tenant_id}/".encode()
end = f"tenants/{tenant_id};".encode()
users = list(db.scan(prefix, end))
print(f"\n{tenant_name} ({len(users)} users):")
for key, value in users:
user_data = json.loads(value.decode())
print(f" {key.decode()}: {user_data['email']} ({user_data['role']})")
if __name__ == "__main__":
main()Output:
ACME Corp (2 users):
tenants/1/users/alice: alice@acme.com (admin)
tenants/1/users/bob: bob@acme.com (user)
Globex Inc (1 users):
tenants/2/users/charlie: charlie@globex.com (admin)
| Method | Description |
|---|---|
Database.open(path) |
Open/create database |
put(key: bytes, value: bytes) |
Store key-value pair |
get(key: bytes) -> bytes | None |
Retrieve value |
delete(key: bytes) |
Delete a key |
put_path(path: str, value: bytes) |
Store at hierarchical path |
get_path(path: str) -> bytes | None |
Retrieve by path |
scan(start: bytes, end: bytes) |
Iterate key range |
transaction() |
Begin ACID transaction |
execute_sql(query: str) |
Execute SQL statement |
checkpoint() |
Force durability checkpoint |
stats() |
Get storage statistics |
| Method | Description |
|---|---|
IpcClient.connect(path) |
Connect to IPC server |
ping() -> float |
Check latency (ms) |
query(prefix: str) |
Create query builder |
scan(prefix: str) |
Scan keys with prefix |
begin_transaction() |
Start transaction |
commit(txn_id) |
Commit transaction |
abort(txn_id) |
Abort transaction |
| Function | Description |
|---|---|
bulk_build_index(embeddings, output, m, ef_construction) |
Build HNSW index (~1,600 vec/s) |
bulk_query_index(index, query, k, ef_search) |
Query for k nearest neighbors |
bulk_info(index) |
Get index metadata |
# Custom configuration
db = Database.open("./my_db", config={
"create_if_missing": True,
"wal_enabled": True,
"sync_mode": "normal", # "full", "normal", "off"
"memtable_size_bytes": 64 * 1024 * 1024, # 64MB
})from toondb import Database, ToonDBError
try:
with Database.open("./db") as db:
value = db.get(b"key")
if value is None:
print("Key not found (not an error)")
except ToonDBError as e:
print(f"Database error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")Achieve 40-66% token reduction compared to JSON for LLM context efficiency.
from toondb import Database
records = [
{"id": 1, "name": "Alice", "email": "alice@example.com"},
{"id": 2, "name": "Bob", "email": "bob@example.com"},
]
# Convert to TOON format
toon_str = Database.to_toon("users", records, ["name", "email"])
print(toon_str)
# Output: users[2]{name,email}:Alice,alice@example.com;Bob,bob@example.com
# Token comparison:
# JSON (compact): ~165 tokens
# TOON format: ~70 tokens (59% reduction!)toon_str = "users[2]{name,email}:Alice,alice@ex.com;Bob,bob@ex.com"
table_name, fields, records = Database.from_toon(toon_str)
print(table_name) # "users"
print(fields) # ["name", "email"]
print(records) # [{"name": "Alice", "email": "alice@ex.com"}, ...]1000× fewer FFI calls for large dataset scans.
with Database.open("./my_db") as db:
# Insert test data
for i in range(10000):
db.put(f"item:{i:05d}".encode(), f"value:{i}".encode())
txn = db.transaction()
# Batched scan - dramatically faster!
count = 0
for key, value in txn.scan_batched(
start=b"item:",
end=b"item;",
batch_size=1000 # Fetch 1000 results per FFI call
):
count += 1
print(f"Scanned {count} items")
txn.abort()Performance Comparison (10,000 results, 500ns FFI overhead):
| Method | FFI Calls | Overhead |
|---|---|---|
scan() |
10,000 | 5ms |
scan_batched() |
10 | 5µs (1000× faster) |
with Database.open("./my_db") as db:
# Perform operations
for i in range(100):
db.put(f"key:{i}".encode(), f"value:{i}".encode())
# Get runtime statistics
stats = db.stats()
print(f"Keys: {stats['keys_count']}")
print(f"Bytes written: {stats['bytes_written']}")
print(f"Bytes read: {stats['bytes_read']}")
print(f"Transactions committed: {stats['transactions_committed']}")
print(f"Cache hits: {stats['cache_hits']}")
print(f"Cache misses: {stats['cache_misses']}")Available Statistics:
keys_count- Total number of keysbytes_written- Total bytes writtenbytes_read- Total bytes readtransactions_committed- Number of committed transactionstransactions_aborted- Number of aborted transactionsqueries_executed- Number of queries executedcache_hits- Cache hit countcache_misses- Cache miss count
Force a durability checkpoint to flush all in-memory data to disk.
with Database.open("./my_db") as db:
# Bulk import
for i in range(10000):
db.put(f"bulk:{i}".encode(), f"data:{i}".encode())
# Force checkpoint for durability
db.checkpoint()
print("All data flushed to disk!")Use Cases:
- Before backup operations
- After bulk imports
- Before system shutdown
- To reduce recovery time after crash
Full trigger system for database events with Python code.
from toondb.plugins import PythonPlugin, PluginRegistry, TriggerEvent, TriggerAbort
# Define a validation plugin
plugin = PythonPlugin(
name="user_validator",
code='''
def on_before_insert(row: dict) -> dict:
"""Validate and transform data before insert."""
# Normalize email
if "email" in row:
row["email"] = row["email"].lower().strip()
# Validate age
if row.get("age", 0) < 0:
raise TriggerAbort("Age cannot be negative", code="INVALID_AGE")
# Add timestamp
import time
row["created_at"] = time.time()
return row
def on_after_delete(row: dict) -> dict:
"""Audit log on delete."""
print(f"[AUDIT] Deleted: {row}")
return row
''',
version="1.0.0",
packages=["numpy", "pandas"], # Optional: required packages
triggers={"users": ["BEFORE INSERT", "AFTER DELETE"]}
)
# Register and use
registry = PluginRegistry()
registry.register(plugin)
# Fire trigger
row = {"name": "Alice", "email": " ALICE@EXAMPLE.COM ", "age": 30}
result = registry.fire("users", TriggerEvent.BEFORE_INSERT, row)
print(result["email"]) # "alice@example.com"Available Trigger Events:
BEFORE_INSERT,AFTER_INSERTBEFORE_UPDATE,AFTER_UPDATEBEFORE_DELETE,AFTER_DELETEON_BATCH
with Database.open("./my_db") as db:
txn = db.transaction()
# Get transaction ID
print(f"Transaction ID: {txn.id}")
# Perform operations
txn.put(b"key", b"value")
# Commit returns LSN (Log Sequence Number)
lsn = txn.commit()
print(f"Committed at LSN: {lsn}")
# Execute SQL within transaction
txn2 = db.transaction()
result = txn2.execute("INSERT INTO users VALUES (1, 'Alice')")
txn2.commit()Start the bundled IPC server for multi-process access:
# Start server
toondb-server --db ./my_database
# Options:
# -d, --db <PATH> Database directory [default: ./toondb_data]
# -s, --socket <PATH> Unix socket path [default: <db>/toondb.sock]
# --max-clients <N> Max connections [default: 100]
# --timeout-ms <MS> Connection timeout [default: 30000]
# --log-level <LEVEL> trace/debug/info/warn/error [default: info]Connect from Python:
from toondb import IpcClient
client = IpcClient.connect("./my_database/toondb.sock")
client.put(b"key", b"value")
value = client.get(b"key")
latency = client.ping() # Round-trip latency in seconds
client.close()✅ Use SQL for structured data — Tables, relationships, complex queries
✅ Use K-V for unstructured data — JSON documents, blobs, caching
✅ Use scan_prefix() for multi-tenancy — Efficient prefix-based isolation
✅ Use scan_batched() for large scans — 1000× faster than regular scan()
✅ Use transactions — Atomic multi-key/multi-table operations
✅ Use bulk API for vectors — 12× faster than FFI for HNSW building
✅ Use TOON format for LLMs — 40-66% token reduction vs JSON
✅ Always use context managers — with Database.open() ensures cleanup
| Platform | Wheel Tag | Notes |
|---|---|---|
| Linux x86_64 | manylinux_2_17_x86_64 |
glibc ≥ 2.17 (CentOS 7+) |
| Linux aarch64 | manylinux_2_17_aarch64 |
ARM servers (AWS Graviton) |
| macOS | macosx_11_0_universal2 |
Intel + Apple Silicon |
| Windows | win_amd64 |
Windows 10+ x64 |
# Clone repo
git clone https://github.com/toondb/toondb-python-sdk
cd toondb-python-sdk
# Install in development mode
pip install -e .
# Run tests
pytest tests/ -v- Python 3.9+
- NumPy (for vector operations)
- No Rust toolchain required for installation
Apache License 2.0
- GitHub Issues: https://github.com/toondb/toondb-python-sdk/issues
- Email: sushanth@toondb.dev
Sushanth - GitHub