1- import asyncio
2- import os
31import uuid
42from datetime import timedelta
53
6- import pytest
7- from temporalio .client import Client , NexusOperationFailureError
8- from temporalio .service import RPCError
4+ from temporalio .client import Client
95from temporalio .worker import Worker
106
117from nexus_standalone_operations .handler import HelloWorkflow , MyNexusServiceHandler
2117
2218
2319async def test_nexus_standalone_operations (client : Client ):
24- if not os .getenv ("ENABLE_STANDALONE_NEXUS_TESTS" ):
25- pytest .skip (
26- "Standalone Nexus operations not yet supported by default dev server. Set ENABLE_STANDALONE_NEXUS_TESTS=1 to enable."
27- )
28-
2920 endpoint_name = f"test-nexus-standalone-{ uuid .uuid4 ()} "
3021
3122 create_response = await create_nexus_endpoint (
@@ -44,19 +35,14 @@ async def test_nexus_standalone_operations(client: Client):
4435 service = MyNexusService , endpoint = endpoint_name
4536 )
4637
47- # Test sync echo operation (with retry for endpoint propagation)
38+ # Test sync echo operation
4839 echo_result = None
49- for _ in range (30 ):
50- try :
51- echo_result = await nexus_client .execute_operation (
52- MyNexusService .echo ,
53- EchoInput (message = "test-echo" ),
54- id = str (uuid .uuid4 ()),
55- schedule_to_close_timeout = timedelta (seconds = 10 ),
56- )
57- break
58- except (RPCError , NexusOperationFailureError ):
59- await asyncio .sleep (0.5 )
40+ echo_result = await nexus_client .execute_operation (
41+ MyNexusService .echo ,
42+ EchoInput (message = "test-echo" ),
43+ id = str (uuid .uuid4 ()),
44+ schedule_to_close_timeout = timedelta (seconds = 10 ),
45+ )
6046 assert isinstance (echo_result , EchoOutput )
6147 assert echo_result .message == "test-echo"
6248
0 commit comments