Skip to content

Commit ffecf24

Browse files
mikemolinetclaude
andauthored
feat: add fire + list_claimable + claim + claim_next (#23)
Brings the Python SDK toward parity with the @cueapi/mcp 0.3.0 + 0.4.0 surface. Adds four new methods covering the SEND side (fire) and the RECEIVE side (claimable list, claim, claim-next). Heartbeat signature fix is held for a follow-up commit pending technical review. CuesResource: - fire(cue_id, payload_override=None, merge_strategy=None) POST /v1/cues/{id}/fire. For ad-hoc one-shot triggers and for using cues as a messaging channel between agents (carry message, instruction, task, reply_cue_id in payload_override). ExecutionsResource: - list_claimable(task=None, agent=None) GET /v1/executions/claimable?task=&agent= Filters server-side via query params (NOT client-side). Required for single-purpose workers; client-side filter after fetch hits the LIMIT 50 starvation bug fixed in the 2026-04-25 prod incident. - claim(execution_id, worker_id=...) POST /v1/executions/{id}/claim Atomic; returns 409 if already claimed. - claim_next(worker_id=..., task=None) POST /v1/executions/claim (no task) OR list+claim chain (with task). Server's claim endpoint does not accept a task filter today; with task the SDK fans out (list_claimable filtered, pick oldest, claim by ID). Tiny race window between list and claim is bounded by the atomic claim returning 409, in which case the caller retries. Version: 0.1.3 -> 0.2.0. Aligned cueapi/__init__.py (had drifted to 0.1.2) with pyproject.toml at the same time. Tests: 42 unit tests pass (was 30; +12 net). Mirrors the existing MagicMock pattern in tests/test_executions_resource.py. Pending follow-up: - ExecutionsResource.heartbeat(execution_id) currently sends an empty body and does not include worker_id via the X-Worker-Id request header that the server reads from. Worker-id is what the server uses to enforce ownership on heartbeat (returns 403 on mismatch); without it the race-protection check is silently bypassed. A signature change to add worker_id is held pending technical review of the deprecation cadence (additive kwarg-only with default-warn-on-omit vs hard signature change in a major bump). Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ff28c30 commit ffecf24

8 files changed

Lines changed: 337 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,20 @@
22

33
All notable changes to cueapi-sdk will be documented here.
44

5+
## [0.2.0] - 2026-05-01
6+
7+
### Added
8+
- `client.cues.fire(cue_id, payload_override=None, merge_strategy=None)` for ad-hoc one-shot triggers and for using cues as a messaging channel between agents. Wraps `POST /v1/cues/{id}/fire`.
9+
- `client.executions.list_claimable(task=None, agent=None)` for unclaimed worker-transport executions ready for processing. Filters server-side via task / agent query params; required for single-purpose workers.
10+
- `client.executions.claim(execution_id, worker_id=...)` for atomic claim of a specific execution. Returns 409 if already claimed.
11+
- `client.executions.claim_next(worker_id=..., task=None)` for the next available execution. Without task the server picks the oldest pending; with task the SDK fans out internally (list_claimable filtered, pick oldest, claim by ID) since the server's claim endpoint does not accept a task filter today.
12+
13+
### Changed
14+
- `__version__` in `cueapi/__init__.py` had drifted to 0.1.2 while `pyproject.toml` was at 0.1.3. Both now aligned at 0.2.0.
15+
16+
### Pending follow-up
17+
- `client.executions.heartbeat(execution_id)` currently sends an empty body and does NOT include `worker_id` via the `X-Worker-Id` request header that the server reads from. Worker-id is what the server uses to enforce ownership on the heartbeat (returns 403 on mismatch); without it the race-protection check is silently bypassed. A signature change to add `worker_id` is held pending technical review of the deprecation cadence.
18+
519
## [0.1.0] - 2025-03-28
620

721
### Added

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<!-- Pipeline validation: 2026-04-04 -->
2-
> **For AI agents:** This is the official Python SDK for CueAPI. Run `pip install cueapi-sdk`. Your API key is in `CUEAPI_API_KEY`. Create a client: `client = CueAPI(os.environ["CUEAPI_API_KEY"])`. Schedule a task: `client.cues.create(name="task-name", cron="0 9 * * *", callback="https://your-handler.com", payload={})`. For agents without a public URL: set `transport="worker"` and run `pip install cueapi-worker`, then define a handler with `@worker.handler("task-name")` returning `{"success": true}` or `{"success": false, "error": "reason"}`. List executions: `client.executions.list()`. Get one: `client.executions.get("exec_id")`. Do not retry on `AuthenticationError` or `CueLimitExceededError`. Wait `e.retry_after` seconds on `RateLimitError`.
2+
> **For AI agents:** This is the official Python SDK for CueAPI. Run `pip install cueapi-sdk`. Your API key is in `CUEAPI_API_KEY`. Create a client: `client = CueAPI(os.environ["CUEAPI_API_KEY"])`. Schedule a task: `client.cues.create(name="task-name", cron="0 9 * * *", callback="https://your-handler.com", payload={})`. For agents without a public URL: set `transport="worker"` and run `pip install cueapi-worker`, then define a handler with `@worker.handler("task-name")` returning `{"success": true}` or `{"success": false, "error": "reason"}`. Fire an existing cue immediately: `client.cues.fire("cue_id", payload_override={"task": "x"})`. List unclaimed worker executions: `client.executions.list_claimable(task="task-name")`. Claim one: `client.executions.claim("exec_id", worker_id="my-worker")`. List executions: `client.executions.list()`. Get one: `client.executions.get("exec_id")`. Do not retry on `AuthenticationError` or `CueLimitExceededError`. Wait `e.retry_after` seconds on `RateLimitError`.
33
44
# cueapi-sdk
55

cueapi/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from cueapi.resources.workers import WorkersResource
1919
from cueapi.webhook import verify_webhook
2020

21-
__version__ = "0.1.2"
21+
__version__ = "0.2.0"
2222

2323
__all__ = [
2424
"AgentsResource",

cueapi/resources/cues.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,36 @@ def resume(self, cue_id: str) -> Cue:
218218
The updated Cue object.
219219
"""
220220
return self.update(cue_id, status="active")
221+
222+
def fire(
223+
self,
224+
cue_id: str,
225+
*,
226+
payload_override: Optional[Dict[str, Any]] = None,
227+
merge_strategy: Optional[str] = None,
228+
) -> Dict[str, Any]:
229+
"""Fire an existing cue immediately, optionally overriding its payload.
230+
231+
For ad-hoc one-shot triggers and for using cues as a messaging channel
232+
between agents (carry message/instruction/task/reply_cue_id in
233+
payload_override).
234+
235+
Args:
236+
cue_id: The cue ID to fire.
237+
payload_override: Override the cue's default payload for this fire
238+
only. Persisted on the resulting execution row, never on the
239+
cue itself.
240+
merge_strategy: How payload_override combines with the cue's stored
241+
payload. "merge" (server default) shallow-merges with override
242+
wins on key collisions. "replace" uses override as the final
243+
payload, ignoring cue.payload.
244+
245+
Returns:
246+
The execution dict (id, scheduled_for, status, etc.).
247+
"""
248+
body: Dict[str, Any] = {}
249+
if payload_override is not None:
250+
body["payload_override"] = payload_override
251+
if merge_strategy is not None:
252+
body["merge_strategy"] = merge_strategy
253+
return self._client._post(f"/v1/cues/{cue_id}/fire", json=body)

cueapi/resources/executions.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,96 @@ def get(self, execution_id: str) -> dict:
124124
"""Get a single execution."""
125125
return self._client._get(f"/v1/executions/{execution_id}")
126126

127+
def list_claimable(
128+
self,
129+
*,
130+
task: Optional[str] = None,
131+
agent: Optional[str] = None,
132+
) -> dict:
133+
"""List unclaimed worker-transport executions ready for processing.
134+
135+
Filters server-side via task / agent query params (NOT client-side).
136+
Required for single-purpose workers; without a filter, sibling tasks
137+
ahead in the LIMIT 50 window starve your handler.
138+
139+
Returns:
140+
Dict with "executions" list, each item carrying execution_id,
141+
cue_id, cue_name, task, scheduled_for, payload, attempt.
142+
"""
143+
params: Dict[str, Any] = {}
144+
if task is not None:
145+
params["task"] = task
146+
if agent is not None:
147+
params["agent"] = agent
148+
return self._client._get("/v1/executions/claimable", params=params)
149+
150+
def claim(self, execution_id: str, *, worker_id: str) -> dict:
151+
"""Atomically claim a specific worker-transport execution.
152+
153+
Conditional UPDATE WHERE status IN ('pending', 'retry_ready'); returns
154+
409 if already claimed or not eligible. Response includes lease_seconds
155+
(default 900s = 15 min); send heartbeat well before that to extend.
156+
157+
Args:
158+
execution_id: Execution UUID.
159+
worker_id: Stable identifier for this worker. Caller-defined, not
160+
session/process-scoped. Same value must be used across
161+
claim, heartbeat, and outcome calls so the server can enforce
162+
ownership.
163+
164+
Returns:
165+
Dict with claimed (bool), execution_id, lease_seconds.
166+
"""
167+
return self._client._post(
168+
f"/v1/executions/{execution_id}/claim",
169+
json={"worker_id": worker_id},
170+
)
171+
172+
def claim_next(
173+
self,
174+
*,
175+
worker_id: str,
176+
task: Optional[str] = None,
177+
) -> dict:
178+
"""Claim the next available worker-transport execution.
179+
180+
Without task, the server picks the oldest pending across any of your
181+
worker cues. With task, this method internally fans out (list_claimable
182+
filtered, pick oldest, claim by ID) since the server's claim endpoint
183+
does not accept a task filter today. Tiny race window between list and
184+
claim is bounded by the atomic claim returning 409, in which case the
185+
caller retries.
186+
187+
Args:
188+
worker_id: Stable caller-defined identifier (see claim()).
189+
task: Optional task filter.
190+
191+
Returns:
192+
Dict with claimed (bool), execution_id, lease_seconds. When
193+
task is set and no executions are claimable for that task,
194+
returns {"claimed": False, "reason": "no_executions_for_task",
195+
"task": <task>}.
196+
"""
197+
if task is not None:
198+
listing = self._client._get(
199+
"/v1/executions/claimable", params={"task": task}
200+
)
201+
execs = listing.get("executions") or []
202+
if not execs:
203+
return {
204+
"claimed": False,
205+
"reason": "no_executions_for_task",
206+
"task": task,
207+
}
208+
next_id = execs[0].get("execution_id")
209+
return self._client._post(
210+
f"/v1/executions/{next_id}/claim",
211+
json={"worker_id": worker_id},
212+
)
213+
return self._client._post(
214+
"/v1/executions/claim", json={"worker_id": worker_id}
215+
)
216+
127217
def heartbeat(self, execution_id: str) -> dict:
128218
"""Send heartbeat to extend claim lease."""
129219
return self._client._post(f"/v1/executions/{execution_id}/heartbeat", json={})

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "cueapi-sdk"
7-
version = "0.1.3"
7+
version = "0.2.0"
88
description = "Official Python SDK for CueAPI — open-source execution accountability primitive for AI agents. Schedule agent work, require evidence-backed outcomes, and gate execution with write-once verification."
99
readme = "README.md"
1010
license = { text = "Apache-2.0" }

tests/test_cues_resource.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""Unit tests for CuesResource methods that don't fit the staging-integration pattern."""
2+
3+
from unittest.mock import MagicMock
4+
5+
from cueapi.resources.cues import CuesResource
6+
7+
8+
class TestFire:
9+
def test_fire_no_payload_override(self):
10+
mock_client = MagicMock()
11+
mock_client._post.return_value = {"id": "exec_test", "status": "queued"}
12+
resource = CuesResource(mock_client)
13+
14+
result = resource.fire("cue_abc123")
15+
16+
mock_client._post.assert_called_once_with("/v1/cues/cue_abc123/fire", json={})
17+
assert result["id"] == "exec_test"
18+
19+
def test_fire_with_payload_override_only(self):
20+
mock_client = MagicMock()
21+
mock_client._post.return_value = {"id": "exec_test"}
22+
resource = CuesResource(mock_client)
23+
24+
payload = {"task": "downstream", "scope": "single-row"}
25+
resource.fire("cue_abc123", payload_override=payload)
26+
27+
mock_client._post.assert_called_once_with(
28+
"/v1/cues/cue_abc123/fire",
29+
json={"payload_override": payload},
30+
)
31+
32+
def test_fire_with_payload_override_and_merge_strategy(self):
33+
mock_client = MagicMock()
34+
mock_client._post.return_value = {"id": "exec_test"}
35+
resource = CuesResource(mock_client)
36+
37+
payload = {"run_id": "ad-hoc-001"}
38+
resource.fire("cue_abc123", payload_override=payload, merge_strategy="replace")
39+
40+
mock_client._post.assert_called_once_with(
41+
"/v1/cues/cue_abc123/fire",
42+
json={"payload_override": payload, "merge_strategy": "replace"},
43+
)
44+
45+
def test_fire_omits_merge_strategy_when_not_passed(self):
46+
# When the caller omits merge_strategy, the wrapper must NOT send a
47+
# client-side default. The server's Pydantic default of "merge"
48+
# applies. This pins the contract so a future refactor can't silently
49+
# start sending a strategy that overrides the server's choice.
50+
mock_client = MagicMock()
51+
mock_client._post.return_value = {"id": "exec_test"}
52+
resource = CuesResource(mock_client)
53+
54+
resource.fire("cue_abc123", payload_override={"k": "v"})
55+
56+
sent_body = mock_client._post.call_args.kwargs["json"]
57+
assert "merge_strategy" not in sent_body

tests/test_executions_resource.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,143 @@ def test_replay_returns_server_dict_unchanged(self):
244244

245245
result = resource.replay("exec_old")
246246
assert result == {"execution_id": "exec_x", "extra": "field"}
247+
248+
249+
class TestListClaimable:
250+
# Filtering MUST be server-side via query params, not client-side after
251+
# fetch. Client-side filter hits the LIMIT 50 starvation bug fixed in the
252+
# 2026-04-25 prod incident (see cueapi-core app/routers/executions.py
253+
# docstring at line 122-131).
254+
255+
def test_list_claimable_no_filters_sends_no_params(self):
256+
mock_client = MagicMock()
257+
mock_client._get.return_value = {"executions": []}
258+
resource = ExecutionsResource(mock_client)
259+
260+
resource.list_claimable()
261+
262+
mock_client._get.assert_called_once_with(
263+
"/v1/executions/claimable", params={},
264+
)
265+
266+
def test_list_claimable_passes_task_as_query_param(self):
267+
mock_client = MagicMock()
268+
mock_client._get.return_value = {"executions": []}
269+
resource = ExecutionsResource(mock_client)
270+
271+
resource.list_claimable(task="cowork-workspace")
272+
273+
mock_client._get.assert_called_once_with(
274+
"/v1/executions/claimable",
275+
params={"task": "cowork-workspace"},
276+
)
277+
278+
def test_list_claimable_passes_agent_as_query_param(self):
279+
mock_client = MagicMock()
280+
mock_client._get.return_value = {"executions": []}
281+
resource = ExecutionsResource(mock_client)
282+
283+
resource.list_claimable(agent="writer-bot")
284+
285+
mock_client._get.assert_called_once_with(
286+
"/v1/executions/claimable",
287+
params={"agent": "writer-bot"},
288+
)
289+
290+
def test_list_claimable_passes_both_task_and_agent(self):
291+
mock_client = MagicMock()
292+
mock_client._get.return_value = {"executions": []}
293+
resource = ExecutionsResource(mock_client)
294+
295+
resource.list_claimable(task="t", agent="a")
296+
297+
mock_client._get.assert_called_once_with(
298+
"/v1/executions/claimable",
299+
params={"task": "t", "agent": "a"},
300+
)
301+
302+
303+
class TestClaim:
304+
def test_claim_posts_to_specific_execution_with_worker_id_in_body(self):
305+
mock_client = MagicMock()
306+
mock_client._post.return_value = {
307+
"claimed": True,
308+
"execution_id": "exec_abc123",
309+
"lease_seconds": 900,
310+
}
311+
resource = ExecutionsResource(mock_client)
312+
313+
result = resource.claim("exec_abc123", worker_id="cowork-workspace")
314+
315+
mock_client._post.assert_called_once_with(
316+
"/v1/executions/exec_abc123/claim",
317+
json={"worker_id": "cowork-workspace"},
318+
)
319+
assert result["claimed"] is True
320+
321+
322+
class TestClaimNext:
323+
# Two branches: with task and without. Without task is a single POST.
324+
# With task is a fan-out (list_claimable filtered, pick first, claim by ID)
325+
# because the server's POST /v1/executions/claim does not accept a task
326+
# filter today.
327+
328+
def test_claim_next_without_task_sends_single_post(self):
329+
mock_client = MagicMock()
330+
mock_client._post.return_value = {
331+
"claimed": True,
332+
"execution_id": "exec_test",
333+
"lease_seconds": 900,
334+
}
335+
resource = ExecutionsResource(mock_client)
336+
337+
resource.claim_next(worker_id="cowork-workspace")
338+
339+
mock_client._post.assert_called_once_with(
340+
"/v1/executions/claim",
341+
json={"worker_id": "cowork-workspace"},
342+
)
343+
344+
def test_claim_next_with_task_fans_out_to_list_then_claim(self):
345+
mock_client = MagicMock()
346+
mock_client._get.return_value = {
347+
"executions": [
348+
{"execution_id": "exec_first"},
349+
{"execution_id": "exec_second"},
350+
],
351+
}
352+
mock_client._post.return_value = {
353+
"claimed": True,
354+
"execution_id": "exec_first",
355+
"lease_seconds": 900,
356+
}
357+
resource = ExecutionsResource(mock_client)
358+
359+
result = resource.claim_next(
360+
worker_id="cowork-workspace", task="cowork-workspace"
361+
)
362+
363+
mock_client._get.assert_called_once_with(
364+
"/v1/executions/claimable", params={"task": "cowork-workspace"},
365+
)
366+
mock_client._post.assert_called_once_with(
367+
"/v1/executions/exec_first/claim",
368+
json={"worker_id": "cowork-workspace"},
369+
)
370+
assert result["claimed"] is True
371+
372+
def test_claim_next_with_task_and_empty_list_returns_no_claim(self):
373+
mock_client = MagicMock()
374+
mock_client._get.return_value = {"executions": []}
375+
resource = ExecutionsResource(mock_client)
376+
377+
result = resource.claim_next(
378+
worker_id="cowork-workspace", task="no-such-task"
379+
)
380+
381+
mock_client._post.assert_not_called()
382+
assert result == {
383+
"claimed": False,
384+
"reason": "no_executions_for_task",
385+
"task": "no-such-task",
386+
}

0 commit comments

Comments
 (0)