```
├── .clinerules
├── .cursorrules
├── .env.sample
├── .gitignore
├── .windsurfrules
├── LICENSE
├── README.md
├── assets/
├── banner.png
├── example.png
├── youtube_thumbnail.png
├── docs/
├── AutoGen Core/
├── 01_agent.md
├── 02_messaging_system__topic___subscription_.md
├── 03_agentruntime.md
├── 04_tool.md
├── 05_chatcompletionclient.md
```
## /.clinerules
```clinerules path="/.clinerules"
---
layout: default
title: "Agentic Coding"
---
# Agentic Coding: Humans Design, Agents code!
> If you are an AI agents involved in building LLM Systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification.
{: .warning }
## Agentic Coding Steps
Agentic Coding should be a collaboration between Human System Design and Agent Implementation:
| Steps | Human | AI | Comment |
|:-----------------------|:----------:|:---------:|:------------------------------------------------------------------------|
| 1. Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context. |
| 2. Flow | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details. |
| 3. Utilities | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. |
| 4. Node | ★☆☆ Low | ★★★ High | The AI helps design the node types and data handling based on the flow. |
| 5. Implementation | ★☆☆ Low | ★★★ High | The AI implements the flow based on the design. |
| 6. Optimization | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize. |
| 7. Reliability | ★☆☆ Low | ★★★ High | The AI writes test cases and addresses corner cases. |
1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit.
- Understand AI systems' strengths and limitations:
- **Good for**: Routine tasks requiring common sense (filling forms, replying to emails)
- **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL)
- **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning)
- **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features.
- **Balance complexity vs. impact**: Aim to deliver the highest value features with minimal complexity early.
2. **Flow Design**: Outline at a high level, describe how your AI system orchestrates nodes.
- Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)).
- For each node in the flow, start with a high-level one-line description of what it does.
- If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine).
- If using **Agent**, specify what are the inputs (context) and what are the possible actions.
- If using **RAG**, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows.
- Outline the flow and draw it in a mermaid diagram. For example:
\`\`\`mermaid
flowchart LR
start[Start] --> batch[Batch]
batch --> check[Check]
check -->|OK| process
check -->|Error| fix[Fix]
fix --> check
subgraph process[Process]
step1[Step 1] --> step2[Step 2]
end
process --> endNode[End]
\`\`\`
- > **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition.
{: .best-practice }
3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions.
- Think of your AI system as the brain. It needs a body—these *external utility functions*—to interact with the real world:

- Reading inputs (e.g., retrieving Slack messages, reading emails)
- Writing outputs (e.g., generating reports, sending emails)
- Using external tools (e.g., calling LLMs, searching the web)
- **NOTE**: *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal in the AI system.
- For each utility function, implement it and write a simple test.
- Document their input/output, as well as why they are necessary. For example:
- `name`: `get_embedding` (`utils/get_embedding.py`)
- `input`: `str`
- `output`: a vector of 3072 floats
- `necessity`: Used by the second node to embed text
- Example utility implementation:
\`\`\`python
# utils/call_llm.py
from openai import OpenAI
def call_llm(prompt):
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
if __name__ == "__main__":
prompt = "What is the meaning of life?"
print(call_llm(prompt))
\`\`\`
- > **Sometimes, design Utilies before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them.
{: .best-practice }
4. **Node Design**: Plan how each node will read and write data, and use utility functions.
- One core design principle for PocketFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design:
- For simple systems, use an in-memory dictionary.
- For more complex systems or when persistence is required, use a database.
- **Don't Repeat Yourself**: Use in-memory references or foreign keys.
- Example shared store design:
\`\`\`python
shared = {
"user": {
"id": "user123",
"context": { # Another nested dict
"weather": {"temp": 72, "condition": "sunny"},
"location": "San Francisco"
}
},
"results": {} # Empty dict to store outputs
}
\`\`\`
- For each [Node](./core_abstraction/node.md), describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without codes. For example:
- `type`: Regular (or Batch, or Async)
- `prep`: Read "text" from the shared store
- `exec`: Call the embedding utility function
- `post`: Write "embedding" to the shared store
5. **Implementation**: Implement the initial nodes and flows based on the design.
- 🎉 If you've reached this step, humans have finished the design. Now *Agentic Coding* begins!
- **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking.
- **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system.
- Add logging throughout the code to facilitate debugging.
7. **Optimization**:
- **Use Intuition**: For a quick initial evaluation, human intuition is often a good start.
- **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts.
- If your flow design is already solid, move on to micro-optimizations:
- **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity.
- **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone.
- > **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times.
>
> 
{: .best-practice }
8. **Reliability**
- **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times.
- **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging.
- **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain.
## Example LLM Project File Structure
\`\`\`
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── requirements.txt
└── docs/
└── design.md
\`\`\`
- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*.
- **`utils/`**: Contains all utility functions.
- It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
- Each file should also include a `main()` function to try that API call
- **`nodes.py`**: Contains all the node definitions.
\`\`\`python
# nodes.py
from pocketflow import Node
from utils.call_llm import call_llm
class GetQuestionNode(Node):
def exec(self, _):
# Get question directly from user input
user_question = input("Enter your question: ")
return user_question
def post(self, shared, prep_res, exec_res):
# Store the user's question
shared["question"] = exec_res
return "default" # Go to the next node
class AnswerNode(Node):
def prep(self, shared):
# Read question from shared
return shared["question"]
def exec(self, question):
# Call LLM to get the answer
return call_llm(question)
def post(self, shared, prep_res, exec_res):
# Store the answer in shared
shared["answer"] = exec_res
\`\`\`
- **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them.
\`\`\`python
# flow.py
from pocketflow import Flow
from nodes import GetQuestionNode, AnswerNode
def create_qa_flow():
"""Create and return a question-answering flow."""
# Create nodes
get_question_node = GetQuestionNode()
answer_node = AnswerNode()
# Connect nodes in sequence
get_question_node >> answer_node
# Create flow starting with input node
return Flow(start=get_question_node)
\`\`\`
- **`main.py`**: Serves as the project's entry point.
\`\`\`python
# main.py
from flow import create_qa_flow
# Example main function
# Please replace this with your own main function
def main():
shared = {
"question": None, # Will be populated by GetQuestionNode from user input
"answer": None # Will be populated by AnswerNode
}
# Create the flow and run it
qa_flow = create_qa_flow()
qa_flow.run(shared)
print(f"Question: {shared['question']}")
print(f"Answer: {shared['answer']}")
if __name__ == "__main__":
main()
\`\`\`
================================================
File: docs/index.md
================================================
---
layout: default
title: "Home"
nav_order: 1
---
# Pocket Flow
A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*.
- **Lightweight**: Just the core graph abstraction in 100 lines. ZERO dependencies, and vendor lock-in.
- **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more.
- **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications.
## Core Abstraction
We model the LLM workflow as a **Graph + Shared Store**:
- [Node](./core_abstraction/node.md) handles simple (LLM) tasks.
- [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges).
- [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows.
- [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks.
- [Async](./core_abstraction/async.md) nodes/flows allow waiting for asynchronous tasks.
- [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks.
## Design Pattern
From there, it’s easy to implement popular design patterns:
- [Agent](./design_pattern/agent.md) autonomously makes decisions.
- [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines.
- [RAG](./design_pattern/rag.md) integrates data retrieval with generation.
- [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps.
- [Structured Output](./design_pattern/structure.md) formats outputs consistently.
- [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
## Utility Function
We **do not** provide built-in utilities. Instead, we offer *examples*—please *implement your own*:
- [LLM Wrapper](./utility_function/llm.md)
- [Viz and Debug](./utility_function/viz.md)
- [Web Search](./utility_function/websearch.md)
- [Chunking](./utility_function/chunking.md)
- [Embedding](./utility_function/embedding.md)
- [Vector Databases](./utility_function/vector.md)
- [Text-to-Speech](./utility_function/text_to_speech.md)
**Why not built-in?**: I believe it's a *bad practice* for vendor-specific APIs in a general framework:
- *API Volatility*: Frequent changes lead to heavy maintenance for hardcoded APIs.
- *Flexibility*: You may want to switch vendors, use fine-tuned models, or run them locally.
- *Optimizations*: Prompt caching, batching, and streaming are easier without vendor lock-in.
## Ready to build your Apps?
Check out [Agentic Coding Guidance](./guide.md), the fastest way to develop LLM projects with Pocket Flow!
================================================
File: docs/core_abstraction/async.md
================================================
---
layout: default
title: "(Advanced) Async"
parent: "Core Abstraction"
nav_order: 5
---
# (Advanced) Async
**Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for:
1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way.
2. **exec_async()**: Typically used for async LLM calls.
3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`.
**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes.
### Example
\`\`\`python
class SummarizeThenVerify(AsyncNode):
async def prep_async(self, shared):
# Example: read a file asynchronously
doc_text = await read_file_async(shared["doc_path"])
return doc_text
async def exec_async(self, prep_res):
# Example: async LLM call
summary = await call_llm_async(f"Summarize: {prep_res}")
return summary
async def post_async(self, shared, prep_res, exec_res):
# Example: wait for user feedback
decision = await gather_user_feedback(exec_res)
if decision == "approve":
shared["summary"] = exec_res
return "approve"
return "deny"
summarize_node = SummarizeThenVerify()
final_node = Finalize()
# Define transitions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node # retry
flow = AsyncFlow(start=summarize_node)
async def main():
shared = {"doc_path": "document.txt"}
await flow.run_async(shared)
print("Final Summary:", shared.get("summary"))
asyncio.run(main())
\`\`\`
================================================
File: docs/core_abstraction/batch.md
================================================
---
layout: default
title: "Batch"
parent: "Core Abstraction"
nav_order: 4
---
# Batch
**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases:
- **Chunk-based** processing (e.g., splitting large texts).
- **Iterative** processing over lists of input items (e.g., user queries, files, URLs).
## 1. BatchNode
A **BatchNode** extends `Node` but changes `prep()` and `exec()`:
- **`prep(shared)`**: returns an **iterable** (e.g., list, generator).
- **`exec(item)`**: called **once** per item in that iterable.
- **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**.
### Example: Summarize a Large File
\`\`\`python
class MapSummaries(BatchNode):
def prep(self, shared):
# Suppose we have a big file; chunk it
content = shared["data"]
chunk_size = 10000
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
return chunks
def exec(self, chunk):
prompt = f"Summarize this chunk in 10 words: {chunk}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res_list):
combined = "\n".join(exec_res_list)
shared["summary"] = combined
return "default"
map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
\`\`\`
---
## 2. BatchFlow
A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
### Example: Summarize Many Files
\`\`\`python
class SummarizeAllFiles(BatchFlow):
def prep(self, shared):
# Return a list of param dicts (one per file)
filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...]
return [{"filename": fn} for fn in filenames]
# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)
# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
\`\`\`
### Under the Hood
1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`.
2. The **BatchFlow** loops through each dict. For each one:
- It merges the dict with the BatchFlow’s own `params`.
- It calls `flow.run(shared)` using the merged result.
3. This means the sub-Flow is run **repeatedly**, once for every param dict.
---
## 3. Nested or Multi-Level Batches
You can nest a **BatchFlow** in another **BatchFlow**. For instance:
- **Outer** batch: returns a list of diretory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- **Inner** batch: returning a list of per-file param dicts.
At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
\`\`\`python
class FileBatchFlow(BatchFlow):
def prep(self, shared):
directory = self.params["directory"]
# e.g., files = ["file1.txt", "file2.txt", ...]
files = [f for f in os.listdir(directory) if f.endswith(".txt")]
return [{"filename": f} for f in files]
class DirectoryBatchFlow(BatchFlow):
def prep(self, shared):
directories = [ "/path/to/dirA", "/path/to/dirB"]
return [{"directory": d} for d in directories]
# MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
\`\`\`
================================================
File: docs/core_abstraction/communication.md
================================================
---
layout: default
title: "Communication"
parent: "Core Abstraction"
nav_order: 3
---
# Communication
Nodes and Flows **communicate** in 2 ways:
1. **Shared Store (for almost all the cases)**
- A global data structure (often an in-mem dict) that all nodes can read ( `prep()`) and write (`post()`).
- Great for data results, large content, or anything multiple nodes need.
- You shall design the data structure and populate it ahead.
- > **Separation of Concerns:** Use `Shared Store` for almost all cases to separate *Data Schema* from *Compute Logic*! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](./batch.md).
{: .best-practice }
2. **Params (only for [Batch](./batch.md))**
- Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**.
- Good for identifiers like filenames or numeric IDs, in Batch mode.
If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller).
---
## 1. Shared Store
### Overview
A shared store is typically an in-mem dictionary, like:
\`\`\`python
shared = {"data": {}, "summary": {}, "config": {...}, ...}
\`\`\`
It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements.
### Example
\`\`\`python
class LoadData(Node):
def post(self, shared, prep_res, exec_res):
# We write data to shared store
shared["data"] = "Some text content"
return None
class Summarize(Node):
def prep(self, shared):
# We read data from shared store
return shared["data"]
def exec(self, prep_res):
# Call LLM to summarize
prompt = f"Summarize: {prep_res}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res):
# We write summary to shared store
shared["summary"] = exec_res
return "default"
load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)
shared = {}
flow.run(shared)
\`\`\`
Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`.
---
## 2. Params
**Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are:
- **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`).
- **Set** via `set_params()`.
- **Cleared** and updated each time a parent Flow calls it.
> Only set the uppermost Flow params because others will be overwritten by the parent Flow.
>
> If you need to set child node params, see [Batch](./batch.md).
{: .warning }
Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
### Example
\`\`\`python
# 1) Create a Node that uses params
class SummarizeFile(Node):
def prep(self, shared):
# Access the node's param
filename = self.params["filename"]
return shared["data"].get(filename, "")
def exec(self, prep_res):
prompt = f"Summarize: {prep_res}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
shared["summary"][filename] = exec_res
return "default"
# 2) Set params
node = SummarizeFile()
# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)
# 4) Create Flow
flow = Flow(start=node)
# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared) # The node summarizes doc2, not doc1
\`\`\`
================================================
File: docs/core_abstraction/flow.md
================================================
---
layout: default
title: "Flow"
parent: "Core Abstraction"
nav_order: 2
---
# Flow
A **Flow** orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`.
## 1. Action-based Transitions
Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`.
You define transitions with the syntax:
1. **Basic default transition**: `node_a >> node_b`
This means if `node_a.post()` returns `"default"`, go to `node_b`.
(Equivalent to `node_a - "default" >> node_b`)
2. **Named action transition**: `node_a - "action_name" >> node_b`
This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
It's possible to create loops, branching, or multi-step flows.
## 2. Creating a Flow
A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node.
### Example: Simple Sequence
Here's a minimal flow of two nodes in a chain:
\`\`\`python
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
\`\`\`
- When you run the flow, it executes `node_a`.
- Suppose `node_a.post()` returns `"default"`.
- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`.
- `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there.
### Example: Branching & Looping
Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions:
- `"approved"`: expense is approved, move to payment processing
- `"needs_revision"`: expense needs changes, send back for revision
- `"rejected"`: expense is denied, finish the process
We can wire them like this:
\`\`\`python
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process
flow = Flow(start=review)
\`\`\`
Let's see how it flows:
1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node
2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review`
3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops
\`\`\`mermaid
flowchart TD
review[Review Expense] -->|approved| payment[Process Payment]
review -->|needs_revision| revise[Revise Report]
review -->|rejected| finish[Finish Process]
revise --> review
payment --> finish
\`\`\`
### Running Individual Nodes vs. Running a Flow
- `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action.
- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue.
> `node.run(shared)` **does not** proceed to the successor.
> This is mainly for debugging or testing a single node.
>
> Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.
{: .warning }
## 3. Nested Flows
A **Flow** can act like a Node, which enables powerful composition patterns. This means you can:
1. Use a Flow as a Node within another Flow's transitions.
2. Combine multiple smaller Flows into a larger Flow for reuse.
3. Node `params` will be a merging of **all** parents' `params`.
### Flow's Node Methods
A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However:
- It **won't** run `exec()`, as its main logic is to orchestrate its nodes.
- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store.
### Basic Flow Nesting
Here's how to connect a flow to another node:
\`\`\`python
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Connect it to another node
subflow >> node_c
# Create the parent flow
parent_flow = Flow(start=subflow)
\`\`\`
When `parent_flow.run()` executes:
1. It starts `subflow`
2. `subflow` runs through its nodes (`node_a->node_b`)
3. After `subflow` completes, execution continues to `node_c`
### Example: Order Processing Pipeline
Here's a practical example that breaks down order processing into nested flows:
\`\`\`python
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)
\`\`\`
This creates a clean separation of concerns while maintaining a clear execution path:
\`\`\`mermaid
flowchart LR
subgraph order_pipeline[Order Pipeline]
subgraph paymentFlow["Payment Flow"]
A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
end
subgraph inventoryFlow["Inventory Flow"]
D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
end
subgraph shippingFlow["Shipping Flow"]
G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
end
paymentFlow --> inventoryFlow
inventoryFlow --> shippingFlow
end
\`\`\`
================================================
File: docs/core_abstraction/node.md
================================================
---
layout: default
title: "Node"
parent: "Core Abstraction"
nav_order: 1
---
# Node
A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`:
1. `prep(shared)`
- **Read and preprocess data** from `shared` store.
- Examples: *query DB, read files, or serialize data into a string*.
- Return `prep_res`, which is used by `exec()` and `post()`.
2. `exec(prep_res)`
- **Execute compute logic**, with optional retries and error handling (below).
- Examples: *(mostly) LLM calls, remote APIs, tool use*.
- ⚠️ This shall be only for compute and **NOT** access `shared`.
- ⚠️ If retries enabled, ensure idempotent implementation.
- Return `exec_res`, which is passed to `post()`.
3. `post(shared, prep_res, exec_res)`
- **Postprocess and write data** back to `shared`.
- Examples: *update DB, change states, log results*.
- **Decide the next action** by returning a *string* (`action = "default"` if *None*).
> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately.
>
> All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data.
{: .note }
### Fault Tolerance & Retries
You can **retry** `exec()` if it raises an exception via two parameters when define the Node:
- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry).
- `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting).
`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.
\`\`\`python
my_node = SummarizeFile(max_retries=3, wait=10)
\`\`\`
When an exception occurs in `exec()`, the Node automatically retries until:
- It either succeeds, or
- The Node has retried `max_retries - 1` times already and fails on the last attempt.
You can get the current retry times (0-based) from `self.cur_retry`.
\`\`\`python
class RetryNode(Node):
def exec(self, prep_res):
print(f"Retry {self.cur_retry} times")
raise Exception("Failed")
\`\`\`
### Graceful Fallback
To **gracefully handle** the exception (after all retries) rather than raising it, override:
\`\`\`python
def exec_fallback(self, prep_res, exc):
raise exc
\`\`\`
By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.
### Example: Summarize file
\`\`\`python
class SummarizeFile(Node):
def prep(self, shared):
return shared["data"]
def exec(self, prep_res):
if not prep_res:
return "Empty file content"
prompt = f"Summarize this text in 10 words: {prep_res}"
summary = call_llm(prompt) # might fail
return summary
def exec_fallback(self, prep_res, exc):
# Provide a simple fallback instead of crashing
return "There was an error processing your request."
def post(self, shared, prep_res, exec_res):
shared["summary"] = exec_res
# Return "default" by not returning
summarize_node = SummarizeFile(max_retries=3)
# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)
print("Action returned:", action_result) # "default"
print("Summary stored:", shared["summary"])
\`\`\`
================================================
File: docs/core_abstraction/parallel.md
================================================
---
layout: default
title: "(Advanced) Parallel"
parent: "Core Abstraction"
nav_order: 6
---
# (Advanced) Parallel
**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute.
> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O.
{: .warning }
> - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize.
>
> - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals).
>
> - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits.
{: .best-practice }
## AsyncParallelBatchNode
Like **AsyncBatchNode**, but run `exec_async()` in **parallel**:
\`\`\`python
class ParallelSummaries(AsyncParallelBatchNode):
async def prep_async(self, shared):
# e.g., multiple texts
return shared["texts"]
async def exec_async(self, text):
prompt = f"Summarize: {text}"
return await call_llm_async(prompt)
async def post_async(self, shared, prep_res, exec_res_list):
shared["summary"] = "\n\n".join(exec_res_list)
return "default"
node = ParallelSummaries()
flow = AsyncFlow(start=node)
\`\`\`
## AsyncParallelBatchFlow
Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters:
\`\`\`python
class SummarizeMultipleFiles(AsyncParallelBatchFlow):
async def prep_async(self, shared):
return [{"filename": f} for f in shared["files"]]
sub_flow = AsyncFlow(start=LoadAndSummarizeFile())
parallel_flow = SummarizeMultipleFiles(start=sub_flow)
await parallel_flow.run_async(shared)
\`\`\`
================================================
File: docs/design_pattern/agent.md
================================================
---
layout: default
title: "Agent"
parent: "Design Pattern"
nav_order: 1
---
# Agent
Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
## Implement Agent with Graph
1. **Context and Action:** Implement nodes that supply context and perform actions.
2. **Branching:** Use branching to connect each action node to an agent node. Use action to allow the agent to direct the [flow](../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step.
3. **Agent Node:** Provide a prompt to decide action—for example:
\`\`\`python
f"""
### CONTEXT
Task: {task_description}
Previous Actions: {previous_actions}
Current State: {current_state}
### ACTION SPACE
[1] search
Description: Use web search to get results
Parameters:
- query (str): What to search for
[2] answer
Description: Conclude based on the results
Parameters:
- result (str): Final answer to provide
### NEXT ACTION
Decide the next action based on the current context and available action space.
Return your response in the following format:
\`\`\`yaml
thinking: |
action:
parameters:
:
\`\`\`"""
\`\`\`
The core of building **high-performance** and **reliable** agents boils down to:
1. **Context Management:** Provide *relevant, minimal context.* For example, rather than including an entire chat history, retrieve the most relevant via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content.
2. **Action Space:** Provide *a well-structured and unambiguous* set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database.
## Example Good Action Design
- **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once.
- **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts).
- **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files.
- **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends.
## Example: Search Agent
This agent:
1. Decides whether to search or answer
2. If searches, loops back to decide if more search needed
3. Answers when enough context gathered
\`\`\`python
class DecideAction(Node):
def prep(self, shared):
context = shared.get("context", "No previous search")
query = shared["query"]
return query, context
def exec(self, inputs):
query, context = inputs
prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
\`\`\`yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
\`\`\`"""
resp = call_llm(prompt)
yaml_str = resp.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip()
result = yaml.safe_load(yaml_str)
assert isinstance(result, dict)
assert "action" in result
assert "reason" in result
assert result["action"] in ["search", "answer"]
if result["action"] == "search":
assert "search_term" in result
return result
def post(self, shared, prep_res, exec_res):
if exec_res["action"] == "search":
shared["search_term"] = exec_res["search_term"]
return exec_res["action"]
class SearchWeb(Node):
def prep(self, shared):
return shared["search_term"]
def exec(self, search_term):
return search_web(search_term)
def post(self, shared, prep_res, exec_res):
prev_searches = shared.get("context", [])
shared["context"] = prev_searches + [
{"term": shared["search_term"], "result": exec_res}
]
return "decide"
class DirectAnswer(Node):
def prep(self, shared):
return shared["query"], shared.get("context", "")
def exec(self, inputs):
query, context = inputs
return call_llm(f"Context: {context}\nAnswer: {query}")
def post(self, shared, prep_res, exec_res):
print(f"Answer: {exec_res}")
shared["answer"] = exec_res
# Connect nodes
decide = DecideAction()
search = SearchWeb()
answer = DirectAnswer()
decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide # Loop back
flow = Flow(start=decide)
flow.run({"query": "Who won the Nobel Prize in Physics 2024?"})
\`\`\`
================================================
File: docs/design_pattern/mapreduce.md
================================================
---
layout: default
title: "Map Reduce"
parent: "Design Pattern"
nav_order: 4
---
# Map Reduce
MapReduce is a design pattern suitable when you have either:
- Large input data (e.g., multiple files to process), or
- Large output data (e.g., multiple forms to fill)
and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase.
### Example: Document Summarization
\`\`\`python
class SummarizeAllFiles(BatchNode):
def prep(self, shared):
files_dict = shared["files"] # e.g. 10 files
return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...]
def exec(self, one_file):
filename, file_content = one_file
summary_text = call_llm(f"Summarize the following file:\n{file_content}")
return (filename, summary_text)
def post(self, shared, prep_res, exec_res_list):
shared["file_summaries"] = dict(exec_res_list)
class CombineSummaries(Node):
def prep(self, shared):
return shared["file_summaries"]
def exec(self, file_summaries):
# format as: "File1: summary\nFile2: summary...\n"
text_list = []
for fname, summ in file_summaries.items():
text_list.append(f"{fname} summary:\n{summ}\n")
big_text = "\n---\n".join(text_list)
return call_llm(f"Combine these file summaries into one final summary:\n{big_text}")
def post(self, shared, prep_res, final_summary):
shared["all_files_summary"] = final_summary
batch_node = SummarizeAllFiles()
combine_node = CombineSummaries()
batch_node >> combine_node
flow = Flow(start=batch_node)
shared = {
"files": {
"file1.txt": "Alice was beginning to get very tired of sitting by her sister...",
"file2.txt": "Some other interesting text ...",
# ...
}
}
flow.run(shared)
print("Individual Summaries:", shared["file_summaries"])
print("\nFinal Summary:\n", shared["all_files_summary"])
\`\`\`
================================================
File: docs/design_pattern/rag.md
================================================
---
layout: default
title: "RAG"
parent: "Design Pattern"
nav_order: 3
---
# RAG (Retrieval Augmented Generation)
For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
1. **Offline stage**: Preprocess and index documents ("building the index").
2. **Online stage**: Given a question, generate answers by retrieving the most relevant context.
---
## Stage 1: Offline Indexing
We create three Nodes:
1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text.
2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk.
3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md).
\`\`\`python
class ChunkDocs(BatchNode):
def prep(self, shared):
# A list of file paths in shared["files"]. We process each file.
return shared["files"]
def exec(self, filepath):
# read file content. In real usage, do error handling.
with open(filepath, "r", encoding="utf-8") as f:
text = f.read()
# chunk by 100 chars each
chunks = []
size = 100
for i in range(0, len(text), size):
chunks.append(text[i : i + size])
return chunks
def post(self, shared, prep_res, exec_res_list):
# exec_res_list is a list of chunk-lists, one per file.
# flatten them all into a single list of chunks.
all_chunks = []
for chunk_list in exec_res_list:
all_chunks.extend(chunk_list)
shared["all_chunks"] = all_chunks
class EmbedDocs(BatchNode):
def prep(self, shared):
return shared["all_chunks"]
def exec(self, chunk):
return get_embedding(chunk)
def post(self, shared, prep_res, exec_res_list):
# Store the list of embeddings.
shared["all_embeds"] = exec_res_list
print(f"Total embeddings: {len(exec_res_list)}")
class StoreIndex(Node):
def prep(self, shared):
# We'll read all embeds from shared.
return shared["all_embeds"]
def exec(self, all_embeds):
# Create a vector index (faiss or other DB in real usage).
index = create_index(all_embeds)
return index
def post(self, shared, prep_res, index):
shared["index"] = index
# Wire them in sequence
chunk_node = ChunkDocs()
embed_node = EmbedDocs()
store_node = StoreIndex()
chunk_node >> embed_node >> store_node
OfflineFlow = Flow(start=chunk_node)
\`\`\`
Usage example:
\`\`\`python
shared = {
"files": ["doc1.txt", "doc2.txt"], # any text files
}
OfflineFlow.run(shared)
\`\`\`
---
## Stage 2: Online Query & Answer
We have 3 nodes:
1. `EmbedQuery` – embeds the user’s question.
2. `RetrieveDocs` – retrieves top chunk from the index.
3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer.
\`\`\`python
class EmbedQuery(Node):
def prep(self, shared):
return shared["question"]
def exec(self, question):
return get_embedding(question)
def post(self, shared, prep_res, q_emb):
shared["q_emb"] = q_emb
class RetrieveDocs(Node):
def prep(self, shared):
# We'll need the query embedding, plus the offline index/chunks
return shared["q_emb"], shared["index"], shared["all_chunks"]
def exec(self, inputs):
q_emb, index, chunks = inputs
I, D = search_index(index, q_emb, top_k=1)
best_id = I[0][0]
relevant_chunk = chunks[best_id]
return relevant_chunk
def post(self, shared, prep_res, relevant_chunk):
shared["retrieved_chunk"] = relevant_chunk
print("Retrieved chunk:", relevant_chunk[:60], "...")
class GenerateAnswer(Node):
def prep(self, shared):
return shared["question"], shared["retrieved_chunk"]
def exec(self, inputs):
question, chunk = inputs
prompt = f"Question: {question}\nContext: {chunk}\nAnswer:"
return call_llm(prompt)
def post(self, shared, prep_res, answer):
shared["answer"] = answer
print("Answer:", answer)
embed_qnode = EmbedQuery()
retrieve_node = RetrieveDocs()
generate_node = GenerateAnswer()
embed_qnode >> retrieve_node >> generate_node
OnlineFlow = Flow(start=embed_qnode)
\`\`\`
Usage example:
\`\`\`python
# Suppose we already ran OfflineFlow and have:
# shared["all_chunks"], shared["index"], etc.
shared["question"] = "Why do people like cats?"
OnlineFlow.run(shared)
# final answer in shared["answer"]
\`\`\`
================================================
File: docs/design_pattern/structure.md
================================================
---
layout: default
title: "Structured Output"
parent: "Design Pattern"
nav_order: 5
---
# Structured Output
In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys.
There are several approaches to achieve a structured output:
- **Prompting** the LLM to strictly return a defined structure.
- Using LLMs that natively support **schema enforcement**.
- **Post-processing** the LLM's response to extract structured content.
In practice, **Prompting** is simple and reliable for modern LLMs.
### Example Use Cases
- Extracting Key Information
\`\`\`yaml
product:
name: Widget Pro
price: 199.99
description: |
A high-quality widget designed for professionals.
Recommended for advanced users.
\`\`\`
- Summarizing Documents into Bullet Points
\`\`\`yaml
summary:
- This product is easy to use.
- It is cost-effective.
- Suitable for all skill levels.
\`\`\`
- Generating Configuration Files
\`\`\`yaml
server:
host: 127.0.0.1
port: 8080
ssl: true
\`\`\`
## Prompt Engineering
When prompting the LLM to produce **structured** output:
1. **Wrap** the structure in code fences (e.g., `yaml`).
2. **Validate** that all required fields exist (and let `Node` handles retry).
### Example Text Summarization
\`\`\`python
class SummarizeNode(Node):
def exec(self, prep_res):
# Suppose `prep_res` is the text to summarize.
prompt = f"""
Please summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Now, output:
\`\`\`yaml
summary:
- bullet 1
- bullet 2
- bullet 3
\`\`\`"""
response = call_llm(prompt)
yaml_str = response.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip()
import yaml
structured_result = yaml.safe_load(yaml_str)
assert "summary" in structured_result
assert isinstance(structured_result["summary"], list)
return structured_result
\`\`\`
> Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic)
{: .note }
### Why YAML instead of JSON?
Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.
**In JSON**
\`\`\`json
{
"dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\""
}
\`\`\`
- Every double quote inside the string must be escaped with `\"`.
- Each newline in the dialogue must be represented as `\n`.
**In YAML**
\`\`\`yaml
dialogue: |
Alice said: "Hello Bob.
How are you?
I am good."
\`\`\`
- No need to escape interior quotes—just place the entire text under a block literal (`|`).
- Newlines are naturally preserved without needing `\n`.
================================================
File: docs/design_pattern/workflow.md
================================================
---
layout: default
title: "Workflow"
parent: "Design Pattern"
nav_order: 2
---
# Workflow
Many real-world tasks are too complex for one LLM call. The solution is to **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*.
> - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*.
>
> You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md).
{: .best-practice }
### Example: Article Writing
\`\`\`python
class GenerateOutline(Node):
def prep(self, shared): return shared["topic"]
def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}")
def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res
class WriteSection(Node):
def prep(self, shared): return shared["outline"]
def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}")
def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res
class ReviewAndRefine(Node):
def prep(self, shared): return shared["draft"]
def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}")
def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res
# Connect nodes
outline = GenerateOutline()
write = WriteSection()
review = ReviewAndRefine()
outline >> write >> review
# Create and run flow
writing_flow = Flow(start=outline)
shared = {"topic": "AI Safety"}
writing_flow.run(shared)
\`\`\`
For *dynamic cases*, consider using [Agents](./agent.md).
================================================
File: docs/utility_function/llm.md
================================================
---
layout: default
title: "LLM Wrapper"
parent: "Utility Function"
nav_order: 1
---
# LLM Wrappers
Check out libraries like [litellm](https://github.com/BerriAI/litellm).
Here, we provide some minimal example implementations:
1. OpenAI
\`\`\`python
def call_llm(prompt):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
# Example usage
call_llm("How are you?")
\`\`\`
> Store the API key in an environment variable like OPENAI_API_KEY for security.
{: .best-practice }
2. Claude (Anthropic)
\`\`\`python
def call_llm(prompt):
from anthropic import Anthropic
client = Anthropic(api_key="YOUR_API_KEY_HERE")
response = client.messages.create(
model="claude-2",
messages=[{"role": "user", "content": prompt}],
max_tokens=100
)
return response.content
\`\`\`
3. Google (Generative AI Studio / PaLM API)
\`\`\`python
def call_llm(prompt):
import google.generativeai as genai
genai.configure(api_key="YOUR_API_KEY_HERE")
response = genai.generate_text(
model="models/text-bison-001",
prompt=prompt
)
return response.result
\`\`\`
4. Azure (Azure OpenAI)
\`\`\`python
def call_llm(prompt):
from openai import AzureOpenAI
client = AzureOpenAI(
azure_endpoint="https://.openai.azure.com/",
api_key="YOUR_API_KEY_HERE",
api_version="2023-05-15"
)
r = client.chat.completions.create(
model="",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
\`\`\`
5. Ollama (Local LLM)
\`\`\`python
def call_llm(prompt):
from ollama import chat
response = chat(
model="llama2",
messages=[{"role": "user", "content": prompt}]
)
return response.message.content
\`\`\`
## Improvements
Feel free to enhance your `call_llm` function as needed. Here are examples:
- Handle chat history:
\`\`\`python
def call_llm(messages):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=messages
)
return r.choices[0].message.content
\`\`\`
- Add in-memory caching
\`\`\`python
from functools import lru_cache
@lru_cache(maxsize=1000)
def call_llm(prompt):
# Your implementation here
pass
\`\`\`
> ⚠️ Caching conflicts with Node retries, as retries yield the same result.
>
> To address this, you could use cached results only if not retried.
{: .warning }
\`\`\`python
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_call(prompt):
pass
def call_llm(prompt, use_cache):
if use_cache:
return cached_call(prompt)
# Call the underlying function directly
return cached_call.__wrapped__(prompt)
class SummarizeNode(Node):
def exec(self, text):
return call_llm(f"Summarize: {text}", self.cur_retry==0)
\`\`\`
- Enable logging:
\`\`\`python
def call_llm(prompt):
import logging
logging.info(f"Prompt: {prompt}")
response = ... # Your implementation here
logging.info(f"Response: {response}")
return response
\`\`\`
```
## /.cursorrules
```cursorrules path="/.cursorrules"
---
layout: default
title: "Agentic Coding"
---
# Agentic Coding: Humans Design, Agents code!
> If you are an AI agents involved in building LLM Systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification.
{: .warning }
## Agentic Coding Steps
Agentic Coding should be a collaboration between Human System Design and Agent Implementation:
| Steps | Human | AI | Comment |
|:-----------------------|:----------:|:---------:|:------------------------------------------------------------------------|
| 1. Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context. |
| 2. Flow | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details. |
| 3. Utilities | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. |
| 4. Node | ★☆☆ Low | ★★★ High | The AI helps design the node types and data handling based on the flow. |
| 5. Implementation | ★☆☆ Low | ★★★ High | The AI implements the flow based on the design. |
| 6. Optimization | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize. |
| 7. Reliability | ★☆☆ Low | ★★★ High | The AI writes test cases and addresses corner cases. |
1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit.
- Understand AI systems' strengths and limitations:
- **Good for**: Routine tasks requiring common sense (filling forms, replying to emails)
- **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL)
- **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning)
- **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features.
- **Balance complexity vs. impact**: Aim to deliver the highest value features with minimal complexity early.
2. **Flow Design**: Outline at a high level, describe how your AI system orchestrates nodes.
- Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)).
- For each node in the flow, start with a high-level one-line description of what it does.
- If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine).
- If using **Agent**, specify what are the inputs (context) and what are the possible actions.
- If using **RAG**, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows.
- Outline the flow and draw it in a mermaid diagram. For example:
\`\`\`mermaid
flowchart LR
start[Start] --> batch[Batch]
batch --> check[Check]
check -->|OK| process
check -->|Error| fix[Fix]
fix --> check
subgraph process[Process]
step1[Step 1] --> step2[Step 2]
end
process --> endNode[End]
\`\`\`
- > **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition.
{: .best-practice }
3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions.
- Think of your AI system as the brain. It needs a body—these *external utility functions*—to interact with the real world:

- Reading inputs (e.g., retrieving Slack messages, reading emails)
- Writing outputs (e.g., generating reports, sending emails)
- Using external tools (e.g., calling LLMs, searching the web)
- **NOTE**: *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal in the AI system.
- For each utility function, implement it and write a simple test.
- Document their input/output, as well as why they are necessary. For example:
- `name`: `get_embedding` (`utils/get_embedding.py`)
- `input`: `str`
- `output`: a vector of 3072 floats
- `necessity`: Used by the second node to embed text
- Example utility implementation:
\`\`\`python
# utils/call_llm.py
from openai import OpenAI
def call_llm(prompt):
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
if __name__ == "__main__":
prompt = "What is the meaning of life?"
print(call_llm(prompt))
\`\`\`
- > **Sometimes, design Utilies before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them.
{: .best-practice }
4. **Node Design**: Plan how each node will read and write data, and use utility functions.
- One core design principle for PocketFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design:
- For simple systems, use an in-memory dictionary.
- For more complex systems or when persistence is required, use a database.
- **Don't Repeat Yourself**: Use in-memory references or foreign keys.
- Example shared store design:
\`\`\`python
shared = {
"user": {
"id": "user123",
"context": { # Another nested dict
"weather": {"temp": 72, "condition": "sunny"},
"location": "San Francisco"
}
},
"results": {} # Empty dict to store outputs
}
\`\`\`
- For each [Node](./core_abstraction/node.md), describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without codes. For example:
- `type`: Regular (or Batch, or Async)
- `prep`: Read "text" from the shared store
- `exec`: Call the embedding utility function
- `post`: Write "embedding" to the shared store
5. **Implementation**: Implement the initial nodes and flows based on the design.
- 🎉 If you've reached this step, humans have finished the design. Now *Agentic Coding* begins!
- **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking.
- **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system.
- Add logging throughout the code to facilitate debugging.
7. **Optimization**:
- **Use Intuition**: For a quick initial evaluation, human intuition is often a good start.
- **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts.
- If your flow design is already solid, move on to micro-optimizations:
- **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity.
- **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone.
- > **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times.
>
> 
{: .best-practice }
8. **Reliability**
- **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times.
- **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging.
- **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain.
## Example LLM Project File Structure
\`\`\`
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── requirements.txt
└── docs/
└── design.md
\`\`\`
- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*.
- **`utils/`**: Contains all utility functions.
- It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
- Each file should also include a `main()` function to try that API call
- **`nodes.py`**: Contains all the node definitions.
\`\`\`python
# nodes.py
from pocketflow import Node
from utils.call_llm import call_llm
class GetQuestionNode(Node):
def exec(self, _):
# Get question directly from user input
user_question = input("Enter your question: ")
return user_question
def post(self, shared, prep_res, exec_res):
# Store the user's question
shared["question"] = exec_res
return "default" # Go to the next node
class AnswerNode(Node):
def prep(self, shared):
# Read question from shared
return shared["question"]
def exec(self, question):
# Call LLM to get the answer
return call_llm(question)
def post(self, shared, prep_res, exec_res):
# Store the answer in shared
shared["answer"] = exec_res
\`\`\`
- **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them.
\`\`\`python
# flow.py
from pocketflow import Flow
from nodes import GetQuestionNode, AnswerNode
def create_qa_flow():
"""Create and return a question-answering flow."""
# Create nodes
get_question_node = GetQuestionNode()
answer_node = AnswerNode()
# Connect nodes in sequence
get_question_node >> answer_node
# Create flow starting with input node
return Flow(start=get_question_node)
\`\`\`
- **`main.py`**: Serves as the project's entry point.
\`\`\`python
# main.py
from flow import create_qa_flow
# Example main function
# Please replace this with your own main function
def main():
shared = {
"question": None, # Will be populated by GetQuestionNode from user input
"answer": None # Will be populated by AnswerNode
}
# Create the flow and run it
qa_flow = create_qa_flow()
qa_flow.run(shared)
print(f"Question: {shared['question']}")
print(f"Answer: {shared['answer']}")
if __name__ == "__main__":
main()
\`\`\`
================================================
File: docs/index.md
================================================
---
layout: default
title: "Home"
nav_order: 1
---
# Pocket Flow
A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*.
- **Lightweight**: Just the core graph abstraction in 100 lines. ZERO dependencies, and vendor lock-in.
- **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more.
- **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications.
## Core Abstraction
We model the LLM workflow as a **Graph + Shared Store**:
- [Node](./core_abstraction/node.md) handles simple (LLM) tasks.
- [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges).
- [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows.
- [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks.
- [Async](./core_abstraction/async.md) nodes/flows allow waiting for asynchronous tasks.
- [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks.
## Design Pattern
From there, it’s easy to implement popular design patterns:
- [Agent](./design_pattern/agent.md) autonomously makes decisions.
- [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines.
- [RAG](./design_pattern/rag.md) integrates data retrieval with generation.
- [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps.
- [Structured Output](./design_pattern/structure.md) formats outputs consistently.
- [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
## Utility Function
We **do not** provide built-in utilities. Instead, we offer *examples*—please *implement your own*:
- [LLM Wrapper](./utility_function/llm.md)
- [Viz and Debug](./utility_function/viz.md)
- [Web Search](./utility_function/websearch.md)
- [Chunking](./utility_function/chunking.md)
- [Embedding](./utility_function/embedding.md)
- [Vector Databases](./utility_function/vector.md)
- [Text-to-Speech](./utility_function/text_to_speech.md)
**Why not built-in?**: I believe it's a *bad practice* for vendor-specific APIs in a general framework:
- *API Volatility*: Frequent changes lead to heavy maintenance for hardcoded APIs.
- *Flexibility*: You may want to switch vendors, use fine-tuned models, or run them locally.
- *Optimizations*: Prompt caching, batching, and streaming are easier without vendor lock-in.
## Ready to build your Apps?
Check out [Agentic Coding Guidance](./guide.md), the fastest way to develop LLM projects with Pocket Flow!
================================================
File: docs/core_abstraction/async.md
================================================
---
layout: default
title: "(Advanced) Async"
parent: "Core Abstraction"
nav_order: 5
---
# (Advanced) Async
**Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for:
1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way.
2. **exec_async()**: Typically used for async LLM calls.
3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`.
**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes.
### Example
\`\`\`python
class SummarizeThenVerify(AsyncNode):
async def prep_async(self, shared):
# Example: read a file asynchronously
doc_text = await read_file_async(shared["doc_path"])
return doc_text
async def exec_async(self, prep_res):
# Example: async LLM call
summary = await call_llm_async(f"Summarize: {prep_res}")
return summary
async def post_async(self, shared, prep_res, exec_res):
# Example: wait for user feedback
decision = await gather_user_feedback(exec_res)
if decision == "approve":
shared["summary"] = exec_res
return "approve"
return "deny"
summarize_node = SummarizeThenVerify()
final_node = Finalize()
# Define transitions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node # retry
flow = AsyncFlow(start=summarize_node)
async def main():
shared = {"doc_path": "document.txt"}
await flow.run_async(shared)
print("Final Summary:", shared.get("summary"))
asyncio.run(main())
\`\`\`
================================================
File: docs/core_abstraction/batch.md
================================================
---
layout: default
title: "Batch"
parent: "Core Abstraction"
nav_order: 4
---
# Batch
**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases:
- **Chunk-based** processing (e.g., splitting large texts).
- **Iterative** processing over lists of input items (e.g., user queries, files, URLs).
## 1. BatchNode
A **BatchNode** extends `Node` but changes `prep()` and `exec()`:
- **`prep(shared)`**: returns an **iterable** (e.g., list, generator).
- **`exec(item)`**: called **once** per item in that iterable.
- **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**.
### Example: Summarize a Large File
\`\`\`python
class MapSummaries(BatchNode):
def prep(self, shared):
# Suppose we have a big file; chunk it
content = shared["data"]
chunk_size = 10000
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
return chunks
def exec(self, chunk):
prompt = f"Summarize this chunk in 10 words: {chunk}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res_list):
combined = "\n".join(exec_res_list)
shared["summary"] = combined
return "default"
map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
\`\`\`
---
## 2. BatchFlow
A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
### Example: Summarize Many Files
\`\`\`python
class SummarizeAllFiles(BatchFlow):
def prep(self, shared):
# Return a list of param dicts (one per file)
filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...]
return [{"filename": fn} for fn in filenames]
# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)
# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
\`\`\`
### Under the Hood
1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`.
2. The **BatchFlow** loops through each dict. For each one:
- It merges the dict with the BatchFlow’s own `params`.
- It calls `flow.run(shared)` using the merged result.
3. This means the sub-Flow is run **repeatedly**, once for every param dict.
---
## 3. Nested or Multi-Level Batches
You can nest a **BatchFlow** in another **BatchFlow**. For instance:
- **Outer** batch: returns a list of diretory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- **Inner** batch: returning a list of per-file param dicts.
At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
\`\`\`python
class FileBatchFlow(BatchFlow):
def prep(self, shared):
directory = self.params["directory"]
# e.g., files = ["file1.txt", "file2.txt", ...]
files = [f for f in os.listdir(directory) if f.endswith(".txt")]
return [{"filename": f} for f in files]
class DirectoryBatchFlow(BatchFlow):
def prep(self, shared):
directories = [ "/path/to/dirA", "/path/to/dirB"]
return [{"directory": d} for d in directories]
# MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
\`\`\`
================================================
File: docs/core_abstraction/communication.md
================================================
---
layout: default
title: "Communication"
parent: "Core Abstraction"
nav_order: 3
---
# Communication
Nodes and Flows **communicate** in 2 ways:
1. **Shared Store (for almost all the cases)**
- A global data structure (often an in-mem dict) that all nodes can read ( `prep()`) and write (`post()`).
- Great for data results, large content, or anything multiple nodes need.
- You shall design the data structure and populate it ahead.
- > **Separation of Concerns:** Use `Shared Store` for almost all cases to separate *Data Schema* from *Compute Logic*! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](./batch.md).
{: .best-practice }
2. **Params (only for [Batch](./batch.md))**
- Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**.
- Good for identifiers like filenames or numeric IDs, in Batch mode.
If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller).
---
## 1. Shared Store
### Overview
A shared store is typically an in-mem dictionary, like:
\`\`\`python
shared = {"data": {}, "summary": {}, "config": {...}, ...}
\`\`\`
It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements.
### Example
\`\`\`python
class LoadData(Node):
def post(self, shared, prep_res, exec_res):
# We write data to shared store
shared["data"] = "Some text content"
return None
class Summarize(Node):
def prep(self, shared):
# We read data from shared store
return shared["data"]
def exec(self, prep_res):
# Call LLM to summarize
prompt = f"Summarize: {prep_res}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res):
# We write summary to shared store
shared["summary"] = exec_res
return "default"
load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)
shared = {}
flow.run(shared)
\`\`\`
Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`.
---
## 2. Params
**Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are:
- **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`).
- **Set** via `set_params()`.
- **Cleared** and updated each time a parent Flow calls it.
> Only set the uppermost Flow params because others will be overwritten by the parent Flow.
>
> If you need to set child node params, see [Batch](./batch.md).
{: .warning }
Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
### Example
\`\`\`python
# 1) Create a Node that uses params
class SummarizeFile(Node):
def prep(self, shared):
# Access the node's param
filename = self.params["filename"]
return shared["data"].get(filename, "")
def exec(self, prep_res):
prompt = f"Summarize: {prep_res}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
shared["summary"][filename] = exec_res
return "default"
# 2) Set params
node = SummarizeFile()
# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)
# 4) Create Flow
flow = Flow(start=node)
# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared) # The node summarizes doc2, not doc1
\`\`\`
================================================
File: docs/core_abstraction/flow.md
================================================
---
layout: default
title: "Flow"
parent: "Core Abstraction"
nav_order: 2
---
# Flow
A **Flow** orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`.
## 1. Action-based Transitions
Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`.
You define transitions with the syntax:
1. **Basic default transition**: `node_a >> node_b`
This means if `node_a.post()` returns `"default"`, go to `node_b`.
(Equivalent to `node_a - "default" >> node_b`)
2. **Named action transition**: `node_a - "action_name" >> node_b`
This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
It's possible to create loops, branching, or multi-step flows.
## 2. Creating a Flow
A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node.
### Example: Simple Sequence
Here's a minimal flow of two nodes in a chain:
\`\`\`python
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
\`\`\`
- When you run the flow, it executes `node_a`.
- Suppose `node_a.post()` returns `"default"`.
- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`.
- `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there.
### Example: Branching & Looping
Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions:
- `"approved"`: expense is approved, move to payment processing
- `"needs_revision"`: expense needs changes, send back for revision
- `"rejected"`: expense is denied, finish the process
We can wire them like this:
\`\`\`python
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process
flow = Flow(start=review)
\`\`\`
Let's see how it flows:
1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node
2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review`
3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops
\`\`\`mermaid
flowchart TD
review[Review Expense] -->|approved| payment[Process Payment]
review -->|needs_revision| revise[Revise Report]
review -->|rejected| finish[Finish Process]
revise --> review
payment --> finish
\`\`\`
### Running Individual Nodes vs. Running a Flow
- `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action.
- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue.
> `node.run(shared)` **does not** proceed to the successor.
> This is mainly for debugging or testing a single node.
>
> Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.
{: .warning }
## 3. Nested Flows
A **Flow** can act like a Node, which enables powerful composition patterns. This means you can:
1. Use a Flow as a Node within another Flow's transitions.
2. Combine multiple smaller Flows into a larger Flow for reuse.
3. Node `params` will be a merging of **all** parents' `params`.
### Flow's Node Methods
A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However:
- It **won't** run `exec()`, as its main logic is to orchestrate its nodes.
- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store.
### Basic Flow Nesting
Here's how to connect a flow to another node:
\`\`\`python
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Connect it to another node
subflow >> node_c
# Create the parent flow
parent_flow = Flow(start=subflow)
\`\`\`
When `parent_flow.run()` executes:
1. It starts `subflow`
2. `subflow` runs through its nodes (`node_a->node_b`)
3. After `subflow` completes, execution continues to `node_c`
### Example: Order Processing Pipeline
Here's a practical example that breaks down order processing into nested flows:
\`\`\`python
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)
\`\`\`
This creates a clean separation of concerns while maintaining a clear execution path:
\`\`\`mermaid
flowchart LR
subgraph order_pipeline[Order Pipeline]
subgraph paymentFlow["Payment Flow"]
A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
end
subgraph inventoryFlow["Inventory Flow"]
D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
end
subgraph shippingFlow["Shipping Flow"]
G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
end
paymentFlow --> inventoryFlow
inventoryFlow --> shippingFlow
end
\`\`\`
================================================
File: docs/core_abstraction/node.md
================================================
---
layout: default
title: "Node"
parent: "Core Abstraction"
nav_order: 1
---
# Node
A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`:
1. `prep(shared)`
- **Read and preprocess data** from `shared` store.
- Examples: *query DB, read files, or serialize data into a string*.
- Return `prep_res`, which is used by `exec()` and `post()`.
2. `exec(prep_res)`
- **Execute compute logic**, with optional retries and error handling (below).
- Examples: *(mostly) LLM calls, remote APIs, tool use*.
- ⚠️ This shall be only for compute and **NOT** access `shared`.
- ⚠️ If retries enabled, ensure idempotent implementation.
- Return `exec_res`, which is passed to `post()`.
3. `post(shared, prep_res, exec_res)`
- **Postprocess and write data** back to `shared`.
- Examples: *update DB, change states, log results*.
- **Decide the next action** by returning a *string* (`action = "default"` if *None*).
> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately.
>
> All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data.
{: .note }
### Fault Tolerance & Retries
You can **retry** `exec()` if it raises an exception via two parameters when define the Node:
- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry).
- `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting).
`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.
\`\`\`python
my_node = SummarizeFile(max_retries=3, wait=10)
\`\`\`
When an exception occurs in `exec()`, the Node automatically retries until:
- It either succeeds, or
- The Node has retried `max_retries - 1` times already and fails on the last attempt.
You can get the current retry times (0-based) from `self.cur_retry`.
\`\`\`python
class RetryNode(Node):
def exec(self, prep_res):
print(f"Retry {self.cur_retry} times")
raise Exception("Failed")
\`\`\`
### Graceful Fallback
To **gracefully handle** the exception (after all retries) rather than raising it, override:
\`\`\`python
def exec_fallback(self, prep_res, exc):
raise exc
\`\`\`
By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.
### Example: Summarize file
\`\`\`python
class SummarizeFile(Node):
def prep(self, shared):
return shared["data"]
def exec(self, prep_res):
if not prep_res:
return "Empty file content"
prompt = f"Summarize this text in 10 words: {prep_res}"
summary = call_llm(prompt) # might fail
return summary
def exec_fallback(self, prep_res, exc):
# Provide a simple fallback instead of crashing
return "There was an error processing your request."
def post(self, shared, prep_res, exec_res):
shared["summary"] = exec_res
# Return "default" by not returning
summarize_node = SummarizeFile(max_retries=3)
# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)
print("Action returned:", action_result) # "default"
print("Summary stored:", shared["summary"])
\`\`\`
================================================
File: docs/core_abstraction/parallel.md
================================================
---
layout: default
title: "(Advanced) Parallel"
parent: "Core Abstraction"
nav_order: 6
---
# (Advanced) Parallel
**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute.
> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O.
{: .warning }
> - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize.
>
> - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals).
>
> - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits.
{: .best-practice }
## AsyncParallelBatchNode
Like **AsyncBatchNode**, but run `exec_async()` in **parallel**:
\`\`\`python
class ParallelSummaries(AsyncParallelBatchNode):
async def prep_async(self, shared):
# e.g., multiple texts
return shared["texts"]
async def exec_async(self, text):
prompt = f"Summarize: {text}"
return await call_llm_async(prompt)
async def post_async(self, shared, prep_res, exec_res_list):
shared["summary"] = "\n\n".join(exec_res_list)
return "default"
node = ParallelSummaries()
flow = AsyncFlow(start=node)
\`\`\`
## AsyncParallelBatchFlow
Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters:
\`\`\`python
class SummarizeMultipleFiles(AsyncParallelBatchFlow):
async def prep_async(self, shared):
return [{"filename": f} for f in shared["files"]]
sub_flow = AsyncFlow(start=LoadAndSummarizeFile())
parallel_flow = SummarizeMultipleFiles(start=sub_flow)
await parallel_flow.run_async(shared)
\`\`\`
================================================
File: docs/design_pattern/agent.md
================================================
---
layout: default
title: "Agent"
parent: "Design Pattern"
nav_order: 1
---
# Agent
Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
## Implement Agent with Graph
1. **Context and Action:** Implement nodes that supply context and perform actions.
2. **Branching:** Use branching to connect each action node to an agent node. Use action to allow the agent to direct the [flow](../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step.
3. **Agent Node:** Provide a prompt to decide action—for example:
\`\`\`python
f"""
### CONTEXT
Task: {task_description}
Previous Actions: {previous_actions}
Current State: {current_state}
### ACTION SPACE
[1] search
Description: Use web search to get results
Parameters:
- query (str): What to search for
[2] answer
Description: Conclude based on the results
Parameters:
- result (str): Final answer to provide
### NEXT ACTION
Decide the next action based on the current context and available action space.
Return your response in the following format:
\`\`\`yaml
thinking: |
action:
parameters:
:
\`\`\`"""
\`\`\`
The core of building **high-performance** and **reliable** agents boils down to:
1. **Context Management:** Provide *relevant, minimal context.* For example, rather than including an entire chat history, retrieve the most relevant via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content.
2. **Action Space:** Provide *a well-structured and unambiguous* set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database.
## Example Good Action Design
- **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once.
- **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts).
- **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files.
- **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends.
## Example: Search Agent
This agent:
1. Decides whether to search or answer
2. If searches, loops back to decide if more search needed
3. Answers when enough context gathered
\`\`\`python
class DecideAction(Node):
def prep(self, shared):
context = shared.get("context", "No previous search")
query = shared["query"]
return query, context
def exec(self, inputs):
query, context = inputs
prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
\`\`\`yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
\`\`\`"""
resp = call_llm(prompt)
yaml_str = resp.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip()
result = yaml.safe_load(yaml_str)
assert isinstance(result, dict)
assert "action" in result
assert "reason" in result
assert result["action"] in ["search", "answer"]
if result["action"] == "search":
assert "search_term" in result
return result
def post(self, shared, prep_res, exec_res):
if exec_res["action"] == "search":
shared["search_term"] = exec_res["search_term"]
return exec_res["action"]
class SearchWeb(Node):
def prep(self, shared):
return shared["search_term"]
def exec(self, search_term):
return search_web(search_term)
def post(self, shared, prep_res, exec_res):
prev_searches = shared.get("context", [])
shared["context"] = prev_searches + [
{"term": shared["search_term"], "result": exec_res}
]
return "decide"
class DirectAnswer(Node):
def prep(self, shared):
return shared["query"], shared.get("context", "")
def exec(self, inputs):
query, context = inputs
return call_llm(f"Context: {context}\nAnswer: {query}")
def post(self, shared, prep_res, exec_res):
print(f"Answer: {exec_res}")
shared["answer"] = exec_res
# Connect nodes
decide = DecideAction()
search = SearchWeb()
answer = DirectAnswer()
decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide # Loop back
flow = Flow(start=decide)
flow.run({"query": "Who won the Nobel Prize in Physics 2024?"})
\`\`\`
================================================
File: docs/design_pattern/mapreduce.md
================================================
---
layout: default
title: "Map Reduce"
parent: "Design Pattern"
nav_order: 4
---
# Map Reduce
MapReduce is a design pattern suitable when you have either:
- Large input data (e.g., multiple files to process), or
- Large output data (e.g., multiple forms to fill)
and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase.
### Example: Document Summarization
\`\`\`python
class SummarizeAllFiles(BatchNode):
def prep(self, shared):
files_dict = shared["files"] # e.g. 10 files
return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...]
def exec(self, one_file):
filename, file_content = one_file
summary_text = call_llm(f"Summarize the following file:\n{file_content}")
return (filename, summary_text)
def post(self, shared, prep_res, exec_res_list):
shared["file_summaries"] = dict(exec_res_list)
class CombineSummaries(Node):
def prep(self, shared):
return shared["file_summaries"]
def exec(self, file_summaries):
# format as: "File1: summary\nFile2: summary...\n"
text_list = []
for fname, summ in file_summaries.items():
text_list.append(f"{fname} summary:\n{summ}\n")
big_text = "\n---\n".join(text_list)
return call_llm(f"Combine these file summaries into one final summary:\n{big_text}")
def post(self, shared, prep_res, final_summary):
shared["all_files_summary"] = final_summary
batch_node = SummarizeAllFiles()
combine_node = CombineSummaries()
batch_node >> combine_node
flow = Flow(start=batch_node)
shared = {
"files": {
"file1.txt": "Alice was beginning to get very tired of sitting by her sister...",
"file2.txt": "Some other interesting text ...",
# ...
}
}
flow.run(shared)
print("Individual Summaries:", shared["file_summaries"])
print("\nFinal Summary:\n", shared["all_files_summary"])
\`\`\`
================================================
File: docs/design_pattern/rag.md
================================================
---
layout: default
title: "RAG"
parent: "Design Pattern"
nav_order: 3
---
# RAG (Retrieval Augmented Generation)
For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
1. **Offline stage**: Preprocess and index documents ("building the index").
2. **Online stage**: Given a question, generate answers by retrieving the most relevant context.
---
## Stage 1: Offline Indexing
We create three Nodes:
1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text.
2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk.
3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md).
\`\`\`python
class ChunkDocs(BatchNode):
def prep(self, shared):
# A list of file paths in shared["files"]. We process each file.
return shared["files"]
def exec(self, filepath):
# read file content. In real usage, do error handling.
with open(filepath, "r", encoding="utf-8") as f:
text = f.read()
# chunk by 100 chars each
chunks = []
size = 100
for i in range(0, len(text), size):
chunks.append(text[i : i + size])
return chunks
def post(self, shared, prep_res, exec_res_list):
# exec_res_list is a list of chunk-lists, one per file.
# flatten them all into a single list of chunks.
all_chunks = []
for chunk_list in exec_res_list:
all_chunks.extend(chunk_list)
shared["all_chunks"] = all_chunks
class EmbedDocs(BatchNode):
def prep(self, shared):
return shared["all_chunks"]
def exec(self, chunk):
return get_embedding(chunk)
def post(self, shared, prep_res, exec_res_list):
# Store the list of embeddings.
shared["all_embeds"] = exec_res_list
print(f"Total embeddings: {len(exec_res_list)}")
class StoreIndex(Node):
def prep(self, shared):
# We'll read all embeds from shared.
return shared["all_embeds"]
def exec(self, all_embeds):
# Create a vector index (faiss or other DB in real usage).
index = create_index(all_embeds)
return index
def post(self, shared, prep_res, index):
shared["index"] = index
# Wire them in sequence
chunk_node = ChunkDocs()
embed_node = EmbedDocs()
store_node = StoreIndex()
chunk_node >> embed_node >> store_node
OfflineFlow = Flow(start=chunk_node)
\`\`\`
Usage example:
\`\`\`python
shared = {
"files": ["doc1.txt", "doc2.txt"], # any text files
}
OfflineFlow.run(shared)
\`\`\`
---
## Stage 2: Online Query & Answer
We have 3 nodes:
1. `EmbedQuery` – embeds the user’s question.
2. `RetrieveDocs` – retrieves top chunk from the index.
3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer.
\`\`\`python
class EmbedQuery(Node):
def prep(self, shared):
return shared["question"]
def exec(self, question):
return get_embedding(question)
def post(self, shared, prep_res, q_emb):
shared["q_emb"] = q_emb
class RetrieveDocs(Node):
def prep(self, shared):
# We'll need the query embedding, plus the offline index/chunks
return shared["q_emb"], shared["index"], shared["all_chunks"]
def exec(self, inputs):
q_emb, index, chunks = inputs
I, D = search_index(index, q_emb, top_k=1)
best_id = I[0][0]
relevant_chunk = chunks[best_id]
return relevant_chunk
def post(self, shared, prep_res, relevant_chunk):
shared["retrieved_chunk"] = relevant_chunk
print("Retrieved chunk:", relevant_chunk[:60], "...")
class GenerateAnswer(Node):
def prep(self, shared):
return shared["question"], shared["retrieved_chunk"]
def exec(self, inputs):
question, chunk = inputs
prompt = f"Question: {question}\nContext: {chunk}\nAnswer:"
return call_llm(prompt)
def post(self, shared, prep_res, answer):
shared["answer"] = answer
print("Answer:", answer)
embed_qnode = EmbedQuery()
retrieve_node = RetrieveDocs()
generate_node = GenerateAnswer()
embed_qnode >> retrieve_node >> generate_node
OnlineFlow = Flow(start=embed_qnode)
\`\`\`
Usage example:
\`\`\`python
# Suppose we already ran OfflineFlow and have:
# shared["all_chunks"], shared["index"], etc.
shared["question"] = "Why do people like cats?"
OnlineFlow.run(shared)
# final answer in shared["answer"]
\`\`\`
================================================
File: docs/design_pattern/structure.md
================================================
---
layout: default
title: "Structured Output"
parent: "Design Pattern"
nav_order: 5
---
# Structured Output
In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys.
There are several approaches to achieve a structured output:
- **Prompting** the LLM to strictly return a defined structure.
- Using LLMs that natively support **schema enforcement**.
- **Post-processing** the LLM's response to extract structured content.
In practice, **Prompting** is simple and reliable for modern LLMs.
### Example Use Cases
- Extracting Key Information
\`\`\`yaml
product:
name: Widget Pro
price: 199.99
description: |
A high-quality widget designed for professionals.
Recommended for advanced users.
\`\`\`
- Summarizing Documents into Bullet Points
\`\`\`yaml
summary:
- This product is easy to use.
- It is cost-effective.
- Suitable for all skill levels.
\`\`\`
- Generating Configuration Files
\`\`\`yaml
server:
host: 127.0.0.1
port: 8080
ssl: true
\`\`\`
## Prompt Engineering
When prompting the LLM to produce **structured** output:
1. **Wrap** the structure in code fences (e.g., `yaml`).
2. **Validate** that all required fields exist (and let `Node` handles retry).
### Example Text Summarization
\`\`\`python
class SummarizeNode(Node):
def exec(self, prep_res):
# Suppose `prep_res` is the text to summarize.
prompt = f"""
Please summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Now, output:
\`\`\`yaml
summary:
- bullet 1
- bullet 2
- bullet 3
\`\`\`"""
response = call_llm(prompt)
yaml_str = response.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip()
import yaml
structured_result = yaml.safe_load(yaml_str)
assert "summary" in structured_result
assert isinstance(structured_result["summary"], list)
return structured_result
\`\`\`
> Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic)
{: .note }
### Why YAML instead of JSON?
Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.
**In JSON**
\`\`\`json
{
"dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\""
}
\`\`\`
- Every double quote inside the string must be escaped with `\"`.
- Each newline in the dialogue must be represented as `\n`.
**In YAML**
\`\`\`yaml
dialogue: |
Alice said: "Hello Bob.
How are you?
I am good."
\`\`\`
- No need to escape interior quotes—just place the entire text under a block literal (`|`).
- Newlines are naturally preserved without needing `\n`.
================================================
File: docs/design_pattern/workflow.md
================================================
---
layout: default
title: "Workflow"
parent: "Design Pattern"
nav_order: 2
---
# Workflow
Many real-world tasks are too complex for one LLM call. The solution is to **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*.
> - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*.
>
> You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md).
{: .best-practice }
### Example: Article Writing
\`\`\`python
class GenerateOutline(Node):
def prep(self, shared): return shared["topic"]
def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}")
def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res
class WriteSection(Node):
def prep(self, shared): return shared["outline"]
def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}")
def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res
class ReviewAndRefine(Node):
def prep(self, shared): return shared["draft"]
def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}")
def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res
# Connect nodes
outline = GenerateOutline()
write = WriteSection()
review = ReviewAndRefine()
outline >> write >> review
# Create and run flow
writing_flow = Flow(start=outline)
shared = {"topic": "AI Safety"}
writing_flow.run(shared)
\`\`\`
For *dynamic cases*, consider using [Agents](./agent.md).
================================================
File: docs/utility_function/llm.md
================================================
---
layout: default
title: "LLM Wrapper"
parent: "Utility Function"
nav_order: 1
---
# LLM Wrappers
Check out libraries like [litellm](https://github.com/BerriAI/litellm).
Here, we provide some minimal example implementations:
1. OpenAI
\`\`\`python
def call_llm(prompt):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
# Example usage
call_llm("How are you?")
\`\`\`
> Store the API key in an environment variable like OPENAI_API_KEY for security.
{: .best-practice }
2. Claude (Anthropic)
\`\`\`python
def call_llm(prompt):
from anthropic import Anthropic
client = Anthropic(api_key="YOUR_API_KEY_HERE")
response = client.messages.create(
model="claude-2",
messages=[{"role": "user", "content": prompt}],
max_tokens=100
)
return response.content
\`\`\`
3. Google (Generative AI Studio / PaLM API)
\`\`\`python
def call_llm(prompt):
import google.generativeai as genai
genai.configure(api_key="YOUR_API_KEY_HERE")
response = genai.generate_text(
model="models/text-bison-001",
prompt=prompt
)
return response.result
\`\`\`
4. Azure (Azure OpenAI)
\`\`\`python
def call_llm(prompt):
from openai import AzureOpenAI
client = AzureOpenAI(
azure_endpoint="https://.openai.azure.com/",
api_key="YOUR_API_KEY_HERE",
api_version="2023-05-15"
)
r = client.chat.completions.create(
model="",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
\`\`\`
5. Ollama (Local LLM)
\`\`\`python
def call_llm(prompt):
from ollama import chat
response = chat(
model="llama2",
messages=[{"role": "user", "content": prompt}]
)
return response.message.content
\`\`\`
## Improvements
Feel free to enhance your `call_llm` function as needed. Here are examples:
- Handle chat history:
\`\`\`python
def call_llm(messages):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=messages
)
return r.choices[0].message.content
\`\`\`
- Add in-memory caching
\`\`\`python
from functools import lru_cache
@lru_cache(maxsize=1000)
def call_llm(prompt):
# Your implementation here
pass
\`\`\`
> ⚠️ Caching conflicts with Node retries, as retries yield the same result.
>
> To address this, you could use cached results only if not retried.
{: .warning }
\`\`\`python
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_call(prompt):
pass
def call_llm(prompt, use_cache):
if use_cache:
return cached_call(prompt)
# Call the underlying function directly
return cached_call.__wrapped__(prompt)
class SummarizeNode(Node):
def exec(self, text):
return call_llm(f"Summarize: {text}", self.cur_retry==0)
\`\`\`
- Enable logging:
\`\`\`python
def call_llm(prompt):
import logging
logging.info(f"Prompt: {prompt}")
response = ... # Your implementation here
logging.info(f"Response: {response}")
return response
\`\`\`
```
## /.env.sample
```sample path="/.env.sample"
GEMINI_PROJECT_ID=
GITHUB_TOKEN=
```
## /.gitignore
```gitignore path="/.gitignore"
# Dependencies
node_modules/
vendor/
.pnp/
.pnp.js
# Build outputs
dist/
build/
out/
*.pyc
__pycache__/
# Environment files
.env
.env.local
.env.*.local
.env.development
.env.test
.env.production
# Python virtual environments
.venv/
venv/
# IDE - VSCode
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
# IDE - JetBrains
.idea/
*.iml
*.iws
*.ipr
# IDE - Eclipse
.project
.classpath
.settings/
# Logs
logs/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
# Operating System
.DS_Store
Thumbs.db
*.swp
*.swo
# Testing
coverage/
.nyc_output/
# Temporary files
*.tmp
*.temp
.cache/
# Compiled files
*.com
*.class
*.dll
*.exe
*.o
*.so
# Package files
*.7z
*.dmg
*.gz
*.iso
*.jar
*.rar
*.tar
*.zip
# Database
*.sqlite
*.sqlite3
*.db
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional REPL history
.node_repl_history
# LLM cache
llm_cache.json
# Output files
output/
```
## /.windsurfrules
```windsurfrules path="/.windsurfrules"
---
layout: default
title: "Agentic Coding"
---
# Agentic Coding: Humans Design, Agents code!
> If you are an AI agents involved in building LLM Systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification.
{: .warning }
## Agentic Coding Steps
Agentic Coding should be a collaboration between Human System Design and Agent Implementation:
| Steps | Human | AI | Comment |
|:-----------------------|:----------:|:---------:|:------------------------------------------------------------------------|
| 1. Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context. |
| 2. Flow | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details. |
| 3. Utilities | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. |
| 4. Node | ★☆☆ Low | ★★★ High | The AI helps design the node types and data handling based on the flow. |
| 5. Implementation | ★☆☆ Low | ★★★ High | The AI implements the flow based on the design. |
| 6. Optimization | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize. |
| 7. Reliability | ★☆☆ Low | ★★★ High | The AI writes test cases and addresses corner cases. |
1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit.
- Understand AI systems' strengths and limitations:
- **Good for**: Routine tasks requiring common sense (filling forms, replying to emails)
- **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL)
- **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning)
- **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features.
- **Balance complexity vs. impact**: Aim to deliver the highest value features with minimal complexity early.
2. **Flow Design**: Outline at a high level, describe how your AI system orchestrates nodes.
- Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)).
- For each node in the flow, start with a high-level one-line description of what it does.
- If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine).
- If using **Agent**, specify what are the inputs (context) and what are the possible actions.
- If using **RAG**, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows.
- Outline the flow and draw it in a mermaid diagram. For example:
\`\`\`mermaid
flowchart LR
start[Start] --> batch[Batch]
batch --> check[Check]
check -->|OK| process
check -->|Error| fix[Fix]
fix --> check
subgraph process[Process]
step1[Step 1] --> step2[Step 2]
end
process --> endNode[End]
\`\`\`
- > **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition.
{: .best-practice }
3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions.
- Think of your AI system as the brain. It needs a body—these *external utility functions*—to interact with the real world:

- Reading inputs (e.g., retrieving Slack messages, reading emails)
- Writing outputs (e.g., generating reports, sending emails)
- Using external tools (e.g., calling LLMs, searching the web)
- **NOTE**: *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal in the AI system.
- For each utility function, implement it and write a simple test.
- Document their input/output, as well as why they are necessary. For example:
- `name`: `get_embedding` (`utils/get_embedding.py`)
- `input`: `str`
- `output`: a vector of 3072 floats
- `necessity`: Used by the second node to embed text
- Example utility implementation:
\`\`\`python
# utils/call_llm.py
from openai import OpenAI
def call_llm(prompt):
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
if __name__ == "__main__":
prompt = "What is the meaning of life?"
print(call_llm(prompt))
\`\`\`
- > **Sometimes, design Utilies before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them.
{: .best-practice }
4. **Node Design**: Plan how each node will read and write data, and use utility functions.
- One core design principle for PocketFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design:
- For simple systems, use an in-memory dictionary.
- For more complex systems or when persistence is required, use a database.
- **Don't Repeat Yourself**: Use in-memory references or foreign keys.
- Example shared store design:
\`\`\`python
shared = {
"user": {
"id": "user123",
"context": { # Another nested dict
"weather": {"temp": 72, "condition": "sunny"},
"location": "San Francisco"
}
},
"results": {} # Empty dict to store outputs
}
\`\`\`
- For each [Node](./core_abstraction/node.md), describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without codes. For example:
- `type`: Regular (or Batch, or Async)
- `prep`: Read "text" from the shared store
- `exec`: Call the embedding utility function
- `post`: Write "embedding" to the shared store
5. **Implementation**: Implement the initial nodes and flows based on the design.
- 🎉 If you've reached this step, humans have finished the design. Now *Agentic Coding* begins!
- **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking.
- **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system.
- Add logging throughout the code to facilitate debugging.
7. **Optimization**:
- **Use Intuition**: For a quick initial evaluation, human intuition is often a good start.
- **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts.
- If your flow design is already solid, move on to micro-optimizations:
- **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity.
- **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone.
- > **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times.
>
> 
{: .best-practice }
8. **Reliability**
- **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times.
- **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging.
- **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain.
## Example LLM Project File Structure
\`\`\`
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── requirements.txt
└── docs/
└── design.md
\`\`\`
- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*.
- **`utils/`**: Contains all utility functions.
- It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
- Each file should also include a `main()` function to try that API call
- **`nodes.py`**: Contains all the node definitions.
\`\`\`python
# nodes.py
from pocketflow import Node
from utils.call_llm import call_llm
class GetQuestionNode(Node):
def exec(self, _):
# Get question directly from user input
user_question = input("Enter your question: ")
return user_question
def post(self, shared, prep_res, exec_res):
# Store the user's question
shared["question"] = exec_res
return "default" # Go to the next node
class AnswerNode(Node):
def prep(self, shared):
# Read question from shared
return shared["question"]
def exec(self, question):
# Call LLM to get the answer
return call_llm(question)
def post(self, shared, prep_res, exec_res):
# Store the answer in shared
shared["answer"] = exec_res
\`\`\`
- **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them.
\`\`\`python
# flow.py
from pocketflow import Flow
from nodes import GetQuestionNode, AnswerNode
def create_qa_flow():
"""Create and return a question-answering flow."""
# Create nodes
get_question_node = GetQuestionNode()
answer_node = AnswerNode()
# Connect nodes in sequence
get_question_node >> answer_node
# Create flow starting with input node
return Flow(start=get_question_node)
\`\`\`
- **`main.py`**: Serves as the project's entry point.
\`\`\`python
# main.py
from flow import create_qa_flow
# Example main function
# Please replace this with your own main function
def main():
shared = {
"question": None, # Will be populated by GetQuestionNode from user input
"answer": None # Will be populated by AnswerNode
}
# Create the flow and run it
qa_flow = create_qa_flow()
qa_flow.run(shared)
print(f"Question: {shared['question']}")
print(f"Answer: {shared['answer']}")
if __name__ == "__main__":
main()
\`\`\`
================================================
File: docs/index.md
================================================
---
layout: default
title: "Home"
nav_order: 1
---
# Pocket Flow
A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*.
- **Lightweight**: Just the core graph abstraction in 100 lines. ZERO dependencies, and vendor lock-in.
- **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more.
- **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications.
## Core Abstraction
We model the LLM workflow as a **Graph + Shared Store**:
- [Node](./core_abstraction/node.md) handles simple (LLM) tasks.
- [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges).
- [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows.
- [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks.
- [Async](./core_abstraction/async.md) nodes/flows allow waiting for asynchronous tasks.
- [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks.
## Design Pattern
From there, it’s easy to implement popular design patterns:
- [Agent](./design_pattern/agent.md) autonomously makes decisions.
- [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines.
- [RAG](./design_pattern/rag.md) integrates data retrieval with generation.
- [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps.
- [Structured Output](./design_pattern/structure.md) formats outputs consistently.
- [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
## Utility Function
We **do not** provide built-in utilities. Instead, we offer *examples*—please *implement your own*:
- [LLM Wrapper](./utility_function/llm.md)
- [Viz and Debug](./utility_function/viz.md)
- [Web Search](./utility_function/websearch.md)
- [Chunking](./utility_function/chunking.md)
- [Embedding](./utility_function/embedding.md)
- [Vector Databases](./utility_function/vector.md)
- [Text-to-Speech](./utility_function/text_to_speech.md)
**Why not built-in?**: I believe it's a *bad practice* for vendor-specific APIs in a general framework:
- *API Volatility*: Frequent changes lead to heavy maintenance for hardcoded APIs.
- *Flexibility*: You may want to switch vendors, use fine-tuned models, or run them locally.
- *Optimizations*: Prompt caching, batching, and streaming are easier without vendor lock-in.
## Ready to build your Apps?
Check out [Agentic Coding Guidance](./guide.md), the fastest way to develop LLM projects with Pocket Flow!
================================================
File: docs/core_abstraction/async.md
================================================
---
layout: default
title: "(Advanced) Async"
parent: "Core Abstraction"
nav_order: 5
---
# (Advanced) Async
**Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for:
1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way.
2. **exec_async()**: Typically used for async LLM calls.
3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`.
**Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes.
### Example
\`\`\`python
class SummarizeThenVerify(AsyncNode):
async def prep_async(self, shared):
# Example: read a file asynchronously
doc_text = await read_file_async(shared["doc_path"])
return doc_text
async def exec_async(self, prep_res):
# Example: async LLM call
summary = await call_llm_async(f"Summarize: {prep_res}")
return summary
async def post_async(self, shared, prep_res, exec_res):
# Example: wait for user feedback
decision = await gather_user_feedback(exec_res)
if decision == "approve":
shared["summary"] = exec_res
return "approve"
return "deny"
summarize_node = SummarizeThenVerify()
final_node = Finalize()
# Define transitions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node # retry
flow = AsyncFlow(start=summarize_node)
async def main():
shared = {"doc_path": "document.txt"}
await flow.run_async(shared)
print("Final Summary:", shared.get("summary"))
asyncio.run(main())
\`\`\`
================================================
File: docs/core_abstraction/batch.md
================================================
---
layout: default
title: "Batch"
parent: "Core Abstraction"
nav_order: 4
---
# Batch
**Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases:
- **Chunk-based** processing (e.g., splitting large texts).
- **Iterative** processing over lists of input items (e.g., user queries, files, URLs).
## 1. BatchNode
A **BatchNode** extends `Node` but changes `prep()` and `exec()`:
- **`prep(shared)`**: returns an **iterable** (e.g., list, generator).
- **`exec(item)`**: called **once** per item in that iterable.
- **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**.
### Example: Summarize a Large File
\`\`\`python
class MapSummaries(BatchNode):
def prep(self, shared):
# Suppose we have a big file; chunk it
content = shared["data"]
chunk_size = 10000
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
return chunks
def exec(self, chunk):
prompt = f"Summarize this chunk in 10 words: {chunk}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res_list):
combined = "\n".join(exec_res_list)
shared["summary"] = combined
return "default"
map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
\`\`\`
---
## 2. BatchFlow
A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
### Example: Summarize Many Files
\`\`\`python
class SummarizeAllFiles(BatchFlow):
def prep(self, shared):
# Return a list of param dicts (one per file)
filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...]
return [{"filename": fn} for fn in filenames]
# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)
# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
\`\`\`
### Under the Hood
1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`.
2. The **BatchFlow** loops through each dict. For each one:
- It merges the dict with the BatchFlow’s own `params`.
- It calls `flow.run(shared)` using the merged result.
3. This means the sub-Flow is run **repeatedly**, once for every param dict.
---
## 3. Nested or Multi-Level Batches
You can nest a **BatchFlow** in another **BatchFlow**. For instance:
- **Outer** batch: returns a list of diretory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- **Inner** batch: returning a list of per-file param dicts.
At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
\`\`\`python
class FileBatchFlow(BatchFlow):
def prep(self, shared):
directory = self.params["directory"]
# e.g., files = ["file1.txt", "file2.txt", ...]
files = [f for f in os.listdir(directory) if f.endswith(".txt")]
return [{"filename": f} for f in files]
class DirectoryBatchFlow(BatchFlow):
def prep(self, shared):
directories = [ "/path/to/dirA", "/path/to/dirB"]
return [{"directory": d} for d in directories]
# MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
\`\`\`
================================================
File: docs/core_abstraction/communication.md
================================================
---
layout: default
title: "Communication"
parent: "Core Abstraction"
nav_order: 3
---
# Communication
Nodes and Flows **communicate** in 2 ways:
1. **Shared Store (for almost all the cases)**
- A global data structure (often an in-mem dict) that all nodes can read ( `prep()`) and write (`post()`).
- Great for data results, large content, or anything multiple nodes need.
- You shall design the data structure and populate it ahead.
- > **Separation of Concerns:** Use `Shared Store` for almost all cases to separate *Data Schema* from *Compute Logic*! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](./batch.md).
{: .best-practice }
2. **Params (only for [Batch](./batch.md))**
- Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**.
- Good for identifiers like filenames or numeric IDs, in Batch mode.
If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller).
---
## 1. Shared Store
### Overview
A shared store is typically an in-mem dictionary, like:
\`\`\`python
shared = {"data": {}, "summary": {}, "config": {...}, ...}
\`\`\`
It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements.
### Example
\`\`\`python
class LoadData(Node):
def post(self, shared, prep_res, exec_res):
# We write data to shared store
shared["data"] = "Some text content"
return None
class Summarize(Node):
def prep(self, shared):
# We read data from shared store
return shared["data"]
def exec(self, prep_res):
# Call LLM to summarize
prompt = f"Summarize: {prep_res}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res):
# We write summary to shared store
shared["summary"] = exec_res
return "default"
load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)
shared = {}
flow.run(shared)
\`\`\`
Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`.
---
## 2. Params
**Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are:
- **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`).
- **Set** via `set_params()`.
- **Cleared** and updated each time a parent Flow calls it.
> Only set the uppermost Flow params because others will be overwritten by the parent Flow.
>
> If you need to set child node params, see [Batch](./batch.md).
{: .warning }
Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
### Example
\`\`\`python
# 1) Create a Node that uses params
class SummarizeFile(Node):
def prep(self, shared):
# Access the node's param
filename = self.params["filename"]
return shared["data"].get(filename, "")
def exec(self, prep_res):
prompt = f"Summarize: {prep_res}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
shared["summary"][filename] = exec_res
return "default"
# 2) Set params
node = SummarizeFile()
# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)
# 4) Create Flow
flow = Flow(start=node)
# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared) # The node summarizes doc2, not doc1
\`\`\`
================================================
File: docs/core_abstraction/flow.md
================================================
---
layout: default
title: "Flow"
parent: "Core Abstraction"
nav_order: 2
---
# Flow
A **Flow** orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`.
## 1. Action-based Transitions
Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`.
You define transitions with the syntax:
1. **Basic default transition**: `node_a >> node_b`
This means if `node_a.post()` returns `"default"`, go to `node_b`.
(Equivalent to `node_a - "default" >> node_b`)
2. **Named action transition**: `node_a - "action_name" >> node_b`
This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
It's possible to create loops, branching, or multi-step flows.
## 2. Creating a Flow
A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node.
### Example: Simple Sequence
Here's a minimal flow of two nodes in a chain:
\`\`\`python
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)
\`\`\`
- When you run the flow, it executes `node_a`.
- Suppose `node_a.post()` returns `"default"`.
- The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`.
- `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there.
### Example: Branching & Looping
Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions:
- `"approved"`: expense is approved, move to payment processing
- `"needs_revision"`: expense needs changes, send back for revision
- `"rejected"`: expense is denied, finish the process
We can wire them like this:
\`\`\`python
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process
flow = Flow(start=review)
\`\`\`
Let's see how it flows:
1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node
2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review`
3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops
\`\`\`mermaid
flowchart TD
review[Review Expense] -->|approved| payment[Process Payment]
review -->|needs_revision| revise[Revise Report]
review -->|rejected| finish[Finish Process]
revise --> review
payment --> finish
\`\`\`
### Running Individual Nodes vs. Running a Flow
- `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action.
- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue.
> `node.run(shared)` **does not** proceed to the successor.
> This is mainly for debugging or testing a single node.
>
> Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.
{: .warning }
## 3. Nested Flows
A **Flow** can act like a Node, which enables powerful composition patterns. This means you can:
1. Use a Flow as a Node within another Flow's transitions.
2. Combine multiple smaller Flows into a larger Flow for reuse.
3. Node `params` will be a merging of **all** parents' `params`.
### Flow's Node Methods
A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However:
- It **won't** run `exec()`, as its main logic is to orchestrate its nodes.
- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store.
### Basic Flow Nesting
Here's how to connect a flow to another node:
\`\`\`python
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Connect it to another node
subflow >> node_c
# Create the parent flow
parent_flow = Flow(start=subflow)
\`\`\`
When `parent_flow.run()` executes:
1. It starts `subflow`
2. `subflow` runs through its nodes (`node_a->node_b`)
3. After `subflow` completes, execution continues to `node_c`
### Example: Order Processing Pipeline
Here's a practical example that breaks down order processing into nested flows:
\`\`\`python
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)
\`\`\`
This creates a clean separation of concerns while maintaining a clear execution path:
\`\`\`mermaid
flowchart LR
subgraph order_pipeline[Order Pipeline]
subgraph paymentFlow["Payment Flow"]
A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
end
subgraph inventoryFlow["Inventory Flow"]
D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
end
subgraph shippingFlow["Shipping Flow"]
G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
end
paymentFlow --> inventoryFlow
inventoryFlow --> shippingFlow
end
\`\`\`
================================================
File: docs/core_abstraction/node.md
================================================
---
layout: default
title: "Node"
parent: "Core Abstraction"
nav_order: 1
---
# Node
A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`:
1. `prep(shared)`
- **Read and preprocess data** from `shared` store.
- Examples: *query DB, read files, or serialize data into a string*.
- Return `prep_res`, which is used by `exec()` and `post()`.
2. `exec(prep_res)`
- **Execute compute logic**, with optional retries and error handling (below).
- Examples: *(mostly) LLM calls, remote APIs, tool use*.
- ⚠️ This shall be only for compute and **NOT** access `shared`.
- ⚠️ If retries enabled, ensure idempotent implementation.
- Return `exec_res`, which is passed to `post()`.
3. `post(shared, prep_res, exec_res)`
- **Postprocess and write data** back to `shared`.
- Examples: *update DB, change states, log results*.
- **Decide the next action** by returning a *string* (`action = "default"` if *None*).
> **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately.
>
> All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data.
{: .note }
### Fault Tolerance & Retries
You can **retry** `exec()` if it raises an exception via two parameters when define the Node:
- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry).
- `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting).
`wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off.
\`\`\`python
my_node = SummarizeFile(max_retries=3, wait=10)
\`\`\`
When an exception occurs in `exec()`, the Node automatically retries until:
- It either succeeds, or
- The Node has retried `max_retries - 1` times already and fails on the last attempt.
You can get the current retry times (0-based) from `self.cur_retry`.
\`\`\`python
class RetryNode(Node):
def exec(self, prep_res):
print(f"Retry {self.cur_retry} times")
raise Exception("Failed")
\`\`\`
### Graceful Fallback
To **gracefully handle** the exception (after all retries) rather than raising it, override:
\`\`\`python
def exec_fallback(self, prep_res, exc):
raise exc
\`\`\`
By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.
### Example: Summarize file
\`\`\`python
class SummarizeFile(Node):
def prep(self, shared):
return shared["data"]
def exec(self, prep_res):
if not prep_res:
return "Empty file content"
prompt = f"Summarize this text in 10 words: {prep_res}"
summary = call_llm(prompt) # might fail
return summary
def exec_fallback(self, prep_res, exc):
# Provide a simple fallback instead of crashing
return "There was an error processing your request."
def post(self, shared, prep_res, exec_res):
shared["summary"] = exec_res
# Return "default" by not returning
summarize_node = SummarizeFile(max_retries=3)
# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)
print("Action returned:", action_result) # "default"
print("Summary stored:", shared["summary"])
\`\`\`
================================================
File: docs/core_abstraction/parallel.md
================================================
---
layout: default
title: "(Advanced) Parallel"
parent: "Core Abstraction"
nav_order: 6
---
# (Advanced) Parallel
**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute.
> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O.
{: .warning }
> - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize.
>
> - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals).
>
> - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits.
{: .best-practice }
## AsyncParallelBatchNode
Like **AsyncBatchNode**, but run `exec_async()` in **parallel**:
\`\`\`python
class ParallelSummaries(AsyncParallelBatchNode):
async def prep_async(self, shared):
# e.g., multiple texts
return shared["texts"]
async def exec_async(self, text):
prompt = f"Summarize: {text}"
return await call_llm_async(prompt)
async def post_async(self, shared, prep_res, exec_res_list):
shared["summary"] = "\n\n".join(exec_res_list)
return "default"
node = ParallelSummaries()
flow = AsyncFlow(start=node)
\`\`\`
## AsyncParallelBatchFlow
Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters:
\`\`\`python
class SummarizeMultipleFiles(AsyncParallelBatchFlow):
async def prep_async(self, shared):
return [{"filename": f} for f in shared["files"]]
sub_flow = AsyncFlow(start=LoadAndSummarizeFile())
parallel_flow = SummarizeMultipleFiles(start=sub_flow)
await parallel_flow.run_async(shared)
\`\`\`
================================================
File: docs/design_pattern/agent.md
================================================
---
layout: default
title: "Agent"
parent: "Design Pattern"
nav_order: 1
---
# Agent
Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
## Implement Agent with Graph
1. **Context and Action:** Implement nodes that supply context and perform actions.
2. **Branching:** Use branching to connect each action node to an agent node. Use action to allow the agent to direct the [flow](../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step.
3. **Agent Node:** Provide a prompt to decide action—for example:
\`\`\`python
f"""
### CONTEXT
Task: {task_description}
Previous Actions: {previous_actions}
Current State: {current_state}
### ACTION SPACE
[1] search
Description: Use web search to get results
Parameters:
- query (str): What to search for
[2] answer
Description: Conclude based on the results
Parameters:
- result (str): Final answer to provide
### NEXT ACTION
Decide the next action based on the current context and available action space.
Return your response in the following format:
\`\`\`yaml
thinking: |
action:
parameters:
:
\`\`\`"""
\`\`\`
The core of building **high-performance** and **reliable** agents boils down to:
1. **Context Management:** Provide *relevant, minimal context.* For example, rather than including an entire chat history, retrieve the most relevant via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content.
2. **Action Space:** Provide *a well-structured and unambiguous* set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database.
## Example Good Action Design
- **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once.
- **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts).
- **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files.
- **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends.
## Example: Search Agent
This agent:
1. Decides whether to search or answer
2. If searches, loops back to decide if more search needed
3. Answers when enough context gathered
\`\`\`python
class DecideAction(Node):
def prep(self, shared):
context = shared.get("context", "No previous search")
query = shared["query"]
return query, context
def exec(self, inputs):
query, context = inputs
prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
\`\`\`yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
\`\`\`"""
resp = call_llm(prompt)
yaml_str = resp.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip()
result = yaml.safe_load(yaml_str)
assert isinstance(result, dict)
assert "action" in result
assert "reason" in result
assert result["action"] in ["search", "answer"]
if result["action"] == "search":
assert "search_term" in result
return result
def post(self, shared, prep_res, exec_res):
if exec_res["action"] == "search":
shared["search_term"] = exec_res["search_term"]
return exec_res["action"]
class SearchWeb(Node):
def prep(self, shared):
return shared["search_term"]
def exec(self, search_term):
return search_web(search_term)
def post(self, shared, prep_res, exec_res):
prev_searches = shared.get("context", [])
shared["context"] = prev_searches + [
{"term": shared["search_term"], "result": exec_res}
]
return "decide"
class DirectAnswer(Node):
def prep(self, shared):
return shared["query"], shared.get("context", "")
def exec(self, inputs):
query, context = inputs
return call_llm(f"Context: {context}\nAnswer: {query}")
def post(self, shared, prep_res, exec_res):
print(f"Answer: {exec_res}")
shared["answer"] = exec_res
# Connect nodes
decide = DecideAction()
search = SearchWeb()
answer = DirectAnswer()
decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide # Loop back
flow = Flow(start=decide)
flow.run({"query": "Who won the Nobel Prize in Physics 2024?"})
\`\`\`
================================================
File: docs/design_pattern/mapreduce.md
================================================
---
layout: default
title: "Map Reduce"
parent: "Design Pattern"
nav_order: 4
---
# Map Reduce
MapReduce is a design pattern suitable when you have either:
- Large input data (e.g., multiple files to process), or
- Large output data (e.g., multiple forms to fill)
and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase.
### Example: Document Summarization
\`\`\`python
class SummarizeAllFiles(BatchNode):
def prep(self, shared):
files_dict = shared["files"] # e.g. 10 files
return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...]
def exec(self, one_file):
filename, file_content = one_file
summary_text = call_llm(f"Summarize the following file:\n{file_content}")
return (filename, summary_text)
def post(self, shared, prep_res, exec_res_list):
shared["file_summaries"] = dict(exec_res_list)
class CombineSummaries(Node):
def prep(self, shared):
return shared["file_summaries"]
def exec(self, file_summaries):
# format as: "File1: summary\nFile2: summary...\n"
text_list = []
for fname, summ in file_summaries.items():
text_list.append(f"{fname} summary:\n{summ}\n")
big_text = "\n---\n".join(text_list)
return call_llm(f"Combine these file summaries into one final summary:\n{big_text}")
def post(self, shared, prep_res, final_summary):
shared["all_files_summary"] = final_summary
batch_node = SummarizeAllFiles()
combine_node = CombineSummaries()
batch_node >> combine_node
flow = Flow(start=batch_node)
shared = {
"files": {
"file1.txt": "Alice was beginning to get very tired of sitting by her sister...",
"file2.txt": "Some other interesting text ...",
# ...
}
}
flow.run(shared)
print("Individual Summaries:", shared["file_summaries"])
print("\nFinal Summary:\n", shared["all_files_summary"])
\`\`\`
================================================
File: docs/design_pattern/rag.md
================================================
---
layout: default
title: "RAG"
parent: "Design Pattern"
nav_order: 3
---
# RAG (Retrieval Augmented Generation)
For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
1. **Offline stage**: Preprocess and index documents ("building the index").
2. **Online stage**: Given a question, generate answers by retrieving the most relevant context.
---
## Stage 1: Offline Indexing
We create three Nodes:
1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text.
2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk.
3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md).
\`\`\`python
class ChunkDocs(BatchNode):
def prep(self, shared):
# A list of file paths in shared["files"]. We process each file.
return shared["files"]
def exec(self, filepath):
# read file content. In real usage, do error handling.
with open(filepath, "r", encoding="utf-8") as f:
text = f.read()
# chunk by 100 chars each
chunks = []
size = 100
for i in range(0, len(text), size):
chunks.append(text[i : i + size])
return chunks
def post(self, shared, prep_res, exec_res_list):
# exec_res_list is a list of chunk-lists, one per file.
# flatten them all into a single list of chunks.
all_chunks = []
for chunk_list in exec_res_list:
all_chunks.extend(chunk_list)
shared["all_chunks"] = all_chunks
class EmbedDocs(BatchNode):
def prep(self, shared):
return shared["all_chunks"]
def exec(self, chunk):
return get_embedding(chunk)
def post(self, shared, prep_res, exec_res_list):
# Store the list of embeddings.
shared["all_embeds"] = exec_res_list
print(f"Total embeddings: {len(exec_res_list)}")
class StoreIndex(Node):
def prep(self, shared):
# We'll read all embeds from shared.
return shared["all_embeds"]
def exec(self, all_embeds):
# Create a vector index (faiss or other DB in real usage).
index = create_index(all_embeds)
return index
def post(self, shared, prep_res, index):
shared["index"] = index
# Wire them in sequence
chunk_node = ChunkDocs()
embed_node = EmbedDocs()
store_node = StoreIndex()
chunk_node >> embed_node >> store_node
OfflineFlow = Flow(start=chunk_node)
\`\`\`
Usage example:
\`\`\`python
shared = {
"files": ["doc1.txt", "doc2.txt"], # any text files
}
OfflineFlow.run(shared)
\`\`\`
---
## Stage 2: Online Query & Answer
We have 3 nodes:
1. `EmbedQuery` – embeds the user’s question.
2. `RetrieveDocs` – retrieves top chunk from the index.
3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer.
\`\`\`python
class EmbedQuery(Node):
def prep(self, shared):
return shared["question"]
def exec(self, question):
return get_embedding(question)
def post(self, shared, prep_res, q_emb):
shared["q_emb"] = q_emb
class RetrieveDocs(Node):
def prep(self, shared):
# We'll need the query embedding, plus the offline index/chunks
return shared["q_emb"], shared["index"], shared["all_chunks"]
def exec(self, inputs):
q_emb, index, chunks = inputs
I, D = search_index(index, q_emb, top_k=1)
best_id = I[0][0]
relevant_chunk = chunks[best_id]
return relevant_chunk
def post(self, shared, prep_res, relevant_chunk):
shared["retrieved_chunk"] = relevant_chunk
print("Retrieved chunk:", relevant_chunk[:60], "...")
class GenerateAnswer(Node):
def prep(self, shared):
return shared["question"], shared["retrieved_chunk"]
def exec(self, inputs):
question, chunk = inputs
prompt = f"Question: {question}\nContext: {chunk}\nAnswer:"
return call_llm(prompt)
def post(self, shared, prep_res, answer):
shared["answer"] = answer
print("Answer:", answer)
embed_qnode = EmbedQuery()
retrieve_node = RetrieveDocs()
generate_node = GenerateAnswer()
embed_qnode >> retrieve_node >> generate_node
OnlineFlow = Flow(start=embed_qnode)
\`\`\`
Usage example:
\`\`\`python
# Suppose we already ran OfflineFlow and have:
# shared["all_chunks"], shared["index"], etc.
shared["question"] = "Why do people like cats?"
OnlineFlow.run(shared)
# final answer in shared["answer"]
\`\`\`
================================================
File: docs/design_pattern/structure.md
================================================
---
layout: default
title: "Structured Output"
parent: "Design Pattern"
nav_order: 5
---
# Structured Output
In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys.
There are several approaches to achieve a structured output:
- **Prompting** the LLM to strictly return a defined structure.
- Using LLMs that natively support **schema enforcement**.
- **Post-processing** the LLM's response to extract structured content.
In practice, **Prompting** is simple and reliable for modern LLMs.
### Example Use Cases
- Extracting Key Information
\`\`\`yaml
product:
name: Widget Pro
price: 199.99
description: |
A high-quality widget designed for professionals.
Recommended for advanced users.
\`\`\`
- Summarizing Documents into Bullet Points
\`\`\`yaml
summary:
- This product is easy to use.
- It is cost-effective.
- Suitable for all skill levels.
\`\`\`
- Generating Configuration Files
\`\`\`yaml
server:
host: 127.0.0.1
port: 8080
ssl: true
\`\`\`
## Prompt Engineering
When prompting the LLM to produce **structured** output:
1. **Wrap** the structure in code fences (e.g., `yaml`).
2. **Validate** that all required fields exist (and let `Node` handles retry).
### Example Text Summarization
\`\`\`python
class SummarizeNode(Node):
def exec(self, prep_res):
# Suppose `prep_res` is the text to summarize.
prompt = f"""
Please summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Now, output:
\`\`\`yaml
summary:
- bullet 1
- bullet 2
- bullet 3
\`\`\`"""
response = call_llm(prompt)
yaml_str = response.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip()
import yaml
structured_result = yaml.safe_load(yaml_str)
assert "summary" in structured_result
assert isinstance(structured_result["summary"], list)
return structured_result
\`\`\`
> Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic)
{: .note }
### Why YAML instead of JSON?
Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.
**In JSON**
\`\`\`json
{
"dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\""
}
\`\`\`
- Every double quote inside the string must be escaped with `\"`.
- Each newline in the dialogue must be represented as `\n`.
**In YAML**
\`\`\`yaml
dialogue: |
Alice said: "Hello Bob.
How are you?
I am good."
\`\`\`
- No need to escape interior quotes—just place the entire text under a block literal (`|`).
- Newlines are naturally preserved without needing `\n`.
================================================
File: docs/design_pattern/workflow.md
================================================
---
layout: default
title: "Workflow"
parent: "Design Pattern"
nav_order: 2
---
# Workflow
Many real-world tasks are too complex for one LLM call. The solution is to **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*.
> - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*.
>
> You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md).
{: .best-practice }
### Example: Article Writing
\`\`\`python
class GenerateOutline(Node):
def prep(self, shared): return shared["topic"]
def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}")
def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res
class WriteSection(Node):
def prep(self, shared): return shared["outline"]
def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}")
def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res
class ReviewAndRefine(Node):
def prep(self, shared): return shared["draft"]
def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}")
def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res
# Connect nodes
outline = GenerateOutline()
write = WriteSection()
review = ReviewAndRefine()
outline >> write >> review
# Create and run flow
writing_flow = Flow(start=outline)
shared = {"topic": "AI Safety"}
writing_flow.run(shared)
\`\`\`
For *dynamic cases*, consider using [Agents](./agent.md).
================================================
File: docs/utility_function/llm.md
================================================
---
layout: default
title: "LLM Wrapper"
parent: "Utility Function"
nav_order: 1
---
# LLM Wrappers
Check out libraries like [litellm](https://github.com/BerriAI/litellm).
Here, we provide some minimal example implementations:
1. OpenAI
\`\`\`python
def call_llm(prompt):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
# Example usage
call_llm("How are you?")
\`\`\`
> Store the API key in an environment variable like OPENAI_API_KEY for security.
{: .best-practice }
2. Claude (Anthropic)
\`\`\`python
def call_llm(prompt):
from anthropic import Anthropic
client = Anthropic(api_key="YOUR_API_KEY_HERE")
response = client.messages.create(
model="claude-2",
messages=[{"role": "user", "content": prompt}],
max_tokens=100
)
return response.content
\`\`\`
3. Google (Generative AI Studio / PaLM API)
\`\`\`python
def call_llm(prompt):
import google.generativeai as genai
genai.configure(api_key="YOUR_API_KEY_HERE")
response = genai.generate_text(
model="models/text-bison-001",
prompt=prompt
)
return response.result
\`\`\`
4. Azure (Azure OpenAI)
\`\`\`python
def call_llm(prompt):
from openai import AzureOpenAI
client = AzureOpenAI(
azure_endpoint="https://.openai.azure.com/",
api_key="YOUR_API_KEY_HERE",
api_version="2023-05-15"
)
r = client.chat.completions.create(
model="",
messages=[{"role": "user", "content": prompt}]
)
return r.choices[0].message.content
\`\`\`
5. Ollama (Local LLM)
\`\`\`python
def call_llm(prompt):
from ollama import chat
response = chat(
model="llama2",
messages=[{"role": "user", "content": prompt}]
)
return response.message.content
\`\`\`
## Improvements
Feel free to enhance your `call_llm` function as needed. Here are examples:
- Handle chat history:
\`\`\`python
def call_llm(messages):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=messages
)
return r.choices[0].message.content
\`\`\`
- Add in-memory caching
\`\`\`python
from functools import lru_cache
@lru_cache(maxsize=1000)
def call_llm(prompt):
# Your implementation here
pass
\`\`\`
> ⚠️ Caching conflicts with Node retries, as retries yield the same result.
>
> To address this, you could use cached results only if not retried.
{: .warning }
\`\`\`python
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_call(prompt):
pass
def call_llm(prompt, use_cache):
if use_cache:
return cached_call(prompt)
# Call the underlying function directly
return cached_call.__wrapped__(prompt)
class SummarizeNode(Node):
def exec(self, text):
return call_llm(f"Summarize: {text}", self.cur_retry==0)
\`\`\`
- Enable logging:
\`\`\`python
def call_llm(prompt):
import logging
logging.info(f"Prompt: {prompt}")
response = ... # Your implementation here
logging.info(f"Response: {response}")
return response
\`\`\`
```
## /LICENSE
``` path="/LICENSE"
MIT License
Copyright (c) 2025 Zachary Huang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
```
## /README.md
Turns Codebase into Easy Tutorial with AI

