forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_worker.py
More file actions
56 lines (48 loc) · 1.78 KB
/
run_worker.py
File metadata and controls
56 lines (48 loc) · 1.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from __future__ import annotations
import asyncio
from datetime import timedelta
from temporalio.client import Client
from temporalio.contrib.openai_agents import (
ModelActivity,
ModelActivityParameters,
OpenAIAgentsTracingInterceptor,
set_open_ai_agent_temporal_overrides,
)
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.worker import Worker
from openai_agents.workflows.agents_as_tools_workflow import AgentsAsToolsWorkflow
from openai_agents.workflows.customer_service_workflow import CustomerServiceWorkflow
from openai_agents.workflows.get_weather_activity import get_weather
from openai_agents.workflows.hello_world_workflow import HelloWorldAgent
from openai_agents.workflows.research_bot_workflow import ResearchWorkflow
from openai_agents.workflows.tools_workflow import ToolsWorkflow
async def main():
with set_open_ai_agent_temporal_overrides(
model_params=ModelActivityParameters(
start_to_close_timeout=timedelta(seconds=60),
),
):
# Create client connected to server at the given address
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="openai-agents-task-queue",
workflows=[
HelloWorldAgent,
ToolsWorkflow,
ResearchWorkflow,
CustomerServiceWorkflow,
AgentsAsToolsWorkflow,
],
activities=[
ModelActivity().invoke_model_activity,
get_weather,
],
interceptors=[OpenAIAgentsTracingInterceptor()],
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())