-
Notifications
You must be signed in to change notification settings - Fork 94
LangGraph Samples #272
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
LangGraph Samples #272
Changes from all commits
662053d
18b995a
f8f51a0
6b96e09
2e5c82e
0432967
ca793bf
5adef7d
3b62683
1fb4682
c18f9cb
d3fe116
771b46a
f579c0d
57110cf
0302320
e82d0a5
3475e6f
7db14fe
0beeb3c
3e6df8f
d772f0a
fe7134d
58516c1
9b17b34
6e320fa
3b54795
d097a07
982f75a
76162db
b083bfe
cc9480c
fa7f1bb
faf4dce
c7f240a
c95ddac
bb536f0
2a313c1
a68e14c
bb741b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| # Temporal Python Samples | ||
|
|
||
| ## Serena MCP Server | ||
|
|
||
| Always consult Serena memories at the start of a session using `mcp__serena__list_memories` and read relevant ones with `mcp__serena__read_memory`. Save important project-specific learnings to Serena for future sessions. | ||
|
|
||
| ## Client Initialization Pattern | ||
|
|
||
| Use the `ClientConfig` pattern for client initialization to support environment-based configuration: | ||
|
|
||
| ```python | ||
| from temporalio.client import Client | ||
| from temporalio.envconfig import ClientConfig | ||
|
|
||
| config = ClientConfig.load_client_connect_config() | ||
| config.setdefault("target_host", "localhost:7233") | ||
| client = await Client.connect(**config) | ||
| ``` | ||
|
|
||
| This pattern allows configuration via environment variables while providing sensible defaults. | ||
|
|
||
| ## LangGraph Guidelines | ||
|
|
||
| ### Agent Creation | ||
|
|
||
| - **DO NOT** use `create_react_agent` from `langgraph.prebuilt` - it is deprecated | ||
| - **USE** `create_agent` from `langchain.agents` instead | ||
|
|
||
| ```python | ||
| # Wrong (deprecated) | ||
| from langgraph.prebuilt import create_react_agent | ||
| agent = create_react_agent(model=model, tools=[...], prompt="...") | ||
|
|
||
| # Correct | ||
| from langchain.agents import create_agent | ||
| agent = create_agent(model=model, tools=[...], system_prompt="...") | ||
| ``` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| # Temporal LangGraph Samples | ||
|
|
||
| These samples demonstrate the Temporal LangGraph integration - combining LangGraph's agent framework with Temporal's durable execution. | ||
|
|
||
| > **Note:** The LangGraph integration is currently available as a preview feature in the `langgraph_plugin` branch of the SDK repository. | ||
|
|
||
| ## Overview | ||
|
|
||
| The integration combines: | ||
| - **Temporal workflows** for orchestrating agent control flow and state management | ||
| - **LangGraph** for defining agent graphs with conditional logic, cycles, and state | ||
|
|
||
| This approach ensures that AI agent workflows are durable, observable, and can handle failures gracefully. | ||
|
|
||
| ## Prerequisites | ||
|
|
||
| - Temporal server [running locally](https://docs.temporal.io/cli/server#start-dev) | ||
| - Python 3.9+ | ||
| - [uv](https://docs.astral.sh/uv/) package manager (recommended) | ||
|
|
||
| ## Installation | ||
|
|
||
| Since the LangGraph integration is currently in a branch, you need to install from the branch repositories. | ||
|
|
||
| ### Running the Samples | ||
|
|
||
| 1. Clone this samples repository: | ||
| ```bash | ||
| git clone -b langgraph_plugin https://github.com/mfateev/samples-python.git | ||
| cd samples-python | ||
| ``` | ||
|
|
||
| 2. Install dependencies: | ||
| ```bash | ||
| uv sync --group langgraph | ||
| ``` | ||
|
|
||
| 3. Install the SDK from the `langgraph-plugin` branch: | ||
| ```bash | ||
| uv pip install "temporalio @ git+https://github.com/mfateev/sdk-python.git@langgraph-plugin" | ||
| ``` | ||
|
|
||
| 4. Start a local Temporal server: | ||
| ```bash | ||
| temporal server start-dev | ||
| ``` | ||
|
|
||
| 5. Navigate to a sample directory and follow its README for specific instructions | ||
|
|
||
| ## Examples | ||
|
|
||
| Each directory contains a complete example with its own README for detailed instructions: | ||
|
|
||
| | Sample | Description | | ||
| |--------|-------------| | ||
| | [hello_world](./hello_world/) | Simple starter example demonstrating basic plugin setup and graph registration | | ||
| | [activity_from_node](./activity_from_node/) | Calling Temporal activities from a graph node using run_in_workflow | | ||
| | [react_agent](./react_agent/) | ReAct agent pattern with tool calling and multi-step reasoning | | ||
| | [human_in_the_loop](./human_in_the_loop/) | Human-in-the-loop approval workflows using two approaches | | ||
| | ↳ [approval_graph_interrupt](./human_in_the_loop/approval_graph_interrupt/) | Uses LangGraph's `interrupt()` function | | ||
| | ↳ [approval_wait_condition](./human_in_the_loop/approval_wait_condition/) | Uses `run_in_workflow=True` with `workflow.wait_condition()` | | ||
| | [supervisor](./supervisor/) | Multi-agent supervisor pattern coordinating specialized agents | | ||
| | [agentic_rag](./agentic_rag/) | Retrieval-augmented generation with document grading and query rewriting | | ||
| | [deep_research](./deep_research/) | Multi-step research with web search and iterative refinement | | ||
| | [plan_and_execute](./plan_and_execute/) | Plan-and-execute pattern with structured step execution | | ||
| | [reflection](./reflection/) | Self-reflection pattern for iterative improvement | | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| # Temporal LangGraph Samples |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| # Activity from Node | ||
|
|
||
| Demonstrates calling Temporal activities directly from a LangGraph node using the `run_in_workflow` feature. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment as above. Say what the upshot is here and then introduce the flag lower down.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Naming-wise, the other samples are all "scenario samples" whereas this one is named after a feature it highlights--that was a little confusing at first. |
||
|
|
||
| ## What This Sample Demonstrates | ||
|
|
||
| - **run_in_workflow nodes**: Using `temporal_node_metadata(run_in_workflow=True)` to run a node in the workflow context | ||
| - **Activity orchestration**: Calling multiple Temporal activities from within a graph node | ||
| - **Mixed execution modes**: Combining run_in_workflow nodes with regular activity nodes | ||
| - **Sandbox enforcement**: Node code is sandboxed to ensure deterministic execution | ||
|
|
||
| ## How It Works | ||
|
|
||
| 1. **Orchestrator Node**: Runs directly in the workflow (not as an activity) with `run_in_workflow=True` | ||
| - Calls `validate_data` activity to validate input | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't feel realistic to validate in a separate activity unless it's very expensive. There's not a lot of upside AFAICT, and as a user I'm left unsold as to the value of Another scenario idea would be to have an expensive query that one would want to memoize, followed by using that data. |
||
| - Calls `enrich_data` activity to enrich valid data | ||
| - Implements orchestration logic (conditional activity calls) | ||
|
|
||
| 2. **Finalize Node**: Runs as a regular Temporal activity (default behavior) | ||
| - Processes the enriched data | ||
|
|
||
| 3. **Activities**: Standard Temporal activities called from the orchestrator | ||
| - `validate_data`: Validates input data | ||
| - `enrich_data`: Enriches data with additional information | ||
|
|
||
| ## When to Use run_in_workflow | ||
|
|
||
| Use `run_in_workflow=True` when your node needs to: | ||
| - Call Temporal activities, child workflows, or other Temporal operations | ||
| - Use workflow features like timers, signals, or queries | ||
| - Implement complex orchestration logic with multiple activity calls | ||
|
|
||
| **Important**: Code in run_in_workflow nodes is sandboxed to ensure deterministic execution. Non-deterministic operations (like `random.randint()`) will be blocked. | ||
|
|
||
| ## Running the Example | ||
|
|
||
| First, start the worker: | ||
| ```bash | ||
| uv run langgraph_plugin/activity_from_node/run_worker.py | ||
| ``` | ||
|
|
||
| Then, in a separate terminal, run the workflow: | ||
| ```bash | ||
| uv run langgraph_plugin/activity_from_node/run_workflow.py | ||
| ``` | ||
|
|
||
| ## Expected Output | ||
|
|
||
| ``` | ||
| Result: {'data': 'Hello from LangGraph', 'validated': True, 'enriched_data': 'Hello from LangGraph [enriched at activity]', 'final_result': 'Processed: Hello from LangGraph [enriched at activity]'} | ||
| ``` | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| """Activity from Node sample.""" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| """Activity from Node - Activity Definitions. | ||
|
|
||
| Activities that are called from a run_in_workflow node. | ||
| """ | ||
|
|
||
| from temporalio import activity | ||
|
|
||
|
|
||
| @activity.defn | ||
| async def validate_data(data: str) -> bool: | ||
| """Validate the input data. | ||
|
|
||
| In a real application, this could: | ||
| - Check data format and schema | ||
| - Verify required fields | ||
| - Call external validation services | ||
| """ | ||
| activity.logger.info(f"Validating data: {data}") | ||
|
|
||
| # Simple validation - check if data is non-empty | ||
| is_valid = bool(data and data.strip()) | ||
|
|
||
| activity.logger.info(f"Validation result: {is_valid}") | ||
| return is_valid | ||
|
|
||
|
|
||
| @activity.defn | ||
| async def enrich_data(data: str) -> str: | ||
| """Enrich the input data with additional information. | ||
|
|
||
| In a real application, this could: | ||
| - Call external APIs for data enrichment | ||
| - Lookup data from databases | ||
| - Apply transformations | ||
| """ | ||
| activity.logger.info(f"Enriching data: {data}") | ||
|
|
||
| # Simple enrichment - add metadata | ||
| enriched = f"{data} [enriched at activity]" | ||
|
|
||
| activity.logger.info(f"Enriched data: {enriched}") | ||
| return enriched |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| """Activity from Node - Graph Definition. | ||
|
|
||
| This module defines a graph where a node runs in the workflow context | ||
| and calls Temporal activities directly. | ||
| """ | ||
|
|
||
| from datetime import timedelta | ||
| from typing import Any | ||
|
|
||
| from langgraph.graph import END, START, StateGraph | ||
| from typing_extensions import TypedDict | ||
|
|
||
| # ============================================================================= | ||
| # State Definition | ||
| # ============================================================================= | ||
|
|
||
|
|
||
| class ProcessingState(TypedDict, total=False): | ||
| """State for the processing graph.""" | ||
|
|
||
| data: str | ||
| validated: bool | ||
| enriched_data: str | ||
| final_result: str | ||
|
|
||
|
|
||
| # ============================================================================= | ||
| # Node Functions | ||
| # ============================================================================= | ||
|
|
||
|
|
||
| async def orchestrator_node(state: ProcessingState) -> ProcessingState: | ||
| """Node that orchestrates multiple activity calls from the workflow. | ||
|
|
||
| This node runs directly in the workflow (run_in_workflow=True) so it can: | ||
| - Call multiple Temporal activities | ||
| - Use workflow features like timers, signals, queries | ||
| - Implement complex orchestration logic | ||
|
|
||
| The node is sandboxed, ensuring deterministic code. | ||
| """ | ||
| from temporalio import workflow | ||
|
|
||
| data = state.get("data", "") | ||
|
|
||
| # Call validation activity | ||
| is_valid = await workflow.execute_activity( | ||
| "validate_data", | ||
| data, | ||
| start_to_close_timeout=timedelta(seconds=30), | ||
| ) | ||
|
|
||
| if not is_valid: | ||
| return {"validated": False, "final_result": "Validation failed"} | ||
|
|
||
| # Call enrichment activity | ||
| enriched = await workflow.execute_activity( | ||
| "enrich_data", | ||
| data, | ||
| start_to_close_timeout=timedelta(seconds=30), | ||
| ) | ||
|
|
||
| return {"validated": True, "enriched_data": enriched} | ||
|
|
||
|
|
||
| def finalize_node(state: ProcessingState) -> ProcessingState: | ||
| """Final processing node - runs as a regular activity. | ||
|
|
||
| This demonstrates mixing run_in_workflow nodes with regular activity nodes. | ||
| """ | ||
| if not state.get("validated"): | ||
| return state | ||
|
|
||
| enriched = state.get("enriched_data", "") | ||
| return {"final_result": f"Processed: {enriched}"} | ||
|
|
||
|
|
||
| # ============================================================================= | ||
| # Graph Builder | ||
| # ============================================================================= | ||
|
|
||
|
|
||
| def build_activity_from_node_graph() -> Any: | ||
| """Build a graph with a node that calls activities from the workflow. | ||
|
|
||
| The orchestrator node uses run_in_workflow=True to execute directly | ||
| in the workflow context, allowing it to call Temporal activities. | ||
| """ | ||
| from temporalio.contrib.langgraph import temporal_node_metadata | ||
|
|
||
| graph = StateGraph(ProcessingState) | ||
|
|
||
| # Orchestrator runs in workflow to call activities | ||
| graph.add_node( | ||
| "orchestrator", | ||
| orchestrator_node, | ||
| metadata=temporal_node_metadata(run_in_workflow=True), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe providing an enum would be clearer and more extensible, like |
||
| ) | ||
|
|
||
| # Finalize runs as a regular activity | ||
| graph.add_node("finalize", finalize_node) | ||
|
|
||
| graph.add_edge(START, "orchestrator") | ||
| graph.add_edge("orchestrator", "finalize") | ||
| graph.add_edge("finalize", END) | ||
|
|
||
| return graph.compile() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| """Worker for the Activity from Node example. | ||
|
|
||
| Starts a Temporal worker that can execute ActivityFromNodeWorkflow. | ||
| """ | ||
|
|
||
| import asyncio | ||
|
|
||
| from temporalio.client import Client | ||
| from temporalio.contrib.langgraph import LangGraphPlugin | ||
| from temporalio.envconfig import ClientConfig | ||
| from temporalio.worker import Worker | ||
|
|
||
| from langgraph_plugin.activity_from_node.activities import enrich_data, validate_data | ||
| from langgraph_plugin.activity_from_node.graph import build_activity_from_node_graph | ||
| from langgraph_plugin.activity_from_node.workflow import ActivityFromNodeWorkflow | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| # Create the plugin with our graph registered | ||
| plugin = LangGraphPlugin( | ||
| graphs={"activity_from_node_graph": build_activity_from_node_graph}, | ||
| ) | ||
|
|
||
| # Connect to Temporal with the plugin | ||
| config = ClientConfig.load_client_connect_config() | ||
| config.setdefault("target_host", "localhost:7233") | ||
| client = await Client.connect(**config, plugins=[plugin]) | ||
|
|
||
| # Create and run the worker | ||
| # Note: We register our custom activities alongside LangGraph's auto-registered ones | ||
| worker = Worker( | ||
| client, | ||
| task_queue="langgraph-activity-from-node", | ||
| workflows=[ActivityFromNodeWorkflow], | ||
| activities=[validate_data, enrich_data], | ||
| ) | ||
|
|
||
| print("Worker started. Ctrl+C to exit.") | ||
| await worker.run() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| """Run the Activity from Node workflow. | ||
|
|
||
| Starts a workflow execution and waits for the result. | ||
| """ | ||
|
|
||
| import asyncio | ||
| import uuid | ||
|
|
||
| from temporalio.client import Client | ||
| from temporalio.envconfig import ClientConfig | ||
|
|
||
| from langgraph_plugin.activity_from_node.workflow import ActivityFromNodeWorkflow | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| # Connect to Temporal | ||
| config = ClientConfig.load_client_connect_config() | ||
| config.setdefault("target_host", "localhost:7233") | ||
| client = await Client.connect(**config) | ||
|
|
||
| # Run the workflow | ||
| result = await client.execute_workflow( | ||
| ActivityFromNodeWorkflow.run, | ||
| "Hello from LangGraph", | ||
| id=f"activity-from-node-{uuid.uuid4()}", | ||
| task_queue="langgraph-activity-from-node", | ||
| ) | ||
|
|
||
| print(f"Result: {result}") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not clear what
run_in_workflowmeans, especially at this top level. Is it an implementation detail? I'm wondering what I will get out of looking at this sample.