```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── LICENSE
├── README.md
├── demo/
    ├── README.md
    ├── ui/
        ├── README.md
        ├── __init__.py
        ├── components/
            ├── __init__.py
            ├── agent_list.py
            ├── api_key_dialog.py
            ├── async_poller.js
            ├── async_poller.py
            ├── chat_bubble.py
            ├── conversation.py
            ├── conversation_list.py
            ├── dialog.py
            ├── event_viewer.py
            ├── form_render.py
            ├── header.py
            ├── page_scaffold.py
            ├── poller.py
            ├── side_nav.py
            ├── task_card.py
        ├── main.py
        ├── pages/
            ├── __init__.py
            ├── agent_list.py
            ├── conversation.py
            ├── event_list.py
            ├── home.py
            ├── settings.py
            ├── task_list.py
        ├── pyproject.toml
        ├── service/
            ├── __init__.py
            ├── client/
                ├── __init__.py
                ├── client.py
            ├── server/
                ├── __init__.py
                ├── adk_host_manager.py
                ├── application_manager.py
                ├── in_memory_manager.py
                ├── server.py
                ├── test_image.py
```

## /.gitignore

```gitignore path="/.gitignore" 
.DS_Store
__pycache__
.env
```

## /CODE_OF_CONDUCT.md

# Code of Conduct

## Our Pledge

In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, gender identity and expression, level of experience, education, socio-economic status, nationality, personal appearance, race, religion, or sexual identity and orientation.

## Our Standards

Examples of behavior that contributes to creating a positive environment include:

* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members

Examples of unacceptable behavior by participants include:

* The use of sexualized language or imagery and unwelcome sexual attention or advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a professional setting

## Our Responsibilities

Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.

Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.

## Scope

This Code of Conduct applies both within project spaces and in public spaces when an individual is representing the project or its community. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers.

This Code of Conduct also applies outside the project spaces when the Project Steward has a reasonable belief that an individual's behavior may have a negative impact on the project or its community.

## Conflict Resolution

We do not believe that all conflict is bad; healthy debate and disagreement often yield positive results. However, it is never okay to be disrespectful or to engage in behavior that violates the project’s code of conduct.

If you see someone violating the code of conduct, you are encouraged to address the behavior directly with those involved. Many issues can be resolved quickly and easily, and this gives people more control over the outcome of their dispute. If you are unable to resolve the matter for any reason, or if the behavior is threatening or harassing, report it. We are dedicated to providing an environment where participants feel welcome and safe.

Reports should be directed to *[PROJECT STEWARD NAME(s) AND EMAIL(s)]*, the Project Steward(s) for *[PROJECT NAME]*. It is the Project Steward’s duty to receive and address reported violations of the code of conduct. They will then work with a committee consisting of representatives from the Open Source Programs Office and the Google Open Source Strategy team. If for any reason you are uncomfortable reaching out to the Project Steward, please email opensource@google.com.

We will investigate every complaint, but you may not receive a direct response. We will use our discretion in determining when and how to follow up on reported incidents, which may range from not taking action to permanent expulsion from the project and project-sponsored spaces. We will notify the accused of the report and provide them an opportunity to discuss it before any action is taken. The identity of the reporter will be omitted from the details of the report supplied to the accused. In potentially harmful situations, such as ongoing harassment or threats to anyone's safety, we may take action without notice.

## Attribution

This Code of Conduct is adapted from the Contributor Covenant, version 1.4, available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html

Note: A version of this file is also available in the [New Project repo](https://github.com/google/new-project/blob/master/docs/code-of-conduct.md).

## /CONTRIBUTING.md

# How to contribute

We'd love to accept your patches and contributions to this project.

## Before you begin

### Sign our Contributor License Agreement

Contributions to this project must be accompanied by a [Contributor License Agreement](https://cla.developers.google.com/about) (CLA). You (or your employer) retain the copyright to your contribution; this simply gives us permission to use and redistribute your contributions as part of the project.

If you or your current employer have already signed the Google CLA (even if it was for a different project), you probably don't need to do it again.

Visit https://cla.developers.google.com/ to see your current agreements or to sign a new one.

### Review our community guidelines

This project follows [Google's Open Source Community Guidelines](https://opensource.google/conduct/).

## Contribution process

### Code reviews

All submissions, including submissions by project members, require review. We use GitHub pull requests for this purpose. Consult [GitHub Help](https://help.github.com/articles/about-pull-requests/) for more information on using pull requests.

## /LICENSE

``` path="/LICENSE" 
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/

TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION

1. Definitions.

"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.

"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.

"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. 
"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. 
Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the 
Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. 
Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. 
To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.

Copyright [yyyy] [name of copyright owner]

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
```

## /README.md

![image info](images/A2A_banner.png)

[![Apache License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](LICENSE)

**_An open protocol enabling communication and interoperability between opaque agentic applications._**

- [Agent2Agent Protocol A2A](#agent2agent-protocol-a2a)
- [Getting Started](#getting-started)
- [Contributing](#contributing)
- [What's next](#whats-next)
- [About](#about)

One of the biggest challenges in enterprise AI adoption is getting agents built on different frameworks and vendors to work together. That’s why we created an open *Agent2Agent (A2A) protocol*, a collaborative way to help agents across different ecosystems communicate with each other. Google is driving this open protocol initiative for the industry because we believe this protocol will be **critical to support multi-agent communication by giving your agents a common language – irrespective of the framework or vendor they are built on**.
With *A2A*, agents can show each other their capabilities and negotiate how they will interact with users (via text, forms, or bidirectional audio/video) – all while working securely together.

### **See A2A in Action**

Watch [this demo video](https://storage.googleapis.com/gweb-developer-goog-blog-assets/original_videos/A2A_demo_v4.mp4) to see how A2A enables seamless communication between different agent frameworks.

### Conceptual Overview

The Agent2Agent (A2A) protocol facilitates communication between independent AI agents. Here are the core concepts:

* **Agent Card:** A public metadata file (usually at `/.well-known/agent.json`) describing an agent's capabilities, skills, endpoint URL, and authentication requirements. Clients use this for discovery.
* **A2A Server:** An agent exposing an HTTP endpoint that implements the A2A protocol methods (defined in the [json specification](/specification)). It receives requests and manages task execution.
* **A2A Client:** An application or another agent that consumes A2A services. It sends requests (like `tasks/send`) to an A2A Server's URL.
* **Task:** The central unit of work. A client initiates a task by sending a message (`tasks/send` or `tasks/sendSubscribe`). Tasks have unique IDs and progress through states (`submitted`, `working`, `input-required`, `completed`, `failed`, `canceled`).
* **Message:** Represents communication turns between the client (`role: "user"`) and the agent (`role: "agent"`). Messages contain `Parts`.
* **Part:** The fundamental content unit within a `Message` or `Artifact`. Can be `TextPart`, `FilePart` (with inline bytes or a URI), or `DataPart` (for structured JSON, e.g., forms).
* **Artifact:** Represents outputs generated by the agent during a task (e.g., generated files, final structured data). Artifacts also contain `Parts`.
* **Streaming:** For long-running tasks, servers supporting the `streaming` capability can use `tasks/sendSubscribe`.
The client receives Server-Sent Events (SSE) containing `TaskStatusUpdateEvent` or `TaskArtifactUpdateEvent` messages, providing real-time progress.
* **Push Notifications:** Servers supporting `pushNotifications` can proactively send task updates to a client-provided webhook URL, configured via `tasks/pushNotification/set`.

**Typical Flow:**

1. **Discovery:** Client fetches the Agent Card from the server's well-known URL.
2. **Initiation:** Client sends a `tasks/send` or `tasks/sendSubscribe` request containing the initial user message and a unique Task ID.
3. **Processing:**
    * **(Streaming):** Server sends SSE events (status updates, artifacts) as the task progresses.
    * **(Non-Streaming):** Server processes the task synchronously and returns the final `Task` object in the response.
4. **Interaction (Optional):** If the task enters `input-required`, the client sends subsequent messages using the same Task ID via `tasks/send` or `tasks/sendSubscribe`.
5. **Completion:** The task eventually reaches a terminal state (`completed`, `failed`, `canceled`).

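
The initiation and interaction steps of the typical flow can be sketched in a few lines of Python. This is a minimal illustration, not the project's client library: it only builds the request body a client might send and checks whether a task state is terminal. The JSON-RPC 2.0 envelope and the field names inside `params` (`id`, `message`, `parts`, `type`, `text`) are assumptions made for the example, and the helper names `build_send_request` and `is_terminal` are hypothetical; the JSON specification remains the authoritative schema.

```python
import json
import uuid
from typing import Optional

# Task states listed in the overview; the last three are terminal.
TASK_STATES = (
    "submitted", "working", "input-required",
    "completed", "failed", "canceled",
)
TERMINAL_STATES = frozenset({"completed", "failed", "canceled"})


def build_send_request(
    text: str,
    task_id: Optional[str] = None,
    method: str = "tasks/send",
) -> dict:
    """Build the body of a request that starts or continues a task.

    Hypothetical helper. Assumed, not taken from the overview: the
    JSON-RPC 2.0 envelope and the exact field names inside "params".
    """
    if method not in ("tasks/send", "tasks/sendSubscribe"):
        raise ValueError(f"unsupported method: {method}")
    return {
        "jsonrpc": "2.0",
        "id": str(uuid.uuid4()),  # request id, distinct from the task id
        "method": method,
        "params": {
            # The client picks the task id and reuses it on later turns.
            "id": task_id or str(uuid.uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": text}],
            },
        },
    }


def is_terminal(state: str) -> bool:
    """Return True once a task can no longer change state."""
    if state not in TASK_STATES:
        raise ValueError(f"unknown task state: {state}")
    return state in TERMINAL_STATES


if __name__ == "__main__":
    first = build_send_request("Plan a team offsite")
    task_id = first["params"]["id"]
    # Step 4: answer an input-required prompt on the same task.
    follow_up = build_send_request("Budget is 5000 USD", task_id=task_id)
    print(json.dumps(follow_up, indent=2))
    print(is_terminal("input-required"), is_terminal("completed"))
```

Reusing the task ID on the follow-up request is what ties the second message to the same task. A client would keep sending on that ID until `is_terminal` reports that the task has finished.
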
### **Getting Started**

* 📚 Read the [technical documentation](https://google.github.io/A2A/#/documentation) to understand the capabilities
* 📝 Review the [json specification](/specification) of the protocol structures
* 🎬 Use our [samples](/samples) to see A2A in action
    * Sample A2A Client/Server ([Python](/samples/python/common), [JS](/samples/js/src))
    * [Multi-Agent Web App](/demo/README.md)
    * CLI ([Python](/samples/python/hosts/cli/README.md), [JS](/samples/js/README.md))
* 🤖 Use our [sample agents](/samples/python/agents/README.md) to see how to bring A2A to agent frameworks
    * [Agent Development Kit (ADK)](/samples/python/agents/google_adk/README.md)
    * [CrewAI](/samples/python/agents/crewai/README.md)
    * [LangGraph](/samples/python/agents/langgraph/README.md)
    * [Genkit](/samples/js/src/agents/README.md)
    * [LlamaIndex](/samples/python/agents/llama_index_file_chat/README.md)
    * [Marvin](/samples/python/agents/marvin/README.md)
    * [Semantic Kernel](/samples/python/agents/semantickernel/README.md)
* 📑 Review key topics to understand protocol details
    * [A2A and MCP](https://google.github.io/A2A/#/topics/a2a_and_mcp.md)
    * [Agent Discovery](https://google.github.io/A2A/#/topics/agent_discovery.md)
    * [Enterprise Ready](https://google.github.io/A2A/#/topics/enterprise_ready.md)
    * [Push Notifications](https://google.github.io/A2A/#/topics/push_notifications.md)

### **Contributing**

We highly value community contributions and appreciate your interest in A2A Protocol! Here's how you can get involved:

* Want to get started? Please see our [contributing guide](CONTRIBUTING.md).
* Have questions? Join our community in [GitHub discussions](https://github.com/google/A2A/discussions).
* Want to help with protocol improvement feedback? Dive into [GitHub issues](https://github.com/google/A2A/issues).
* Private Feedback?
Please use this [Google form](https://docs.google.com/forms/d/e/1FAIpQLScS23OMSKnVFmYeqS2dP7dxY3eTyT7lmtGLUa8OJZfP4RTijQ/viewform)
* Existing Google Cloud Platform customer and want to join our partner program to contribute to the A2A ecosystem? Please fill out this [form](https://docs.google.com/forms/d/1VXYY1qBhUbRfY15Z5G_KPYoPC9d1LCrwde5ehjYKCZ8/preview)

### **What's next**

Future plans include improvements to the protocol itself and enhancements to the samples:

**Protocol Enhancements:**

* **Agent Discovery:**
    * Formalize inclusion of authorization schemes and optional credentials directly within the `AgentCard`.
* **Agent Collaboration:**
    * Investigate a `QuerySkill()` method for dynamically checking unsupported or unanticipated skills.
* **Task Lifecycle & UX:**
    * Support for dynamic UX negotiation *within* a task (e.g., agent adding audio/video mid-conversation).
* **Client Methods & Transport:**
    * Explore extending support to client-initiated methods (beyond task management).
    * Improvements to streaming reliability and push notification mechanisms.

**Sample & Documentation Enhancements:**

* Simplify "Hello World" examples.
* Include additional examples of agents integrated with different frameworks or showcasing specific A2A features.
* Provide more comprehensive documentation for the common client/server libraries.
* Generate human-readable HTML documentation from the JSON Schema.

### **About**

A2A Protocol is an open source project run by Google LLC, under [Apache License](LICENSE) and open to contributions from the entire community.

## /demo/README.md

## Demo Web App

This demo application showcases agents talking to other agents over A2A.

![image](/images/a2a_demo_arch.png)

* The frontend is a [mesop](https://github.com/mesop-dev/mesop) web application that renders conversations as content between the end user and the "Host Agent". This app can render text content, thought bubbles, web forms (requests for input from agents), and images.
More content types coming soon.
* The [Host Agent](/samples/python/hosts/multiagent/host_agent.py) is a Google ADK agent which orchestrates user requests to Remote Agents.
* Each [Remote Agent](/samples/python/hosts/multiagent/remote_agent_connection.py) is an A2AClient running inside a Google ADK agent. Each remote agent will retrieve the A2AServer's [AgentCard](https://google.github.io/A2A/#documentation?id=agent-card) and then proxy all requests using A2A.

## Features

### Dynamically add agents

Clicking on the robot icon in the web app lets you add new agents. Enter the address of the remote agent's AgentCard and the app will fetch the card and add the remote agent to the local set of known agents.

### Speak with one or more agents

Click on the chat button to start or continue an existing conversation. This conversation will go to the Host Agent which will then delegate the request to one or more remote agents.

If the agent returns complex content - like an image or a web-form - the frontend will render this in the conversation view. The Remote Agent will take care of converting this content between A2A and the web app's native application representation.

### Explore A2A Tasks

Click on the history to see the messages sent between the web app and all of the agents (Host agent and Remote agents).

Click on the task list to see all the A2A task updates from the remote agents.

## Prerequisites

- Python 3.12 or higher
- UV
- Agent servers speaking A2A ([use these samples](/samples/python/agents/README.md))
- Authentication credentials (API Key or Vertex AI)

## Running the Examples

1. Navigate to the demo ui directory:
   ```bash
   cd demo/ui
   ```
2. Create an environment file with your API key or enter it directly in the UI when prompted:

   **Option A: Google AI Studio API Key**
   ```bash
   echo "GOOGLE_API_KEY=your_api_key_here" >> .env
   ```
   Or enter it directly in the UI when prompted.
   **Option B: Google Cloud Vertex AI**
   ```bash
   echo "GOOGLE_GENAI_USE_VERTEXAI=TRUE" >> .env
   echo "GOOGLE_CLOUD_PROJECT=your_project_id" >> .env
   echo "GOOGLE_CLOUD_LOCATION=your_location" >> .env
   ```
   Note: Ensure you've authenticated with gcloud using `gcloud auth login` first.

   For detailed instructions on authentication setup, see the [ADK documentation](https://google.github.io/adk-docs/get-started/quickstart/#set-up-the-model).

3. Run the front end example:
   ```bash
   uv run main.py
   ```
   Note: The application runs on port 12000 by default

## /demo/ui/README.md

## /demo/ui/__init__.py

```py path="/demo/ui/__init__.py" 
```

## /demo/ui/components/__init__.py

```py path="/demo/ui/components/__init__.py" 
```

## /demo/ui/components/agent_list.py

```py path="/demo/ui/components/agent_list.py" 
import mesop as me
import pandas as pd

from typing import List, Tuple

from state.state import AppState
from state.agent_state import AgentState
from common.types import AgentCard


@me.component
def agents_list(
    agents: list[AgentCard],
):
    """Agents list component"""
    df_data = {
        "Address": [],
        "Name": [],
        "Description": [],
        "Organization": [],
        "Input Modes": [],
        "Output Modes": [],
        "Streaming": [],
    }
    for agent_info in agents:
        df_data["Address"].append(agent_info.url)
        df_data["Name"].append(agent_info.name)
        df_data["Description"].append(agent_info.description)
        df_data["Organization"].append(
            agent_info.provider.organization if agent_info.provider else ''
        )
        df_data["Input Modes"].append(", ".join(agent_info.defaultInputModes))
        df_data["Output Modes"].append(", ".join(agent_info.defaultOutputModes))
        df_data["Streaming"].append(agent_info.capabilities.streaming)
    df = pd.DataFrame(
        pd.DataFrame(df_data),
        columns=[
            "Address",
            "Name",
            "Description",
            "Organization",
            "Input Modes",
            "Output Modes",
            "Streaming",
        ],
    )
    with me.box(
        style=me.Style(
            display="flex",
            justify_content="space-between",
            flex_direction="column",
        )
    ):
        me.table(
            df,
            header=me.TableHeader(sticky=True),
            columns={
                "Address":
                    me.TableColumn(sticky=True),
                "Name": me.TableColumn(sticky=True),
                "Description": me.TableColumn(sticky=True),
            },
        )
        with me.content_button(
            type="raised",
            on_click=add_agent,
            key="new_agent",
            style=me.Style(
                display="flex",
                flex_direction="row",
                gap=5,
                align_items="center",
                margin=me.Margin(top=10),
            ),
        ):
            me.icon(icon="upload")


def add_agent(e: me.ClickEvent):  # pylint: disable=unused-argument
    """import agent button handler"""
    state = me.state(AgentState)
    state.agent_dialog_open = True
```

## /demo/ui/components/api_key_dialog.py

```py path="/demo/ui/components/api_key_dialog.py" 
import mesop as me
import os

from state.state import AppState
from state.host_agent_service import UpdateApiKey
from .dialog import dialog, dialog_actions


def on_api_key_change(e: me.InputBlurEvent):
    """Save API key to app state when input changes"""
    state = me.state(AppState)
    state.api_key = e.value


async def save_api_key(e: me.ClickEvent):
    """Save API key and close dialog"""
    yield  # Yield to allow UI update

    state = me.state(AppState)

    # Validate API key is not empty
    if not state.api_key.strip():
        return

    # Set the environment variable for current process
    os.environ["GOOGLE_API_KEY"] = state.api_key

    # Update the API key in the server
    await UpdateApiKey(state.api_key)

    state.api_key_dialog_open = False

    yield


@me.component
def api_key_dialog():
    """Dialog for API key input"""
    state = me.state(AppState)

    with dialog(state.api_key_dialog_open):
        with me.box(
            style=me.Style(display="flex", flex_direction="column", gap=12)
        ):
            me.text(
                "Google API Key Required",
                type="headline-4",
                style=me.Style(margin=me.Margin(bottom=10)),
            )
            me.text(
                "Please enter your Google API Key to use the application.",
                style=me.Style(margin=me.Margin(bottom=20)),
            )
            me.input(
                label="Google API Key",
                value=state.api_key,
                on_blur=on_api_key_change,
                type="password",
                style=me.Style(width="100%"),
            )

        with dialog_actions():
            me.button("Save", on_click=save_api_key)
```

## /demo/ui/components/async_poller.js

```js path="/demo/ui/components/async_poller.js" 
import {
  LitElement,
  html,
} from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js';

class AsyncPoller extends LitElement {
  static properties = {
    triggerEvent: {type: String},
    action: {type: Object},
    polling_interval: {type: Number},
  };

  render() {
    return html`
`;
  }

  firstUpdated() {
    if (this.polling_interval <= 0) {
      return;
    }
    if (this.action) {
      setTimeout(() => {
        this.runTimeout(this.action);
      }, this.polling_interval * 1000);
    }
  }

  runTimeout(action) {
    this.dispatchEvent(
      new MesopEvent(this.triggerEvent, {
        action: action,
      }),
    );
    if (this.polling_interval > 0) {
      setTimeout(() => {
        // Pass the action along so later polls do not dispatch `undefined`.
        this.runTimeout(action);
      }, this.polling_interval * 1000);
    }
  }
}

customElements.define('async-action-component', AsyncPoller);
```

## /demo/ui/components/async_poller.py

```py path="/demo/ui/components/async_poller.py" 
from dataclasses import asdict, dataclass
from typing import Any, Callable

import mesop.labs as mel

from state.state import AppState


@dataclass
class AsyncAction:
    value: AppState
    duration_seconds: int


@mel.web_component(path="./async_poller.js")
def async_poller(
    *,
    trigger_event: Callable[[mel.WebEvent], Any],
    action: AsyncAction | None = None,
    key: str | None = None,
):
    """Creates an invisible component that will delay state changes asynchronously.

    Right now this implementation is limited since we basically just pass the key
    around. But ideally we also pass in some kind of value to update when the
    timeout expires.

    The main benefit of this component is for cases, such as status messages that
    may appear and disappear after some duration. The primary example here is the
    example snackbar widget, which right now blocks the UI when using the sleep
    yield approach.

    The other benefit of this component is that it works generically (rather than
    say implementing a custom snackbar widget as a web component).

    Returns:
      The web component that was created.
    """
    return mel.insert_web_component(
        name="async-action-component",
        key=key,
        events={
            "triggerEvent": trigger_event,
        },
        properties={
            "polling_interval": action.duration_seconds if action else 1,
            "action": asdict(action) if action else {}
        },
    )
```

## /demo/ui/components/chat_bubble.py

```py path="/demo/ui/components/chat_bubble.py" 
import mesop as me

from state.state import StateMessage
from state.state import AppState


@me.component
def chat_bubble(message: StateMessage, key: str):
    """Chat bubble component"""
    app_state = me.state(AppState)
    show_progress_bar = (
        message.message_id in app_state.background_tasks
        or message.message_id in app_state.message_aliases.values()
    )
    progress_text = ""
    if show_progress_bar:
        # The id may only appear as an alias value, so use .get() to avoid
        # a KeyError when it is not a key of background_tasks.
        progress_text = app_state.background_tasks.get(message.message_id, "")
    if not message.content:
        print("No message content")
    for pair in message.content:
        chat_box(
            pair[0],
            pair[1],
            message.role,
            key,
            progress_bar=show_progress_bar,
            progress_text=progress_text,
        )


def chat_box(
    content: str,
    media_type: str,
    role: str,
    key: str,
    progress_bar: bool,
    progress_text: str
):
    with me.box(
        style=me.Style(
            display="flex",
            justify_content=(
                "space-between" if role == "agent" else "end"
            ),
            min_width=500,
        ),
        key=key,
    ):
        with me.box(
            style=me.Style(display="flex", flex_direction="column", gap=5)
        ):
            if media_type == "image/png":
                if "/message/file" not in content:
                    content = "data:image/png;base64," + content
                me.image(
                    src=content,
                    style=me.Style(
                        width="50%",
                        object_fit="contain",
                    ),
                )
            else:
                me.markdown(
                    content,
                    style=me.Style(
                        font_family="Google Sans",
                        box_shadow=(
                            "0 1px 2px 0 rgba(60, 64, 67, 0.3), "
                            "0 1px 3px 1px rgba(60, 64, 67, 0.15)"
                        ),
                        padding=me.Padding(top=1, left=15, right=15, bottom=1),
                        margin=me.Margin(top=5, left=0, right=0, bottom=5),
                        background=(
                            me.theme_var("primary-container")
                            if role == "user"
                            else me.theme_var("secondary-container")
                        ),
                        border_radius=15),
                )
    if progress_bar:
        with me.box(
            style=me.Style(
                display="flex",
                justify_content=(
                    "space-between" if role == "user" else "end"
                ),
                min_width=500,
            ),
            key=key,
        ):
            with me.box(
                style=me.Style(display="flex", flex_direction="column", gap=5)
            ):
                with me.box(
                    style=me.Style(
                        font_family="Google Sans",
                        box_shadow=(
                            "0 1px 2px 0 rgba(60, 64, 67, 0.3), "
                            "0 1px 3px 1px rgba(60, 64, 67, 0.15)"
                        ),
                        padding=me.Padding(top=1, left=15, right=15, bottom=1),
                        margin=me.Margin(top=5, left=0, right=0, bottom=5),
                        background=(
                            me.theme_var("primary-container")
                            if role == "agent"
                            else me.theme_var("secondary-container")
                        ),
                        border_radius=15),
                ):
                    if not progress_text:
                        progress_text = "Working..."
                    me.text(
                        progress_text,
                        style=me.Style(
                            padding=me.Padding(top=1, left=15, right=15, bottom=1),
                            margin=me.Margin(top=5, left=0, right=0, bottom=5)))
                    me.progress_bar(color="accent")
```

## /demo/ui/components/conversation.py

```py path="/demo/ui/components/conversation.py" 
import mesop as me
import mesop.labs as mel
import asyncio
import uuid
import functools
import threading

from state.state import AppState, SettingsState, StateMessage
from state.host_agent_service import SendMessage, ListConversations, convert_message_to_state
from .chat_bubble import chat_bubble
from .form_render import is_form, render_form, form_sent
from .async_poller import async_poller, AsyncAction
from common.types import Message, TextPart


@me.stateclass
class PageState:
    """Local Page State"""
    conversation_id: str = ""
    message_content: str = ""


def on_blur(e: me.InputBlurEvent):
    """input handler"""
    state = me.state(PageState)
    state.message_content = e.value


async def send_message(message: str, message_id: str = ""):
    state = me.state(PageState)
    app_state = me.state(AppState)
    settings_state = me.state(SettingsState)
    c = next(
        (
            x
            for x in await ListConversations()
            if x.conversation_id == state.conversation_id
        ),
        None,
    )
    if not c:
        print("Conversation id ", state.conversation_id, " not found")
    request = Message(
        id=message_id,
        role="user",
        parts=[TextPart(text=message)],
        metadata={'conversation_id': c.conversation_id if c else "",
                  'conversation_name': c.name if c else ""},
    )
    # Add message to state until refresh replaces it.
    state_message = convert_message_to_state(request)
    if not app_state.messages:
        app_state.messages = []
    app_state.messages.append(state_message)
    # Match on the page's conversation id; `c` may be None when the lookup failed.
    conversation = next(filter(
        lambda x: x.conversation_id == state.conversation_id,
        app_state.conversations), None)
    if conversation:
        conversation.message_ids.append(state_message.message_id)
    await SendMessage(request)


async def send_message_enter(e: me.InputEnterEvent):  # pylint: disable=unused-argument
    """send message handler"""
    yield
    state = me.state(PageState)
    state.message_content = e.value
    app_state = me.state(AppState)
    message_id = str(uuid.uuid4())
    app_state.background_tasks[message_id] = ""
    yield
    await send_message(state.message_content, message_id)
    yield


async def send_message_button(e: me.ClickEvent):  # pylint: disable=unused-argument
    """send message button handler"""
    yield
    state = me.state(PageState)
    app_state = me.state(AppState)
    message_id = str(uuid.uuid4())
    app_state.background_tasks[message_id] = ""
    await send_message(state.message_content, message_id)
    yield


@me.component
def conversation():
    """Conversation component"""
    page_state = me.state(PageState)
    app_state = me.state(AppState)
    if "conversation_id" in me.query_params:
        page_state.conversation_id = me.query_params["conversation_id"]
        app_state.current_conversation_id = page_state.conversation_id
    with me.box(
        style=me.Style(
            display="flex",
            justify_content="space-between",
            flex_direction="column",
        )
    ):
        for message in app_state.messages:
            if is_form(message):
                render_form(message, app_state)
            elif form_sent(message, app_state):
                chat_bubble(StateMessage(
                    message_id=message.message_id,
                    role=message.role,
                    content=[("Form submitted", "text/plain")]
                ), message.message_id)
            else:
                chat_bubble(message, message.message_id)

        with me.box(
            style=me.Style(
                display="flex",
                flex_direction="row",
                gap=5,
                align_items="center",
                min_width=500,
                width="100%",
            )
        ):
            me.input(
                label="How can 
I help you?", on_blur=on_blur, on_enter=send_message_enter, style=me.Style(min_width="80vw"), ) with me.content_button( type="flat", on_click=send_message_button, ): me.icon(icon="send") ``` ## /demo/ui/components/conversation_list.py ```py path="/demo/ui/components/conversation_list.py" import mesop as me import pandas as pd from typing import List import uuid from state.state import AppState from state.state import StateConversation from state.host_agent_service import CreateConversation @me.component def conversation_list(conversations: List[StateConversation]): """Conversation list component""" df_data = {"ID": [], "Name": [], "Status": [], "Messages": []} for conversation in conversations: df_data["ID"].append(conversation.conversation_id) df_data["Name"].append(conversation.conversation_name) df_data["Status"].append("Open" if conversation.is_active else "Closed") df_data["Messages"].append(len(conversation.message_ids)) df = pd.DataFrame( pd.DataFrame(df_data), columns=["ID", "Name", "Status", "Messages"]) with me.box( style=me.Style( display="flex", justify_content="space-between", flex_direction="column", ) ): me.table( df, on_click=on_click, header=me.TableHeader(sticky=True), columns={ "ID": me.TableColumn(sticky=True), "Name": me.TableColumn(sticky=True), "Status": me.TableColumn(sticky=True), "Messages": me.TableColumn(sticky=True), }, ) with me.content_button( type="raised", on_click=add_conversation, key="new_conversation", style=me.Style( display="flex", flex_direction="row", gap=5, align_items="center", margin=me.Margin(top=10), ), ): me.icon(icon="add") async def add_conversation(e: me.ClickEvent): # pylint: disable=unused-argument """add conversation button handler""" response = await CreateConversation() me.state(AppState).messages = [] me.navigate("/conversation", query_params={"conversation_id": response.conversation_id}) yield def on_click(e: me.TableClickEvent): state = me.state(AppState) conversation = state.conversations[e.row_index] 
state.current_conversation_id = conversation.conversation_id me.query_params.update({"conversation_id": conversation.conversation_id}) me.navigate("/conversation", query_params=me.query_params) yield ``` ## /demo/ui/components/dialog.py ```py path="/demo/ui/components/dialog.py" import mesop as me @me.content_component def dialog(is_open: bool): with me.box( style=me.Style( background="rgba(0,0,0,0.4)", display="block" if is_open else "none", height="100%", overflow_x="auto", overflow_y="auto", position="fixed", width="100%", z_index=1000, ) ): with me.box( style=me.Style( align_items="center", display="grid", height="100vh", justify_items="center", ) ): with me.box( style=me.Style( background=me.theme_var("background"), border_radius=20, box_sizing="content-box", box_shadow=( "0 3px 1px -2px #0003, 0 2px 2px #00000024, 0 1px 5px #0000001f" ), margin=me.Margin.symmetric(vertical="0", horizontal="auto"), padding=me.Padding.all(20), ) ): me.slot() @me.content_component def dialog_actions(): with me.box( style=me.Style( display="flex", justify_content="end", margin=me.Margin(top=20) ) ): me.slot() ``` ## /demo/ui/components/event_viewer.py ```py path="/demo/ui/components/event_viewer.py" import asyncio from typing import List, Tuple import google.genai.types as types import mesop as me import pandas as pd from state.agent_state import AgentState from state.host_agent_service import GetEvents from state.state import AppState from state.host_agent_service import convert_event_to_state def flatten_content(content: list[Tuple[str,str]]) -> str: parts = [] for p in content: if p[1] == 'text/plain' or p[1] == 'application/json': parts.append(p[0]) else: parts.append(p[1]) return '\n'.join(parts) @me.component def event_list(): """Events list component""" df_data = { "Conversation ID": [], "Actor": [], "Role": [], "Id": [], "Content": [], } events = asyncio.run(GetEvents()) for e in events: event = convert_event_to_state(e) df_data["Conversation 
ID"].append(event.conversation_id) df_data["Role"].append(event.role) df_data["Id"].append(event.id) df_data["Content"].append(flatten_content(event.content)) df_data["Actor"].append(event.actor) if not df_data["Conversation ID"]: me.text("No events found") return df = pd.DataFrame( pd.DataFrame(df_data), columns=["Conversation ID", "Actor", "Role", "Id", "Content"], ) with me.box( style=me.Style( display="flex", justify_content="space-between", flex_direction="column", ) ): me.table( df, header=me.TableHeader(sticky=True), columns={ "Conversation ID": me.TableColumn(sticky=True), "Actor": me.TableColumn(sticky=True), "Role": me.TableColumn(sticky=True), "Id": me.TableColumn(sticky=True), "Content": me.TableColumn(sticky=True), }, ) ``` ## /demo/ui/components/form_render.py ```py path="/demo/ui/components/form_render.py" import json import mesop as me from typing import Literal, Any, Tuple import uuid import dataclasses from state.state import AppState, StateMessage from state.host_agent_service import SendMessage from common.types import Message, DataPart, TextPart ROW_GAP = 15 BOX_PADDING = 20 @dataclasses.dataclass class FormElement: """FormElement is a declarative structure for the form rendering""" name: str = "" label: str = "" value: str = "" formType: Literal[ "color", "date", "datetime-local", "email", "month", "number", "password", "search", "tel", "text", "time", "url", "week", # These are custom types that dictate non input elements. "radio", "checkbox", "date-picker", ] = "text" required: bool = False formDetails: dict[str, str] = dataclasses.field(default_factory=dict) @dataclasses.dataclass class FormState: message_id: str data: dict[str, str] errors: dict[str, str] elements: list[FormElement] def __post_init__(self): # Parse each element as FormElement. 
Clean up for non-recursive dict parse for i, element_dict in enumerate(self.elements): if isinstance(element_dict, dict): self.elements[i] = FormElement(**element_dict) @me.stateclass class State: """This contains the data in the form""" #forms: dict[str, FormState] forms: dict[str, str] def is_form(message: StateMessage) -> bool: """Returns whether the message indicates a form should be rendered""" if any([x[1] == 'form' for x in message.content]): return True return False def form_sent(message: StateMessage, app_state: AppState) -> bool: return message.message_id in app_state.form_responses def render_form(message: StateMessage, app_state: AppState): """Renders the form or the data entered in a submitted form""" # Check if the form was completed, if so, render the content as a card if message.message_id in app_state.completed_forms: render_form_card(message, app_state.completed_forms[message.message_id]) return # Otherwise, get the form structure. instructions, form_structure = generate_form_elements(message) data = {} # Initialize the state data for element in form_structure: data[element.name] = element.value state = me.state(State) if message.message_id not in state.forms: form = FormState( message_id=message.message_id, data=data, errors={}, elements=form_structure, ) try: state.forms[message.message_id] = form_state_to_string(form) except Exception as e: print("Failed to serialize form", e, form) render_structure(message.message_id, form_structure, instructions) def render_form_card(message: StateMessage, data: dict[str, Any] | None): """Renders the result of a previous form as a card""" with me.box( style=me.Style( padding=me.Padding.all(BOX_PADDING), max_width="75vw", background=me.theme_var("surface"), border_radius=15, margin=me.Margin(top=5, bottom=20, left=5, right=5), justify_content=( "end" if message.role == "agent" else "space-between" ), box_shadow=("0 1px 2px 0 rgba(60, 64, 67, 0.3), " "0 1px 3px 1px rgba(60, 64, 67, 0.15)"), ) ): if data: # 
            # Build markdown result
            lines = []
            for k, v in data.items():
                lines.append(f"**{k}**: {v}  ")  # end with 2 spaces to force newline
            me.markdown('\n'.join(lines).rstrip())
        else:
            me.text("Form canceled")


def generate_form_elements(message: StateMessage) -> Tuple[str, list[FormElement]]:
    """Returns a declarative structure for a form to generate"""
    # Get the message part with the form information.
    form_content = next(filter(lambda x: x[1] == 'form', message.content), None)
    if not form_content:
        # Return an empty (instructions, elements) pair so callers can unpack it.
        return "", []
    form_info = form_content[0]
    if not isinstance(form_info, dict):
        return "", []
    return instructions_for_form(form_info), make_form_elements(form_info)


def make_form_elements(form_info: dict[str, Any]) -> list[FormElement]:
    if 'form' not in form_info or 'properties' not in form_info['form']:
        return []
    # This is the key, value pairs of field names -> field info. Now we need to
    # supplement it.
    fields = form_info['form']['properties']
    if ('required' in form_info['form'] and
            isinstance(form_info['form']['required'], list)):
        for field in form_info['form']['required']:
            if field in fields:
                fields[field]['required'] = True
    if 'form_data' in form_info and isinstance(form_info['form_data'], dict):
        for field, value in form_info['form_data'].items():
            # Skip pre-filled values for fields the schema does not declare.
            if field in fields:
                fields[field]['value'] = value
    # Now convert the dictionary to FormElements
    elements = []
    for key, info in fields.items():
        elements.append(FormElement(
            name=key,
            label=info['title'] if 'title' in info else key,
            value=info['value'] if 'value' in info else "",
            required=info['required'] if 'required' in info else False,
            formType=info['format'] if 'format' in info else "text",
            # TODO more details for input like validation rules
            formDetails={},
        ))
    return elements


def instructions_for_form(form_info: dict[str, Any]) -> str:
    if 'instructions' in form_info:
        return form_info['instructions']
    return ""


def render_structure(id: str, elements: list[FormElement], instructions: str):
    with me.box(
        style=me.Style(
            padding=me.Padding.all(BOX_PADDING),
max_width="75vw", background=me.theme_var("surface"), border_radius=15, margin=me.Margin(top=5, bottom=20, left=5, right=5), box_shadow=("0 1px 2px 0 rgba(60, 64, 67, 0.3), " "0 1px 3px 1px rgba(60, 64, 67, 0.15)"), ) ): if instructions: me.text( instructions, type="headline-4", style=me.Style(margin=me.Margin(bottom=10)), ) for element in elements: with form_group(): input_field(id=id, element=element) with me.box(): me.button("Cancel", type="flat", on_click=cancel_form, key=id) me.button("Submit", type="flat", on_click=submit_form, key=id) def input_field( *, id: str, element: FormElement, width: str | int = "100%", ): """Renders an individual form input field""" state = me.state(State) form = FormState(**json.loads(state.forms[id])) key = element.name if element.name else element.label.lower().replace(" ", "_") value = element.value if key in form.data and form.data[key]: value = form.data[key] with me.box(style=me.Style(flex_grow=1, width=width)): me.input( key=f"{id}_{key}", label=element.label, value=value, appearance="outline", color="warn" if key in form.errors else "primary", style=me.Style(width=width), type=element.formType, on_blur=on_blur, ) if key in form.errors: me.text( form.errors[key], style=me.Style( margin=me.Margin(top=-13, left=15, bottom=15), color=me.theme_var("error"), font_size=13, ), ) @me.content_component def form_group(flex_direction: Literal["row", "column"] = "row"): """Groups input fields together visually""" with me.box( style=me.Style( display="flex", flex_direction=flex_direction, gap=ROW_GAP, width="100%" ) ): me.slot() def on_change(e: me.RadioChangeEvent): state = me.state(State) key_parts = e.key.split("_") id = key_parts[0] field = "_".join(key_parts[1:]) form = FormState(**json.loads(state.forms[id])) form.data[field] = e.value state.forms[id] = form_state_to_string(form) def on_blur(e: me.InputBlurEvent): state = me.state(State) key_parts = e.key.split("_") id = key_parts[0] field = "_".join(key_parts[1:]) form = 
FormState(**json.loads(state.forms[id])) form.data[field] = e.value state.forms[id] = form_state_to_string(form) async def cancel_form(e: me.ClickEvent): message_id = str(uuid.uuid4()) app_state = me.state(AppState) app_state.form_responses[message_id] = e.key app_state.background_tasks[message_id] = "" app_state.completed_forms[e.key] = None request = Message( id=message_id, role="user", parts=[TextPart(text="rejected form entry")], metadata={ 'conversation_id': app_state.current_conversation_id, 'message_id': message_id, }, ) response = await SendMessage(request) async def send_response(id: str, state: State, app_state: AppState): message_id = str(uuid.uuid4()) app_state.background_tasks[message_id] = "" app_state.form_responses[message_id] = id form = FormState(**json.loads(state.forms[id])) request = Message( id=message_id, role="user", parts=[DataPart(data=form.data)], metadata={ 'conversation_id': app_state.current_conversation_id, 'message_id': message_id, } ) response = await SendMessage(request) async def submit_form(e: me.ClickEvent): try: state = me.state(State) id = e.key form = FormState(**json.loads(state.forms[id])) # Replace with real validation logic. errors = {} for element in form.elements: if element.name == "error": continue if not form.data[element.name] and element.required: errors[element.name] = f"{element.name.replace('_', ' ').capitalize()} is required" form.errors = errors state.forms[id] = form_state_to_string(form) # Replace with form processing logic. if errors: return app_state = me.state(AppState) app_state.completed_forms[id] = form.data await send_response(id, state, app_state) except Exception as e: print("Failed to submit form", e) # There is some issue with mesop serialization. Instead we use raw string # in the server state and interpret it as needed. 
def form_state_to_string(form: FormState) -> str: form_dict = dataclasses.asdict(form) return json.dumps(form_dict) ``` ## /demo/ui/components/header.py ```py path="/demo/ui/components/header.py" import mesop as me from .poller import polling_buttons @me.content_component def header(title: str, icon: str): """Header component""" with me.box( style=me.Style( display="flex", justify_content="space-between", ) ): with me.box( style=me.Style(display="flex", flex_direction="row", gap=5) ): me.icon(icon=icon) me.text( title, type="headline-5", style=me.Style(font_family="Google Sans"), ) me.slot() polling_buttons() ``` ## /demo/ui/components/page_scaffold.py ```py path="/demo/ui/components/page_scaffold.py" import mesop as me import mesop.labs as mel from .side_nav import sidenav from .async_poller import async_poller, AsyncAction from .poller import polling_buttons from state.state import AppState from state.host_agent_service import UpdateAppState from styles.styles import ( MAIN_COLUMN_STYLE, PAGE_BACKGROUND_PADDING_STYLE, PAGE_BACKGROUND_STYLE, SIDENAV_MAX_WIDTH, SIDENAV_MIN_WIDTH, ) async def refresh_app_state(e: mel.WebEvent): # pylint: disable=unused-argument """Refresh app state event handler""" yield app_state = me.state(AppState) await UpdateAppState(app_state, app_state.current_conversation_id) yield @me.content_component def page_scaffold(): """page scaffold component""" app_state = me.state(AppState) action = ( AsyncAction( value=app_state, duration_seconds=app_state.polling_interval) if app_state else None ) async_poller( action=action, trigger_event=refresh_app_state ) sidenav("") with me.box( style=me.Style( display="flex", flex_direction="column", height="100%", margin=me.Margin( left=SIDENAV_MAX_WIDTH if app_state.sidenav_open else SIDENAV_MIN_WIDTH, ), ), ): with me.box( style=me.Style( background=me.theme_var("background"), height="100%", overflow_y="scroll", margin=me.Margin(bottom=20), ) ): me.slot() @me.content_component def page_frame(): """Page 
Frame""" with me.box(style=MAIN_COLUMN_STYLE): with me.box(style=PAGE_BACKGROUND_STYLE): with me.box(style=PAGE_BACKGROUND_PADDING_STYLE): me.slot() ``` ## /demo/ui/components/poller.py ```py path="/demo/ui/components/poller.py" import mesop as me from state.state import AppState from state.host_agent_service import UpdateAppState @me.content_component def polling_buttons(): """Polling buttons component""" state = me.state(AppState) with me.box( style=me.Style( display="flex", justify_content="end", ) ): me.button_toggle( value=[str(state.polling_interval)], buttons=[ me.ButtonToggleButton(label="1s", value="1"), me.ButtonToggleButton(label="5s", value="5"), me.ButtonToggleButton(label="30s", value="30"), me.ButtonToggleButton(label="Disable", value="0") ], multiple=False, hide_selection_indicator=True, disabled=False, on_change=on_change, style=me.Style( margin=me.Margin(bottom=20), ), ) with me.content_button( type="raised", on_click=force_refresh, ): me.icon("refresh") me.slot() def on_change(e: me.ButtonToggleChangeEvent): state = me.state(AppState) state.polling_interval = int(e.value) async def force_refresh(e: me.ClickEvent): """Refresh app state event handler""" yield app_state = me.state(AppState) await UpdateAppState(app_state, app_state.current_conversation_id) yield ``` ## /demo/ui/components/side_nav.py ```py path="/demo/ui/components/side_nav.py" import mesop as me from state.state import AppState from styles.styles import ( SIDENAV_MAX_WIDTH, SIDENAV_MIN_WIDTH, _FANCY_TEXT_GRADIENT, DEFAULT_MENU_STYLE, ) page_json = [ {"display": "Home", "icon": "message", "route": "/"}, {"display": "Agents", "icon": "smart_toy", "route": "/agents"}, {"display": "Event List", "icon": "list", "route": "/event_list"}, {"display": "Task List", "icon": "task", "route": "/task_list"}, {"display": "Settings", "icon": "settings", "route": "/settings"}, ] def on_sidenav_menu_click(e: me.ClickEvent): # pylint: disable=unused-argument """Side navigation menu click handler""" 
    state = me.state(AppState)
    state.sidenav_open = not state.sidenav_open


def navigate_to(e: me.ClickEvent):
    """navigate to a specific page"""
    s = me.state(AppState)
    idx = int(e.key)
    # Valid indices run from 0 to len(page_json) - 1.
    if idx >= len(page_json):
        return
    page = page_json[idx]
    s.current_page = page["route"]
    me.navigate(s.current_page)
    yield


@me.component
def sidenav(current_page: str):
    """Render side navigation"""
    app_state = me.state(AppState)
    with me.sidenav(
        opened=True,
        style=me.Style(
            width=SIDENAV_MAX_WIDTH if app_state.sidenav_open else SIDENAV_MIN_WIDTH,
            background=me.theme_var("secondary-container"),
        ),
    ):
        with me.box(
            style=me.Style(
                margin=me.Margin(top=16, left=16, right=16, bottom=16),
                display="flex",
                flex_direction="column",
                gap=5,
            ),
        ):
            with me.box(
                style=me.Style(
                    display="flex",
                    flex_direction="row",
                    gap=5,
                    align_items="center",
                ),
            ):
                with me.content_button(
                    type="icon",
                    on_click=on_sidenav_menu_click,
                ):
                    with me.box():
                        with me.tooltip(message="Expand menu"):
                            me.icon(icon="menu")
                if app_state.sidenav_open:
                    me.text("STUDIO", style=_FANCY_TEXT_GRADIENT)
            me.box(style=me.Style(height=16))
            for idx, page in enumerate(page_json):
                menu_item(
                    idx, page["icon"], page["display"], not app_state.sidenav_open
                )
            # settings & theme toggle
            with me.box(style=MENU_BOTTOM):
                theme_toggle_icon(
                    9,
                    "light_mode",
                    "Theme",
                    not app_state.sidenav_open,
                )
                #menu_item(10, "settings", "Settings", not app_state.sidenav_open)


def menu_item(
    key: int,
    icon: str,
    text: str,
    minimized: bool = True,
    content_style: me.Style = DEFAULT_MENU_STYLE,
):
    """render menu item"""
    if minimized:  # minimized
        with me.box(
            style=me.Style(
                display="flex",
                flex_direction="row",
                gap=5,
                align_items="center",
            ),
        ):
            with me.content_button(
                key=str(key),
                on_click=navigate_to,
                style=content_style,
                type="icon",
            ):
                with me.tooltip(message=text):
                    me.icon(icon=icon)
    else:  # expanded
        with me.content_button(
            key=str(key),
            on_click=navigate_to,
            style=content_style,
        ):
            with me.box(
                style=me.Style(
                    display="flex",
                    flex_direction="row",
                    gap=5,
align_items="center", ), ): me.icon(icon=icon) me.text(text) def toggle_theme(e: me.ClickEvent): # pylint: disable=unused-argument """Toggle theme event""" s = me.state(AppState) if me.theme_brightness() == "light": me.set_theme_mode("dark") s.theme_mode = "dark" else: me.set_theme_mode("light") s.theme_mode = "light" def theme_toggle_icon(key: int, icon: str, text: str, min: bool = True): """Theme toggle icon""" # THEME_TOGGLE_STYLE = me.Style(position="absolute", bottom=50, align_content="left") if min: # minimized with me.box( style=me.Style( display="flex", flex_direction="row", gap=5, align_items="center", ), ): with me.content_button( key=str(key), on_click=toggle_theme, # style=THEME_TOGGLE_STYLE, type="icon", ): with me.tooltip(message=text): me.icon( "light_mode" if me.theme_brightness() == "dark" else "dark_mode" ) else: # expanded with me.content_button( key=str(key), on_click=toggle_theme, # style=THEME_TOGGLE_STYLE, ): with me.box( style=me.Style( display="flex", flex_direction="row", gap=5, align_items="center", ), ): me.icon( "light_mode" if me.theme_brightness() == "dark" else "dark_mode" ) me.text( "Light mode" if me.theme_brightness() == "dark" else "Dark mode" ) MENU_BOTTOM = me.Style( display="flex", flex_direction="column", position="absolute", bottom=8, align_content="left", ) ``` ## /demo/ui/components/task_card.py ```py path="/demo/ui/components/task_card.py" import json import mesop as me import pandas as pd from state.state import SessionTask, StateTask, ContentPart def message_string(content: ContentPart) -> str: if isinstance(content, str): return content return json.dumps(content) @me.component def task_card(tasks: list[SessionTask]): """Task card component""" columns = ["Conversation ID", "Task ID", "Description", "Status", "Output"] df_data = dict([(c, []) for c in columns]) for task in tasks: df_data["Conversation ID"].append(task.session_id) df_data["Task ID"].append(task.task.task_id) 
        df_data["Description"].append('\n'.join(message_string(x[0]) for x in task.task.message.content))
        df_data["Status"].append(task.task.state)
        df_data["Output"].append(flatten_artifacts(task.task))
    df = pd.DataFrame(
        pd.DataFrame(df_data),
        columns=columns)
    with me.box(
        style=me.Style(
            display="flex",
            justify_content="space-between",
        )
    ):
        me.table(
            df,
            header=me.TableHeader(sticky=True),
            columns=dict([(c, me.TableColumn(sticky=True)) for c in columns]),
        )


def flatten_artifacts(task: StateTask) -> str:
    parts = []
    for a in task.artifacts:
        for p in a:
            if p[1] == 'text/plain' or p[1] == 'application/json':
                parts.append(message_string(p[0]))
            else:
                parts.append(p[1])
    return '\n'.join(parts)
```

## /demo/ui/main.py

```py path="/demo/ui/main.py"
"""A UI solution and host service to interact with the agent framework.

run:
  uv run main.py
"""
import asyncio
import os
import threading

import mesop as me

from state.state import AppState
from components.page_scaffold import page_scaffold
from components.api_key_dialog import api_key_dialog
from pages.home import home_page_content
from pages.agent_list import agent_list_page
from pages.conversation import conversation_page
from pages.event_list import event_list_page
from pages.settings import settings_page_content
from pages.task_list import task_list_page
from state import host_agent_service
from service.server.server import ConversationServer

from fastapi import FastAPI, APIRouter
from fastapi.middleware.wsgi import WSGIMiddleware
from dotenv import load_dotenv

load_dotenv()


def on_load(e: me.LoadEvent):  # pylint: disable=unused-argument
    """On load event"""
    state = me.state(AppState)
    me.set_theme_mode(state.theme_mode)
    if "conversation_id" in me.query_params:
        state.current_conversation_id = me.query_params["conversation_id"]
    else:
        state.current_conversation_id = ""

    # check if the API key is set in the environment
    # and if the user is using Vertex AI
    uses_vertex_ai = os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "").upper() == "TRUE"
    api_key 
= os.getenv("GOOGLE_API_KEY", "") if uses_vertex_ai: state.uses_vertex_ai = True elif api_key: state.api_key = api_key else: # Show the API key dialog if both are not set state.api_key_dialog_open = True # Policy to allow the lit custom element to load security_policy=me.SecurityPolicy( allowed_script_srcs=[ 'https://cdn.jsdelivr.net', ] ) @me.page( path="/", title="Chat", on_load=on_load, security_policy=security_policy, ) def home_page(): """Main Page""" state = me.state(AppState) # Show API key dialog if needed api_key_dialog() with page_scaffold(): # pylint: disable=not-context-manager home_page_content(state) @me.page( path="/agents", title="Agents", on_load=on_load, security_policy=security_policy, ) def another_page(): """Another Page""" api_key_dialog() agent_list_page(me.state(AppState)) @me.page( path="/conversation", title="Conversation", on_load=on_load, security_policy=security_policy, ) def chat_page(): """Conversation Page.""" api_key_dialog() conversation_page(me.state(AppState)) @me.page( path="/event_list", title="Event List", on_load=on_load, security_policy=security_policy, ) def event_page(): """Event List Page.""" api_key_dialog() event_list_page(me.state(AppState)) @me.page( path="/settings", title="Settings", on_load=on_load, security_policy=security_policy, ) def settings_page(): """Settings Page.""" api_key_dialog() settings_page_content() @me.page( path="/task_list", title="Task List", on_load=on_load, security_policy=security_policy, ) def task_page(): """Task List Page.""" api_key_dialog() task_list_page(me.state(AppState)) # Setup the server global objects app = FastAPI() router = APIRouter() agent_server = ConversationServer(router) app.include_router(router) app.mount( "/", WSGIMiddleware( me.create_wsgi_app(debug_mode=os.environ.get("DEBUG_MODE", "") == "true") ), ) if __name__ == "__main__": import uvicorn # Setup the connection details, these should be set in the environment host = os.environ.get("A2A_UI_HOST", "0.0.0.0") port = 
int(os.environ.get("A2A_UI_PORT", "12000")) # Set the client to talk to the server host_agent_service.server_url = f"http://{host}:{port}" uvicorn.run( "main:app", host=host, port=port, reload=True, reload_includes=["*.py", "*.js"], timeout_graceful_shutdown=0, ) ``` ## /demo/ui/pages/__init__.py ```py path="/demo/ui/pages/__init__.py" ``` ## /demo/ui/pages/agent_list.py ```py path="/demo/ui/pages/agent_list.py" import stat import asyncio import mesop as me from components.agent_list import agents_list from components.dialog import dialog, dialog_actions from components.header import header from components.page_scaffold import page_frame from components.page_scaffold import page_scaffold from state.agent_state import AgentState from state.host_agent_service import ListRemoteAgents, AddRemoteAgent from state.state import AppState from utils.agent_card import get_agent_card from common.types import JSONRPCError def agent_list_page(app_state: AppState): """Agents List Page""" state = me.state(AgentState) with page_scaffold(): # pylint: disable=not-context-manager with page_frame(): with header("Remote Agents", "smart_toy"): pass agents = asyncio.run(ListRemoteAgents()) agents_list(agents) with dialog(state.agent_dialog_open): with me.box( style=me.Style(display="flex", flex_direction="column", gap=12) ): me.input( label="Agent Address", on_blur=set_agent_address, placeholder="localhost:10000", ) input_modes_string = ", ".join(state.input_modes) output_modes_string = ", ".join(state.output_modes) if state.error != "": me.text(state.error, style=me.Style(color="red")) if state.agent_name != "": me.text(f"Agent Name: {state.agent_name}") if state.agent_description: me.text(f"Agent Description: {state.agent_description}") if state.agent_framework_type: me.text(f"Agent Framework Type: {state.agent_framework_type}") if state.input_modes: me.text(f"Input Modes: {input_modes_string}") if state.output_modes: me.text(f"Output Modes: {output_modes_string}") if state.agent_name: 
                        me.text(f"Streaming Supported: {state.stream_supported}")
                        me.text(f"Push Notifications Supported: {state.push_notifications_supported}")
                with dialog_actions():
                    if not state.agent_name:
                        me.button("Read", on_click=load_agent_info)
                    elif not state.error:
                        me.button("Save", on_click=save_agent)
                    me.button("Cancel", on_click=cancel_agent_dialog)


def set_agent_address(e: me.InputBlurEvent):
    state = me.state(AgentState)
    state.agent_address = e.value


def load_agent_info(e: me.ClickEvent):
    state = me.state(AgentState)
    try:
        # Reset with "" rather than None: the page compares these fields
        # against "" before rendering them.
        state.error = ""
        agent_card_response = get_agent_card(state.agent_address)
        state.agent_name = agent_card_response.name
        state.agent_description = agent_card_response.description
        state.agent_framework_type = (
            agent_card_response.provider.organization
            if agent_card_response.provider
            else ''
        )
        state.input_modes = agent_card_response.defaultInputModes
        state.output_modes = agent_card_response.defaultOutputModes
        state.stream_supported = agent_card_response.capabilities.streaming
        state.push_notifications_supported = agent_card_response.capabilities.pushNotifications
    except Exception as err:  # named err so it does not shadow the click event `e`
        print(err)
        state.agent_name = ""
        state.error = f"Cannot connect to agent at {state.agent_address}"


def cancel_agent_dialog(e: me.ClickEvent):
    state = me.state(AgentState)
    state.agent_dialog_open = False


async def save_agent(e: me.ClickEvent):
    state = me.state(AgentState)
    await AddRemoteAgent(state.agent_address)
    state.agent_address = ""
    state.agent_name = ""
    state.agent_description = ""
    state.agent_dialog_open = False
```

## /demo/ui/pages/conversation.py

```py path="/demo/ui/pages/conversation.py"
import mesop as me

from components.header import header
from components.page_scaffold import page_scaffold
from components.page_scaffold import page_frame
from components.conversation import conversation
from state.state import AppState


def conversation_page(app_state: AppState):
    """Conversation Page"""
    state = me.state(AppState)
    with page_scaffold():  # pylint: disable=not-context-manager
        with page_frame():
            with header("Conversation", "chat"):
                pass
            conversation()
```

## /demo/ui/pages/event_list.py

```py path="/demo/ui/pages/event_list.py"
import mesop as me

from components.header import header
from components.page_scaffold import page_scaffold
from components.page_scaffold import page_frame
from components.event_viewer import event_list
from state.state import AppState
from state.agent_state import AgentState


def event_list_page(app_state: AppState):
    """Event List Page"""
    state = me.state(AgentState)
    with page_scaffold():  # pylint: disable=not-context-manager
        with page_frame():
            with header("Event List", "list"):
                pass
            event_list()
```

## /demo/ui/pages/home.py

```py path="/demo/ui/pages/home.py"
import mesop as me

from components.header import header
from components.conversation_list import conversation_list
from state.state import AppState


@me.stateclass
class PageState:
    """Local Page State"""
    temp_name: str = ""


def on_blur_set_name(e: me.InputBlurEvent):
    """input handler"""
    state = me.state(PageState)
    state.temp_name = e.value


def on_enter_change_name(e: me.components.input.input.InputEnterEvent):  # pylint: disable=unused-argument
    """change name handler for the Enter key"""
    state = me.state(PageState)
    app_state = me.state(AppState)
    app_state.name = state.temp_name
    app_state.greeting = ""  # reset greeting
    yield


def on_click_change_name(e: me.ClickEvent):  # pylint: disable=unused-argument
    """change name button handler"""
    state = me.state(PageState)
    app_state = me.state(AppState)
    app_state.name = state.temp_name
    app_state.greeting = ""  # reset greeting
    yield


def home_page_content(app_state: AppState):
    """Home Page"""
    with me.box(
        style=me.Style(
            display="flex",
            flex_direction="column",
            height="100%",
        ),
    ):
        with me.box(
            style=me.Style(
                background=me.theme_var("background"),
                height="100%",
                margin=me.Margin(bottom=20),
            )
        ):
            with me.box(
                style=me.Style(
                    background=me.theme_var("background"),
                    padding=me.Padding(top=24, left=24, right=24, bottom=24),
display="flex", flex_direction="column", width="100%", ) ): with header("Conversations", "message"): pass conversation_list(app_state.conversations) ``` ## /demo/ui/pages/settings.py ```py path="/demo/ui/pages/settings.py" import mesop as me import asyncio from components.header import header from components.page_scaffold import page_scaffold from components.page_scaffold import page_frame from state.state import SettingsState, AppState from state.host_agent_service import UpdateApiKey def on_selection_change_output_types(e: me.SelectSelectionChangeEvent): s = me.state(SettingsState) s.output_mime_types = e.values def on_api_key_change(e: me.InputBlurEvent): s = me.state(AppState) s.api_key = e.value @me.stateclass class UpdateStatus: """Status for API key update""" show_success: bool = False async def update_api_key(e: me.ClickEvent): yield # Allow UI to update state = me.state(AppState) update_status = me.state(UpdateStatus) if state.api_key.strip(): success = await UpdateApiKey(state.api_key) if success: update_status.show_success = True # Hide success message after 3 seconds yield await asyncio.sleep(3) update_status.show_success = False yield # Allow UI to update after operation completes def settings_page_content(): """Settings Page Content.""" settings_state = me.state(SettingsState) app_state = me.state(AppState) update_status = me.state(UpdateStatus) with page_scaffold(): # pylint: disable=not-context-manager with page_frame(): with header("Settings", "settings"): pass with me.box( style=me.Style( display="flex", justify_content="space-between", flex_direction="column", gap=30, ) ): # API Key Settings Section if not app_state.uses_vertex_ai: with me.box( style=me.Style( display="flex", flex_direction="column", margin=me.Margin(bottom=30), ) ): me.text( "Google API Key", type="headline-6", style=me.Style( margin=me.Margin(bottom=15), font_family="Google Sans", ), ) with me.box( style=me.Style( display="flex", flex_direction="row", gap=10, 
align_items="center", margin=me.Margin(bottom=5), ) ): me.input( label="API Key", value=app_state.api_key, on_blur=on_api_key_change, type="password", appearance="outline", style=me.Style(width="400px"), ) me.button( "Update", type="raised", on_click=update_api_key, style=me.Style( color=me.theme_var("primary"), ), ) # Success message if update_status.show_success: with me.box( style=me.Style( background=me.theme_var("success-container"), padding=me.Padding(top=10, bottom=10, left=10, right=10), border_radius=4, margin=me.Margin(top=10), display="flex", flex_direction="row", align_items="center", width="400px", ) ): me.icon( "check_circle", style=me.Style( color=me.theme_var("on-success-container"), margin=me.Margin(right=10), ) ) me.text( "API Key updated successfully", style=me.Style( color=me.theme_var("on-success-container"), ) ) # Add spacing instead of divider with style with me.box(style=me.Style(margin=me.Margin(top=10, bottom=10))): me.divider() # Output Types Section me.select( label="Supported Output Types", options=[ me.SelectOption(label="Image", value="image/*"), me.SelectOption(label="Text (Plain)", value="text/plain"), ], on_selection_change=on_selection_change_output_types, style=me.Style(width=500), multiple=True, appearance="outline", value=settings_state.output_mime_types, ) ``` ## /demo/ui/pages/task_list.py ```py path="/demo/ui/pages/task_list.py" from components.header import header from components.page_scaffold import page_scaffold from components.page_scaffold import page_frame from components.task_card import task_card from state.state import AppState def task_list_page(app_state: AppState): """Task List Page""" with page_scaffold(): # pylint: disable=not-context-manager with page_frame(): with header("Task List", "task"): pass task_card(app_state.task_list) ``` ## /demo/ui/pyproject.toml ```toml path="/demo/ui/pyproject.toml" [project] name = "a2a-python-example-ui" version = "0.1.0" description = "Agent2Agent example UI" readme = 
"README.md" requires-python = ">=3.13" dependencies = [ "asyncio>=3.4.3", "httpx>=0.28.1", "httpx-sse>=0.4.0", "pydantic>=2.10.6", "fastapi>=0.115.0", "uvicorn>=0.34.0", "mesop>=1.0.0", "a2a-samples", "pandas>=2.2.0", "google-genai>=1.9.0", "google-adk>=0.0.3", ] [tool.hatch.build.targets.wheel] packages = ["a2a_ui"] [tool.uv.sources] a2a_ui = { workspace = true } a2a_samples = { path = "../../samples/python", editable = true } [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [dependency-groups] dev = [ "ruff>=0.11.2", ] ``` ## /demo/ui/service/__init__.py ```py path="/demo/ui/service/__init__.py" ``` ## /demo/ui/service/client/__init__.py ```py path="/demo/ui/service/client/__init__.py" ``` ## /demo/ui/service/client/client.py ```py path="/demo/ui/service/client/client.py" import httpx from httpx_sse import connect_sse from typing import Any, AsyncIterable from service.types import ( CreateConversationRequest, CreateConversationResponse, ListConversationRequest, ListConversationResponse, SendMessageRequest, SendMessageResponse, ListMessageRequest, ListMessageResponse, GetEventRequest, GetEventResponse, PendingMessageRequest, PendingMessageResponse, ListTaskRequest, ListTaskResponse, RegisterAgentRequest, RegisterAgentResponse, AgentClientHTTPError, ListAgentRequest, ListAgentResponse, AgentClientJSONError, JSONRPCRequest, Conversation, ) import json class ConversationClient: def __init__(self, base_url): self.base_url = base_url.rstrip("/") async def send_message(self, payload: SendMessageRequest) -> SendMessageResponse: return SendMessageResponse(**await self._send_request(payload)) async def _send_request(self, request: JSONRPCRequest) -> dict[str, Any]: async with httpx.AsyncClient() as client: try: response = await client.post( self.base_url + "/" + request.method, json=request.model_dump() ) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: raise AgentClientHTTPError(e.response.status_code, str(e)) 
from e
            except json.JSONDecodeError as e:
                raise AgentClientJSONError(str(e)) from e

    async def create_conversation(self, payload: CreateConversationRequest) -> CreateConversationResponse:
        return CreateConversationResponse(**await self._send_request(payload))

    async def list_conversation(self, payload: ListConversationRequest) -> ListConversationResponse:
        return ListConversationResponse(**await self._send_request(payload))

    async def get_events(self, payload: GetEventRequest) -> GetEventResponse:
        return GetEventResponse(**await self._send_request(payload))

    async def list_messages(self, payload: ListMessageRequest) -> ListMessageResponse:
        return ListMessageResponse(**await self._send_request(payload))

    async def get_pending_messages(self, payload: PendingMessageRequest) -> PendingMessageResponse:
        return PendingMessageResponse(**await self._send_request(payload))

    async def list_tasks(self, payload: ListTaskRequest) -> ListTaskResponse:
        return ListTaskResponse(**await self._send_request(payload))

    async def register_agent(self, payload: RegisterAgentRequest) -> RegisterAgentResponse:
        return RegisterAgentResponse(**await self._send_request(payload))

    async def list_agents(self, payload: ListAgentRequest) -> ListAgentResponse:
        return ListAgentResponse(**await self._send_request(payload))
```

## /demo/ui/service/server/__init__.py

```py path="/demo/ui/service/server/__init__.py"
```

## /demo/ui/service/server/adk_host_manager.py

```py path="/demo/ui/service/server/adk_host_manager.py"
import asyncio
import datetime
import json
import os
from typing import Tuple, Optional, Any
import uuid
from service.types import Conversation, Event
from common.types import (
    Message,
    Task,
    TextPart,
    TaskState,
    TaskStatus,
    TaskStatusUpdateEvent,
    TaskArtifactUpdateEvent,
    Artifact,
    AgentCard,
    DataPart,
    FilePart,
    FileContent,
    Part,
)
from hosts.multiagent.host_agent import HostAgent
from hosts.multiagent.remote_agent_connection import (
    TaskCallbackArg,
)
from utils.agent_card import get_agent_card
from service.server.application_manager import ApplicationManager from google.adk import Runner from google.adk.sessions.in_memory_session_service import InMemorySessionService from google.adk.memory.in_memory_memory_service import InMemoryMemoryService from google.adk.artifacts import InMemoryArtifactService from google.adk.events.event import Event as ADKEvent from google.adk.events.event_actions import EventActions as ADKEventActions from google.genai import types import base64 class ADKHostManager(ApplicationManager): """An implementation of memory based management with fake agent actions This implements the interface of the ApplicationManager to plug into the AgentServer. This acts as the service contract that the Mesop app uses to send messages to the agent and provide information for the frontend. """ _conversations: list[Conversation] _messages: list[Message] _tasks: list[Task] _events: dict[str, Event] _pending_message_ids: list[str] _agents: list[AgentCard] _task_map: dict[str, str] def __init__(self, api_key: str = "", uses_vertex_ai: bool = False): self._conversations = [] self._messages = [] self._tasks = [] self._events = {} self._pending_message_ids = [] self._agents = [] self._artifact_chunks = {} self._session_service = InMemorySessionService() self._artifact_service = InMemoryArtifactService() self._memory_service = InMemoryMemoryService() self._host_agent = HostAgent([], self.task_callback) self.user_id = "test_user" self.app_name = "A2A" self.api_key = api_key or os.environ.get("GOOGLE_API_KEY", "") self.uses_vertex_ai = uses_vertex_ai or os.environ.get("GOOGLE_GENAI_USE_VERTEXAI", "").upper() == "TRUE" # Set environment variables based on auth method if self.uses_vertex_ai: os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE" elif self.api_key: # Use API key authentication os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "FALSE" os.environ["GOOGLE_API_KEY"] = self.api_key self._initialize_host() # Map of message id to task id self._task_map = {} # Map to 
manage 'lost' message ids until protocol level id is introduced self._next_id = {} # dict[str, str]: previous message to next message def update_api_key(self, api_key: str): """Update the API key and reinitialize the host if needed""" if api_key and api_key != self.api_key: self.api_key = api_key # Only update if not using Vertex AI if not self.uses_vertex_ai: os.environ["GOOGLE_API_KEY"] = api_key # Reinitialize host with new API key self._initialize_host() def _initialize_host(self): agent = self._host_agent.create_agent() self._host_runner = Runner( app_name=self.app_name, agent=agent, artifact_service=self._artifact_service, session_service=self._session_service, memory_service=self._memory_service, ) def create_conversation(self) -> Conversation: session = self._session_service.create_session( app_name=self.app_name, user_id=self.user_id) conversation_id = session.id c = Conversation(conversation_id=conversation_id, is_active=True) self._conversations.append(c) return c def sanitize_message(self, message: Message) -> Message: if not message.metadata: message.metadata = {} if 'message_id' not in message.metadata: message.metadata.update({'message_id': str(uuid.uuid4())}) if 'conversation_id' in message.metadata: conversation = self.get_conversation(message.metadata['conversation_id']) if conversation: if conversation.messages: # Get the last message last_message_id = get_message_id(conversation.messages[-1]) if last_message_id: message.metadata.update({'last_message_id': last_message_id}) return message async def process_message(self, message: Message): self._messages.append(message) message_id = get_message_id(message) if message_id: self._pending_message_ids.append(message_id) conversation_id = ( message.metadata['conversation_id'] if 'conversation_id' in message.metadata else None ) # Now check the conversation and attach the message id. 
conversation = self.get_conversation(conversation_id) if conversation: conversation.messages.append(message) self.add_event(Event( id=str(uuid.uuid4()), actor='user', content=message, timestamp=datetime.datetime.utcnow().timestamp(), )) final_event: GenAIEvent | None = None # Determine if a task is to be resumed. session = self._session_service.get_session( app_name='A2A', user_id='test_user', session_id=conversation_id) # Update state must happen in the event state_update = { 'input_message_metadata': message.metadata, 'session_id': conversation_id } last_message_id = get_last_message_id(message) if (last_message_id and last_message_id in self._task_map and task_still_open(next( filter( lambda x: x.id == self._task_map[last_message_id], self._tasks), None))): state_update['task_id'] = self._task_map[last_message_id] # Need to upsert session state now, only way is to append an event. self._session_service.append_event(session, ADKEvent( id=ADKEvent.new_id(), author="host_agent", invocation_id=ADKEvent.new_id(), actions=ADKEventActions(state_delta=state_update), )) async for event in self._host_runner.run_async( user_id=self.user_id, session_id=conversation_id, new_message=self.adk_content_from_message(message) ): self.add_event(Event( id=event.id, actor=event.author, content=self.adk_content_to_message(event.content, conversation_id), timestamp=event.timestamp, )) final_event = event response: Message | None = None if final_event: final_event.content.role = 'model' response = self.adk_content_to_message(final_event.content, conversation_id) last_message_id = get_message_id(message) new_message_id = "" if last_message_id and last_message_id in self._next_id: new_message_id = self._next_id[last_message_id] else: new_message_id = str(uuid.uuid4()) last_message_id = None response.metadata = { **message.metadata, **{'last_message_id': last_message_id, 'message_id': new_message_id} } self._messages.append(response) if conversation: conversation.messages.append(response) 
self._pending_message_ids.remove(message_id) def add_task(self, task: Task): self._tasks.append(task) def update_task(self, task: Task): for i, t in enumerate(self._tasks): if t.id == task.id: self._tasks[i] = task return def task_callback(self, task: TaskCallbackArg, agent_card: AgentCard): self.emit_event(task, agent_card) if isinstance(task, TaskStatusUpdateEvent): current_task = self.add_or_get_task(task) current_task.status = task.status self.attach_message_to_task(task.status.message, current_task.id) self.insert_message_history(current_task, task.status.message) self.update_task(current_task) self.insert_id_trace(task.status.message) return current_task elif isinstance(task, TaskArtifactUpdateEvent): current_task = self.add_or_get_task(task) self.process_artifact_event(current_task, task) self.update_task(current_task) return current_task # Otherwise this is a Task, either new or updated elif not any(filter(lambda x: x.id == task.id, self._tasks)): self.attach_message_to_task(task.status.message, task.id) self.insert_id_trace(task.status.message) self.add_task(task) return task else: self.attach_message_to_task(task.status.message, task.id) self.insert_id_trace(task.status.message) self.update_task(task) return task def emit_event(self, task: TaskCallbackArg, agent_card: AgentCard): content = None conversation_id = get_conversation_id(task) metadata = {'conversation_id': conversation_id} if conversation_id else None if isinstance(task, TaskStatusUpdateEvent): if task.status.message: content = task.status.message else: content = Message( parts=[TextPart(text=str(task.status.state))], role="agent", metadata=metadata, ) elif isinstance(task, TaskArtifactUpdateEvent): content = Message( parts=task.artifact.parts, role="agent", metadata=metadata, ) elif task.status and task.status.message: content = task.status.message elif task.artifacts: parts = [] for a in task.artifacts: parts.extend(a.parts) content = Message( parts=parts, role="agent", metadata=metadata, ) 
else: content = Message( parts=[TextPart(text=str(task.status.state))], role="agent", metadata=metadata, ) self.add_event(Event( id=str(uuid.uuid4()), actor=agent_card.name, content=content, timestamp=datetime.datetime.utcnow().timestamp(), )) def attach_message_to_task(self, message: Message | None, task_id: str): if message and message.metadata and 'message_id' in message.metadata: self._task_map[message.metadata['message_id']] = task_id def insert_id_trace(self, message: Message | None): if not message: return message_id = get_message_id(message) last_message_id = get_last_message_id(message) if message_id and last_message_id: self._next_id[last_message_id] = message_id def insert_message_history(self, task: Task, message: Message | None): if not message: return if task.history is None: task.history = [] message_id = get_message_id(message) if not message_id: return if get_message_id(task.status.message) not in [ get_message_id(x) for x in task.history ]: task.history.append(task.status.message) else: print("Message id already in history", get_message_id(task.status.message), task.history) def add_or_get_task(self, task: TaskCallbackArg): current_task = next(filter(lambda x: x.id == task.id, self._tasks), None) if not current_task: conversation_id = None if task.metadata and 'conversation_id' in task.metadata: conversation_id = task.metadata['conversation_id'] current_task = Task( id=task.id, status=TaskStatus(state = TaskState.SUBMITTED), #initialize with submitted metadata=task.metadata, artifacts = [], sessionId=conversation_id, ) self.add_task(current_task) return current_task return current_task def process_artifact_event(self, current_task:Task, task_update_event: TaskArtifactUpdateEvent): artifact = task_update_event.artifact if not artifact.append: #received the first chunk or entire payload for an artifact if artifact.lastChunk is None or artifact.lastChunk: #lastChunk bit is missing or is set to true, so this is the entire payload #add this to 
                # artifacts
                if not current_task.artifacts:
                    current_task.artifacts = []
                current_task.artifacts.append(artifact)
            else:
                # this is a chunk of an artifact, stash it in temp store for assembling
                if task_update_event.id not in self._artifact_chunks:
                    self._artifact_chunks[task_update_event.id] = {}
                self._artifact_chunks[task_update_event.id][artifact.index] = artifact
        else:
            # we received an append chunk, add to the existing temp artifact
            current_temp_artifact = self._artifact_chunks[task_update_event.id][artifact.index]
            # TODO handle if current_temp_artifact is missing
            current_temp_artifact.parts.extend(artifact.parts)
            if artifact.lastChunk:
                current_task.artifacts.append(current_temp_artifact)
                del self._artifact_chunks[task_update_event.id][artifact.index]

    def add_event(self, event: Event):
        self._events[event.id] = event

    def get_conversation(
        self,
        conversation_id: Optional[str]
    ) -> Optional[Conversation]:
        if not conversation_id:
            return None
        return next(
            filter(lambda c: c.conversation_id == conversation_id,
                   self._conversations), None)

    def get_pending_messages(self) -> list[Tuple[str, str]]:
        rval = []
        for message_id in self._pending_message_ids:
            if message_id in self._task_map:
                task_id = self._task_map[message_id]
                task = next(filter(lambda x: x.id == task_id, self._tasks), None)
                if not task:
                    rval.append((message_id, ""))
                elif task.history and task.history[-1].parts:
                    if len(task.history) == 1:
                        rval.append((message_id, "Working..."))
                    else:
                        part = task.history[-1].parts[0]
                        rval.append((
                            message_id,
                            part.text if part.type == "text" else "Working..."))
            else:
                rval.append((message_id, ""))
        return rval

    def register_agent(self, url):
        agent_data = get_agent_card(url)
        if not agent_data.url:
            agent_data.url = url
        self._agents.append(agent_data)
        self._host_agent.register_agent_card(agent_data)
        # Now update the host agent definition
        self._initialize_host()

    @property
    def agents(self) -> list[AgentCard]:
        return self._agents

    @property
    def conversations(self) -> list[Conversation]:
        return
self._conversations

    @property
    def tasks(self) -> list[Task]:
        return self._tasks

    @property
    def events(self) -> list[Event]:
        return sorted(self._events.values(), key=lambda x: x.timestamp)

    def adk_content_from_message(self, message: Message) -> types.Content:
        parts: list[types.Part] = []
        for part in message.parts:
            if part.type == "text":
                parts.append(types.Part.from_text(text=part.text))
            elif part.type == "data":
                json_string = json.dumps(part.data)
                parts.append(types.Part.from_text(text=json_string))
            elif part.type == "file":
                # FilePart wraps a FileContent in `.file` (see how FilePart is
                # constructed elsewhere in this module), so read uri, bytes and
                # mimeType from there. The previous code referenced an
                # undefined name, `content_part`.
                if part.file.uri:
                    parts.append(types.Part.from_uri(
                        file_uri=part.file.uri,
                        mime_type=part.file.mimeType
                    ))
                elif part.file.bytes:
                    # FileContent.bytes is base64 text in this module
                    # (see _handle_function_response), so decode it.
                    parts.append(types.Part.from_bytes(
                        data=base64.b64decode(part.file.bytes),
                        mime_type=part.file.mimeType)
                    )
                else:
                    raise ValueError("Unsupported message type")
        return types.Content(parts=parts, role=message.role)

    def adk_content_to_message(self, content: types.Content, conversation_id: str) -> Message:
        parts: list[Part] = []
        if not content.parts:
            return Message(
                parts=[],
                role=content.role if content.role == 'user' else 'agent',
                metadata={'conversation_id': conversation_id},
            )
        for part in content.parts:
            if part.text:
                # try parse as data
                try:
                    data = json.loads(part.text)
                    parts.append(DataPart(data=data))
                except Exception:
                    # Not JSON, or not a shape DataPart accepts: keep as text.
                    parts.append(TextPart(text=part.text))
            elif part.inline_data:
                # inline_data holds raw bytes in `.data`; encode them as base64
                # text, matching _handle_function_response.
                parts.append(FilePart(
                    file=FileContent(
                        bytes=base64.b64encode(part.inline_data.data).decode('utf-8'),
                        mimeType=part.inline_data.mime_type
                    )
                ))
            elif part.file_data:
                parts.append(FilePart(
                    file=FileContent(
                        uri=part.file_data.file_uri,
                        mimeType=part.file_data.mime_type
                    )
                ))
            # These aren't managed by the A2A message structure, these are internal
            # details of ADK, we will simply flatten these to json representations.
elif part.video_metadata: parts.append(DataPart(data=part.video_metadata.model_dump())) elif part.thought: parts.append(TextPart(text="thought")) elif part.executable_code: parts.append(DataPart(data=part.executable_code.model_dump())) elif part.function_call: parts.append(DataPart(data=part.function_call.model_dump())) elif part.function_response: parts.extend(self._handle_function_response(part, conversation_id)) else: raise ValueError("Unexpected content, unknown type") return Message( role=content.role if content.role == 'user' else 'agent', parts=parts, metadata={'conversation_id': conversation_id}, ) def _handle_function_response(self, part: types.Part, conversation_id: str) -> list[Part]: parts = [] try: for p in part.function_response.response['result']: if isinstance(p, str): parts.append(TextPart(text=p)) elif isinstance(p, dict): if 'type' in p and p['type'] == 'file': parts.append(FilePart(**p)) else: parts.append(DataPart(data=p)) elif isinstance(p, DataPart): if 'artifact-file-id' in p.data: file_part = self._artifact_service.load_artifact(user_id=self.user_id, session_id=conversation_id, app_name=self.app_name, filename = p.data['artifact-file-id']) file_data = file_part.inline_data base64_data = base64.b64encode(file_data.data).decode('utf-8') parts.append(FilePart( file=FileContent( bytes=base64_data, mimeType=file_data.mime_type, name='artifact_file' ) )) else: parts.append(DataPart(data=p.data)) else: parts.append(TextPart(text=json.dumps(p))) except Exception as e: print("Couldn't convert to messages:", e) parts.append(DataPart(data=part.function_response.model_dump())) return parts def get_message_id(m: Message | None) -> str | None: if not m or not m.metadata or 'message_id' not in m.metadata: return None return m.metadata['message_id'] def get_last_message_id(m: Message | None) -> str | None: if not m or not m.metadata or 'last_message_id' not in m.metadata: return None return m.metadata['last_message_id'] def get_conversation_id( t: (Task | 
       TaskStatusUpdateEvent |
       TaskArtifactUpdateEvent |
       Message |
       None)
) -> str | None:
    if (t and
        hasattr(t, 'metadata') and
        t.metadata and
        'conversation_id' in t.metadata):
        return t.metadata['conversation_id']
    return None


def task_still_open(task: Task | None) -> bool:
    if not task:
        return False
    return task.status.state in [
        TaskState.SUBMITTED, TaskState.WORKING, TaskState.INPUT_REQUIRED
    ]
```

## /demo/ui/service/server/application_manager.py

```py path="/demo/ui/service/server/application_manager.py"
from abc import ABC, abstractmethod
from common.types import Message, Task, AgentCard
from service.types import Conversation, Event


class ApplicationManager(ABC):

    @abstractmethod
    def create_conversation(self) -> Conversation:
        pass

    @abstractmethod
    def sanitize_message(self, message: Message) -> Message:
        pass

    @abstractmethod
    async def process_message(self, message: Message):
        pass

    @abstractmethod
    def register_agent(self, url: str):
        pass

    @abstractmethod
    def get_pending_messages(self) -> list[tuple[str, str]]:
        # Both implementations return (message_id, status_text) pairs.
        pass

    @property
    @abstractmethod
    def conversations(self) -> list[Conversation]:
        pass

    @property
    @abstractmethod
    def tasks(self) -> list[Task]:
        pass

    @property
    @abstractmethod
    def agents(self) -> list[AgentCard]:
        pass

    @property
    @abstractmethod
    def events(self) -> list[Event]:
        pass
```

## /demo/ui/service/server/in_memory_manager.py

```py path="/demo/ui/service/server/in_memory_manager.py"
import asyncio
import datetime
from typing import Tuple, Optional
import uuid

from service.types import Conversation, Event
from common.types import (
    Message,
    Task,
    TextPart,
    TaskState,
    TaskStatus,
    Artifact,
    AgentCard,
    DataPart,
)
from utils.agent_card import get_agent_card
from service.server.application_manager import ApplicationManager
from service.server import test_image


class InMemoryFakeAgentManager(ApplicationManager):
    """An implementation of memory based management with fake agent actions

    This implements the interface of the ApplicationManager to plug into the
    AgentServer.
    This acts as the service contract that the Mesop app uses to send messages
    to the agent and provide information for the frontend.
    """
    _conversations: list[Conversation]
    _messages: list[Message]
    _tasks: list[Task]
    _events: list[Event]
    _pending_message_ids: list[str]
    _next_message_idx: int
    _agents: list[AgentCard]

    def __init__(self):
        self._conversations = []
        self._messages = []
        self._tasks = []
        self._events = []
        self._pending_message_ids = []
        self._next_message_idx = 0
        self._agents = []
        self._task_map = {}

    def create_conversation(self) -> Conversation:
        conversation_id = str(uuid.uuid4())
        c = Conversation(conversation_id=conversation_id, is_active=True)
        self._conversations.append(c)
        return c

    def sanitize_message(self, message: Message) -> Message:
        if not message.metadata:
            message.metadata = {}
        message.metadata.update({'message_id': str(uuid.uuid4())})
        return message

    async def process_message(self, message: Message):
        self._messages.append(message)
        message_id = message.metadata['message_id']
        self._pending_message_ids.append(message_id)
        conversation_id = (
            message.metadata['conversation_id']
            if 'conversation_id' in message.metadata
            else None
        )
        # Now check the conversation and attach the message id.
        conversation = self.get_conversation(conversation_id)
        if conversation:
            conversation.messages.append(message)
        self._events.append(Event(
            id=str(uuid.uuid4()),
            actor="host",
            content=message,
            # Use an aware UTC datetime: timestamp() on a naive utcnow() value
            # would be interpreted as local time.
            timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
        ))
        # Now actually process the message. If the response is async, return None
        # for the message response and the updated message information for the
        # incoming message (with ids attached).
        task_id = str(uuid.uuid4())
        task = Task(
            id=task_id,
            sessionId=conversation_id,
            status=TaskStatus(
                state=TaskState.SUBMITTED,
                message=message,
            ),
            history=[message],
        )
        if self._next_message_idx != 0:
            self._task_map[message_id] = task_id
            self.add_task(task)
        await asyncio.sleep(self._next_message_idx)
        response = self.next_message()
        response.metadata = {**message.metadata, **{'message_id': str(uuid.uuid4())}}
        if conversation:
            conversation.messages.append(response)
        self._events.append(Event(
            id=str(uuid.uuid4()),
            actor="host",
            content=response,
            timestamp=datetime.datetime.now(datetime.timezone.utc).timestamp(),
        ))
        self._pending_message_ids.remove(message.metadata['message_id'])
        # Now clean up the task
        if task:
            task.status.state = TaskState.COMPLETED
            task.artifacts = [Artifact(name="response", parts=response.parts)]
            task.history.append(response)
            self.update_task(task)

    def add_task(self, task: Task):
        self._tasks.append(task)

    def update_task(self, task: Task):
        for i, t in enumerate(self._tasks):
            if t.id == task.id:
                self._tasks[i] = task
                return

    def add_event(self, event: Event):
        self._events.append(event)

    def next_message(self) -> Message:
        message = _message_queue[self._next_message_idx]
        self._next_message_idx = (self._next_message_idx + 1) % len(_message_queue)
        return message

    def get_conversation(
        self,
        conversation_id: Optional[str]
    ) -> Optional[Conversation]:
        if not conversation_id:
            return None
        return next(
            filter(lambda c: c.conversation_id == conversation_id,
                   self._conversations), None)

    def get_pending_messages(self) -> list[Tuple[str, str]]:
        rval = []
        for message_id in self._pending_message_ids:
            if message_id in self._task_map:
                task_id = self._task_map[message_id]
                task = next(filter(lambda x: x.id == task_id, self._tasks), None)
                if not task:
                    rval.append((message_id, ""))
                elif task.history and task.history[-1].parts:
                    if len(task.history) == 1:
                        rval.append((message_id, "Working..."))
                    else:
                        part = task.history[-1].parts[0]
                        rval.append((
                            message_id,
                            part.text if part.type == "text" else
                            "Working..."))
            else:
                rval.append((message_id, ""))
        return rval

    def register_agent(self, url):
        agent_data = get_agent_card(url)
        if not agent_data.url:
            agent_data.url = url
        self._agents.append(agent_data)

    @property
    def agents(self) -> list[AgentCard]:
        return self._agents

    @property
    def conversations(self) -> list[Conversation]:
        return self._conversations

    @property
    def tasks(self) -> list[Task]:
        return self._tasks

    @property
    def events(self) -> list[Event]:
        return self._events

# This represents the precanned responses that will be returned in order.
# Extend this list to test more functionality of the UI
_message_queue: list[Message] = [
    Message(role="agent", parts=[TextPart(text="Hello")]),
    Message(role="agent", parts=[
        DataPart(
            data={
                'type': 'form',
                'form': {
                    'type': 'object',
                    'properties': {
                        'name': {
                            'type': 'string',
                            'description': 'Enter your name',
                            'title': 'Name',
                        },
                        'date': {
                            'type': 'string',
                            'format': 'date',
                            'description': 'Birthday',
                            'title': 'Birthday',
                        },
                    },
                    'required': ['date'],
                },
                'form_data': {
                    'name': 'John Smith',
                },
                'instructions': "Please provide your birthday and name",
            }
        ),
    ]),
    Message(role="agent", parts=[TextPart(text="I like cats")]),
    test_image.test_image,
    Message(role="agent", parts=[TextPart(text="And I like dogs")]),
]
```

## /demo/ui/service/server/server.py

```py path="/demo/ui/service/server/server.py"
import asyncio
import base64
import threading
import os
import uuid
from typing import Any
from fastapi import APIRouter
from fastapi import Request, Response
from common.types import Message, Task, FilePart, FileContent
from .in_memory_manager import InMemoryFakeAgentManager
from .application_manager import ApplicationManager
from .adk_host_manager import ADKHostManager, get_message_id
from service.types import (
    Conversation,
    Event,
    CreateConversationResponse,
    ListConversationResponse,
    SendMessageResponse,
    MessageInfo,
    ListMessageResponse,
    PendingMessageResponse,
    ListTaskResponse,
    RegisterAgentResponse,
    ListAgentResponse,
    GetEventResponse
)

class ConversationServer:
    """ConversationServer is the backend to serve the agent interactions in the UI

    This defines the interface that is used by the Mesop system to interact with
    agents and provide details about the executions.
    """

    def __init__(self, router: APIRouter):
        agent_manager = os.environ.get("A2A_HOST", "ADK")
        self.manager: ApplicationManager

        # Get API key from environment
        api_key = os.environ.get("GOOGLE_API_KEY", "")
        uses_vertex_ai = os.environ.get("GOOGLE_GENAI_USE_VERTEXAI", "").upper() == "TRUE"

        if agent_manager.upper() == "ADK":
            self.manager = ADKHostManager(api_key=api_key, uses_vertex_ai=uses_vertex_ai)
        else:
            self.manager = InMemoryFakeAgentManager()
        self._file_cache = {}  # dict[str, FilePart] maps file id to message data
        self._message_to_cache = {}  # dict[str, str] maps message id to cache id

        router.add_api_route(
            "/conversation/create",
            self._create_conversation,
            methods=["POST"])
        router.add_api_route(
            "/conversation/list",
            self._list_conversation,
            methods=["POST"])
        router.add_api_route(
            "/message/send",
            self._send_message,
            methods=["POST"])
        router.add_api_route(
            "/events/get",
            self._get_events,
            methods=["POST"])
        router.add_api_route(
            "/message/list",
            self._list_messages,
            methods=["POST"])
        router.add_api_route(
            "/message/pending",
            self._pending_messages,
            methods=["POST"])
        router.add_api_route(
            "/task/list",
            self._list_tasks,
            methods=["POST"])
        router.add_api_route(
            "/agent/register",
            self._register_agent,
            methods=["POST"])
        router.add_api_route(
            "/agent/list",
            self._list_agents,
            methods=["POST"])
        router.add_api_route(
            "/message/file/{file_id}",
            self._files,
            methods=["GET"])
        router.add_api_route(
            "/api_key/update",
            self._update_api_key,
            methods=["POST"])

    # Update API key in manager
    def update_api_key(self, api_key: str):
        if isinstance(self.manager, ADKHostManager):
            self.manager.update_api_key(api_key)

    def _create_conversation(self):
        c = self.manager.create_conversation()
        return CreateConversationResponse(result=c)

    async def _send_message(self, request: Request):
        message_data = await request.json()
        message = Message(**message_data['params'])
        message = self.manager.sanitize_message(message)
        t = threading.Thread(target=lambda: asyncio.run(self.manager.process_message(message)))
        t.start()
        return SendMessageResponse(result=MessageInfo(
            message_id=message.metadata['message_id'],
            conversation_id=message.metadata['conversation_id'] if 'conversation_id' in message.metadata else '',
        ))

    async def _list_messages(self, request: Request):
        message_data = await request.json()
        conversation_id = message_data['params']
        conversation = self.manager.get_conversation(conversation_id)
        if conversation:
            return ListMessageResponse(result=self.cache_content(
                conversation.messages))
        return ListMessageResponse(result=[])

    def cache_content(self, messages: list[Message]):
        rval = []
        for m in messages:
            message_id = get_message_id(m)
            if not message_id:
                rval.append(m)
                continue
            new_parts = []
            for i, part in enumerate(m.parts):
                if part.type != 'file':
                    new_parts.append(part)
                    continue
                message_part_id = f"{message_id}:{i}"
                if message_part_id in self._message_to_cache:
                    cache_id = self._message_to_cache[message_part_id]
                else:
                    cache_id = str(uuid.uuid4())
                    self._message_to_cache[message_part_id] = cache_id
                # Replace the part data with a url reference
                new_parts.append(FilePart(
                    file=FileContent(
                        mimeType=part.file.mimeType,
                        uri=f"/message/file/{cache_id}",
                    )
                ))
                if cache_id not in self._file_cache:
                    self._file_cache[cache_id] = part
            m.parts = new_parts
            rval.append(m)
        return rval

    async def _pending_messages(self):
        return PendingMessageResponse(result=self.manager.get_pending_messages())

    def _list_conversation(self):
        return ListConversationResponse(result=self.manager.conversations)

    def _get_events(self):
        return GetEventResponse(result=self.manager.events)

    def _list_tasks(self):
        return ListTaskResponse(result=self.manager.tasks)

    async def _register_agent(self, request: Request):
        message_data = await request.json()
        url = message_data['params']
        self.manager.register_agent(url)
        return RegisterAgentResponse()

    async def _list_agents(self):
        return ListAgentResponse(result=self.manager.agents)

    def _files(self, file_id):
        if file_id not in self._file_cache:
            raise Exception("file not found")
        part = self._file_cache[file_id]
        if "image" in part.file.mimeType:
            return Response(
                content=base64.b64decode(part.file.bytes),
                media_type=part.file.mimeType)
        return Response(content=part.file.bytes, media_type=part.file.mimeType)

    async def _update_api_key(self, request: Request):
        """Update the API key"""
        try:
            data = await request.json()
            api_key = data.get("api_key", "")

            if api_key:
                # Update in the manager
                self.update_api_key(api_key)
                return {"status": "success"}
            return {"status": "error", "message": "No API key provided"}
        except Exception as e:
            return {"status": "error", "message": str(e)}
```
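
The file handling in `cache_content` and `_files` above can be read as a small caching pattern: each file part is keyed by `<message_id>:<part_index>`, assigned a stable cache id on first sight, and replaced in the listing by a `/message/file/{cache_id}` reference. The following is a minimal standalone sketch of that idea, not the project's code. `FileRef` and `FileCache` are hypothetical names introduced for illustration and stand in for `FilePart`/`FileContent`, and the sketch assumes every cached payload is base64-encoded.

```python
import base64
import uuid
from dataclasses import dataclass, field


@dataclass
class FileRef:
    """Hypothetical stand-in for a file part (not the project's FilePart)."""
    mime_type: str
    data_b64: str = ""  # inline base64 payload, empty once externalized
    uri: str = ""       # URL reference that replaces the inline payload


@dataclass
class FileCache:
    """Hypothetical helper mirroring the two dictionaries kept by the server."""
    part_to_cache: dict[str, str] = field(default_factory=dict)
    files: dict[str, FileRef] = field(default_factory=dict)

    def externalize(self, message_id: str, index: int, part: FileRef) -> FileRef:
        """Store the payload once and return a lightweight URL reference."""
        key = f"{message_id}:{index}"
        cache_id = self.part_to_cache.get(key)
        if cache_id is None:
            # First time this part is listed: mint a cache id and remember it.
            cache_id = str(uuid.uuid4())
            self.part_to_cache[key] = cache_id
        # Keep the first stored payload; later listings never overwrite it.
        self.files.setdefault(cache_id, part)
        return FileRef(mime_type=part.mime_type, uri=f"/message/file/{cache_id}")

    def fetch(self, cache_id: str) -> bytes:
        """Return the decoded payload; raises KeyError for an unknown id."""
        return base64.b64decode(self.files[cache_id].data_b64)


if __name__ == "__main__":
    cache = FileCache()
    image = FileRef("image/png", data_b64=base64.b64encode(b"pixels").decode())
    ref = cache.externalize("m1", 0, image)
    # Listing the same part again yields the same URL.
    assert cache.externalize("m1", 0, image).uri == ref.uri
    print(ref.uri, len(cache.fetch(ref.uri.rsplit("/", 1)[1])))
```

Unlike `cache_content`, which overwrites `m.parts` on the stored message, this sketch returns a new reference object and leaves the input untouched. That is a design choice made for the illustration only.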