Skip to content

Commit af9d8e3

Browse files
authored
Adds LangChain example (temporalio#106)
* Adds LangChain example * Runs poe format * Fix polling qualified names
1 parent dfeddf7 commit af9d8e3

File tree

18 files changed

+1214
-23
lines changed

18 files changed

+1214
-23
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ Some examples require extra dependencies. See each sample's directory for specif
5757
* [dsl](dsl) - DSL workflow that executes steps defined in a YAML file.
5858
* [encryption](encryption) - Apply end-to-end encryption for all input/output.
5959
* [gevent_async](gevent_async) - Combine gevent and Temporal.
60+
* [langchain](langchain) - Orchestrate workflows for LangChain.
6061
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
6162
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
6263
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.

langchain/README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# LangChain Sample
2+
3+
This sample shows you how you can use Temporal to orchestrate workflows for [LangChain](https://www.langchain.com).
4+
5+
For this sample, the optional `langchain` dependency group must be included. To include, run:
6+
7+
poetry install --with langchain
8+
9+
Export your [OpenAI API key](https://platform.openai.com/api-keys) as an environment variable. Replace `YOUR_API_KEY` with your actual OpenAI API key.
10+
11+
export OPENAI_API_KEY='...'
12+
13+
To run, first see [README.md](../README.md) for prerequisites. Then, run the following from this directory to start the
14+
worker:
15+
16+
poetry run python worker.py
17+
18+
This will start the worker. Then, in another terminal, run the following to execute a workflow:
19+
20+
poetry run python starter.py
21+
22+
Then, in another terminal, run the following command to translate a phrase:
23+
24+
curl -X POST "http://localhost:8000/translate?phrase=hello%20world&language=Spanish"
25+
26+
Which should produce some output like:
27+
28+
{"translation":"Hola mundo"}

langchain/activities.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from dataclasses import dataclass
2+
3+
from langchain_openai import ChatOpenAI
4+
from temporalio import activity
5+
6+
from langchain.prompts import ChatPromptTemplate
7+
8+
9+
@dataclass
10+
class TranslateParams:
11+
phrase: str
12+
language: str
13+
14+
15+
@activity.defn
16+
async def translate_phrase(params: TranslateParams) -> dict:
17+
# LangChain setup
18+
template = """You are a helpful assistant who translates between languages.
19+
Translate the following phrase into the specified language: {phrase}
20+
Language: {language}"""
21+
chat_prompt = ChatPromptTemplate.from_messages(
22+
[
23+
("system", template),
24+
("human", "Translate"),
25+
]
26+
)
27+
chain = chat_prompt | ChatOpenAI()
28+
# Use the asynchronous invoke method
29+
return await chain.ainvoke({"phrase": params.phrase, "language": params.language})

langchain/starter.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from contextlib import asynccontextmanager
2+
from uuid import uuid4
3+
4+
import uvicorn
5+
from activities import TranslateParams
6+
from fastapi import FastAPI, HTTPException
7+
from temporalio.client import Client
8+
from workflow import LangChainWorkflow
9+
10+
11+
@asynccontextmanager
12+
async def lifespan(app: FastAPI):
13+
app.state.temporal_client = await Client.connect("localhost:7233")
14+
yield
15+
16+
17+
app = FastAPI(lifespan=lifespan)
18+
19+
20+
@app.post("/translate")
21+
async def translate(phrase: str, language: str):
22+
client = app.state.temporal_client
23+
try:
24+
result = await client.execute_workflow(
25+
LangChainWorkflow.run,
26+
TranslateParams(phrase, language),
27+
id=f"langchain-translation-{uuid4()}",
28+
task_queue="langchain-task-queue",
29+
)
30+
translation_content = result.get("content", "Translation not available")
31+
except Exception as e:
32+
raise HTTPException(status_code=500, detail=str(e))
33+
34+
return {"translation": translation_content}
35+
36+
37+
if __name__ == "__main__":
38+
uvicorn.run(app, host="localhost", port=8000)

langchain/worker.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import asyncio
2+
3+
from activities import translate_phrase
4+
from temporalio.client import Client
5+
from temporalio.worker import Worker
6+
from workflow import LangChainWorkflow
7+
8+
interrupt_event = asyncio.Event()
9+
10+
11+
async def main():
12+
client = await Client.connect("localhost:7233")
13+
worker = Worker(
14+
client,
15+
task_queue="langchain-task-queue",
16+
workflows=[LangChainWorkflow],
17+
activities=[translate_phrase],
18+
)
19+
20+
print("\nWorker started, ctrl+c to exit\n")
21+
await worker.run()
22+
try:
23+
# Wait indefinitely until the interrupt event is set
24+
await interrupt_event.wait()
25+
finally:
26+
# The worker will be shutdown gracefully due to the async context manager
27+
print("\nShutting down the worker\n")
28+
29+
30+
if __name__ == "__main__":
31+
loop = asyncio.get_event_loop()
32+
try:
33+
loop.run_until_complete(main())
34+
except KeyboardInterrupt:
35+
print("\nInterrupt received, shutting down...\n")
36+
interrupt_event.set()
37+
loop.run_until_complete(loop.shutdown_asyncgens())

langchain/workflow.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from datetime import timedelta
2+
3+
from temporalio import workflow
4+
5+
with workflow.unsafe.imports_passed_through():
6+
from activities import TranslateParams, translate_phrase
7+
8+
9+
@workflow.defn
10+
class LangChainWorkflow:
11+
@workflow.run
12+
async def run(self, params: TranslateParams) -> dict:
13+
return await workflow.execute_activity(
14+
translate_phrase,
15+
params,
16+
schedule_to_close_timeout=timedelta(seconds=30),
17+
)

0 commit comments

Comments
 (0)