Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ without wrapping them in a workflow.
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
* [worker_multiprocessing](worker_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks and other CPU bound operations by running multiple workers.
* [workflow_pause](workflow_pause/) - Demonstrate the experimental Workflow Pause feature: pause/unpause, signals, queries, updates, activities, and cancel/terminate.

## Test

Expand Down
Empty file.
27 changes: 27 additions & 0 deletions tests/workflow_pause/activities_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import uuid

from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from workflow_pause.activities import TASK_QUEUE
from workflow_pause.activities.activities import process_item
from workflow_pause.activities.workflow import ActivityPauseWorkflow


async def test_activity_workflow_retries_then_succeeds(
client: Client, env: WorkflowEnvironment
):
async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[ActivityPauseWorkflow],
activities=[process_item],
):
result = await client.execute_workflow(
ActivityPauseWorkflow.run,
"widget",
id=f"activities-{uuid.uuid4()}",
task_queue=TASK_QUEUE,
)
assert result == "processed widget"
19 changes: 19 additions & 0 deletions tests/workflow_pause/basic_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import uuid

from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from workflow_pause.basic import TASK_QUEUE
from workflow_pause.basic.workflow import BasicPauseWorkflow


async def test_basic_workflow_completes(client: Client, env: WorkflowEnvironment):
async with Worker(client, task_queue=TASK_QUEUE, workflows=[BasicPauseWorkflow]):
result = await client.execute_workflow(
BasicPauseWorkflow.run,
3,
id=f"basic-{uuid.uuid4()}",
task_queue=TASK_QUEUE,
)
assert result == 3
29 changes: 29 additions & 0 deletions tests/workflow_pause/cancel_terminate_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio
import uuid

import pytest
from temporalio.client import Client, WorkflowFailureError
from temporalio.exceptions import CancelledError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from workflow_pause.cancel_terminate import TASK_QUEUE
from workflow_pause.cancel_terminate.workflow import CancelTerminatePauseWorkflow


async def test_cancellation_runs_cleanup(client: Client, env: WorkflowEnvironment):
async with Worker(
client, task_queue=TASK_QUEUE, workflows=[CancelTerminatePauseWorkflow]
):
handle = await client.start_workflow(
CancelTerminatePauseWorkflow.run,
20,
id=f"cancel-terminate-{uuid.uuid4()}",
task_queue=TASK_QUEUE,
)
# Let the workflow start its loop, then cancel it.
await asyncio.sleep(1)
await handle.cancel()
with pytest.raises(WorkflowFailureError) as exc_info:
await handle.result()
assert isinstance(exc_info.value.cause, CancelledError)
19 changes: 19 additions & 0 deletions tests/workflow_pause/queries_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import uuid

from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from workflow_pause.queries import TASK_QUEUE
from workflow_pause.queries.workflow import QueryPauseWorkflow


async def test_query_returns_count(client: Client, env: WorkflowEnvironment):
async with Worker(client, task_queue=TASK_QUEUE, workflows=[QueryPauseWorkflow]):
result = await client.execute_workflow(
QueryPauseWorkflow.run,
2,
id=f"queries-{uuid.uuid4()}",
task_queue=TASK_QUEUE,
)
assert result == 2
22 changes: 22 additions & 0 deletions tests/workflow_pause/signals_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import uuid

from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from workflow_pause.signals import TASK_QUEUE
from workflow_pause.signals.workflow import SignalPauseWorkflow


async def test_signals_collected_then_done(client: Client, env: WorkflowEnvironment):
async with Worker(client, task_queue=TASK_QUEUE, workflows=[SignalPauseWorkflow]):
handle = await client.start_workflow(
SignalPauseWorkflow.run,
id=f"signals-{uuid.uuid4()}",
task_queue=TASK_QUEUE,
)
await handle.signal(SignalPauseWorkflow.add_message, "hello")
await handle.signal(SignalPauseWorkflow.add_message, "world")
await handle.signal(SignalPauseWorkflow.add_message, "done")
result = await handle.result()
assert result == ["hello", "world"]
23 changes: 23 additions & 0 deletions tests/workflow_pause/updates_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import uuid

from temporalio.client import Client
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from workflow_pause.updates import TASK_QUEUE
from workflow_pause.updates.workflow import UpdatePauseWorkflow


async def test_update_accumulates_then_finishes(
client: Client, env: WorkflowEnvironment
):
async with Worker(client, task_queue=TASK_QUEUE, workflows=[UpdatePauseWorkflow]):
handle = await client.start_workflow(
UpdatePauseWorkflow.run,
id=f"updates-{uuid.uuid4()}",
task_queue=TASK_QUEUE,
)
assert await handle.execute_update(UpdatePauseWorkflow.add, 5) == 5
assert await handle.execute_update(UpdatePauseWorkflow.add, 3) == 8
await handle.execute_update(UpdatePauseWorkflow.finish)
assert await handle.result() == 8
30 changes: 30 additions & 0 deletions workflow_pause/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Workflow Pause

These samples demonstrate the experimental **Workflow Pause** feature. Pausing a Workflow Execution
tells the Temporal Service to stop dispatching workflow tasks for it — the workflow makes no forward
progress (timers don't advance, signals buffer, queries and updates are rejected) until it is
**unpaused**. See the [Temporal CLI docs](https://docs.temporal.io/cli/workflow#pause).

## Prerequisites

First see the main [README.md](../README.md) for general prerequisites. Then note:

- Requires **Temporal Server 1.31.0+ / CLI 1.7.1+**. The feature is experimental.
- **Pause must be enabled server-side.** Start your dev server with the pause dynamic-config flag:

```bash
temporal server start-dev --dynamic-config-value frontend.WorkflowPauseEnabled=true
```

Without it, `temporal workflow pause` returns
`Error: workflow pause is not enabled for namespace: default`.

## Samples

* [basic](basic/) — Dead-simple pause / unpause of a workflow waiting on a timer.
* [activities](activities/) — How pause interacts with in-flight activities, plus activity-level
pause (`temporal activity pause`) to halt retries.
* [signals](signals/) — Signals sent while paused are buffered and processed on unpause.
* [queries](queries/) — Queries are rejected while paused.
* [updates](updates/) — Updates are rejected while paused.
* [cancel_terminate](cancel_terminate/) — Terminate is immediate; cancel is deferred until unpause.
Empty file added workflow_pause/__init__.py
Empty file.
54 changes: 54 additions & 0 deletions workflow_pause/activities/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Workflow Pause: in-flight activities & activity-level pause

Demonstrates how pause interacts with activities. The workflow runs a single long-running activity
(`process-item`) that heartbeats for ~5 seconds and is configured to fail its first two attempts
before succeeding, so you can observe both kinds of pause.

> Workflow Pause is experimental. The dev server must be started with the pause flag enabled —
> see the [workflow_pause README](../README.md) for prerequisites. First see the main
> [README.md](../../README.md) for general prerequisites.

Run the worker in one terminal:

```bash
uv run workflow_pause/activities/worker.py
```

Start the workflow in a second terminal:

```bash
uv run workflow_pause/activities/starter.py
```

## Demo A — pause the workflow while the activity is in flight

While the worker log shows the activity processing (attempt 1), pause the workflow:

```bash
temporal workflow pause -w pause-activities-wf --reason demo
```

The currently running activity attempt is **not** killed — it runs to its conclusion — but because
the workflow is paused, the next workflow task is not dispatched, so the workflow does not advance.
Unpause to let it continue:

```bash
temporal workflow unpause -w pause-activities-wf
```

## Demo B — pause the activity (halt retries)

The activity fails its first two attempts, so it enters a retry backoff. Pause the **activity** so
its retries stop:

```bash
temporal activity pause --activity-id process-item -w pause-activities-wf
```

The activity will not be retried while paused. Resume retries with:

```bash
temporal activity unpause --activity-id process-item -w pause-activities-wf
```

On the third attempt the activity succeeds and the workflow completes with `processed widget`.
3 changes: 3 additions & 0 deletions workflow_pause/activities/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
TASK_QUEUE = "workflow-pause-activities-task-queue"
WORKFLOW_ID = "pause-activities-wf"
ACTIVITY_ID = "process-item"
24 changes: 24 additions & 0 deletions workflow_pause/activities/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio

from temporalio import activity


@activity.defn
async def process_item(item: str) -> str:
"""Long-running activity that heartbeats, and fails its first two attempts.

The heartbeats + sleep make the activity observably "in flight" so you can
pause the workflow while it runs. The deliberate failures on the first two
attempts let you demonstrate `temporal activity pause`, which halts retries.
"""
info = activity.info()
activity.logger.info("Processing %s (attempt %d)", item, info.attempt)

for _ in range(5):
await asyncio.sleep(1)
activity.heartbeat()

if info.attempt < 3:
raise RuntimeError(f"transient failure on attempt {info.attempt}")

return f"processed {item}"
37 changes: 37 additions & 0 deletions workflow_pause/activities/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.common import WorkflowIDReusePolicy
from temporalio.envconfig import ClientConfig

from workflow_pause.activities import ACTIVITY_ID, TASK_QUEUE, WORKFLOW_ID
from workflow_pause.activities.workflow import ActivityPauseWorkflow


async def main():
logging.basicConfig(level=logging.INFO)

config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

handle = await client.start_workflow(
ActivityPauseWorkflow.run,
"widget",
id=WORKFLOW_ID,
task_queue=TASK_QUEUE,
id_reuse_policy=WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
)
print(f"Started workflow with ID: {handle.id}")
print(
f"Pause the workflow: temporal workflow pause -w {WORKFLOW_ID} --reason demo"
)
print(
f"Pause the activity: temporal activity pause "
f"--activity-id {ACTIVITY_ID} -w {WORKFLOW_ID}"
)


if __name__ == "__main__":
asyncio.run(main())
39 changes: 39 additions & 0 deletions workflow_pause/activities/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

from workflow_pause.activities import TASK_QUEUE
from workflow_pause.activities.activities import process_item
from workflow_pause.activities.workflow import ActivityPauseWorkflow

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[ActivityPauseWorkflow],
activities=[process_item],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
37 changes: 37 additions & 0 deletions workflow_pause/activities/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

from workflow_pause.activities import ACTIVITY_ID

with workflow.unsafe.imports_passed_through():
from workflow_pause.activities.activities import process_item


@workflow.defn
class ActivityPauseWorkflow:
"""Runs a single long-running, retrying activity.

Two things to demonstrate:
1. Pausing the *workflow* while the activity is in flight: the running
activity attempt is not killed, but once it finishes the next workflow
task is not dispatched, so the workflow does not advance until unpause.
2. Pausing the *activity* with `temporal activity pause`: retries stop
after the current attempt ends, and resume on `temporal activity unpause`.
"""

@workflow.run
async def run(self, item: str) -> str:
return await workflow.execute_activity(
process_item,
item,
activity_id=ACTIVITY_ID,
start_to_close_timeout=timedelta(seconds=30),
heartbeat_timeout=timedelta(seconds=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=3),
backoff_coefficient=2.0,
maximum_attempts=10,
),
)
Loading