Skip to content

microsoft/duroxide-python

 
 

duroxide-python

Python SDK for the Duroxide durable execution runtime.

Write durable workflows as Python generators. The Rust runtime handles replay, persistence, and fault tolerance.

Features

  • Generator-based orchestrationsyield task descriptors, Rust handles DurableFutures
  • Activities — regular Python functions for side effects (I/O, network calls)
  • Timers — durable delays that survive process restarts
  • Events — wait for external signals
  • Sub-orchestrations — compose workflows hierarchically
  • Fan-out/Fan-inctx.all() for parallel execution, ctx.race() for first-to-complete
  • Continue-as-new — long-running orchestrations with bounded history
  • Deterministic replay — safe resume after crashes
  • SQLite & PostgreSQL — pluggable storage providers, including Microsoft Entra ID auth for Azure Database for PostgreSQL
  • Custom Statusctx.set_custom_status() / ctx.reset_custom_status() for orchestration progress reporting, client.wait_for_status_change() for efficient polling
  • KV Store — durable per-instance state via ctx.set_kv_value() / ctx.get_kv_value() / ctx.get_kv_all_values() / ctx.get_kv_all_keys() / ctx.get_kv_length() / ctx.clear_kv_value() / ctx.clear_all_kv_values() / ctx.prune_kv_values_updated_before(), plus client.get_kv_value() / client.wait_for_kv_value()
  • Event Queuesctx.dequeue_event(queue_name) for FIFO mailbox-style message passing, client.enqueue_event() to send messages
  • Retry on Sessionctx.schedule_activity_with_retry_on_session() for retry with session affinity
  • Tag Routing — worker tags for activity affinity (MAX_WORKER_TAGS=5, MAX_TAG_NAME_BYTES=256, MAX_KV_KEYS=150, MAX_KV_VALUE_BYTES=65536)
  • Admin APIs — instance management, metrics, pruning
  • Activity client accessctx.get_client() lets activities start new orchestrations
  • Runtime metricsmetrics_snapshot() for orchestration/activity counters

Installation

pip install duroxide

Prebuilt wheels are published for macOS arm64/x64, Linux manylinux x86_64 and aarch64, and Windows x86_64.

Quick Start

from duroxide import SqliteProvider, Client, Runtime

# Create provider and runtime
provider = SqliteProvider.in_memory()
runtime = Runtime(provider)

# Register an activity
@runtime.register_activity("greet")
def greet(ctx, input):
    return f"Hello, {input['name']}!"

# Register an orchestration (generator function)
@runtime.register_orchestration("GreetWorkflow")
def greet_workflow(ctx, input):
    result = yield ctx.schedule_activity("greet", input)
    return result

# Start runtime and run orchestration
import threading
runtime.start()

client = Client(provider)
client.start_orchestration("greet-1", "GreetWorkflow", {"name": "World"})
status = client.wait_for_orchestration("greet-1", 10000)
print(status.output)  # "Hello, World!"

runtime.shutdown()

Orchestrations

Orchestrations are Python generator functions. They must be deterministic — no I/O, no randomness, no time.time(). Use only ctx.* methods for side effects.

@runtime.register_orchestration("MyWorkflow")
def my_workflow(ctx, input):
    # Schedule activities
    result = yield ctx.schedule_activity("DoWork", input)

    # Fan-out / Fan-in
    results = yield ctx.all([
        ctx.schedule_activity("TaskA", {"id": 1}),
        ctx.schedule_activity("TaskB", {"id": 2}),
    ])

    # Timer
    yield ctx.schedule_timer(5000)  # 5 seconds

    # Wait for external event
    approval = yield ctx.wait_for_event("approval")

    # Sub-orchestration
    sub_result = yield ctx.schedule_sub_orchestration("SubWorkflow", input)

    # Race (first to complete wins)
    winner = yield ctx.race(
        ctx.schedule_activity("Fast", None),
        ctx.schedule_timer(10000),
    )

    # Custom status (fire-and-forget, no yield)
    ctx.set_custom_status("processing complete")

    # Dequeue from event queue (FIFO, blocks until message available)
    msg = yield ctx.dequeue_event("inbox")

    return {"result": result, "winner": winner}

Activities

Activities are regular Python functions that perform side effects. They run outside the replay engine and are safe for I/O operations.

@runtime.register_activity("SendEmail")
def send_email(ctx, input):
    ctx.trace_info(f"Sending email to {input['to']}")
    # ... actual email sending ...
    return {"sent": True}

PostgreSQL Provider

from duroxide import PostgresEntraOptions, PostgresProvider, Client, Runtime

provider = PostgresProvider.connect("postgresql://user:pass@localhost:5432/mydb")
# or with custom schema:
provider = PostgresProvider.connect_with_schema("postgresql://...", "duroxide_python")

runtime = Runtime(provider)
client = Client(provider)

PostgreSQL with Microsoft Entra ID

For Azure Database for PostgreSQL Flexible Server, use Entra ID token authentication instead of a password:

provider = PostgresProvider.connect_with_entra(
    host="my-server.postgres.database.azure.com",
    port=5432,
    database="appdb",
    user="my-managed-identity",
    options=PostgresEntraOptions(max_connections=10),
)

provider = PostgresProvider.connect_with_schema_and_entra(
    host="my-server.postgres.database.azure.com",
    port=5432,
    database="appdb",
    user="my-managed-identity",
    schema="duroxide_python",
    options=PostgresEntraOptions(refresh_interval_ms=1_200_000),
)

