diff --git a/nexus_sync_operations/README.md b/nexus_sync_operations/README.md index 10e266ec..9347c8ec 100644 --- a/nexus_sync_operations/README.md +++ b/nexus_sync_operations/README.md @@ -1,13 +1,24 @@ -This sample shows how to create a Nexus service that is backed by a long-running workflow and -exposes operations that execute updates and queries against that workflow. The long-running -workflow, and the updates/queries are private implementation detail of the nexus service: the caller -does not know how the operations are implemented. +This sample shows how to create a Nexus service that is backed by an entity workflow and +exposes synchronous operations that execute queries, updates, signals, and signal-with-start +operations against that workflow. + +The entity workflow follows the entity pattern: +- Runs indefinitely in a loop, processing operations as they arrive +- Maintains state that persists across operations +- Periodically continues-as-new to prevent history from growing too large +- Waits for all handlers to finish before continuing as new + +The entity workflow and the queries/updates/signals are private implementation details of the +nexus service: the caller does not know how the operations are implemented. ### Sample directory structure - [service.py](./service.py) - shared Nexus service definition - [caller](./caller) - a caller workflow that executes Nexus operations, together with a worker and starter code -- [handler](./handler) - Nexus operation handlers, together with a workflow used by one of the Nexus operations, and a worker that polls for both workflow, activity, and Nexus tasks. +- [handler](./handler) - Nexus operation handlers, entity workflow implementation, and a worker that polls for workflow, activity, and Nexus tasks + - [workflows.py](./handler/workflows.py) - entity workflow that follows the entity pattern + - [service_handler.py](./handler/service_handler.py) - Nexus operation handlers + - [worker.py](./handler/worker.py) - worker that runs the entity workflow and handles Nexus operations ### Instructions diff --git a/nexus_sync_operations/caller/app.py b/nexus_sync_operations/caller/app.py index 4966415c..48a44e82 100644 --- a/nexus_sync_operations/caller/app.py +++ b/nexus_sync_operations/caller/app.py @@ -14,23 +14,31 @@ async def execute_caller_workflow( client: Optional[Client] = None, ) -> None: + # Use separate task queue for caller + caller_task_queue = "nexus-sync-operations-caller-task-queue" + client = client or await Client.connect( "localhost:7233", namespace=NAMESPACE, ) + # Start worker in the background, keep it running async with Worker( client, - task_queue=TASK_QUEUE, + task_queue=caller_task_queue, workflows=[CallerWorkflow], + # Caller doesn't need activities or nexus handlers - + # it only calls operations on remote endpoint ): log = await client.execute_workflow( CallerWorkflow.run, + args=[], id=str(uuid.uuid4()), - task_queue=TASK_QUEUE, + task_queue=caller_task_queue, # Use caller's task queue ) for line in log: print(line) + # Worker stays alive until the workflow completes if __name__ == "__main__": diff --git a/nexus_sync_operations/caller/workflows.py b/nexus_sync_operations/caller/workflows.py index a358d764..4eefb63d 100644 --- a/nexus_sync_operations/caller/workflows.py +++ b/nexus_sync_operations/caller/workflows.py @@ -6,7 +6,11 @@ from temporalio import workflow from message_passing.introduction import Language -from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput +from message_passing.introduction.workflows import ( + ApproveInput, + GetLanguagesInput, + SetLanguageInput, +) with workflow.unsafe.imports_passed_through(): from nexus_sync_operations.service import GreetingService @@ -24,23 +28,57 @@ async def run(self) -> list[str]: endpoint=NEXUS_ENDPOINT, ) - # Get supported languages + # QUERY OPERATION: Get supported languages supported_languages = await nexus_client.execute_operation( GreetingService.get_languages, GetLanguagesInput(include_unsupported=False) ) - log.append(f"supported languages: {supported_languages}") + log.append(f"Query - supported languages: {supported_languages}") - # Set language + # QUERY OPERATION: Get current language + current_language = await nexus_client.execute_operation( + GreetingService.get_language, None + ) + log.append(f"Query - current language: {current_language.name}") + + # UPDATE OPERATION: Set language using synchronous update (non-async) + previous_language = await nexus_client.execute_operation( + GreetingService.set_language_sync, + SetLanguageInput(language=Language.CHINESE), + ) + assert ( + await nexus_client.execute_operation(GreetingService.get_language, None) + == Language.CHINESE + ) + log.append( + f"Update (sync) - language changed: {previous_language.name} -> {Language.CHINESE.name}" + ) + + # UPDATE OPERATION: Set language using async update (with activity) previous_language = await nexus_client.execute_operation( GreetingService.set_language, - SetLanguageInput(language=Language.ARABIC), + SetLanguageInput(language=Language.FRENCH), ) assert ( await nexus_client.execute_operation(GreetingService.get_language, None) - == Language.ARABIC + == Language.FRENCH ) log.append( - f"language changed: {previous_language.name} -> {Language.ARABIC.name}" + f"Update (async) - language changed: {previous_language.name} -> {Language.FRENCH.name}" + ) + + # SIGNAL OPERATION: Send approval signal + await nexus_client.execute_operation( + GreetingService.approve, + ApproveInput(name="CallerWorkflow"), + ) + log.append("Signal - approval sent") + + # SIGNAL-WITH-START OPERATION: Send approval signal, starting workflow if needed + # This demonstrates signal-with-start, which will start the workflow if it doesn't exist + await nexus_client.execute_operation( + GreetingService.approve_with_start, + ApproveInput(name="CallerWorkflow-SignalWithStart"), ) + log.append("Signal-with-start - approval sent (workflow started if needed)") return log diff --git a/nexus_sync_operations/endpoint_description.md b/nexus_sync_operations/endpoint_description.md index a33b60cf..750a09dd 100644 --- a/nexus_sync_operations/endpoint_description.md +++ b/nexus_sync_operations/endpoint_description.md @@ -1,4 +1,7 @@ ## Service: [GreetingService](https://github.com/temporalio/samples-python/blob/main/nexus_sync_operations/service.py) -- operation: `get_languages` -- operation: `get_language` -- operation: `set_language` +- operation: `get_languages` (query) +- operation: `get_language` (query) +- operation: `set_language` (update - async with activity) +- operation: `set_language_sync` (update - synchronous) +- operation: `approve` (signal) +- operation: `approve_with_start` (signal-with-start) diff --git a/nexus_sync_operations/handler/service_handler.py b/nexus_sync_operations/handler/service_handler.py index 626948f0..fe662684 100644 --- a/nexus_sync_operations/handler/service_handler.py +++ b/nexus_sync_operations/handler/service_handler.py @@ -1,6 +1,10 @@ """ -This file demonstrates how to implement a Nexus service that is backed by a long-running workflow -and exposes operations that perform updates and queries against that workflow. +This file demonstrates how to implement a Nexus service that is backed by an entity workflow +and exposes operations that perform queries, updates, signals, and signal-with-start operations +against that workflow. + +The entity workflow follows the entity pattern: it runs indefinitely, processes operations +as they arrive, and periodically continues-as-new to prevent history growth. """ from __future__ import annotations @@ -12,33 +16,36 @@ from message_passing.introduction import Language from message_passing.introduction.workflows import ( + ApproveInput, GetLanguagesInput, - GreetingWorkflow, SetLanguageInput, ) +from nexus_sync_operations.handler.workflows import GreetingEntityWorkflow from nexus_sync_operations.service import GreetingService @nexusrpc.handler.service_handler(service=GreetingService) class GreetingServiceHandler: - def __init__(self, workflow_id: str): + def __init__(self, workflow_id: str, task_queue: str): self.workflow_id = workflow_id + self.task_queue = task_queue @classmethod async def create( cls, workflow_id: str, client: Client, task_queue: str ) -> GreetingServiceHandler: # Start the long-running "entity" workflow, if it is not already running. + # Entity workflows run indefinitely and process operations as they arrive. await client.start_workflow( - GreetingWorkflow.run, + GreetingEntityWorkflow.run, id=workflow_id, task_queue=task_queue, id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, ) - return cls(workflow_id) + return cls(workflow_id, task_queue) @property - def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]: + def greeting_workflow_handle(self) -> WorkflowHandle[GreetingEntityWorkflow, None]: # In nexus operation handler code, nexus.client() is always available, returning a client # connected to the handler namespace (it's the same client instance that your nexus worker # is using to poll the server for nexus tasks). This client can be used to interact with the @@ -47,17 +54,17 @@ def greeting_workflow_handle(self) -> WorkflowHandle[GreetingWorkflow, str]: # long-running work in a nexus operation handler, use # temporalio.nexus.workflow_run_operation (see the hello_nexus sample). return nexus.client().get_workflow_handle_for( - GreetingWorkflow.run, self.workflow_id + GreetingEntityWorkflow.run, self.workflow_id ) # πŸ‘‰ This is a handler for a nexus operation whose internal implementation involves executing a # query against a long-running workflow that is private to the nexus service. @nexusrpc.handler.sync_operation async def get_languages( - self, ctx: nexusrpc.handler.StartOperationContext, input: GetLanguagesInput + self, ctx: nexusrpc.handler.StartOperationContext, inputLanguage: GetLanguagesInput ) -> list[Language]: return await self.greeting_workflow_handle.query( - GreetingWorkflow.get_languages, input + GreetingEntityWorkflow.get_languages, input=inputLanguage ) # πŸ‘‰ This is a handler for a nexus operation whose internal implementation involves executing a @@ -66,7 +73,7 @@ async def get_languages( async def get_language( self, ctx: nexusrpc.handler.StartOperationContext, input: None ) -> Language: - return await self.greeting_workflow_handle.query(GreetingWorkflow.get_language) + return await self.greeting_workflow_handle.query(GreetingEntityWorkflow.get_language) # πŸ‘‰ This is a handler for a nexus operation whose internal implementation involves executing an # update against a long-running workflow that is private to the nexus service. Although updates @@ -79,5 +86,47 @@ async def set_language( input: SetLanguageInput, ) -> Language: return await self.greeting_workflow_handle.execute_update( - GreetingWorkflow.set_language_using_activity, input + GreetingEntityWorkflow.set_language_using_activity, input + ) + + # πŸ‘‰ This is a handler for a nexus operation whose internal implementation involves executing a + # synchronous update (non-async) against a long-running workflow. + @nexusrpc.handler.sync_operation + async def set_language_sync( + self, + ctx: nexusrpc.handler.StartOperationContext, + input: SetLanguageInput, + ) -> Language: + return await self.greeting_workflow_handle.execute_update( + GreetingEntityWorkflow.set_language, input + ) + + # πŸ‘‰ This is a handler for a nexus operation whose internal implementation involves sending a + # signal to a long-running workflow that is private to the nexus service. + @nexusrpc.handler.sync_operation + async def approve( + self, + ctx: nexusrpc.handler.StartOperationContext, + input: ApproveInput, + ) -> None: + await self.greeting_workflow_handle.signal( + GreetingEntityWorkflow.approve, input + ) + + # πŸ‘‰ This is a handler for a nexus operation whose internal implementation involves sending a + # signal-with-start to a long-running workflow. If the workflow doesn't exist, it will be started. + @nexusrpc.handler.sync_operation + async def approve_with_start( + self, + ctx: nexusrpc.handler.StartOperationContext, + input: ApproveInput, + ) -> None: + # Use signal_with_start synchronously - send signal and start workflow if needed + await nexus.client().start_workflow( + GreetingEntityWorkflow.run, + id=self.workflow_id, + task_queue=self.task_queue, + start_signal=GreetingEntityWorkflow.approve, + start_signal_args=[input], + id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING, ) diff --git a/nexus_sync_operations/handler/worker.py b/nexus_sync_operations/handler/worker.py index 5545adc0..1a95c58c 100644 --- a/nexus_sync_operations/handler/worker.py +++ b/nexus_sync_operations/handler/worker.py @@ -6,8 +6,8 @@ from temporalio.worker import Worker from message_passing.introduction.activities import call_greeting_service -from message_passing.introduction.workflows import GreetingWorkflow from nexus_sync_operations.handler.service_handler import GreetingServiceHandler +from nexus_sync_operations.handler.workflows import GreetingEntityWorkflow interrupt_event = asyncio.Event() @@ -32,7 +32,7 @@ async def main(client: Optional[Client] = None): async with Worker( client, task_queue=TASK_QUEUE, - workflows=[GreetingWorkflow], + workflows=[GreetingEntityWorkflow], activities=[call_greeting_service], nexus_service_handlers=[greeting_service_handler], ): diff --git a/nexus_sync_operations/handler/workflows.py b/nexus_sync_operations/handler/workflows.py new file mode 100644 index 00000000..b54d1890 --- /dev/null +++ b/nexus_sync_operations/handler/workflows.py @@ -0,0 +1,190 @@ +""" +Entity workflow for the Greeting service. + +This workflow follows the entity pattern: it runs indefinitely, processing operations +(signals, updates, queries) as they arrive. It periodically continues-as-new to prevent +history from growing too large. +""" + +import asyncio +from dataclasses import dataclass +from datetime import timedelta +from typing import Dict, List, Optional + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from message_passing.introduction import Language + from message_passing.introduction.workflows import ( + ApproveInput, + GetLanguagesInput, + SetLanguageInput, + ) + + +@dataclass +class GreetingEntityState: + """State that persists across continue-as-new operations.""" + greetings: Dict[Language, str] + language: Language + approver_name: Optional[str] = None + + +@dataclass +class GreetingEntityInput: + """Input for the entity workflow, used for continue-as-new.""" + state: Optional[GreetingEntityState] = None + + +@workflow.defn +class GreetingEntityWorkflow: + """ + An entity workflow that manages greeting state and processes operations. + + This workflow follows the entity pattern: + - Runs indefinitely in a loop + - Processes signals and updates as they arrive + - Supports queries at any time + - Periodically continues-as-new to prevent history growth + """ + + @workflow.init + def __init__(self, input: Optional[GreetingEntityInput] = None) -> None: + """Initialize workflow state, restoring from continue-as-new if provided.""" + if input and input.state: + self.greetings = input.state.greetings.copy() + self.language = input.state.language + self.approver_name = input.state.approver_name + else: + # Initial state + self.greetings = { + Language.CHINESE: "δ½ ε₯½οΌŒδΈ–η•Œ", + Language.ENGLISH: "Hello, world", + Language.FRENCH: "Bonjour" + } + self.language = Language.ENGLISH + self.approver_name = None + + + @workflow.run + async def run(self, input: Optional[GreetingEntityInput] = None) -> None: + """ + Main entity workflow loop. Runs indefinitely, processing operations. + + The workflow: + 1. Waits for operations (signals/updates) to arrive + 2. Processes them via their handlers + 3. Periodically continues-as-new to prevent history growth + + Note: State is initialized in @workflow.init. When continuing as new, + init is called again with the new input containing the state. + """ + # Main entity loop - runs indefinitely + # Entity workflows process operations (signals/updates) via their handlers. + # The main loop periodically checks if we should continue-as-new to prevent + # history from growing too large. + while True: + # Wait for a timeout, then check if we should continue as new + # Signals and updates are handled by their respective handlers + try: + # Use a dummy condition that will timeout, allowing us to periodically + # check for continue-as-new + await workflow.wait_condition( + lambda: False, # Never true, just wait for timeout + timeout=timedelta(minutes=10), # Check every 10 minutes + ) + except asyncio.TimeoutError: + # Timeout reached - check if we should continue as new + if self._should_continue_as_new(): + # Wait for all handlers (signals/updates) to finish before continuing as new + # This ensures no operations are in progress when we checkpoint state + await workflow.wait_condition(lambda: workflow.all_handlers_finished()) + + workflow.logger.info("Continuing as new to prevent history growth") + + # Continue as new with the current state + workflow.continue_as_new( + GreetingEntityInput( + state=GreetingEntityState( + greetings=self.greetings.copy(), + language=self.language, + approver_name=self.approver_name, + ) + ) + ) + + def _should_continue_as_new(self) -> bool: + """Check if workflow should continue as new.""" + # Use Temporal's suggestion if available + if workflow.info().is_continue_as_new_suggested(): + return True + # For testing/demo: continue as new if history gets large + # In production, rely on Temporal's suggestion + if workflow.info().get_current_history_length() > 1000: + return True + return False + + @workflow.query + def get_languages(self, input: GetLanguagesInput) -> List[Language]: + """ + Query: Get list of supported languages. + + Returns all languages if include_unsupported is True, + otherwise returns only languages with greetings defined. + """ + if input.include_unsupported: + return sorted(Language) + else: + return sorted(self.greetings.keys()) + + @workflow.query + def get_language(self) -> Language: + """Query: Get the current language.""" + return self.language + + @workflow.signal + def approve(self, input: ApproveInput) -> None: + """ + Signal: Record approval. + + Signals mutate state but don't return values. + This is processed asynchronously by the entity loop. + """ + self.approver_name = input.name + workflow.logger.info(f"Approved by: {input.name}") + + @workflow.update + def set_language(self, input: SetLanguageInput) -> Language: + """ + Update: Set the language synchronously (no activity). + + Updates can mutate state and return values. + This update handler is synchronous and only modifies local state. + """ + if input.language not in self.greetings: + raise ValueError(f"{input.language.name} is not supported") + + previous_language = self.language + self.language = input.language + return previous_language + + @set_language.validator + def validate_language(self, input: SetLanguageInput) -> None: + """Validator: Reject unsupported languages before update is accepted.""" + if input.language not in self.greetings: + raise ValueError(f"{input.language.name} is not supported") + + @workflow.update + def set_language_using_activity(self, input: SetLanguageInput) -> Language: + """ + Update: Set the language synchronously. + + This update handler is synchronous and only modifies local state. + """ + if input.language not in self.greetings: + raise ValueError(f"{input.language.name} is not supported") + + previous_language = self.language + self.language = input.language + return previous_language + diff --git a/nexus_sync_operations/service.py b/nexus_sync_operations/service.py index 3436d5f3..16d72712 100644 --- a/nexus_sync_operations/service.py +++ b/nexus_sync_operations/service.py @@ -1,5 +1,6 @@ """ -This module defines a Nexus service that exposes three operations. +This module defines a Nexus service that exposes synchronous operations for query, update, signal, +and signal-with-start operations. It is used by the nexus service handler to validate that the operation handlers implement the correct input and output types, and by the caller workflow to create a type-safe client. It does not @@ -10,11 +11,23 @@ import nexusrpc from message_passing.introduction import Language -from message_passing.introduction.workflows import GetLanguagesInput, SetLanguageInput +from message_passing.introduction.workflows import ( + ApproveInput, + GetLanguagesInput, + SetLanguageInput, +) @nexusrpc.service class GreetingService: + # Query operations get_languages: nexusrpc.Operation[GetLanguagesInput, list[Language]] get_language: nexusrpc.Operation[None, Language] - set_language: nexusrpc.Operation[SetLanguageInput, Language] + + # Update operations + set_language: nexusrpc.Operation[SetLanguageInput, Language] # async update with activity + set_language_sync: nexusrpc.Operation[SetLanguageInput, Language] # synchronous update + + # Signal operations + approve: nexusrpc.Operation[ApproveInput, None] # signal + approve_with_start: nexusrpc.Operation[ApproveInput, None] # signal-with-start