``` ├── .clinerules ├── .cursorrules ├── .env.sample ├── .gitignore ├── .windsurfrules ├── LICENSE ├── README.md ├── assets/ ├── banner.png ├── example.png ├── youtube_thumbnail.png ├── docs/ ├── AutoGen Core/ ├── 01_agent.md ├── 02_messaging_system__topic___subscription_.md ├── 03_agentruntime.md ├── 04_tool.md ├── 05_chatcompletionclient.md ``` ## /.clinerules ```clinerules path="/.clinerules" --- layout: default title: "Agentic Coding" --- # Agentic Coding: Humans Design, Agents code! > If you are an AI agents involved in building LLM Systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification. {: .warning } ## Agentic Coding Steps Agentic Coding should be a collaboration between Human System Design and Agent Implementation: | Steps | Human | AI | Comment | |:-----------------------|:----------:|:---------:|:------------------------------------------------------------------------| | 1. Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context. | | 2. Flow | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details. | | 3. Utilities | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. | | 4. Node | ★☆☆ Low | ★★★ High | The AI helps design the node types and data handling based on the flow. | | 5. Implementation | ★☆☆ Low | ★★★ High | The AI implements the flow based on the design. | | 6. Optimization | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize. | | 7. Reliability | ★☆☆ Low | ★★★ High | The AI writes test cases and addresses corner cases. | 1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit. - Understand AI systems' strengths and limitations: - **Good for**: Routine tasks requiring common sense (filling forms, replying to emails) - **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL) - **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning) - **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features. - **Balance complexity vs. impact**: Aim to deliver the highest value features with minimal complexity early. 2. **Flow Design**: Outline at a high level, describe how your AI system orchestrates nodes. - Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)). - For each node in the flow, start with a high-level one-line description of what it does. - If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine). - If using **Agent**, specify what are the inputs (context) and what are the possible actions. - If using **RAG**, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows. - Outline the flow and draw it in a mermaid diagram. 
For example: \`\`\`mermaid flowchart LR start[Start] --> batch[Batch] batch --> check[Check] check -->|OK| process check -->|Error| fix[Fix] fix --> check subgraph process[Process] step1[Step 1] --> step2[Step 2] end process --> endNode[End] \`\`\` - > **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition. {: .best-practice } 3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions. - Think of your AI system as the brain. It needs a body—these *external utility functions*—to interact with the real world:
- Reading inputs (e.g., retrieving Slack messages, reading emails) - Writing outputs (e.g., generating reports, sending emails) - Using external tools (e.g., calling LLMs, searching the web) - **NOTE**: *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal in the AI system. - For each utility function, implement it and write a simple test. - Document their input/output, as well as why they are necessary. For example: - `name`: `get_embedding` (`utils/get_embedding.py`) - `input`: `str` - `output`: a vector of 3072 floats - `necessity`: Used by the second node to embed text - Example utility implementation: \`\`\`python # utils/call_llm.py from openai import OpenAI def call_llm(prompt): client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content if __name__ == "__main__": prompt = "What is the meaning of life?" print(call_llm(prompt)) \`\`\` - > **Sometimes, design Utilies before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them. {: .best-practice } 4. **Node Design**: Plan how each node will read and write data, and use utility functions. - One core design principle for PocketFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design: - For simple systems, use an in-memory dictionary. - For more complex systems or when persistence is required, use a database. - **Don't Repeat Yourself**: Use in-memory references or foreign keys. - Example shared store design: \`\`\`python shared = { "user": { "id": "user123", "context": { # Another nested dict "weather": {"temp": 72, "condition": "sunny"}, "location": "San Francisco" } }, "results": {} # Empty dict to store outputs } \`\`\` - For each [Node](./core_abstraction/node.md), describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without codes. For example: - `type`: Regular (or Batch, or Async) - `prep`: Read "text" from the shared store - `exec`: Call the embedding utility function - `post`: Write "embedding" to the shared store 5. **Implementation**: Implement the initial nodes and flows based on the design. - 🎉 If you've reached this step, humans have finished the design. Now *Agentic Coding* begins! - **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking. - **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system. - Add logging throughout the code to facilitate debugging. 7. **Optimization**: - **Use Intuition**: For a quick initial evaluation, human intuition is often a good start. - **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts. - If your flow design is already solid, move on to micro-optimizations: - **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity. - **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone. - > **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times. > >
{: .best-practice } 8. **Reliability** - **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times. - **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging. - **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain. ## Example LLM Project File Structure \`\`\` my_project/ ├── main.py ├── nodes.py ├── flow.py ├── utils/ │ ├── __init__.py │ ├── call_llm.py │ └── search_web.py ├── requirements.txt └── docs/ └── design.md \`\`\` - **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*. - **`utils/`**: Contains all utility functions. - It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`. - Each file should also include a `main()` function to try that API call - **`nodes.py`**: Contains all the node definitions. \`\`\`python # nodes.py from pocketflow import Node from utils.call_llm import call_llm class GetQuestionNode(Node): def exec(self, _): # Get question directly from user input user_question = input("Enter your question: ") return user_question def post(self, shared, prep_res, exec_res): # Store the user's question shared["question"] = exec_res return "default" # Go to the next node class AnswerNode(Node): def prep(self, shared): # Read question from shared return shared["question"] def exec(self, question): # Call LLM to get the answer return call_llm(question) def post(self, shared, prep_res, exec_res): # Store the answer in shared shared["answer"] = exec_res \`\`\` - **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them. \`\`\`python # flow.py from pocketflow import Flow from nodes import GetQuestionNode, AnswerNode def create_qa_flow(): """Create and return a question-answering flow.""" # Create nodes get_question_node = GetQuestionNode() answer_node = AnswerNode() # Connect nodes in sequence get_question_node >> answer_node # Create flow starting with input node return Flow(start=get_question_node) \`\`\` - **`main.py`**: Serves as the project's entry point. \`\`\`python # main.py from flow import create_qa_flow # Example main function # Please replace this with your own main function def main(): shared = { "question": None, # Will be populated by GetQuestionNode from user input "answer": None # Will be populated by AnswerNode } # Create the flow and run it qa_flow = create_qa_flow() qa_flow.run(shared) print(f"Question: {shared['question']}") print(f"Answer: {shared['answer']}") if __name__ == "__main__": main() \`\`\` ================================================ File: docs/index.md ================================================ --- layout: default title: "Home" nav_order: 1 --- # Pocket Flow A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*. - **Lightweight**: Just the core graph abstraction in 100 lines. ZERO dependencies, and vendor lock-in. - **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more. - **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications.
## Core Abstraction We model the LLM workflow as a **Graph + Shared Store**: - [Node](./core_abstraction/node.md) handles simple (LLM) tasks. - [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges). - [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows. - [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks. - [Async](./core_abstraction/async.md) nodes/flows allow waiting for asynchronous tasks. - [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks.
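To make this concrete, here is a *minimal sketch* (illustrative only, not part of the framework) of two Nodes communicating through the shared store. It assumes a `call_llm` utility like the ones in [LLM Wrapper](./utility_function/llm.md):

\`\`\`python
# Illustrative sketch: Graph + Shared Store with two chained Nodes.
from pocketflow import Node, Flow
from utils.call_llm import call_llm  # assumed utility (see the utility function docs)

class LoadText(Node):
    def post(self, shared, prep_res, exec_res):
        shared["text"] = "Pocket Flow is a 100-line LLM framework."  # write to the shared store
        return "default"  # Action label that selects the next edge

class Summarize(Node):
    def prep(self, shared):
        return shared["text"]  # read from the shared store
    def exec(self, prep_res):
        return call_llm(f"Summarize in one sentence: {prep_res}")
    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res

load, summarize = LoadText(), Summarize()
load >> summarize            # the "default" Action connects the two Nodes
flow = Flow(start=load)
shared = {}
flow.run(shared)             # Nodes communicate only through `shared`
\`\`\`

The Nodes never call each other directly: `LoadText` writes to `shared`, `Summarize` reads from it, and the Flow follows the labeled edge between them.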
## Design Pattern From there, it’s easy to implement popular design patterns: - [Agent](./design_pattern/agent.md) autonomously makes decisions. - [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines. - [RAG](./design_pattern/rag.md) integrates data retrieval with generation. - [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps. - [Structured Output](./design_pattern/structure.md) formats outputs consistently. - [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
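All of these patterns are wired from the same Node/Flow primitives. As an *illustrative sketch only* (the classes below are hypothetical stubs—see [Agent](./design_pattern/agent.md) for a full example), an agent-style loop is just labeled Actions on the graph:

\`\`\`python
# Illustrative stubs: an agent-style loop expressed as labeled Actions.
from pocketflow import Node, Flow

class Decide(Node):
    def post(self, shared, prep_res, exec_res):
        # A real agent would ask an LLM here; this stub branches on gathered context.
        return "answer" if shared.get("context") else "search"

class Search(Node):
    def post(self, shared, prep_res, exec_res):
        shared["context"] = "pretend search results"  # stand-in for a web search utility
        return "default"

class Answer(Node):
    def post(self, shared, prep_res, exec_res):
        shared["answer"] = f"answer based on: {shared['context']}"

decide, search, answer = Decide(), Search(), Answer()
decide - "search" >> search   # branch on the Action returned by post()
decide - "answer" >> answer
search >> decide              # loop back for another decision

shared = {}
Flow(start=decide).run(shared)
\`\`\`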
## Utility Function We **do not** provide built-in utilities. Instead, we offer *examples*—please *implement your own*: - [LLM Wrapper](./utility_function/llm.md) - [Viz and Debug](./utility_function/viz.md) - [Web Search](./utility_function/websearch.md) - [Chunking](./utility_function/chunking.md) - [Embedding](./utility_function/embedding.md) - [Vector Databases](./utility_function/vector.md) - [Text-to-Speech](./utility_function/text_to_speech.md) **Why not built-in?**: I believe it's a *bad practice* for vendor-specific APIs in a general framework: - *API Volatility*: Frequent changes lead to heavy maintenance for hardcoded APIs. - *Flexibility*: You may want to switch vendors, use fine-tuned models, or run them locally. - *Optimizations*: Prompt caching, batching, and streaming are easier without vendor lock-in. ## Ready to build your Apps? Check out [Agentic Coding Guidance](./guide.md), the fastest way to develop LLM projects with Pocket Flow! ================================================ File: docs/core_abstraction/async.md ================================================ --- layout: default title: "(Advanced) Async" parent: "Core Abstraction" nav_order: 5 --- # (Advanced) Async **Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for: 1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way. 2. **exec_async()**: Typically used for async LLM calls. 3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`. **Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes. ### Example \`\`\`python class SummarizeThenVerify(AsyncNode): async def prep_async(self, shared): # Example: read a file asynchronously doc_text = await read_file_async(shared["doc_path"]) return doc_text async def exec_async(self, prep_res): # Example: async LLM call summary = await call_llm_async(f"Summarize: {prep_res}") return summary async def post_async(self, shared, prep_res, exec_res): # Example: wait for user feedback decision = await gather_user_feedback(exec_res) if decision == "approve": shared["summary"] = exec_res return "approve" return "deny" summarize_node = SummarizeThenVerify() final_node = Finalize() # Define transitions summarize_node - "approve" >> final_node summarize_node - "deny" >> summarize_node # retry flow = AsyncFlow(start=summarize_node) async def main(): shared = {"doc_path": "document.txt"} await flow.run_async(shared) print("Final Summary:", shared.get("summary")) asyncio.run(main()) \`\`\` ================================================ File: docs/core_abstraction/batch.md ================================================ --- layout: default title: "Batch" parent: "Core Abstraction" nav_order: 4 --- # Batch **Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases: - **Chunk-based** processing (e.g., splitting large texts). - **Iterative** processing over lists of input items (e.g., user queries, files, URLs). ## 1. BatchNode A **BatchNode** extends `Node` but changes `prep()` and `exec()`: - **`prep(shared)`**: returns an **iterable** (e.g., list, generator). - **`exec(item)`**: called **once** per item in that iterable. - **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**. 
### Example: Summarize a Large File \`\`\`python class MapSummaries(BatchNode): def prep(self, shared): # Suppose we have a big file; chunk it content = shared["data"] chunk_size = 10000 chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] return chunks def exec(self, chunk): prompt = f"Summarize this chunk in 10 words: {chunk}" summary = call_llm(prompt) return summary def post(self, shared, prep_res, exec_res_list): combined = "\n".join(exec_res_list) shared["summary"] = combined return "default" map_summaries = MapSummaries() flow = Flow(start=map_summaries) flow.run(shared) \`\`\` --- ## 2. BatchFlow A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set. ### Example: Summarize Many Files \`\`\`python class SummarizeAllFiles(BatchFlow): def prep(self, shared): # Return a list of param dicts (one per file) filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...] return [{"filename": fn} for fn in filenames] # Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce): summarize_file = SummarizeFile(start=load_file) # Wrap that flow into a BatchFlow: summarize_all_files = SummarizeAllFiles(start=summarize_file) summarize_all_files.run(shared) \`\`\` ### Under the Hood 1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`. 2. The **BatchFlow** loops through each dict. For each one: - It merges the dict with the BatchFlow’s own `params`. - It calls `flow.run(shared)` using the merged result. 3. This means the sub-Flow is run **repeatedly**, once for every param dict. --- ## 3. Nested or Multi-Level Batches You can nest a **BatchFlow** in another **BatchFlow**. For instance: - **Outer** batch: returns a list of diretory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...). - **Inner** batch: returning a list of per-file param dicts. At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once. \`\`\`python class FileBatchFlow(BatchFlow): def prep(self, shared): directory = self.params["directory"] # e.g., files = ["file1.txt", "file2.txt", ...] files = [f for f in os.listdir(directory) if f.endswith(".txt")] return [{"filename": f} for f in files] class DirectoryBatchFlow(BatchFlow): def prep(self, shared): directories = [ "/path/to/dirA", "/path/to/dirB"] return [{"directory": d} for d in directories] # MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"} inner_flow = FileBatchFlow(start=MapSummaries()) outer_flow = DirectoryBatchFlow(start=inner_flow) \`\`\` ================================================ File: docs/core_abstraction/communication.md ================================================ --- layout: default title: "Communication" parent: "Core Abstraction" nav_order: 3 --- # Communication Nodes and Flows **communicate** in 2 ways: 1. **Shared Store (for almost all the cases)** - A global data structure (often an in-mem dict) that all nodes can read ( `prep()`) and write (`post()`). - Great for data results, large content, or anything multiple nodes need. - You shall design the data structure and populate it ahead. 
- > **Separation of Concerns:** Use `Shared Store` for almost all cases to separate *Data Schema* from *Compute Logic*! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](./batch.md). {: .best-practice } 2. **Params (only for [Batch](./batch.md))** - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**. - Good for identifiers like filenames or numeric IDs, in Batch mode. If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller). --- ## 1. Shared Store ### Overview A shared store is typically an in-mem dictionary, like: \`\`\`python shared = {"data": {}, "summary": {}, "config": {...}, ...} \`\`\` It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements. ### Example \`\`\`python class LoadData(Node): def post(self, shared, prep_res, exec_res): # We write data to shared store shared["data"] = "Some text content" return None class Summarize(Node): def prep(self, shared): # We read data from shared store return shared["data"] def exec(self, prep_res): # Call LLM to summarize prompt = f"Summarize: {prep_res}" summary = call_llm(prompt) return summary def post(self, shared, prep_res, exec_res): # We write summary to shared store shared["summary"] = exec_res return "default" load_data = LoadData() summarize = Summarize() load_data >> summarize flow = Flow(start=load_data) shared = {} flow.run(shared) \`\`\` Here: - `LoadData` writes to `shared["data"]`. - `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`. --- ## 2. Params **Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are: - **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`). - **Set** via `set_params()`. - **Cleared** and updated each time a parent Flow calls it. > Only set the uppermost Flow params because others will be overwritten by the parent Flow. > > If you need to set child node params, see [Batch](./batch.md). {: .warning } Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store. ### Example \`\`\`python # 1) Create a Node that uses params class SummarizeFile(Node): def prep(self, shared): # Access the node's param filename = self.params["filename"] return shared["data"].get(filename, "") def exec(self, prep_res): prompt = f"Summarize: {prep_res}" return call_llm(prompt) def post(self, shared, prep_res, exec_res): filename = self.params["filename"] shared["summary"][filename] = exec_res return "default" # 2) Set params node = SummarizeFile() # 3) Set Node params directly (for testing) node.set_params({"filename": "doc1.txt"}) node.run(shared) # 4) Create Flow flow = Flow(start=node) # 5) Set Flow params (overwrites node params) flow.set_params({"filename": "doc2.txt"}) flow.run(shared) # The node summarizes doc2, not doc1 \`\`\` ================================================ File: docs/core_abstraction/flow.md ================================================ --- layout: default title: "Flow" parent: "Core Abstraction" nav_order: 2 --- # Flow A **Flow** orchestrates a graph of Nodes. 
You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`. ## 1. Action-based Transitions Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`. You define transitions with the syntax: 1. **Basic default transition**: `node_a >> node_b` This means if `node_a.post()` returns `"default"`, go to `node_b`. (Equivalent to `node_a - "default" >> node_b`) 2. **Named action transition**: `node_a - "action_name" >> node_b` This means if `node_a.post()` returns `"action_name"`, go to `node_b`. It's possible to create loops, branching, or multi-step flows. ## 2. Creating a Flow A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node. ### Example: Simple Sequence Here's a minimal flow of two nodes in a chain: \`\`\`python node_a >> node_b flow = Flow(start=node_a) flow.run(shared) \`\`\` - When you run the flow, it executes `node_a`. - Suppose `node_a.post()` returns `"default"`. - The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`. - `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there. ### Example: Branching & Looping Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions: - `"approved"`: expense is approved, move to payment processing - `"needs_revision"`: expense needs changes, send back for revision - `"rejected"`: expense is denied, finish the process We can wire them like this: \`\`\`python # Define the flow connections review - "approved" >> payment # If approved, process payment review - "needs_revision" >> revise # If needs changes, go to revision review - "rejected" >> finish # If rejected, finish the process revise >> review # After revision, go back for another review payment >> finish # After payment, finish the process flow = Flow(start=review) \`\`\` Let's see how it flows: 1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node 2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review` 3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops \`\`\`mermaid flowchart TD review[Review Expense] -->|approved| payment[Process Payment] review -->|needs_revision| revise[Revise Report] review -->|rejected| finish[Finish Process] revise --> review payment --> finish \`\`\` ### Running Individual Nodes vs. Running a Flow - `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action. - `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue. > `node.run(shared)` **does not** proceed to the successor. > This is mainly for debugging or testing a single node. > > Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. {: .warning } ## 3. Nested Flows A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: 1. Use a Flow as a Node within another Flow's transitions. 2. Combine multiple smaller Flows into a larger Flow for reuse. 3. Node `params` will be a merging of **all** parents' `params`. 
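As a *minimal sketch* of point 3 above (the `ReportNode` class is hypothetical, and this assumes params propagate down through nested Flows as described), a node inside a nested Flow sees the params set on the outermost Flow:

\`\`\`python
# Sketch of param merging across nested Flows (hypothetical ReportNode).
from pocketflow import Node, Flow

class ReportNode(Node):
    def prep(self, shared):
        # Sees params contributed by every parent Flow in the chain.
        print(self.params)  # e.g., {"user_id": "u42"}

inner = Flow(start=ReportNode())
outer = Flow(start=inner)
outer.set_params({"user_id": "u42"})  # set params only on the uppermost Flow
outer.run({})
\`\`\`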
### Flow's Node Methods A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However: - It **won't** run `exec()`, as its main logic is to orchestrate its nodes. - `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store. ### Basic Flow Nesting Here's how to connect a flow to another node: \`\`\`python # Create a sub-flow node_a >> node_b subflow = Flow(start=node_a) # Connect it to another node subflow >> node_c # Create the parent flow parent_flow = Flow(start=subflow) \`\`\` When `parent_flow.run()` executes: 1. It starts `subflow` 2. `subflow` runs through its nodes (`node_a->node_b`) 3. After `subflow` completes, execution continues to `node_c` ### Example: Order Processing Pipeline Here's a practical example that breaks down order processing into nested flows: \`\`\`python # Payment processing sub-flow validate_payment >> process_payment >> payment_confirmation payment_flow = Flow(start=validate_payment) # Inventory sub-flow check_stock >> reserve_items >> update_inventory inventory_flow = Flow(start=check_stock) # Shipping sub-flow create_label >> assign_carrier >> schedule_pickup shipping_flow = Flow(start=create_label) # Connect the flows into a main order pipeline payment_flow >> inventory_flow >> shipping_flow # Create the master flow order_pipeline = Flow(start=payment_flow) # Run the entire pipeline order_pipeline.run(shared_data) \`\`\` This creates a clean separation of concerns while maintaining a clear execution path: \`\`\`mermaid flowchart LR subgraph order_pipeline[Order Pipeline] subgraph paymentFlow["Payment Flow"] A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation] end subgraph inventoryFlow["Inventory Flow"] D[Check Stock] --> E[Reserve Items] --> F[Update Inventory] end subgraph shippingFlow["Shipping Flow"] G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup] end paymentFlow --> inventoryFlow inventoryFlow --> shippingFlow end \`\`\` ================================================ File: docs/core_abstraction/node.md ================================================ --- layout: default title: "Node" parent: "Core Abstraction" nav_order: 1 --- # Node A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`:
1. `prep(shared)` - **Read and preprocess data** from `shared` store. - Examples: *query DB, read files, or serialize data into a string*. - Return `prep_res`, which is used by `exec()` and `post()`. 2. `exec(prep_res)` - **Execute compute logic**, with optional retries and error handling (below). - Examples: *(mostly) LLM calls, remote APIs, tool use*. - ⚠️ This shall be only for compute and **NOT** access `shared`. - ⚠️ If retries enabled, ensure idempotent implementation. - Return `exec_res`, which is passed to `post()`. 3. `post(shared, prep_res, exec_res)` - **Postprocess and write data** back to `shared`. - Examples: *update DB, change states, log results*. - **Decide the next action** by returning a *string* (`action = "default"` if *None*). > **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately. > > All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data. {: .note } ### Fault Tolerance & Retries You can **retry** `exec()` if it raises an exception via two parameters when define the Node: - `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry). - `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting). `wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. \`\`\`python my_node = SummarizeFile(max_retries=3, wait=10) \`\`\` When an exception occurs in `exec()`, the Node automatically retries until: - It either succeeds, or - The Node has retried `max_retries - 1` times already and fails on the last attempt. You can get the current retry times (0-based) from `self.cur_retry`. \`\`\`python class RetryNode(Node): def exec(self, prep_res): print(f"Retry {self.cur_retry} times") raise Exception("Failed") \`\`\` ### Graceful Fallback To **gracefully handle** the exception (after all retries) rather than raising it, override: \`\`\`python def exec_fallback(self, prep_res, exc): raise exc \`\`\` By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`. ### Example: Summarize file \`\`\`python class SummarizeFile(Node): def prep(self, shared): return shared["data"] def exec(self, prep_res): if not prep_res: return "Empty file content" prompt = f"Summarize this text in 10 words: {prep_res}" summary = call_llm(prompt) # might fail return summary def exec_fallback(self, prep_res, exc): # Provide a simple fallback instead of crashing return "There was an error processing your request." def post(self, shared, prep_res, exec_res): shared["summary"] = exec_res # Return "default" by not returning summarize_node = SummarizeFile(max_retries=3) # node.run() calls prep->exec->post # If exec() fails, it retries up to 3 times before calling exec_fallback() action_result = summarize_node.run(shared) print("Action returned:", action_result) # "default" print("Summary stored:", shared["summary"]) \`\`\` ================================================ File: docs/core_abstraction/parallel.md ================================================ --- layout: default title: "(Advanced) Parallel" parent: "Core Abstraction" nav_order: 6 --- # (Advanced) Parallel **Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute. 
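Parallel fan-out can also hit LLM provider rate limits quickly (see the warning and best practices below), so a simple throttle is often worth adding. A *minimal sketch*, assuming an async `call_llm_async` utility of your own:

\`\`\`python
# Minimal throttling sketch (not part of the framework): cap concurrent LLM calls.
import asyncio

sem = asyncio.Semaphore(5)  # allow at most 5 in-flight LLM calls

async def throttled_call_llm(prompt):
    async with sem:         # waits here if 5 calls are already running
        return await call_llm_async(prompt)  # assumed async LLM wrapper
\`\`\`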
> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. {: .warning } > - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. > > - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). > > - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. {: .best-practice } ## AsyncParallelBatchNode Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: \`\`\`python class ParallelSummaries(AsyncParallelBatchNode): async def prep_async(self, shared): # e.g., multiple texts return shared["texts"] async def exec_async(self, text): prompt = f"Summarize: {text}" return await call_llm_async(prompt) async def post_async(self, shared, prep_res, exec_res_list): shared["summary"] = "\n\n".join(exec_res_list) return "default" node = ParallelSummaries() flow = AsyncFlow(start=node) \`\`\` ## AsyncParallelBatchFlow Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters: \`\`\`python class SummarizeMultipleFiles(AsyncParallelBatchFlow): async def prep_async(self, shared): return [{"filename": f} for f in shared["files"]] sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) parallel_flow = SummarizeMultipleFiles(start=sub_flow) await parallel_flow.run_async(shared) \`\`\` ================================================ File: docs/design_pattern/agent.md ================================================ --- layout: default title: "Agent" parent: "Design Pattern" nav_order: 1 --- # Agent Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
## Implement Agent with Graph 1. **Context and Action:** Implement nodes that supply context and perform actions. 2. **Branching:** Use branching to connect each action node to an agent node. Use Actions to allow the agent to direct the [flow](../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step reasoning. 3. **Agent Node:** Provide a prompt to decide the next action—for example: \`\`\`python f""" ### CONTEXT Task: {task_description} Previous Actions: {previous_actions} Current State: {current_state} ### ACTION SPACE [1] search Description: Use web search to get results Parameters: - query (str): What to search for [2] answer Description: Conclude based on the results Parameters: - result (str): Final answer to provide ### NEXT ACTION Decide the next action based on the current context and available action space. Return your response in the following format: \`\`\`yaml thinking: | <your reasoning> action: <action_name> parameters: <param_name>: <param_value> \`\`\`""" \`\`\` The core of building **high-performance** and **reliable** agents boils down to: 1. **Context Management:** Provide *relevant, minimal context.* For example, rather than including an entire chat history, retrieve the most relevant parts via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content. 2. **Action Space:** Provide *a well-structured and unambiguous* set of actions—avoiding overlap like separate `read_databases` or `read_csvs` actions. Instead, import CSVs into the database and expose a single action. ## Examples of Good Action Design - **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once. - **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts). - **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files. - **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends. ## Example: Search Agent This agent: 1. Decides whether to search or answer 2. If it searches, loops back to decide whether more searching is needed 3.
Answers when enough context gathered \`\`\`python class DecideAction(Node): def prep(self, shared): context = shared.get("context", "No previous search") query = shared["query"] return query, context def exec(self, inputs): query, context = inputs prompt = f""" Given input: {query} Previous search results: {context} Should I: 1) Search web for more info 2) Answer with current knowledge Output in yaml: \`\`\`yaml action: search/answer reason: why this action search_term: search phrase if action is search \`\`\`""" resp = call_llm(prompt) yaml_str = resp.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip() result = yaml.safe_load(yaml_str) assert isinstance(result, dict) assert "action" in result assert "reason" in result assert result["action"] in ["search", "answer"] if result["action"] == "search": assert "search_term" in result return result def post(self, shared, prep_res, exec_res): if exec_res["action"] == "search": shared["search_term"] = exec_res["search_term"] return exec_res["action"] class SearchWeb(Node): def prep(self, shared): return shared["search_term"] def exec(self, search_term): return search_web(search_term) def post(self, shared, prep_res, exec_res): prev_searches = shared.get("context", []) shared["context"] = prev_searches + [ {"term": shared["search_term"], "result": exec_res} ] return "decide" class DirectAnswer(Node): def prep(self, shared): return shared["query"], shared.get("context", "") def exec(self, inputs): query, context = inputs return call_llm(f"Context: {context}\nAnswer: {query}") def post(self, shared, prep_res, exec_res): print(f"Answer: {exec_res}") shared["answer"] = exec_res # Connect nodes decide = DecideAction() search = SearchWeb() answer = DirectAnswer() decide - "search" >> search decide - "answer" >> answer search - "decide" >> decide # Loop back flow = Flow(start=decide) flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) \`\`\` ================================================ File: docs/design_pattern/mapreduce.md ================================================ --- layout: default title: "Map Reduce" parent: "Design Pattern" nav_order: 4 --- # Map Reduce MapReduce is a design pattern suitable when you have either: - Large input data (e.g., multiple files to process), or - Large output data (e.g., multiple forms to fill) and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase. ### Example: Document Summarization \`\`\`python class SummarizeAllFiles(BatchNode): def prep(self, shared): files_dict = shared["files"] # e.g. 10 files return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...] def exec(self, one_file): filename, file_content = one_file summary_text = call_llm(f"Summarize the following file:\n{file_content}") return (filename, summary_text) def post(self, shared, prep_res, exec_res_list): shared["file_summaries"] = dict(exec_res_list) class CombineSummaries(Node): def prep(self, shared): return shared["file_summaries"] def exec(self, file_summaries): # format as: "File1: summary\nFile2: summary...\n" text_list = [] for fname, summ in file_summaries.items(): text_list.append(f"{fname} summary:\n{summ}\n") big_text = "\n---\n".join(text_list) return call_llm(f"Combine these file summaries into one final summary:\n{big_text}") def post(self, shared, prep_res, final_summary): shared["all_files_summary"] = final_summary batch_node = SummarizeAllFiles() combine_node = CombineSummaries() batch_node >> combine_node flow = Flow(start=batch_node) shared = { "files": { "file1.txt": "Alice was beginning to get very tired of sitting by her sister...", "file2.txt": "Some other interesting text ...", # ... } } flow.run(shared) print("Individual Summaries:", shared["file_summaries"]) print("\nFinal Summary:\n", shared["all_files_summary"]) \`\`\` ================================================ File: docs/design_pattern/rag.md ================================================ --- layout: default title: "RAG" parent: "Design Pattern" nav_order: 3 --- # RAG (Retrieval Augmented Generation) For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
1. **Offline stage**: Preprocess and index documents ("building the index"). 2. **Online stage**: Given a question, generate answers by retrieving the most relevant context. --- ## Stage 1: Offline Indexing We create three Nodes: 1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text. 2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk. 3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md). \`\`\`python class ChunkDocs(BatchNode): def prep(self, shared): # A list of file paths in shared["files"]. We process each file. return shared["files"] def exec(self, filepath): # read file content. In real usage, do error handling. with open(filepath, "r", encoding="utf-8") as f: text = f.read() # chunk by 100 chars each chunks = [] size = 100 for i in range(0, len(text), size): chunks.append(text[i : i + size]) return chunks def post(self, shared, prep_res, exec_res_list): # exec_res_list is a list of chunk-lists, one per file. # flatten them all into a single list of chunks. all_chunks = [] for chunk_list in exec_res_list: all_chunks.extend(chunk_list) shared["all_chunks"] = all_chunks class EmbedDocs(BatchNode): def prep(self, shared): return shared["all_chunks"] def exec(self, chunk): return get_embedding(chunk) def post(self, shared, prep_res, exec_res_list): # Store the list of embeddings. shared["all_embeds"] = exec_res_list print(f"Total embeddings: {len(exec_res_list)}") class StoreIndex(Node): def prep(self, shared): # We'll read all embeds from shared. return shared["all_embeds"] def exec(self, all_embeds): # Create a vector index (faiss or other DB in real usage). index = create_index(all_embeds) return index def post(self, shared, prep_res, index): shared["index"] = index # Wire them in sequence chunk_node = ChunkDocs() embed_node = EmbedDocs() store_node = StoreIndex() chunk_node >> embed_node >> store_node OfflineFlow = Flow(start=chunk_node) \`\`\` Usage example: \`\`\`python shared = { "files": ["doc1.txt", "doc2.txt"], # any text files } OfflineFlow.run(shared) \`\`\` --- ## Stage 2: Online Query & Answer We have 3 nodes: 1. `EmbedQuery` – embeds the user’s question. 2. `RetrieveDocs` – retrieves top chunk from the index. 3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer. 
\`\`\`python class EmbedQuery(Node): def prep(self, shared): return shared["question"] def exec(self, question): return get_embedding(question) def post(self, shared, prep_res, q_emb): shared["q_emb"] = q_emb class RetrieveDocs(Node): def prep(self, shared): # We'll need the query embedding, plus the offline index/chunks return shared["q_emb"], shared["index"], shared["all_chunks"] def exec(self, inputs): q_emb, index, chunks = inputs I, D = search_index(index, q_emb, top_k=1) best_id = I[0][0] relevant_chunk = chunks[best_id] return relevant_chunk def post(self, shared, prep_res, relevant_chunk): shared["retrieved_chunk"] = relevant_chunk print("Retrieved chunk:", relevant_chunk[:60], "...") class GenerateAnswer(Node): def prep(self, shared): return shared["question"], shared["retrieved_chunk"] def exec(self, inputs): question, chunk = inputs prompt = f"Question: {question}\nContext: {chunk}\nAnswer:" return call_llm(prompt) def post(self, shared, prep_res, answer): shared["answer"] = answer print("Answer:", answer) embed_qnode = EmbedQuery() retrieve_node = RetrieveDocs() generate_node = GenerateAnswer() embed_qnode >> retrieve_node >> generate_node OnlineFlow = Flow(start=embed_qnode) \`\`\` Usage example: \`\`\`python # Suppose we already ran OfflineFlow and have: # shared["all_chunks"], shared["index"], etc. shared["question"] = "Why do people like cats?" OnlineFlow.run(shared) # final answer in shared["answer"] \`\`\` ================================================ File: docs/design_pattern/structure.md ================================================ --- layout: default title: "Structured Output" parent: "Design Pattern" nav_order: 5 --- # Structured Output In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys. There are several approaches to achieve a structured output: - **Prompting** the LLM to strictly return a defined structure. - Using LLMs that natively support **schema enforcement**. - **Post-processing** the LLM's response to extract structured content. In practice, **Prompting** is simple and reliable for modern LLMs. ### Example Use Cases - Extracting Key Information \`\`\`yaml product: name: Widget Pro price: 199.99 description: | A high-quality widget designed for professionals. Recommended for advanced users. \`\`\` - Summarizing Documents into Bullet Points \`\`\`yaml summary: - This product is easy to use. - It is cost-effective. - Suitable for all skill levels. \`\`\` - Generating Configuration Files \`\`\`yaml server: host: 127.0.0.1 port: 8080 ssl: true \`\`\` ## Prompt Engineering When prompting the LLM to produce **structured** output: 1. **Wrap** the structure in code fences (e.g., `yaml`). 2. **Validate** that all required fields exist (and let `Node` handles retry). ### Example Text Summarization \`\`\`python class SummarizeNode(Node): def exec(self, prep_res): # Suppose `prep_res` is the text to summarize. 
prompt = f""" Please summarize the following text as YAML, with exactly 3 bullet points {prep_res} Now, output: \`\`\`yaml summary: - bullet 1 - bullet 2 - bullet 3 \`\`\`""" response = call_llm(prompt) yaml_str = response.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip() import yaml structured_result = yaml.safe_load(yaml_str) assert "summary" in structured_result assert isinstance(structured_result["summary"], list) return structured_result \`\`\` > Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic). {: .note } ### Why YAML instead of JSON? Current LLMs struggle with escaping special characters. YAML is easier for strings since they don't always need quotes. **In JSON** \`\`\`json { "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\"" } \`\`\` - Every double quote inside the string must be escaped with `\"`. - Each newline in the dialogue must be represented as `\n`. **In YAML** \`\`\`yaml dialogue: | Alice said: "Hello Bob. How are you? I am good." \`\`\` - No need to escape interior quotes—just place the entire text under a block literal (`|`). - Newlines are naturally preserved without needing `\n`. ================================================ File: docs/design_pattern/workflow.md ================================================ --- layout: default title: "Workflow" parent: "Design Pattern" nav_order: 2 --- # Workflow Many real-world tasks are too complex for one LLM call. The solution is **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*. > - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*. > > You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md). {: .best-practice } ### Example: Article Writing \`\`\`python class GenerateOutline(Node): def prep(self, shared): return shared["topic"] def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res class WriteSection(Node): def prep(self, shared): return shared["outline"] def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res class ReviewAndRefine(Node): def prep(self, shared): return shared["draft"] def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}") def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res # Connect nodes outline = GenerateOutline() write = WriteSection() review = ReviewAndRefine() outline >> write >> review # Create and run flow writing_flow = Flow(start=outline) shared = {"topic": "AI Safety"} writing_flow.run(shared) \`\`\` For *dynamic cases*, consider using [Agents](./agent.md). ================================================ File: docs/utility_function/llm.md ================================================ --- layout: default title: "LLM Wrapper" parent: "Utility Function" nav_order: 1 --- # LLM Wrappers Check out libraries like [litellm](https://github.com/BerriAI/litellm). Here, we provide some minimal example implementations: 1. OpenAI \`\`\`python def call_llm(prompt): from openai import OpenAI client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content # Example usage call_llm("How are you?") \`\`\` > Store the API key in an environment variable like OPENAI_API_KEY for security. {: .best-practice } 2. Claude (Anthropic) \`\`\`python def call_llm(prompt): from anthropic import Anthropic client = Anthropic(api_key="YOUR_API_KEY_HERE") response = client.messages.create( model="claude-2", messages=[{"role": "user", "content": prompt}], max_tokens=100 ) return response.content \`\`\` 3. Google (Generative AI Studio / PaLM API) \`\`\`python def call_llm(prompt): import google.generativeai as genai genai.configure(api_key="YOUR_API_KEY_HERE") response = genai.generate_text( model="models/text-bison-001", prompt=prompt ) return response.result \`\`\` 4. Azure (Azure OpenAI) \`\`\`python def call_llm(prompt): from openai import AzureOpenAI client = AzureOpenAI( azure_endpoint="https://.openai.azure.com/", api_key="YOUR_API_KEY_HERE", api_version="2023-05-15" ) r = client.chat.completions.create( model="", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content \`\`\` 5. Ollama (Local LLM) \`\`\`python def call_llm(prompt): from ollama import chat response = chat( model="llama2", messages=[{"role": "user", "content": prompt}] ) return response.message.content \`\`\` ## Improvements Feel free to enhance your `call_llm` function as needed. 
Here are examples: - Handle chat history: \`\`\`python def call_llm(messages): from openai import OpenAI client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.chat.completions.create( model="gpt-4o", messages=messages ) return r.choices[0].message.content \`\`\` - Add in-memory caching \`\`\`python from functools import lru_cache @lru_cache(maxsize=1000) def call_llm(prompt): # Your implementation here pass \`\`\` > ⚠️ Caching conflicts with Node retries, as retries yield the same result. > > To address this, you could use cached results only if not retried. {: .warning } \`\`\`python from functools import lru_cache @lru_cache(maxsize=1000) def cached_call(prompt): pass def call_llm(prompt, use_cache): if use_cache: return cached_call(prompt) # Call the underlying function directly return cached_call.__wrapped__(prompt) class SummarizeNode(Node): def exec(self, text): return call_llm(f"Summarize: {text}", self.cur_retry==0) \`\`\` - Enable logging: \`\`\`python def call_llm(prompt): import logging logging.info(f"Prompt: {prompt}") response = ... # Your implementation here logging.info(f"Response: {response}") return response \`\`\` ``` ## /.cursorrules ```cursorrules path="/.cursorrules" --- layout: default title: "Agentic Coding" --- # Agentic Coding: Humans Design, Agents code! > If you are an AI agents involved in building LLM Systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification. {: .warning } ## Agentic Coding Steps Agentic Coding should be a collaboration between Human System Design and Agent Implementation: | Steps | Human | AI | Comment | |:-----------------------|:----------:|:---------:|:------------------------------------------------------------------------| | 1. Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context. | | 2. Flow | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details. | | 3. Utilities | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. | | 4. Node | ★☆☆ Low | ★★★ High | The AI helps design the node types and data handling based on the flow. | | 5. Implementation | ★☆☆ Low | ★★★ High | The AI implements the flow based on the design. | | 6. Optimization | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize. | | 7. Reliability | ★☆☆ Low | ★★★ High | The AI writes test cases and addresses corner cases. | 1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit. - Understand AI systems' strengths and limitations: - **Good for**: Routine tasks requiring common sense (filling forms, replying to emails) - **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL) - **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning) - **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features. - **Balance complexity vs. impact**: Aim to deliver the highest value features with minimal complexity early. 2. **Flow Design**: Outline at a high level, describe how your AI system orchestrates nodes. 
- Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)). - For each node in the flow, start with a high-level one-line description of what it does. - If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine). - If using **Agent**, specify what are the inputs (context) and what are the possible actions. - If using **RAG**, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows. - Outline the flow and draw it in a mermaid diagram. For example: \`\`\`mermaid flowchart LR start[Start] --> batch[Batch] batch --> check[Check] check -->|OK| process check -->|Error| fix[Fix] fix --> check subgraph process[Process] step1[Step 1] --> step2[Step 2] end process --> endNode[End] \`\`\` - > **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition. {: .best-practice } 3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions. - Think of your AI system as the brain. It needs a body—these *external utility functions*—to interact with the real world:
- Reading inputs (e.g., retrieving Slack messages, reading emails) - Writing outputs (e.g., generating reports, sending emails) - Using external tools (e.g., calling LLMs, searching the web) - **NOTE**: *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal in the AI system. - For each utility function, implement it and write a simple test. - Document their input/output, as well as why they are necessary. For example: - `name`: `get_embedding` (`utils/get_embedding.py`) - `input`: `str` - `output`: a vector of 3072 floats - `necessity`: Used by the second node to embed text - Example utility implementation: \`\`\`python # utils/call_llm.py from openai import OpenAI def call_llm(prompt): client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content if __name__ == "__main__": prompt = "What is the meaning of life?" print(call_llm(prompt)) \`\`\` - > **Sometimes, design Utilies before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them. {: .best-practice } 4. **Node Design**: Plan how each node will read and write data, and use utility functions. - One core design principle for PocketFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design: - For simple systems, use an in-memory dictionary. - For more complex systems or when persistence is required, use a database. - **Don't Repeat Yourself**: Use in-memory references or foreign keys. - Example shared store design: \`\`\`python shared = { "user": { "id": "user123", "context": { # Another nested dict "weather": {"temp": 72, "condition": "sunny"}, "location": "San Francisco" } }, "results": {} # Empty dict to store outputs } \`\`\` - For each [Node](./core_abstraction/node.md), describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without codes. For example: - `type`: Regular (or Batch, or Async) - `prep`: Read "text" from the shared store - `exec`: Call the embedding utility function - `post`: Write "embedding" to the shared store 5. **Implementation**: Implement the initial nodes and flows based on the design. - 🎉 If you've reached this step, humans have finished the design. Now *Agentic Coding* begins! - **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking. - **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system. - Add logging throughout the code to facilitate debugging. 7. **Optimization**: - **Use Intuition**: For a quick initial evaluation, human intuition is often a good start. - **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts. - If your flow design is already solid, move on to micro-optimizations: - **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity. - **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone. - > **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times. > >
      {: .best-practice }

7. **Reliability**
    - **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times.
    - **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging.
    - **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain.

## Example LLM Project File Structure

\`\`\`
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│   ├── __init__.py
│   ├── call_llm.py
│   └── search_web.py
├── requirements.txt
└── docs/
    └── design.md
\`\`\`

- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*.
- **`utils/`**: Contains all utility functions.
    - It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
    - Each file should also include a `main()` function to try that API call.
- **`nodes.py`**: Contains all the node definitions.
  \`\`\`python
  # nodes.py
  from pocketflow import Node
  from utils.call_llm import call_llm

  class GetQuestionNode(Node):
      def exec(self, _):
          # Get question directly from user input
          user_question = input("Enter your question: ")
          return user_question

      def post(self, shared, prep_res, exec_res):
          # Store the user's question
          shared["question"] = exec_res
          return "default"  # Go to the next node

  class AnswerNode(Node):
      def prep(self, shared):
          # Read question from shared
          return shared["question"]

      def exec(self, question):
          # Call LLM to get the answer
          return call_llm(question)

      def post(self, shared, prep_res, exec_res):
          # Store the answer in shared
          shared["answer"] = exec_res
  \`\`\`
- **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them.
  \`\`\`python
  # flow.py
  from pocketflow import Flow
  from nodes import GetQuestionNode, AnswerNode

  def create_qa_flow():
      """Create and return a question-answering flow."""
      # Create nodes
      get_question_node = GetQuestionNode()
      answer_node = AnswerNode()

      # Connect nodes in sequence
      get_question_node >> answer_node

      # Create flow starting with input node
      return Flow(start=get_question_node)
  \`\`\`
- **`main.py`**: Serves as the project's entry point.
  \`\`\`python
  # main.py
  from flow import create_qa_flow

  # Example main function
  # Please replace this with your own main function
  def main():
      shared = {
          "question": None,  # Will be populated by GetQuestionNode from user input
          "answer": None     # Will be populated by AnswerNode
      }

      # Create the flow and run it
      qa_flow = create_qa_flow()
      qa_flow.run(shared)

      print(f"Question: {shared['question']}")
      print(f"Answer: {shared['answer']}")

  if __name__ == "__main__":
      main()
  \`\`\`

================================================
File: docs/index.md
================================================
---
layout: default
title: "Home"
nav_order: 1
---

# Pocket Flow

A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*.

- **Lightweight**: Just the core graph abstraction in 100 lines. ZERO dependencies, and ZERO vendor lock-in.
- **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more.
- **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications.
## Core Abstraction We model the LLM workflow as a **Graph + Shared Store**: - [Node](./core_abstraction/node.md) handles simple (LLM) tasks. - [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges). - [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows. - [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks. - [Async](./core_abstraction/async.md) nodes/flows allow waiting for asynchronous tasks. - [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks.
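To make the pieces above concrete, here is a minimal sketch that wires a single Node into a Flow and communicates through the shared store. It assumes a `call_llm` utility along the lines of the LLM Wrapper examples further below; everything else is the core API described in these docs.

\`\`\`python
from pocketflow import Node, Flow
from utils.call_llm import call_llm  # assumed utility, see the LLM Wrapper examples

class Summarize(Node):
    def prep(self, shared):
        # Read input from the shared store
        return shared["text"]

    def exec(self, text):
        # Compute step: call the LLM utility
        return call_llm(f"Summarize in one sentence: {text}")

    def post(self, shared, prep_res, exec_res):
        # Write the result back and choose the next action
        shared["summary"] = exec_res
        return "default"

flow = Flow(start=Summarize())
shared = {"text": "PocketFlow models LLM workflows as a graph plus a shared store."}
flow.run(shared)
print(shared["summary"])
\`\`\`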
## Design Pattern From there, it’s easy to implement popular design patterns: - [Agent](./design_pattern/agent.md) autonomously makes decisions. - [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines. - [RAG](./design_pattern/rag.md) integrates data retrieval with generation. - [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps. - [Structured Output](./design_pattern/structure.md) formats outputs consistently. - [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
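As a small illustration of the Structured Output pattern listed above: the Structured Output doc further below validates LLM output with `assert` statements and mentions [Pydantic](https://github.com/pydantic/pydantic) as an alternative. The following is a hedged sketch of that alternative; it assumes Pydantic v2 and reuses the fenced-YAML convention from those examples.

\`\`\`python
import yaml
from pydantic import BaseModel

class Summary(BaseModel):
    summary: list[str]  # the schema we ask the LLM to produce

def parse_summary(response: str) -> Summary:
    # Extract the fenced YAML block from the raw LLM response
    yaml_str = response.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip()
    data = yaml.safe_load(yaml_str)
    # Raises pydantic.ValidationError if required fields are missing or mistyped,
    # which a Node's retry mechanism (max_retries) can then handle
    return Summary.model_validate(data)
\`\`\`

A Node's `exec()` could call `parse_summary()` on the raw response and let retries handle a validation error, just like the `assert`-based version shown later.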
## Utility Function We **do not** provide built-in utilities. Instead, we offer *examples*—please *implement your own*: - [LLM Wrapper](./utility_function/llm.md) - [Viz and Debug](./utility_function/viz.md) - [Web Search](./utility_function/websearch.md) - [Chunking](./utility_function/chunking.md) - [Embedding](./utility_function/embedding.md) - [Vector Databases](./utility_function/vector.md) - [Text-to-Speech](./utility_function/text_to_speech.md) **Why not built-in?**: I believe it's a *bad practice* for vendor-specific APIs in a general framework: - *API Volatility*: Frequent changes lead to heavy maintenance for hardcoded APIs. - *Flexibility*: You may want to switch vendors, use fine-tuned models, or run them locally. - *Optimizations*: Prompt caching, batching, and streaming are easier without vendor lock-in. ## Ready to build your Apps? Check out [Agentic Coding Guidance](./guide.md), the fastest way to develop LLM projects with Pocket Flow! ================================================ File: docs/core_abstraction/async.md ================================================ --- layout: default title: "(Advanced) Async" parent: "Core Abstraction" nav_order: 5 --- # (Advanced) Async **Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for: 1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way. 2. **exec_async()**: Typically used for async LLM calls. 3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`. **Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes. ### Example \`\`\`python class SummarizeThenVerify(AsyncNode): async def prep_async(self, shared): # Example: read a file asynchronously doc_text = await read_file_async(shared["doc_path"]) return doc_text async def exec_async(self, prep_res): # Example: async LLM call summary = await call_llm_async(f"Summarize: {prep_res}") return summary async def post_async(self, shared, prep_res, exec_res): # Example: wait for user feedback decision = await gather_user_feedback(exec_res) if decision == "approve": shared["summary"] = exec_res return "approve" return "deny" summarize_node = SummarizeThenVerify() final_node = Finalize() # Define transitions summarize_node - "approve" >> final_node summarize_node - "deny" >> summarize_node # retry flow = AsyncFlow(start=summarize_node) async def main(): shared = {"doc_path": "document.txt"} await flow.run_async(shared) print("Final Summary:", shared.get("summary")) asyncio.run(main()) \`\`\` ================================================ File: docs/core_abstraction/batch.md ================================================ --- layout: default title: "Batch" parent: "Core Abstraction" nav_order: 4 --- # Batch **Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases: - **Chunk-based** processing (e.g., splitting large texts). - **Iterative** processing over lists of input items (e.g., user queries, files, URLs). ## 1. BatchNode A **BatchNode** extends `Node` but changes `prep()` and `exec()`: - **`prep(shared)`**: returns an **iterable** (e.g., list, generator). - **`exec(item)`**: called **once** per item in that iterable. - **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**. 
### Example: Summarize a Large File \`\`\`python class MapSummaries(BatchNode): def prep(self, shared): # Suppose we have a big file; chunk it content = shared["data"] chunk_size = 10000 chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)] return chunks def exec(self, chunk): prompt = f"Summarize this chunk in 10 words: {chunk}" summary = call_llm(prompt) return summary def post(self, shared, prep_res, exec_res_list): combined = "\n".join(exec_res_list) shared["summary"] = combined return "default" map_summaries = MapSummaries() flow = Flow(start=map_summaries) flow.run(shared) \`\`\` --- ## 2. BatchFlow A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set. ### Example: Summarize Many Files \`\`\`python class SummarizeAllFiles(BatchFlow): def prep(self, shared): # Return a list of param dicts (one per file) filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...] return [{"filename": fn} for fn in filenames] # Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce): summarize_file = SummarizeFile(start=load_file) # Wrap that flow into a BatchFlow: summarize_all_files = SummarizeAllFiles(start=summarize_file) summarize_all_files.run(shared) \`\`\` ### Under the Hood 1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`. 2. The **BatchFlow** loops through each dict. For each one: - It merges the dict with the BatchFlow’s own `params`. - It calls `flow.run(shared)` using the merged result. 3. This means the sub-Flow is run **repeatedly**, once for every param dict. --- ## 3. Nested or Multi-Level Batches You can nest a **BatchFlow** in another **BatchFlow**. For instance: - **Outer** batch: returns a list of diretory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...). - **Inner** batch: returning a list of per-file param dicts. At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once. \`\`\`python class FileBatchFlow(BatchFlow): def prep(self, shared): directory = self.params["directory"] # e.g., files = ["file1.txt", "file2.txt", ...] files = [f for f in os.listdir(directory) if f.endswith(".txt")] return [{"filename": f} for f in files] class DirectoryBatchFlow(BatchFlow): def prep(self, shared): directories = [ "/path/to/dirA", "/path/to/dirB"] return [{"directory": d} for d in directories] # MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"} inner_flow = FileBatchFlow(start=MapSummaries()) outer_flow = DirectoryBatchFlow(start=inner_flow) \`\`\` ================================================ File: docs/core_abstraction/communication.md ================================================ --- layout: default title: "Communication" parent: "Core Abstraction" nav_order: 3 --- # Communication Nodes and Flows **communicate** in 2 ways: 1. **Shared Store (for almost all the cases)** - A global data structure (often an in-mem dict) that all nodes can read ( `prep()`) and write (`post()`). - Great for data results, large content, or anything multiple nodes need. - You shall design the data structure and populate it ahead. 
- > **Separation of Concerns:** Use `Shared Store` for almost all cases to separate *Data Schema* from *Compute Logic*! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](./batch.md). {: .best-practice } 2. **Params (only for [Batch](./batch.md))** - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**. - Good for identifiers like filenames or numeric IDs, in Batch mode. If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller). --- ## 1. Shared Store ### Overview A shared store is typically an in-mem dictionary, like: \`\`\`python shared = {"data": {}, "summary": {}, "config": {...}, ...} \`\`\` It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements. ### Example \`\`\`python class LoadData(Node): def post(self, shared, prep_res, exec_res): # We write data to shared store shared["data"] = "Some text content" return None class Summarize(Node): def prep(self, shared): # We read data from shared store return shared["data"] def exec(self, prep_res): # Call LLM to summarize prompt = f"Summarize: {prep_res}" summary = call_llm(prompt) return summary def post(self, shared, prep_res, exec_res): # We write summary to shared store shared["summary"] = exec_res return "default" load_data = LoadData() summarize = Summarize() load_data >> summarize flow = Flow(start=load_data) shared = {} flow.run(shared) \`\`\` Here: - `LoadData` writes to `shared["data"]`. - `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`. --- ## 2. Params **Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are: - **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`). - **Set** via `set_params()`. - **Cleared** and updated each time a parent Flow calls it. > Only set the uppermost Flow params because others will be overwritten by the parent Flow. > > If you need to set child node params, see [Batch](./batch.md). {: .warning } Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store. ### Example \`\`\`python # 1) Create a Node that uses params class SummarizeFile(Node): def prep(self, shared): # Access the node's param filename = self.params["filename"] return shared["data"].get(filename, "") def exec(self, prep_res): prompt = f"Summarize: {prep_res}" return call_llm(prompt) def post(self, shared, prep_res, exec_res): filename = self.params["filename"] shared["summary"][filename] = exec_res return "default" # 2) Set params node = SummarizeFile() # 3) Set Node params directly (for testing) node.set_params({"filename": "doc1.txt"}) node.run(shared) # 4) Create Flow flow = Flow(start=node) # 5) Set Flow params (overwrites node params) flow.set_params({"filename": "doc2.txt"}) flow.run(shared) # The node summarizes doc2, not doc1 \`\`\` ================================================ File: docs/core_abstraction/flow.md ================================================ --- layout: default title: "Flow" parent: "Core Abstraction" nav_order: 2 --- # Flow A **Flow** orchestrates a graph of Nodes. 
You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`. ## 1. Action-based Transitions Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`. You define transitions with the syntax: 1. **Basic default transition**: `node_a >> node_b` This means if `node_a.post()` returns `"default"`, go to `node_b`. (Equivalent to `node_a - "default" >> node_b`) 2. **Named action transition**: `node_a - "action_name" >> node_b` This means if `node_a.post()` returns `"action_name"`, go to `node_b`. It's possible to create loops, branching, or multi-step flows. ## 2. Creating a Flow A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node. ### Example: Simple Sequence Here's a minimal flow of two nodes in a chain: \`\`\`python node_a >> node_b flow = Flow(start=node_a) flow.run(shared) \`\`\` - When you run the flow, it executes `node_a`. - Suppose `node_a.post()` returns `"default"`. - The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`. - `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there. ### Example: Branching & Looping Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions: - `"approved"`: expense is approved, move to payment processing - `"needs_revision"`: expense needs changes, send back for revision - `"rejected"`: expense is denied, finish the process We can wire them like this: \`\`\`python # Define the flow connections review - "approved" >> payment # If approved, process payment review - "needs_revision" >> revise # If needs changes, go to revision review - "rejected" >> finish # If rejected, finish the process revise >> review # After revision, go back for another review payment >> finish # After payment, finish the process flow = Flow(start=review) \`\`\` Let's see how it flows: 1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node 2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review` 3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops \`\`\`mermaid flowchart TD review[Review Expense] -->|approved| payment[Process Payment] review -->|needs_revision| revise[Revise Report] review -->|rejected| finish[Finish Process] revise --> review payment --> finish \`\`\` ### Running Individual Nodes vs. Running a Flow - `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action. - `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue. > `node.run(shared)` **does not** proceed to the successor. > This is mainly for debugging or testing a single node. > > Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. {: .warning } ## 3. Nested Flows A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: 1. Use a Flow as a Node within another Flow's transitions. 2. Combine multiple smaller Flows into a larger Flow for reuse. 3. Node `params` will be a merging of **all** parents' `params`. 
### Flow's Node Methods A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However: - It **won't** run `exec()`, as its main logic is to orchestrate its nodes. - `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store. ### Basic Flow Nesting Here's how to connect a flow to another node: \`\`\`python # Create a sub-flow node_a >> node_b subflow = Flow(start=node_a) # Connect it to another node subflow >> node_c # Create the parent flow parent_flow = Flow(start=subflow) \`\`\` When `parent_flow.run()` executes: 1. It starts `subflow` 2. `subflow` runs through its nodes (`node_a->node_b`) 3. After `subflow` completes, execution continues to `node_c` ### Example: Order Processing Pipeline Here's a practical example that breaks down order processing into nested flows: \`\`\`python # Payment processing sub-flow validate_payment >> process_payment >> payment_confirmation payment_flow = Flow(start=validate_payment) # Inventory sub-flow check_stock >> reserve_items >> update_inventory inventory_flow = Flow(start=check_stock) # Shipping sub-flow create_label >> assign_carrier >> schedule_pickup shipping_flow = Flow(start=create_label) # Connect the flows into a main order pipeline payment_flow >> inventory_flow >> shipping_flow # Create the master flow order_pipeline = Flow(start=payment_flow) # Run the entire pipeline order_pipeline.run(shared_data) \`\`\` This creates a clean separation of concerns while maintaining a clear execution path: \`\`\`mermaid flowchart LR subgraph order_pipeline[Order Pipeline] subgraph paymentFlow["Payment Flow"] A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation] end subgraph inventoryFlow["Inventory Flow"] D[Check Stock] --> E[Reserve Items] --> F[Update Inventory] end subgraph shippingFlow["Shipping Flow"] G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup] end paymentFlow --> inventoryFlow inventoryFlow --> shippingFlow end \`\`\` ================================================ File: docs/core_abstraction/node.md ================================================ --- layout: default title: "Node" parent: "Core Abstraction" nav_order: 1 --- # Node A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`:
1. `prep(shared)` - **Read and preprocess data** from `shared` store. - Examples: *query DB, read files, or serialize data into a string*. - Return `prep_res`, which is used by `exec()` and `post()`. 2. `exec(prep_res)` - **Execute compute logic**, with optional retries and error handling (below). - Examples: *(mostly) LLM calls, remote APIs, tool use*. - ⚠️ This shall be only for compute and **NOT** access `shared`. - ⚠️ If retries enabled, ensure idempotent implementation. - Return `exec_res`, which is passed to `post()`. 3. `post(shared, prep_res, exec_res)` - **Postprocess and write data** back to `shared`. - Examples: *update DB, change states, log results*. - **Decide the next action** by returning a *string* (`action = "default"` if *None*). > **Why 3 steps?** To enforce the principle of *separation of concerns*. The data storage and data processing are operated separately. > > All steps are *optional*. E.g., you can only implement `prep` and `post` if you just need to process data. {: .note } ### Fault Tolerance & Retries You can **retry** `exec()` if it raises an exception via two parameters when define the Node: - `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry). - `wait` (int): The time to wait (in **seconds**) before next retry. By default, `wait=0` (no waiting). `wait` is helpful when you encounter rate-limits or quota errors from your LLM provider and need to back off. \`\`\`python my_node = SummarizeFile(max_retries=3, wait=10) \`\`\` When an exception occurs in `exec()`, the Node automatically retries until: - It either succeeds, or - The Node has retried `max_retries - 1` times already and fails on the last attempt. You can get the current retry times (0-based) from `self.cur_retry`. \`\`\`python class RetryNode(Node): def exec(self, prep_res): print(f"Retry {self.cur_retry} times") raise Exception("Failed") \`\`\` ### Graceful Fallback To **gracefully handle** the exception (after all retries) rather than raising it, override: \`\`\`python def exec_fallback(self, prep_res, exc): raise exc \`\`\` By default, it just re-raises exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`. ### Example: Summarize file \`\`\`python class SummarizeFile(Node): def prep(self, shared): return shared["data"] def exec(self, prep_res): if not prep_res: return "Empty file content" prompt = f"Summarize this text in 10 words: {prep_res}" summary = call_llm(prompt) # might fail return summary def exec_fallback(self, prep_res, exc): # Provide a simple fallback instead of crashing return "There was an error processing your request." def post(self, shared, prep_res, exec_res): shared["summary"] = exec_res # Return "default" by not returning summarize_node = SummarizeFile(max_retries=3) # node.run() calls prep->exec->post # If exec() fails, it retries up to 3 times before calling exec_fallback() action_result = summarize_node.run(shared) print("Action returned:", action_result) # "default" print("Summary stored:", shared["summary"]) \`\`\` ================================================ File: docs/core_abstraction/parallel.md ================================================ --- layout: default title: "(Advanced) Parallel" parent: "Core Abstraction" nav_order: 6 --- # (Advanced) Parallel **Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute. 
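One practical companion to parallel execution is **throttling**: the rate-limit note just below suggests semaphores or sleep intervals. Here is a minimal sketch, assuming `AsyncParallelBatchNode`, `AsyncFlow`, and a `call_llm_async` helper as used in the examples that follow; the limit of 5 concurrent calls is an arbitrary choice.

\`\`\`python
import asyncio
from pocketflow import AsyncFlow, AsyncParallelBatchNode
# call_llm_async is an assumed async utility (see the LLM Wrapper examples)

class ThrottledSummaries(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        # Create the semaphore inside the running event loop;
        # 5 concurrent LLM calls is an arbitrary, adjustable limit.
        self.sem = asyncio.Semaphore(5)
        return shared["texts"]

    async def exec_async(self, text):
        async with self.sem:  # wait here if 5 calls are already in flight
            return await call_llm_async(f"Summarize: {text}")

    async def post_async(self, shared, prep_res, exec_res_list):
        shared["summary"] = "\n\n".join(exec_res_list)
        return "default"

flow = AsyncFlow(start=ThrottledSummaries())
\`\`\`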
> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. {: .warning } > - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. > > - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). > > - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. {: .best-practice } ## AsyncParallelBatchNode Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: \`\`\`python class ParallelSummaries(AsyncParallelBatchNode): async def prep_async(self, shared): # e.g., multiple texts return shared["texts"] async def exec_async(self, text): prompt = f"Summarize: {text}" return await call_llm_async(prompt) async def post_async(self, shared, prep_res, exec_res_list): shared["summary"] = "\n\n".join(exec_res_list) return "default" node = ParallelSummaries() flow = AsyncFlow(start=node) \`\`\` ## AsyncParallelBatchFlow Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters: \`\`\`python class SummarizeMultipleFiles(AsyncParallelBatchFlow): async def prep_async(self, shared): return [{"filename": f} for f in shared["files"]] sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) parallel_flow = SummarizeMultipleFiles(start=sub_flow) await parallel_flow.run_async(shared) \`\`\` ================================================ File: docs/design_pattern/agent.md ================================================ --- layout: default title: "Agent" parent: "Design Pattern" nav_order: 1 --- # Agent Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
## Implement Agent with Graph 1. **Context and Action:** Implement nodes that supply context and perform actions. 2. **Branching:** Use branching to connect each action node to an agent node. Use action to allow the agent to direct the [flow](../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step. 3. **Agent Node:** Provide a prompt to decide action—for example: \`\`\`python f""" ### CONTEXT Task: {task_description} Previous Actions: {previous_actions} Current State: {current_state} ### ACTION SPACE [1] search Description: Use web search to get results Parameters: - query (str): What to search for [2] answer Description: Conclude based on the results Parameters: - result (str): Final answer to provide ### NEXT ACTION Decide the next action based on the current context and available action space. Return your response in the following format: \`\`\`yaml thinking: | action: parameters: : \`\`\`""" \`\`\` The core of building **high-performance** and **reliable** agents boils down to: 1. **Context Management:** Provide *relevant, minimal context.* For example, rather than including an entire chat history, retrieve the most relevant via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content. 2. **Action Space:** Provide *a well-structured and unambiguous* set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database. ## Example Good Action Design - **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once. - **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts). - **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files. - **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends. ## Example: Search Agent This agent: 1. Decides whether to search or answer 2. If searches, loops back to decide if more search needed 3. 
Answers when enough context gathered \`\`\`python class DecideAction(Node): def prep(self, shared): context = shared.get("context", "No previous search") query = shared["query"] return query, context def exec(self, inputs): query, context = inputs prompt = f""" Given input: {query} Previous search results: {context} Should I: 1) Search web for more info 2) Answer with current knowledge Output in yaml: \`\`\`yaml action: search/answer reason: why this action search_term: search phrase if action is search \`\`\`""" resp = call_llm(prompt) yaml_str = resp.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip() result = yaml.safe_load(yaml_str) assert isinstance(result, dict) assert "action" in result assert "reason" in result assert result["action"] in ["search", "answer"] if result["action"] == "search": assert "search_term" in result return result def post(self, shared, prep_res, exec_res): if exec_res["action"] == "search": shared["search_term"] = exec_res["search_term"] return exec_res["action"] class SearchWeb(Node): def prep(self, shared): return shared["search_term"] def exec(self, search_term): return search_web(search_term) def post(self, shared, prep_res, exec_res): prev_searches = shared.get("context", []) shared["context"] = prev_searches + [ {"term": shared["search_term"], "result": exec_res} ] return "decide" class DirectAnswer(Node): def prep(self, shared): return shared["query"], shared.get("context", "") def exec(self, inputs): query, context = inputs return call_llm(f"Context: {context}\nAnswer: {query}") def post(self, shared, prep_res, exec_res): print(f"Answer: {exec_res}") shared["answer"] = exec_res # Connect nodes decide = DecideAction() search = SearchWeb() answer = DirectAnswer() decide - "search" >> search decide - "answer" >> answer search - "decide" >> decide # Loop back flow = Flow(start=decide) flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) \`\`\` ================================================ File: docs/design_pattern/mapreduce.md ================================================ --- layout: default title: "Map Reduce" parent: "Design Pattern" nav_order: 4 --- # Map Reduce MapReduce is a design pattern suitable when you have either: - Large input data (e.g., multiple files to process), or - Large output data (e.g., multiple forms to fill) and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase. ### Example: Document Summarization \`\`\`python class SummarizeAllFiles(BatchNode): def prep(self, shared): files_dict = shared["files"] # e.g. 10 files return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...] def exec(self, one_file): filename, file_content = one_file summary_text = call_llm(f"Summarize the following file:\n{file_content}") return (filename, summary_text) def post(self, shared, prep_res, exec_res_list): shared["file_summaries"] = dict(exec_res_list) class CombineSummaries(Node): def prep(self, shared): return shared["file_summaries"] def exec(self, file_summaries): # format as: "File1: summary\nFile2: summary...\n" text_list = [] for fname, summ in file_summaries.items(): text_list.append(f"{fname} summary:\n{summ}\n") big_text = "\n---\n".join(text_list) return call_llm(f"Combine these file summaries into one final summary:\n{big_text}") def post(self, shared, prep_res, final_summary): shared["all_files_summary"] = final_summary batch_node = SummarizeAllFiles() combine_node = CombineSummaries() batch_node >> combine_node flow = Flow(start=batch_node) shared = { "files": { "file1.txt": "Alice was beginning to get very tired of sitting by her sister...", "file2.txt": "Some other interesting text ...", # ... } } flow.run(shared) print("Individual Summaries:", shared["file_summaries"]) print("\nFinal Summary:\n", shared["all_files_summary"]) \`\`\` ================================================ File: docs/design_pattern/rag.md ================================================ --- layout: default title: "RAG" parent: "Design Pattern" nav_order: 3 --- # RAG (Retrieval Augmented Generation) For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
1. **Offline stage**: Preprocess and index documents ("building the index"). 2. **Online stage**: Given a question, generate answers by retrieving the most relevant context. --- ## Stage 1: Offline Indexing We create three Nodes: 1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text. 2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk. 3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md). \`\`\`python class ChunkDocs(BatchNode): def prep(self, shared): # A list of file paths in shared["files"]. We process each file. return shared["files"] def exec(self, filepath): # read file content. In real usage, do error handling. with open(filepath, "r", encoding="utf-8") as f: text = f.read() # chunk by 100 chars each chunks = [] size = 100 for i in range(0, len(text), size): chunks.append(text[i : i + size]) return chunks def post(self, shared, prep_res, exec_res_list): # exec_res_list is a list of chunk-lists, one per file. # flatten them all into a single list of chunks. all_chunks = [] for chunk_list in exec_res_list: all_chunks.extend(chunk_list) shared["all_chunks"] = all_chunks class EmbedDocs(BatchNode): def prep(self, shared): return shared["all_chunks"] def exec(self, chunk): return get_embedding(chunk) def post(self, shared, prep_res, exec_res_list): # Store the list of embeddings. shared["all_embeds"] = exec_res_list print(f"Total embeddings: {len(exec_res_list)}") class StoreIndex(Node): def prep(self, shared): # We'll read all embeds from shared. return shared["all_embeds"] def exec(self, all_embeds): # Create a vector index (faiss or other DB in real usage). index = create_index(all_embeds) return index def post(self, shared, prep_res, index): shared["index"] = index # Wire them in sequence chunk_node = ChunkDocs() embed_node = EmbedDocs() store_node = StoreIndex() chunk_node >> embed_node >> store_node OfflineFlow = Flow(start=chunk_node) \`\`\` Usage example: \`\`\`python shared = { "files": ["doc1.txt", "doc2.txt"], # any text files } OfflineFlow.run(shared) \`\`\` --- ## Stage 2: Online Query & Answer We have 3 nodes: 1. `EmbedQuery` – embeds the user’s question. 2. `RetrieveDocs` – retrieves top chunk from the index. 3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer. 
\`\`\`python class EmbedQuery(Node): def prep(self, shared): return shared["question"] def exec(self, question): return get_embedding(question) def post(self, shared, prep_res, q_emb): shared["q_emb"] = q_emb class RetrieveDocs(Node): def prep(self, shared): # We'll need the query embedding, plus the offline index/chunks return shared["q_emb"], shared["index"], shared["all_chunks"] def exec(self, inputs): q_emb, index, chunks = inputs I, D = search_index(index, q_emb, top_k=1) best_id = I[0][0] relevant_chunk = chunks[best_id] return relevant_chunk def post(self, shared, prep_res, relevant_chunk): shared["retrieved_chunk"] = relevant_chunk print("Retrieved chunk:", relevant_chunk[:60], "...") class GenerateAnswer(Node): def prep(self, shared): return shared["question"], shared["retrieved_chunk"] def exec(self, inputs): question, chunk = inputs prompt = f"Question: {question}\nContext: {chunk}\nAnswer:" return call_llm(prompt) def post(self, shared, prep_res, answer): shared["answer"] = answer print("Answer:", answer) embed_qnode = EmbedQuery() retrieve_node = RetrieveDocs() generate_node = GenerateAnswer() embed_qnode >> retrieve_node >> generate_node OnlineFlow = Flow(start=embed_qnode) \`\`\` Usage example: \`\`\`python # Suppose we already ran OfflineFlow and have: # shared["all_chunks"], shared["index"], etc. shared["question"] = "Why do people like cats?" OnlineFlow.run(shared) # final answer in shared["answer"] \`\`\` ================================================ File: docs/design_pattern/structure.md ================================================ --- layout: default title: "Structured Output" parent: "Design Pattern" nav_order: 5 --- # Structured Output In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys. There are several approaches to achieve a structured output: - **Prompting** the LLM to strictly return a defined structure. - Using LLMs that natively support **schema enforcement**. - **Post-processing** the LLM's response to extract structured content. In practice, **Prompting** is simple and reliable for modern LLMs. ### Example Use Cases - Extracting Key Information \`\`\`yaml product: name: Widget Pro price: 199.99 description: | A high-quality widget designed for professionals. Recommended for advanced users. \`\`\` - Summarizing Documents into Bullet Points \`\`\`yaml summary: - This product is easy to use. - It is cost-effective. - Suitable for all skill levels. \`\`\` - Generating Configuration Files \`\`\`yaml server: host: 127.0.0.1 port: 8080 ssl: true \`\`\` ## Prompt Engineering When prompting the LLM to produce **structured** output: 1. **Wrap** the structure in code fences (e.g., `yaml`). 2. **Validate** that all required fields exist (and let `Node` handles retry). ### Example Text Summarization \`\`\`python class SummarizeNode(Node): def exec(self, prep_res): # Suppose `prep_res` is the text to summarize. 
prompt = f""" Please summarize the following text as YAML, with exactly 3 bullet points {prep_res} Now, output: \`\`\`yaml summary: - bullet 1 - bullet 2 - bullet 3 \`\`\`""" response = call_llm(prompt) yaml_str = response.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip() import yaml structured_result = yaml.safe_load(yaml_str) assert "summary" in structured_result assert isinstance(structured_result["summary"], list) return structured_result \`\`\` > Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic) {: .note } ### Why YAML instead of JSON? Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes. **In JSON** \`\`\`json { "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\"" } \`\`\` - Every double quote inside the string must be escaped with `\"`. - Each newline in the dialogue must be represented as `\n`. **In YAML** \`\`\`yaml dialogue: | Alice said: "Hello Bob. How are you? I am good." \`\`\` - No need to escape interior quotes—just place the entire text under a block literal (`|`). - Newlines are naturally preserved without needing `\n`. ================================================ File: docs/design_pattern/workflow.md ================================================ --- layout: default title: "Workflow" parent: "Design Pattern" nav_order: 2 --- # Workflow Many real-world tasks are too complex for one LLM call. The solution is to **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*. > - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*. > > You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md). {: .best-practice } ### Example: Article Writing \`\`\`python class GenerateOutline(Node): def prep(self, shared): return shared["topic"] def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res class WriteSection(Node): def prep(self, shared): return shared["outline"] def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res class ReviewAndRefine(Node): def prep(self, shared): return shared["draft"] def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}") def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res # Connect nodes outline = GenerateOutline() write = WriteSection() review = ReviewAndRefine() outline >> write >> review # Create and run flow writing_flow = Flow(start=outline) shared = {"topic": "AI Safety"} writing_flow.run(shared) \`\`\` For *dynamic cases*, consider using [Agents](./agent.md). ================================================ File: docs/utility_function/llm.md ================================================ --- layout: default title: "LLM Wrapper" parent: "Utility Function" nav_order: 1 --- # LLM Wrappers Check out libraries like [litellm](https://github.com/BerriAI/litellm). Here, we provide some minimal example implementations: 1. OpenAI \`\`\`python def call_llm(prompt): from openai import OpenAI client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content # Example usage call_llm("How are you?") \`\`\` > Store the API key in an environment variable like OPENAI_API_KEY for security. {: .best-practice } 2. Claude (Anthropic) \`\`\`python def call_llm(prompt): from anthropic import Anthropic client = Anthropic(api_key="YOUR_API_KEY_HERE") response = client.messages.create( model="claude-2", messages=[{"role": "user", "content": prompt}], max_tokens=100 ) return response.content \`\`\` 3. Google (Generative AI Studio / PaLM API) \`\`\`python def call_llm(prompt): import google.generativeai as genai genai.configure(api_key="YOUR_API_KEY_HERE") response = genai.generate_text( model="models/text-bison-001", prompt=prompt ) return response.result \`\`\` 4. Azure (Azure OpenAI) \`\`\`python def call_llm(prompt): from openai import AzureOpenAI client = AzureOpenAI( azure_endpoint="https://.openai.azure.com/", api_key="YOUR_API_KEY_HERE", api_version="2023-05-15" ) r = client.chat.completions.create( model="", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content \`\`\` 5. Ollama (Local LLM) \`\`\`python def call_llm(prompt): from ollama import chat response = chat( model="llama2", messages=[{"role": "user", "content": prompt}] ) return response.message.content \`\`\` ## Improvements Feel free to enhance your `call_llm` function as needed. 
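For instance, as the earlier security note recommends, the API key can be read from an environment variable instead of being hardcoded. A small sketch of the OpenAI wrapper adjusted accordingly:

\`\`\`python
import os
from openai import OpenAI

def call_llm(prompt):
    # Expects OPENAI_API_KEY to be set in the environment
    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content
\`\`\`

The enhancements below (chat history, caching, logging) can be layered on top of this in the same way.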
Here are examples: - Handle chat history: \`\`\`python def call_llm(messages): from openai import OpenAI client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.chat.completions.create( model="gpt-4o", messages=messages ) return r.choices[0].message.content \`\`\` - Add in-memory caching \`\`\`python from functools import lru_cache @lru_cache(maxsize=1000) def call_llm(prompt): # Your implementation here pass \`\`\` > ⚠️ Caching conflicts with Node retries, as retries yield the same result. > > To address this, you could use cached results only if not retried. {: .warning } \`\`\`python from functools import lru_cache @lru_cache(maxsize=1000) def cached_call(prompt): pass def call_llm(prompt, use_cache): if use_cache: return cached_call(prompt) # Call the underlying function directly return cached_call.__wrapped__(prompt) class SummarizeNode(Node): def exec(self, text): return call_llm(f"Summarize: {text}", self.cur_retry==0) \`\`\` - Enable logging: \`\`\`python def call_llm(prompt): import logging logging.info(f"Prompt: {prompt}") response = ... # Your implementation here logging.info(f"Response: {response}") return response \`\`\` ``` ## /.env.sample ```sample path="/.env.sample" GEMINI_PROJECT_ID= GITHUB_TOKEN= ``` ## /.gitignore ```gitignore path="/.gitignore" # Dependencies node_modules/ vendor/ .pnp/ .pnp.js # Build outputs dist/ build/ out/ *.pyc __pycache__/ # Environment files .env .env.local .env.*.local .env.development .env.test .env.production # Python virtual environments .venv/ venv/ # IDE - VSCode .vscode/* !.vscode/settings.json !.vscode/tasks.json !.vscode/launch.json !.vscode/extensions.json # IDE - JetBrains .idea/ *.iml *.iws *.ipr # IDE - Eclipse .project .classpath .settings/ # Logs logs/ *.log npm-debug.log* yarn-debug.log* yarn-error.log* # Operating System .DS_Store Thumbs.db *.swp *.swo # Testing coverage/ .nyc_output/ # Temporary files *.tmp *.temp .cache/ # Compiled files *.com *.class *.dll *.exe *.o *.so # Package files *.7z *.dmg *.gz *.iso *.jar *.rar *.tar *.zip # Database *.sqlite *.sqlite3 *.db # Optional npm cache directory .npm # Optional eslint cache .eslintcache # Optional REPL history .node_repl_history # LLM cache llm_cache.json # Output files output/ ``` ## /.windsurfrules ```windsurfrules path="/.windsurfrules" --- layout: default title: "Agentic Coding" --- # Agentic Coding: Humans Design, Agents code! > If you are an AI agents involved in building LLM Systems, read this guide **VERY, VERY** carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification. {: .warning } ## Agentic Coding Steps Agentic Coding should be a collaboration between Human System Design and Agent Implementation: | Steps | Human | AI | Comment | |:-----------------------|:----------:|:---------:|:------------------------------------------------------------------------| | 1. Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context. | | 2. Flow | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details. | | 3. Utilities | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. | | 4. Node | ★☆☆ Low | ★★★ High | The AI helps design the node types and data handling based on the flow. | | 5. 
Implementation | ★☆☆ Low | ★★★ High | The AI implements the flow based on the design. | | 6. Optimization | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize. | | 7. Reliability | ★☆☆ Low | ★★★ High | The AI writes test cases and addresses corner cases. | 1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit. - Understand AI systems' strengths and limitations: - **Good for**: Routine tasks requiring common sense (filling forms, replying to emails) - **Good for**: Creative tasks with well-defined inputs (building slides, writing SQL) - **Not good for**: Ambiguous problems requiring complex decision-making (business strategy, startup planning) - **Keep It User-Centric:** Explain the "problem" from the user's perspective rather than just listing features. - **Balance complexity vs. impact**: Aim to deliver the highest value features with minimal complexity early. 2. **Flow Design**: Outline at a high level, describe how your AI system orchestrates nodes. - Identify applicable design patterns (e.g., [Map Reduce](./design_pattern/mapreduce.md), [Agent](./design_pattern/agent.md), [RAG](./design_pattern/rag.md)). - For each node in the flow, start with a high-level one-line description of what it does. - If using **Map Reduce**, specify how to map (what to split) and how to reduce (how to combine). - If using **Agent**, specify what are the inputs (context) and what are the possible actions. - If using **RAG**, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows. - Outline the flow and draw it in a mermaid diagram. For example: \`\`\`mermaid flowchart LR start[Start] --> batch[Batch] batch --> check[Check] check -->|OK| process check -->|Error| fix[Fix] fix --> check subgraph process[Process] step1[Step 1] --> step2[Step 2] end process --> endNode[End] \`\`\` - > **If Humans can't specify the flow, AI Agents can't automate it!** Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition. {: .best-practice } 3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions. - Think of your AI system as the brain. It needs a body—these *external utility functions*—to interact with the real world:
        - Reading inputs (e.g., retrieving Slack messages, reading emails)
        - Writing outputs (e.g., generating reports, sending emails)
        - Using external tools (e.g., calling LLMs, searching the web)
        - **NOTE**: *LLM-based tasks* (e.g., summarizing text, analyzing sentiment) are **NOT** utility functions; rather, they are *core functions* internal to the AI system.
    - For each utility function, implement it and write a simple test.
    - Document their input/output, as well as why they are necessary. For example:
        - `name`: `get_embedding` (`utils/get_embedding.py`)
        - `input`: `str`
        - `output`: a vector of 3072 floats
        - `necessity`: Used by the second node to embed text
    - Example utility implementation:
      \`\`\`python
      # utils/call_llm.py
      from openai import OpenAI

      def call_llm(prompt):
          client = OpenAI(api_key="YOUR_API_KEY_HERE")
          r = client.chat.completions.create(
              model="gpt-4o",
              messages=[{"role": "user", "content": prompt}]
          )
          return r.choices[0].message.content

      if __name__ == "__main__":
          prompt = "What is the meaning of life?"
          print(call_llm(prompt))
      \`\`\`
    - > **Sometimes, design Utilities before Flow:** For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them.
      {: .best-practice }

4. **Node Design**: Plan how each node will read and write data, and use utility functions.
    - One core design principle for PocketFlow is to use a [shared store](./core_abstraction/communication.md), so start with a shared store design:
        - For simple systems, use an in-memory dictionary.
        - For more complex systems or when persistence is required, use a database.
        - **Don't Repeat Yourself**: Use in-memory references or foreign keys.
        - Example shared store design:
          \`\`\`python
          shared = {
              "user": {
                  "id": "user123",
                  "context": {                # Another nested dict
                      "weather": {"temp": 72, "condition": "sunny"},
                      "location": "San Francisco"
                  }
              },
              "results": {}                   # Empty dict to store outputs
          }
          \`\`\`
    - For each [Node](./core_abstraction/node.md), describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level without code. For example:
        - `type`: Regular (or Batch, or Async)
        - `prep`: Read "text" from the shared store
        - `exec`: Call the embedding utility function
        - `post`: Write "embedding" to the shared store

5. **Implementation**: Implement the initial nodes and flows based on the design.
    - 🎉 If you've reached this step, humans have finished the design. Now *Agentic Coding* begins!
    - **"Keep it simple, stupid!"** Avoid complex features and full-scale type checking.
    - **FAIL FAST**! Avoid `try` logic so you can quickly identify any weak points in the system.
    - Add logging throughout the code to facilitate debugging.

6. **Optimization**:
    - **Use Intuition**: For a quick initial evaluation, human intuition is often a good start.
    - **Redesign Flow (Back to Step 3)**: Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts.
    - If your flow design is already solid, move on to micro-optimizations:
        - **Prompt Engineering**: Use clear, specific instructions with examples to reduce ambiguity.
        - **In-Context Learning**: Provide robust examples for tasks that are difficult to specify with instructions alone.
    - > **You'll likely iterate a lot!** Expect to repeat Steps 3–6 hundreds of times.
      >
      >
      {: .best-practice }

7. **Reliability**
    - **Node Retries**: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times.
    - **Logging and Visualization**: Maintain logs of all attempts and visualize node results for easier debugging.
    - **Self-Evaluation**: Add a separate node (powered by an LLM) to review outputs when results are uncertain.

## Example LLM Project File Structure

\`\`\`
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│   ├── __init__.py
│   ├── call_llm.py
│   └── search_web.py
├── requirements.txt
└── docs/
    └── design.md
\`\`\`

- **`docs/design.md`**: Contains project documentation for each step above. This should be *high-level* and *no-code*.
- **`utils/`**: Contains all utility functions.
    - It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
    - Each file should also include a `main()` function to try that API call.
- **`nodes.py`**: Contains all the node definitions.
  \`\`\`python
  # nodes.py
  from pocketflow import Node
  from utils.call_llm import call_llm

  class GetQuestionNode(Node):
      def exec(self, _):
          # Get question directly from user input
          user_question = input("Enter your question: ")
          return user_question

      def post(self, shared, prep_res, exec_res):
          # Store the user's question
          shared["question"] = exec_res
          return "default"  # Go to the next node

  class AnswerNode(Node):
      def prep(self, shared):
          # Read question from shared
          return shared["question"]

      def exec(self, question):
          # Call LLM to get the answer
          return call_llm(question)

      def post(self, shared, prep_res, exec_res):
          # Store the answer in shared
          shared["answer"] = exec_res
  \`\`\`
- **`flow.py`**: Implements functions that create flows by importing node definitions and connecting them.
  \`\`\`python
  # flow.py
  from pocketflow import Flow
  from nodes import GetQuestionNode, AnswerNode

  def create_qa_flow():
      """Create and return a question-answering flow."""
      # Create nodes
      get_question_node = GetQuestionNode()
      answer_node = AnswerNode()

      # Connect nodes in sequence
      get_question_node >> answer_node

      # Create flow starting with input node
      return Flow(start=get_question_node)
  \`\`\`
- **`main.py`**: Serves as the project's entry point.
  \`\`\`python
  # main.py
  from flow import create_qa_flow

  # Example main function
  # Please replace this with your own main function
  def main():
      shared = {
          "question": None,  # Will be populated by GetQuestionNode from user input
          "answer": None     # Will be populated by AnswerNode
      }

      # Create the flow and run it
      qa_flow = create_qa_flow()
      qa_flow.run(shared)

      print(f"Question: {shared['question']}")
      print(f"Answer: {shared['answer']}")

  if __name__ == "__main__":
      main()
  \`\`\`

================================================
File: docs/index.md
================================================
---
layout: default
title: "Home"
nav_order: 1
---

# Pocket Flow

A [100-line](https://github.com/the-pocket/PocketFlow/blob/main/pocketflow/__init__.py) minimalist LLM framework for *Agents, Task Decomposition, RAG, etc*.

- **Lightweight**: Just the core graph abstraction in 100 lines. ZERO dependencies, and ZERO vendor lock-in.
- **Expressive**: Everything you love from larger frameworks—([Multi-](./design_pattern/multi_agent.html))[Agents](./design_pattern/agent.html), [Workflow](./design_pattern/workflow.html), [RAG](./design_pattern/rag.html), and more.
- **Agentic-Coding**: Intuitive enough for AI agents to help humans build complex LLM applications.
## Core Abstraction We model the LLM workflow as a **Graph + Shared Store**: - [Node](./core_abstraction/node.md) handles simple (LLM) tasks. - [Flow](./core_abstraction/flow.md) connects nodes through **Actions** (labeled edges). - [Shared Store](./core_abstraction/communication.md) enables communication between nodes within flows. - [Batch](./core_abstraction/batch.md) nodes/flows allow for data-intensive tasks. - [Async](./core_abstraction/async.md) nodes/flows allow waiting for asynchronous tasks. - [(Advanced) Parallel](./core_abstraction/parallel.md) nodes/flows handle I/O-bound tasks.
## Design Pattern From there, it’s easy to implement popular design patterns: - [Agent](./design_pattern/agent.md) autonomously makes decisions. - [Workflow](./design_pattern/workflow.md) chains multiple tasks into pipelines. - [RAG](./design_pattern/rag.md) integrates data retrieval with generation. - [Map Reduce](./design_pattern/mapreduce.md) splits data tasks into Map and Reduce steps. - [Structured Output](./design_pattern/structure.md) formats outputs consistently. - [(Advanced) Multi-Agents](./design_pattern/multi_agent.md) coordinate multiple agents.
## Utility Function We **do not** provide built-in utilities. Instead, we offer *examples*—please *implement your own*: - [LLM Wrapper](./utility_function/llm.md) - [Viz and Debug](./utility_function/viz.md) - [Web Search](./utility_function/websearch.md) - [Chunking](./utility_function/chunking.md) - [Embedding](./utility_function/embedding.md) - [Vector Databases](./utility_function/vector.md) - [Text-to-Speech](./utility_function/text_to_speech.md) **Why not built-in?**: I believe it's a *bad practice* for vendor-specific APIs in a general framework: - *API Volatility*: Frequent changes lead to heavy maintenance for hardcoded APIs. - *Flexibility*: You may want to switch vendors, use fine-tuned models, or run them locally. - *Optimizations*: Prompt caching, batching, and streaming are easier without vendor lock-in. ## Ready to build your Apps? Check out [Agentic Coding Guidance](./guide.md), the fastest way to develop LLM projects with Pocket Flow! ================================================ File: docs/core_abstraction/async.md ================================================ --- layout: default title: "(Advanced) Async" parent: "Core Abstraction" nav_order: 5 --- # (Advanced) Async **Async** Nodes implement `prep_async()`, `exec_async()`, `exec_fallback_async()`, and/or `post_async()`. This is useful for: 1. **prep_async()**: For *fetching/reading data (files, APIs, DB)* in an I/O-friendly way. 2. **exec_async()**: Typically used for async LLM calls. 3. **post_async()**: For *awaiting user feedback*, *coordinating across multi-agents* or any additional async steps after `exec_async()`. **Note**: `AsyncNode` must be wrapped in `AsyncFlow`. `AsyncFlow` can also include regular (sync) nodes. ### Example \`\`\`python class SummarizeThenVerify(AsyncNode): async def prep_async(self, shared): # Example: read a file asynchronously doc_text = await read_file_async(shared["doc_path"]) return doc_text async def exec_async(self, prep_res): # Example: async LLM call summary = await call_llm_async(f"Summarize: {prep_res}") return summary async def post_async(self, shared, prep_res, exec_res): # Example: wait for user feedback decision = await gather_user_feedback(exec_res) if decision == "approve": shared["summary"] = exec_res return "approve" return "deny" summarize_node = SummarizeThenVerify() final_node = Finalize() # Define transitions summarize_node - "approve" >> final_node summarize_node - "deny" >> summarize_node # retry flow = AsyncFlow(start=summarize_node) async def main(): shared = {"doc_path": "document.txt"} await flow.run_async(shared) print("Final Summary:", shared.get("summary")) asyncio.run(main()) \`\`\` ================================================ File: docs/core_abstraction/batch.md ================================================ --- layout: default title: "Batch" parent: "Core Abstraction" nav_order: 4 --- # Batch **Batch** makes it easier to handle large inputs in one Node or **rerun** a Flow multiple times. Example use cases: - **Chunk-based** processing (e.g., splitting large texts). - **Iterative** processing over lists of input items (e.g., user queries, files, URLs). ## 1. BatchNode A **BatchNode** extends `Node` but changes `prep()` and `exec()`: - **`prep(shared)`**: returns an **iterable** (e.g., list, generator). - **`exec(item)`**: called **once** per item in that iterable. - **`post(shared, prep_res, exec_res_list)`**: after all items are processed, receives a **list** of results (`exec_res_list`) and returns an **Action**. 
### Example: Summarize a Large File

\`\`\`python
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Suppose we have a big file; chunk it
        content = shared["data"]
        chunk_size = 10000
        chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
        return chunks

    def exec(self, chunk):
        prompt = f"Summarize this chunk in 10 words: {chunk}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res_list):
        combined = "\n".join(exec_res_list)
        shared["summary"] = combined
        return "default"

map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
\`\`\`

---

## 2. BatchFlow

A **BatchFlow** runs a **Flow** multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.

### Example: Summarize Many Files

\`\`\`python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # Return a list of param dicts (one per file)
        filenames = list(shared["data"].keys())  # e.g., ["file1.txt", "file2.txt", ...]
        return [{"filename": fn} for fn in filenames]

# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)

# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
\`\`\`

### Under the Hood

1. `prep(shared)` returns a list of param dicts—e.g., `[{filename: "file1.txt"}, {filename: "file2.txt"}, ...]`.
2. The **BatchFlow** loops through each dict. For each one:
   - It merges the dict with the BatchFlow’s own `params`.
   - It calls `flow.run(shared)` using the merged result.
3. This means the sub-Flow is run **repeatedly**, once for every param dict.

---

## 3. Nested or Multi-Level Batches

You can nest a **BatchFlow** in another **BatchFlow**. For instance:

- **Outer** batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- **Inner** batch: returns a list of per-file param dicts.

At each level, **BatchFlow** merges its own param dict with the parent’s. By the time you reach the **innermost** node, the final `params` is the merged result of **all** parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.

\`\`\`python
import os

class FileBatchFlow(BatchFlow):
    def prep(self, shared):
        directory = self.params["directory"]
        # e.g., files = ["file1.txt", "file2.txt", ...]
        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
        return [{"filename": f} for f in files]

class DirectoryBatchFlow(BatchFlow):
    def prep(self, shared):
        directories = ["/path/to/dirA", "/path/to/dirB"]
        return [{"directory": d} for d in directories]

# MapSummaries has params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
\`\`\`

================================================
File: docs/core_abstraction/communication.md
================================================

---
layout: default
title: "Communication"
parent: "Core Abstraction"
nav_order: 3
---

# Communication

Nodes and Flows **communicate** in 2 ways:

1. **Shared Store (for almost all cases)**
   - A global data structure (often an in-mem dict) that all nodes can read (`prep()`) and write (`post()`).
   - Great for data results, large content, or anything multiple nodes need.
   - You should design the data structure and populate it ahead of time.
- > **Separation of Concerns:** Use `Shared Store` for almost all cases to separate *Data Schema* from *Compute Logic*! This approach is both flexible and easy to manage, resulting in more maintainable code. `Params` is more a syntax sugar for [Batch](./batch.md). {: .best-practice } 2. **Params (only for [Batch](./batch.md))** - Each node has a local, ephemeral `params` dict passed in by the **parent Flow**, used as an identifier for tasks. Parameter keys and values shall be **immutable**. - Good for identifiers like filenames or numeric IDs, in Batch mode. If you know memory management, think of the **Shared Store** like a **heap** (shared by all function calls), and **Params** like a **stack** (assigned by the caller). --- ## 1. Shared Store ### Overview A shared store is typically an in-mem dictionary, like: \`\`\`python shared = {"data": {}, "summary": {}, "config": {...}, ...} \`\`\` It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first based on your app requirements. ### Example \`\`\`python class LoadData(Node): def post(self, shared, prep_res, exec_res): # We write data to shared store shared["data"] = "Some text content" return None class Summarize(Node): def prep(self, shared): # We read data from shared store return shared["data"] def exec(self, prep_res): # Call LLM to summarize prompt = f"Summarize: {prep_res}" summary = call_llm(prompt) return summary def post(self, shared, prep_res, exec_res): # We write summary to shared store shared["summary"] = exec_res return "default" load_data = LoadData() summarize = Summarize() load_data >> summarize flow = Flow(start=load_data) shared = {} flow.run(shared) \`\`\` Here: - `LoadData` writes to `shared["data"]`. - `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`. --- ## 2. Params **Params** let you store *per-Node* or *per-Flow* config that doesn't need to live in the shared store. They are: - **Immutable** during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`). - **Set** via `set_params()`. - **Cleared** and updated each time a parent Flow calls it. > Only set the uppermost Flow params because others will be overwritten by the parent Flow. > > If you need to set child node params, see [Batch](./batch.md). {: .warning } Typically, **Params** are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store. ### Example \`\`\`python # 1) Create a Node that uses params class SummarizeFile(Node): def prep(self, shared): # Access the node's param filename = self.params["filename"] return shared["data"].get(filename, "") def exec(self, prep_res): prompt = f"Summarize: {prep_res}" return call_llm(prompt) def post(self, shared, prep_res, exec_res): filename = self.params["filename"] shared["summary"][filename] = exec_res return "default" # 2) Set params node = SummarizeFile() # 3) Set Node params directly (for testing) node.set_params({"filename": "doc1.txt"}) node.run(shared) # 4) Create Flow flow = Flow(start=node) # 5) Set Flow params (overwrites node params) flow.set_params({"filename": "doc2.txt"}) flow.run(shared) # The node summarizes doc2, not doc1 \`\`\` ================================================ File: docs/core_abstraction/flow.md ================================================ --- layout: default title: "Flow" parent: "Core Abstraction" nav_order: 2 --- # Flow A **Flow** orchestrates a graph of Nodes. 
You can chain Nodes in a sequence or create branching depending on the **Actions** returned from each Node's `post()`. ## 1. Action-based Transitions Each Node's `post()` returns an **Action** string. By default, if `post()` doesn't return anything, we treat that as `"default"`. You define transitions with the syntax: 1. **Basic default transition**: `node_a >> node_b` This means if `node_a.post()` returns `"default"`, go to `node_b`. (Equivalent to `node_a - "default" >> node_b`) 2. **Named action transition**: `node_a - "action_name" >> node_b` This means if `node_a.post()` returns `"action_name"`, go to `node_b`. It's possible to create loops, branching, or multi-step flows. ## 2. Creating a Flow A **Flow** begins with a **start** node. You call `Flow(start=some_node)` to specify the entry point. When you call `flow.run(shared)`, it executes the start node, looks at its returned Action from `post()`, follows the transition, and continues until there's no next node. ### Example: Simple Sequence Here's a minimal flow of two nodes in a chain: \`\`\`python node_a >> node_b flow = Flow(start=node_a) flow.run(shared) \`\`\` - When you run the flow, it executes `node_a`. - Suppose `node_a.post()` returns `"default"`. - The flow then sees `"default"` Action is linked to `node_b` and runs `node_b`. - `node_b.post()` returns `"default"` but we didn't define `node_b >> something_else`. So the flow ends there. ### Example: Branching & Looping Here's a simple expense approval flow that demonstrates branching and looping. The `ReviewExpense` node can return three possible Actions: - `"approved"`: expense is approved, move to payment processing - `"needs_revision"`: expense needs changes, send back for revision - `"rejected"`: expense is denied, finish the process We can wire them like this: \`\`\`python # Define the flow connections review - "approved" >> payment # If approved, process payment review - "needs_revision" >> revise # If needs changes, go to revision review - "rejected" >> finish # If rejected, finish the process revise >> review # After revision, go back for another review payment >> finish # After payment, finish the process flow = Flow(start=review) \`\`\` Let's see how it flows: 1. If `review.post()` returns `"approved"`, the expense moves to the `payment` node 2. If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review` 3. If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops \`\`\`mermaid flowchart TD review[Review Expense] -->|approved| payment[Process Payment] review -->|needs_revision| revise[Revise Report] review -->|rejected| finish[Finish Process] revise --> review payment --> finish \`\`\` ### Running Individual Nodes vs. Running a Flow - `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action. - `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue. > `node.run(shared)` **does not** proceed to the successor. > This is mainly for debugging or testing a single node. > > Always use `flow.run(...)` in production to ensure the full pipeline runs correctly. {: .warning } ## 3. Nested Flows A **Flow** can act like a Node, which enables powerful composition patterns. This means you can: 1. Use a Flow as a Node within another Flow's transitions. 2. Combine multiple smaller Flows into a larger Flow for reuse. 3. Node `params` will be a merging of **all** parents' `params`. 
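To make point 3 concrete, here is a small, illustrative sketch (reusing the `SummarizeFile` node from the Communication chapter, and assuming params propagate to nested nodes as described above):

\`\`\`python
# Inner flow wraps a node that reads self.params["filename"]
summarize = SummarizeFile()
inner_flow = Flow(start=summarize)

# Outer flow wraps the inner flow, so inner_flow acts like a Node
outer_flow = Flow(start=inner_flow)

# Set params only on the uppermost flow (see the warning in Communication)
outer_flow.set_params({"filename": "doc1.txt"})

shared = {"data": {"doc1.txt": "Some text to summarize..."}, "summary": {}}

# By the time SummarizeFile runs, the merged params include "filename",
# so self.params["filename"] == "doc1.txt" inside the node.
outer_flow.run(shared)
\`\`\`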
### Flow's Node Methods A **Flow** is also a **Node**, so it will run `prep()` and `post()`. However: - It **won't** run `exec()`, as its main logic is to orchestrate its nodes. - `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store. ### Basic Flow Nesting Here's how to connect a flow to another node: \`\`\`python # Create a sub-flow node_a >> node_b subflow = Flow(start=node_a) # Connect it to another node subflow >> node_c # Create the parent flow parent_flow = Flow(start=subflow) \`\`\` When `parent_flow.run()` executes: 1. It starts `subflow` 2. `subflow` runs through its nodes (`node_a->node_b`) 3. After `subflow` completes, execution continues to `node_c` ### Example: Order Processing Pipeline Here's a practical example that breaks down order processing into nested flows: \`\`\`python # Payment processing sub-flow validate_payment >> process_payment >> payment_confirmation payment_flow = Flow(start=validate_payment) # Inventory sub-flow check_stock >> reserve_items >> update_inventory inventory_flow = Flow(start=check_stock) # Shipping sub-flow create_label >> assign_carrier >> schedule_pickup shipping_flow = Flow(start=create_label) # Connect the flows into a main order pipeline payment_flow >> inventory_flow >> shipping_flow # Create the master flow order_pipeline = Flow(start=payment_flow) # Run the entire pipeline order_pipeline.run(shared_data) \`\`\` This creates a clean separation of concerns while maintaining a clear execution path: \`\`\`mermaid flowchart LR subgraph order_pipeline[Order Pipeline] subgraph paymentFlow["Payment Flow"] A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation] end subgraph inventoryFlow["Inventory Flow"] D[Check Stock] --> E[Reserve Items] --> F[Update Inventory] end subgraph shippingFlow["Shipping Flow"] G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup] end paymentFlow --> inventoryFlow inventoryFlow --> shippingFlow end \`\`\` ================================================ File: docs/core_abstraction/node.md ================================================ --- layout: default title: "Node" parent: "Core Abstraction" nav_order: 1 --- # Node A **Node** is the smallest building block. Each Node has 3 steps `prep->exec->post`:
1. `prep(shared)`
   - **Read and preprocess data** from the `shared` store.
   - Examples: *query DB, read files, or serialize data into a string*.
   - Return `prep_res`, which is used by `exec()` and `post()`.

2. `exec(prep_res)`
   - **Execute compute logic**, with optional retries and error handling (below).
   - Examples: *(mostly) LLM calls, remote APIs, tool use*.
   - ⚠️ This step is for compute only and should **NOT** access `shared`.
   - ⚠️ If retries are enabled, make sure the implementation is idempotent.
   - Return `exec_res`, which is passed to `post()`.

3. `post(shared, prep_res, exec_res)`
   - **Postprocess and write data** back to `shared`.
   - Examples: *update DB, change states, log results*.
   - **Decide the next action** by returning a *string* (`action = "default"` if *None*).

> **Why 3 steps?** To enforce the principle of *separation of concerns*: data storage and data processing are handled separately.
>
> All steps are *optional*. E.g., you can implement only `prep` and `post` if you just need to process data.
{: .note }

### Fault Tolerance & Retries

You can **retry** `exec()` if it raises an exception via two parameters when defining the Node:

- `max_retries` (int): Max times to run `exec()`. The default is `1` (**no** retry).
- `wait` (int): The time to wait (in **seconds**) before the next retry. By default, `wait=0` (no waiting). `wait` is helpful when you encounter rate limits or quota errors from your LLM provider and need to back off.

\`\`\`python
my_node = SummarizeFile(max_retries=3, wait=10)
\`\`\`

When an exception occurs in `exec()`, the Node automatically retries until:

- It either succeeds, or
- The Node has already retried `max_retries - 1` times and fails on the last attempt.

You can get the current retry count (0-based) from `self.cur_retry`.

\`\`\`python
class RetryNode(Node):
    def exec(self, prep_res):
        print(f"Retry {self.cur_retry} times")
        raise Exception("Failed")
\`\`\`

### Graceful Fallback

To **gracefully handle** the exception (after all retries) rather than raising it, override:

\`\`\`python
def exec_fallback(self, prep_res, exc):
    raise exc
\`\`\`

By default, it just re-raises the exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.

### Example: Summarize File

\`\`\`python
class SummarizeFile(Node):
    def prep(self, shared):
        return shared["data"]

    def exec(self, prep_res):
        if not prep_res:
            return "Empty file content"
        prompt = f"Summarize this text in 10 words: {prep_res}"
        summary = call_llm(prompt)  # might fail
        return summary

    def exec_fallback(self, prep_res, exc):
        # Provide a simple fallback instead of crashing
        return "There was an error processing your request."

    def post(self, shared, prep_res, exec_res):
        shared["summary"] = exec_res
        # Return "default" by not returning

summarize_node = SummarizeFile(max_retries=3)

# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)

print("Action returned:", action_result)  # "default"
print("Summary stored:", shared["summary"])
\`\`\`

================================================
File: docs/core_abstraction/parallel.md
================================================

---
layout: default
title: "(Advanced) Parallel"
parent: "Core Abstraction"
nav_order: 6
---

# (Advanced) Parallel

**Parallel** Nodes and Flows let you run multiple **Async** Nodes and Flows **concurrently**—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute.
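Parallel LLM calls can quickly hit provider rate limits (see the warnings below), so some form of throttling is usually needed. Here is a hedged sketch, assuming an async wrapper named `call_llm_async` that you implement yourself, of limiting concurrency with `asyncio.Semaphore`:

\`\`\`python
import asyncio

# Hypothetical async LLM wrapper; implement your own (see the Utility Function docs)
from utils.call_llm import call_llm_async

async def throttled_call(semaphore, prompt):
    # At most max_concurrency calls run at the same time;
    # the rest wait here until a slot frees up.
    async with semaphore:
        return await call_llm_async(prompt)

async def summarize_all(texts, max_concurrency=5):
    semaphore = asyncio.Semaphore(max_concurrency)
    tasks = [throttled_call(semaphore, f"Summarize: {t}") for t in texts]
    return await asyncio.gather(*tasks)

# Usage: summaries = asyncio.run(summarize_all(["text one", "text two"]))
\`\`\`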
> Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. {: .warning } > - **Ensure Tasks Are Independent**: If each item depends on the output of a previous item, **do not** parallelize. > > - **Beware of Rate Limits**: Parallel calls can **quickly** trigger rate limits on LLM services. You may need a **throttling** mechanism (e.g., semaphores or sleep intervals). > > - **Consider Single-Node Batch APIs**: Some LLMs offer a **batch inference** API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. {: .best-practice } ## AsyncParallelBatchNode Like **AsyncBatchNode**, but run `exec_async()` in **parallel**: \`\`\`python class ParallelSummaries(AsyncParallelBatchNode): async def prep_async(self, shared): # e.g., multiple texts return shared["texts"] async def exec_async(self, text): prompt = f"Summarize: {text}" return await call_llm_async(prompt) async def post_async(self, shared, prep_res, exec_res_list): shared["summary"] = "\n\n".join(exec_res_list) return "default" node = ParallelSummaries() flow = AsyncFlow(start=node) \`\`\` ## AsyncParallelBatchFlow Parallel version of **BatchFlow**. Each iteration of the sub-flow runs **concurrently** using different parameters: \`\`\`python class SummarizeMultipleFiles(AsyncParallelBatchFlow): async def prep_async(self, shared): return [{"filename": f} for f in shared["files"]] sub_flow = AsyncFlow(start=LoadAndSummarizeFile()) parallel_flow = SummarizeMultipleFiles(start=sub_flow) await parallel_flow.run_async(shared) \`\`\` ================================================ File: docs/design_pattern/agent.md ================================================ --- layout: default title: "Agent" parent: "Design Pattern" nav_order: 1 --- # Agent Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
## Implement Agent with Graph 1. **Context and Action:** Implement nodes that supply context and perform actions. 2. **Branching:** Use branching to connect each action node to an agent node. Use action to allow the agent to direct the [flow](../core_abstraction/flow.md) between nodes—and potentially loop back for multi-step. 3. **Agent Node:** Provide a prompt to decide action—for example: \`\`\`python f""" ### CONTEXT Task: {task_description} Previous Actions: {previous_actions} Current State: {current_state} ### ACTION SPACE [1] search Description: Use web search to get results Parameters: - query (str): What to search for [2] answer Description: Conclude based on the results Parameters: - result (str): Final answer to provide ### NEXT ACTION Decide the next action based on the current context and available action space. Return your response in the following format: \`\`\`yaml thinking: | action: parameters: : \`\`\`""" \`\`\` The core of building **high-performance** and **reliable** agents boils down to: 1. **Context Management:** Provide *relevant, minimal context.* For example, rather than including an entire chat history, retrieve the most relevant via [RAG](./rag.md). Even with larger context windows, LLMs still fall victim to ["lost in the middle"](https://arxiv.org/abs/2307.03172), overlooking mid-prompt content. 2. **Action Space:** Provide *a well-structured and unambiguous* set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database. ## Example Good Action Design - **Incremental:** Feed content in manageable chunks (500 lines or 1 page) instead of all at once. - **Overview-zoom-in:** First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts). - **Parameterized/Programmable:** Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files. - **Backtracking:** Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends. ## Example: Search Agent This agent: 1. Decides whether to search or answer 2. If searches, loops back to decide if more search needed 3. 
Answers when enough context gathered \`\`\`python class DecideAction(Node): def prep(self, shared): context = shared.get("context", "No previous search") query = shared["query"] return query, context def exec(self, inputs): query, context = inputs prompt = f""" Given input: {query} Previous search results: {context} Should I: 1) Search web for more info 2) Answer with current knowledge Output in yaml: \`\`\`yaml action: search/answer reason: why this action search_term: search phrase if action is search \`\`\`""" resp = call_llm(prompt) yaml_str = resp.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip() result = yaml.safe_load(yaml_str) assert isinstance(result, dict) assert "action" in result assert "reason" in result assert result["action"] in ["search", "answer"] if result["action"] == "search": assert "search_term" in result return result def post(self, shared, prep_res, exec_res): if exec_res["action"] == "search": shared["search_term"] = exec_res["search_term"] return exec_res["action"] class SearchWeb(Node): def prep(self, shared): return shared["search_term"] def exec(self, search_term): return search_web(search_term) def post(self, shared, prep_res, exec_res): prev_searches = shared.get("context", []) shared["context"] = prev_searches + [ {"term": shared["search_term"], "result": exec_res} ] return "decide" class DirectAnswer(Node): def prep(self, shared): return shared["query"], shared.get("context", "") def exec(self, inputs): query, context = inputs return call_llm(f"Context: {context}\nAnswer: {query}") def post(self, shared, prep_res, exec_res): print(f"Answer: {exec_res}") shared["answer"] = exec_res # Connect nodes decide = DecideAction() search = SearchWeb() answer = DirectAnswer() decide - "search" >> search decide - "answer" >> answer search - "decide" >> decide # Loop back flow = Flow(start=decide) flow.run({"query": "Who won the Nobel Prize in Physics 2024?"}) \`\`\` ================================================ File: docs/design_pattern/mapreduce.md ================================================ --- layout: default title: "Map Reduce" parent: "Design Pattern" nav_order: 4 --- # Map Reduce MapReduce is a design pattern suitable when you have either: - Large input data (e.g., multiple files to process), or - Large output data (e.g., multiple forms to fill) and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using [BatchNode](../core_abstraction/batch.md) in the map phase, followed by aggregation in the reduce phase. ### Example: Document Summarization \`\`\`python class SummarizeAllFiles(BatchNode): def prep(self, shared): files_dict = shared["files"] # e.g. 10 files return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...] def exec(self, one_file): filename, file_content = one_file summary_text = call_llm(f"Summarize the following file:\n{file_content}") return (filename, summary_text) def post(self, shared, prep_res, exec_res_list): shared["file_summaries"] = dict(exec_res_list) class CombineSummaries(Node): def prep(self, shared): return shared["file_summaries"] def exec(self, file_summaries): # format as: "File1: summary\nFile2: summary...\n" text_list = [] for fname, summ in file_summaries.items(): text_list.append(f"{fname} summary:\n{summ}\n") big_text = "\n---\n".join(text_list) return call_llm(f"Combine these file summaries into one final summary:\n{big_text}") def post(self, shared, prep_res, final_summary): shared["all_files_summary"] = final_summary batch_node = SummarizeAllFiles() combine_node = CombineSummaries() batch_node >> combine_node flow = Flow(start=batch_node) shared = { "files": { "file1.txt": "Alice was beginning to get very tired of sitting by her sister...", "file2.txt": "Some other interesting text ...", # ... } } flow.run(shared) print("Individual Summaries:", shared["file_summaries"]) print("\nFinal Summary:\n", shared["all_files_summary"]) \`\`\` ================================================ File: docs/design_pattern/rag.md ================================================ --- layout: default title: "RAG" parent: "Design Pattern" nav_order: 3 --- # RAG (Retrieval Augmented Generation) For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a **two-stage** RAG pipeline:
1. **Offline stage**: Preprocess and index documents ("building the index"). 2. **Online stage**: Given a question, generate answers by retrieving the most relevant context. --- ## Stage 1: Offline Indexing We create three Nodes: 1. `ChunkDocs` – [chunks](../utility_function/chunking.md) raw text. 2. `EmbedDocs` – [embeds](../utility_function/embedding.md) each chunk. 3. `StoreIndex` – stores embeddings into a [vector database](../utility_function/vector.md). \`\`\`python class ChunkDocs(BatchNode): def prep(self, shared): # A list of file paths in shared["files"]. We process each file. return shared["files"] def exec(self, filepath): # read file content. In real usage, do error handling. with open(filepath, "r", encoding="utf-8") as f: text = f.read() # chunk by 100 chars each chunks = [] size = 100 for i in range(0, len(text), size): chunks.append(text[i : i + size]) return chunks def post(self, shared, prep_res, exec_res_list): # exec_res_list is a list of chunk-lists, one per file. # flatten them all into a single list of chunks. all_chunks = [] for chunk_list in exec_res_list: all_chunks.extend(chunk_list) shared["all_chunks"] = all_chunks class EmbedDocs(BatchNode): def prep(self, shared): return shared["all_chunks"] def exec(self, chunk): return get_embedding(chunk) def post(self, shared, prep_res, exec_res_list): # Store the list of embeddings. shared["all_embeds"] = exec_res_list print(f"Total embeddings: {len(exec_res_list)}") class StoreIndex(Node): def prep(self, shared): # We'll read all embeds from shared. return shared["all_embeds"] def exec(self, all_embeds): # Create a vector index (faiss or other DB in real usage). index = create_index(all_embeds) return index def post(self, shared, prep_res, index): shared["index"] = index # Wire them in sequence chunk_node = ChunkDocs() embed_node = EmbedDocs() store_node = StoreIndex() chunk_node >> embed_node >> store_node OfflineFlow = Flow(start=chunk_node) \`\`\` Usage example: \`\`\`python shared = { "files": ["doc1.txt", "doc2.txt"], # any text files } OfflineFlow.run(shared) \`\`\` --- ## Stage 2: Online Query & Answer We have 3 nodes: 1. `EmbedQuery` – embeds the user’s question. 2. `RetrieveDocs` – retrieves top chunk from the index. 3. `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer. 
\`\`\`python class EmbedQuery(Node): def prep(self, shared): return shared["question"] def exec(self, question): return get_embedding(question) def post(self, shared, prep_res, q_emb): shared["q_emb"] = q_emb class RetrieveDocs(Node): def prep(self, shared): # We'll need the query embedding, plus the offline index/chunks return shared["q_emb"], shared["index"], shared["all_chunks"] def exec(self, inputs): q_emb, index, chunks = inputs I, D = search_index(index, q_emb, top_k=1) best_id = I[0][0] relevant_chunk = chunks[best_id] return relevant_chunk def post(self, shared, prep_res, relevant_chunk): shared["retrieved_chunk"] = relevant_chunk print("Retrieved chunk:", relevant_chunk[:60], "...") class GenerateAnswer(Node): def prep(self, shared): return shared["question"], shared["retrieved_chunk"] def exec(self, inputs): question, chunk = inputs prompt = f"Question: {question}\nContext: {chunk}\nAnswer:" return call_llm(prompt) def post(self, shared, prep_res, answer): shared["answer"] = answer print("Answer:", answer) embed_qnode = EmbedQuery() retrieve_node = RetrieveDocs() generate_node = GenerateAnswer() embed_qnode >> retrieve_node >> generate_node OnlineFlow = Flow(start=embed_qnode) \`\`\` Usage example: \`\`\`python # Suppose we already ran OfflineFlow and have: # shared["all_chunks"], shared["index"], etc. shared["question"] = "Why do people like cats?" OnlineFlow.run(shared) # final answer in shared["answer"] \`\`\` ================================================ File: docs/design_pattern/structure.md ================================================ --- layout: default title: "Structured Output" parent: "Design Pattern" nav_order: 5 --- # Structured Output In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys. There are several approaches to achieve a structured output: - **Prompting** the LLM to strictly return a defined structure. - Using LLMs that natively support **schema enforcement**. - **Post-processing** the LLM's response to extract structured content. In practice, **Prompting** is simple and reliable for modern LLMs. ### Example Use Cases - Extracting Key Information \`\`\`yaml product: name: Widget Pro price: 199.99 description: | A high-quality widget designed for professionals. Recommended for advanced users. \`\`\` - Summarizing Documents into Bullet Points \`\`\`yaml summary: - This product is easy to use. - It is cost-effective. - Suitable for all skill levels. \`\`\` - Generating Configuration Files \`\`\`yaml server: host: 127.0.0.1 port: 8080 ssl: true \`\`\` ## Prompt Engineering When prompting the LLM to produce **structured** output: 1. **Wrap** the structure in code fences (e.g., `yaml`). 2. **Validate** that all required fields exist (and let `Node` handles retry). ### Example Text Summarization \`\`\`python class SummarizeNode(Node): def exec(self, prep_res): # Suppose `prep_res` is the text to summarize. 
prompt = f""" Please summarize the following text as YAML, with exactly 3 bullet points {prep_res} Now, output: \`\`\`yaml summary: - bullet 1 - bullet 2 - bullet 3 \`\`\`""" response = call_llm(prompt) yaml_str = response.split("\`\`\`yaml")[1].split("\`\`\`")[0].strip() import yaml structured_result = yaml.safe_load(yaml_str) assert "summary" in structured_result assert isinstance(structured_result["summary"], list) return structured_result \`\`\` > Besides using `assert` statements, another popular way to validate schemas is [Pydantic](https://github.com/pydantic/pydantic) {: .note } ### Why YAML instead of JSON? Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes. **In JSON** \`\`\`json { "dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\"" } \`\`\` - Every double quote inside the string must be escaped with `\"`. - Each newline in the dialogue must be represented as `\n`. **In YAML** \`\`\`yaml dialogue: | Alice said: "Hello Bob. How are you? I am good." \`\`\` - No need to escape interior quotes—just place the entire text under a block literal (`|`). - Newlines are naturally preserved without needing `\n`. ================================================ File: docs/design_pattern/workflow.md ================================================ --- layout: default title: "Workflow" parent: "Design Pattern" nav_order: 2 --- # Workflow Many real-world tasks are too complex for one LLM call. The solution is to **Task Decomposition**: decompose them into a [chain](../core_abstraction/flow.md) of multiple Nodes.
> - You don't want to make each task **too coarse**, because it may be *too complex for one LLM call*. > - You don't want to make each task **too granular**, because then *the LLM call doesn't have enough context* and results are *not consistent across nodes*. > > You usually need multiple *iterations* to find the *sweet spot*. If the task has too many *edge cases*, consider using [Agents](./agent.md). {: .best-practice } ### Example: Article Writing \`\`\`python class GenerateOutline(Node): def prep(self, shared): return shared["topic"] def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}") def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res class WriteSection(Node): def prep(self, shared): return shared["outline"] def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}") def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res class ReviewAndRefine(Node): def prep(self, shared): return shared["draft"] def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}") def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res # Connect nodes outline = GenerateOutline() write = WriteSection() review = ReviewAndRefine() outline >> write >> review # Create and run flow writing_flow = Flow(start=outline) shared = {"topic": "AI Safety"} writing_flow.run(shared) \`\`\` For *dynamic cases*, consider using [Agents](./agent.md). ================================================ File: docs/utility_function/llm.md ================================================ --- layout: default title: "LLM Wrapper" parent: "Utility Function" nav_order: 1 --- # LLM Wrappers Check out libraries like [litellm](https://github.com/BerriAI/litellm). Here, we provide some minimal example implementations: 1. OpenAI \`\`\`python def call_llm(prompt): from openai import OpenAI client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content # Example usage call_llm("How are you?") \`\`\` > Store the API key in an environment variable like OPENAI_API_KEY for security. {: .best-practice } 2. Claude (Anthropic) \`\`\`python def call_llm(prompt): from anthropic import Anthropic client = Anthropic(api_key="YOUR_API_KEY_HERE") response = client.messages.create( model="claude-2", messages=[{"role": "user", "content": prompt}], max_tokens=100 ) return response.content \`\`\` 3. Google (Generative AI Studio / PaLM API) \`\`\`python def call_llm(prompt): import google.generativeai as genai genai.configure(api_key="YOUR_API_KEY_HERE") response = genai.generate_text( model="models/text-bison-001", prompt=prompt ) return response.result \`\`\` 4. Azure (Azure OpenAI) \`\`\`python def call_llm(prompt): from openai import AzureOpenAI client = AzureOpenAI( azure_endpoint="https://.openai.azure.com/", api_key="YOUR_API_KEY_HERE", api_version="2023-05-15" ) r = client.chat.completions.create( model="", messages=[{"role": "user", "content": prompt}] ) return r.choices[0].message.content \`\`\` 5. Ollama (Local LLM) \`\`\`python def call_llm(prompt): from ollama import chat response = chat( model="llama2", messages=[{"role": "user", "content": prompt}] ) return response.message.content \`\`\` ## Improvements Feel free to enhance your `call_llm` function as needed. 
Here are examples: - Handle chat history: \`\`\`python def call_llm(messages): from openai import OpenAI client = OpenAI(api_key="YOUR_API_KEY_HERE") r = client.chat.completions.create( model="gpt-4o", messages=messages ) return r.choices[0].message.content \`\`\` - Add in-memory caching \`\`\`python from functools import lru_cache @lru_cache(maxsize=1000) def call_llm(prompt): # Your implementation here pass \`\`\` > ⚠️ Caching conflicts with Node retries, as retries yield the same result. > > To address this, you could use cached results only if not retried. {: .warning } \`\`\`python from functools import lru_cache @lru_cache(maxsize=1000) def cached_call(prompt): pass def call_llm(prompt, use_cache): if use_cache: return cached_call(prompt) # Call the underlying function directly return cached_call.__wrapped__(prompt) class SummarizeNode(Node): def exec(self, text): return call_llm(f"Summarize: {text}", self.cur_retry==0) \`\`\` - Enable logging: \`\`\`python def call_llm(prompt): import logging logging.info(f"Prompt: {prompt}") response = ... # Your implementation here logging.info(f"Response: {response}") return response \`\`\` ``` ## /LICENSE ``` path="/LICENSE" MIT License Copyright (c) 2025 Zachary Huang Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ``` ## /README.md

Turns Codebase into Easy Tutorial with AI

![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg) > *Ever stared at a new codebase written by others feeling completely lost? This tutorial shows you how to build an AI agent that analyzes GitHub repositories and creates beginner-friendly tutorials explaining exactly how the code works.*

This is a tutorial project of [Pocket Flow](https://github.com/The-Pocket/PocketFlow), a 100-line LLM framework. It crawls GitHub repositories and builds a knowledge base from the code. It analyzes entire codebases to identify core abstractions and how they interact, and transforms complex code into beginner-friendly tutorials with clear visualizations. - Check out the [YouTube Development Tutorial](https://youtu.be/AFY67zOpbSo) for more! - Check out the [Substack Post Tutorial](https://zacharyhuang.substack.com/p/ai-codebase-knowledge-builder-full) for more!   **🔸 🎉 Reached Hacker News Front Page** (April 2025) with >800 up‑votes: [Discussion »](https://news.ycombinator.com/item?id=43739456) ## ⭐ Example Results for Popular GitHub Repositories!

🤯 All these tutorials are generated **entirely by AI** by crawling the GitHub repo! - [AutoGen Core](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/AutoGen%20Core) - Build AI teams that talk, think, and solve problems together like coworkers! - [Browser Use](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Browser%20Use) - Let AI surf the web for you, clicking buttons and filling forms like a digital assistant! - [Celery](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Celery) - Supercharge your app with background tasks that run while you sleep! - [Click](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Click) - Turn Python functions into slick command-line tools with just a decorator! - [Codex](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Codex) - Turn plain English into working code with this AI terminal wizard! - [Crawl4AI](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Crawl4AI) - Train your AI to extract exactly what matters from any website! - [CrewAI](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/CrewAI) - Assemble a dream team of AI specialists to tackle impossible problems! - [DSPy](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/DSPy) - Build LLM apps like Lego blocks that optimize themselves! - [FastAPI](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/FastAPI) - Create APIs at lightning speed with automatic docs that clients will love! - [Flask](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Flask) - Craft web apps with minimal code that scales from prototype to production! - [Google A2A](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Google%20A2A) - The universal language that lets AI agents collaborate across borders! - [LangGraph](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/LangGraph) - Design AI agents as flowcharts where each step remembers what happened before! - [LevelDB](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/LevelDB) - Store data at warp speed with Google's engine that powers blockchains! - [MCP Python SDK](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/MCP%20Python%20SDK) - Build powerful apps that communicate through an elegant protocol without sweating the details! - [NumPy Core](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/NumPy%20Core) - Master the engine behind data science that makes Python as fast as C! - [OpenManus](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/OpenManus) - Build AI agents with digital brains that think, learn, and use tools just like humans do! - [Pydantic Core](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Pydantic%20Core) - Validate data at rocket speed with just Python type hints! - [Requests](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/Requests) - Talk to the internet in Python with code so simple it feels like cheating! - [SmolaAgents](https://the-pocket.github.io/Tutorial-Codebase-Knowledge/SmolaAgents) - Build tiny AI agents that punch way above their weight class! - Showcase Your AI-Generated Tutorials in [Discussions](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge/discussions)! ## 🚀 Getting Started 1. Clone this repository 2. Install dependencies: ```bash pip install -r requirements.txt ``` 3. Set up LLM in [`utils/call_llm.py`](./utils/call_llm.py) by providing credentials. 
By default, you can use the AI Studio key with this client for Gemini Pro 2.5: ```python client = genai.Client( api_key=os.getenv("GEMINI_API_KEY", "your-api_key"), ) ``` You can use your own models. We highly recommend the latest models with thinking capabilities (Claude 3.7 with thinking, O1). You can verify that it is correctly set up by running: ```bash python utils/call_llm.py ``` 4. Generate a complete codebase tutorial by running the main script: ```bash # Analyze a GitHub repository python main.py --repo https://github.com/username/repo --include "*.py" "*.js" --exclude "tests/*" --max-size 50000 # Or, analyze a local directory python main.py --dir /path/to/your/codebase --include "*.py" --exclude "*test*" # Or, generate a tutorial in Chinese python main.py --repo https://github.com/username/repo --language "Chinese" ``` - `--repo` or `--dir` - Specify either a GitHub repo URL or a local directory path (required, mutually exclusive) - `-n, --name` - Project name (optional, derived from URL/directory if omitted) - `-t, --token` - GitHub token (or set GITHUB_TOKEN environment variable) - `-o, --output` - Output directory (default: ./output) - `-i, --include` - Files to include (e.g., "*.py" "*.js") - `-e, --exclude` - Files to exclude (e.g., "tests/*" "docs/*") - `-s, --max-size` - Maximum file size in bytes (default: 100KB) - `--language` - Language for the generated tutorial (default: "english") The application will crawl the repository, analyze the codebase structure, generate tutorial content in the specified language, and save the output in the specified directory (default: ./output). ## 💡 Development Tutorial - I built using [**Agentic Coding**](https://zacharyhuang.substack.com/p/agentic-coding-the-most-fun-way-to), the fastest development paradigm, where humans simply [design](docs/design.md) and agents [code](flow.py). - The secret weapon is [Pocket Flow](https://github.com/The-Pocket/PocketFlow), a 100-line LLM framework that lets Agents (e.g., Cursor AI) build for you - Check out the Step-by-step YouTube development tutorial:

## /assets/banner.png Binary file available at https://raw.githubusercontent.com/The-Pocket/Tutorial-Codebase-Knowledge/refs/heads/main/assets/banner.png ## /assets/example.png Binary file available at https://raw.githubusercontent.com/The-Pocket/Tutorial-Codebase-Knowledge/refs/heads/main/assets/example.png ## /assets/youtube_thumbnail.png Binary file available at https://raw.githubusercontent.com/The-Pocket/Tutorial-Codebase-Knowledge/refs/heads/main/assets/youtube_thumbnail.png ## /docs/AutoGen Core/01_agent.md --- layout: default title: "Agent" parent: "AutoGen Core" nav_order: 1 --- # Chapter 1: Agent - The Workers of AutoGen Welcome to the AutoGen Core tutorial! We're excited to guide you through building powerful applications with autonomous agents. ## Motivation: Why Do We Need Agents? Imagine you want to build an automated system to write blog posts. You might need one part of the system to research a topic and another part to write the actual post based on the research. How do you represent these different "workers" and make them talk to each other? This is where the concept of an **Agent** comes in. In AutoGen Core, an `Agent` is the fundamental building block representing an actor or worker in your system. Think of it like an employee in an office. ## Key Concepts: Understanding Agents Let's break down what makes an Agent: 1. **It's a Worker:** An Agent is designed to *do* things. This could be running calculations, calling a Large Language Model (LLM) like ChatGPT, using a tool (like a search engine), or managing a piece of data. 2. **It Has an Identity (`AgentId`):** Just like every employee has a name and a job title, every Agent needs a unique identity. This identity, called `AgentId`, has two parts: * `type`: What kind of role does the agent have? (e.g., "researcher", "writer", "coder"). This helps organize agents. * `key`: A unique name for this specific agent instance (e.g., "researcher-01", "amy-the-writer"). ```python # From: _agent_id.py class AgentId: def __init__(self, type: str, key: str) -> None: # ... (validation checks omitted for brevity) self._type = type self._key = key @property def type(self) -> str: return self._type @property def key(self) -> str: return self._key def __str__(self) -> str: # Creates an id like "researcher/amy-the-writer" return f"{self._type}/{self._key}" ``` This `AgentId` acts like the agent's address, allowing other agents (or the system) to send messages specifically to it. 3. **It Has Metadata (`AgentMetadata`):** Besides its core identity, an agent often has descriptive information. * `type`: Same as in `AgentId`. * `key`: Same as in `AgentId`. * `description`: A human-readable explanation of what the agent does (e.g., "Researches topics using web search"). ```python # From: _agent_metadata.py from typing import TypedDict class AgentMetadata(TypedDict): type: str key: str description: str ``` This metadata helps understand the agent's purpose within the system. 4. **It Communicates via Messages:** Agents don't work in isolation. They collaborate by sending and receiving messages. The primary way an agent receives work is through its `on_message` method. Think of this like the agent's inbox. ```python # From: _agent.py (Simplified Agent Protocol) from typing import Any, Mapping, Protocol # ... other imports class Agent(Protocol): @property def id(self) -> AgentId: ... 
# The agent's unique ID async def on_message(self, message: Any, ctx: MessageContext) -> Any: """Handles an incoming message.""" # Agent's logic to process the message goes here ... ``` When an agent receives a message, `on_message` is called. The `message` contains the data or task, and `ctx` (MessageContext) provides extra information about the message (like who sent it). We'll cover `MessageContext` more later. 5. **It Can Remember Things (State):** Sometimes, an agent needs to remember information between tasks, like keeping notes on research progress. Agents can optionally implement `save_state` and `load_state` methods to store and retrieve their internal memory. ```python # From: _agent.py (Simplified Agent Protocol) class Agent(Protocol): # ... other methods async def save_state(self) -> Mapping[str, Any]: """Save the agent's internal memory.""" # Return a dictionary representing the state ... async def load_state(self, state: Mapping[str, Any]) -> None: """Load the agent's internal memory.""" # Restore state from the dictionary ... ``` We'll explore state and memory in more detail in [Chapter 7: Memory](07_memory.md). 6. **Different Agent Types:** AutoGen Core provides base classes to make creating agents easier: * `BaseAgent`: The fundamental class most agents inherit from. It provides common setup. * `ClosureAgent`: A very quick way to create simple agents using just a function (like hiring a temp worker for a specific task defined on the spot). * `RoutedAgent`: An agent that can automatically direct different types of messages to different internal handler methods (like a smart receptionist). ## Use Case Example: Researcher and Writer Let's revisit our blog post example. We want a `Researcher` agent and a `Writer` agent. **Goal:** 1. Tell the `Researcher` a topic (e.g., "AutoGen Agents"). 2. The `Researcher` finds some facts (we'll keep it simple and just make them up for now). 3. The `Researcher` sends these facts to the `Writer`. 4. The `Writer` receives the facts and drafts a short post. **Simplified Implementation Idea (using `ClosureAgent` for brevity):** First, let's define the messages they might exchange: ```python from dataclasses import dataclass @dataclass class ResearchTopic: topic: str @dataclass class ResearchFacts: topic: str facts: list[str] @dataclass class DraftPost: topic: str draft: str ``` These are simple Python classes to hold the data being passed around. Now, let's imagine defining the `Researcher` using a `ClosureAgent`. This agent will listen for `ResearchTopic` messages. ```python # Simplified concept - requires AgentRuntime (Chapter 3) to actually run async def researcher_logic(agent_context, message: ResearchTopic, msg_context): print(f"Researcher received topic: {message.topic}") # In a real scenario, this would involve searching, calling an LLM, etc. # For now, we just make up facts. facts = [f"Fact 1 about {message.topic}", f"Fact 2 about {message.topic}"] print(f"Researcher found facts: {facts}") # Find the Writer agent's ID (we assume we know it) writer_id = AgentId(type="writer", key="blog_writer_1") # Send the facts to the Writer await agent_context.send_message( message=ResearchFacts(topic=message.topic, facts=facts), recipient=writer_id, ) print("Researcher sent facts to Writer.") # This agent doesn't return a direct reply return None ``` This `researcher_logic` function defines *what* the researcher does when it gets a `ResearchTopic` message. 
It processes the topic, creates `ResearchFacts`, and uses `agent_context.send_message` to send them to the `writer` agent. Similarly, the `Writer` agent would have its own logic: ```python # Simplified concept - requires AgentRuntime (Chapter 3) to actually run async def writer_logic(agent_context, message: ResearchFacts, msg_context): print(f"Writer received facts for topic: {message.topic}") # In a real scenario, this would involve LLM prompting draft = f"Blog Post about {message.topic}:\n" for fact in message.facts: draft += f"- {fact}\n" print(f"Writer drafted post:\n{draft}") # Perhaps save the draft or send it somewhere else # For now, we just print it. We don't send another message. return None # Or maybe return a confirmation/result ``` This `writer_logic` function defines how the writer reacts to receiving `ResearchFacts`. **Important:** To actually *run* these agents and make them communicate, we need the `AgentRuntime` (covered in [Chapter 3: AgentRuntime](03_agentruntime.md)) and the `Messaging System` (covered in [Chapter 2: Messaging System](02_messaging_system__topic___subscription_.md)). For now, focus on the *idea* that Agents are distinct workers defined by their logic (`on_message`) and identified by their `AgentId`. ## Under the Hood: How an Agent Gets a Message While the full message delivery involves the `Messaging System` and `AgentRuntime`, let's look at the agent's role when it receives a message. **Conceptual Flow:** ```mermaid sequenceDiagram participant Sender as Sender Agent participant Runtime as AgentRuntime participant Recipient as Recipient Agent Sender->>+Runtime: send_message(message, recipient_id) Runtime->>+Recipient: Locate agent by recipient_id Runtime->>+Recipient: on_message(message, context) Recipient->>Recipient: Process message using internal logic alt Response Needed Recipient->>-Runtime: Return response value Runtime->>-Sender: Deliver response value else No Response Recipient->>-Runtime: Return None (or no return) end ``` 1. Some other agent (Sender) or the system decides to send a message to our agent (Recipient). 2. It tells the `AgentRuntime` (the manager): "Deliver this `message` to the agent with `recipient_id`". 3. The `AgentRuntime` finds the correct `Recipient` agent instance. 4. The `AgentRuntime` calls the `Recipient.on_message(message, context)` method. 5. The agent's internal logic inside `on_message` (or methods called by it, like in `RoutedAgent`) runs to process the message. 6. If the message requires a direct response (like an RPC call), the agent returns a value from `on_message`. If not (like a general notification or event), it might return `None`. **Code Glimpse:** The core definition is the `Agent` Protocol (`_agent.py`). It's like an interface or a contract – any class wanting to be an Agent *must* provide these methods. ```python # From: _agent.py - The Agent blueprint (Protocol) @runtime_checkable class Agent(Protocol): @property def metadata(self) -> AgentMetadata: ... @property def id(self) -> AgentId: ... async def on_message(self, message: Any, ctx: MessageContext) -> Any: ... async def save_state(self) -> Mapping[str, Any]: ... async def load_state(self, state: Mapping[str, Any]) -> None: ... async def close(self) -> None: ... ``` Most agents you create will inherit from `BaseAgent` (`_base_agent.py`). 
It provides some standard setup: ```python # From: _base_agent.py (Simplified) class BaseAgent(ABC, Agent): def __init__(self, description: str) -> None: # Gets runtime & id from a special context when created by the runtime # Raises error if you try to create it directly! self._runtime: AgentRuntime = AgentInstantiationContext.current_runtime() self._id: AgentId = AgentInstantiationContext.current_agent_id() self._description = description # ... # This is the final version called by the runtime @final async def on_message(self, message: Any, ctx: MessageContext) -> Any: # It calls the implementation method you need to write return await self.on_message_impl(message, ctx) # You MUST implement this in your subclass @abstractmethod async def on_message_impl(self, message: Any, ctx: MessageContext) -> Any: ... # Helper to send messages easily async def send_message(self, message: Any, recipient: AgentId, ...) -> Any: # It just asks the runtime to do the actual sending return await self._runtime.send_message( message, sender=self.id, recipient=recipient, ... ) # ... other methods like publish_message, save_state, load_state ``` Notice how `BaseAgent` handles getting its `id` and `runtime` during creation and provides a convenient `send_message` method that uses the runtime. When inheriting from `BaseAgent`, you primarily focus on implementing the `on_message_impl` method to define your agent's unique behavior. ## Next Steps You now understand the core concept of an `Agent` in AutoGen Core! It's the fundamental worker unit with an identity, the ability to process messages, and optionally maintain state. In the next chapters, we'll explore: * [Chapter 2: Messaging System](02_messaging_system__topic___subscription_.md): How messages actually travel between agents. * [Chapter 3: AgentRuntime](03_agentruntime.md): The manager responsible for creating, running, and connecting agents. Let's continue building your understanding! --- Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge) ## /docs/AutoGen Core/02_messaging_system__topic___subscription_.md --- layout: default title: "Messaging System" parent: "AutoGen Core" nav_order: 2 --- # Chapter 2: Messaging System (Topic & Subscription) In [Chapter 1: Agent](01_agent.md), we learned about Agents as individual workers. But how do they coordinate when one agent doesn't know exactly *who* needs the information it produces? Imagine our Researcher finds some facts. Maybe the Writer needs them, but maybe a Fact-Checker agent or a Summary agent also needs them later. How can the Researcher just announce "Here are the facts!" without needing a specific mailing list? This is where the **Messaging System**, specifically **Topics** and **Subscriptions**, comes in. It allows agents to broadcast messages to anyone interested, like posting on a company announcement board. ## Motivation: Broadcasting Information Let's refine our blog post example: 1. The `Researcher` agent finds facts about "AutoGen Agents". 2. Instead of sending *directly* to the `Writer`, the `Researcher` **publishes** these facts to a general "research-results" **Topic**. 3. The `Writer` agent has previously told the system it's **subscribed** to the "research-results" Topic. 4. The system sees the new message on the Topic and delivers it to the `Writer` (and any other subscribers). This way, the `Researcher` doesn't need to know who the `Writer` is, or even if a `Writer` exists! It just broadcasts the results. 
If we later add a `FactChecker` agent that also needs the results, it simply subscribes to the same Topic. ## Key Concepts: Topics and Subscriptions Let's break down the components of this broadcasting system: 1. **Topic (`TopicId`): The Announcement Board** * A `TopicId` represents a specific channel or category for messages. Think of it like the name of an announcement board (e.g., "Project Updates", "General Announcements"). * It has two main parts: * `type`: What *kind* of event or information is this? (e.g., "research.completed", "user.request"). This helps categorize messages. * `source`: *Where* or *why* did this event originate? Often, this relates to the specific task or context (e.g., the specific blog post being researched like "autogen-agents-blog-post", or the team generating the event like "research-team"). ```python # From: _topic.py (Simplified) from dataclasses import dataclass @dataclass(frozen=True) # Immutable: can't change after creation class TopicId: type: str source: str def __str__(self) -> str: # Creates an id like "research.completed/autogen-agents-blog-post" return f"{self.type}/{self.source}" ``` This structure allows for flexible filtering. Agents might subscribe to all topics of a certain `type`, regardless of the `source`, or only to topics with a specific `source`. 2. **Publishing: Posting the Announcement** * When an agent has information to share broadly, it *publishes* a message to a specific `TopicId`. * This is like pinning a note to the designated announcement board. The agent doesn't need to know who will read it. 3. **Subscription (`Subscription`): Signing Up for Updates** * A `Subscription` is how an agent declares its interest in certain `TopicId`s. * It acts like a rule: "If a message is published to a Topic that matches *this pattern*, please deliver it to *this kind of agent*". * The `Subscription` links a `TopicId` pattern (e.g., "all topics with type `research.completed`") to an `AgentId` (or a way to determine the `AgentId`). 4. **Routing: Delivering the Mail** * The `AgentRuntime` (the system manager we'll meet in [Chapter 3: AgentRuntime](03_agentruntime.md)) keeps track of all active `Subscription`s. * When a message is published to a `TopicId`, the `AgentRuntime` checks which `Subscription`s match that `TopicId`. * For each match, it uses the `Subscription`'s rule to figure out which specific `AgentId` should receive the message and delivers it. ## Use Case Example: Researcher Publishes, Writer Subscribes Let's see how our Researcher and Writer can use this system. **Goal:** Researcher publishes facts to a topic, Writer receives them via subscription. **1. Define the Topic:** We need a `TopicId` for research results. Let's say the `type` is "research.facts.available" and the `source` identifies the specific research task (e.g., "blog-post-autogen"). ```python # From: _topic.py from autogen_core import TopicId # Define the topic for this specific research task research_topic_id = TopicId(type="research.facts.available", source="blog-post-autogen") print(f"Topic ID: {research_topic_id}") # Output: Topic ID: research.facts.available/blog-post-autogen ``` This defines the "announcement board" we'll use. **2. Researcher Publishes:** The `Researcher` agent, after finding facts, will use its `agent_context` (provided by the runtime) to publish the `ResearchFacts` message to this topic. 
```python # Simplified concept - Researcher agent logic # Assume 'agent_context' and 'message' (ResearchTopic) are provided # Define the facts message (from Chapter 1) @dataclass class ResearchFacts: topic: str facts: list[str] async def researcher_publish_logic(agent_context, message: ResearchTopic, msg_context): print(f"Researcher working on: {message.topic}") facts_data = ResearchFacts( topic=message.topic, facts=[f"Fact A about {message.topic}", f"Fact B about {message.topic}"] ) # Define the specific topic for this task's results results_topic = TopicId(type="research.facts.available", source=message.topic) # Use message topic as source # Publish the facts to the topic await agent_context.publish_message(message=facts_data, topic_id=results_topic) print(f"Researcher published facts to topic: {results_topic}") # No direct reply needed return None ``` Notice the `agent_context.publish_message` call. The Researcher doesn't specify a recipient, only the topic. **3. Writer Subscribes:** The `Writer` agent needs to tell the system it's interested in messages on topics like "research.facts.available". We can use a predefined `Subscription` type called `TypeSubscription`. This subscription typically means: "I am interested in all topics with this *exact type*. When a message arrives, create/use an agent of *my type* whose `key` matches the topic's `source`." ```python # From: _type_subscription.py (Simplified Concept) from autogen_core import TypeSubscription, BaseAgent class WriterAgent(BaseAgent): # ... agent implementation ... async def on_message_impl(self, message: ResearchFacts, ctx): # This method gets called when a subscribed message arrives print(f"Writer ({self.id}) received facts via subscription: {message.facts}") # ... process facts and write draft ... # How the Writer subscribes (usually done during runtime setup - Chapter 3) # This tells the runtime: "Messages on topics with type 'research.facts.available' # should go to a 'writer' agent whose key matches the topic source." writer_subscription = TypeSubscription( topic_type="research.facts.available", agent_type="writer" # The type of agent that should handle this ) print(f"Writer subscription created for topic type: {writer_subscription.topic_type}") # Output: Writer subscription created for topic type: research.facts.available ``` When the `Researcher` publishes to `TopicId(type="research.facts.available", source="blog-post-autogen")`, the `AgentRuntime` will see that `writer_subscription` matches the `topic_type`. It will then use the rule: "Find (or create) an agent with `AgentId(type='writer', key='blog-post-autogen')` and deliver the message." **Benefit:** Decoupling! The Researcher just broadcasts. The Writer just listens for relevant broadcasts. We can add more listeners (like a `FactChecker` subscribing to the same `topic_type`) without changing the `Researcher` at all. ## Under the Hood: How Publishing Works Let's trace the journey of a published message. 
**Conceptual Flow:** ```mermaid sequenceDiagram participant Publisher as Publisher Agent participant Runtime as AgentRuntime participant SubRegistry as Subscription Registry participant Subscriber as Subscriber Agent Publisher->>+Runtime: publish_message(message, topic_id) Runtime->>+SubRegistry: Find subscriptions matching topic_id SubRegistry-->>-Runtime: Return list of matching Subscriptions loop For each matching Subscription Runtime->>Subscription: map_to_agent(topic_id) Subscription-->>Runtime: Return target AgentId Runtime->>+Subscriber: Locate/Create Agent instance by AgentId Runtime->>Subscriber: on_message(message, context) Subscriber-->>-Runtime: Process message (optional return) end Runtime-->>-Publisher: Return (usually None for publish) ``` 1. **Publish:** An agent calls `agent_context.publish_message(message, topic_id)`. This internally calls the `AgentRuntime`'s publish method. 2. **Lookup:** The `AgentRuntime` takes the `topic_id` and consults its internal `Subscription Registry`. 3. **Match:** The Registry checks all registered `Subscription` objects. Each `Subscription` has an `is_match(topic_id)` method. The registry finds all subscriptions where `is_match` returns `True`. 4. **Map:** For each matching `Subscription`, the Runtime calls its `map_to_agent(topic_id)` method. This method returns the specific `AgentId` that should handle this message based on the subscription rule and the topic details. 5. **Deliver:** The `AgentRuntime` finds the agent instance corresponding to the returned `AgentId` (potentially creating it if it doesn't exist yet, especially with `TypeSubscription`). It then calls that agent's `on_message` method, delivering the original published `message`. **Code Glimpse:** * **`TopicId` (`_topic.py`):** As shown before, a simple dataclass holding `type` and `source`. It includes validation to ensure the `type` follows certain naming conventions. ```python # From: _topic.py @dataclass(eq=True, frozen=True) class TopicId: type: str source: str # ... validation and __str__ ... @classmethod def from_str(cls, topic_id: str) -> Self: # Helper to parse "type/source" string # ... implementation ... ``` * **`Subscription` Protocol (`_subscription.py`):** This defines the *contract* for any subscription rule. ```python # From: _subscription.py (Simplified Protocol) from typing import Protocol # ... other imports class Subscription(Protocol): @property def id(self) -> str: ... # Unique ID for this subscription instance def is_match(self, topic_id: TopicId) -> bool: """Check if a topic matches this subscription's rule.""" ... def map_to_agent(self, topic_id: TopicId) -> AgentId: """Determine the target AgentId if is_match was True.""" ... ``` Any class implementing these methods can act as a subscription rule. * **`TypeSubscription` (`_type_subscription.py`):** A common implementation of the `Subscription` protocol. ```python # From: _type_subscription.py (Simplified) class TypeSubscription(Subscription): def __init__(self, topic_type: str, agent_type: str, ...): self._topic_type = topic_type self._agent_type = agent_type # ... generates a unique self._id ... def is_match(self, topic_id: TopicId) -> bool: # Matches if the topic's type is exactly the one we want return topic_id.type == self._topic_type def map_to_agent(self, topic_id: TopicId) -> AgentId: # Maps to an agent of the specified type, using the # topic's source as the agent's unique key. if not self.is_match(topic_id): raise CantHandleException(...) 
# Should not happen if used correctly
            return AgentId(type=self._agent_type, key=topic_id.source)

    # ... id property ...
```

This implementation provides the "one agent instance per source" behavior for a specific topic type.

* **`DefaultSubscription` (`_default_subscription.py`):** This is often used via a decorator (`@default_subscription`) and provides a convenient way to create a `TypeSubscription` where the `agent_type` is automatically inferred from the agent class being defined, and the `topic_type` defaults to "default" (but can be overridden). It simplifies common use cases.

```python
# From: _default_subscription.py (Conceptual Usage)
from autogen_core import BaseAgent, default_subscription

# ResearchFacts is the dataclass we defined earlier in this chapter,
# not something imported from autogen_core.

@default_subscription # Uses 'default' topic type, infers agent type 'writer'
class WriterAgent(BaseAgent):
    # Agent logic here...
    async def on_message_impl(self, message: ResearchFacts, ctx): ...

# Or specify the topic type
@default_subscription(topic_type="research.facts.available")
class SpecificWriterAgent(BaseAgent):
    # Agent logic here...
    async def on_message_impl(self, message: ResearchFacts, ctx): ...
```

The actual sending (`publish_message`) and routing logic reside within the `AgentRuntime`, which we'll explore next.

## Next Steps

You've learned how AutoGen Core uses a publish/subscribe system (`TopicId`, `Subscription`) to allow agents to communicate without direct coupling. This is crucial for building flexible and scalable multi-agent applications.

* **Topic (`TopicId`):** Named channels (`type`/`source`) for broadcasting messages.
* **Publish:** Sending a message to a Topic.
* **Subscription:** An agent's declared interest in messages on certain Topics, defining a routing rule.

Now, let's dive into the orchestrator that manages agents and makes this messaging system work:

* [Chapter 3: AgentRuntime](03_agentruntime.md): The manager responsible for creating, running, and connecting agents, including handling message publishing and subscription routing.

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)

## /docs/AutoGen Core/03_agentruntime.md

---
layout: default
title: "AgentRuntime"
parent: "AutoGen Core"
nav_order: 3
---

# Chapter 3: AgentRuntime - The Office Manager

In [Chapter 1: Agent](01_agent.md), we met the workers (`Agent`) of our system. In [Chapter 2: Messaging System](02_messaging_system__topic___subscription_.md), we saw how they can communicate broadly using topics and subscriptions.

But who hires these agents? Who actually delivers the messages, whether direct or published? And who keeps the whole system running smoothly?

This is where the **`AgentRuntime`** comes in. It's the central nervous system, the operating system, or perhaps the most fitting analogy: **the office manager** for all your agents.

## Motivation: Why Do We Need an Office Manager?

Imagine an office full of employees (Agents). You have researchers, writers, maybe coders.

* How does a new employee get hired and set up?
* When one employee wants to send a memo directly to another, who makes sure it gets to the right desk?
* When someone posts an announcement on the company bulletin board (publishes to a topic), who ensures everyone who signed up for that type of announcement sees it?
* Who starts the workday and ensures everything keeps running?

Without an office manager, it would be chaos! The `AgentRuntime` serves this crucial role in AutoGen Core. It handles:

1. **Agent Creation:** "Onboarding" new agents when they are needed.
2.
**Message Routing:** Delivering direct messages (`send_message`) and published messages (`publish_message`). 3. **Lifecycle Management:** Starting, running, and stopping the whole system. 4. **State Management:** Keeping track of the overall system state (optional). ## Key Concepts: Understanding the Manager's Job Let's break down the main responsibilities of the `AgentRuntime`: 1. **Agent Instantiation (Hiring):** * You don't usually create agent objects directly (like `my_agent = ResearcherAgent()`). Why? Because the agent needs to know *about* the runtime (the office it works in) to send messages, publish announcements, etc. * Instead, you tell the `AgentRuntime`: "I need an agent of type 'researcher'. Here's a recipe (a **factory function**) for how to create one." This is done using `runtime.register_factory(...)`. * When a message needs to go to a 'researcher' agent with a specific key (e.g., 'researcher-01'), the runtime checks if it already exists. If not, it uses the registered factory function to create (instantiate) the agent. * **Crucially**, while creating the agent, the runtime provides special context (`AgentInstantiationContext`) so the new agent automatically gets its unique `AgentId` and a reference to the `AgentRuntime` itself. This is like giving a new employee their ID badge and telling them who the office manager is. ```python # Simplified Concept - How a BaseAgent gets its ID and runtime access # From: _agent_instantiation.py and _base_agent.py # Inside the agent's __init__ method (when inheriting from BaseAgent): class MyAgent(BaseAgent): def __init__(self, description: str): # This magic happens *because* the AgentRuntime is creating the agent # inside a special context. self._runtime = AgentInstantiationContext.current_runtime() # Gets the manager self._id = AgentInstantiationContext.current_agent_id() # Gets its own ID self._description = description # ... rest of initialization ... ``` This ensures agents are properly integrated into the system from the moment they are created. 2. **Message Delivery (Mail Room):** * **Direct Send (`send_message`):** When an agent calls `await agent_context.send_message(message, recipient_id)`, it's actually telling the `AgentRuntime`, "Please deliver this `message` directly to the agent identified by `recipient_id`." The runtime finds the recipient agent (creating it if necessary) and calls its `on_message` method. It's like putting a specific name on an envelope and handing it to the mail room. * **Publish (`publish_message`):** When an agent calls `await agent_context.publish_message(message, topic_id)`, it tells the runtime, "Post this `message` to the announcement board named `topic_id`." The runtime then checks its list of **subscriptions** (who signed up for which boards). For every matching subscription, it figures out the correct recipient agent(s) (based on the subscription rule) and delivers the message to their `on_message` method. 3. **Lifecycle Management (Opening/Closing the Office):** * The runtime needs to be started to begin processing messages. Typically, you call `runtime.start()`. This usually kicks off a background process or loop that watches for incoming messages. * When work is done, you need to stop the runtime gracefully. `runtime.stop_when_idle()` is common – it waits until all messages currently in the queue have been processed, then stops. `runtime.stop()` stops more abruptly. 4. 
**State Management (Office Records):** * The runtime can save the state of *all* the agents it manages (`runtime.save_state()`) and load it back later (`runtime.load_state()`). This is useful for pausing and resuming complex multi-agent interactions. It can also save/load state for individual agents (`runtime.agent_save_state()` / `runtime.agent_load_state()`). We'll touch more on state in [Chapter 7: Memory](07_memory.md). ## Use Case Example: Running Our Researcher and Writer Let's finally run the Researcher/Writer scenario from Chapters 1 and 2. We need the `AgentRuntime` to make it happen. **Goal:** 1. Create a runtime. 2. Register factories for a 'researcher' and a 'writer' agent. 3. Tell the runtime that 'writer' agents are interested in "research.facts.available" topics (add subscription). 4. Start the runtime. 5. Send an initial `ResearchTopic` message to a 'researcher' agent. 6. Let the system run (Researcher publishes facts, Runtime delivers to Writer via subscription, Writer processes). 7. Stop the runtime when idle. **Code Snippets (Simplified):** ```python # 0. Imports and Message Definitions (from previous chapters) import asyncio from dataclasses import dataclass from autogen_core import ( AgentId, BaseAgent, SingleThreadedAgentRuntime, TopicId, MessageContext, TypeSubscription, AgentInstantiationContext ) @dataclass class ResearchTopic: topic: str @dataclass class ResearchFacts: topic: str; facts: list[str] ``` These are the messages our agents will exchange. ```python # 1. Define Agent Logic (using BaseAgent) class ResearcherAgent(BaseAgent): async def on_message_impl(self, message: ResearchTopic, ctx: MessageContext): print(f"Researcher ({self.id}) got topic: {message.topic}") facts = [f"Fact 1 about {message.topic}", f"Fact 2"] results_topic = TopicId("research.facts.available", message.topic) # Use the runtime (via self.publish_message helper) to publish await self.publish_message( ResearchFacts(topic=message.topic, facts=facts), results_topic ) print(f"Researcher ({self.id}) published facts to {results_topic}") class WriterAgent(BaseAgent): async def on_message_impl(self, message: ResearchFacts, ctx: MessageContext): print(f"Writer ({self.id}) received facts via topic '{ctx.topic_id}': {message.facts}") draft = f"Draft for {message.topic}: {'; '.join(message.facts)}" print(f"Writer ({self.id}) created draft: '{draft}'") # This agent doesn't send further messages in this example ``` Here we define the behavior of our two agent types, inheriting from `BaseAgent` which gives us `self.id`, `self.publish_message`, etc. ```python # 2. Define Agent Factories def researcher_factory(): # Gets runtime/id via AgentInstantiationContext inside BaseAgent.__init__ print("Runtime is creating a ResearcherAgent...") return ResearcherAgent(description="I research topics.") def writer_factory(): print("Runtime is creating a WriterAgent...") return WriterAgent(description="I write drafts from facts.") ``` These simple functions tell the runtime *how* to create instances of our agents when needed. ```python # 3. 
Setup and Run the Runtime async def main(): # Create the runtime (the office manager) runtime = SingleThreadedAgentRuntime() # Register the factories (tell the manager how to hire) await runtime.register_factory("researcher", researcher_factory) await runtime.register_factory("writer", writer_factory) print("Registered agent factories.") # Add the subscription (tell manager who listens to which announcements) # Rule: Messages to topics of type "research.facts.available" # should go to a "writer" agent whose key matches the topic source. writer_sub = TypeSubscription(topic_type="research.facts.available", agent_type="writer") await runtime.add_subscription(writer_sub) print(f"Added subscription: {writer_sub.id}") # Start the runtime (open the office) runtime.start() print("Runtime started.") # Send the initial message to kick things off research_task_topic = "AutoGen Agents" researcher_instance_id = AgentId(type="researcher", key=research_task_topic) print(f"Sending initial topic '{research_task_topic}' to {researcher_instance_id}") await runtime.send_message( message=ResearchTopic(topic=research_task_topic), recipient=researcher_instance_id, ) # Wait until all messages are processed (wait for work day to end) print("Waiting for runtime to become idle...") await runtime.stop_when_idle() print("Runtime stopped.") # Run the main function asyncio.run(main()) ``` This script sets up the `SingleThreadedAgentRuntime`, registers the blueprints (factories) and communication rules (subscription), starts the process, and then shuts down cleanly. **Expected Output (Conceptual Order):** ``` Registered agent factories. Added subscription: type=research.facts.available=>agent=writer Runtime started. Sending initial topic 'AutoGen Agents' to researcher/AutoGen Agents Waiting for runtime to become idle... Runtime is creating a ResearcherAgent... # First time researcher/AutoGen Agents is needed Researcher (researcher/AutoGen Agents) got topic: AutoGen Agents Researcher (researcher/AutoGen Agents) published facts to research.facts.available/AutoGen Agents Runtime is creating a WriterAgent... # First time writer/AutoGen Agents is needed (due to subscription) Writer (writer/AutoGen Agents) received facts via topic 'research.facts.available/AutoGen Agents': ['Fact 1 about AutoGen Agents', 'Fact 2'] Writer (writer/AutoGen Agents) created draft: 'Draft for AutoGen Agents: Fact 1 about AutoGen Agents; Fact 2' Runtime stopped. ``` You can see the runtime orchestrating the creation of agents and the flow of messages based on the initial request and the subscription rule. ## Under the Hood: How the Manager Works Let's peek inside the `SingleThreadedAgentRuntime` (a common implementation provided by AutoGen Core) to understand the flow. **Core Idea:** It uses an internal queue (`_message_queue`) to hold incoming requests (`send_message`, `publish_message`). A background task continuously takes items from the queue and processes them one by one (though the *handling* of a message might involve `await` and allow other tasks to run). **1. 
Agent Creation (`_get_agent`, `_invoke_agent_factory`)** When the runtime needs an agent instance (e.g., to deliver a message) that hasn't been created yet: ```mermaid sequenceDiagram participant Runtime as AgentRuntime participant Factory as Agent Factory Func participant AgentCtx as AgentInstantiationContext participant Agent as New Agent Instance Runtime->>Runtime: Check if agent instance exists (e.g., in `_instantiated_agents` dict) alt Agent Not Found Runtime->>Runtime: Find registered factory for agent type Runtime->>AgentCtx: Set current runtime & agent_id activate AgentCtx Runtime->>Factory: Call factory function() activate Factory Factory->>AgentCtx: (Inside Agent.__init__) Get current runtime AgentCtx-->>Factory: Return runtime Factory->>AgentCtx: (Inside Agent.__init__) Get current agent_id AgentCtx-->>Factory: Return agent_id Factory-->>Runtime: Return new Agent instance deactivate Factory Runtime->>AgentCtx: Clear context deactivate AgentCtx Runtime->>Runtime: Store new agent instance end Runtime->>Runtime: Return agent instance ``` * The runtime looks up the factory function registered for the required `AgentId.type`. * It uses `AgentInstantiationContext.populate_context` to temporarily store its own reference and the target `AgentId`. * It calls the factory function. * Inside the agent's `__init__` (usually via `BaseAgent`), `AgentInstantiationContext.current_runtime()` and `AgentInstantiationContext.current_agent_id()` are called to retrieve the context set by the runtime. * The factory returns the fully initialized agent instance. * The runtime stores this instance for future use. ```python # From: _agent_instantiation.py (Simplified) class AgentInstantiationContext: _CONTEXT_VAR = ContextVar("agent_context") # Stores (runtime, agent_id) @classmethod @contextmanager def populate_context(cls, ctx: tuple[AgentRuntime, AgentId]): token = cls._CONTEXT_VAR.set(ctx) # Store context for this block try: yield # Code inside the 'with' block runs here finally: cls._CONTEXT_VAR.reset(token) # Clean up context @classmethod def current_runtime(cls) -> AgentRuntime: return cls._CONTEXT_VAR.get()[0] # Retrieve runtime from context @classmethod def current_agent_id(cls) -> AgentId: return cls._CONTEXT_VAR.get()[1] # Retrieve agent_id from context ``` This context manager pattern ensures the correct runtime and ID are available *only* during the agent's creation by the runtime. **2. Direct Messaging (`send_message` -> `_process_send`)** ```mermaid sequenceDiagram participant Sender as Sending Agent/Code participant Runtime as AgentRuntime participant Queue as Internal Queue participant Recipient as Recipient Agent Sender->>+Runtime: send_message(msg, recipient_id, ...) Runtime->>Runtime: Create Future (for response) Runtime->>+Queue: Put SendMessageEnvelope(msg, recipient_id, future) Runtime-->>-Sender: Return awaitable Future Note over Queue, Runtime: Background task picks up envelope Runtime->>Runtime: _process_send(envelope) Runtime->>+Recipient: _get_agent(recipient_id) (creates if needed) Recipient-->>-Runtime: Return Agent instance Runtime->>+Recipient: on_message(msg, context) Recipient->>Recipient: Process message... Recipient-->>-Runtime: Return response value Runtime->>Runtime: Set Future result with response value ``` * `send_message` creates a `Future` object (a placeholder for the eventual result) and wraps the message details in a `SendMessageEnvelope`. * This envelope is put onto the internal `_message_queue`. * The background task picks up the envelope. 
* `_process_send` gets the recipient agent instance (using `_get_agent`). * It calls the recipient's `on_message` method. * When `on_message` returns a result, `_process_send` sets the result on the `Future` object, which makes the original `await runtime.send_message(...)` call return the value. **3. Publish/Subscribe (`publish_message` -> `_process_publish`)** ```mermaid sequenceDiagram participant Publisher as Publishing Agent/Code participant Runtime as AgentRuntime participant Queue as Internal Queue participant SubManager as SubscriptionManager participant Subscriber as Subscribed Agent Publisher->>+Runtime: publish_message(msg, topic_id, ...) Runtime->>+Queue: Put PublishMessageEnvelope(msg, topic_id) Runtime-->>-Publisher: Return (None for publish) Note over Queue, Runtime: Background task picks up envelope Runtime->>Runtime: _process_publish(envelope) Runtime->>+SubManager: get_subscribed_recipients(topic_id) SubManager->>SubManager: Find matching subscriptions SubManager->>SubManager: Map subscriptions to AgentIds SubManager-->>-Runtime: Return list of recipient AgentIds loop For each recipient AgentId Runtime->>+Subscriber: _get_agent(recipient_id) (creates if needed) Subscriber-->>-Runtime: Return Agent instance Runtime->>+Subscriber: on_message(msg, context with topic_id) Subscriber->>Subscriber: Process message... Subscriber-->>-Runtime: Return (usually None for publish) end ``` * `publish_message` wraps the message in a `PublishMessageEnvelope` and puts it on the queue. * The background task picks it up. * `_process_publish` asks the `SubscriptionManager` (`_subscription_manager`) for all `AgentId`s that are subscribed to the given `topic_id`. * The `SubscriptionManager` checks its registered `Subscription` objects (`_subscriptions` list, added via `add_subscription`). For each `Subscription` where `is_match(topic_id)` is true, it calls `map_to_agent(topic_id)` to get the target `AgentId`. * For each resulting `AgentId`, the runtime gets the agent instance and calls its `on_message` method, providing the `topic_id` in the `MessageContext`. ```python # From: _runtime_impl_helpers.py (SubscriptionManager simplified) class SubscriptionManager: def __init__(self): self._subscriptions: List[Subscription] = [] # Optimization cache can be added here async def add_subscription(self, subscription: Subscription): self._subscriptions.append(subscription) # Clear cache if any async def get_subscribed_recipients(self, topic: TopicId) -> List[AgentId]: recipients = [] for sub in self._subscriptions: if sub.is_match(topic): recipients.append(sub.map_to_agent(topic)) return recipients ``` The `SubscriptionManager` simply iterates through registered subscriptions to find matches when a message is published. ## Next Steps You now understand the `AgentRuntime` - the essential coordinator that brings Agents to life, manages their communication, and runs the entire show. It handles agent creation via factories, routes direct and published messages, and manages the system's lifecycle. With the core concepts of `Agent`, `Messaging`, and `AgentRuntime` covered, we can start looking at more specialized building blocks. Next, we'll explore how agents can use external capabilities: * [Chapter 4: Tool](04_tool.md): How to give agents tools (like functions or APIs) to perform specific actions beyond just processing messages. 
--- Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge) ## /docs/AutoGen Core/04_tool.md --- layout: default title: "Tool" parent: "AutoGen Core" nav_order: 4 --- # Chapter 4: Tool - Giving Agents Specific Capabilities In the previous chapters, we learned about Agents as workers ([Chapter 1](01_agent.md)), how they can communicate directly or using announcements ([Chapter 2](02_messaging_system__topic___subscription_.md)), and the `AgentRuntime` that manages them ([Chapter 3](03_agentruntime.md)). Agents can process messages and coordinate, but what if an agent needs to perform a very specific action, like looking up information online, running a piece of code, accessing a database, or even just finding out the current date? They need specialized *capabilities*. This is where the concept of a **Tool** comes in. ## Motivation: Agents Need Skills! Imagine our `Writer` agent from before. It receives facts and writes a draft. Now, let's say we want the `Writer` (or perhaps a smarter `Assistant` agent helping it) to always include the current date in the blog post title. How does the agent get the current date? It doesn't inherently know it. It needs a specific *skill* or *tool* for that. A `Tool` in AutoGen Core represents exactly this: a specific, well-defined capability that an Agent can use. Think of it like giving an employee (Agent) a specialized piece of equipment (Tool), like a calculator, a web browser, or a calendar lookup program. ## Key Concepts: Understanding Tools Let's break down what defines a Tool: 1. **It's a Specific Capability:** A Tool performs one well-defined task. Examples: * `search_web(query: str)` * `run_python_code(code: str)` * `get_stock_price(ticker: str)` * `get_current_date()` 2. **It Has a Schema (The Manual):** This is crucial! For an Agent (especially one powered by a Large Language Model - LLM) to know *when* and *how* to use a tool, the tool needs a clear description or "manual". This is called the `ToolSchema`. It typically includes: * **`name`**: A unique identifier for the tool (e.g., `get_current_date`). * **`description`**: A clear explanation of what the tool does, which helps the LLM decide if this tool is appropriate for the current task (e.g., "Fetches the current date in YYYY-MM-DD format"). * **`parameters`**: Defines what inputs the tool needs. This is itself a schema (`ParametersSchema`) describing the input fields, their types, and which ones are required. For our `get_current_date` example, it might need no parameters. For `get_stock_price`, it would need a `ticker` parameter of type string. ```python # From: tools/_base.py (Simplified Concept) from typing import TypedDict, Dict, Any, Sequence, NotRequired class ParametersSchema(TypedDict): type: str # Usually "object" properties: Dict[str, Any] # Defines input fields and their types required: NotRequired[Sequence[str]] # List of required field names class ToolSchema(TypedDict): name: str description: NotRequired[str] parameters: NotRequired[ParametersSchema] # 'strict' flag also possible (Chapter 5 related) ``` This schema allows an LLM to understand: "Ah, there's a tool called `get_current_date` that takes no inputs and gives me the current date. I should use that now!" 3. **It Can Be Executed:** Once an agent decides to use a tool (often based on the schema), there needs to be a mechanism to actually *run* the tool's underlying function and get the result. 
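To make the schema idea concrete, here is a sketch of what a hand-written `ToolSchema` for the `get_stock_price` example above could look like. It assumes `ToolSchema` and `ParametersSchema` are importable from `autogen_core.tools` (as the `tools/_base.py` path suggests); since they are `TypedDict`s, this is really just a structured dictionary, and `FunctionTool` (shown in the next section) normally generates an equivalent schema for you from a function signature.

```python
# Sketch: a hand-written ToolSchema for a hypothetical get_stock_price tool.
# Assumes ToolSchema/ParametersSchema are exported from autogen_core.tools;
# FunctionTool usually builds this automatically from the wrapped function.
from autogen_core.tools import ParametersSchema, ToolSchema

stock_price_schema = ToolSchema(
    name="get_stock_price",
    description="Look up the latest stock price for a given ticker symbol.",
    parameters=ParametersSchema(
        type="object",
        properties={
            "ticker": {"type": "string", "description": "Stock ticker, e.g. 'MSFT'"},
        },
        required=["ticker"],
    ),
)

print(stock_price_schema["name"])        # -> get_stock_price
print(stock_price_schema["parameters"])  # -> the inputs an LLM must supply
```

An LLM that reads this schema knows it must supply a `ticker` string before the tool can run, which is exactly how it decides when and how to call the tool.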
## Use Case Example: Adding a `get_current_date` Tool Let's equip an agent with the ability to find the current date. **Goal:** Define a tool that gets the current date and show how it could be executed by a specialized agent. **Step 1: Define the Python Function** First, we need the actual Python code that performs the action. ```python # File: get_date_function.py import datetime def get_current_date() -> str: """Fetches the current date as a string.""" today = datetime.date.today() return today.isoformat() # Returns date like "2023-10-27" # Test the function print(f"Function output: {get_current_date()}") ``` This is a standard Python function. It takes no arguments and returns the date as a string. **Step 2: Wrap it as a `FunctionTool`** AutoGen Core provides a convenient way to turn a Python function like this into a `Tool` object using `FunctionTool`. It automatically inspects the function's signature (arguments and return type) and docstring to help build the `ToolSchema`. ```python # File: create_date_tool.py from autogen_core.tools import FunctionTool from get_date_function import get_current_date # Import our function # Create the Tool instance # We provide the function and a clear description for the LLM date_tool = FunctionTool( func=get_current_date, description="Use this tool to get the current date in YYYY-MM-DD format." # Name defaults to function name 'get_current_date' ) # Let's see what FunctionTool generated print(f"Tool Name: {date_tool.name}") print(f"Tool Description: {date_tool.description}") # The schema defines inputs (none in this case) # print(f"Tool Schema Parameters: {date_tool.schema['parameters']}") # Output (simplified): {'type': 'object', 'properties': {}, 'required': []} ``` `FunctionTool` wraps our `get_current_date` function. It uses the function name as the tool name and the description we provided. It also correctly determines from the function signature that there are no input parameters (`properties: {}`). **Step 3: How an Agent Might Request Tool Use** Now we have a `date_tool`. How is it used? Typically, an LLM-powered agent (which we'll see more of in [Chapter 5: ChatCompletionClient](05_chatcompletionclient.md)) analyzes a request and decides a tool is needed. It then generates a request to *call* that tool, often using a specific message type like `FunctionCall`. ```python # File: tool_call_request.py from autogen_core import FunctionCall # Represents a request to call a tool # Imagine an LLM agent decided to use the date tool. # It constructs this message, providing the tool name and arguments (as JSON string). date_call_request = FunctionCall( id="call_date_001", # A unique ID for this specific call attempt name="get_current_date", # Matches the Tool's name arguments="{}" # An empty JSON object because no arguments are needed ) print("FunctionCall message:", date_call_request) # Output: FunctionCall(id='call_date_001', name='get_current_date', arguments='{}') ``` This `FunctionCall` message is like a work order: "Please execute the tool named `get_current_date` with these arguments." **Step 4: The `ToolAgent` Executes the Tool** Who receives this `FunctionCall` message? Usually, a specialized agent called `ToolAgent`. You create a `ToolAgent` and give it the list of tools it knows how to execute. When it receives a `FunctionCall`, it finds the matching tool and runs it. 
```python # File: tool_agent_example.py import asyncio from autogen_core.tool_agent import ToolAgent from autogen_core.models import FunctionExecutionResult from create_date_tool import date_tool # Import the tool we created from tool_call_request import date_call_request # Import the request message # Create an agent specifically designed to execute tools tool_executor = ToolAgent( description="I can execute tools like getting the date.", tools=[date_tool] # Give it the list of tools it manages ) # --- Simulation of Runtime delivering the message --- # In a real app, the AgentRuntime (Chapter 3) would route the # date_call_request message to this tool_executor agent. # We simulate the call to its message handler here: async def simulate_execution(): # Fake context (normally provided by runtime) class MockContext: cancellation_token = None ctx = MockContext() print(f"ToolAgent received request: {date_call_request.name}") result: FunctionExecutionResult = await tool_executor.handle_function_call( message=date_call_request, ctx=ctx ) print(f"ToolAgent produced result: {result}") asyncio.run(simulate_execution()) ``` **Expected Output:** ``` ToolAgent received request: get_current_date ToolAgent produced result: FunctionExecutionResult(content='2023-10-27', call_id='call_date_001', is_error=False, name='get_current_date') # Date will be current date ``` The `ToolAgent` received the `FunctionCall`, found the `date_tool` in its list, executed the underlying `get_current_date` function, and packaged the result (the date string) into a `FunctionExecutionResult` message. This result message can then be sent back to the agent that originally requested the tool use. ## Under the Hood: How Tool Execution Works Let's visualize the typical flow when an LLM agent decides to use a tool managed by a `ToolAgent`. **Conceptual Flow:** ```mermaid sequenceDiagram participant LLMA as LLM Agent (Decides) participant Caller as Caller Agent (Orchestrates) participant ToolA as ToolAgent (Executes) participant ToolFunc as Tool Function (e.g., get_current_date) Note over LLMA: Analyzes conversation, decides tool needed. LLMA->>Caller: Sends AssistantMessage containing FunctionCall(name='get_current_date', args='{}') Note over Caller: Receives LLM response, sees FunctionCall. Caller->>+ToolA: Uses runtime.send_message(message=FunctionCall, recipient=ToolAgent_ID) Note over ToolA: Receives FunctionCall via on_message. ToolA->>ToolA: Looks up 'get_current_date' in its internal list of Tools. ToolA->>+ToolFunc: Calls tool.run_json(args={}) -> triggers get_current_date() ToolFunc-->>-ToolA: Returns the result (e.g., "2023-10-27") ToolA->>ToolA: Creates FunctionExecutionResult message with the content. ToolA-->>-Caller: Returns FunctionExecutionResult via runtime messaging. Note over Caller: Receives the tool result. Caller->>LLMA: Sends FunctionExecutionResultMessage to LLM for next step. Note over LLMA: Now knows the current date. ``` 1. **Decision:** An LLM-powered agent decides a tool is needed based on the conversation and the available tools' descriptions. It generates a `FunctionCall`. 2. **Request:** A "Caller" agent (often the same LLM agent or a managing agent) sends this `FunctionCall` message to the dedicated `ToolAgent` using the `AgentRuntime`. 3. **Lookup:** The `ToolAgent` receives the message, extracts the tool `name` (`get_current_date`), and finds the corresponding `Tool` object (our `date_tool`) in the list it was configured with. 4. 
**Execution:** The `ToolAgent` calls the `run_json` method on the `Tool` object, passing the arguments from the `FunctionCall`. For a `FunctionTool`, `run_json` validates the arguments against the generated schema and then executes the original Python function (`get_current_date`). 5. **Result:** The Python function returns its result (the date string). 6. **Response:** The `ToolAgent` wraps this result string in a `FunctionExecutionResult` message, including the original `call_id`, and sends it back to the Caller agent. 7. **Continuation:** The Caller agent typically sends this result back to the LLM agent, allowing the conversation or task to continue with the new information. **Code Glimpse:** * **`Tool` Protocol (`tools/_base.py`):** Defines the basic contract any tool must fulfill. Key methods are `schema` (property returning the `ToolSchema`) and `run_json` (method to execute the tool with JSON-like arguments). * **`BaseTool` (`tools/_base.py`):** An abstract class that helps implement the `Tool` protocol, especially using Pydantic models for defining arguments (`args_type`) and return values (`return_type`). It automatically generates the `parameters` part of the schema from the `args_type` model. * **`FunctionTool` (`tools/_function_tool.py`):** Inherits from `BaseTool`. Its magic lies in automatically creating the `args_type` Pydantic model by inspecting the wrapped Python function's signature (`args_base_model_from_signature`). Its `run` method handles calling the original sync or async Python function. ```python # Inside FunctionTool (Simplified Concept) class FunctionTool(BaseTool[BaseModel, BaseModel]): def __init__(self, func, description, ...): self._func = func self._signature = get_typed_signature(func) # Automatically create Pydantic model for arguments args_model = args_base_model_from_signature(...) # Get return type from signature return_type = self._signature.return_annotation super().__init__(args_model, return_type, ...) async def run(self, args: BaseModel, ...): # Extract arguments from the 'args' model kwargs = args.model_dump() # Call the original Python function (sync or async) result = await self._call_underlying_func(**kwargs) return result # Must match the expected return_type ``` * **`ToolAgent` (`tool_agent/_tool_agent.py`):** A specialized `RoutedAgent`. It registers a handler specifically for `FunctionCall` messages. ```python # Inside ToolAgent (Simplified Concept) class ToolAgent(RoutedAgent): def __init__(self, ..., tools: List[Tool]): super().__init__(...) self._tools = {tool.name: tool for tool in tools} # Store tools by name @message_handler # Registers this for FunctionCall messages async def handle_function_call(self, message: FunctionCall, ctx: MessageContext): # Find the tool by name tool = self._tools.get(message.name) if tool is None: # Handle error: Tool not found raise ToolNotFoundException(...) try: # Parse arguments string into a dictionary arguments = json.loads(message.arguments) # Execute the tool's run_json method result_obj = await tool.run_json(args=arguments, ...) # Convert result object back to string if needed result_str = tool.return_value_as_string(result_obj) # Create the success result message return FunctionExecutionResult(content=result_str, ...) except Exception as e: # Handle execution errors return FunctionExecutionResult(content=f"Error: {e}", is_error=True, ...) ``` Its core logic is: find tool -> parse args -> run tool -> return result/error. 
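Before wiring a `ToolAgent` into a runtime, it can be helpful to exercise a tool on its own. The following is a minimal sketch that runs the `date_tool` from the use case directly via `run_json`, mirroring what `ToolAgent.handle_function_call` does internally. It assumes the `run_json(args, cancellation_token)` shape sketched above and that `CancellationToken` is importable from `autogen_core`; treat it as an illustration rather than the exact library API.

```python
# Sketch: execute a Tool directly, the way ToolAgent does internally.
# Assumes run_json(args, cancellation_token) as sketched above and that
# CancellationToken is importable from autogen_core.
import asyncio
import json

from autogen_core import CancellationToken
from create_date_tool import date_tool  # the FunctionTool built in Step 2

async def run_tool_directly() -> None:
    # Arguments arrive as a JSON string inside a FunctionCall; parse them first.
    arguments = json.loads("{}")  # get_current_date takes no arguments

    # Validate against the tool's schema and call the wrapped Python function.
    result = await date_tool.run_json(
        args=arguments, cancellation_token=CancellationToken()
    )

    # ToolAgent converts the result to a string for FunctionExecutionResult.
    print(date_tool.return_value_as_string(result))  # e.g. "2023-10-27"

asyncio.run(run_tool_directly())
```

Running a tool in isolation like this is also a handy way to test its schema and return values before handing it to an LLM-driven agent.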
## Next Steps

You've learned how **Tools** provide specific capabilities to Agents, defined by a **Schema** that LLMs can understand. We saw how `FunctionTool` makes it easy to wrap existing Python functions and how `ToolAgent` acts as the executor for these tools.

This ability for agents to use tools is fundamental to building powerful and versatile AI systems that can interact with the real world or perform complex calculations.

Now that agents can use tools, we need to understand more about the agents that *decide* which tools to use, which often involves interacting with Large Language Models:

* [Chapter 5: ChatCompletionClient](05_chatcompletionclient.md): How agents interact with LLMs like GPT to generate responses or decide on actions (like calling a tool).
* [Chapter 6: ChatCompletionContext](06_chatcompletioncontext.md): How the history of the conversation, including tool calls and results, is managed when talking to an LLM.

---

Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge)