```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── LICENSE
├── README.md
├── demo/
├── README.md
├── ui/
├── README.md
├── __init__.py
├── components/
├── __init__.py
├── agent_list.py
├── api_key_dialog.py
├── async_poller.js
├── async_poller.py
├── chat_bubble.py
├── conversation.py
├── conversation_list.py
├── dialog.py
├── event_viewer.py
├── form_render.py
├── header.py
├── page_scaffold.py
├── poller.py
├── side_nav.py
├── task_card.py
├── main.py
├── pages/
├── __init__.py
├── agent_list.py
├── conversation.py
├── event_list.py
├── home.py
├── settings.py
├── task_list.py
├── pyproject.toml
├── service/
├── __init__.py
├── client/
├── __init__.py
├── client.py
├── server/
├── __init__.py
├── adk_host_manager.py
├── application_manager.py
├── in_memory_manager.py
├── server.py
├── test_image.py
```
## /.gitignore
```gitignore path="/.gitignore"
.DS_Store
__pycache__
.env
```
## /CODE_OF_CONDUCT.md
# Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, gender identity and expression, level of
experience, education, socio-economic status, nationality, personal appearance,
race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment
include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, or to ban temporarily or permanently any
contributor for other behaviors that they deem inappropriate, threatening,
offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.
This Code of Conduct also applies outside the project spaces when the Project
Steward has a reasonable belief that an individual's behavior may have a
negative impact on the project or its community.
## Conflict Resolution
We do not believe that all conflict is bad; healthy debate and disagreement
often yield positive results. However, it is never okay to be disrespectful or
to engage in behavior that violates the project’s code of conduct.
If you see someone violating the code of conduct, you are encouraged to address
the behavior directly with those involved. Many issues can be resolved quickly
and easily, and this gives people more control over the outcome of their
dispute. If you are unable to resolve the matter for any reason, or if the
behavior is threatening or harassing, report it. We are dedicated to providing
an environment where participants feel welcome and safe.
Reports should be directed to *[PROJECT STEWARD NAME(s) AND EMAIL(s)]*, the
Project Steward(s) for *[PROJECT NAME]*. It is the Project Steward’s duty to
receive and address reported violations of the code of conduct. They will then
work with a committee consisting of representatives from the Open Source
Programs Office and the Google Open Source Strategy team. If for any reason you
are uncomfortable reaching out to the Project Steward, please email
opensource@google.com.
We will investigate every complaint, but you may not receive a direct response.
We will use our discretion in determining when and how to follow up on reported
incidents, which may range from not taking action to permanent expulsion from
the project and project-sponsored spaces. We will notify the accused of the
report and provide them an opportunity to discuss it before any action is taken.
The identity of the reporter will be omitted from the details of the report
supplied to the accused. In potentially harmful situations, such as ongoing
harassment or threats to anyone's safety, we may take action without notice.
## Attribution
This Code of Conduct is adapted from the Contributor Covenant, version 1.4,
available at
https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
Note: A version of this file is also available in the
[New Project repo](https://github.com/google/new-project/blob/master/docs/code-of-conduct.md).
## /CONTRIBUTING.md
# How to contribute
We'd love to accept your patches and contributions to this project.
## Before you begin
### Sign our Contributor License Agreement
Contributions to this project must be accompanied by a
[Contributor License Agreement](https://cla.developers.google.com/about) (CLA).
You (or your employer) retain the copyright to your contribution; this simply
gives us permission to use and redistribute your contributions as part of the
project.
If you or your current employer have already signed the Google CLA (even if it
was for a different project), you probably don't need to do it again.
Visit to see your current agreements or to
sign a new one.
### Review our community guidelines
This project follows
[Google's Open Source Community Guidelines](https://opensource.google/conduct/).
## Contribution process
### Code reviews
All submissions, including submissions by project members, require review. We
use GitHub pull requests for this purpose. Consult
[GitHub Help](https://help.github.com/articles/about-pull-requests/) for more
information on using pull requests.
## /LICENSE
``` path="/LICENSE"
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```
## /README.md

[](LICENSE)
**_An open protocol enabling communication and interoperability between opaque agentic applications._**
- [Agent2Agent Protocol A2A](#agent2agent-protocol-a2a)
- [Getting Started](#getting-started)
- [Contributing](#contributing)
- [What's next](#whats-next)
- [About](#about)
One of the biggest challenges in enterprise AI adoption is getting agents built on different frameworks and vendors to work together. That’s why we created an open *Agent2Agent (A2A) protocol*, a collaborative way to help agents across different ecosystems communicate with each other. Google is driving this open protocol initiative for the industry because we believe this protocol will be **critical to support multi-agent communication by giving your agents a common language – irrespective of the framework or vendor they are built on**.
With *A2A*, agents can show each other their capabilities and negotiate how they will interact with users (via text, forms, or bidirectional audio/video) – all while working securely together.
### **See A2A in Action**
Watch [this demo video](https://storage.googleapis.com/gweb-developer-goog-blog-assets/original_videos/A2A_demo_v4.mp4) to see how A2A enables seamless communication between different agent frameworks.
### Conceptual Overview
The Agent2Agent (A2A) protocol facilitates communication between independent AI agents. Here are the core concepts:
* **Agent Card:** A public metadata file (usually at `/.well-known/agent.json`) describing an agent's capabilities, skills, endpoint URL, and authentication requirements. Clients use this for discovery.
* **A2A Server:** An agent exposing an HTTP endpoint that implements the A2A protocol methods (defined in the [json specification](/specification)). It receives requests and manages task execution.
* **A2A Client:** An application or another agent that consumes A2A services. It sends requests (like `tasks/send`) to an A2A Server's URL.
* **Task:** The central unit of work. A client initiates a task by sending a message (`tasks/send` or `tasks/sendSubscribe`). Tasks have unique IDs and progress through states (`submitted`, `working`, `input-required`, `completed`, `failed`, `canceled`).
* **Message:** Represents communication turns between the client (`role: "user"`) and the agent (`role: "agent"`). Messages contain `Parts`.
* **Part:** The fundamental content unit within a `Message` or `Artifact`. Can be `TextPart`, `FilePart` (with inline bytes or a URI), or `DataPart` (for structured JSON, e.g., forms).
* **Artifact:** Represents outputs generated by the agent during a task (e.g., generated files, final structured data). Artifacts also contain `Parts`.
* **Streaming:** For long-running tasks, servers supporting the `streaming` capability can use `tasks/sendSubscribe`. The client receives Server-Sent Events (SSE) containing `TaskStatusUpdateEvent` or `TaskArtifactUpdateEvent` messages, providing real-time progress.
* **Push Notifications:** Servers supporting `pushNotifications` can proactively send task updates to a client-provided webhook URL, configured via `tasks/pushNotification/set`.
**Typical Flow:**
1. **Discovery:** Client fetches the Agent Card from the server's well-known URL.
2. **Initiation:** Client sends a `tasks/send` or `tasks/sendSubscribe` request containing the initial user message and a unique Task ID.
3. **Processing:**
* **(Streaming):** Server sends SSE events (status updates, artifacts) as the task progresses.
* **(Non-Streaming):** Server processes the task synchronously and returns the final `Task` object in the response.
4. **Interaction (Optional):** If the task enters `input-required`, the client sends subsequent messages using the same Task ID via `tasks/send` or `tasks/sendSubscribe`.
5. **Completion:** The task eventually reaches a terminal state (`completed`, `failed`, `canceled`).
### **Getting Started**
* 📚 Read the [technical documentation](https://google.github.io/A2A/#/documentation) to understand the capabilities
* 📝 Review the [json specification](/specification) of the protocol structures
* 🎬 Use our [samples](/samples) to see A2A in action
* Sample A2A Client/Server ([Python](/samples/python/common), [JS](/samples/js/src))
* [Multi-Agent Web App](/demo/README.md)
* CLI ([Python](/samples/python/hosts/cli/README.md), [JS](/samples/js/README.md))
* 🤖 Use our [sample agents](/samples/python/agents/README.md) to see how to bring A2A to agent frameworks
* [Agent Development Kit (ADK)](/samples/python/agents/google_adk/README.md)
* [CrewAI](/samples/python/agents/crewai/README.md)
* [LangGraph](/samples/python/agents/langgraph/README.md)
* [Genkit](/samples/js/src/agents/README.md)
* [LlamaIndex](/samples/python/agents/llama_index_file_chat/README.md)
* [Marvin](/samples/python/agents/marvin/README.md)
* [Semantic Kernel](/samples/python/agents/semantickernel/README.md)
* 📑 Review key topics to understand protocol details
* [A2A and MCP](https://google.github.io/A2A/#/topics/a2a_and_mcp.md)
* [Agent Discovery](https://google.github.io/A2A/#/topics/agent_discovery.md)
* [Enterprise Ready](https://google.github.io/A2A/#/topics/enterprise_ready.md)
* [Push Notifications](https://google.github.io/A2A/#/topics/push_notifications.md)
### **Contributing**
We highly value community contributions and appreciate your interest in A2A Protocol! Here's how you can get involved:
* Get Started? Please see our [contributing guide](CONTRIBUTING.md) to get started.
* Have questions? Join our community in [GitHub discussions](https://github.com/google/A2A/discussions).
* Want to help with protocol improvement feedback? Dive into [GitHub issues](https://github.com/google/A2A/issues).
* Private Feedback? Please use this [Google form](https://docs.google.com/forms/d/e/1FAIpQLScS23OMSKnVFmYeqS2dP7dxY3eTyT7lmtGLUa8OJZfP4RTijQ/viewform)
* Existing Google cloud platform customer and want to join our partner program to contribute to A2A ecosystem? Please fill this [form](https://docs.google.com/forms/d/1VXYY1qBhUbRfY15Z5G_KPYoPC9d1LCrwde5ehjYKCZ8/preview)
### **What's next**
Future plans include improvements to the protocol itself and enhancements to the samples:
**Protocol Enhancements:**
* **Agent Discovery:**
* Formalize inclusion of authorization schemes and optional credentials directly within the `AgentCard`.
* **Agent Collaboration:**
* Investigate a `QuerySkill()` method for dynamically checking unsupported or unanticipated skills.
* **Task Lifecycle & UX:**
* Support for dynamic UX negotiation *within* a task (e.g., agent adding audio/video mid-conversation).
* **Client Methods & Transport:**
* Explore extending support to client-initiated methods (beyond task management).
* Improvements to streaming reliability and push notification mechanisms.
**Sample & Documentation Enhancements:**
* Simplify "Hello World" examples.
* Include additional examples of agents integrated with different frameworks or showcasing specific A2A features.
* Provide more comprehensive documentation for the common client/server libraries.
* Generate human-readable HTML documentation from the JSON Schema.
### **About**
A2A Protocol is an open source project run by Google LLC, under [Apache License](LICENSE) and open to contributions from the entire community.
## /demo/README.md
## Demo Web App
This demo application showcases agents talking to other agents over A2A.

* The frontend is a [mesop](https://github.com/mesop-dev/mesop) web application that renders conversations as content between the end user and the "Host Agent". This app can render text content, thought bubbles, web forms (requests for input from agents), and images. More content types coming soon
* The [Host Agent](/samples/python/hosts/multiagent/host_agent.py) is a Google ADK agent which orchestrates user requests to Remote Agents.
* Each [Remote Agent](/samples/python/hosts/multiagent/remote_agent_connection.py) is an A2AClient running inside a Google ADK agent. Each remote agent will retrieve the A2AServer's [AgentCard](https://google.github.io/A2A/#documentation?id=agent-card) and then proxy all requests using A2A.
## Features
### Dynamically add agents
Clicking on the robot icon in the web app lets you add new agents. Enter the address of the remote agent's AgentCard and the app will fetch the card and add the remote agent to the local set of known agents.
### Speak with one or more agents
Click on the chat button to start or continue an existing conversation. This conversation will go to the Host Agent which will then delegate the request to one or more remote agents.
If the agent returns complex content - like an image or a web-form - the frontend will render this in the conversation view. The Remote Agent will take care of converting this content between A2A and the web apps native application representation.
### Explore A2A Tasks
Click on the history to see the messages sent between the web app and all of the agents (Host agent and Remote agents).
Click on the task list to see all the A2A task updates from the remote agents
## Prerequisites
- Python 3.12 or higher
- UV
- Agent servers speaking A2A ([use these samples](/samples/python/agents/README.md))
- Authentication credentials (API Key or Vertex AI)
## Running the Examples
1. Navigate to the demo ui directory:
```bash
cd demo/ui
```
2. Create an environment file with your API key or enter it directly in the UI when prompted:
**Option A: Google AI Studio API Key**
```bash
echo "GOOGLE_API_KEY=your_api_key_here" >> .env
```
Or enter it directly in the UI when prompted.
**Option B: Google Cloud Vertex AI**
```bash
echo "GOOGLE_GENAI_USE_VERTEXAI=TRUE" >> .env
echo "GOOGLE_CLOUD_PROJECT=your_project_id" >> .env
echo "GOOGLE_CLOUD_LOCATION=your_location" >> .env
```
Note: Ensure you've authenticated with gcloud using `gcloud auth login` first.
For detailed instructions on authentication setup, see the [ADK documentation](https://google.github.io/adk-docs/get-started/quickstart/#set-up-the-model).
3. Run the front end example:
```bash
uv run main.py
```
Note: The application runs on port 12000 by default
## /demo/ui/README.md
## /demo/ui/__init__.py
```py path="/demo/ui/__init__.py"
```
## /demo/ui/components/__init__.py
```py path="/demo/ui/components/__init__.py"
```
## /demo/ui/components/agent_list.py
```py path="/demo/ui/components/agent_list.py"
import mesop as me
import pandas as pd
from typing import List, Tuple
from state.state import AppState
from state.agent_state import AgentState
from common.types import AgentCard
@me.component
def agents_list(
agents: list[AgentCard],
):
"""Agents list component"""
df_data = {
"Address": [],
"Name": [],
"Description": [],
"Organization": [],
"Input Modes": [],
"Output Modes": [],
"Streaming": [],
}
for agent_info in agents:
df_data["Address"].append(agent_info.url)
df_data["Name"].append(agent_info.name)
df_data["Description"].append(agent_info.description)
df_data["Organization"].append(
agent_info.provider.organization if agent_info.provider else ''
)
df_data["Input Modes"].append(", ".join(agent_info.defaultInputModes))
df_data["Output Modes"].append(", ".join(agent_info.defaultOutputModes))
df_data["Streaming"].append(agent_info.capabilities.streaming)
df = pd.DataFrame(
pd.DataFrame(df_data),
columns=[
"Address",
"Name",
"Description",
"Organization",
"Input Modes",
"Output Modes",
"Streaming",
],
)
with me.box(
style=me.Style(
display="flex",
justify_content="space-between",
flex_direction="column",
)
):
me.table(
df,
header=me.TableHeader(sticky=True),
columns={
"Address": me.TableColumn(sticky=True),
"Name": me.TableColumn(sticky=True),
"Description": me.TableColumn(sticky=True),
},
)
with me.content_button(
type="raised",
on_click=add_agent,
key="new_agent",
style=me.Style(
display="flex",
flex_direction="row",
gap=5,
align_items="center",
margin=me.Margin(top=10),
),
):
me.icon(icon="upload")
def add_agent(e: me.ClickEvent): # pylint: disable=unused-argument
"""import agent button handler"""
state = me.state(AgentState)
state.agent_dialog_open = True
```
## /demo/ui/components/api_key_dialog.py
```py path="/demo/ui/components/api_key_dialog.py"
import mesop as me
import os
from state.state import AppState
from state.host_agent_service import UpdateApiKey
from .dialog import dialog, dialog_actions
def on_api_key_change(e: me.InputBlurEvent):
"""Save API key to app state when input changes"""
state = me.state(AppState)
state.api_key = e.value
async def save_api_key(e: me.ClickEvent):
"""Save API key and close dialog"""
yield # Yield to allow UI update
state = me.state(AppState)
# Validate API key is not empty
if not state.api_key.strip():
return
# Set the environment variable for current process
os.environ["GOOGLE_API_KEY"] = state.api_key
# Update the API key in the server
await UpdateApiKey(state.api_key)
state.api_key_dialog_open = False
yield
@me.component
def api_key_dialog():
"""Dialog for API key input"""
state = me.state(AppState)
with dialog(state.api_key_dialog_open):
with me.box(
style=me.Style(display="flex", flex_direction="column", gap=12)
):
me.text(
"Google API Key Required",
type="headline-4",
style=me.Style(margin=me.Margin(bottom=10)),
)
me.text(
"Please enter your Google API Key to use the application.",
style=me.Style(margin=me.Margin(bottom=20)),
)
me.input(
label="Google API Key",
value=state.api_key,
on_blur=on_api_key_change,
type="password",
style=me.Style(width="100%"),
)
with dialog_actions():
me.button("Save", on_click=save_api_key)
```
## /demo/ui/components/async_poller.js
```js path="/demo/ui/components/async_poller.js"
import {
LitElement,
html,
} from 'https://cdn.jsdelivr.net/gh/lit/dist@3/core/lit-core.min.js';
class AsyncPoller extends LitElement {
static properties = {
triggerEvent: {type: String},
action: {type: Object},
polling_interval: {type: Number},
};
render() {
return html``;
}
firstUpdated() {
if (this.polling_interval <= 0) {
return;
}
if (this.action) {
setTimeout(() => {
this.runTimeout(this.action)
}, this.polling_interval * 1000);
}
}
runTimeout(action) {
this.dispatchEvent(
new MesopEvent(this.triggerEvent, {
action: action,
}),
);
if (this.polling_interval > 0) {
setTimeout(() => {
this.runTimeout();
}, this.polling_interval * 1000);
}
}
}
customElements.define('async-action-component', AsyncPoller);
```
## /demo/ui/components/async_poller.py
```py path="/demo/ui/components/async_poller.py"
from dataclasses import asdict, dataclass
from typing import Any, Callable
import mesop.labs as mel
from state.state import AppState
@dataclass
class AsyncAction:
value: AppState
duration_seconds: int
@mel.web_component(path="./async_poller.js")
def async_poller(
*,
trigger_event: Callable[[mel.WebEvent], Any],
action: AsyncAction | None = None,
key: str | None = None,
):
"""Creates an invisible component that will delay state changes asynchronously.
Right now this implementation is limited since we basically just pass the key
around. But ideally we also pass in some kind of value to update when the time
out expires.
The main benefit of this component is for cases, such as status messages that
may appear and disappear after some duration. The primary example here is the
example snackbar widget, which right now blocks the UI when using the sleep
yield approach.
The other benefit of this component is that it works generically (rather than
say implementing a custom snackbar widget as a web component).
Returns:
The web component that was created.
"""
return mel.insert_web_component(
name="async-action-component",
key=key,
events={
"triggerEvent": trigger_event,
},
properties={
"polling_interval": action.duration_seconds if action else 1,
"action": asdict(action) if action else {}
},
)
```
## /demo/ui/components/chat_bubble.py
```py path="/demo/ui/components/chat_bubble.py"
import mesop as me
from state.state import StateMessage
from state.state import AppState
@me.component
def chat_bubble(message: StateMessage, key: str):
"""Chat bubble component"""
app_state = me.state(AppState)
show_progress_bar = (
message.message_id in app_state.background_tasks
or message.message_id in app_state.message_aliases.values()
)
progress_text = ""
if show_progress_bar:
progress_text = app_state.background_tasks[message.message_id]
if not message.content:
print("No message content")
for pair in message.content:
chat_box(pair[0], pair[1], message.role, key, progress_bar=show_progress_bar, progress_text=progress_text)
def chat_box(
content: str,
media_type: str,
role: str,
key: str,
progress_bar: bool,
progress_text: str
):
with me.box(
style=me.Style(
display="flex",
justify_content=(
"space-between" if role == "agent" else "end"
),
min_width=500,
),
key=key,
):
with me.box(
style=me.Style(
display="flex",
flex_direction="column",
gap=5)
):
if media_type == "image/png":
if "/message/file" not in content:
content = "data:image/png;base64," + content
me.image(
src=content,
style=me.Style(
width="50%",
object_fit="contain",
),
)
else:
me.markdown(
content,
style=me.Style(
font_family="Google Sans",
box_shadow=(
"0 1px 2px 0 rgba(60, 64, 67, 0.3), "
"0 1px 3px 1px rgba(60, 64, 67, 0.15)"
),
padding=me.Padding(top=1, left=15, right=15, bottom=1),
margin=me.Margin(top=5, left=0, right=0, bottom=5),
background=(
me.theme_var("primary-container") if role == "user" else me.theme_var("secondary-container")
),
border_radius=15),
)
if progress_bar:
with me.box(
style=me.Style(
display="flex",
justify_content=(
"space-between" if role == "user" else "end"
),
min_width=500,
),
key=key,
):
with me.box(
style=me.Style(
display="flex",
flex_direction="column",
gap=5)
):
with me.box(
style=me.Style(
font_family="Google Sans",
box_shadow=(
"0 1px 2px 0 rgba(60, 64, 67, 0.3), "
"0 1px 3px 1px rgba(60, 64, 67, 0.15)"
),
padding=me.Padding(top=1, left=15, right=15, bottom=1),
margin=me.Margin(top=5, left=0, right=0, bottom=5),
background=(
me.theme_var("primary-container") if role == "agent" else me.theme_var("secondary-container")
),
border_radius=15),
):
if not progress_text:
progress_text = "Working..."
me.text(progress_text,
style=me.Style(
padding=me.Padding(top=1, left=15, right=15, bottom=1),
margin=me.Margin(top=5, left=0, right=0, bottom=5)))
me.progress_bar(color="accent")
```
## /demo/ui/components/conversation.py
```py path="/demo/ui/components/conversation.py"
import mesop as me
import mesop.labs as mel
import asyncio
import uuid
import functools
import threading
from state.state import AppState, SettingsState, StateMessage
from state.host_agent_service import SendMessage, ListConversations, convert_message_to_state
from .chat_bubble import chat_bubble
from .form_render import is_form, render_form, form_sent
from .async_poller import async_poller, AsyncAction
from common.types import Message, TextPart
@me.stateclass
class PageState:
"""Local Page State"""
conversation_id: str = ""
message_content: str = ""
def on_blur(e: me.InputBlurEvent):
"""input handler"""
state = me.state(PageState)
state.message_content = e.value
async def send_message(message: str, message_id: str = ""):
state = me.state(PageState)
app_state = me.state(AppState)
settings_state = me.state(SettingsState)
c = next(
(
x
for x in await ListConversations()
if x.conversation_id == state.conversation_id
),
None,
)
if not c:
print("Conversation id ", state.conversation_id, " not found")
request = Message(
id=message_id,
role="user",
parts=[TextPart(text=message)],
metadata={'conversation_id': c.conversation_id if c else "",
'conversation_name': c.name if c else ""},
)
# Add message to state until refresh replaces it.
state_message = convert_message_to_state(request)
if not app_state.messages:
app_state.messages = []
app_state.messages.append(state_message)
conversation = next(filter(
lambda x: x.conversation_id == c.conversation_id,
app_state.conversations), None)
if conversation:
conversation.message_ids.append(state_message.message_id)
response = await SendMessage(request)
async def send_message_enter(e: me.InputEnterEvent): # pylint: disable=unused-argument
"""send message handler"""
yield
state = me.state(PageState)
state.message_content = e.value
app_state = me.state(AppState)
message_id = str(uuid.uuid4())
app_state.background_tasks[message_id] = ""
yield
await send_message(state.message_content, message_id)
yield
async def send_message_button(e: me.ClickEvent): # pylint: disable=unused-argument
"""send message button handler"""
yield
state = me.state(PageState)
app_state = me.state(AppState)
message_id = str(uuid.uuid4())
app_state.background_tasks[message_id] = ""
await send_message(state.message_content, message_id)
yield
@me.component
def conversation():
"""Conversation component"""
page_state = me.state(PageState)
app_state = me.state(AppState)
if "conversation_id" in me.query_params:
page_state.conversation_id = me.query_params["conversation_id"]
app_state.current_conversation_id = page_state.conversation_id
with me.box(
style=me.Style(
display="flex",
justify_content="space-between",
flex_direction="column",
)
):
for message in app_state.messages:
if is_form(message):
render_form(message, app_state)
elif form_sent(message, app_state):
chat_bubble(StateMessage(
message_id=message.message_id,
role=message.role,
content=[("Form submitted", "text/plain")]
), message.message_id)
else:
chat_bubble(message, message.message_id)
with me.box(
style=me.Style(
display="flex",
flex_direction="row",
gap=5,
align_items="center",
min_width=500,
width="100%",
)
):
me.input(
label="How can I help you?",
on_blur=on_blur,
on_enter=send_message_enter,
style=me.Style(min_width="80vw"),
)
with me.content_button(
type="flat",
on_click=send_message_button,
):
me.icon(icon="send")
```
## /demo/ui/components/conversation_list.py
```py path="/demo/ui/components/conversation_list.py"
import mesop as me
import pandas as pd
from typing import List
import uuid
from state.state import AppState
from state.state import StateConversation
from state.host_agent_service import CreateConversation
@me.component
def conversation_list(conversations: List[StateConversation]):
"""Conversation list component"""
df_data = {"ID": [], "Name": [], "Status": [], "Messages": []}
for conversation in conversations:
df_data["ID"].append(conversation.conversation_id)
df_data["Name"].append(conversation.conversation_name)
df_data["Status"].append("Open" if conversation.is_active else "Closed")
df_data["Messages"].append(len(conversation.message_ids))
df = pd.DataFrame(
pd.DataFrame(df_data),
columns=["ID", "Name", "Status", "Messages"])
with me.box(
style=me.Style(
display="flex",
justify_content="space-between",
flex_direction="column",
)
):
me.table(
df,
on_click=on_click,
header=me.TableHeader(sticky=True),
columns={
"ID": me.TableColumn(sticky=True),
"Name": me.TableColumn(sticky=True),
"Status": me.TableColumn(sticky=True),
"Messages": me.TableColumn(sticky=True),
},
)
with me.content_button(
type="raised",
on_click=add_conversation,
key="new_conversation",
style=me.Style(
display="flex",
flex_direction="row",
gap=5,
align_items="center",
margin=me.Margin(top=10),
),
):
me.icon(icon="add")
async def add_conversation(e: me.ClickEvent): # pylint: disable=unused-argument
"""add conversation button handler"""
response = await CreateConversation()
me.state(AppState).messages = []
me.navigate("/conversation", query_params={"conversation_id": response.conversation_id})
yield
def on_click(e: me.TableClickEvent):
state = me.state(AppState)
conversation = state.conversations[e.row_index]
state.current_conversation_id = conversation.conversation_id
me.query_params.update({"conversation_id": conversation.conversation_id})
me.navigate("/conversation", query_params=me.query_params)
yield
```
## /demo/ui/components/dialog.py
```py path="/demo/ui/components/dialog.py"
import mesop as me
@me.content_component
def dialog(is_open: bool):
with me.box(
style=me.Style(
background="rgba(0,0,0,0.4)",
display="block" if is_open else "none",
height="100%",
overflow_x="auto",
overflow_y="auto",
position="fixed",
width="100%",
z_index=1000,
)
):
with me.box(
style=me.Style(
align_items="center",
display="grid",
height="100vh",
justify_items="center",
)
):
with me.box(
style=me.Style(
background=me.theme_var("background"),
border_radius=20,
box_sizing="content-box",
box_shadow=(
"0 3px 1px -2px #0003, 0 2px 2px #00000024, 0 1px 5px #0000001f"
),
margin=me.Margin.symmetric(vertical="0", horizontal="auto"),
padding=me.Padding.all(20),
)
):
me.slot()
@me.content_component
def dialog_actions():
with me.box(
style=me.Style(
display="flex", justify_content="end", margin=me.Margin(top=20)
)
):
me.slot()
```
## /demo/ui/components/event_viewer.py
```py path="/demo/ui/components/event_viewer.py"
import asyncio
from typing import List, Tuple
import google.genai.types as types
import mesop as me
import pandas as pd
from state.agent_state import AgentState
from state.host_agent_service import GetEvents
from state.state import AppState
from state.host_agent_service import convert_event_to_state
def flatten_content(content: list[Tuple[str,str]]) -> str:
parts = []
for p in content:
if p[1] == 'text/plain' or p[1] == 'application/json':
parts.append(p[0])
else:
parts.append(p[1])
return '\n'.join(parts)
@me.component
def event_list():
"""Events list component"""
df_data = {
"Conversation ID": [],
"Actor": [],
"Role": [],
"Id": [],
"Content": [],
}
events = asyncio.run(GetEvents())
for e in events:
event = convert_event_to_state(e)
df_data["Conversation ID"].append(event.conversation_id)
df_data["Role"].append(event.role)
df_data["Id"].append(event.id)
df_data["Content"].append(flatten_content(event.content))
df_data["Actor"].append(event.actor)
if not df_data["Conversation ID"]:
me.text("No events found")
return
df = pd.DataFrame(
pd.DataFrame(df_data),
columns=["Conversation ID", "Actor", "Role", "Id", "Content"],
)
with me.box(
style=me.Style(
display="flex",
justify_content="space-between",
flex_direction="column",
)
):
me.table(
df,
header=me.TableHeader(sticky=True),
columns={
"Conversation ID": me.TableColumn(sticky=True),
"Actor": me.TableColumn(sticky=True),
"Role": me.TableColumn(sticky=True),
"Id": me.TableColumn(sticky=True),
"Content": me.TableColumn(sticky=True),
},
)
```
## /demo/ui/components/form_render.py
```py path="/demo/ui/components/form_render.py"
import json
import mesop as me
from typing import Literal, Any, Tuple
import uuid
import dataclasses
from state.state import AppState, StateMessage
from state.host_agent_service import SendMessage
from common.types import Message, DataPart, TextPart
ROW_GAP = 15
BOX_PADDING = 20
@dataclasses.dataclass
class FormElement:
"""FormElement is a declarative structure for the form rendering"""
name: str = ""
label: str = ""
value: str = ""
formType: Literal[
"color",
"date",
"datetime-local",
"email",
"month",
"number",
"password",
"search",
"tel",
"text",
"time",
"url",
"week",
# These are custom types that dictate non input elements.
"radio",
"checkbox",
"date-picker",
] = "text"
required: bool = False
formDetails: dict[str, str] = dataclasses.field(default_factory=dict)
@dataclasses.dataclass
class FormState:
message_id: str
data: dict[str, str]
errors: dict[str, str]
elements: list[FormElement]
def __post_init__(self):
# Parse each element as FormElement. Clean up for non-recursive dict parse
for i, element_dict in enumerate(self.elements):
if isinstance(element_dict, dict):
self.elements[i] = FormElement(**element_dict)
@me.stateclass
class State:
"""This contains the data in the form"""
#forms: dict[str, FormState]
forms: dict[str, str]
def is_form(message: StateMessage) -> bool:
"""Returns whether the message indicates a form should be rendered"""
if any([x[1] == 'form' for x in message.content]):
return True
return False
def form_sent(message: StateMessage, app_state: AppState) -> bool:
return message.message_id in app_state.form_responses
def render_form(message: StateMessage, app_state: AppState):
"""Renders the form or the data entered in a submitted form"""
# Check if the form was completed, if so, render the content as a card
if message.message_id in app_state.completed_forms:
render_form_card(message, app_state.completed_forms[message.message_id])
return
# Otherwise, get the form structure.
instructions, form_structure = generate_form_elements(message)
data = {}
# Initialize the state data
for element in form_structure:
data[element.name] = element.value
state = me.state(State)
if message.message_id not in state.forms:
form = FormState(
message_id=message.message_id,
data=data,
errors={},
elements=form_structure,
)
try:
state.forms[message.message_id] = form_state_to_string(form)
except Exception as e:
print("Failed to serialize form", e, form)
render_structure(message.message_id, form_structure, instructions)
def render_form_card(message: StateMessage, data: dict[str, Any] | None):
"""Renders the result of a previous form as a card"""
with me.box(
style=me.Style(
padding=me.Padding.all(BOX_PADDING),
max_width="75vw",
background=me.theme_var("surface"),
border_radius=15,
margin=me.Margin(top=5, bottom=20, left=5, right=5),
justify_content=(
"end" if message.role == "agent" else "space-between"
),
box_shadow=("0 1px 2px 0 rgba(60, 64, 67, 0.3), "
"0 1px 3px 1px rgba(60, 64, 67, 0.15)"),
)
):
if data:
# Build markdown result
lines = []
for k, v in data.items():
lines.append(f"**{k}**: {v} ") # end with 2 spaces to force newline
me.markdown('\n'.join(lines).rstrip())
else:
me.text("Form canceled")
def generate_form_elements(message: StateMessage) -> Tuple[str, list[FormElement]]:
"""Returns a declarative structure for a form to generate"""
# Get the message part with the form information.
form_content = next(filter(lambda x: x[1] == 'form', message.content), None)
if not form_content:
return []
form_info = form_content[0]
if not isinstance(form_info, dict):
return []
return instructions_for_form(form_info), make_form_elements(form_info)
def make_form_elements(form_info: dict[str, Any]) -> list[FormElement]:
if 'form' not in form_info or 'properties' not in form_info['form']:
return []
# This is the key, value pairs of field names -> field info. Now we need to
# supplement it.
fields = form_info['form']['properties']
if ('required' in form_info['form'] and
isinstance(form_info['form']['required'], list)):
for field in form_info['form']['required']:
if field in fields:
fields[field]['required'] = True
if 'form_data' in form_info and isinstance(form_info['form_data'], dict):
for field, value in form_info['form_data'].items():
fields[field]['value'] = value
# Now convert the dictionary to FormElements
elements = []
for key, info in fields.items():
elements.append(FormElement(
name=key,
label=info['title'] if 'title' in info else key,
value=info['value'] if 'value' in info else "",
required=info['required'] if 'required' in info else False,
formType=info['format'] if 'format' in info else "text",
# TODO more details for input like validation rules
formDetails={},
))
return elements
def instructions_for_form(form_info: dict[str, Any]) -> str:
if 'instructions' in form_info:
return form_info['instructions']
return ""
def render_structure(id: str, elements: list[FormElement], instructions: str):
with me.box(
style=me.Style(
padding=me.Padding.all(BOX_PADDING),
max_width="75vw",
background=me.theme_var("surface"),
border_radius=15,
margin=me.Margin(top=5, bottom=20, left=5, right=5),
box_shadow=("0 1px 2px 0 rgba(60, 64, 67, 0.3), "
"0 1px 3px 1px rgba(60, 64, 67, 0.15)"),
)
):
if instructions:
me.text(
instructions,
type="headline-4",
style=me.Style(margin=me.Margin(bottom=10)),
)
for element in elements:
with form_group():
input_field(id=id, element=element)
with me.box():
me.button("Cancel", type="flat", on_click=cancel_form, key=id)
me.button("Submit", type="flat", on_click=submit_form, key=id)
def input_field(
*,
id: str,
element: FormElement,
width: str | int = "100%",
):
"""Renders an individual form input field"""
state = me.state(State)
form = FormState(**json.loads(state.forms[id]))
key = element.name if element.name else element.label.lower().replace(" ", "_")
value = element.value
if key in form.data and form.data[key]:
value = form.data[key]
with me.box(style=me.Style(flex_grow=1, width=width)):
me.input(
key=f"{id}_{key}",
label=element.label,
value=value,
appearance="outline",
color="warn" if key in form.errors else "primary",
style=me.Style(width=width),
type=element.formType,
on_blur=on_blur,
)
if key in form.errors:
me.text(
form.errors[key],
style=me.Style(
margin=me.Margin(top=-13, left=15, bottom=15),
color=me.theme_var("error"),
font_size=13,
),
)
@me.content_component
def form_group(flex_direction: Literal["row", "column"] = "row"):
"""Groups input fields together visually"""
with me.box(
style=me.Style(
display="flex", flex_direction=flex_direction, gap=ROW_GAP, width="100%"
)
):
me.slot()
def on_change(e: me.RadioChangeEvent):
state = me.state(State)
key_parts = e.key.split("_")
id = key_parts[0]
field = "_".join(key_parts[1:])
form = FormState(**json.loads(state.forms[id]))
form.data[field] = e.value
state.forms[id] = form_state_to_string(form)
def on_blur(e: me.InputBlurEvent):
state = me.state(State)
key_parts = e.key.split("_")
id = key_parts[0]
field = "_".join(key_parts[1:])
form = FormState(**json.loads(state.forms[id]))
form.data[field] = e.value
state.forms[id] = form_state_to_string(form)
async def cancel_form(e: me.ClickEvent):
message_id = str(uuid.uuid4())
app_state = me.state(AppState)
app_state.form_responses[message_id] = e.key
app_state.background_tasks[message_id] = ""
app_state.completed_forms[e.key] = None
request = Message(
id=message_id,
role="user",
parts=[TextPart(text="rejected form entry")],
metadata={
'conversation_id': app_state.current_conversation_id,
'message_id': message_id,
},
)
response = await SendMessage(request)
async def send_response(id: str, state: State, app_state: AppState):
message_id = str(uuid.uuid4())
app_state.background_tasks[message_id] = ""
app_state.form_responses[message_id] = id
form = FormState(**json.loads(state.forms[id]))
request = Message(
id=message_id,
role="user",
parts=[DataPart(data=form.data)],
metadata={
'conversation_id': app_state.current_conversation_id,
'message_id': message_id,
}
)
response = await SendMessage(request)
async def submit_form(e: me.ClickEvent):
try:
state = me.state(State)
id = e.key
form = FormState(**json.loads(state.forms[id]))
# Replace with real validation logic.
errors = {}
for element in form.elements:
if element.name == "error":
continue
if not form.data[element.name] and element.required:
errors[element.name] = f"{element.name.replace('_', ' ').capitalize()} is required"
form.errors = errors
state.forms[id] = form_state_to_string(form)
# Replace with form processing logic.
if errors:
return
app_state = me.state(AppState)
app_state.completed_forms[id] = form.data
await send_response(id, state, app_state)
except Exception as e:
print("Failed to submit form", e)
# There is some issue with mesop serialization. Instead we use raw string
# in the server state and interpret it as needed.
def form_state_to_string(form: FormState) -> str:
form_dict = dataclasses.asdict(form)
return json.dumps(form_dict)
```
## /demo/ui/components/header.py
```py path="/demo/ui/components/header.py"
import mesop as me
from .poller import polling_buttons
@me.content_component
def header(title: str, icon: str):
"""Header component"""
with me.box(
style=me.Style(
display="flex",
justify_content="space-between",
)
):
with me.box(
style=me.Style(display="flex", flex_direction="row", gap=5)
):
me.icon(icon=icon)
me.text(
title,
type="headline-5",
style=me.Style(font_family="Google Sans"),
)
me.slot()
polling_buttons()
```
## /demo/ui/components/page_scaffold.py
```py path="/demo/ui/components/page_scaffold.py"
import mesop as me
import mesop.labs as mel
from .side_nav import sidenav
from .async_poller import async_poller, AsyncAction
from .poller import polling_buttons
from state.state import AppState
from state.host_agent_service import UpdateAppState
from styles.styles import (
MAIN_COLUMN_STYLE,
PAGE_BACKGROUND_PADDING_STYLE,
PAGE_BACKGROUND_STYLE,
SIDENAV_MAX_WIDTH,
SIDENAV_MIN_WIDTH,
)
async def refresh_app_state(e: mel.WebEvent): # pylint: disable=unused-argument
"""Refresh app state event handler"""
yield
app_state = me.state(AppState)
await UpdateAppState(app_state, app_state.current_conversation_id)
yield
@me.content_component
def page_scaffold():
"""page scaffold component"""
app_state = me.state(AppState)
action = (
AsyncAction(
value=app_state,
duration_seconds=app_state.polling_interval)
if app_state
else None
)
async_poller(
action=action, trigger_event=refresh_app_state
)
sidenav("")
with me.box(
style=me.Style(
display="flex",
flex_direction="column",
height="100%",
margin=me.Margin(
left=SIDENAV_MAX_WIDTH if app_state.sidenav_open else SIDENAV_MIN_WIDTH,
),
),
):
with me.box(
style=me.Style(
background=me.theme_var("background"),
height="100%",
overflow_y="scroll",
margin=me.Margin(bottom=20),
)
):
me.slot()
@me.content_component
def page_frame():
"""Page Frame"""
with me.box(style=MAIN_COLUMN_STYLE):
with me.box(style=PAGE_BACKGROUND_STYLE):
with me.box(style=PAGE_BACKGROUND_PADDING_STYLE):
me.slot()
```
## /demo/ui/components/poller.py
```py path="/demo/ui/components/poller.py"
import mesop as me
from state.state import AppState
from state.host_agent_service import UpdateAppState
@me.content_component
def polling_buttons():
"""Polling buttons component"""
state = me.state(AppState)
with me.box(
style=me.Style(
display="flex",
justify_content="end",
)
):
me.button_toggle(
value=[str(state.polling_interval)],
buttons=[
me.ButtonToggleButton(label="1s", value="1"),
me.ButtonToggleButton(label="5s", value="5"),
me.ButtonToggleButton(label="30s", value="30"),
me.ButtonToggleButton(label="Disable", value="0")
],
multiple=False,
hide_selection_indicator=True,
disabled=False,
on_change=on_change,
style=me.Style(
margin=me.Margin(bottom=20),
),
)
with me.content_button(
type="raised",
on_click=force_refresh,
):
me.icon("refresh")
me.slot()
def on_change(e: me.ButtonToggleChangeEvent):
state = me.state(AppState)
state.polling_interval = int(e.value)
async def force_refresh(e: me.ClickEvent):
"""Refresh app state event handler"""
yield
app_state = me.state(AppState)
await UpdateAppState(app_state, app_state.current_conversation_id)
yield
```
## /demo/ui/components/side_nav.py
```py path="/demo/ui/components/side_nav.py"
import mesop as me
from state.state import AppState
from styles.styles import (
SIDENAV_MAX_WIDTH,
SIDENAV_MIN_WIDTH,
_FANCY_TEXT_GRADIENT,
DEFAULT_MENU_STYLE,
)
page_json = [
{"display": "Home", "icon": "message", "route": "/"},
{"display": "Agents", "icon": "smart_toy", "route": "/agents"},
{"display": "Event List", "icon": "list", "route": "/event_list"},
{"display": "Task List", "icon": "task", "route": "/task_list"},
{"display": "Settings", "icon": "settings", "route": "/settings"},
]
def on_sidenav_menu_click(e: me.ClickEvent): # pylint: disable=unused-argument
"""Side navigation menu click handler"""
state = me.state(AppState)
state.sidenav_open = not state.sidenav_open
def navigate_to(e: me.ClickEvent):
"""navigate to a specific page"""
s = me.state(AppState)
idx = int(e.key)
if idx > len(page_json):
return
page = page_json[idx]
s.current_page = page["route"]
me.navigate(s.current_page)
yield
@me.component
def sidenav(current_page: str):
"""Render side navigation"""
app_state = me.state(AppState)
with me.sidenav(
opened=True,
style=me.Style(
width=SIDENAV_MAX_WIDTH if app_state.sidenav_open else SIDENAV_MIN_WIDTH,
background=me.theme_var("secondary-container"),
),
):
with me.box(
style=me.Style(
margin=me.Margin(top=16, left=16, right=16, bottom=16),
display="flex",
flex_direction="column",
gap=5,
),
):
with me.box(
style=me.Style(
display="flex",
flex_direction="row",
gap=5,
align_items="center",
),
):
with me.content_button(
type="icon",
on_click=on_sidenav_menu_click,
):
with me.box():
with me.tooltip(message="Expand menu"):
me.icon(icon="menu")
if app_state.sidenav_open:
me.text("STUDIO", style=_FANCY_TEXT_GRADIENT)
me.box(style=me.Style(height=16))
for idx, page in enumerate(page_json):
menu_item(
idx, page["icon"], page["display"], not app_state.sidenav_open
)
# settings & theme toggle
with me.box(style=MENU_BOTTOM):
theme_toggle_icon(
9,
"light_mode",
"Theme",
not app_state.sidenav_open,
)
#menu_item(10, "settings", "Settings", not app_state.sidenav_open)
def menu_item(
key: int,
icon: str,
text: str,
minimized: bool = True,
content_style: me.Style = DEFAULT_MENU_STYLE,
):
"""render menu item"""
if minimized: # minimized
with me.box(
style=me.Style(
display="flex",
flex_direction="row",
gap=5,
align_items="center",
),
):
with me.content_button(
key=str(key),
on_click=navigate_to,
style=content_style,
type="icon",
):
with me.tooltip(message=text):
me.icon(icon=icon)
else: # expanded
with me.content_button(
key=str(key),
on_click=navigate_to,
style=content_style,
):
with me.box(
style=me.Style(
display="flex",
flex_direction="row",
gap=5,
align_items="center",
),
):
me.icon(icon=icon)
me.text(text)
def toggle_theme(e: me.ClickEvent): # pylint: disable=unused-argument
"""Toggle theme event"""
s = me.state(AppState)
if me.theme_brightness() == "light":
me.set_theme_mode("dark")
s.theme_mode = "dark"
else:
me.set_theme_mode("light")
s.theme_mode = "light"
def theme_toggle_icon(key: int, icon: str, text: str, min: bool = True):
"""Theme toggle icon"""
# THEME_TOGGLE_STYLE = me.Style(position="absolute", bottom=50, align_content="left")
if min: # minimized
with me.box(
style=me.Style(
display="flex",
flex_direction="row",
gap=5,
align_items="center",
),
):
with me.content_button(
key=str(key),
on_click=toggle_theme,
# style=THEME_TOGGLE_STYLE,
type="icon",
):
with me.tooltip(message=text):
me.icon(
"light_mode" if me.theme_brightness() == "dark" else "dark_mode"
)
else: # expanded
with me.content_button(
key=str(key),
on_click=toggle_theme,
# style=THEME_TOGGLE_STYLE,
):
with me.box(
style=me.Style(
display="flex",
flex_direction="row",
gap=5,
align_items="center",
),
):
me.icon(
"light_mode" if me.theme_brightness() == "dark" else "dark_mode"
)
me.text(
"Light mode" if me.theme_brightness() == "dark" else "Dark mode"
)
MENU_BOTTOM = me.Style(
display="flex",
flex_direction="column",
position="absolute",
bottom=8,
align_content="left",
)
```
## /demo/ui/components/task_card.py
```py path="/demo/ui/components/task_card.py"
import json
import mesop as me
import pandas as pd
from state.state import SessionTask, StateTask, ContentPart
def message_string(content: ContentPart) -> str:
if isinstance(content, str):
return content
return json.dumps(content)
@me.component
def task_card(tasks: list[SessionTask]):
"""Task card component"""
columns = ["Conversation ID", "Task ID", "Description", "Status", "Output"]
df_data = dict([(c, []) for c in columns])
for task in tasks:
df_data["Conversation ID"].append(task.session_id)
df_data["Task ID"].append(task.task.task_id)
df_data["Description"].append('\n'.join(message_string(x[0]) for x in task.task.message.content))
df_data["Status"].append(task.task.state)
df_data["Output"].append(flatten_artifacts(task.task))
df = pd.DataFrame(
pd.DataFrame(df_data),
columns=columns)
with me.box(
style=me.Style(
display="flex",
justify_content="space-between",
)
):
me.table(
df,
header=me.TableHeader(sticky=True),
columns=dict([(c, me.TableColumn(sticky=True)) for c in columns]),
)
def flatten_artifacts(task: StateTask) -> str:
parts = []
for a in task.artifacts:
for p in a:
if p[1] == 'text/plain' or p[1] == 'application/json':
parts.append(message_string(p[0]))
else:
parts.append(p[1])
return '\n'.join(parts)
```
## /demo/ui/main.py
```py path="/demo/ui/main.py"
"""A UI solution and host service to interact with the agent framework.
run:
uv main.py
"""
import asyncio
import os
import threading
import mesop as me
from state.state import AppState
from components.page_scaffold import page_scaffold
from components.api_key_dialog import api_key_dialog
from pages.home import home_page_content
from pages.agent_list import agent_list_page
from pages.conversation import conversation_page
from pages.event_list import event_list_page
from pages.settings import settings_page_content
from pages.task_list import task_list_page
from state import host_agent_service
from service.server.server import ConversationServer
from fastapi import FastAPI, APIRouter
from fastapi.middleware.wsgi import WSGIMiddleware
from dotenv import load_dotenv
load_dotenv()
def on_load(e: me.LoadEvent): # pylint: disable=unused-argument
"""On load event"""
state = me.state(AppState)
me.set_theme_mode(state.theme_mode)
if "conversation_id" in me.query_params:
state.current_conversation_id = me.query_params["conversation_id"]
else:
state.current_conversation_id = ""
# check if the API key is set in the environment
# and if the user is using Vertex AI
uses_vertex_ai = os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "").upper() == "TRUE"
api_key = os.getenv("GOOGLE_API_KEY", "")
if uses_vertex_ai:
state.uses_vertex_ai = True
elif api_key:
state.api_key = api_key
else:
# Show the API key dialog if both are not set
state.api_key_dialog_open = True
# Policy to allow the lit custom element to load
security_policy=me.SecurityPolicy(
allowed_script_srcs=[
'https://cdn.jsdelivr.net',
]
)
@me.page(
path="/",
title="Chat",
on_load=on_load,
security_policy=security_policy,
)
def home_page():
"""Main Page"""
state = me.state(AppState)
# Show API key dialog if needed
api_key_dialog()
with page_scaffold(): # pylint: disable=not-context-manager
home_page_content(state)
@me.page(
path="/agents",
title="Agents",
on_load=on_load,
security_policy=security_policy,
)
def another_page():
"""Another Page"""
api_key_dialog()
agent_list_page(me.state(AppState))
@me.page(
path="/conversation",
title="Conversation",
on_load=on_load,
security_policy=security_policy,
)
def chat_page():
"""Conversation Page."""
api_key_dialog()
conversation_page(me.state(AppState))
@me.page(
path="/event_list",
title="Event List",
on_load=on_load,
security_policy=security_policy,
)
def event_page():
"""Event List Page."""
api_key_dialog()
event_list_page(me.state(AppState))
@me.page(
path="/settings",
title="Settings",
on_load=on_load,
security_policy=security_policy,
)
def settings_page():
"""Settings Page."""
api_key_dialog()
settings_page_content()
@me.page(
path="/task_list",
title="Task List",
on_load=on_load,
security_policy=security_policy,
)
def task_page():
"""Task List Page."""
api_key_dialog()
task_list_page(me.state(AppState))
# Setup the server global objects
app = FastAPI()
router = APIRouter()
agent_server = ConversationServer(router)
app.include_router(router)
app.mount(
"/",
WSGIMiddleware(
me.create_wsgi_app(debug_mode=os.environ.get("DEBUG_MODE", "") == "true")
),
)
if __name__ == "__main__":
import uvicorn
# Setup the connection details, these should be set in the environment
host = os.environ.get("A2A_UI_HOST", "0.0.0.0")
port = int(os.environ.get("A2A_UI_PORT", "12000"))
# Set the client to talk to the server
host_agent_service.server_url = f"http://{host}:{port}"
uvicorn.run(
"main:app",
host=host,
port=port,
reload=True,
reload_includes=["*.py", "*.js"],
timeout_graceful_shutdown=0,
)
```
## /demo/ui/pages/__init__.py
```py path="/demo/ui/pages/__init__.py"
```
## /demo/ui/pages/agent_list.py
```py path="/demo/ui/pages/agent_list.py"
import stat
import asyncio
import mesop as me
from components.agent_list import agents_list
from components.dialog import dialog, dialog_actions
from components.header import header
from components.page_scaffold import page_frame
from components.page_scaffold import page_scaffold
from state.agent_state import AgentState
from state.host_agent_service import ListRemoteAgents, AddRemoteAgent
from state.state import AppState
from utils.agent_card import get_agent_card
from common.types import JSONRPCError
def agent_list_page(app_state: AppState):
"""Agents List Page"""
state = me.state(AgentState)
with page_scaffold(): # pylint: disable=not-context-manager
with page_frame():
with header("Remote Agents", "smart_toy"):
pass
agents = asyncio.run(ListRemoteAgents())
agents_list(agents)
with dialog(state.agent_dialog_open):
with me.box(
style=me.Style(display="flex", flex_direction="column", gap=12)
):
me.input(
label="Agent Address",
on_blur=set_agent_address,
placeholder="localhost:10000",
)
input_modes_string = ", ".join(state.input_modes)
output_modes_string = ", ".join(state.output_modes)
if state.error != "":
me.text(state.error, style=me.Style(color="red"))
if state.agent_name != "":
me.text(f"Agent Name: {state.agent_name}")
if state.agent_description:
me.text(f"Agent Description: {state.agent_description}")
if state.agent_framework_type:
me.text(f"Agent Framework Type: {state.agent_framework_type}")
if state.input_modes:
me.text(f"Input Modes: {input_modes_string}")
if state.output_modes:
me.text(f"Output Modes: {output_modes_string}")
if state.agent_name:
me.text(f"Streaming Supported: {state.stream_supported}")
me.text(f"Push Notifications Supported: {state.push_notifications_supported}")
with dialog_actions():
if not state.agent_name:
me.button("Read", on_click=load_agent_info)
elif not state.error:
me.button("Save", on_click=save_agent)
me.button("Cancel", on_click=cancel_agent_dialog)
def set_agent_address(e: me.InputBlurEvent):
state = me.state(AgentState)
state.agent_address = e.value
def load_agent_info(e: me.ClickEvent):
state = me.state(AgentState)
try:
state.error = None
agent_card_response = get_agent_card(state.agent_address)
state.agent_name = agent_card_response.name
state.agent_description = agent_card_response.description
state.agent_framework_type = agent_card_response.provider.organization if agent_card_response.provider else ''
state.input_modes = agent_card_response.defaultInputModes
state.output_modes = agent_card_response.defaultOutputModes
state.stream_supported = agent_card_response.capabilities.streaming
state.push_notifications_supported = agent_card_response.capabilities.pushNotifications
except Exception as e:
print(e)
state.agent_name = None
state.error = f"Cannot connect to agent as {state.agent_address}"
def cancel_agent_dialog(e: me.ClickEvent):
state = me.state(AgentState)
state.agent_dialog_open = False
async def save_agent(e: me.ClickEvent):
state = me.state(AgentState)
await AddRemoteAgent(state.agent_address)
state.agent_address = ""
state.agent_name = ""
state.agent_description = ""
state.agent_dialog_open = False
```
## /demo/ui/pages/conversation.py
```py path="/demo/ui/pages/conversation.py"
import mesop as me
from components.header import header
from components.page_scaffold import page_scaffold
from components.page_scaffold import page_frame
from components.conversation import conversation
from state.state import AppState
def conversation_page(app_state: AppState):
"""Conversation Page"""
state = me.state(AppState)
with page_scaffold(): # pylint: disable=not-context-manager
with page_frame():
with header("Conversation", "chat"): pass
conversation()
```
## /demo/ui/pages/event_list.py
```py path="/demo/ui/pages/event_list.py"
import stat
import mesop as me
from components.header import header
from components.page_scaffold import page_scaffold
from components.page_scaffold import page_frame
from components.event_viewer import event_list
from state.state import AppState
from state.agent_state import AgentState
def event_list_page(app_state: AppState):
"""Agents List Page"""
state = me.state(AgentState)
with page_scaffold(): # pylint: disable=not-context-manager
with page_frame():
with header("Event List", "list"): pass
event_list()
```
## /demo/ui/pages/home.py
```py path="/demo/ui/pages/home.py"
import mesop as me
from components.header import header
from components.conversation_list import conversation_list
from state.state import AppState
@me.stateclass
class PageState:
""" Local Page State"""
temp_name: str = ""
def on_blur_set_name(e: me.InputBlurEvent):
"""input handler"""
state = me.state(PageState)
state.temp_name = e.value
def on_enter_change_name(e: me.components.input.input.InputEnterEvent): # pylint: disable=unused-argument
"""change name button handler"""
state = me.state(PageState)
app_state = me.state(AppState)
app_state.name = state.temp_name
app_state.greeting = "" # reset greeting
yield
def on_click_change_name(e: me.ClickEvent): # pylint: disable=unused-argument
"""change name button handler"""
state = me.state(PageState)
app_state = me.state(AppState)
app_state.name = state.temp_name
app_state.greeting = "" # reset greeting
yield
def home_page_content(app_state: AppState):
"""Home Page"""
with me.box(
style=me.Style(
display="flex",
flex_direction="column",
height="100%",
),
):
with me.box(
style=me.Style(
background=me.theme_var("background"),
height="100%",
margin=me.Margin(bottom=20),
)
):
with me.box(
style=me.Style(
background=me.theme_var("background"),
padding=me.Padding(top=24, left=24, right=24, bottom=24),
display="flex",
flex_direction="column",
width="100%",
)
):
with header("Conversations", "message"): pass
conversation_list(app_state.conversations)
```
## /demo/ui/pages/settings.py
```py path="/demo/ui/pages/settings.py"
import mesop as me
import asyncio
from components.header import header
from components.page_scaffold import page_scaffold
from components.page_scaffold import page_frame
from state.state import SettingsState, AppState
from state.host_agent_service import UpdateApiKey
def on_selection_change_output_types(e: me.SelectSelectionChangeEvent):
s = me.state(SettingsState)
s.output_mime_types = e.values
def on_api_key_change(e: me.InputBlurEvent):
s = me.state(AppState)
s.api_key = e.value
@me.stateclass
class UpdateStatus:
"""Status for API key update"""
show_success: bool = False
async def update_api_key(e: me.ClickEvent):
yield # Allow UI to update
state = me.state(AppState)
update_status = me.state(UpdateStatus)
if state.api_key.strip():
success = await UpdateApiKey(state.api_key)
if success:
update_status.show_success = True
# Hide success message after 3 seconds
yield
await asyncio.sleep(3)
update_status.show_success = False
yield # Allow UI to update after operation completes
def settings_page_content():
"""Settings Page Content."""
settings_state = me.state(SettingsState)
app_state = me.state(AppState)
update_status = me.state(UpdateStatus)
with page_scaffold(): # pylint: disable=not-context-manager
with page_frame():
with header("Settings", "settings"): pass
with me.box(
style=me.Style(
display="flex",
justify_content="space-between",
flex_direction="column",
gap=30,
)
):
# API Key Settings Section
if not app_state.uses_vertex_ai:
with me.box(
style=me.Style(
display="flex",
flex_direction="column",
margin=me.Margin(bottom=30),
)
):
me.text(
"Google API Key",
type="headline-6",
style=me.Style(
margin=me.Margin(bottom=15),
font_family="Google Sans",
),
)
with me.box(
style=me.Style(
display="flex",
flex_direction="row",
gap=10,
align_items="center",
margin=me.Margin(bottom=5),
)
):
me.input(
label="API Key",
value=app_state.api_key,
on_blur=on_api_key_change,
type="password",
appearance="outline",
style=me.Style(width="400px"),
)
me.button(
"Update",
type="raised",
on_click=update_api_key,
style=me.Style(
color=me.theme_var("primary"),
),
)
# Success message
if update_status.show_success:
with me.box(
style=me.Style(
background=me.theme_var("success-container"),
padding=me.Padding(top=10, bottom=10, left=10, right=10),
border_radius=4,
margin=me.Margin(top=10),
display="flex",
flex_direction="row",
align_items="center",
width="400px",
)
):
me.icon(
"check_circle",
style=me.Style(
color=me.theme_var("on-success-container"),
margin=me.Margin(right=10),
)
)
me.text(
"API Key updated successfully",
style=me.Style(
color=me.theme_var("on-success-container"),
)
)
# Add spacing instead of divider with style
with me.box(style=me.Style(margin=me.Margin(top=10, bottom=10))):
me.divider()
# Output Types Section
me.select(
label="Supported Output Types",
options=[
me.SelectOption(label="Image", value="image/*"),
me.SelectOption(label="Text (Plain)", value="text/plain"),
],
on_selection_change=on_selection_change_output_types,
style=me.Style(width=500),
multiple=True,
appearance="outline",
value=settings_state.output_mime_types,
)
```
## /demo/ui/pages/task_list.py
```py path="/demo/ui/pages/task_list.py"
from components.header import header
from components.page_scaffold import page_scaffold
from components.page_scaffold import page_frame
from components.task_card import task_card
from state.state import AppState
def task_list_page(app_state: AppState):
"""Task List Page"""
with page_scaffold(): # pylint: disable=not-context-manager
with page_frame():
with header("Task List", "task"):
pass
task_card(app_state.task_list)
```
## /demo/ui/pyproject.toml
```toml path="/demo/ui/pyproject.toml"
[project]
name = "a2a-python-example-ui"
version = "0.1.0"
description = "Agent2Agent example UI"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"asyncio>=3.4.3",
"httpx>=0.28.1",
"httpx-sse>=0.4.0",
"pydantic>=2.10.6",
"fastapi>=0.115.0",
"uvicorn>=0.34.0",
"mesop>=1.0.0",
"a2a-samples",
"pandas>=2.2.0",
"google-genai>=1.9.0",
"google-adk>=0.0.3",
]
[tool.hatch.build.targets.wheel]
packages = ["a2a_ui"]
[tool.uv.sources]
a2a_ui = { workspace = true }
a2a_samples = { path = "../../samples/python", editable = true }
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[dependency-groups]
dev = [
"ruff>=0.11.2",
]
```
## /demo/ui/service/__init__.py
```py path="/demo/ui/service/__init__.py"
```
## /demo/ui/service/client/__init__.py
```py path="/demo/ui/service/client/__init__.py"
```
## /demo/ui/service/client/client.py
```py path="/demo/ui/service/client/client.py"
import httpx
from httpx_sse import connect_sse
from typing import Any, AsyncIterable
from service.types import (
CreateConversationRequest,
CreateConversationResponse,
ListConversationRequest,
ListConversationResponse,
SendMessageRequest,
SendMessageResponse,
ListMessageRequest,
ListMessageResponse,
GetEventRequest,
GetEventResponse,
PendingMessageRequest,
PendingMessageResponse,
ListTaskRequest,
ListTaskResponse,
RegisterAgentRequest,
RegisterAgentResponse,
AgentClientHTTPError,
ListAgentRequest,
ListAgentResponse,
AgentClientJSONError,
JSONRPCRequest,
Conversation,
)
import json
class ConversationClient:
def __init__(self, base_url):
self.base_url = base_url.rstrip("/")
async def send_message(self, payload: SendMessageRequest) -> SendMessageResponse:
return SendMessageResponse(**await self._send_request(payload))
async def _send_request(self, request: JSONRPCRequest) -> dict[str, Any]:
async with httpx.AsyncClient() as client:
try:
response = await client.post(
self.base_url + "/" + request.method, json=request.model_dump()
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise AgentClientHTTPError(e.response.status_code, str(e)) from e
except json.JSONDecodeError as e:
raise AgentClientJSONError(str(e)) from e
async def create_conversation(self, payload: CreateConversationRequest) -> CreateConversationResponse:
return CreateConversationResponse(**await self._send_request(payload))
async def list_conversation(self, payload: ListConversationRequest) -> ListConversationResponse:
return ListConversationResponse(**await self._send_request(payload))
async def get_events(self, payload: GetEventRequest) -> GetEventResponse:
return GetEventResponse(**await self._send_request(payload))
async def list_messages(self, payload: ListMessageRequest) -> ListMessageResponse:
return ListMessageResponse(**await self._send_request(payload))
async def get_pending_messages(self, payload: PendingMessageRequest) -> PendingMessageResponse:
return PendingMessageResponse(**await self._send_request(payload))
async def list_tasks(self, payload: ListTaskRequest) -> ListTaskResponse:
return ListTaskResponse(**await self._send_request(payload))
async def register_agent(self, payload: RegisterAgentRequest) -> RegisterAgentResponse:
return RegisterAgentResponse(**await self._send_request(payload))
async def list_agents(self, payload: ListAgentRequest) -> ListAgentResponse:
return ListAgentResponse(**await self._send_request(payload))
```
## /demo/ui/service/server/__init__.py
```py path="/demo/ui/service/server/__init__.py"
```
## /demo/ui/service/server/adk_host_manager.py
```py path="/demo/ui/service/server/adk_host_manager.py"
import asyncio
import datetime
import json
import os
from typing import Tuple, Optional, Any
import uuid
from service.types import Conversation, Event
from common.types import (
Message,
Task,
TextPart,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
TaskArtifactUpdateEvent,
Artifact,
AgentCard,
DataPart,
FilePart,
FileContent,
Part,
)
from hosts.multiagent.host_agent import HostAgent
from hosts.multiagent.remote_agent_connection import (
TaskCallbackArg,
)
from utils.agent_card import get_agent_card
from service.server.application_manager import ApplicationManager
from google.adk import Runner
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.artifacts import InMemoryArtifactService
from google.adk.events.event import Event as ADKEvent
from google.adk.events.event_actions import EventActions as ADKEventActions
from google.genai import types
import base64
class ADKHostManager(ApplicationManager):
"""An implementation of memory based management with fake agent actions
This implements the interface of the ApplicationManager to plug into
the AgentServer. This acts as the service contract that the Mesop app
uses to send messages to the agent and provide information for the frontend.
"""
_conversations: list[Conversation]
_messages: list[Message]
_tasks: list[Task]
_events: dict[str, Event]
_pending_message_ids: list[str]
_agents: list[AgentCard]
_task_map: dict[str, str]
def __init__(self, api_key: str = "", uses_vertex_ai: bool = False):
self._conversations = []
self._messages = []
self._tasks = []
self._events = {}
self._pending_message_ids = []
self._agents = []
self._artifact_chunks = {}
self._session_service = InMemorySessionService()
self._artifact_service = InMemoryArtifactService()
self._memory_service = InMemoryMemoryService()
self._host_agent = HostAgent([], self.task_callback)
self.user_id = "test_user"
self.app_name = "A2A"
self.api_key = api_key or os.environ.get("GOOGLE_API_KEY", "")
self.uses_vertex_ai = uses_vertex_ai or os.environ.get("GOOGLE_GENAI_USE_VERTEXAI", "").upper() == "TRUE"
# Set environment variables based on auth method
if self.uses_vertex_ai:
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
elif self.api_key:
# Use API key authentication
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "FALSE"
os.environ["GOOGLE_API_KEY"] = self.api_key
self._initialize_host()
# Map of message id to task id
self._task_map = {}
# Map to manage 'lost' message ids until protocol level id is introduced
self._next_id = {} # dict[str, str]: previous message to next message
def update_api_key(self, api_key: str):
"""Update the API key and reinitialize the host if needed"""
if api_key and api_key != self.api_key:
self.api_key = api_key
# Only update if not using Vertex AI
if not self.uses_vertex_ai:
os.environ["GOOGLE_API_KEY"] = api_key
# Reinitialize host with new API key
self._initialize_host()
def _initialize_host(self):
agent = self._host_agent.create_agent()
self._host_runner = Runner(
app_name=self.app_name,
agent=agent,
artifact_service=self._artifact_service,
session_service=self._session_service,
memory_service=self._memory_service,
)
def create_conversation(self) -> Conversation:
session = self._session_service.create_session(
app_name=self.app_name,
user_id=self.user_id)
conversation_id = session.id
c = Conversation(conversation_id=conversation_id, is_active=True)
self._conversations.append(c)
return c
def sanitize_message(self, message: Message) -> Message:
if not message.metadata:
message.metadata = {}
if 'message_id' not in message.metadata:
message.metadata.update({'message_id': str(uuid.uuid4())})
if 'conversation_id' in message.metadata:
conversation = self.get_conversation(message.metadata['conversation_id'])
if conversation:
if conversation.messages:
# Get the last message
last_message_id = get_message_id(conversation.messages[-1])
if last_message_id:
message.metadata.update({'last_message_id': last_message_id})
return message
async def process_message(self, message: Message):
self._messages.append(message)
message_id = get_message_id(message)
if message_id:
self._pending_message_ids.append(message_id)
conversation_id = (
message.metadata['conversation_id']
if 'conversation_id' in message.metadata
else None
)
# Now check the conversation and attach the message id.
conversation = self.get_conversation(conversation_id)
if conversation:
conversation.messages.append(message)
self.add_event(Event(
id=str(uuid.uuid4()),
actor='user',
content=message,
timestamp=datetime.datetime.utcnow().timestamp(),
))
final_event: GenAIEvent | None = None
# Determine if a task is to be resumed.
session = self._session_service.get_session(
app_name='A2A',
user_id='test_user',
session_id=conversation_id)
# Update state must happen in the event
state_update = {
'input_message_metadata': message.metadata,
'session_id': conversation_id
}
last_message_id = get_last_message_id(message)
if (last_message_id and
last_message_id in self._task_map and
task_still_open(next(
filter(
lambda x: x.id == self._task_map[last_message_id],
self._tasks),
None))):
state_update['task_id'] = self._task_map[last_message_id]
# Need to upsert session state now, only way is to append an event.
self._session_service.append_event(session, ADKEvent(
id=ADKEvent.new_id(),
author="host_agent",
invocation_id=ADKEvent.new_id(),
actions=ADKEventActions(state_delta=state_update),
))
async for event in self._host_runner.run_async(
user_id=self.user_id,
session_id=conversation_id,
new_message=self.adk_content_from_message(message)
):
self.add_event(Event(
id=event.id,
actor=event.author,
content=self.adk_content_to_message(event.content, conversation_id),
timestamp=event.timestamp,
))
final_event = event
response: Message | None = None
if final_event:
final_event.content.role = 'model'
response = self.adk_content_to_message(final_event.content, conversation_id)
last_message_id = get_message_id(message)
new_message_id = ""
if last_message_id and last_message_id in self._next_id:
new_message_id = self._next_id[last_message_id]
else:
new_message_id = str(uuid.uuid4())
last_message_id = None
response.metadata = {
**message.metadata,
**{'last_message_id': last_message_id,
'message_id': new_message_id}
}
self._messages.append(response)
if conversation:
conversation.messages.append(response)
self._pending_message_ids.remove(message_id)
def add_task(self, task: Task):
self._tasks.append(task)
def update_task(self, task: Task):
for i, t in enumerate(self._tasks):
if t.id == task.id:
self._tasks[i] = task
return
def task_callback(self, task: TaskCallbackArg, agent_card: AgentCard):
self.emit_event(task, agent_card)
if isinstance(task, TaskStatusUpdateEvent):
current_task = self.add_or_get_task(task)
current_task.status = task.status
self.attach_message_to_task(task.status.message, current_task.id)
self.insert_message_history(current_task, task.status.message)
self.update_task(current_task)
self.insert_id_trace(task.status.message)
return current_task
elif isinstance(task, TaskArtifactUpdateEvent):
current_task = self.add_or_get_task(task)
self.process_artifact_event(current_task, task)
self.update_task(current_task)
return current_task
# Otherwise this is a Task, either new or updated
elif not any(filter(lambda x: x.id == task.id, self._tasks)):
self.attach_message_to_task(task.status.message, task.id)
self.insert_id_trace(task.status.message)
self.add_task(task)
return task
else:
self.attach_message_to_task(task.status.message, task.id)
self.insert_id_trace(task.status.message)
self.update_task(task)
return task
def emit_event(self, task: TaskCallbackArg, agent_card: AgentCard):
content = None
conversation_id = get_conversation_id(task)
metadata = {'conversation_id': conversation_id} if conversation_id else None
if isinstance(task, TaskStatusUpdateEvent):
if task.status.message:
content = task.status.message
else:
content = Message(
parts=[TextPart(text=str(task.status.state))],
role="agent",
metadata=metadata,
)
elif isinstance(task, TaskArtifactUpdateEvent):
content = Message(
parts=task.artifact.parts,
role="agent",
metadata=metadata,
)
elif task.status and task.status.message:
content = task.status.message
elif task.artifacts:
parts = []
for a in task.artifacts:
parts.extend(a.parts)
content = Message(
parts=parts,
role="agent",
metadata=metadata,
)
else:
content = Message(
parts=[TextPart(text=str(task.status.state))],
role="agent",
metadata=metadata,
)
self.add_event(Event(
id=str(uuid.uuid4()),
actor=agent_card.name,
content=content,
timestamp=datetime.datetime.utcnow().timestamp(),
))
def attach_message_to_task(self, message: Message | None, task_id: str):
if message and message.metadata and 'message_id' in message.metadata:
self._task_map[message.metadata['message_id']] = task_id
def insert_id_trace(self, message: Message | None):
if not message:
return
message_id = get_message_id(message)
last_message_id = get_last_message_id(message)
if message_id and last_message_id:
self._next_id[last_message_id] = message_id
def insert_message_history(self, task: Task, message: Message | None):
if not message:
return
if task.history is None:
task.history = []
message_id = get_message_id(message)
if not message_id:
return
if get_message_id(task.status.message) not in [
get_message_id(x) for x in task.history
]:
task.history.append(task.status.message)
else:
print("Message id already in history", get_message_id(task.status.message), task.history)
def add_or_get_task(self, task: TaskCallbackArg):
current_task = next(filter(lambda x: x.id == task.id, self._tasks), None)
if not current_task:
conversation_id = None
if task.metadata and 'conversation_id' in task.metadata:
conversation_id = task.metadata['conversation_id']
current_task = Task(
id=task.id,
status=TaskStatus(state = TaskState.SUBMITTED), #initialize with submitted
metadata=task.metadata,
artifacts = [],
sessionId=conversation_id,
)
self.add_task(current_task)
return current_task
return current_task
def process_artifact_event(self, current_task:Task, task_update_event: TaskArtifactUpdateEvent):
artifact = task_update_event.artifact
if not artifact.append:
#received the first chunk or entire payload for an artifact
if artifact.lastChunk is None or artifact.lastChunk:
#lastChunk bit is missing or is set to true, so this is the entire payload
#add this to artifacts
if not current_task.artifacts:
current_task.artifacts = []
current_task.artifacts.append(artifact)
else:
#this is a chunk of an artifact, stash it in temp store for assemling
if not task_update_event.id in self._artifact_chunks:
self._artifact_chunks[task_update_event.id] = {}
self._artifact_chunks[task_update_event.id][artifact.index] = artifact
else:
# we received an append chunk, add to the existing temp artifact
current_temp_artifact = self._artifact_chunks[task_update_event.id][artifact.index]
# TODO handle if current_temp_artifact is missing
current_temp_artifact.parts.extend(artifact.parts)
if artifact.lastChunk:
current_task.artifacts.append(current_temp_artifact)
del self._artifact_chunks[task_update_event.id][artifact.index]
def add_event(self, event: Event):
self._events[event.id] = event
def get_conversation(
self,
conversation_id: Optional[str]
) -> Optional[Conversation]:
if not conversation_id:
return None
return next(
filter(lambda c: c.conversation_id == conversation_id,
self._conversations), None)
def get_pending_messages(self) -> list[Tuple[str, str]]:
rval = []
for message_id in self._pending_message_ids:
if message_id in self._task_map:
task_id = self._task_map[message_id]
task = next(filter(lambda x: x.id == task_id, self._tasks), None)
if not task:
rval.append((message_id, ""))
elif task.history and task.history[-1].parts:
if len(task.history) == 1:
rval.append((message_id, "Working..."))
else:
part = task.history[-1].parts[0]
rval.append((
message_id,
part.text if part.type == "text" else "Working..."))
else:
rval.append((message_id, ""))
return rval
def register_agent(self, url):
agent_data = get_agent_card(url)
if not agent_data.url:
agent_data.url = url
self._agents.append(agent_data)
self._host_agent.register_agent_card(agent_data)
# Now update the host agent definition
self._initialize_host()
@property
def agents(self) -> list[AgentCard]:
return self._agents
@property
def conversations(self) -> list[Conversation]:
return self._conversations
@property
def tasks(self) -> list[Task]:
return self._tasks
@property
def events(self) -> list[Event]:
return sorted(self._events.values(), key=lambda x: x.timestamp)
def adk_content_from_message(self, message: Message) -> types.Content:
parts: list[types.Part] = []
for part in message.parts:
if part.type == "text":
parts.append(types.Part.from_text(text=part.text))
elif part.type == "data":
json_string = json.dumps(part.data)
parts.append(types.Part.from_text(text=json_string))
elif part.type == "file":
if part.uri:
parts.append(types.Part.from_uri(
file_uri=part.uri,
mime_type=part.mimeType
))
elif content_part.bytes:
parts.append(types.Part.from_bytes(
data=part.bytes.encode('utf-8'),
mime_type=part.mimeType)
)
else:
raise ValueError("Unsupported message type")
return types.Content(parts=parts, role=message.role)
def adk_content_to_message(self, content: types.Content, conversation_id: str) -> Message:
parts: list[Part] = []
if not content.parts:
return Message(
parts=[],
role=content.role if content.role == 'user' else 'agent',
metadata={'conversation_id': conversation_id},
)
for part in content.parts:
if part.text:
# try parse as data
try:
data = json.loads(part.text)
parts.append(DataPart(data=data))
except:
parts.append(TextPart(text=part.text))
elif part.inline_data:
parts.append(FilePart(
data=part.inline_data.decode('utf-8'),
mimeType=part.inline_data.mime_type
))
elif part.file_data:
parts.append(FilePart(
file=FileContent(
uri=part.file_data.file_uri,
mimeType=part.file_data.mime_type
)
))
# These aren't managed by the A2A message structure, these are internal
# details of ADK, we will simply flatten these to json representations.
elif part.video_metadata:
parts.append(DataPart(data=part.video_metadata.model_dump()))
elif part.thought:
parts.append(TextPart(text="thought"))
elif part.executable_code:
parts.append(DataPart(data=part.executable_code.model_dump()))
elif part.function_call:
parts.append(DataPart(data=part.function_call.model_dump()))
elif part.function_response:
parts.extend(self._handle_function_response(part, conversation_id))
else:
raise ValueError("Unexpected content, unknown type")
return Message(
role=content.role if content.role == 'user' else 'agent',
parts=parts,
metadata={'conversation_id': conversation_id},
)
def _handle_function_response(self, part: types.Part, conversation_id: str) -> list[Part]:
parts = []
try:
for p in part.function_response.response['result']:
if isinstance(p, str):
parts.append(TextPart(text=p))
elif isinstance(p, dict):
if 'type' in p and p['type'] == 'file':
parts.append(FilePart(**p))
else:
parts.append(DataPart(data=p))
elif isinstance(p, DataPart):
if 'artifact-file-id' in p.data:
file_part = self._artifact_service.load_artifact(user_id=self.user_id,
session_id=conversation_id,
app_name=self.app_name,
filename = p.data['artifact-file-id'])
file_data = file_part.inline_data
base64_data = base64.b64encode(file_data.data).decode('utf-8')
parts.append(FilePart(
file=FileContent(
bytes=base64_data, mimeType=file_data.mime_type, name='artifact_file'
)
))
else:
parts.append(DataPart(data=p.data))
else:
parts.append(TextPart(text=json.dumps(p)))
except Exception as e:
print("Couldn't convert to messages:", e)
parts.append(DataPart(data=part.function_response.model_dump()))
return parts
def get_message_id(m: Message | None) -> str | None:
if not m or not m.metadata or 'message_id' not in m.metadata:
return None
return m.metadata['message_id']
def get_last_message_id(m: Message | None) -> str | None:
if not m or not m.metadata or 'last_message_id' not in m.metadata:
return None
return m.metadata['last_message_id']
def get_conversation_id(
t: (Task |
TaskStatusUpdateEvent |
TaskArtifactUpdateEvent |
Message |
None)
) -> str | None:
if (t and
hasattr(t, 'metadata') and
t.metadata and
'conversation_id' in t.metadata):
return t.metadata['conversation_id']
return None
def task_still_open(task: Task | None) -> bool:
if not task:
return False
return task.status.state in [
TaskState.SUBMITTED, TaskState.WORKING, TaskState.INPUT_REQUIRED
]
```
## /demo/ui/service/server/application_manager.py
```py path="/demo/ui/service/server/application_manager.py"
from abc import ABC, abstractmethod
from common.types import Message, Task, AgentCard
from service.types import Conversation, Event
class ApplicationManager(ABC):
@abstractmethod
def create_conversation(self) -> Conversation:
pass
@abstractmethod
def sanitize_message(self, message: Message) -> Message:
pass
@abstractmethod
async def process_message(self, message: Message):
pass
@abstractmethod
def register_agent(self, url: str):
pass
@abstractmethod
def get_pending_messages(self) -> list[str]:
pass
@property
@abstractmethod
def conversations(self) -> list[Conversation]:
pass
@property
@abstractmethod
def tasks(self) -> list[Task]:
pass
@property
@abstractmethod
def agents(self) -> list[AgentCard]:
pass
@property
@abstractmethod
def events(self) -> list[Event]:
pass
```
## /demo/ui/service/server/in_memory_manager.py
```py path="/demo/ui/service/server/in_memory_manager.py"
import asyncio
import datetime
from typing import Tuple, Optional
import uuid
from service.types import Conversation, Event
from common.types import (
Message,
Task,
TextPart,
TaskState,
TaskStatus,
Artifact,
AgentCard,
DataPart,
)
from utils.agent_card import get_agent_card
from service.server.application_manager import ApplicationManager
from service.server import test_image
class InMemoryFakeAgentManager(ApplicationManager):
"""An implementation of memory based management with fake agent actions
This implements the interface of the ApplicationManager to plug into
the AgentServer. This acts as the service contract that the Mesop app
uses to send messages to the agent and provide information for the frontend.
"""
_conversations: list[Conversation]
_messages: list[Message]
_tasks: list[Task]
_events: list[Event]
_pending_message_ids: list[str]
_next_message_idx: int
_agents: list[AgentCard]
def __init__(self):
self._conversations = []
self._messages = []
self._tasks = []
self._events = []
self._pending_message_ids = []
self._next_message_idx = 0
self._agents = []
self._task_map = {}
def create_conversation(self) -> Conversation:
conversation_id = str(uuid.uuid4())
c = Conversation(conversation_id=conversation_id, is_active=True)
self._conversations.append(c)
return c
def sanitize_message(self, message: Message) -> Message:
if not message.metadata:
message.metadata = {}
message.metadata.update({'message_id': str(uuid.uuid4())})
return message
async def process_message(self, message: Message):
self._messages.append(message)
message_id = message.metadata['message_id']
self._pending_message_ids.append(message_id)
conversation_id = (
message.metadata['conversation_id']
if 'conversation_id' in message.metadata
else None
)
# Now check the conversation and attach the message id.
conversation = self.get_conversation(conversation_id)
if conversation:
conversation.messages.append(message)
self._events.append(Event(
id=str(uuid.uuid4()),
actor="host",
content=message,
timestamp=datetime.datetime.utcnow().timestamp(),
))
# Now actually process the message. If the response is async, return None
# for the message response and the updated message information for the
# incoming message (with ids attached).
task_id = str(uuid.uuid4())
task = Task(
id=task_id,
sessionId=conversation_id,
status=TaskStatus(
state=TaskState.SUBMITTED,
message=message,
),
history=[message],
)
if self._next_message_idx != 0:
self._task_map[message_id] = task_id
self.add_task(task)
await asyncio.sleep(self._next_message_idx)
response = self.next_message()
response.metadata = {**message.metadata, **{'message_id': str(uuid.uuid4())}}
if conversation:
conversation.messages.append(response)
self._events.append(Event(
id=str(uuid.uuid4()),
actor="host",
content=response,
timestamp=datetime.datetime.utcnow().timestamp(),
))
self._pending_message_ids.remove(message.metadata['message_id'])
# Now clean up the task
if task:
task.status.state = TaskState.COMPLETED
task.artifacts = [Artifact(name="response", parts=response.parts)]
task.history.append(response)
self.update_task(task)
def add_task(self, task: Task):
self._tasks.append(task)
def update_task(self, task: Task):
for i, t in enumerate(self._tasks):
if t.id == task.id:
self._tasks[i] = task
return
def add_event(self, event: Event):
self._events.append(event)
def next_message(self) -> Message:
message = _message_queue[self._next_message_idx]
self._next_message_idx = (self._next_message_idx + 1) % len(_message_queue)
return message
def get_conversation(
self,
conversation_id: Optional[str]
) -> Optional[Conversation]:
if not conversation_id:
return None
return next(
filter(lambda c: c.conversation_id == conversation_id,
self._conversations), None)
def get_pending_messages(self) -> list[Tuple[str,str]]:
rval = []
for message_id in self._pending_message_ids:
if message_id in self._task_map:
task_id = self._task_map[message_id]
task = next(filter(lambda x: x.id == task_id, self._tasks), None)
if not task:
rval.append((message_id, ""))
elif task.history and task.history[-1].parts:
if len(task.history) == 1:
rval.append((message_id, "Working..."))
else:
part = task.history[-1].parts[0]
rval.append((
message_id,
part.text if part.type == "text" else "Working..."))
else:
rval.append((message_id, ""))
return rval
return self._pending_message_ids
def register_agent(self, url):
agent_data = get_agent_card(url)
if not agent_data.url:
agent_data.url = url
self._agents.append(agent_data)
@property
def agents(self) -> list[AgentCard]:
return self._agents
@property
def conversations(self) -> list[Conversation]:
return self._conversations
@property
def tasks(self) -> list[Task]:
return self._tasks
@property
def events(self) -> list[Event]:
return self._events
# This represents the precanned responses that will be returned in order.
# Extend this list to test more functionality of the UI
_message_queue: list[Message] = [
Message(role="agent", parts=[TextPart(text="Hello")]),
Message(role="agent", parts=[
DataPart(
data={
'type': 'form',
'form': {
'type': 'object',
'properties': {
'name': {
'type': 'string',
'description': 'Enter your name',
'title': 'Name',
},
'date': {
'type': 'string',
'format': 'date',
'description': 'Birthday',
'title': 'Birthday',
},
},
'required': ['date'],
},
'form_data': {
'name': 'John Smith',
},
'instructions': "Please provide your birthday and name",
}
),
]),
Message(role="agent", parts=[TextPart(text="I like cats")]),
test_image.test_image,
Message(role="agent", parts=[TextPart(text="And I like dogs")]),
]
```
## /demo/ui/service/server/server.py
```py path="/demo/ui/service/server/server.py"
import asyncio
import base64
import threading
import os
import uuid
from typing import Any
from fastapi import APIRouter
from fastapi import Request, Response
from common.types import Message, Task, FilePart, FileContent
from .in_memory_manager import InMemoryFakeAgentManager
from .application_manager import ApplicationManager
from .adk_host_manager import ADKHostManager, get_message_id
from service.types import (
Conversation,
Event,
CreateConversationResponse,
ListConversationResponse,
SendMessageResponse,
MessageInfo,
ListMessageResponse,
PendingMessageResponse,
ListTaskResponse,
RegisterAgentResponse,
ListAgentResponse,
GetEventResponse
)
class ConversationServer:
"""ConversationServer is the backend to serve the agent interactions in the UI
This defines the interface that is used by the Mesop system to interact with
agents and provide details about the executions.
"""
def __init__(self, router: APIRouter):
agent_manager = os.environ.get("A2A_HOST", "ADK")
self.manager: ApplicationManager
# Get API key from environment
api_key = os.environ.get("GOOGLE_API_KEY", "")
uses_vertex_ai = os.environ.get("GOOGLE_GENAI_USE_VERTEXAI", "").upper() == "TRUE"
if agent_manager.upper() == "ADK":
self.manager = ADKHostManager(api_key=api_key, uses_vertex_ai=uses_vertex_ai)
else:
self.manager = InMemoryFakeAgentManager()
self._file_cache = {} # dict[str, FilePart] maps file id to message data
self._message_to_cache = {} # dict[str, str] maps message id to cache id
router.add_api_route(
"/conversation/create",
self._create_conversation,
methods=["POST"])
router.add_api_route(
"/conversation/list",
self._list_conversation,
methods=["POST"])
router.add_api_route(
"/message/send",
self._send_message,
methods=["POST"])
router.add_api_route(
"/events/get",
self._get_events,
methods=["POST"])
router.add_api_route(
"/message/list",
self._list_messages,
methods=["POST"])
router.add_api_route(
"/message/pending",
self._pending_messages,
methods=["POST"])
router.add_api_route(
"/task/list",
self._list_tasks,
methods=["POST"])
router.add_api_route(
"/agent/register",
self._register_agent,
methods=["POST"])
router.add_api_route(
"/agent/list",
self._list_agents,
methods=["POST"])
router.add_api_route(
"/message/file/{file_id}",
self._files,
methods=["GET"])
router.add_api_route(
"/api_key/update",
self._update_api_key,
methods=["POST"])
# Update API key in manager
def update_api_key(self, api_key: str):
if isinstance(self.manager, ADKHostManager):
self.manager.update_api_key(api_key)
def _create_conversation(self):
c = self.manager.create_conversation()
return CreateConversationResponse(result=c)
async def _send_message(self, request: Request):
message_data = await request.json()
message = Message(**message_data['params'])
message = self.manager.sanitize_message(message)
t = threading.Thread(target=lambda: asyncio.run(self.manager.process_message(message)))
t.start()
return SendMessageResponse(result=MessageInfo(
message_id=message.metadata['message_id'],
conversation_id=message.metadata['conversation_id'] if 'conversation_id' in message.metadata else '',
))
async def _list_messages(self, request: Request):
message_data = await request.json()
conversation_id = message_data['params']
conversation = self.manager.get_conversation(conversation_id)
if conversation:
return ListMessageResponse(result=self.cache_content(
conversation.messages))
return ListMessageResponse(result=[])
def cache_content(self, messages: list[Message]):
rval = []
for m in messages:
message_id = get_message_id(m)
if not message_id:
rval.append(m)
continue
new_parts = []
for i, part in enumerate(m.parts):
if part.type != 'file':
new_parts.append(part)
continue
message_part_id = f"{message_id}:{i}"
if message_part_id in self._message_to_cache:
cache_id = self._message_to_cache[message_part_id]
else:
cache_id = str(uuid.uuid4())
self._message_to_cache[message_part_id] = cache_id
# Replace the part data with a url reference
new_parts.append(FilePart(
file=FileContent(
mimeType=part.file.mimeType,
uri=f"/message/file/{cache_id}",
)
))
if cache_id not in self._file_cache:
self._file_cache[cache_id] = part
m.parts = new_parts
rval.append(m)
return rval
async def _pending_messages(self):
return PendingMessageResponse(result=self.manager.get_pending_messages())
def _list_conversation(self):
return ListConversationResponse(result=self.manager.conversations)
def _get_events(self):
return GetEventResponse(result=self.manager.events)
def _list_tasks(self):
return ListTaskResponse(result=self.manager.tasks)
async def _register_agent(self, request: Request):
message_data = await request.json()
url = message_data['params']
self.manager.register_agent(url)
return RegisterAgentResponse()
async def _list_agents(self):
return ListAgentResponse(result=self.manager.agents)
def _files(self, file_id):
if file_id not in self._file_cache:
raise Exception("file not found")
part = self._file_cache[file_id]
if "image" in part.file.mimeType:
return Response(
content=base64.b64decode(part.file.bytes),
media_type=part.file.mimeType)
return Response(content=part.file.bytes, media_type=part.file.mimeType)
async def _update_api_key(self, request: Request):
"""Update the API key"""
try:
data = await request.json()
api_key = data.get("api_key", "")
if api_key:
# Update in the manager
self.update_api_key(api_key)
return {"status": "success"}
return {"status": "error", "message": "No API key provided"}
except Exception as e:
return {"status": "error", "message": str(e)}
```
The content has been capped at 50000 tokens, and files over NaN bytes have been omitted. The user could consider applying other filters to refine the result. The better and more specific the context, the better the LLM can follow instructions. If the context seems verbose, the user can refine the filter using uithub. Thank you for using https://uithub.com - Perfect LLM context for any GitHub repo.