PostgresEntraOptions also accepts audience, acquire_timeout_ms, and refresh_interval_ms.

Admin APIs

client = Client(provider)

# Metrics
metrics = client.get_system_metrics()
stats = client.get_orchestration_stats("instance-1")
depths = client.get_queue_depths()

# Instance management
instances = client.list_all_instances()
info = client.get_instance_info("instance-1")
tree = client.get_instance_tree("instance-1")

# Execution history with full event data
executions = client.list_executions("instance-1")
events = client.read_execution_history("instance-1", executions[0])
for event in events:
    print(event.kind, event.data)
    # event.kind: "OrchestrationStarted" | "ActivityCompleted" | ...
    # event.data: JSON string with event-specific content (result, input, error, etc.)

# Cleanup
client.delete_instance("instance-1", force=True)
client.prune_executions("instance-1", PruneOptions(keep_last=5))

Custom Status

Report orchestration progress visible to external clients:

@runtime.register_orchestration("ProgressWorkflow")
def progress_workflow(ctx, input):
    ctx.set_custom_status("step 1: validating")
    yield ctx.schedule_activity("Validate", input)

    ctx.set_custom_status("step 2: processing")
    result = yield ctx.schedule_activity("Process", input)

    ctx.reset_custom_status()  # clear status
    return result

# Poll for status changes from outside
status = client.wait_for_status_change("instance-1", 0, 50, 10000)
if status:
    print(status.custom_status)          # "step 1: validating"
    print(status.custom_status_version)  # monotonically increasing counter

KV Store

Durable per-instance key-value state for orchestration coordination and request/response patterns:

@runtime.register_orchestration("KvWorkflow")
def kv_workflow(ctx, input):
    ctx.set_kv_value("status", "running")
    result = yield ctx.schedule_activity("Compute", input)
    ctx.set_kv_value("result", str(result))
    snapshot = ctx.get_kv_all_values()
    keys = ctx.get_kv_all_keys()
    count = ctx.get_kv_length()
    return {"result": result, "snapshot": snapshot, "keys": keys, "count": count}

# External reads
status = client.wait_for_kv_value("instance-1", "status", 10000)
result = client.get_kv_value("instance-1", "result")

KV entries are scoped to a single orchestration instance and remain readable after completion until the instance is deleted or pruned. Use ctx.prune_kv_values_updated_before(cutoff_ms) to deterministically clear stale keys from prior turns when you only want to retain newer state.

Event Queues

Persistent FIFO message passing between clients and orchestrations:

@runtime.register_orchestration("ChatBot")
def chat_bot(ctx, input):
    msg_json = yield ctx.dequeue_event("inbox")
    msg = json.loads(msg_json)
    response = yield ctx.schedule_activity("Generate", msg["text"])
    ctx.set_custom_status(json.dumps({"state": "replied", "response": response, "seq": msg["seq"]}))
    if "bye" in msg["text"].lower():
        return f"Done after {msg['seq']} msgs"
    return (yield ctx.continue_as_new(""))

# Send messages from outside
client.enqueue_event(instance_id, "inbox", json.dumps({"seq": 1, "text": "Hello!"}))
status = client.wait_for_status_change(instance_id, 0, 50, 10000)
reply = json.loads(status.custom_status)

Development

# Create and activate a virtual environment
python3 -m venv .venv
source .venv/bin/activate

# Install build tools and test dependencies
pip install maturin pytest

# Build the native extension and install in development mode
maturin develop

# Run all 59 tests
pytest

# Run tests with verbose output
pytest -v

# Run a single test file
pytest tests/test_e2e.py -v

# Run a single test
pytest tests/test_e2e.py::test_hello_world

# Stop on first failure
pytest -v -x

# Build release wheel
maturin build --release

After Rust source changes (src/*.rs), re-run maturin develop to rebuild. Python-only changes (python/duroxide/, tests/) take effect immediately.

Changelog

See CHANGELOG.md for release notes.

Documentation

  • User Guide — orchestration patterns, activities, providers, tracing, determinism rules
  • Architecture — PyO3 interop, GIL deadlock fix, generator driver, tracing internals

Support

Use GitHub Issues for bug reports and feature requests. Do not report security vulnerabilities through public GitHub issues; follow the instructions in SECURITY.md instead.

Code of Conduct

This project has adopted the Microsoft Open Source Code of Conduct. For more information, see the Code of Conduct FAQ or contact opencode@microsoft.com with questions or comments.

Security

Microsoft takes the security of our software products and services seriously. Please do not report security vulnerabilities through public GitHub issues. See SECURITY.md for security reporting instructions.

Privacy and Telemetry

duroxide-python does not send telemetry to Microsoft. Applications may configure their own logging or metrics exporters; those signals are controlled by the application owner.

Trademarks

This project may contain trademarks or logos for projects, products, or services. Authorized use of Microsoft trademarks or logos is subject to and must follow Microsoft's Trademark & Brand Guidelines. Use of Microsoft trademarks or logos in modified versions of this project must not cause confusion or imply Microsoft sponsorship. Any use of third-party trademarks or logos is subject to those third-party policies.

License

MIT License - see LICENSE for details.

About

No description, website, or topics provided.

Resources

License

Code of conduct

Security policy

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages