Skip to content

Commit c587d20

Browse files
authored
Gevent sample (temporalio#84)
1 parent 2cb0fdd commit c587d20

File tree

13 files changed

+530
-2
lines changed

13 files changed

+530
-2
lines changed

.github/workflows/ci.yml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212
strategy:
1313
fail-fast: true
1414
matrix:
15-
python: ["3.7", "3.10"]
15+
python: ["3.7", "3.11"]
1616
os: [ubuntu-latest, macos-latest, windows-latest]
1717
runs-on: ${{ matrix.os }}
1818
steps:
@@ -32,3 +32,11 @@ jobs:
3232
- run: poe test -s -o log_cli_level=DEBUG
3333
- run: poe test -s -o log_cli_level=DEBUG --workflow-environment time-skipping
3434

35+
# On non-3.7, run gevent test
36+
- name: Gevent test
37+
if: ${{ matrix.python != '3.7' }}
38+
run: |
39+
poetry install --with gevent
40+
poetry run python gevent_async/test/run_combined.py
41+
42+

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ Some examples require extra dependencies. See each sample's directory for specif
5656
* [custom_converter](custom_converter) - Use a custom payload converter to handle custom types.
5757
* [custom_decorator](custom_decorator) - Custom decorator to auto-heartbeat a long-running activity.
5858
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
59+
* [gevent_async](gevent_async) - Combine gevent and Temporal.
5960
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
6061
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
6162
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.

gevent_async/README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Gevent Sample
2+
3+
This sample shows how to run Temporal in an environment that gevent has patched.
4+
5+
Gevent is built to patch Python libraries to attempt to seamlessly convert threaded code into coroutine-based code.
6+
However, it is well known within the gevent community that it does not work well with `asyncio`, which is the modern
7+
Python approach to coroutines. Temporal leverages `asyncio` which means by default it is incompatible with gevent. Users
8+
are encouraged to abandon gevent in favor of more modern approaches where they can but it is not always possible.
9+
10+
This sample shows how to use a customized gevent executor to run `asyncio` Temporal clients, workers, activities, and
11+
workflows.
12+
13+
For this sample, the optional `gevent` dependency group must be included. To include, run:
14+
15+
poetry install --with gevent
16+
17+
To run the sample, first see [README.md](../README.md) for prerequisites such as having a localhost Temporal server
18+
running. Then, run the following from this directory to start the worker:
19+
20+
poetry run python worker.py
21+
22+
This will start the worker. The worker has a workflow and two activities, one `asyncio` based and one gevent based. Now
23+
in another terminal, run the following from this directory to execute the workflow:
24+
25+
poetry run python starter.py
26+
27+
The workflow should run and complete with the hello result. Note on the worker terminal there will be logs of the
28+
workflow and activity executions.

gevent_async/__init__.py

Whitespace-only changes.

gevent_async/activity.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from dataclasses import dataclass
2+
3+
import gevent
4+
from temporalio import activity
5+
6+
7+
@dataclass
8+
class ComposeGreetingInput:
9+
greeting: str
10+
name: str
11+
12+
13+
@activity.defn
14+
async def compose_greeting_async(input: ComposeGreetingInput) -> str:
15+
activity.logger.info(f"Running async activity with parameter {input}")
16+
return f"{input.greeting}, {input.name}!"
17+
18+
19+
@activity.defn
20+
def compose_greeting_sync(input: ComposeGreetingInput) -> str:
21+
activity.logger.info(
22+
f"Running sync activity with parameter {input}, "
23+
f"in greenlet: {gevent.getcurrent()}"
24+
)
25+
return f"{input.greeting}, {input.name}!"

gevent_async/executor.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import functools
2+
from concurrent.futures import Future
3+
from typing import Callable, TypeVar
4+
5+
from gevent import threadpool
6+
from typing_extensions import ParamSpec
7+
8+
T = TypeVar("T")
9+
P = ParamSpec("P")
10+
11+
12+
class GeventExecutor(threadpool.ThreadPoolExecutor):
13+
def submit(
14+
self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs
15+
) -> Future[T]:
16+
# Gevent's returned futures do not map well to Python futures, so we
17+
# must translate. We can't just use set_result/set_exception because
18+
# done callbacks are not always called in gevent's case and it doesn't
19+
# seem to support cancel, so we instead wrap the caller function.
20+
python_fut: Future[T] = Future()
21+
22+
@functools.wraps(fn)
23+
def wrapper(*w_args: P.args, **w_kwargs: P.kwargs) -> None:
24+
try:
25+
result = fn(*w_args, **w_kwargs)
26+
# Swallow InvalidStateError in case Python future was cancelled
27+
try:
28+
python_fut.set_result(result)
29+
except:
30+
pass
31+
except Exception as exc:
32+
# Swallow InvalidStateError in case Python future was cancelled
33+
try:
34+
python_fut.set_exception(exc)
35+
except:
36+
pass
37+
38+
# Submit our wrapper to gevent
39+
super().submit(wrapper, *args, **kwargs)
40+
# Return Python future to user
41+
return python_fut

gevent_async/starter.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Init gevent
2+
from gevent import monkey
3+
4+
monkey.patch_all()
5+
6+
import asyncio
7+
import logging
8+
9+
from temporalio.client import Client
10+
11+
from gevent_async import workflow
12+
from gevent_async.executor import GeventExecutor
13+
14+
15+
def main():
16+
logging.basicConfig(level=logging.INFO)
17+
18+
# Create single-worker gevent executor and run asyncio.run(async_main()) in
19+
# it, waiting for result. This executor cannot be used for anything else in
20+
# Temporal, it is just a single thread for running asyncio.
21+
with GeventExecutor(max_workers=1) as executor:
22+
executor.submit(asyncio.run, async_main()).result()
23+
24+
25+
async def async_main():
26+
# Connect client
27+
client = await Client.connect("localhost:7233")
28+
29+
# Run workflow
30+
result = await client.execute_workflow(
31+
workflow.GreetingWorkflow.run,
32+
"Temporal",
33+
id="gevent_async-workflow-id",
34+
task_queue="gevent_async-task-queue",
35+
)
36+
logging.info(f"Workflow result: {result}")
37+
38+
39+
if __name__ == "__main__":
40+
main()

gevent_async/test/__init__.py

Whitespace-only changes.

gevent_async/test/run_combined.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Init gevent
2+
from gevent import monkey
3+
4+
monkey.patch_all()
5+
6+
import asyncio
7+
import logging
8+
9+
from temporalio.testing import WorkflowEnvironment
10+
from temporalio.worker import Worker
11+
12+
from gevent_async import activity, workflow
13+
from gevent_async.executor import GeventExecutor
14+
15+
# This basically combines ../worker.py and ../starter.py for use by CI to
16+
# confirm this works in all environments
17+
18+
19+
def main():
20+
logging.basicConfig(level=logging.INFO)
21+
with GeventExecutor(max_workers=1) as executor:
22+
executor.submit(asyncio.run, async_main()).result()
23+
24+
25+
async def async_main():
26+
logging.info("Starting local server")
27+
async with await WorkflowEnvironment.start_local() as env:
28+
logging.info("Starting worker")
29+
with GeventExecutor(max_workers=200) as executor:
30+
async with Worker(
31+
env.client,
32+
task_queue="gevent_async-task-queue",
33+
workflows=[workflow.GreetingWorkflow],
34+
activities=[
35+
activity.compose_greeting_async,
36+
activity.compose_greeting_sync,
37+
],
38+
activity_executor=executor,
39+
workflow_task_executor=executor,
40+
max_concurrent_activities=100,
41+
max_concurrent_workflow_tasks=100,
42+
):
43+
logging.info("Running workflow")
44+
result = await env.client.execute_workflow(
45+
workflow.GreetingWorkflow.run,
46+
"Temporal",
47+
id="gevent_async-workflow-id",
48+
task_queue="gevent_async-task-queue",
49+
)
50+
if result != "Hello, Temporal!":
51+
raise RuntimeError(f"Unexpected result: {result}")
52+
logging.info(f"Workflow complete, result: {result}")
53+
54+
55+
if __name__ == "__main__":
56+
main()

gevent_async/worker.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Init gevent
2+
from gevent import monkey
3+
4+
monkey.patch_all()
5+
6+
import asyncio
7+
import logging
8+
import signal
9+
10+
import gevent
11+
from temporalio.client import Client
12+
from temporalio.worker import Worker
13+
14+
from gevent_async import activity, workflow
15+
from gevent_async.executor import GeventExecutor
16+
17+
18+
def main():
19+
logging.basicConfig(level=logging.INFO)
20+
21+
# Create single-worker gevent executor and run asyncio.run(async_main()) in
22+
# it, waiting for result. This executor cannot be used for anything else in
23+
# Temporal, it is just a single thread for running asyncio. This means that
24+
# inside of async_main we must create another executor specifically for
25+
# executing activity and workflow tasks.
26+
with GeventExecutor(max_workers=1) as executor:
27+
executor.submit(asyncio.run, async_main()).result()
28+
29+
30+
async def async_main():
31+
# Create ctrl+c handler. We do this by telling gevent on SIGINT to set the
32+
# asyncio event. But asyncio calls are not thread safe, so we have to invoke
33+
# it via call_soon_threadsafe.
34+
interrupt_event = asyncio.Event()
35+
gevent.signal_handler(
36+
signal.SIGINT,
37+
asyncio.get_running_loop().call_soon_threadsafe,
38+
interrupt_event.set,
39+
)
40+
41+
# Connect client
42+
client = await Client.connect("localhost:7233")
43+
44+
# Create an executor for use by Temporal. This cannot be the outer one
45+
# running this async main. The max_workers here needs to have enough room to
46+
# support the max concurrent activities/workflows settings.
47+
with GeventExecutor(max_workers=200) as executor:
48+
49+
# Run a worker for the workflow and activities
50+
async with Worker(
51+
client,
52+
task_queue="gevent_async-task-queue",
53+
workflows=[workflow.GreetingWorkflow],
54+
activities=[
55+
activity.compose_greeting_async,
56+
activity.compose_greeting_sync,
57+
],
58+
# Set the executor for activities (only used for non-async
59+
# activities) and workflow tasks
60+
activity_executor=executor,
61+
workflow_task_executor=executor,
62+
# Set the max concurrent activities/workflows. These are the same as
63+
# the defaults, but this makes it clear that the 100 + 100 = 200 for
64+
# max_workers settings.
65+
max_concurrent_activities=100,
66+
max_concurrent_workflow_tasks=100,
67+
):
68+
69+
# Wait until interrupted
70+
logging.info("Worker started, ctrl+c to exit")
71+
await interrupt_event.wait()
72+
logging.info("Shutting down")
73+
74+
75+
if __name__ == "__main__":
76+
main()

0 commit comments

Comments
 (0)