> *Ever stared at a new codebase written by others feeling completely lost? This tutorial shows you how to build an AI agent that analyzes GitHub repositories and creates beginner-friendly tutorials explaining exactly how the code works.*
This is a tutorial project of [Pocket Flow](https://github.com/The-Pocket/PocketFlow), a 100-line LLM framework. It crawls GitHub repositories and builds a knowledge base from the code. It analyzes entire codebases to identify core abstractions and how they interact, and transforms complex code into beginner-friendly tutorials with clear visualizations.
- Check out the [YouTube Development Tutorial](https://youtu.be/AFY67zOpbSo) for more!
- Check out the [Substack Post Tutorial](https://zacharyhuang.substack.com/p/ai-codebase-knowledge-builder-full) for more!
**🔸 🎉 Reached Hacker News Front Page** (April 2025) with >800 up‑votes: [Discussion »](https://news.ycombinator.com/item?id=43739456)
## ⭐ Example Results for Popular GitHub Repositories!
🤯 All these tutorials are generated **entirely by AI** by crawling the GitHub repo!
- [AutoGen Core](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/AutoGen%20Core) - Build AI teams that talk, think, and solve problems together like coworkers!
- [Browser Use](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Browser%20Use) - Let AI surf the web for you, clicking buttons and filling forms like a digital assistant!
- [Celery](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Celery) - Supercharge your app with background tasks that run while you sleep!
- [Click](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Click) - Turn Python functions into slick command-line tools with just a decorator!
- [Codex](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Codex) - Turn plain English into working code with this AI terminal wizard!
- [Crawl4AI](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Crawl4AI) - Train your AI to extract exactly what matters from any website!
- [CrewAI](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/CrewAI) - Assemble a dream team of AI specialists to tackle impossible problems!
- [DSPy](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/DSPy) - Build LLM apps like Lego blocks that optimize themselves!
- [FastAPI](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/FastAPI) - Create APIs at lightning speed with automatic docs that clients will love!
- [Flask](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Flask) - Craft web apps with minimal code that scales from prototype to production!
- [Google A2A](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Google%20A2A) - The universal language that lets AI agents collaborate across borders!
- [LangGraph](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/LangGraph) - Design AI agents as flowcharts where each step remembers what happened before!
- [LevelDB](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/LevelDB) - Store data at warp speed with Google's engine that powers blockchains!
- [MCP Python SDK](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/MCP%20Python%20SDK) - Build powerful apps that communicate through an elegant protocol without sweating the details!
- [NumPy Core](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/NumPy%20Core) - Master the engine behind data science that makes Python as fast as C!
- [OpenManus](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/OpenManus) - Build AI agents with digital brains that think, learn, and use tools just like humans do!
- [Pydantic Core](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Pydantic%20Core) - Validate data at rocket speed with just Python type hints!
- [Requests](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Requests) - Talk to the internet in Python with code so simple it feels like cheating!
- [SmolaAgents](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/SmolaAgents) - Build tiny AI agents that punch way above their weight class!
- Showcase Your AI-Generated Tutorials in [Discussions](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge/discussions)!
## 🚀 Getting Started
1. Clone this repository
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Set up LLM in [`utils/call_llm.py`](./utils/call_llm.py) by providing credentials. By default, you can use the AI Studio key with this client for Gemini Pro 2.5:
```python
client = genai.Client(
api_key=os.getenv("GEMINI_API_KEY", "your-api_key"),
)
```
You can use your own models. We highly recommend the latest models with thinking capabilities (Claude 3.7 with thinking, O1). You can verify that it is correctly set up by running:
```bash
python utils/call_llm.py
```
4. Generate a complete codebase tutorial by running the main script:
```bash
# Analyze a GitHub repository
python main.py --repo https://github.com/username/repo --include "*.py" "*.js" --exclude "tests/*" --max-size 50000
# Or, analyze a local directory
python main.py --dir /path/to/your/codebase --include "*.py" --exclude "*test*"
# Or, generate a tutorial in Chinese
python main.py --repo https://github.com/username/repo --language "Chinese"
```
- `--repo` or `--dir` - Specify either a GitHub repo URL or a local directory path (required, mutually exclusive)
- `-n, --name` - Project name (optional, derived from URL/directory if omitted)
- `-t, --token` - GitHub token (or set GITHUB_TOKEN environment variable)
- `-o, --output` - Output directory (default: ./output)
- `-i, --include` - Files to include (e.g., "*.py" "*.js")
- `-e, --exclude` - Files to exclude (e.g., "tests/*" "docs/*")
- `-s, --max-size` - Maximum file size in bytes (default: 100KB)
- `--language` - Language for the generated tutorial (default: "english")
The application will crawl the repository, analyze the codebase structure, generate tutorial content in the specified language, and save the output in the specified directory (default: ./output).
## 💡 Development Tutorial
- I built using [**Agentic Coding**](https://zacharyhuang.substack.com/p/agentic-coding-the-most-fun-way-to), the fastest development paradigm, where humans simply [design](docs/design.md) and agents [code](flow.py).
- The secret weapon is [Pocket Flow](https://github.com/The-Pocket/PocketFlow), a 100-line LLM framework that lets Agents (e.g., Cursor AI) build for you
- Check out the Step-by-step YouTube development tutorial:
## /assets/banner.png
Binary file available at https://raw.githubusercontent.com/The-Pocket/Tutorial-Codebase-Knowledge/refs/heads/main/assets/banner.png
## /assets/example.png
Binary file available at https://raw.githubusercontent.com/The-Pocket/Tutorial-Codebase-Knowledge/refs/heads/main/assets/example.png
## /assets/youtube_thumbnail.png
Binary file available at https://raw.githubusercontent.com/The-Pocket/Tutorial-Codebase-Knowledge/refs/heads/main/assets/youtube_thumbnail.png
## /docs/AutoGen Core/01_agent.md
---
layout: default
title: "Agent"
parent: "AutoGen Core"
nav_order: 1
---
# Chapter 1: Agent - The Workers of AutoGen
Welcome to the AutoGen Core tutorial! We're excited to guide you through building powerful applications with autonomous agents.
## Motivation: Why Do We Need Agents?
Imagine you want to build an automated system to write blog posts. You might need one part of the system to research a topic and another part to write the actual post based on the research. How do you represent these different "workers" and make them talk to each other?
This is where the concept of an **Agent** comes in. In AutoGen Core, an `Agent` is the fundamental building block representing an actor or worker in your system. Think of it like an employee in an office.
## Key Concepts: Understanding Agents
Let's break down what makes an Agent:
1. **It's a Worker:** An Agent is designed to *do* things. This could be running calculations, calling a Large Language Model (LLM) like ChatGPT, using a tool (like a search engine), or managing a piece of data.
2. **It Has an Identity (`AgentId`):** Just like every employee has a name and a job title, every Agent needs a unique identity. This identity, called `AgentId`, has two parts:
* `type`: What kind of role does the agent have? (e.g., "researcher", "writer", "coder"). This helps organize agents.
* `key`: A unique name for this specific agent instance (e.g., "researcher-01", "amy-the-writer").
```python
# From: _agent_id.py
class AgentId:
def __init__(self, type: str, key: str) -> None:
# ... (validation checks omitted for brevity)
self._type = type
self._key = key
@property
def type(self) -> str:
return self._type
@property
def key(self) -> str:
return self._key
def __str__(self) -> str:
# Creates an id like "researcher/amy-the-writer"
return f"{self._type}/{self._key}"
```
This `AgentId` acts like the agent's address, allowing other agents (or the system) to send messages specifically to it.
3. **It Has Metadata (`AgentMetadata`):** Besides its core identity, an agent often has descriptive information.
* `type`: Same as in `AgentId`.
* `key`: Same as in `AgentId`.
* `description`: A human-readable explanation of what the agent does (e.g., "Researches topics using web search").
```python
# From: _agent_metadata.py
from typing import TypedDict
class AgentMetadata(TypedDict):
type: str
key: str
description: str
```
This metadata helps understand the agent's purpose within the system.
4. **It Communicates via Messages:** Agents don't work in isolation. They collaborate by sending and receiving messages. The primary way an agent receives work is through its `on_message` method. Think of this like the agent's inbox.
```python
# From: _agent.py (Simplified Agent Protocol)
from typing import Any, Mapping, Protocol
# ... other imports
class Agent(Protocol):
@property
def id(self) -> AgentId: ... # The agent's unique ID
async def on_message(self, message: Any, ctx: MessageContext) -> Any:
"""Handles an incoming message."""
# Agent's logic to process the message goes here
...
```
When an agent receives a message, `on_message` is called. The `message` contains the data or task, and `ctx` (MessageContext) provides extra information about the message (like who sent it). We'll cover `MessageContext` more later.
5. **It Can Remember Things (State):** Sometimes, an agent needs to remember information between tasks, like keeping notes on research progress. Agents can optionally implement `save_state` and `load_state` methods to store and retrieve their internal memory.
```python
# From: _agent.py (Simplified Agent Protocol)
class Agent(Protocol):
# ... other methods
async def save_state(self) -> Mapping[str, Any]:
"""Save the agent's internal memory."""
# Return a dictionary representing the state
...
async def load_state(self, state: Mapping[str, Any]) -> None:
"""Load the agent's internal memory."""
# Restore state from the dictionary
...
```
We'll explore state and memory in more detail in [Chapter 7: Memory](07_memory.md).
6. **Different Agent Types:** AutoGen Core provides base classes to make creating agents easier:
* `BaseAgent`: The fundamental class most agents inherit from. It provides common setup.
* `ClosureAgent`: A very quick way to create simple agents using just a function (like hiring a temp worker for a specific task defined on the spot).
* `RoutedAgent`: An agent that can automatically direct different types of messages to different internal handler methods (like a smart receptionist).
## Use Case Example: Researcher and Writer
Let's revisit our blog post example. We want a `Researcher` agent and a `Writer` agent.
**Goal:**
1. Tell the `Researcher` a topic (e.g., "AutoGen Agents").
2. The `Researcher` finds some facts (we'll keep it simple and just make them up for now).
3. The `Researcher` sends these facts to the `Writer`.
4. The `Writer` receives the facts and drafts a short post.
**Simplified Implementation Idea (using `ClosureAgent` for brevity):**
First, let's define the messages they might exchange:
```python
from dataclasses import dataclass
@dataclass
class ResearchTopic:
topic: str
@dataclass
class ResearchFacts:
topic: str
facts: list[str]
@dataclass
class DraftPost:
topic: str
draft: str
```
These are simple Python classes to hold the data being passed around.
Now, let's imagine defining the `Researcher` using a `ClosureAgent`. This agent will listen for `ResearchTopic` messages.
```python
# Simplified concept - requires AgentRuntime (Chapter 3) to actually run
async def researcher_logic(agent_context, message: ResearchTopic, msg_context):
print(f"Researcher received topic: {message.topic}")
# In a real scenario, this would involve searching, calling an LLM, etc.
# For now, we just make up facts.
facts = [f"Fact 1 about {message.topic}", f"Fact 2 about {message.topic}"]
print(f"Researcher found facts: {facts}")
# Find the Writer agent's ID (we assume we know it)
writer_id = AgentId(type="writer", key="blog_writer_1")
# Send the facts to the Writer
await agent_context.send_message(
message=ResearchFacts(topic=message.topic, facts=facts),
recipient=writer_id,
)
print("Researcher sent facts to Writer.")
# This agent doesn't return a direct reply
return None
```
This `researcher_logic` function defines *what* the researcher does when it gets a `ResearchTopic` message. It processes the topic, creates `ResearchFacts`, and uses `agent_context.send_message` to send them to the `writer` agent.
Similarly, the `Writer` agent would have its own logic:
```python
# Simplified concept - requires AgentRuntime (Chapter 3) to actually run
async def writer_logic(agent_context, message: ResearchFacts, msg_context):
print(f"Writer received facts for topic: {message.topic}")
# In a real scenario, this would involve LLM prompting
draft = f"Blog Post about {message.topic}:\n"
for fact in message.facts:
draft += f"- {fact}\n"
print(f"Writer drafted post:\n{draft}")
# Perhaps save the draft or send it somewhere else
# For now, we just print it. We don't send another message.
return None # Or maybe return a confirmation/result
```
This `writer_logic` function defines how the writer reacts to receiving `ResearchFacts`.
**Important:** To actually *run* these agents and make them communicate, we need the `AgentRuntime` (covered in [Chapter 3: AgentRuntime](03_agentruntime.md)) and the `Messaging System` (covered in [Chapter 2: Messaging System](02_messaging_system__topic___subscription_.md)). For now, focus on the *idea* that Agents are distinct workers defined by their logic (`on_message`) and identified by their `AgentId`.
## Under the Hood: How an Agent Gets a Message
While the full message delivery involves the `Messaging System` and `AgentRuntime`, let's look at the agent's role when it receives a message.
**Conceptual Flow:**
```mermaid
sequenceDiagram
participant Sender as Sender Agent
participant Runtime as AgentRuntime
participant Recipient as Recipient Agent
Sender->>+Runtime: send_message(message, recipient_id)
Runtime->>+Recipient: Locate agent by recipient_id
Runtime->>+Recipient: on_message(message, context)
Recipient->>Recipient: Process message using internal logic
alt Response Needed
Recipient->>-Runtime: Return response value
Runtime->>-Sender: Deliver response value
else No Response
Recipient->>-Runtime: Return None (or no return)
end
```
1. Some other agent (Sender) or the system decides to send a message to our agent (Recipient).
2. It tells the `AgentRuntime` (the manager): "Deliver this `message` to the agent with `recipient_id`".
3. The `AgentRuntime` finds the correct `Recipient` agent instance.
4. The `AgentRuntime` calls the `Recipient.on_message(message, context)` method.
5. The agent's internal logic inside `on_message` (or methods called by it, like in `RoutedAgent`) runs to process the message.
6. If the message requires a direct response (like an RPC call), the agent returns a value from `on_message`. If not (like a general notification or event), it might return `None`.
**Code Glimpse:**
The core definition is the `Agent` Protocol (`_agent.py`). It's like an interface or a contract – any class wanting to be an Agent *must* provide these methods.
```python
# From: _agent.py - The Agent blueprint (Protocol)
@runtime_checkable
class Agent(Protocol):
@property
def metadata(self) -> AgentMetadata: ...
@property
def id(self) -> AgentId: ...
async def on_message(self, message: Any, ctx: MessageContext) -> Any: ...
async def save_state(self) -> Mapping[str, Any]: ...
async def load_state(self, state: Mapping[str, Any]) -> None: ...
async def close(self) -> None: ...
```
Most agents you create will inherit from `BaseAgent` (`_base_agent.py`). It provides some standard setup:
```python
# From: _base_agent.py (Simplified)
class BaseAgent(ABC, Agent):
def __init__(self, description: str) -> None:
# Gets runtime & id from a special context when created by the runtime
# Raises error if you try to create it directly!
self._runtime: AgentRuntime = AgentInstantiationContext.current_runtime()
self._id: AgentId = AgentInstantiationContext.current_agent_id()
self._description = description
# ...
# This is the final version called by the runtime
@final
async def on_message(self, message: Any, ctx: MessageContext) -> Any:
# It calls the implementation method you need to write
return await self.on_message_impl(message, ctx)
# You MUST implement this in your subclass
@abstractmethod
async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any: ...
# Helper to send messages easily
async def send_message(self, message: Any, recipient: AgentId, ...) -> Any:
# It just asks the runtime to do the actual sending
return await self._runtime.send_message(
message, sender=self.id, recipient=recipient, ...
)
# ... other methods like publish_message, save_state, load_state
```
Notice how `BaseAgent` handles getting its `id` and `runtime` during creation and provides a convenient `send_message` method that uses the runtime. When inheriting from `BaseAgent`, you primarily focus on implementing the `on_message_impl` method to define your agent's unique behavior.
## Next Steps
You now understand the core concept of an `Agent` in AutoGen Core! It's the fundamental worker unit with an identity, the ability to process messages, and optionally maintain state.
In the next chapters, we'll explore:
* [Chapter 2: Messaging System](02_messaging_system__topic___subscription_.md): How messages actually travel between agents.
* [Chapter 3: AgentRuntime](03_agentruntime.md): The manager responsible for creating, running, and connecting agents.
Let's continue building your understanding!
---
Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
## /docs/AutoGen Core/02_messaging_system__topic___subscription_.md
---
layout: default
title: "Messaging System"
parent: "AutoGen Core"
nav_order: 2
---
# Chapter 2: Messaging System (Topic & Subscription)
In [Chapter 1: Agent](01_agent.md), we learned about Agents as individual workers. But how do they coordinate when one agent doesn't know exactly *who* needs the information it produces? Imagine our Researcher finds some facts. Maybe the Writer needs them, but maybe a Fact-Checker agent or a Summary agent also needs them later. How can the Researcher just announce "Here are the facts!" without needing a specific mailing list?
This is where the **Messaging System**, specifically **Topics** and **Subscriptions**, comes in. It allows agents to broadcast messages to anyone interested, like posting on a company announcement board.
## Motivation: Broadcasting Information
Let's refine our blog post example:
1. The `Researcher` agent finds facts about "AutoGen Agents".
2. Instead of sending *directly* to the `Writer`, the `Researcher` **publishes** these facts to a general "research-results" **Topic**.
3. The `Writer` agent has previously told the system it's **subscribed** to the "research-results" Topic.
4. The system sees the new message on the Topic and delivers it to the `Writer` (and any other subscribers).
This way, the `Researcher` doesn't need to know who the `Writer` is, or even if a `Writer` exists! It just broadcasts the results. If we later add a `FactChecker` agent that also needs the results, it simply subscribes to the same Topic.
## Key Concepts: Topics and Subscriptions
Let's break down the components of this broadcasting system:
1. **Topic (`TopicId`): The Announcement Board**
* A `TopicId` represents a specific channel or category for messages. Think of it like the name of an announcement board (e.g., "Project Updates", "General Announcements").
* It has two main parts:
* `type`: What *kind* of event or information is this? (e.g., "research.completed", "user.request"). This helps categorize messages.
* `source`: *Where* or *why* did this event originate? Often, this relates to the specific task or context (e.g., the specific blog post being researched like "autogen-agents-blog-post", or the team generating the event like "research-team").
```python
# From: _topic.py (Simplified)
from dataclasses import dataclass
@dataclass(frozen=True) # Immutable: can't change after creation
class TopicId:
type: str
source: str
def __str__(self) -> str:
# Creates an id like "research.completed/autogen-agents-blog-post"
return f"{self.type}/{self.source}"
```
This structure allows for flexible filtering. Agents might subscribe to all topics of a certain `type`, regardless of the `source`, or only to topics with a specific `source`.
2. **Publishing: Posting the Announcement**
* When an agent has information to share broadly, it *publishes* a message to a specific `TopicId`.
* This is like pinning a note to the designated announcement board. The agent doesn't need to know who will read it.
3. **Subscription (`Subscription`): Signing Up for Updates**
* A `Subscription` is how an agent declares its interest in certain `TopicId`s.
* It acts like a rule: "If a message is published to a Topic that matches *this pattern*, please deliver it to *this kind of agent*".
* The `Subscription` links a `TopicId` pattern (e.g., "all topics with type `research.completed`") to an `AgentId` (or a way to determine the `AgentId`).
4. **Routing: Delivering the Mail**
* The `AgentRuntime` (the system manager we'll meet in [Chapter 3: AgentRuntime](03_agentruntime.md)) keeps track of all active `Subscription`s.
* When a message is published to a `TopicId`, the `AgentRuntime` checks which `Subscription`s match that `TopicId`.
* For each match, it uses the `Subscription`'s rule to figure out which specific `AgentId` should receive the message and delivers it.
## Use Case Example: Researcher Publishes, Writer Subscribes
Let's see how our Researcher and Writer can use this system.
**Goal:** Researcher publishes facts to a topic, Writer receives them via subscription.
**1. Define the Topic:**
We need a `TopicId` for research results. Let's say the `type` is "research.facts.available" and the `source` identifies the specific research task (e.g., "blog-post-autogen").
```python
# From: _topic.py
from autogen_core import TopicId
# Define the topic for this specific research task
research_topic_id = TopicId(type="research.facts.available", source="blog-post-autogen")
print(f"Topic ID: {research_topic_id}")
# Output: Topic ID: research.facts.available/blog-post-autogen
```
This defines the "announcement board" we'll use.
**2. Researcher Publishes:**
The `Researcher` agent, after finding facts, will use its `agent_context` (provided by the runtime) to publish the `ResearchFacts` message to this topic.
```python
# Simplified concept - Researcher agent logic
# Assume 'agent_context' and 'message' (ResearchTopic) are provided
# Define the facts message (from Chapter 1)
@dataclass
class ResearchFacts:
topic: str
facts: list[str]
async def researcher_publish_logic(agent_context, message: ResearchTopic, msg_context):
print(f"Researcher working on: {message.topic}")
facts_data = ResearchFacts(
topic=message.topic,
facts=[f"Fact A about {message.topic}", f"Fact B about {message.topic}"]
)
# Define the specific topic for this task's results
results_topic = TopicId(type="research.facts.available", source=message.topic) # Use message topic as source
# Publish the facts to the topic
await agent_context.publish_message(message=facts_data, topic_id=results_topic)
print(f"Researcher published facts to topic: {results_topic}")
# No direct reply needed
return None
```
Notice the `agent_context.publish_message` call. The Researcher doesn't specify a recipient, only the topic.
**3. Writer Subscribes:**
The `Writer` agent needs to tell the system it's interested in messages on topics like "research.facts.available". We can use a predefined `Subscription` type called `TypeSubscription`. This subscription typically means: "I am interested in all topics with this *exact type*. When a message arrives, create/use an agent of *my type* whose `key` matches the topic's `source`."
```python
# From: _type_subscription.py (Simplified Concept)
from autogen_core import TypeSubscription, BaseAgent
class WriterAgent(BaseAgent):
# ... agent implementation ...
async def on_message_impl(self, message: ResearchFacts, ctx):
# This method gets called when a subscribed message arrives
print(f"Writer ({self.id}) received facts via subscription: {message.facts}")
# ... process facts and write draft ...
# How the Writer subscribes (usually done during runtime setup - Chapter 3)
# This tells the runtime: "Messages on topics with type 'research.facts.available'
# should go to a 'writer' agent whose key matches the topic source."
writer_subscription = TypeSubscription(
topic_type="research.facts.available",
agent_type="writer" # The type of agent that should handle this
)
print(f"Writer subscription created for topic type: {writer_subscription.topic_type}")
# Output: Writer subscription created for topic type: research.facts.available
```
When the `Researcher` publishes to `TopicId(type="research.facts.available", source="blog-post-autogen")`, the `AgentRuntime` will see that `writer_subscription` matches the `topic_type`. It will then use the rule: "Find (or create) an agent with `AgentId(type='writer', key='blog-post-autogen')` and deliver the message."
**Benefit:** Decoupling! The Researcher just broadcasts. The Writer just listens for relevant broadcasts. We can add more listeners (like a `FactChecker` subscribing to the same `topic_type`) without changing the `Researcher` at all.
## Under the Hood: How Publishing Works
Let's trace the journey of a published message.
**Conceptual Flow:**
```mermaid
sequenceDiagram
participant Publisher as Publisher Agent
participant Runtime as AgentRuntime
participant SubRegistry as Subscription Registry
participant Subscriber as Subscriber Agent
Publisher->>+Runtime: publish_message(message, topic_id)
Runtime->>+SubRegistry: Find subscriptions matching topic_id
SubRegistry-->>-Runtime: Return list of matching Subscriptions
loop For each matching Subscription
Runtime->>Subscription: map_to_agent(topic_id)
Subscription-->>Runtime: Return target AgentId
Runtime->>+Subscriber: Locate/Create Agent instance by AgentId
Runtime->>Subscriber: on_message(message, context)
Subscriber-->>-Runtime: Process message (optional return)
end
Runtime-->>-Publisher: Return (usually None for publish)
```
1. **Publish:** An agent calls `agent_context.publish_message(message, topic_id)`. This internally calls the `AgentRuntime`'s publish method.
2. **Lookup:** The `AgentRuntime` takes the `topic_id` and consults its internal `Subscription Registry`.
3. **Match:** The Registry checks all registered `Subscription` objects. Each `Subscription` has an `is_match(topic_id)` method. The registry finds all subscriptions where `is_match` returns `True`.
4. **Map:** For each matching `Subscription`, the Runtime calls its `map_to_agent(topic_id)` method. This method returns the specific `AgentId` that should handle this message based on the subscription rule and the topic details.
5. **Deliver:** The `AgentRuntime` finds the agent instance corresponding to the returned `AgentId` (potentially creating it if it doesn't exist yet, especially with `TypeSubscription`). It then calls that agent's `on_message` method, delivering the original published `message`.
**Code Glimpse:**
* **`TopicId` (`_topic.py`):** As shown before, a simple dataclass holding `type` and `source`. It includes validation to ensure the `type` follows certain naming conventions.
```python
# From: _topic.py
@dataclass(eq=True, frozen=True)
class TopicId:
type: str
source: str
# ... validation and __str__ ...
@classmethod
def from_str(cls, topic_id: str) -> Self:
# Helper to parse "type/source" string
# ... implementation ...
```
* **`Subscription` Protocol (`_subscription.py`):** This defines the *contract* for any subscription rule.
```python
# From: _subscription.py (Simplified Protocol)
from typing import Protocol
# ... other imports
class Subscription(Protocol):
@property
def id(self) -> str: ... # Unique ID for this subscription instance
def is_match(self, topic_id: TopicId) -> bool:
"""Check if a topic matches this subscription's rule."""
...
def map_to_agent(self, topic_id: TopicId) -> AgentId:
"""Determine the target AgentId if is_match was True."""
...
```
Any class implementing these methods can act as a subscription rule.
* **`TypeSubscription` (`_type_subscription.py`):** A common implementation of the `Subscription` protocol.
```python
# From: _type_subscription.py (Simplified)
class TypeSubscription(Subscription):
def __init__(self, topic_type: str, agent_type: str, ...):
self._topic_type = topic_type
self._agent_type = agent_type
# ... generates a unique self._id ...
def is_match(self, topic_id: TopicId) -> bool:
# Matches if the topic's type is exactly the one we want
return topic_id.type == self._topic_type
def map_to_agent(self, topic_id: TopicId) -> AgentId:
# Maps to an agent of the specified type, using the
# topic's source as the agent's unique key.
if not self.is_match(topic_id):
raise CantHandleException(...) # Should not happen if used correctly
return AgentId(type=self._agent_type, key=topic_id.source)
# ... id property ...
```
This implementation provides the "one agent instance per source" behavior for a specific topic type.
* **`DefaultSubscription` (`_default_subscription.py`):** This is often used via a decorator (`@default_subscription`) and provides a convenient way to create a `TypeSubscription` where the `agent_type` is automatically inferred from the agent class being defined, and the `topic_type` defaults to "default" (but can be overridden). It simplifies common use cases.
```python
# From: _default_subscription.py (Conceptual Usage)
from autogen_core import BaseAgent, default_subscription, ResearchFacts
@default_subscription # Uses 'default' topic type, infers agent type 'writer'
class WriterAgent(BaseAgent):
# Agent logic here...
async def on_message_impl(self, message: ResearchFacts, ctx): ...
# Or specify the topic type
@default_subscription(topic_type="research.facts.available")
class SpecificWriterAgent(BaseAgent):
# Agent logic here...
async def on_message_impl(self, message: ResearchFacts, ctx): ...
```
The actual sending (`publish_message`) and routing logic reside within the `AgentRuntime`, which we'll explore next.
## Next Steps
You've learned how AutoGen Core uses a publish/subscribe system (`TopicId`, `Subscription`) to allow agents to communicate without direct coupling. This is crucial for building flexible and scalable multi-agent applications.
* **Topic (`TopicId`):** Named channels (`type`/`source`) for broadcasting messages.
* **Publish:** Sending a message to a Topic.
* **Subscription:** An agent's declared interest in messages on certain Topics, defining a routing rule.
Now, let's dive into the orchestrator that manages agents and makes this messaging system work:
* [Chapter 3: AgentRuntime](03_agentruntime.md): The manager responsible for creating, running, and connecting agents, including handling message publishing and subscription routing.
---
Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
## /docs/AutoGen Core/03_agentruntime.md
---
layout: default
title: "AgentRuntime"
parent: "AutoGen Core"
nav_order: 3
---
# Chapter 3: AgentRuntime - The Office Manager
In [Chapter 1: Agent](01_agent.md), we met the workers (`Agent`) of our system. In [Chapter 2: Messaging System](02_messaging_system__topic___subscription_.md), we saw how they can communicate broadly using topics and subscriptions. But who hires these agents? Who actually delivers the messages, whether direct or published? And who keeps the whole system running smoothly?
This is where the **`AgentRuntime`** comes in. It's the central nervous system, the operating system, or perhaps the most fitting analogy: **the office manager** for all your agents.
## Motivation: Why Do We Need an Office Manager?
Imagine an office full of employees (Agents). You have researchers, writers, maybe coders.
* How does a new employee get hired and set up?
* When one employee wants to send a memo directly to another, who makes sure it gets to the right desk?
* When someone posts an announcement on the company bulletin board (publishes to a topic), who ensures everyone who signed up for that type of announcement sees it?
* Who starts the workday and ensures everything keeps running?
Without an office manager, it would be chaos! The `AgentRuntime` serves this crucial role in AutoGen Core. It handles:
1. **Agent Creation:** "Onboarding" new agents when they are needed.
2. **Message Routing:** Delivering direct messages (`send_message`) and published messages (`publish_message`).
3. **Lifecycle Management:** Starting, running, and stopping the whole system.
4. **State Management:** Keeping track of the overall system state (optional).
## Key Concepts: Understanding the Manager's Job
Let's break down the main responsibilities of the `AgentRuntime`:
1. **Agent Instantiation (Hiring):**
* You don't usually create agent objects directly (like `my_agent = ResearcherAgent()`). Why? Because the agent needs to know *about* the runtime (the office it works in) to send messages, publish announcements, etc.
* Instead, you tell the `AgentRuntime`: "I need an agent of type 'researcher'. Here's a recipe (a **factory function**) for how to create one." This is done using `runtime.register_factory(...)`.
* When a message needs to go to a 'researcher' agent with a specific key (e.g., 'researcher-01'), the runtime checks if it already exists. If not, it uses the registered factory function to create (instantiate) the agent.
* **Crucially**, while creating the agent, the runtime provides special context (`AgentInstantiationContext`) so the new agent automatically gets its unique `AgentId` and a reference to the `AgentRuntime` itself. This is like giving a new employee their ID badge and telling them who the office manager is.
```python
# Simplified Concept - How a BaseAgent gets its ID and runtime access
# From: _agent_instantiation.py and _base_agent.py
# Inside the agent's __init__ method (when inheriting from BaseAgent):
class MyAgent(BaseAgent):
def __init__(self, description: str):
# This magic happens *because* the AgentRuntime is creating the agent
# inside a special context.
self._runtime = AgentInstantiationContext.current_runtime() # Gets the manager
self._id = AgentInstantiationContext.current_agent_id() # Gets its own ID
self._description = description
# ... rest of initialization ...
```
This ensures agents are properly integrated into the system from the moment they are created.
2. **Message Delivery (Mail Room):**
* **Direct Send (`send_message`):** When an agent calls `await agent_context.send_message(message, recipient_id)`, it's actually telling the `AgentRuntime`, "Please deliver this `message` directly to the agent identified by `recipient_id`." The runtime finds the recipient agent (creating it if necessary) and calls its `on_message` method. It's like putting a specific name on an envelope and handing it to the mail room.
* **Publish (`publish_message`):** When an agent calls `await agent_context.publish_message(message, topic_id)`, it tells the runtime, "Post this `message` to the announcement board named `topic_id`." The runtime then checks its list of **subscriptions** (who signed up for which boards). For every matching subscription, it figures out the correct recipient agent(s) (based on the subscription rule) and delivers the message to their `on_message` method.
3. **Lifecycle Management (Opening/Closing the Office):**
* The runtime needs to be started to begin processing messages. Typically, you call `runtime.start()`. This usually kicks off a background process or loop that watches for incoming messages.
* When work is done, you need to stop the runtime gracefully. `runtime.stop_when_idle()` is common – it waits until all messages currently in the queue have been processed, then stops. `runtime.stop()` stops more abruptly.
4. **State Management (Office Records):**
* The runtime can save the state of *all* the agents it manages (`runtime.save_state()`) and load it back later (`runtime.load_state()`). This is useful for pausing and resuming complex multi-agent interactions. It can also save/load state for individual agents (`runtime.agent_save_state()` / `runtime.agent_load_state()`). We'll touch more on state in [Chapter 7: Memory](07_memory.md).
## Use Case Example: Running Our Researcher and Writer
Let's finally run the Researcher/Writer scenario from Chapters 1 and 2. We need the `AgentRuntime` to make it happen.
**Goal:**
1. Create a runtime.
2. Register factories for a 'researcher' and a 'writer' agent.
3. Tell the runtime that 'writer' agents are interested in "research.facts.available" topics (add subscription).
4. Start the runtime.
5. Send an initial `ResearchTopic` message to a 'researcher' agent.
6. Let the system run (Researcher publishes facts, Runtime delivers to Writer via subscription, Writer processes).
7. Stop the runtime when idle.
**Code Snippets (Simplified):**
```python
# 0. Imports and Message Definitions (from previous chapters)
import asyncio
from dataclasses import dataclass
from autogen_core import (
AgentId, BaseAgent, SingleThreadedAgentRuntime, TopicId,
MessageContext, TypeSubscription, AgentInstantiationContext
)
@dataclass
class ResearchTopic: topic: str
@dataclass
class ResearchFacts: topic: str; facts: list[str]
```
These are the messages our agents will exchange.
```python
# 1. Define Agent Logic (using BaseAgent)
class ResearcherAgent(BaseAgent):
async def on_message_impl(self, message: ResearchTopic, ctx: MessageContext):
print(f"Researcher ({self.id}) got topic: {message.topic}")
facts = [f"Fact 1 about {message.topic}", f"Fact 2"]
results_topic = TopicId("research.facts.available", message.topic)
# Use the runtime (via self.publish_message helper) to publish
await self.publish_message(
ResearchFacts(topic=message.topic, facts=facts), results_topic
)
print(f"Researcher ({self.id}) published facts to {results_topic}")
class WriterAgent(BaseAgent):
async def on_message_impl(self, message: ResearchFacts, ctx: MessageContext):
print(f"Writer ({self.id}) received facts via topic '{ctx.topic_id}': {message.facts}")
draft = f"Draft for {message.topic}: {'; '.join(message.facts)}"
print(f"Writer ({self.id}) created draft: '{draft}'")
# This agent doesn't send further messages in this example
```
Here we define the behavior of our two agent types, inheriting from `BaseAgent` which gives us `self.id`, `self.publish_message`, etc.
```python
# 2. Define Agent Factories
def researcher_factory():
# Gets runtime/id via AgentInstantiationContext inside BaseAgent.__init__
print("Runtime is creating a ResearcherAgent...")
return ResearcherAgent(description="I research topics.")
def writer_factory():
print("Runtime is creating a WriterAgent...")
return WriterAgent(description="I write drafts from facts.")
```
These simple functions tell the runtime *how* to create instances of our agents when needed.
```python
# 3. Setup and Run the Runtime
async def main():
# Create the runtime (the office manager)
runtime = SingleThreadedAgentRuntime()
# Register the factories (tell the manager how to hire)
await runtime.register_factory("researcher", researcher_factory)
await runtime.register_factory("writer", writer_factory)
print("Registered agent factories.")
# Add the subscription (tell manager who listens to which announcements)
# Rule: Messages to topics of type "research.facts.available"
# should go to a "writer" agent whose key matches the topic source.
writer_sub = TypeSubscription(topic_type="research.facts.available", agent_type="writer")
await runtime.add_subscription(writer_sub)
print(f"Added subscription: {writer_sub.id}")
# Start the runtime (open the office)
runtime.start()
print("Runtime started.")
# Send the initial message to kick things off
research_task_topic = "AutoGen Agents"
researcher_instance_id = AgentId(type="researcher", key=research_task_topic)
print(f"Sending initial topic '{research_task_topic}' to {researcher_instance_id}")
await runtime.send_message(
message=ResearchTopic(topic=research_task_topic),
recipient=researcher_instance_id,
)
# Wait until all messages are processed (wait for work day to end)
print("Waiting for runtime to become idle...")
await runtime.stop_when_idle()
print("Runtime stopped.")
# Run the main function
asyncio.run(main())
```
This script sets up the `SingleThreadedAgentRuntime`, registers the blueprints (factories) and communication rules (subscription), starts the process, and then shuts down cleanly.
**Expected Output (Conceptual Order):**
```
Registered agent factories.
Added subscription: type=research.facts.available=>agent=writer
Runtime started.
Sending initial topic 'AutoGen Agents' to researcher/AutoGen Agents
Waiting for runtime to become idle...
Runtime is creating a ResearcherAgent... # First time researcher/AutoGen Agents is needed
Researcher (researcher/AutoGen Agents) got topic: AutoGen Agents
Researcher (researcher/AutoGen Agents) published facts to research.facts.available/AutoGen Agents
Runtime is creating a WriterAgent... # First time writer/AutoGen Agents is needed (due to subscription)
Writer (writer/AutoGen Agents) received facts via topic 'research.facts.available/AutoGen Agents': ['Fact 1 about AutoGen Agents', 'Fact 2']
Writer (writer/AutoGen Agents) created draft: 'Draft for AutoGen Agents: Fact 1 about AutoGen Agents; Fact 2'
Runtime stopped.
```
You can see the runtime orchestrating the creation of agents and the flow of messages based on the initial request and the subscription rule.
## Under the Hood: How the Manager Works
Let's peek inside the `SingleThreadedAgentRuntime` (a common implementation provided by AutoGen Core) to understand the flow.
**Core Idea:** It uses an internal queue (`_message_queue`) to hold incoming requests (`send_message`, `publish_message`). A background task continuously takes items from the queue and processes them one by one (though the *handling* of a message might involve `await` and allow other tasks to run).
**1. Agent Creation (`_get_agent`, `_invoke_agent_factory`)**
When the runtime needs an agent instance (e.g., to deliver a message) that hasn't been created yet:
```mermaid
sequenceDiagram
participant Runtime as AgentRuntime
participant Factory as Agent Factory Func
participant AgentCtx as AgentInstantiationContext
participant Agent as New Agent Instance
Runtime->>Runtime: Check if agent instance exists (e.g., in `_instantiated_agents` dict)
alt Agent Not Found
Runtime->>Runtime: Find registered factory for agent type
Runtime->>AgentCtx: Set current runtime & agent_id
activate AgentCtx
Runtime->>Factory: Call factory function()
activate Factory
Factory->>AgentCtx: (Inside Agent.__init__) Get current runtime
AgentCtx-->>Factory: Return runtime
Factory->>AgentCtx: (Inside Agent.__init__) Get current agent_id
AgentCtx-->>Factory: Return agent_id
Factory-->>Runtime: Return new Agent instance
deactivate Factory
Runtime->>AgentCtx: Clear context
deactivate AgentCtx
Runtime->>Runtime: Store new agent instance
end
Runtime->>Runtime: Return agent instance
```
* The runtime looks up the factory function registered for the required `AgentId.type`.
* It uses `AgentInstantiationContext.populate_context` to temporarily store its own reference and the target `AgentId`.
* It calls the factory function.
* Inside the agent's `__init__` (usually via `BaseAgent`), `AgentInstantiationContext.current_runtime()` and `AgentInstantiationContext.current_agent_id()` are called to retrieve the context set by the runtime.
* The factory returns the fully initialized agent instance.
* The runtime stores this instance for future use.
```python
# From: _agent_instantiation.py (Simplified)
class AgentInstantiationContext:
_CONTEXT_VAR = ContextVar("agent_context") # Stores (runtime, agent_id)
@classmethod
@contextmanager
def populate_context(cls, ctx: tuple[AgentRuntime, AgentId]):
token = cls._CONTEXT_VAR.set(ctx) # Store context for this block
try:
yield # Code inside the 'with' block runs here
finally:
cls._CONTEXT_VAR.reset(token) # Clean up context
@classmethod
def current_runtime(cls) -> AgentRuntime:
return cls._CONTEXT_VAR.get()[0] # Retrieve runtime from context
@classmethod
def current_agent_id(cls) -> AgentId:
return cls._CONTEXT_VAR.get()[1] # Retrieve agent_id from context
```
This context manager pattern ensures the correct runtime and ID are available *only* during the agent's creation by the runtime.
**2. Direct Messaging (`send_message` -> `_process_send`)**
```mermaid
sequenceDiagram
participant Sender as Sending Agent/Code
participant Runtime as AgentRuntime
participant Queue as Internal Queue
participant Recipient as Recipient Agent
Sender->>+Runtime: send_message(msg, recipient_id, ...)
Runtime->>Runtime: Create Future (for response)
Runtime->>+Queue: Put SendMessageEnvelope(msg, recipient_id, future)
Runtime-->>-Sender: Return awaitable Future
Note over Queue, Runtime: Background task picks up envelope
Runtime->>Runtime: _process_send(envelope)
Runtime->>+Recipient: _get_agent(recipient_id) (creates if needed)
Recipient-->>-Runtime: Return Agent instance
Runtime->>+Recipient: on_message(msg, context)
Recipient->>Recipient: Process message...
Recipient-->>-Runtime: Return response value
Runtime->>Runtime: Set Future result with response value
```
* `send_message` creates a `Future` object (a placeholder for the eventual result) and wraps the message details in a `SendMessageEnvelope`.
* This envelope is put onto the internal `_message_queue`.
* The background task picks up the envelope.
* `_process_send` gets the recipient agent instance (using `_get_agent`).
* It calls the recipient's `on_message` method.
* When `on_message` returns a result, `_process_send` sets the result on the `Future` object, which makes the original `await runtime.send_message(...)` call return the value.
**3. Publish/Subscribe (`publish_message` -> `_process_publish`)**
```mermaid
sequenceDiagram
participant Publisher as Publishing Agent/Code
participant Runtime as AgentRuntime
participant Queue as Internal Queue
participant SubManager as SubscriptionManager
participant Subscriber as Subscribed Agent
Publisher->>+Runtime: publish_message(msg, topic_id, ...)
Runtime->>+Queue: Put PublishMessageEnvelope(msg, topic_id)
Runtime-->>-Publisher: Return (None for publish)
Note over Queue, Runtime: Background task picks up envelope
Runtime->>Runtime: _process_publish(envelope)
Runtime->>+SubManager: get_subscribed_recipients(topic_id)
SubManager->>SubManager: Find matching subscriptions
SubManager->>SubManager: Map subscriptions to AgentIds
SubManager-->>-Runtime: Return list of recipient AgentIds
loop For each recipient AgentId
Runtime->>+Subscriber: _get_agent(recipient_id) (creates if needed)
Subscriber-->>-Runtime: Return Agent instance
Runtime->>+Subscriber: on_message(msg, context with topic_id)
Subscriber->>Subscriber: Process message...
Subscriber-->>-Runtime: Return (usually None for publish)
end
```
* `publish_message` wraps the message in a `PublishMessageEnvelope` and puts it on the queue.
* The background task picks it up.
* `_process_publish` asks the `SubscriptionManager` (`_subscription_manager`) for all `AgentId`s that are subscribed to the given `topic_id`.
* The `SubscriptionManager` checks its registered `Subscription` objects (`_subscriptions` list, added via `add_subscription`). For each `Subscription` where `is_match(topic_id)` is true, it calls `map_to_agent(topic_id)` to get the target `AgentId`.
* For each resulting `AgentId`, the runtime gets the agent instance and calls its `on_message` method, providing the `topic_id` in the `MessageContext`.
```python
# From: _runtime_impl_helpers.py (SubscriptionManager simplified)
class SubscriptionManager:
def __init__(self):
self._subscriptions: List[Subscription] = []
# Optimization cache can be added here
async def add_subscription(self, subscription: Subscription):
self._subscriptions.append(subscription)
# Clear cache if any
async def get_subscribed_recipients(self, topic: TopicId) -> List[AgentId]:
recipients = []
for sub in self._subscriptions:
if sub.is_match(topic):
recipients.append(sub.map_to_agent(topic))
return recipients
```
The `SubscriptionManager` simply iterates through registered subscriptions to find matches when a message is published.
## Next Steps
You now understand the `AgentRuntime` - the essential coordinator that brings Agents to life, manages their communication, and runs the entire show. It handles agent creation via factories, routes direct and published messages, and manages the system's lifecycle.
With the core concepts of `Agent`, `Messaging`, and `AgentRuntime` covered, we can start looking at more specialized building blocks. Next, we'll explore how agents can use external capabilities:
* [Chapter 4: Tool](04_tool.md): How to give agents tools (like functions or APIs) to perform specific actions beyond just processing messages.
---
Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
## /docs/AutoGen Core/04_tool.md
---
layout: default
title: "Tool"
parent: "AutoGen Core"
nav_order: 4
---
# Chapter 4: Tool - Giving Agents Specific Capabilities
In the previous chapters, we learned about Agents as workers ([Chapter 1](01_agent.md)), how they can communicate directly or using announcements ([Chapter 2](02_messaging_system__topic___subscription_.md)), and the `AgentRuntime` that manages them ([Chapter 3](03_agentruntime.md)).
Agents can process messages and coordinate, but what if an agent needs to perform a very specific action, like looking up information online, running a piece of code, accessing a database, or even just finding out the current date? They need specialized *capabilities*.
This is where the concept of a **Tool** comes in.
## Motivation: Agents Need Skills!
Imagine our `Writer` agent from before. It receives facts and writes a draft. Now, let's say we want the `Writer` (or perhaps a smarter `Assistant` agent helping it) to always include the current date in the blog post title.
How does the agent get the current date? It doesn't inherently know it. It needs a specific *skill* or *tool* for that.
A `Tool` in AutoGen Core represents exactly this: a specific, well-defined capability that an Agent can use. Think of it like giving an employee (Agent) a specialized piece of equipment (Tool), like a calculator, a web browser, or a calendar lookup program.
## Key Concepts: Understanding Tools
Let's break down what defines a Tool:
1. **It's a Specific Capability:** A Tool performs one well-defined task. Examples:
* `search_web(query: str)`
* `run_python_code(code: str)`
* `get_stock_price(ticker: str)`
* `get_current_date()`
2. **It Has a Schema (The Manual):** This is crucial! For an Agent (especially one powered by a Large Language Model - LLM) to know *when* and *how* to use a tool, the tool needs a clear description or "manual". This is called the `ToolSchema`. It typically includes:
* **`name`**: A unique identifier for the tool (e.g., `get_current_date`).
* **`description`**: A clear explanation of what the tool does, which helps the LLM decide if this tool is appropriate for the current task (e.g., "Fetches the current date in YYYY-MM-DD format").
* **`parameters`**: Defines what inputs the tool needs. This is itself a schema (`ParametersSchema`) describing the input fields, their types, and which ones are required. For our `get_current_date` example, it might need no parameters. For `get_stock_price`, it would need a `ticker` parameter of type string.
```python
# From: tools/_base.py (Simplified Concept)
from typing import TypedDict, Dict, Any, Sequence, NotRequired
class ParametersSchema(TypedDict):
type: str # Usually "object"
properties: Dict[str, Any] # Defines input fields and their types
required: NotRequired[Sequence[str]] # List of required field names
class ToolSchema(TypedDict):
name: str
description: NotRequired[str]
parameters: NotRequired[ParametersSchema]
# 'strict' flag also possible (Chapter 5 related)
```
This schema allows an LLM to understand: "Ah, there's a tool called `get_current_date` that takes no inputs and gives me the current date. I should use that now!"
3. **It Can Be Executed:** Once an agent decides to use a tool (often based on the schema), there needs to be a mechanism to actually *run* the tool's underlying function and get the result.
## Use Case Example: Adding a `get_current_date` Tool
Let's equip an agent with the ability to find the current date.
**Goal:** Define a tool that gets the current date and show how it could be executed by a specialized agent.
**Step 1: Define the Python Function**
First, we need the actual Python code that performs the action.
```python
# File: get_date_function.py
import datetime
def get_current_date() -> str:
"""Fetches the current date as a string."""
today = datetime.date.today()
return today.isoformat() # Returns date like "2023-10-27"
# Test the function
print(f"Function output: {get_current_date()}")
```
This is a standard Python function. It takes no arguments and returns the date as a string.
**Step 2: Wrap it as a `FunctionTool`**
AutoGen Core provides a convenient way to turn a Python function like this into a `Tool` object using `FunctionTool`. It automatically inspects the function's signature (arguments and return type) and docstring to help build the `ToolSchema`.
```python
# File: create_date_tool.py
from autogen_core.tools import FunctionTool
from get_date_function import get_current_date # Import our function
# Create the Tool instance
# We provide the function and a clear description for the LLM
date_tool = FunctionTool(
func=get_current_date,
description="Use this tool to get the current date in YYYY-MM-DD format."
# Name defaults to function name 'get_current_date'
)
# Let's see what FunctionTool generated
print(f"Tool Name: {date_tool.name}")
print(f"Tool Description: {date_tool.description}")
# The schema defines inputs (none in this case)
# print(f"Tool Schema Parameters: {date_tool.schema['parameters']}")
# Output (simplified): {'type': 'object', 'properties': {}, 'required': []}
```
`FunctionTool` wraps our `get_current_date` function. It uses the function name as the tool name and the description we provided. It also correctly determines from the function signature that there are no input parameters (`properties: {}`).
**Step 3: How an Agent Might Request Tool Use**
Now we have a `date_tool`. How is it used? Typically, an LLM-powered agent (which we'll see more of in [Chapter 5: ChatCompletionClient](05_chatcompletionclient.md)) analyzes a request and decides a tool is needed. It then generates a request to *call* that tool, often using a specific message type like `FunctionCall`.
```python
# File: tool_call_request.py
from autogen_core import FunctionCall # Represents a request to call a tool
# Imagine an LLM agent decided to use the date tool.
# It constructs this message, providing the tool name and arguments (as JSON string).
date_call_request = FunctionCall(
id="call_date_001", # A unique ID for this specific call attempt
name="get_current_date", # Matches the Tool's name
arguments="{}" # An empty JSON object because no arguments are needed
)
print("FunctionCall message:", date_call_request)
# Output: FunctionCall(id='call_date_001', name='get_current_date', arguments='{}')
```
This `FunctionCall` message is like a work order: "Please execute the tool named `get_current_date` with these arguments."
**Step 4: The `ToolAgent` Executes the Tool**
Who receives this `FunctionCall` message? Usually, a specialized agent called `ToolAgent`. You create a `ToolAgent` and give it the list of tools it knows how to execute. When it receives a `FunctionCall`, it finds the matching tool and runs it.
```python
# File: tool_agent_example.py
import asyncio
from autogen_core.tool_agent import ToolAgent
from autogen_core.models import FunctionExecutionResult
from create_date_tool import date_tool # Import the tool we created
from tool_call_request import date_call_request # Import the request message
# Create an agent specifically designed to execute tools
tool_executor = ToolAgent(
description="I can execute tools like getting the date.",
tools=[date_tool] # Give it the list of tools it manages
)
# --- Simulation of Runtime delivering the message ---
# In a real app, the AgentRuntime (Chapter 3) would route the
# date_call_request message to this tool_executor agent.
# We simulate the call to its message handler here:
async def simulate_execution():
# Fake context (normally provided by runtime)
class MockContext: cancellation_token = None
ctx = MockContext()
print(f"ToolAgent received request: {date_call_request.name}")
result: FunctionExecutionResult = await tool_executor.handle_function_call(
message=date_call_request,
ctx=ctx
)
print(f"ToolAgent produced result: {result}")
asyncio.run(simulate_execution())
```
**Expected Output:**
```
ToolAgent received request: get_current_date
ToolAgent produced result: FunctionExecutionResult(content='2023-10-27', call_id='call_date_001', is_error=False, name='get_current_date') # Date will be current date
```
The `ToolAgent` received the `FunctionCall`, found the `date_tool` in its list, executed the underlying `get_current_date` function, and packaged the result (the date string) into a `FunctionExecutionResult` message. This result message can then be sent back to the agent that originally requested the tool use.
## Under the Hood: How Tool Execution Works
Let's visualize the typical flow when an LLM agent decides to use a tool managed by a `ToolAgent`.
**Conceptual Flow:**
```mermaid
sequenceDiagram
participant LLMA as LLM Agent (Decides)
participant Caller as Caller Agent (Orchestrates)
participant ToolA as ToolAgent (Executes)
participant ToolFunc as Tool Function (e.g., get_current_date)
Note over LLMA: Analyzes conversation, decides tool needed.
LLMA->>Caller: Sends AssistantMessage containing FunctionCall(name='get_current_date', args='{}')
Note over Caller: Receives LLM response, sees FunctionCall.
Caller->>+ToolA: Uses runtime.send_message(message=FunctionCall, recipient=ToolAgent_ID)
Note over ToolA: Receives FunctionCall via on_message.
ToolA->>ToolA: Looks up 'get_current_date' in its internal list of Tools.
ToolA->>+ToolFunc: Calls tool.run_json(args={}) -> triggers get_current_date()
ToolFunc-->>-ToolA: Returns the result (e.g., "2023-10-27")
ToolA->>ToolA: Creates FunctionExecutionResult message with the content.
ToolA-->>-Caller: Returns FunctionExecutionResult via runtime messaging.
Note over Caller: Receives the tool result.
Caller->>LLMA: Sends FunctionExecutionResultMessage to LLM for next step.
Note over LLMA: Now knows the current date.
```
1. **Decision:** An LLM-powered agent decides a tool is needed based on the conversation and the available tools' descriptions. It generates a `FunctionCall`.
2. **Request:** A "Caller" agent (often the same LLM agent or a managing agent) sends this `FunctionCall` message to the dedicated `ToolAgent` using the `AgentRuntime`.
3. **Lookup:** The `ToolAgent` receives the message, extracts the tool `name` (`get_current_date`), and finds the corresponding `Tool` object (our `date_tool`) in the list it was configured with.
4. **Execution:** The `ToolAgent` calls the `run_json` method on the `Tool` object, passing the arguments from the `FunctionCall`. For a `FunctionTool`, `run_json` validates the arguments against the generated schema and then executes the original Python function (`get_current_date`).
5. **Result:** The Python function returns its result (the date string).
6. **Response:** The `ToolAgent` wraps this result string in a `FunctionExecutionResult` message, including the original `call_id`, and sends it back to the Caller agent.
7. **Continuation:** The Caller agent typically sends this result back to the LLM agent, allowing the conversation or task to continue with the new information.
**Code Glimpse:**
* **`Tool` Protocol (`tools/_base.py`):** Defines the basic contract any tool must fulfill. Key methods are `schema` (property returning the `ToolSchema`) and `run_json` (method to execute the tool with JSON-like arguments).
* **`BaseTool` (`tools/_base.py`):** An abstract class that helps implement the `Tool` protocol, especially using Pydantic models for defining arguments (`args_type`) and return values (`return_type`). It automatically generates the `parameters` part of the schema from the `args_type` model.
* **`FunctionTool` (`tools/_function_tool.py`):** Inherits from `BaseTool`. Its magic lies in automatically creating the `args_type` Pydantic model by inspecting the wrapped Python function's signature (`args_base_model_from_signature`). Its `run` method handles calling the original sync or async Python function.
```python
# Inside FunctionTool (Simplified Concept)
class FunctionTool(BaseTool[BaseModel, BaseModel]):
def __init__(self, func, description, ...):
self._func = func
self._signature = get_typed_signature(func)
# Automatically create Pydantic model for arguments
args_model = args_base_model_from_signature(...)
# Get return type from signature
return_type = self._signature.return_annotation
super().__init__(args_model, return_type, ...)
async def run(self, args: BaseModel, ...):
# Extract arguments from the 'args' model
kwargs = args.model_dump()
# Call the original Python function (sync or async)
result = await self._call_underlying_func(**kwargs)
return result # Must match the expected return_type
```
* **`ToolAgent` (`tool_agent/_tool_agent.py`):** A specialized `RoutedAgent`. It registers a handler specifically for `FunctionCall` messages.
```python
# Inside ToolAgent (Simplified Concept)
class ToolAgent(RoutedAgent):
def __init__(self, ..., tools: List[Tool]):
super().__init__(...)
self._tools = {tool.name: tool for tool in tools} # Store tools by name
@message_handler # Registers this for FunctionCall messages
async def handle_function_call(self, message: FunctionCall, ctx: MessageContext):
# Find the tool by name
tool = self._tools.get(message.name)
if tool is None:
# Handle error: Tool not found
raise ToolNotFoundException(...)
try:
# Parse arguments string into a dictionary
arguments = json.loads(message.arguments)
# Execute the tool's run_json method
result_obj = await tool.run_json(args=arguments, ...)
# Convert result object back to string if needed
result_str = tool.return_value_as_string(result_obj)
# Create the success result message
return FunctionExecutionResult(content=result_str, ...)
except Exception as e:
# Handle execution errors
return FunctionExecutionResult(content=f"Error: {e}", is_error=True, ...)
```
Its core logic is: find tool -> parse args -> run tool -> return result/error.
## Next Steps
You've learned how **Tools** provide specific capabilities to Agents, defined by a **Schema** that LLMs can understand. We saw how `FunctionTool` makes it easy to wrap existing Python functions and how `ToolAgent` acts as the executor for these tools.
This ability for agents to use tools is fundamental to building powerful and versatile AI systems that can interact with the real world or perform complex calculations.
Now that agents can use tools, we need to understand more about the agents that *decide* which tools to use, which often involves interacting with Large Language Models:
* [Chapter 5: ChatCompletionClient](05_chatcompletionclient.md): How agents interact with LLMs like GPT to generate responses or decide on actions (like calling a tool).
* [Chapter 6: ChatCompletionContext](06_chatcompletioncontext.md): How the history of the conversation, including tool calls and results, is managed when talking to an LLM.
---
Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)
The content has been capped at 50000 tokens, and files over NaN bytes have been omitted. The user could consider applying other filters to refine the result. The better and more specific the context, the better the LLM can follow instructions. If the context seems verbose, the user can refine the filter using uithub. Thank you for using https://uithub.com - Perfect LLM context for any GitHub repo.