|
| 1 | +# Init gevent |
| 2 | +from gevent import monkey |
| 3 | + |
| 4 | +monkey.patch_all() |
| 5 | + |
| 6 | +import asyncio |
| 7 | +import logging |
| 8 | +import signal |
| 9 | + |
| 10 | +import gevent |
| 11 | +from temporalio.client import Client |
| 12 | +from temporalio.worker import Worker |
| 13 | + |
| 14 | +from gevent_async import activity, workflow |
| 15 | +from gevent_async.executor import GeventExecutor |
| 16 | + |
| 17 | + |
| 18 | +def main(): |
| 19 | + logging.basicConfig(level=logging.INFO) |
| 20 | + |
| 21 | + # Create single-worker gevent executor and run asyncio.run(async_main()) in |
| 22 | + # it, waiting for result. This executor cannot be used for anything else in |
| 23 | + # Temporal, it is just a single thread for running asyncio. This means that |
| 24 | + # inside of async_main we must create another executor specifically for |
| 25 | + # executing activity and workflow tasks. |
| 26 | + with GeventExecutor(max_workers=1) as executor: |
| 27 | + executor.submit(asyncio.run, async_main()).result() |
| 28 | + |
| 29 | + |
| 30 | +async def async_main(): |
| 31 | + # Create ctrl+c handler. We do this by telling gevent on SIGINT to set the |
| 32 | + # asyncio event. But asyncio calls are not thread safe, so we have to invoke |
| 33 | + # it via call_soon_threadsafe. |
| 34 | + interrupt_event = asyncio.Event() |
| 35 | + gevent.signal_handler( |
| 36 | + signal.SIGINT, |
| 37 | + asyncio.get_running_loop().call_soon_threadsafe, |
| 38 | + interrupt_event.set, |
| 39 | + ) |
| 40 | + |
| 41 | + # Connect client |
| 42 | + client = await Client.connect("localhost:7233") |
| 43 | + |
| 44 | + # Create an executor for use by Temporal. This cannot be the outer one |
| 45 | + # running this async main. The max_workers here needs to have enough room to |
| 46 | + # support the max concurrent activities/workflows settings. |
| 47 | + with GeventExecutor(max_workers=200) as executor: |
| 48 | + |
| 49 | + # Run a worker for the workflow and activities |
| 50 | + async with Worker( |
| 51 | + client, |
| 52 | + task_queue="gevent_async-task-queue", |
| 53 | + workflows=[workflow.GreetingWorkflow], |
| 54 | + activities=[ |
| 55 | + activity.compose_greeting_async, |
| 56 | + activity.compose_greeting_sync, |
| 57 | + ], |
| 58 | + # Set the executor for activities (only used for non-async |
| 59 | + # activities) and workflow tasks |
| 60 | + activity_executor=executor, |
| 61 | + workflow_task_executor=executor, |
| 62 | + # Set the max concurrent activities/workflows. These are the same as |
| 63 | + # the defaults, but this makes it clear that the 100 + 100 = 200 for |
| 64 | + # max_workers settings. |
| 65 | + max_concurrent_activities=100, |
| 66 | + max_concurrent_workflow_tasks=100, |
| 67 | + ): |
| 68 | + |
| 69 | + # Wait until interrupted |
| 70 | + logging.info("Worker started, ctrl+c to exit") |
| 71 | + await interrupt_event.wait() |
| 72 | + logging.info("Shutting down") |
| 73 | + |
| 74 | + |
| 75 | +if __name__ == "__main__": |
| 76 | + main() |
0 commit comments