Skip to content

Commit b7f7a03

Browse files
init
1 parent 092eaad commit b7f7a03

7 files changed

Lines changed: 352 additions & 29 deletions

File tree

nexus_sync_operations/README.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
1-
This sample shows how to create a Nexus service that is backed by a long-running workflow and
2-
exposes operations that execute updates and queries against that workflow. The long-running
3-
workflow, and the updates/queries are private implementation detail of the nexus service: the caller
4-
does not know how the operations are implemented.
1+
This sample shows how to create a Nexus service that is backed by an entity workflow and
2+
exposes synchronous operations that execute queries, updates, signals, and signal-with-start
3+
operations against that workflow.
4+
5+
The entity workflow follows the entity pattern:
6+
- Runs indefinitely in a loop, processing operations as they arrive
7+
- Maintains state that persists across operations
8+
- Periodically continues-as-new to prevent history from growing too large
9+
- Waits for all handlers to finish before continuing as new
10+
11+
The entity workflow and the queries/updates/signals are private implementation details of the
12+
nexus service: the caller does not know how the operations are implemented.
513

614
### Sample directory structure
715

816
- [service.py](./service.py) - shared Nexus service definition
917
- [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code
10-
- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow, activity, and Nexus tasks.
18+
- [handler](./handler) - Nexus operation handlers, entity workflow implementation, and a worker that polls for workflow, activity, and Nexus tasks
19+
- [workflows.py](./handler/workflows.py) - entity workflow that follows the entity pattern
20+
- [service_handler.py](./handler/service_handler.py) - Nexus operation handlers
21+
- [worker.py](./handler/worker.py) - worker that runs the entity workflow and handles Nexus operations
1122

1223

1324
### Instructions

nexus_sync_operations/caller/workflows.py

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,11 @@
66
from temporalio import workflow
77

88
from message_passing.introduction import Language
9-
from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput
9+
from message_passing.introduction.workflows import (
10+
ApproveInput,
11+
GetLanguagesInput,
12+
SetLanguageInput,
13+
)
1014

1115
with workflow.unsafe.imports_passed_through():
1216
from nexus_sync_operations.service import GreetingService
@@ -24,13 +28,32 @@ async def run(self) -> list[str]:
2428
endpoint=NEXUS_ENDPOINT,
2529
)
2630

27-
# Get supported languages
31+
# 👉 QUERY OPERATION: Get supported languages
2832
supported_languages = await nexus_client.execute_operation(
2933
GreetingService.get_languages, GetLanguagesInput(include_unsupported=False)
3034
)
31-
log.append(f"supported languages: {supported_languages}")
35+
log.append(f"Query - supported languages: {supported_languages}")
3236

33-
# Set language
37+
# 👉 QUERY OPERATION: Get current language
38+
current_language = await nexus_client.execute_operation(
39+
GreetingService.get_language, None
40+
)
41+
log.append(f"Query - current language: {current_language.name}")
42+
43+
# 👉 UPDATE OPERATION: Set language using synchronous update (non-async)
44+
previous_language = await nexus_client.execute_operation(
45+
GreetingService.set_language_sync,
46+
SetLanguageInput(language=Language.CHINESE),
47+
)
48+
assert (
49+
await nexus_client.execute_operation(GreetingService.get_language, None)
50+
== Language.CHINESE
51+
)
52+
log.append(
53+
f"Update (sync) - language changed: {previous_language.name} -> {Language.CHINESE.name}"
54+
)
55+
56+
# 👉 UPDATE OPERATION: Set language using async update (with activity)
3457
previous_language = await nexus_client.execute_operation(
3558
GreetingService.set_language,
3659
SetLanguageInput(language=Language.ARABIC),
@@ -40,7 +63,22 @@ async def run(self) -> list[str]:
4063
== Language.ARABIC
4164
)
4265
log.append(
43-
f"language changed: {previous_language.name} -> {Language.ARABIC.name}"
66+
f"Update (async) - language changed: {previous_language.name} -> {Language.ARABIC.name}"
67+
)
68+
69+
# 👉 SIGNAL OPERATION: Send approval signal
70+
await nexus_client.execute_operation(
71+
GreetingService.approve,
72+
ApproveInput(name="CallerWorkflow"),
73+
)
74+
log.append("Signal - approval sent")
75+
76+
# 👉 SIGNAL-WITH-START OPERATION: Send approval signal, starting workflow if needed
77+
# This demonstrates signal-with-start, which will start the workflow if it doesn't exist
78+
await nexus_client.execute_operation(
79+
GreetingService.approve_with_start,
80+
ApproveInput(name="CallerWorkflow-SignalWithStart"),
4481
)
82+
log.append("Signal-with-start - approval sent (workflow started if needed)")
4583

4684
return log
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
## Service: [GreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_sync_operations/service.py)
2-
- operation: `get_languages`
3-
- operation: `get_language`
4-
- operation: `set_language`
2+
- operation: `get_languages` (query)
3+
- operation: `get_language` (query)
4+
- operation: `set_language` (update - async with activity)
5+
- operation: `set_language_sync` (update - synchronous)
6+
- operation: `approve` (signal)
7+
- operation: `approve_with_start` (signal-with-start)

nexus_sync_operations/handler/service_handler.py

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
"""
2-
This file demonstrates how to implement a Nexus service that is backed by a long-running workflow
3-
and exposes operations that perform updates and queries against that workflow.
2+
This file demonstrates how to implement a Nexus service that is backed by an entity workflow
3+
and exposes operations that perform queries, updates, signals, and signal-with-start operations
4+
against that workflow.
5+
6+
The entity workflow follows the entity pattern: it runs indefinitely, processes operations
7+
as they arrive, and periodically continues-as-new to prevent history growth.
48
"""
59

610
from __future__ import annotations
@@ -12,33 +16,36 @@
1216

1317
from message_passing.introduction import Language
1418
from message_passing.introduction.workflows import (
19+
ApproveInput,
1520
GetLanguagesInput,
16-
GreetingWorkflow,
1721
SetLanguageInput,
1822
)
23+
from nexus_sync_operations.handler.workflows import GreetingEntityWorkflow
1924
from nexus_sync_operations.service import GreetingService
2025

2126

2227
@nexusrpc.handler.service_handler(service=GreetingService)
2328
class GreetingServiceHandler:
24-
def __init__(self, workflow_id: str):
29+
def __init__(self, workflow_id: str, task_queue: str):
2530
self.workflow_id = workflow_id
31+
self.task_queue = task_queue
2632

2733
@classmethod
2834
async def create(
2935
cls, workflow_id: str, client: Client, task_queue: str
3036
) -> GreetingServiceHandler:
3137
# Start the long-running "entity" workflow, if it is not already running.
38+
# Entity workflows run indefinitely and process operations as they arrive.
3239
await client.start_workflow(
33-
GreetingWorkflow.run,
40+
GreetingEntityWorkflow.run,
3441
id=workflow_id,
3542
task_queue=task_queue,
3643
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
3744
)
38-
return cls(workflow_id)
45+
return cls(workflow_id, task_queue)
3946

4047
@property
41-
def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]:
48+
def greeting_workflow_handle(self) -> WorkflowHandle[GreetingEntityWorkflow, None]:
4249
# In nexus operation handler code, nexus.client() is always available, returning a client
4350
# connected to the handler namespace (it's the same client instance that your nexus worker
4451
# is using to poll the server for nexus tasks). This client can be used to interact with the
@@ -47,7 +54,7 @@ def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]:
4754
# long-running work in a nexus operation handler, use
4855
# temporalio.nexus.workflow_run_operation (see the hello_nexus sample).
4956
return nexus.client().get_workflow_handle_for(
50-
GreetingWorkflow.run, self.workflow_id
57+
GreetingEntityWorkflow.run, self.workflow_id
5158
)
5259

5360
# 👉 This is a handler for a nexus operation whose internal implementation involves executing a
@@ -57,7 +64,7 @@ async def get_languages(
5764
self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput
5865
) -> list[Language]:
5966
return await self.greeting_workflow_handle.query(
60-
GreetingWorkflow.get_languages, input
67+
GreetingEntityWorkflow.get_languages, input
6168
)
6269

6370
# 👉 This is a handler for a nexus operation whose internal implementation involves executing a
@@ -66,7 +73,7 @@ async def get_languages(
6673
async def get_language(
6774
self, ctx: nexusrpc.handler.StartOperationContext, input: None
6875
) -> Language:
69-
return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language)
76+
return await self.greeting_workflow_handle.query(GreetingEntityWorkflow.get_language)
7077

7178
# 👉 This is a handler for a nexus operation whose internal implementation involves executing an
7279
# update against a long-running workflow that is private to the nexus service. Although updates
@@ -79,5 +86,47 @@ async def set_language(
7986
input: SetLanguageInput,
8087
) -> Language:
8188
return await self.greeting_workflow_handle.execute_update(
82-
GreetingWorkflow.set_language_using_activity, input
89+
GreetingEntityWorkflow.set_language_using_activity, input
90+
)
91+
92+
# 👉 This is a handler for a nexus operation whose internal implementation involves executing a
93+
# synchronous update (non-async) against a long-running workflow.
94+
@nexusrpc.handler.sync_operation
95+
async def set_language_sync(
96+
self,
97+
ctx: nexusrpc.handler.StartOperationContext,
98+
input: SetLanguageInput,
99+
) -> Language:
100+
return await self.greeting_workflow_handle.execute_update(
101+
GreetingEntityWorkflow.set_language, input
102+
)
103+
104+
# 👉 This is a handler for a nexus operation whose internal implementation involves sending a
105+
# signal to a long-running workflow that is private to the nexus service.
106+
@nexusrpc.handler.sync_operation
107+
async def approve(
108+
self,
109+
ctx: nexusrpc.handler.StartOperationContext,
110+
input: ApproveInput,
111+
) -> None:
112+
await self.greeting_workflow_handle.signal(
113+
GreetingEntityWorkflow.approve, input
114+
)
115+
116+
# 👉 This is a handler for a nexus operation whose internal implementation involves sending a
117+
# signal-with-start to a long-running workflow. If the workflow doesn't exist, it will be started.
118+
@nexusrpc.handler.sync_operation
119+
async def approve_with_start(
120+
self,
121+
ctx: nexusrpc.handler.StartOperationContext,
122+
input: ApproveInput,
123+
) -> None:
124+
# Use start_workflow with start_signal to send a signal, starting the workflow if it doesn't exist
125+
await nexus.client().start_workflow(
126+
GreetingEntityWorkflow.run,
127+
id=self.workflow_id,
128+
task_queue=self.task_queue,
129+
start_signal=GreetingEntityWorkflow.approve,
130+
start_signal_args=[input],
131+
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
83132
)

nexus_sync_operations/handler/worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
from temporalio.worker import Worker
77

88
from message_passing.introduction.activities import call_greeting_service
9-
from message_passing.introduction.workflows import GreetingWorkflow
109
from nexus_sync_operations.handler.service_handler import GreetingServiceHandler
10+
from nexus_sync_operations.handler.workflows import GreetingEntityWorkflow
1111

1212
interrupt_event = asyncio.Event()
1313

@@ -32,7 +32,7 @@ async def main(client: Optional[Client] = None):
3232
async with Worker(
3333
client,
3434
task_queue=TASK_QUEUE,
35-
workflows=[GreetingWorkflow],
35+
workflows=[GreetingEntityWorkflow],
3636
activities=[call_greeting_service],
3737
nexus_service_handlers=[greeting_service_handler],
3838
):

0 commit comments

Comments
 (0)