PodWrangler is an async Python library for executing shell commands inside Kubernetes pods over WebSocket connections. It is provider-agnostic: the same session and registry API works with PlainTTY, native Kubernetes, or any custom provider.
On top of the core library, PodWrangler ships a REST API (FastAPI + SQLModel) and a CLI (Typer + Rich) for managing pods and running commands across providers with persistent state.
Core library:

- Async-first: built on `asyncio` and `websockets`
- Provider-agnostic: swap PlainTTY, Kubernetes, or a custom provider without changing application code
- Registry: alias-based catalogue for managing multiple pod endpoints
- Auto-reconnect: optional exponential-backoff reconnection on connection loss
- Structured results: every command returns a `CommandResult` with output, exit code, and error
- Token safety: auth tokens are automatically redacted from `repr()` and serialisation output
- Typed models: all data models are frozen Pydantic v2 models

REST API:

- FastAPI + SQLModel: async REST API with auto-generated OpenAPI docs
- Full pod CRUD: register, list, update, and remove pods by alias
- Remote execution: run a command on one pod or fan-out to many in parallel
- Persistent history: every execution is stored in SQLite or PostgreSQL
- Dual DB support: switch between SQLite and PostgreSQL via a single env var

CLI:

- Typer + Rich: formatted terminal output with tables, panels, and status spinners
- Local or remote: target any running API instance via `--api-url` or env var
- Embedded server: the `--local` flag auto-starts the API in a daemon thread; no separate terminal needed
- Full pod management: `pod list/show/add/update/remove` sub-commands
- Remote execution: `exec run`, `exec run-many` (parallel fan-out), `exec history`
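To illustrate the token-safety feature listed for the core library, here is a minimal sketch of how a token could be masked in a WebSocket URL before it is printed or logged. The `redact_token` helper and the `***` placeholder are assumptions made for this example; PodWrangler's own redaction logic is not shown in this README and may work differently.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def redact_token(url: str, param: str = "token", mask: str = "***") -> str:
    """Return `url` with the value of the `param` query parameter masked.

    Hypothetical helper for illustration only; not part of PodWrangler.
    """
    parts = urlsplit(url)
    # keep_blank_values=True so parameters with empty values survive the round trip
    query = [
        (key, mask if key == param else value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
    ]
    # safe="*" keeps the mask readable instead of percent-encoding it
    return urlunsplit(parts._replace(query=urlencode(query, safe="*")))


url = "wss://k8s.provider.com/exec/45582/?token=secret-jwt&cols=220&pod=my-app-abc-xyz"
print(redact_token(url))
```

Masking only the query value keeps the rest of the URL (host, service ID, pod name) available for debugging.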
- Python ≥ 3.13
- `websockets` ≥ 16.0, `pydantic` ≥ 2.10
- `fastapi`, `uvicorn`, `sqlmodel`, `aiosqlite` (SQLite) or `asyncpg` (PostgreSQL), `pydantic-settings`
- `typer`, `rich`, `httpx`
Install with pip:

```bash
pip install podwrangler
```

Or with Poetry:

```bash
poetry add podwrangler
```

Open a session from a WebSocket URL and run a command:

```python
import asyncio

import podwrangler as pw


async def main():
    provider = pw.PlainTTYProvider()
    wss_url = "wss://k8s.provider.com/exec/45582/?token=<JWT>&cols=220&pod=my-app-abc-xyz"
    async with pw.PodSession.from_url(wss_url, provider) as session:
        result = await session.run("hostname")
        print(result.output)


asyncio.run(main())
```

PlainTTY provider with an explicit terminal width:

```python
provider = pw.PlainTTYProvider(cols=220)
```

Native Kubernetes provider:

```python
provider = pw.KubernetesProvider(
    host="k8s.example.com:6443",
    use_tls=True,
    shell_cmd=["/bin/sh"],
)
```

Note: `KubernetesProvider.init_shell` and `list_pods` are not yet implemented.
Run commands inside an open session:

```python
async with pw.PodSession.from_url(wss_url, provider) as session:
    # Run a single command
    result = await session.run("ls -la /app")
    print(result.output)

    # Run with timeout and strict exit-code check
    result = await session.run("make test", timeout=120.0, check=True)

    # Run a sequence of commands
    results = await session.run_many(
        ["cd /app", "git pull", "pip install -r requirements.txt"],
        timeout_each=60.0,
        stop_on_error=True,
    )
    for r in results:
        print("✓" if r.succeeded else "✗", r.command)
```

Register several pods under aliases and open sessions by alias:

```python
registry = pw.PodRegistry()
registry.register_from_url("web-1", WSS_URL_1, provider)
registry.register_from_url("web-2", WSS_URL_2, provider)
print(registry.aliases)  # ['web-1', 'web-2']

async with registry.session("web-1") as s:
    result = await s.run("uptime")
    print(result.output)
```

Run the same command on several pods concurrently:

```python
async def run_on(alias, registry, cmd):
    async with registry.session(alias) as s:
        return await s.run(cmd)


results = await asyncio.gather(
    run_on("web-1", registry, "hostname"),
    run_on("web-2", registry, "hostname"),
)
for r in results:
    print(r.pod_label, "→", r.output.strip())
```

Handle failures with the library's exception types:

```python
try:
    result = await session.run("ls /missing", check=True)
except pw.PodCommandTimeoutError as e:
    print(f"Timed out after {e.timeout}s")
except pw.PodCommandError as e:
    print(f"Exit {e.exit_code}: {e.output}")
except pw.PodConnectionError as e:
    print(f"Connection error: {e.reason}")
```

Enable automatic reconnection:

```python
session = pw.PodSession(endpoint, auto_reconnect=True, connect_timeout=20.0)
await session.connect()
result = await session.run("echo hello")  # reconnects automatically if needed
await session.close()
```

Build an endpoint manually instead of parsing a URL:

```python
provider = pw.PlainTTYProvider(cols=220)
identity = pw.PodIdentity(
    pod_name="my-app-deploy-abc12-xyz99",
    service="my-app-deploy",
    deployment="abc12",
    cluster="45582",  # PlainTTY service_id
    cols=220,
)
endpoint = pw.PodEndpoint(identity=identity, provider=provider, token="<JWT>")

async with pw.PodSession(endpoint, namespace="production") as s:
    r = await s.run("env | grep K8S")
    print(r.output)
```

Any class that satisfies the `ProviderSpec` protocol can be used as a provider; no inheritance is required:

```python
class MyProvider:
    name = "myprovider"

    def build_wss_schema(self, identity: pw.PodIdentity, token: str) -> pw.WssSchema:
        url = f"wss://my.service.com/exec/{identity.pod_name}?token={token}"
        return pw.WssSchema(url=url, is_binary_framed=False)

    def parse_url(self, url: str) -> tuple[pw.PodIdentity, str]:
        ...

    async def init_shell(self, ws, schema: pw.WssSchema) -> None:
        ...

    async def detect_namespace(self, ws, identity, schema) -> str:
        return identity.namespace or "default"

    async def list_pods(self, token: str, **kwargs) -> list[pw.PodIdentity]:
        raise NotImplementedError


async with pw.PodSession.from_url(my_url, MyProvider()) as s:
    print(await s.run("hostname"))
```

Start the REST API with uvicorn:

```bash
# SQLite (default)
uvicorn api.main:app --reload

# PostgreSQL
PODWRANGLER_DATABASE_URL=postgresql+asyncpg://user:pass@localhost/podwrangler \
uvicorn api.main:app --reload
```

Interactive docs are available at http://127.0.0.1:8000/docs.
| Method | Path | Description |
|---|---|---|
| `GET` | `/api/v1/pods/` | List all registered pods |
| `POST` | `/api/v1/pods/` | Register a new pod |
| `GET` | `/api/v1/pods/{alias}` | Get pod details |
| `PATCH` | `/api/v1/pods/{alias}` | Update pod connection details |
| `DELETE` | `/api/v1/pods/{alias}` | Remove a pod |
| `POST` | `/api/v1/pods/{alias}/exec` | Run a command on one pod |
| `POST` | `/api/v1/exec` | Run a command on multiple pods (parallel) |
| `GET` | `/api/v1/commands` | List command execution history |
| `GET` | `/health` | Health check |
| Env var | Default | Description |
|---|---|---|
| `PODWRANGLER_DATABASE_URL` | `sqlite+aiosqlite:///./pods.db` | Database URL |
| `PODWRANGLER_API_HOST` | `127.0.0.1` | Bind host |
| `PODWRANGLER_API_PORT` | `8000` | Bind port |
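The environment variables above can be pictured as a small settings object with documented defaults. The sketch below uses only the standard library; the `ApiSettings` class and `load_settings` function are stand-ins invented for this example, and the real service (which lists `pydantic-settings` as a dependency) may load its configuration differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ApiSettings:
    """Illustrative stand-in; the project's real settings class is not shown in this README."""

    database_url: str = "sqlite+aiosqlite:///./pods.db"
    api_host: str = "127.0.0.1"
    api_port: int = 8000


def load_settings(env: Mapping[str, str] = os.environ) -> ApiSettings:
    """Read the PODWRANGLER_* variables, falling back to the documented defaults."""
    defaults = ApiSettings()
    return ApiSettings(
        database_url=env.get("PODWRANGLER_DATABASE_URL", defaults.database_url),
        api_host=env.get("PODWRANGLER_API_HOST", defaults.api_host),
        # Environment values are strings, so the port is converted explicitly
        api_port=int(env.get("PODWRANGLER_API_PORT", defaults.api_port)),
    )


print(load_settings({}))
print(load_settings({"PODWRANGLER_API_PORT": "9000"}))
```

Passing a plain dictionary instead of `os.environ` makes the defaults easy to check without touching the real environment.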
See docs/api/api.md for the full REST API reference and docs/api/arch.md for architecture details.
All dependencies are declared in `pyproject.toml`. After `poetry install`, the `podwrangler` script is available.
```bash
# Start the embedded server and check health
podwrangler --local health

# Register a pod
podwrangler pod add --alias prod-web \
  --url "wss://k8s.provider.com/exec/45582/?token=<JWT>&pod=my-app-abc-xyz" \
  --provider plaintty

# List pods
podwrangler pod list

# Run a command on one pod
podwrangler exec run prod-web "uptime"

# Run a command on multiple pods in parallel
podwrangler exec run-many "df -h" --aliases prod-web,staging-web

# Show execution history
podwrangler exec history
```

| Source | Key / Env var | Description |
|---|---|---|
| Env var | `PODWRANGLER_API_URL` | API base URL |
| TOML file | `~/.podwrangler.toml` → `api_url` | Persistent URL config |
| Default | `http://127.0.0.1:8000` | Fallback |
| Env var | `PODWRANGLER_API_KEY` | Bearer token (optional) |
| Option | Description |
|---|---|
| `--local` / `-L` | Auto-start embedded API server before running the command |
| `--api-url` / `-u` | Override API base URL for this invocation |
See docs/cli/api.md for the full CLI reference and docs/cli/arch.md for architecture details.
See docs/deployment.md for the full packaging and deployment reference, covering:
- Building a wheel and exporting a `requirements.txt`
- Docker (SQLite single-container and PostgreSQL two-container)
- Systemd service (Linux bare-metal)
- Kubernetes manifests (`deploy/kubernetes/`)
- Reverse proxy (nginx + TLS)
- Database backends and schema migrations
- Production checklist
All packages live under `src/` (src layout):

```text
src/
├── podwrangler/    Core async library
├── api/            FastAPI REST service
└── cli/            Typer + Rich CLI
```
Set PYTHONPATH=src (or use make serve / make cli) when running outside of an installed environment.
| Target | Description |
|---|---|
| `make install` | Install all runtime + dev dependencies via Poetry |
| `make lint` | Run ruff linter |
| `make format` | Auto-format code with ruff |
| `make typecheck` | Run mypy type checker |
| `make check` | Run all code quality checks |
| `make test` | Run test suite |
| `make test-cov` | Run tests with HTML coverage report |
| `make serve` | Start API server in dev mode (SQLite, auto-reload) |
| `make serve-prod` | Start API server in production mode (4 workers) |
| `make cli ARGS="pod list"` | Run CLI with arguments |
| `make build` | Build wheel + sdist |
| `make publish` | Publish to PyPI |
| `make docker-build` | Build Docker image |
| `make docker-run` | Run API in Docker (SQLite) |
| `make docker-run-pg` | Run API in Docker with PostgreSQL |
| `make db-init` | Initialise database tables |
| `make clean` | Remove build artefacts and caches |
| `make all` | Install, check, test, and build |
Run make help to see all available targets with descriptions.
See docs/api.md for the core library API reference.
See docs/arch.md for core library architecture notes.
All exceptions inherit from PodClientError:
| Exception | When raised |
|---|---|
| `PodNotFoundError` | Alias not in registry |
| `PodConnectionError` | WebSocket connect/network failure |
| `PodNotConnectedError` | Operation on disconnected session |
| `PodCommandError` | Non-zero exit code with `check=True` |
| `PodCommandTimeoutError` | Command exceeded timeout |
| `PodConfigValidationError` | Invalid config field value |
| `PodURLParseError` | URL doesn't match provider schema |
| `NamespaceDetectionError` | Namespace auto-detection failed |
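Because every exception derives from `PodClientError`, specific handlers can come first and the base class can act as a final catch-all. The sketch below demonstrates that ordering with locally defined stand-in classes (so it runs without the library installed); the class bodies and the `describe` helper are illustrative only, and any intermediate inheritance between the real exceptions is not documented here.

```python
class PodClientError(Exception):
    """Stand-in for the library's base exception."""


class PodNotFoundError(PodClientError):
    """Stand-in: alias not in registry."""


class PodConnectionError(PodClientError):
    """Stand-in: WebSocket connect/network failure."""


class PodURLParseError(PodClientError):
    """Stand-in: URL doesn't match provider schema."""


def describe(exc: Exception) -> str:
    """Classify an error, trying specific types first and the base class last."""
    try:
        raise exc
    except PodNotFoundError:
        return "unknown alias"
    except PodConnectionError:
        return "connection problem"
    except PodClientError:
        # Catches every remaining library error, e.g. PodURLParseError
        return "other PodWrangler error"
    except Exception:
        return "unrelated error"


print(describe(PodNotFoundError("web-9")))
print(describe(PodURLParseError("bad url")))
print(describe(ValueError("not from the library")))
```

Placing the base-class handler before the specific ones would shadow them, which is why it comes last.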
| Constant | Value |
|---|---|
| `DEFAULT_TERMINAL_COLS` | 220 |
| `DEFAULT_CONNECT_TIMEOUT` | 15.0 s |
| `DEFAULT_COMMAND_TIMEOUT` | 30.0 s |
| `MAX_RECONNECT_ATTEMPTS` | 3 |
| `RECONNECT_BACKOFF_BASE` | 2.0 s |
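To give a feel for what the two reconnect constants imply, here is a small sketch of an exponential-backoff schedule. The formula `delay = base ** attempt` with attempts counted from 1 is an assumption; this README states only that reconnection uses exponential backoff, so the library's exact delays may differ.

```python
MAX_RECONNECT_ATTEMPTS = 3
RECONNECT_BACKOFF_BASE = 2.0  # seconds


def backoff_delays(
    attempts: int = MAX_RECONNECT_ATTEMPTS,
    base: float = RECONNECT_BACKOFF_BASE,
) -> list[float]:
    """Delay before each reconnect attempt, assuming delay = base ** attempt (1-indexed)."""
    return [base**attempt for attempt in range(1, attempts + 1)]


print(backoff_delays())  # [2.0, 4.0, 8.0]
```

Under this assumption a session would wait at most 14 seconds in total across its three attempts before giving up.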
MIT — see LICENSE for details.
Mohammad Khodaparastan — mohammad@khodaparastan.com