```
├── .github/
├── ISSUE_TEMPLATE/
├── bug_report.md
├── feature_request.md
├── workflows/
├── python-unit-tests.yml
├── .gitignore
├── CHANGELOG.md
├── CONTRIBUTING.md
├── LICENSE
├── README.md
├── assets/
├── adk-web-dev-ui-function-call.png
├── agent-development-kit.png
├── pylintrc
├── pyproject.toml
├── src/
├── google/
├── adk/
├── __init__.py
├── agents/
├── __init__.py
├── active_streaming_tool.py
├── base_agent.py
├── callback_context.py
├── invocation_context.py
├── langgraph_agent.py
├── live_request_queue.py
├── llm_agent.py
├── loop_agent.py
├── parallel_agent.py
├── readonly_context.py
├── remote_agent.py
├── run_config.py
├── sequential_agent.py
├── transcription_entry.py
├── artifacts/
├── __init__.py
├── base_artifact_service.py
├── gcs_artifact_service.py
├── in_memory_artifact_service.py
├── auth/
├── __init__.py
├── auth_credential.py
├── auth_handler.py
├── auth_preprocessor.py
├── auth_schemes.py
├── auth_tool.py
├── cli/
├── __init__.py
├── __main__.py
├── agent_graph.py
├── browser/
├── adk_favicon.svg
├── assets/
├── audio-processor.js
├── config/
├── runtime-config.json
├── index.html
├── main-HWIBUY2R.js
```
## /.github/ISSUE_TEMPLATE/bug_report.md
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
** Please make sure you read the contribution guide and file the issues in the rigth place. **
[Contribution guide.](https://google.github.io/adk-docs/contributing-guide/)
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Install '...'
2. Run '....'
3. Open '....'
4. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Desktop (please complete the following information):**
- OS: [e.g. iOS]
- Python version(python -V):
- ADK version(pip show google-adk):
**Additional context**
Add any other context about the problem here.
## /.github/ISSUE_TEMPLATE/feature_request.md
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
** Please make sure you read the contribution guide and file the issues in the rigth place. **
[Contribution guide.](https://google.github.io/adk-docs/contributing-guide/)
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.
## /.github/workflows/python-unit-tests.yml
```yml path="/.github/workflows/python-unit-tests.yml"
name: Python Unit Tests
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.9", "3.10", "3.11"]
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install uv
run: curl -LsSf https://astral.sh/uv/install.sh | sh
- name: Install dependencies
run: |
uv venv .venv
source .venv/bin/activate
uv sync --extra test --extra eval
- name: Run unit tests with pytest
run: |
source .venv/bin/activate
pytest tests/unittests \
--ignore=tests/unittests/artifacts/test_artifact_service.py \
--ignore=tests/unittests/tools/application_integration_tool/clients/test_connections_client.py \
--ignore=tests/unittests/tools/google_api_tool/test_googleapi_to_openapi_converter.py
```
## /.gitignore
```gitignore path="/.gitignore"
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
venv/
ENV/
env/
.env
.venv
env.bak/
venv.bak/
# IDE
.idea/
.vscode/
*.swp
*.swo
.DS_Store
# Testing
.coverage
htmlcov/
.tox/
.nox/
.pytest_cache/
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Jupyter Notebook
.ipynb_checkpoints
# Logs
*.log
logs/
log/
# Local development settings
.env.local
.env.development.local
.env.test.local
.env.production.local
# Google Cloud specific
.gcloudignore
.gcloudignore.local
# Documentation
docs/_build/
site/
# Misc
.DS_Store
Thumbs.db
*.bak
*.tmp
*.temp
```
## /CHANGELOG.md
# Changelog
## 0.3.0
### ⚠ BREAKING CHANGES
* Auth: expose `access_token` and `refresh_token` at top level of auth
credentails, instead of a `dict`
([commit](https://github.com/google/adk-python/commit/956fb912e8851b139668b1ccb8db10fd252a6990)).
### Features
* Added support for running agents with MCPToolset easily on `adk web`.
* Added `custom_metadata` field to `LlmResponse`, which can be used to tag
LlmResponse via `after_model_callback`.
* Added `--session_db_url` to `adk deploy cloud_run` option.
* Many Dev UI improvements:
* Better google search result rendering.
* Show websocket close reason in Dev UI.
* Better error message showing for audio/video.
### Bug Fixes
* Fixed MCP tool json schema parsing issue.
* Fixed issues in DatabaseSessionService that leads to crash.
* Fixed functions.py.
* Fixed `skip_summarization` behavior in `AgentTool`.
### Miscellaneous Chores
* README.md impprovements.
* Various code improvements.
* Various typo fixes.
* Bump min version of google-genai to 1.11.0.
## 0.2.0
### ⚠ BREAKING CHANGES
* Fix typo in method name in `Event`: has_trailing_code_exeuction_result --> has_trailing_code_execution_result.
### Features
* `adk` CLI:
* Introduce `adk create` cli tool to help creating agents.
* Adds `--verbosity` option to `adk deploy cloud_run` to show detailed cloud
run deploy logging.
* Improve the initialization error message for `DatabaseSessionService`.
* Lazy loading for Google 1P tools to minimize the initial latency.
* Support emitting state-change-only events from planners.
* Lots of Dev UI updates, including:
* Show planner thoughts and actions in the Dev UI.
* Support MCP tools in Dev UI.
(NOTE: `agent.py` interface is temp solution and is subject to change)
* Auto-select the only app if only one app is available.
* Show grounding links generated by Google Search Tool.
* `.env` file is reloaded on every agent run.
### Bug Fixes
* `LiteLlm`: arg parsing error and python 3.9 compatibility.
* `DatabaseSessionService`: adds the missing fields; fixes event with empty
content not being persisted.
* Google API Discovery response parsing issue.
* `load_memory_tool` rendering issue in Dev UI.
* Markdown text overflows in Dev UI.
### Miscellaneous Chores
* Adds unit tests in Github action.
* Improves test coverage.
* Various typo fixes.
## 0.1.0
### Features
* Initial release of the Agent Development Kit (ADK).
* Multi-agent, agent-as-workflow, and custom agent support
* Tool authentication support
* Rich tool support, e.g. built-in tools, google-cloud tools, third-party tools, and MCP tools
* Rich callback support
* Built-in code execution capability
* Asynchronous runtime and execution
* Session, and memory support
* Built-in evaluation support
* Development UI that makes local development easy
* Deploy to Google Cloud Run, Agent Engine
* (Experimental) Live(Bidi) auido/video agent support and Compositional Function Calling(CFC) support
## /CONTRIBUTING.md
# How to contribute
We'd love to accept your patches and contributions to this project.
## Before you begin
### Sign our Contributor License Agreement
Contributions to this project must be accompanied by a
[Contributor License Agreement](https://cla.developers.google.com/about) (CLA).
You (or your employer) retain the copyright to your contribution; this simply
gives us permission to use and redistribute your contributions as part of the
project.
If you or your current employer have already signed the Google CLA (even if it
was for a different project), you probably don't need to do it again.
Visit to see your current agreements or to
sign a new one.
### Review our community guidelines
This project follows
[Google's Open Source Community Guidelines](https://opensource.google/conduct/).
## Contribution process
### Requirement for PRs
- All PRs, other than small documentation or typo fixes, should have a Issue assoicated. If not, please create one.
- Small, focused PRs. Keep changes minimal—one concern per PR.
- For bug fixes or features, please provide logs or screenshot after the fix is applied to help reviewers better understand the fix.
- Please add corresponding testing for your code change if it's not covered by existing tests.
### Large or Complex Changes
For substantial features or architectural revisions:
- Open an Issue First: Outline your proposal, including design considerations and impact.
- Gather Feedback: Discuss with maintainers and the community to ensure alignment and avoid duplicate work
### Code reviews
All submissions, including submissions by project members, require review. We
use GitHub pull requests for this purpose. Consult
[GitHub Help](https://help.github.com/articles/about-pull-requests/) for more
information on using pull requests.
## /LICENSE
``` path="/LICENSE"
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```
## /README.md
# Agent Development Kit (ADK)
[](LICENSE)
[](https://github.com/google/adk-python/actions/workflows/python-unit-tests.yml)
[](https://www.reddit.com/r/agentdevelopmentkit/)
An open-source, code-first Python toolkit for building, evaluating, and deploying sophisticated AI agents with flexibility and control.
Agent Development Kit (ADK) is a flexible and modular framework for developing and deploying AI agents. While optimized for Gemini and the Google ecosystem, ADK is model-agnostic, deployment-agnostic, and is built for compatibility with other frameworks. ADK was designed to make agent development feel more like software development, to make it easier for developers to create, deploy, and orchestrate agentic architectures that range from simple tasks to complex workflows.
---
## ✨ Key Features
- **Rich Tool Ecosystem**: Utilize pre-built tools, custom functions,
OpenAPI specs, or integrate existing tools to give agents diverse
capabilities, all for tight integration with the Google ecosystem.
- **Code-First Development**: Define agent logic, tools, and orchestration
directly in Python for ultimate flexibility, testability, and versioning.
- **Modular Multi-Agent Systems**: Design scalable applications by composing
multiple specialized agents into flexible hierarchies.
- **Deploy Anywhere**: Easily containerize and deploy agents on Cloud Run or
scale seamlessly with Vertex AI Agent Engine.
## 🚀 Installation
### Stable Release (Recommended)
You can install the latest stable version of ADK using `pip`:
```bash
pip install google-adk
```
The release cadence is weekly.
This version is recommended for most users as it represents the most recent official release.
### Development Version
Bug fixes and new features are merged into the main branch on GitHub first. If you need access to changes that haven't been included in an official PyPI release yet, you can install directly from the main branch:
```bash
pip install git+https://github.com/google/adk-python.git@main
```
Note: The development version is built directly from the latest code commits. While it includes the newest fixes and features, it may also contain experimental changes or bugs not present in the stable release. Use it primarily for testing upcoming changes or accessing critical fixes before they are officially released.
## 📚 Documentation
Explore the full documentation for detailed guides on building, evaluating, and
deploying agents:
* **[Documentation](https://google.github.io/adk-docs)**
## 🏁 Feature Highlight
### Define a single agent:
```python
from google.adk.agents import Agent
from google.adk.tools import google_search
root_agent = Agent(
name="search_assistant",
model="gemini-2.0-flash", # Or your preferred Gemini model
instruction="You are a helpful assistant. Answer user questions using Google Search when needed.",
description="An assistant that can search the web.",
tools=[google_search]
)
```
### Define a multi-agent system:
Define a multi-agent system with coordinator agent, greeter agent, and task execution agent. Then ADK engine and the model will guide the agents works together to accomplish the task.
```python
from google.adk.agents import LlmAgent, BaseAgent
# Define individual agents
greeter = LlmAgent(name="greeter", model="gemini-2.0-flash", ...)
task_executor = LlmAgent(name="task_executor", model="gemini-2.0-flash", ...)
# Create parent agent and assign children via sub_agents
coordinator = LlmAgent(
name="Coordinator",
model="gemini-2.0-flash",
description="I coordinate greetings and tasks.",
sub_agents=[ # Assign sub_agents here
greeter,
task_executor
]
)
```
### Development UI
A built-in development UI to help you test, evaluate, debug, and showcase your agent(s).
### Evaluate Agents
```bash
adk eval \
samples_for_testing/hello_world \
samples_for_testing/hello_world/hello_world_eval_set_001.evalset.json
```
## 🤖 A2A and ADK integration
For remote agent-to-agent communication, ADK integrates with the
[A2A protocol](https://github.com/google/A2A/).
See this [example](https://github.com/google/A2A/tree/main/samples/python/agents/google_adk)
for how they can work together.
## 🤝 Contributing
We welcome contributions from the community! Whether it's bug reports, feature requests, documentation improvements, or code contributions, please see our
- [General contribution guideline and flow](https://google.github.io/adk-docs/contributing-guide/#questions).
- Then if you want to contribute code, please read [Code Contributing Guidelines](./CONTRIBUTING.md) to get started.
## 📄 License
This project is licensed under the Apache 2.0 License - see the [LICENSE](LICENSE) file for details.
## Preview
This feature is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the [Service Specific Terms](https://cloud.google.com/terms/service-terms#1). Pre-GA features are available "as is" and might have limited support. For more information, see the [launch stage descriptions](https://cloud.google.com/products?hl=en#product-launch-stages).
---
*Happy Agent Building!*
## /assets/adk-web-dev-ui-function-call.png
Binary file available at https://raw.githubusercontent.com/google/adk-python/refs/heads/main/assets/adk-web-dev-ui-function-call.png
## /assets/agent-development-kit.png
Binary file available at https://raw.githubusercontent.com/google/adk-python/refs/heads/main/assets/agent-development-kit.png
## /pylintrc
``` path="/pylintrc"
# This Pylint rcfile contains a best-effort configuration to uphold the
# best-practices and style described in the Google Python style guide:
# https://google.github.io/styleguide/pyguide.html
#
# Its canonical open-source location is:
# https://google.github.io/styleguide/pylintrc
[MAIN]
# Files or directories to be skipped. They should be base names, not paths.
ignore=third_party
# Files or directories matching the regex patterns are skipped. The regex
# matches against base names, not paths.
ignore-patterns=
# Pickle collected data for later comparisons.
persistent=no
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
# Use multiple processes to speed up Pylint.
jobs=4
# Allow loading of arbitrary C extensions. Extensions are imported into the
# active Python interpreter and may run arbitrary code.
unsafe-load-any-extension=no
[MESSAGES CONTROL]
# Only show warnings with the listed confidence levels. Leave empty to show
# all. Valid levels: HIGH, INFERENCE, INFERENCE_FAILURE, UNDEFINED
confidence=
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
#enable=
# Disable the message, report, category or checker with the given id(s). You
# can either give multiple identifiers separated by comma (,) or put this
# option multiple times (only on the command line, not in the configuration
# file where it should appear only once).You can also use "--disable=all" to
# disable everything first and then re-enable specific checks. For example, if
# you want to run only the similarities checker, you can use "--disable=all
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=R,
abstract-method,
apply-builtin,
arguments-differ,
attribute-defined-outside-init,
backtick,
bad-option-value,
basestring-builtin,
buffer-builtin,
c-extension-no-member,
consider-using-enumerate,
cmp-builtin,
cmp-method,
coerce-builtin,
coerce-method,
delslice-method,
div-method,
eq-without-hash,
execfile-builtin,
file-builtin,
filter-builtin-not-iterating,
fixme,
getslice-method,
global-statement,
hex-method,
idiv-method,
implicit-str-concat,
import-error,
import-self,
import-star-module-level,
import-outside-toplevel,
input-builtin,
intern-builtin,
invalid-str-codec,
locally-disabled,
long-builtin,
long-suffix,
map-builtin-not-iterating,
misplaced-comparison-constant,
missing-function-docstring,
metaclass-assignment,
next-method-called,
next-method-defined,
no-absolute-import,
no-init, # added
no-member,
no-name-in-module,
no-self-use,
nonzero-method,
oct-method,
old-division,
old-ne-operator,
old-octal-literal,
old-raise-syntax,
parameter-unpacking,
print-statement,
raising-string,
range-builtin-not-iterating,
raw_input-builtin,
rdiv-method,
reduce-builtin,
relative-import,
reload-builtin,
round-builtin,
setslice-method,
signature-differs,
standarderror-builtin,
suppressed-message,
sys-max-int,
trailing-newlines,
unichr-builtin,
unicode-builtin,
unnecessary-pass,
unpacking-in-except,
useless-else-on-loop,
useless-suppression,
using-cmp-argument,
wrong-import-order,
xrange-builtin,
zip-builtin-not-iterating,
[REPORTS]
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html. You can also give a reporter class, eg
# mypackage.mymodule.MyReporterClass.
output-format=text
# Tells whether to display a full report or only the messages
reports=no
# Python expression which should return a note less than 10 (10 is the highest
# note). You have access to the variables errors warning, statement which
# respectively contain the number of errors / warnings messages and the total
# number of statements analyzed. This is used by the global evaluation report
# (RP0004).
evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
# Template used to display messages. This is a python new-style format string
# used to format the message information. See doc for all details
#msg-template=
[BASIC]
# Good variable names which should always be accepted, separated by a comma
good-names=main,_
# Bad variable names which should always be refused, separated by a comma
bad-names=
# Colon-delimited sets of names that determine each other's naming style when
# the name regexes allow several styles.
name-group=
# Include a hint for the correct naming format with invalid-name
include-naming-hint=no
# List of decorators that produce properties, such as abc.abstractproperty. Add
# to this list to register other decorators that produce valid properties.
property-classes=abc.abstractproperty,cached_property.cached_property,cached_property.threaded_cached_property,cached_property.cached_property_with_ttl,cached_property.threaded_cached_property_with_ttl
# Regular expression matching correct function names
function-rgx=^(?:(?PsetUp|tearDown|setUpModule|tearDownModule)|(?P_?[A-Z][a-zA-Z0-9]*)|(?P_?[a-z][a-z0-9_]*))$
# Regular expression matching correct variable names
variable-rgx=^[a-z][a-z0-9_]*$
# Regular expression matching correct constant names
const-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$
# Regular expression matching correct attribute names
attr-rgx=^_{0,2}[a-z][a-z0-9_]*$
# Regular expression matching correct argument names
argument-rgx=^[a-z][a-z0-9_]*$
# Regular expression matching correct class attribute names
class-attribute-rgx=^(_?[A-Z][A-Z0-9_]*|__[a-z0-9_]+__|_?[a-z][a-z0-9_]*)$
# Regular expression matching correct inline iteration names
inlinevar-rgx=^[a-z][a-z0-9_]*$
# Regular expression matching correct class names
class-rgx=^_?[A-Z][a-zA-Z0-9]*$
# Regular expression matching correct module names
module-rgx=^(_?[a-z][a-z0-9_]*|__init__)$
# Regular expression matching correct method names
method-rgx=(?x)^(?:(?P_[a-z0-9_]+__|runTest|setUp|tearDown|setUpTestCase|tearDownTestCase|setupSelf|tearDownClass|setUpClass|(test|assert)_*[A-Z0-9][a-zA-Z0-9_]*|next)|(?P_{0,2}[A-Z][a-zA-Z0-9_]*)|(?P_{0,2}[a-z][a-z0-9_]*))$
# Regular expression which should only match function or class names that do
# not require a docstring.
no-docstring-rgx=(__.*__|main|test.*|.*test|.*Test)$
# Minimum line length for functions/classes that require docstrings, shorter
# ones are exempt.
docstring-min-length=12
[TYPECHECK]
# List of decorators that produce context managers, such as
# contextlib.contextmanager. Add to this list to register other decorators that
# produce valid context managers.
contextmanager-decorators=contextlib.contextmanager,contextlib2.contextmanager
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis. It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=
# List of class names for which member attributes should not be checked (useful
# for classes with dynamically set attributes). This supports the use of
# qualified names.
ignored-classes=optparse.Values,thread._local,_thread._local
# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
# expressions are accepted.
generated-members=
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=80
# TODO(https://github.com/pylint-dev/pylint/issues/3352): Direct pylint to exempt
# lines made too long by directives to pytype.
# Regexp for a line that is allowed to be longer than the limit.
ignore-long-lines=(?x)(
^\s*(\#\ )??$|
^\s*(from\s+\S+\s+)?import\s+.+$)
# Allow the body of an if to be on the same line as the test if there is no
# else.
single-line-if-stmt=yes
# Maximum number of lines in a module
max-module-lines=99999
# String used as indentation unit. The internal Google style guide mandates 2
# spaces. Google's externaly-published style guide says 4, consistent with
# PEP 8. Here, we use 2 spaces, for conformity with many open-sourced Google
# projects (like TensorFlow).
indent-string=' '
# Number of spaces of indent required inside a hanging or continued line.
indent-after-paren=4
# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
expected-line-ending-format=
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=TODO
[STRING]
# This flag controls whether inconsistent-quotes generates a warning when the
# character used as a quote delimiter is used inconsistently within a module.
check-quote-consistency=yes
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=no
# A regular expression matching the name of dummy variables (i.e. expectedly
# not used).
dummy-variables-rgx=^\*{0,2}(_$|unused_|dummy_)
# List of additional names supposed to be defined in builtins. Remember that
# you should avoid to define new builtins when possible.
additional-builtins=
# List of strings which can identify a callback function by name. A callback
# name must start or end with one of those strings.
callbacks=cb_,_cb
# List of qualified module names which can have objects that can redefine
# builtins.
redefining-builtins-modules=six,six.moves,past.builtins,future.builtins,functools
[LOGGING]
# Logging modules to check that the string format arguments are in logging
# function parameter format
logging-modules=logging,absl.logging,tensorflow.io.logging
[SIMILARITIES]
# Minimum lines number of a similarity.
min-similarity-lines=4
# Ignore comments when computing similarities.
ignore-comments=yes
# Ignore docstrings when computing similarities.
ignore-docstrings=yes
# Ignore imports when computing similarities.
ignore-imports=no
[SPELLING]
# Spelling dictionary name. Available dictionaries: none. To make it working
# install python-enchant package.
spelling-dict=
# List of comma separated words that should not be checked.
spelling-ignore-words=
# A path to a file that contains private dictionary; one word per line.
spelling-private-dict-file=
# Tells whether to store unknown words to indicated private dictionary in
# --spelling-private-dict-file option instead of raising a message.
spelling-store-unknown-words=no
[IMPORTS]
# Deprecated modules which should not be used, separated by a comma
deprecated-modules=regsub,
TERMIOS,
Bastion,
rexec,
sets
# Create a graph of every (i.e. internal and external) dependencies in the
# given file (report RP0402 must not be disabled)
import-graph=
# Create a graph of external dependencies in the given file (report RP0402 must
# not be disabled)
ext-import-graph=
# Create a graph of internal dependencies in the given file (report RP0402 must
# not be disabled)
int-import-graph=
# Force import order to recognize a module as part of the standard
# compatibility libraries.
known-standard-library=
# Force import order to recognize a module as part of a third party library.
known-third-party=enchant, absl
# Analyse import fallback blocks. This can be used to support both Python 2 and
# 3 compatible code, which means that the block might have code that exists
# only in one or another interpreter, leading to false positives when analysed.
analyse-fallback-blocks=no
[CLASSES]
# List of method names used to declare (i.e. assign) instance attributes.
defining-attr-methods=__init__,
__new__,
setUp
# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls,
class_
# List of valid names for the first argument in a metaclass class method.
valid-metaclass-classmethod-first-arg=mcs
```
## /pyproject.toml
```toml path="/pyproject.toml"
[project]
# Project metadata. Available keys are documented at:
# https://packaging.python.org/en/latest/specifications/declaring-project-metadata
name = "google-adk"
description = "Agent Development Kit"
readme = "README.md"
requires-python = ">=3.9"
license = { file = "LICENSE" }
authors = [{ name = "Google LLC", email = "googleapis-packages@google.com" }]
classifiers = [ # List of https://pypi.org/classifiers/
"Typing :: Typed",
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.10",
"Operating System :: OS Independent",
"Topic :: Software Development :: Libraries :: Python Modules",
"License :: OSI Approved :: Apache Software License",
]
dependencies = [
# go/keep-sorted start
"authlib>=1.5.1", # For RestAPI Tool
"click>=8.1.8", # For CLI tools
"fastapi>=0.115.0", # FastAPI framework
"google-api-python-client>=2.157.0", # Google API client discovery
"google-cloud-aiplatform>=1.87.0", # For VertexAI integrations, e.g. example store.
"google-cloud-secret-manager>=2.22.0", # Fetching secrets in RestAPI Tool
"google-cloud-speech>=2.30.0", # For Audo Transcription
"google-cloud-storage>=2.18.0, <3.0.0", # For GCS Artifact service
"google-genai>=1.11.0", # Google GenAI SDK
"graphviz>=0.20.2", # Graphviz for graph rendering
"mcp>=1.5.0;python_version>='3.10'", # For MCP Toolset
"opentelemetry-api>=1.31.0", # OpenTelemetry
"opentelemetry-exporter-gcp-trace>=1.9.0",
"opentelemetry-sdk>=1.31.0",
"pydantic>=2.0, <3.0.0", # For data validation/models
"python-dotenv>=1.0.0", # To manage environment variables
"PyYAML>=6.0.2", # For APIHubToolset.
"sqlalchemy>=2.0", # SQL database ORM
"tzlocal>=5.3", # Time zone utilities
"uvicorn>=0.34.0", # ASGI server for FastAPI
# go/keep-sorted end
]
dynamic = ["version"]
[project.urls]
homepage = "https://google.github.io/adk-docs/"
repository = "https://github.com/google/adk-python"
changelog = "https://github.com/google/adk-python/blob/main/CHANGELOG.md"
documentation = "https://google.github.io/adk-docs/"
[project.scripts]
adk = "google.adk.cli:main"
[project.optional-dependencies]
dev = [
# go/keep-sorted start
"flit>=3.10.0",
"isort>=6.0.0",
"pyink>=24.10.0",
"pylint>=2.6.0",
# go/keep-sorted end
]
eval = [
# go/keep-sorted start
"google-cloud-aiplatform[evaluation]>=1.87.0",
"pandas>=2.2.3",
"tabulate>=0.9.0",
# go/keep-sorted end
]
test = [
# go/keep-sorted start
"anthropic>=0.43.0", # For anthropic model tests
"langchain-community>=0.3.17",
"langgraph>=0.2.60", # For LangGraphAgent
"litellm>=1.63.11", # For LiteLLM tests
"llama-index-readers-file>=0.4.0", # for retrieval tests
"pytest-asyncio>=0.25.0",
"pytest-mock>=3.14.0",
"pytest-xdist>=3.6.1",
"pytest>=8.3.4",
# go/keep-sorted end
]
docs = [
"autodoc_pydantic",
"furo",
"myst-parser",
"sphinx",
"sphinx-autodoc-typehints",
"sphinx-rtd-theme",
]
# Optional extensions
extensions = [
"anthropic>=0.43.0", # For anthropic model support
"beautifulsoup4>=3.2.2", # For load_web_page tool.
"crewai[tools];python_version>='3.10'", # For CrewaiTool
"docker>=7.0.0", # For ContainerCodeExecutor
"langgraph>=0.2.60", # For LangGraphAgent
"litellm>=1.63.11", # For LiteLLM support
"llama-index-readers-file>=0.4.0", # for retrieval usings LlamaIndex.
"lxml>=5.3.0", # For load_web_page tool.
]
[tool.pyink]
# Format py files following Google style-guide
line-length = 80
unstable = true
pyink-indentation = 2
pyink-use-majority-quotes = true
[build-system]
# Build system specify which backend is used to build/install the project (flit,
# poetry, setuptools,...). All backends are supported by `pip install`
requires = ["flit_core >=3.8,<4"]
build-backend = "flit_core.buildapi"
[tool.flit.sdist]
include = ['src/**/*', 'README.md', 'pyproject.toml', 'LICENSE']
exclude = ['src/**/*.sh']
[tool.flit.module]
name = "google.adk"
[tool.isort]
# Organize imports following Google style-guide
force_single_line = true
force_sort_within_sections = true
honor_case_in_force_sorted_sections = true
order_by_type = false
sort_relative_in_force_sorted_sections = true
multi_line_output = 3
line_length = 200
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_default_fixture_loop_scope = "function"
```
## /src/google/adk/__init__.py
```py path="/src/google/adk/__init__.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import version
from .agents.llm_agent import Agent
from .runners import Runner
__version__ = version.__version__
__all__ = ["Agent", "Runner"]
```
## /src/google/adk/agents/__init__.py
```py path="/src/google/adk/agents/__init__.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base_agent import BaseAgent
from .live_request_queue import LiveRequest
from .live_request_queue import LiveRequestQueue
from .llm_agent import Agent
from .llm_agent import LlmAgent
from .loop_agent import LoopAgent
from .parallel_agent import ParallelAgent
from .run_config import RunConfig
from .sequential_agent import SequentialAgent
__all__ = [
'Agent',
'BaseAgent',
'LlmAgent',
'LoopAgent',
'ParallelAgent',
'SequentialAgent',
]
```
## /src/google/adk/agents/active_streaming_tool.py
```py path="/src/google/adk/agents/active_streaming_tool.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import asyncio
from typing import Optional
from pydantic import BaseModel
from pydantic import ConfigDict
from .live_request_queue import LiveRequestQueue
class ActiveStreamingTool(BaseModel):
"""Manages streaming tool related resources during invocation."""
model_config = ConfigDict(
arbitrary_types_allowed=True,
extra='forbid',
)
task: Optional[asyncio.Task] = None
"""The active task of this streaming tool."""
stream: Optional[LiveRequestQueue] = None
"""The active (input) streams of this streaming tool."""
```
## /src/google/adk/agents/base_agent.py
```py path="/src/google/adk/agents/base_agent.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import Any
from typing import AsyncGenerator
from typing import Callable
from typing import final
from typing import Optional
from typing import TYPE_CHECKING
from google.genai import types
from opentelemetry import trace
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import Field
from pydantic import field_validator
from typing_extensions import override
from ..events.event import Event
from .callback_context import CallbackContext
if TYPE_CHECKING:
from .invocation_context import InvocationContext
tracer = trace.get_tracer('gcp.vertex.agent')
BeforeAgentCallback = Callable[[CallbackContext], Optional[types.Content]]
"""Callback signature that is invoked before the agent run.
Args:
callback_context: MUST be named 'callback_context' (enforced).
Returns:
The content to return to the user. When set, the agent run will be skipped and
the provided content will be returned to user.
"""
AfterAgentCallback = Callable[[CallbackContext], Optional[types.Content]]
"""Callback signature that is invoked after the agent run.
Args:
callback_context: MUST be named 'callback_context' (enforced).
Returns:
The content to return to the user. When set, the provided content will be
appended to event history as agent response.
"""
class BaseAgent(BaseModel):
"""Base class for all agents in Agent Development Kit."""
model_config = ConfigDict(
arbitrary_types_allowed=True,
extra='forbid',
)
name: str
"""The agent's name.
Agent name must be a Python identifier and unique within the agent tree.
Agent name cannot be "user", since it's reserved for end-user's input.
"""
description: str = ''
"""Description about the agent's capability.
The model uses this to determine whether to delegate control to the agent.
One-line description is enough and preferred.
"""
parent_agent: Optional[BaseAgent] = Field(default=None, init=False)
"""The parent agent of this agent.
Note that an agent can ONLY be added as sub-agent once.
If you want to add one agent twice as sub-agent, consider to create two agent
instances with identical config, but with different name and add them to the
agent tree.
"""
sub_agents: list[BaseAgent] = Field(default_factory=list)
"""The sub-agents of this agent."""
before_agent_callback: Optional[BeforeAgentCallback] = None
"""Callback signature that is invoked before the agent run.
Args:
callback_context: MUST be named 'callback_context' (enforced).
Returns:
The content to return to the user. When set, the agent run will be skipped
and the provided content will be returned to user.
"""
after_agent_callback: Optional[AfterAgentCallback] = None
"""Callback signature that is invoked after the agent run.
Args:
callback_context: MUST be named 'callback_context' (enforced).
Returns:
The content to return to the user. When set, the provided content will be
appended to event history as agent response.
"""
@final
async def run_async(
self,
parent_context: InvocationContext,
) -> AsyncGenerator[Event, None]:
"""Entry method to run an agent via text-based conversation.
Args:
parent_context: InvocationContext, the invocation context of the parent
agent.
Yields:
Event: the events generated by the agent.
"""
with tracer.start_as_current_span(f'agent_run [{self.name}]'):
ctx = self._create_invocation_context(parent_context)
if event := self.__handle_before_agent_callback(ctx):
yield event
if ctx.end_invocation:
return
async for event in self._run_async_impl(ctx):
yield event
if ctx.end_invocation:
return
if event := self.__handle_after_agent_callback(ctx):
yield event
@final
async def run_live(
self,
parent_context: InvocationContext,
) -> AsyncGenerator[Event, None]:
"""Entry method to run an agent via video/audio-based conversation.
Args:
parent_context: InvocationContext, the invocation context of the parent
agent.
Yields:
Event: the events generated by the agent.
"""
with tracer.start_as_current_span(f'agent_run [{self.name}]'):
ctx = self._create_invocation_context(parent_context)
# TODO(hangfei): support before/after_agent_callback
async for event in self._run_live_impl(ctx):
yield event
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""Core logic to run this agent via text-based conversation.
Args:
ctx: InvocationContext, the invocation context for this agent.
Yields:
Event: the events generated by the agent.
"""
raise NotImplementedError(
f'_run_async_impl for {type(self)} is not implemented.'
)
yield # AsyncGenerator requires having at least one yield statement
async def _run_live_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
"""Core logic to run this agent via video/audio-based conversation.
Args:
ctx: InvocationContext, the invocation context for this agent.
Yields:
Event: the events generated by the agent.
"""
raise NotImplementedError(
f'_run_live_impl for {type(self)} is not implemented.'
)
yield # AsyncGenerator requires having at least one yield statement
@property
def root_agent(self) -> BaseAgent:
"""Gets the root agent of this agent."""
root_agent = self
while root_agent.parent_agent is not None:
root_agent = root_agent.parent_agent
return root_agent
def find_agent(self, name: str) -> Optional[BaseAgent]:
"""Finds the agent with the given name in this agent and its descendants.
Args:
name: The name of the agent to find.
Returns:
The agent with the matching name, or None if no such agent is found.
"""
if self.name == name:
return self
return self.find_sub_agent(name)
def find_sub_agent(self, name: str) -> Optional[BaseAgent]:
"""Finds the agent with the given name in this agent's descendants.
Args:
name: The name of the agent to find.
Returns:
The agent with the matching name, or None if no such agent is found.
"""
for sub_agent in self.sub_agents:
if result := sub_agent.find_agent(name):
return result
return None
def _create_invocation_context(
self, parent_context: InvocationContext
) -> InvocationContext:
"""Creates a new invocation context for this agent."""
invocation_context = parent_context.model_copy(update={'agent': self})
if parent_context.branch:
invocation_context.branch = f'{parent_context.branch}.{self.name}'
return invocation_context
def __handle_before_agent_callback(
self, ctx: InvocationContext
) -> Optional[Event]:
"""Runs the before_agent_callback if it exists.
Returns:
Optional[Event]: an event if callback provides content or changed state.
"""
ret_event = None
if not isinstance(self.before_agent_callback, Callable):
return ret_event
callback_context = CallbackContext(ctx)
before_agent_callback_content = self.before_agent_callback(
callback_context=callback_context
)
if before_agent_callback_content:
ret_event = Event(
invocation_id=ctx.invocation_id,
author=self.name,
branch=ctx.branch,
content=before_agent_callback_content,
actions=callback_context._event_actions,
)
ctx.end_invocation = True
return ret_event
if callback_context.state.has_delta():
ret_event = Event(
invocation_id=ctx.invocation_id,
author=self.name,
branch=ctx.branch,
actions=callback_context._event_actions,
)
return ret_event
def __handle_after_agent_callback(
self, invocation_context: InvocationContext
) -> Optional[Event]:
"""Runs the after_agent_callback if it exists.
Returns:
Optional[Event]: an event if callback provides content or changed state.
"""
ret_event = None
if not isinstance(self.after_agent_callback, Callable):
return ret_event
callback_context = CallbackContext(invocation_context)
after_agent_callback_content = self.after_agent_callback(
callback_context=callback_context
)
if after_agent_callback_content or callback_context.state.has_delta():
ret_event = Event(
invocation_id=invocation_context.invocation_id,
author=self.name,
branch=invocation_context.branch,
content=after_agent_callback_content,
actions=callback_context._event_actions,
)
return ret_event
@override
def model_post_init(self, __context: Any) -> None:
self.__set_parent_agent_for_sub_agents()
@field_validator('name', mode='after')
@classmethod
def __validate_name(cls, value: str):
if not value.isidentifier():
raise ValueError(
f'Found invalid agent name: `{value}`.'
' Agent name must be a valid identifier. It should start with a'
' letter (a-z, A-Z) or an underscore (_), and can only contain'
' letters, digits (0-9), and underscores.'
)
if value == 'user':
raise ValueError(
"Agent name cannot be `user`. `user` is reserved for end-user's"
' input.'
)
return value
def __set_parent_agent_for_sub_agents(self) -> BaseAgent:
for sub_agent in self.sub_agents:
if sub_agent.parent_agent is not None:
raise ValueError(
f'Agent `{sub_agent.name}` already has a parent agent, current'
f' parent: `{sub_agent.parent_agent.name}`, trying to add:'
f' `{self.name}`'
)
sub_agent.parent_agent = self
return self
```
## /src/google/adk/agents/callback_context.py
```py path="/src/google/adk/agents/callback_context.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import Optional, TYPE_CHECKING
from typing_extensions import override
from .readonly_context import ReadonlyContext
if TYPE_CHECKING:
from google.genai import types
from ..events.event_actions import EventActions
from ..sessions.state import State
from .invocation_context import InvocationContext
class CallbackContext(ReadonlyContext):
"""The context of various callbacks within an agent run."""
def __init__(
self,
invocation_context: InvocationContext,
*,
event_actions: Optional[EventActions] = None,
) -> None:
super().__init__(invocation_context)
from ..events.event_actions import EventActions
from ..sessions.state import State
# TODO(weisun): make this public for Agent Development Kit, but private for
# users.
self._event_actions = event_actions or EventActions()
self._state = State(
value=invocation_context.session.state,
delta=self._event_actions.state_delta,
)
@property
@override
def state(self) -> State:
"""The delta-aware state of the current session.
For any state change, you can mutate this object directly,
e.g. `ctx.state['foo'] = 'bar'`
"""
return self._state
@property
def user_content(self) -> Optional[types.Content]:
"""The user content that started this invocation. READONLY field."""
return self._invocation_context.user_content
def load_artifact(
self, filename: str, version: Optional[int] = None
) -> Optional[types.Part]:
"""Loads an artifact attached to the current session.
Args:
filename: The filename of the artifact.
version: The version of the artifact. If None, the latest version will be
returned.
Returns:
The artifact.
"""
if self._invocation_context.artifact_service is None:
raise ValueError("Artifact service is not initialized.")
return self._invocation_context.artifact_service.load_artifact(
app_name=self._invocation_context.app_name,
user_id=self._invocation_context.user_id,
session_id=self._invocation_context.session.id,
filename=filename,
version=version,
)
def save_artifact(self, filename: str, artifact: types.Part) -> int:
"""Saves an artifact and records it as delta for the current session.
Args:
filename: The filename of the artifact.
artifact: The artifact to save.
Returns:
The version of the artifact.
"""
if self._invocation_context.artifact_service is None:
raise ValueError("Artifact service is not initialized.")
version = self._invocation_context.artifact_service.save_artifact(
app_name=self._invocation_context.app_name,
user_id=self._invocation_context.user_id,
session_id=self._invocation_context.session.id,
filename=filename,
artifact=artifact,
)
self._event_actions.artifact_delta[filename] = version
return version
```
## /src/google/adk/agents/invocation_context.py
```py path="/src/google/adk/agents/invocation_context.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import Optional
import uuid
from google.genai import types
from pydantic import BaseModel
from pydantic import ConfigDict
from ..artifacts.base_artifact_service import BaseArtifactService
from ..memory.base_memory_service import BaseMemoryService
from ..sessions.base_session_service import BaseSessionService
from ..sessions.session import Session
from .active_streaming_tool import ActiveStreamingTool
from .base_agent import BaseAgent
from .live_request_queue import LiveRequestQueue
from .run_config import RunConfig
from .transcription_entry import TranscriptionEntry
class LlmCallsLimitExceededError(Exception):
"""Error thrown when the number of LLM calls exceed the limit."""
class _InvocationCostManager(BaseModel):
"""A container to keep track of the cost of invocation.
While we don't expected the metrics captured here to be a direct
representatative of monetary cost incurred in executing the current
invocation, but they, in someways have an indirect affect.
"""
_number_of_llm_calls: int = 0
"""A counter that keeps track of number of llm calls made."""
def increment_and_enforce_llm_calls_limit(
self, run_config: Optional[RunConfig]
):
"""Increments _number_of_llm_calls and enforces the limit."""
# We first increment the counter and then check the conditions.
self._number_of_llm_calls += 1
if (
run_config
and run_config.max_llm_calls > 0
and self._number_of_llm_calls > run_config.max_llm_calls
):
# We only enforce the limit if the limit is a positive number.
raise LlmCallsLimitExceededError(
"Max number of llm calls limit of"
f" `{run_config.max_llm_calls}` exceeded"
)
class InvocationContext(BaseModel):
"""An invocation context represents the data of a single invocation of an agent.
An invocation:
1. Starts with a user message and ends with a final response.
2. Can contain one or multiple agent calls.
3. Is handled by runner.run_async().
An invocation runs an agent until it does not request to transfer to another
agent.
An agent call:
1. Is handled by agent.run().
2. Ends when agent.run() ends.
An LLM agent call is an agent with a BaseLLMFlow.
An LLM agent call can contain one or multiple steps.
An LLM agent runs steps in a loop until:
1. A final response is generated.
2. The agent transfers to another agent.
3. The end_invocation is set to true by any callbacks or tools.
A step:
1. Calls the LLM only once and yields its response.
2. Calls the tools and yields their responses if requested.
The summarization of the function response is considered another step, since
it is another llm call.
A step ends when it's done calling llm and tools, or if the end_invocation
is set to true at any time.
\`\`\`
┌─────────────────────── invocation ──────────────────────────┐
┌──────────── llm_agent_call_1 ────────────┐ ┌─ agent_call_2 ─┐
┌──── step_1 ────────┐ ┌───── step_2 ──────┐
[call_llm] [call_tool] [call_llm] [transfer]
\`\`\`
"""
model_config = ConfigDict(
arbitrary_types_allowed=True,
extra="forbid",
)
artifact_service: Optional[BaseArtifactService] = None
session_service: BaseSessionService
memory_service: Optional[BaseMemoryService] = None
invocation_id: str
"""The id of this invocation context. Readonly."""
branch: Optional[str] = None
"""The branch of the invocation context.
The format is like agent_1.agent_2.agent_3, where agent_1 is the parent of
agent_2, and agent_2 is the parent of agent_3.
Branch is used when multiple sub-agents shouldn't see their peer agents'
conversation history.
"""
agent: BaseAgent
"""The current agent of this invocation context. Readonly."""
user_content: Optional[types.Content] = None
"""The user content that started this invocation. Readonly."""
session: Session
"""The current session of this invocation context. Readonly."""
end_invocation: bool = False
"""Whether to end this invocation.
Set to True in callbacks or tools to terminate this invocation."""
live_request_queue: Optional[LiveRequestQueue] = None
"""The queue to receive live requests."""
active_streaming_tools: Optional[dict[str, ActiveStreamingTool]] = None
"""The running streaming tools of this invocation."""
transcription_cache: Optional[list[TranscriptionEntry]] = None
"""Caches necessary, data audio or contents, that are needed by transcription."""
run_config: Optional[RunConfig] = None
"""Configurations for live agents under this invocation."""
_invocation_cost_manager: _InvocationCostManager = _InvocationCostManager()
"""A container to keep track of different kinds of costs incurred as a part
of this invocation.
"""
def increment_llm_call_count(
self,
):
"""Tracks number of llm calls made.
Raises:
LlmCallsLimitExceededError: If number of llm calls made exceed the set
threshold.
"""
self._invocation_cost_manager.increment_and_enforce_llm_calls_limit(
self.run_config
)
@property
def app_name(self) -> str:
return self.session.app_name
@property
def user_id(self) -> str:
return self.session.user_id
def new_invocation_context_id() -> str:
return "e-" + str(uuid.uuid4())
```
## /src/google/adk/agents/langgraph_agent.py
```py path="/src/google/adk/agents/langgraph_agent.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import AsyncGenerator
from typing import Union
from google.genai import types
from langchain_core.messages import AIMessage
from langchain_core.messages import HumanMessage
from langchain_core.messages import SystemMessage
from langchain_core.runnables.config import RunnableConfig
from langgraph.graph.graph import CompiledGraph
from pydantic import ConfigDict
from typing_extensions import override
from ..events.event import Event
from .base_agent import BaseAgent
from .invocation_context import InvocationContext
def _get_last_human_messages(events: list[Event]) -> list[HumanMessage]:
"""Extracts last human messages from given list of events.
Args:
events: the list of events
Returns:
list of last human messages
"""
messages = []
for event in reversed(events):
if messages and event.author != 'user':
break
if event.author == 'user' and event.content and event.content.parts:
messages.append(HumanMessage(content=event.content.parts[0].text))
return list(reversed(messages))
class LangGraphAgent(BaseAgent):
"""Currently a concept implementation, supports single and multi-turn."""
model_config = ConfigDict(
arbitrary_types_allowed=True,
)
graph: CompiledGraph
instruction: str = ''
@override
async def _run_async_impl(
self,
ctx: InvocationContext,
) -> AsyncGenerator[Event, None]:
# Needed for langgraph checkpointer (for subsequent invocations; multi-turn)
config: RunnableConfig = {'configurable': {'thread_id': ctx.session.id}}
# Add instruction as SystemMessage if graph state is empty
current_graph_state = self.graph.get_state(config)
graph_messages = (
current_graph_state.values.get('messages', [])
if current_graph_state.values
else []
)
messages = (
[SystemMessage(content=self.instruction)]
if self.instruction and not graph_messages
else []
)
# Add events to messages (evaluating the memory used; parent agent vs checkpointer)
messages += self._get_messages(ctx.session.events)
# Use the Runnable
final_state = self.graph.invoke({'messages': messages}, config)
result = final_state['messages'][-1].content
result_event = Event(
invocation_id=ctx.invocation_id,
author=self.name,
branch=ctx.branch,
content=types.Content(
role='model',
parts=[types.Part.from_text(text=result)],
),
)
yield result_event
def _get_messages(
self, events: list[Event]
) -> list[Union[HumanMessage, AIMessage]]:
"""Extracts messages from given list of events.
If the developer provides their own memory within langgraph, we return the
last user messages only. Otherwise, we return all messages between the user
and the agent.
Args:
events: the list of events
Returns:
list of messages
"""
if self.graph.checkpointer:
return _get_last_human_messages(events)
else:
return self._get_conversation_with_agent(events)
def _get_conversation_with_agent(
self, events: list[Event]
) -> list[Union[HumanMessage, AIMessage]]:
"""Extracts messages from given list of events.
Args:
events: the list of events
Returns:
list of messages
"""
messages = []
for event in events:
if not event.content or not event.content.parts:
continue
if event.author == 'user':
messages.append(HumanMessage(content=event.content.parts[0].text))
elif event.author == self.name:
messages.append(AIMessage(content=event.content.parts[0].text))
return messages
```
## /src/google/adk/agents/live_request_queue.py
```py path="/src/google/adk/agents/live_request_queue.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from typing import Optional
from google.genai import types
from pydantic import BaseModel
from pydantic import ConfigDict
class LiveRequest(BaseModel):
"""Request send to live agents."""
model_config = ConfigDict(ser_json_bytes='base64', val_json_bytes='base64')
content: Optional[types.Content] = None
"""If set, send the content to the model in turn-by-turn mode."""
blob: Optional[types.Blob] = None
"""If set, send the blob to the model in realtime mode."""
close: bool = False
"""If set, close the queue. queue.shutdown() is only supported in Python 3.13+."""
class LiveRequestQueue:
"""Queue used to send LiveRequest in a live(bidirectional streaming) way."""
def __init__(self):
# Ensure there's an event loop available in this thread
try:
asyncio.get_running_loop()
except RuntimeError:
# No running loop, create one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Now create the queue (it will use the event loop we just ensured exists)
self._queue = asyncio.Queue()
def close(self):
self._queue.put_nowait(LiveRequest(close=True))
def send_content(self, content: types.Content):
self._queue.put_nowait(LiveRequest(content=content))
def send_realtime(self, blob: types.Blob):
self._queue.put_nowait(LiveRequest(blob=blob))
def send(self, req: LiveRequest):
self._queue.put_nowait(req)
async def get(self) -> LiveRequest:
return await self._queue.get()
```
## /src/google/adk/agents/llm_agent.py
```py path="/src/google/adk/agents/llm_agent.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
from typing import Any
from typing import AsyncGenerator
from typing import Callable
from typing import Literal
from typing import Optional
from typing import Union
from google.genai import types
from pydantic import BaseModel
from pydantic import Field
from pydantic import field_validator
from pydantic import model_validator
from typing_extensions import override
from typing_extensions import TypeAlias
from ..code_executors.base_code_executor import BaseCodeExecutor
from ..events.event import Event
from ..examples.base_example_provider import BaseExampleProvider
from ..examples.example import Example
from ..flows.llm_flows.auto_flow import AutoFlow
from ..flows.llm_flows.base_llm_flow import BaseLlmFlow
from ..flows.llm_flows.single_flow import SingleFlow
from ..models.base_llm import BaseLlm
from ..models.llm_request import LlmRequest
from ..models.llm_response import LlmResponse
from ..models.registry import LLMRegistry
from ..planners.base_planner import BasePlanner
from ..tools.base_tool import BaseTool
from ..tools.function_tool import FunctionTool
from ..tools.tool_context import ToolContext
from .base_agent import BaseAgent
from .callback_context import CallbackContext
from .invocation_context import InvocationContext
from .readonly_context import ReadonlyContext
logger = logging.getLogger(__name__)
BeforeModelCallback: TypeAlias = Callable[
[CallbackContext, LlmRequest], Optional[LlmResponse]
]
AfterModelCallback: TypeAlias = Callable[
[CallbackContext, LlmResponse],
Optional[LlmResponse],
]
BeforeToolCallback: TypeAlias = Callable[
[BaseTool, dict[str, Any], ToolContext],
Optional[dict],
]
AfterToolCallback: TypeAlias = Callable[
[BaseTool, dict[str, Any], ToolContext, dict],
Optional[dict],
]
InstructionProvider: TypeAlias = Callable[[ReadonlyContext], str]
ToolUnion: TypeAlias = Union[Callable, BaseTool]
ExamplesUnion = Union[list[Example], BaseExampleProvider]
def _convert_tool_union_to_tool(
tool_union: ToolUnion,
) -> BaseTool:
return (
tool_union
if isinstance(tool_union, BaseTool)
else FunctionTool(tool_union)
)
class LlmAgent(BaseAgent):
"""LLM-based Agent."""
model: Union[str, BaseLlm] = ''
"""The model to use for the agent.
When not set, the agent will inherit the model from its ancestor.
"""
instruction: Union[str, InstructionProvider] = ''
"""Instructions for the LLM model, guiding the agent's behavior."""
global_instruction: Union[str, InstructionProvider] = ''
"""Instructions for all the agents in the entire agent tree.
global_instruction ONLY takes effect in root agent.
For example: use global_instruction to make all agents have a stable identity
or personality.
"""
tools: list[ToolUnion] = Field(default_factory=list)
"""Tools available to this agent."""
generate_content_config: Optional[types.GenerateContentConfig] = None
"""The additional content generation configurations.
NOTE: not all fields are usable, e.g. tools must be configured via `tools`,
thinking_config must be configured via `planner` in LlmAgent.
For example: use this config to adjust model temperature, configure safety
settings, etc.
"""
# LLM-based agent transfer configs - Start
disallow_transfer_to_parent: bool = False
"""Disallows LLM-controlled transferring to the parent agent."""
disallow_transfer_to_peers: bool = False
"""Disallows LLM-controlled transferring to the peer agents."""
# LLM-based agent transfer configs - End
include_contents: Literal['default', 'none'] = 'default'
"""Whether to include contents in the model request.
When set to 'none', the model request will not include any contents, such as
user messages, tool results, etc.
"""
# Controlled input/output configurations - Start
input_schema: Optional[type[BaseModel]] = None
"""The input schema when agent is used as a tool."""
output_schema: Optional[type[BaseModel]] = None
"""The output schema when agent replies.
NOTE: when this is set, agent can ONLY reply and CANNOT use any tools, such as
function tools, RAGs, agent transfer, etc.
"""
output_key: Optional[str] = None
"""The key in session state to store the output of the agent.
Typically use cases:
- Extracts agent reply for later use, such as in tools, callbacks, etc.
- Connects agents to coordinate with each other.
"""
# Controlled input/output configurations - End
# Advance features - Start
planner: Optional[BasePlanner] = None
"""Instructs the agent to make a plan and execute it step by step.
NOTE: to use model's built-in thinking features, set the `thinking_config`
field in `google.adk.planners.built_in_planner`.
"""
code_executor: Optional[BaseCodeExecutor] = None
"""Allow agent to execute code blocks from model responses using the provided
CodeExecutor.
Check out available code executions in `google.adk.code_executor` package.
NOTE: to use model's built-in code executor, don't set this field, add
`google.adk.tools.built_in_code_execution` to tools instead.
"""
# Advance features - End
# TODO: remove below fields after migration. - Start
# These fields are added back for easier migration.
examples: Optional[ExamplesUnion] = None
# TODO: remove above fields after migration. - End
# Callbacks - Start
before_model_callback: Optional[BeforeModelCallback] = None
"""Called before calling the LLM.
Args:
callback_context: CallbackContext,
llm_request: LlmRequest, The raw model request. Callback can mutate the
request.
Returns:
The content to return to the user. When present, the model call will be
skipped and the provided content will be returned to user.
"""
after_model_callback: Optional[AfterModelCallback] = None
"""Called after calling LLM.
Args:
callback_context: CallbackContext,
llm_response: LlmResponse, the actual model response.
Returns:
The content to return to the user. When present, the actual model response
will be ignored and the provided content will be returned to user.
"""
before_tool_callback: Optional[BeforeToolCallback] = None
"""Called before the tool is called.
Args:
tool: The tool to be called.
args: The arguments to the tool.
tool_context: ToolContext,
Returns:
The tool response. When present, the returned tool response will be used and
the framework will skip calling the actual tool.
"""
after_tool_callback: Optional[AfterToolCallback] = None
"""Called after the tool is called.
Args:
tool: The tool to be called.
args: The arguments to the tool.
tool_context: ToolContext,
tool_response: The response from the tool.
Returns:
When present, the returned dict will be used as tool result.
"""
# Callbacks - End
@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
async for event in self._llm_flow.run_async(ctx):
self.__maybe_save_output_to_state(event)
yield event
@override
async def _run_live_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
async for event in self._llm_flow.run_live(ctx):
self.__maybe_save_output_to_state(event)
yield event
if ctx.end_invocation:
return
@property
def canonical_model(self) -> BaseLlm:
"""The resolved self.model field as BaseLlm.
This method is only for use by Agent Development Kit.
"""
if isinstance(self.model, BaseLlm):
return self.model
elif self.model: # model is non-empty str
return LLMRegistry.new_llm(self.model)
else: # find model from ancestors.
ancestor_agent = self.parent_agent
while ancestor_agent is not None:
if isinstance(ancestor_agent, LlmAgent):
return ancestor_agent.canonical_model
ancestor_agent = ancestor_agent.parent_agent
raise ValueError(f'No model found for {self.name}.')
def canonical_instruction(self, ctx: ReadonlyContext) -> str:
"""The resolved self.instruction field to construct instruction for this agent.
This method is only for use by Agent Development Kit.
"""
if isinstance(self.instruction, str):
return self.instruction
else:
return self.instruction(ctx)
def canonical_global_instruction(self, ctx: ReadonlyContext) -> str:
"""The resolved self.instruction field to construct global instruction.
This method is only for use by Agent Development Kit.
"""
if isinstance(self.global_instruction, str):
return self.global_instruction
else:
return self.global_instruction(ctx)
@property
def canonical_tools(self) -> list[BaseTool]:
"""The resolved self.tools field as a list of BaseTool.
This method is only for use by Agent Development Kit.
"""
return [_convert_tool_union_to_tool(tool) for tool in self.tools]
@property
def _llm_flow(self) -> BaseLlmFlow:
if (
self.disallow_transfer_to_parent
and self.disallow_transfer_to_peers
and not self.sub_agents
):
return SingleFlow()
else:
return AutoFlow()
def __maybe_save_output_to_state(self, event: Event):
"""Saves the model output to state if needed."""
if (
self.output_key
and event.is_final_response()
and event.content
and event.content.parts
):
result = ''.join(
[part.text if part.text else '' for part in event.content.parts]
)
if self.output_schema:
result = self.output_schema.model_validate_json(result).model_dump(
exclude_none=True
)
event.actions.state_delta[self.output_key] = result
@model_validator(mode='after')
def __model_validator_after(self) -> LlmAgent:
self.__check_output_schema()
return self
def __check_output_schema(self):
if not self.output_schema:
return
if (
not self.disallow_transfer_to_parent
or not self.disallow_transfer_to_peers
):
logger.warning(
'Invalid config for agent %s: output_schema cannot co-exist with'
' agent transfer configurations. Setting'
' disallow_transfer_to_parent=True, disallow_transfer_to_peers=True',
self.name,
)
self.disallow_transfer_to_parent = True
self.disallow_transfer_to_peers = True
if self.sub_agents:
raise ValueError(
f'Invalid config for agent {self.name}: if output_schema is set,'
' sub_agents must be empty to disable agent transfer.'
)
if self.tools:
raise ValueError(
f'Invalid config for agent {self.name}: if output_schema is set,'
' tools must be empty'
)
@field_validator('generate_content_config', mode='after')
@classmethod
def __validate_generate_content_config(
cls, generate_content_config: Optional[types.GenerateContentConfig]
) -> types.GenerateContentConfig:
if not generate_content_config:
return types.GenerateContentConfig()
if generate_content_config.thinking_config:
raise ValueError('Thinking config should be set via LlmAgent.planner.')
if generate_content_config.tools:
raise ValueError('All tools must be set via LlmAgent.tools.')
if generate_content_config.system_instruction:
raise ValueError(
'System instruction must be set via LlmAgent.instruction.'
)
if generate_content_config.response_schema:
raise ValueError(
'Response schema must be set via LlmAgent.output_schema.'
)
return generate_content_config
Agent: TypeAlias = LlmAgent
```
## /src/google/adk/agents/loop_agent.py
```py path="/src/google/adk/agents/loop_agent.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Loop agent implementation."""
from __future__ import annotations
from typing import AsyncGenerator
from typing import Optional
from typing_extensions import override
from ..agents.invocation_context import InvocationContext
from ..events.event import Event
from .base_agent import BaseAgent
class LoopAgent(BaseAgent):
"""A shell agent that run its sub-agents in a loop.
When sub-agent generates an event with escalate or max_iterations are
reached, the loop agent will stop.
"""
max_iterations: Optional[int] = None
"""The maximum number of iterations to run the loop agent.
If not set, the loop agent will run indefinitely until a sub-agent
escalates.
"""
@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
times_looped = 0
while not self.max_iterations or times_looped < self.max_iterations:
for sub_agent in self.sub_agents:
async for event in sub_agent.run_async(ctx):
yield event
if event.actions.escalate:
return
times_looped += 1
return
@override
async def _run_live_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
raise NotImplementedError('The behavior for run_live is not defined yet.')
yield # AsyncGenerator requires having at least one yield statement
```
## /src/google/adk/agents/parallel_agent.py
```py path="/src/google/adk/agents/parallel_agent.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Parallel agent implementation."""
from __future__ import annotations
import asyncio
from typing import AsyncGenerator
from typing_extensions import override
from ..agents.invocation_context import InvocationContext
from ..events.event import Event
from .base_agent import BaseAgent
def _set_branch_for_current_agent(
current_agent: BaseAgent, invocation_context: InvocationContext
):
invocation_context.branch = (
f"{invocation_context.branch}.{current_agent.name}"
if invocation_context.branch
else current_agent.name
)
async def _merge_agent_run(
agent_runs: list[AsyncGenerator[Event, None]],
) -> AsyncGenerator[Event, None]:
"""Merges the agent run event generator.
This implementation guarantees for each agent, it won't move on until the
generated event is processed by upstream runner.
Args:
agent_runs: A list of async generators that yield events from each agent.
Yields:
Event: The next event from the merged generator.
"""
tasks = [
asyncio.create_task(events_for_one_agent.__anext__())
for events_for_one_agent in agent_runs
]
pending_tasks = set(tasks)
while pending_tasks:
done, pending_tasks = await asyncio.wait(
pending_tasks, return_when=asyncio.FIRST_COMPLETED
)
for task in done:
try:
yield task.result()
# Find the generator that produced this event and move it on.
for i, original_task in enumerate(tasks):
if task == original_task:
new_task = asyncio.create_task(agent_runs[i].__anext__())
tasks[i] = new_task
pending_tasks.add(new_task)
break # stop iterating once found
except StopAsyncIteration:
continue
class ParallelAgent(BaseAgent):
"""A shell agent that run its sub-agents in parallel in isolated manner.
This approach is beneficial for scenarios requiring multiple perspectives or
attempts on a single task, such as:
- Running different algorithms simultaneously.
- Generating multiple responses for review by a subsequent evaluation agent.
"""
@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
_set_branch_for_current_agent(self, ctx)
agent_runs = [agent.run_async(ctx) for agent in self.sub_agents]
async for event in _merge_agent_run(agent_runs):
yield event
```
## /src/google/adk/agents/readonly_context.py
```py path="/src/google/adk/agents/readonly_context.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from types import MappingProxyType
from typing import Any
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .invocation_context import InvocationContext
class ReadonlyContext:
def __init__(
self,
invocation_context: InvocationContext,
) -> None:
self._invocation_context = invocation_context
@property
def invocation_id(self) -> str:
"""The current invocation id."""
return self._invocation_context.invocation_id
@property
def agent_name(self) -> str:
"""The name of the agent that is currently running."""
return self._invocation_context.agent.name
@property
def state(self) -> MappingProxyType[str, Any]:
"""The state of the current session. READONLY field."""
return MappingProxyType(self._invocation_context.session.state)
```
## /src/google/adk/agents/remote_agent.py
```py path="/src/google/adk/agents/remote_agent.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import AsyncGenerator
from pydantic import Field
import requests
from typing_extensions import override
from ..events.event import Event
from .base_agent import BaseAgent
from .invocation_context import InvocationContext
class RemoteAgent(BaseAgent):
"""Experimental, do not use."""
url: str
sub_agents: list[BaseAgent] = Field(
default_factory=list, init=False, frozen=True
)
"""Sub-agent is disabled in RemoteAgent."""
@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
data = {
'invocation_id': ctx.invocation_id,
'session': ctx.session.model_dump(exclude_none=True),
}
events = requests.post(self.url, data=json.dumps(data), timeout=120)
events.raise_for_status()
for event in events.json():
e = Event.model_validate(event)
e.author = self.name
yield e
```
## /src/google/adk/agents/run_config.py
```py path="/src/google/adk/agents/run_config.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
import logging
import sys
from typing import Optional
from google.genai import types
from pydantic import BaseModel
from pydantic import ConfigDict
from pydantic import field_validator
logger = logging.getLogger(__name__)
class StreamingMode(Enum):
NONE = None
SSE = 'sse'
BIDI = 'bidi'
class RunConfig(BaseModel):
"""Configs for runtime behavior of agents."""
model_config = ConfigDict(
extra='forbid',
)
speech_config: Optional[types.SpeechConfig] = None
"""Speech configuration for the live agent."""
response_modalities: Optional[list[str]] = None
"""The output modalities. If not set, it's default to AUDIO."""
save_input_blobs_as_artifacts: bool = False
"""Whether or not to save the input blobs as artifacts."""
support_cfc: bool = False
"""
Whether to support CFC (Compositional Function Calling). Only applicable for
StreamingMode.SSE. If it's true. the LIVE API will be invoked. Since only LIVE
API supports CFC
.. warning::
This feature is **experimental** and its API or behavior may change
in future releases.
"""
streaming_mode: StreamingMode = StreamingMode.NONE
"""Streaming mode, None or StreamingMode.SSE or StreamingMode.BIDI."""
output_audio_transcription: Optional[types.AudioTranscriptionConfig] = None
"""Output transcription for live agents with audio response."""
max_llm_calls: int = 500
"""
A limit on the total number of llm calls for a given run.
Valid Values:
- More than 0 and less than sys.maxsize: The bound on the number of llm
calls is enforced, if the value is set in this range.
- Less than or equal to 0: This allows for unbounded number of llm calls.
"""
@field_validator('max_llm_calls', mode='after')
@classmethod
def validate_max_llm_calls(cls, value: int) -> int:
if value == sys.maxsize:
raise ValueError(f'max_llm_calls should be less than {sys.maxsize}.')
elif value <= 0:
logger.warning(
'max_llm_calls is less than or equal to 0. This will result in'
' no enforcement on total number of llm calls that will be made for a'
' run. This may not be ideal, as this could result in a never'
' ending communication between the model and the agent in certain'
' cases.',
)
return value
```
## /src/google/adk/agents/sequential_agent.py
```py path="/src/google/adk/agents/sequential_agent.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sequential agent implementation."""
from __future__ import annotations
from typing import AsyncGenerator
from typing_extensions import override
from ..agents.invocation_context import InvocationContext
from ..events.event import Event
from .base_agent import BaseAgent
class SequentialAgent(BaseAgent):
"""A shell agent that run its sub-agents in sequence."""
@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
for sub_agent in self.sub_agents:
async for event in sub_agent.run_async(ctx):
yield event
@override
async def _run_live_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
for sub_agent in self.sub_agents:
async for event in sub_agent.run_live(ctx):
yield event
```
## /src/google/adk/agents/transcription_entry.py
```py path="/src/google/adk/agents/transcription_entry.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union
from google.genai import types
from pydantic import BaseModel
from pydantic import ConfigDict
class TranscriptionEntry(BaseModel):
"""Store the data that can be used for transcription."""
model_config = ConfigDict(
arbitrary_types_allowed=True,
extra='forbid',
)
role: str
"""The role that created this data, typically "user" or "model"""
data: Union[types.Blob, types.Content]
"""The data that can be used for transcription"""
```
## /src/google/adk/artifacts/__init__.py
```py path="/src/google/adk/artifacts/__init__.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base_artifact_service import BaseArtifactService
from .gcs_artifact_service import GcsArtifactService
from .in_memory_artifact_service import InMemoryArtifactService
__all__ = [
'BaseArtifactService',
'GcsArtifactService',
'InMemoryArtifactService',
]
```
## /src/google/adk/artifacts/base_artifact_service.py
```py path="/src/google/adk/artifacts/base_artifact_service.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Abstract base class for artifact services."""
from abc import ABC
from abc import abstractmethod
from typing import Optional
from google.genai import types
class BaseArtifactService(ABC):
"""Abstract base class for artifact services."""
@abstractmethod
def save_artifact(
self,
*,
app_name: str,
user_id: str,
session_id: str,
filename: str,
artifact: types.Part,
) -> int:
"""Saves an artifact to the artifact service storage.
The artifact is a file identified by the app name, user ID, session ID, and
filename. After saving the artifact, a revision ID is returned to identify
the artifact version.
Args:
app_name: The app name.
user_id: The user ID.
session_id: The session ID.
filename: The filename of the artifact.
artifact: The artifact to save.
Returns:
The revision ID. The first version of the artifact has a revision ID of 0.
This is incremented by 1 after each successful save.
"""
@abstractmethod
def load_artifact(
self,
*,
app_name: str,
user_id: str,
session_id: str,
filename: str,
version: Optional[int] = None,
) -> Optional[types.Part]:
"""Gets an artifact from the artifact service storage.
The artifact is a file identified by the app name, user ID, session ID, and
filename.
Args:
app_name: The app name.
user_id: The user ID.
session_id: The session ID.
filename: The filename of the artifact.
version: The version of the artifact. If None, the latest version will be
returned.
Returns:
The artifact or None if not found.
"""
pass
@abstractmethod
def list_artifact_keys(
self, *, app_name: str, user_id: str, session_id: str
) -> list[str]:
"""Lists all the artifact filenames within a session.
Args:
app_name: The name of the application.
user_id: The ID of the user.
session_id: The ID of the session.
Returns:
A list of all artifact filenames within a session.
"""
pass
@abstractmethod
def delete_artifact(
self, *, app_name: str, user_id: str, session_id: str, filename: str
) -> None:
"""Deletes an artifact.
Args:
app_name: The name of the application.
user_id: The ID of the user.
session_id: The ID of the session.
filename: The name of the artifact file.
"""
pass
@abstractmethod
def list_versions(
self, *, app_name: str, user_id: str, session_id: str, filename: str
) -> list[int]:
"""Lists all versions of an artifact.
Args:
app_name: The name of the application.
user_id: The ID of the user.
session_id: The ID of the session.
filename: The name of the artifact file.
Returns:
A list of all available versions of the artifact.
"""
pass
```
## /src/google/adk/artifacts/gcs_artifact_service.py
```py path="/src/google/adk/artifacts/gcs_artifact_service.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An artifact service implementation using Google Cloud Storage (GCS)."""
import logging
from typing import Optional
from google.cloud import storage
from google.genai import types
from typing_extensions import override
from .base_artifact_service import BaseArtifactService
logger = logging.getLogger(__name__)
class GcsArtifactService(BaseArtifactService):
"""An artifact service implementation using Google Cloud Storage (GCS)."""
def __init__(self, bucket_name: str, **kwargs):
"""Initializes the GcsArtifactService.
Args:
bucket_name: The name of the bucket to use.
**kwargs: Keyword arguments to pass to the Google Cloud Storage client.
"""
self.bucket_name = bucket_name
self.storage_client = storage.Client(**kwargs)
self.bucket = self.storage_client.bucket(self.bucket_name)
def _file_has_user_namespace(self, filename: str) -> bool:
"""Checks if the filename has a user namespace.
Args:
filename: The filename to check.
Returns:
True if the filename has a user namespace (starts with "user:"),
False otherwise.
"""
return filename.startswith("user:")
def _get_blob_name(
self,
app_name: str,
user_id: str,
session_id: str,
filename: str,
version: int,
) -> str:
"""Constructs the blob name in GCS.
Args:
app_name: The name of the application.
user_id: The ID of the user.
session_id: The ID of the session.
filename: The name of the artifact file.
version: The version of the artifact.
Returns:
The constructed blob name in GCS.
"""
if self._file_has_user_namespace(filename):
return f"{app_name}/{user_id}/user/{filename}/{version}"
return f"{app_name}/{user_id}/{session_id}/{filename}/{version}"
@override
def save_artifact(
self,
*,
app_name: str,
user_id: str,
session_id: str,
filename: str,
artifact: types.Part,
) -> int:
versions = self.list_versions(
app_name=app_name,
user_id=user_id,
session_id=session_id,
filename=filename,
)
version = 0 if not versions else max(versions) + 1
blob_name = self._get_blob_name(
app_name, user_id, session_id, filename, version
)
blob = self.bucket.blob(blob_name)
blob.upload_from_string(
data=artifact.inline_data.data,
content_type=artifact.inline_data.mime_type,
)
return version
@override
def load_artifact(
self,
*,
app_name: str,
user_id: str,
session_id: str,
filename: str,
version: Optional[int] = None,
) -> Optional[types.Part]:
if version is None:
versions = self.list_versions(
app_name=app_name,
user_id=user_id,
session_id=session_id,
filename=filename,
)
if not versions:
return None
version = max(versions)
blob_name = self._get_blob_name(
app_name, user_id, session_id, filename, version
)
blob = self.bucket.blob(blob_name)
artifact_bytes = blob.download_as_bytes()
if not artifact_bytes:
return None
artifact = types.Part.from_bytes(
data=artifact_bytes, mime_type=blob.content_type
)
return artifact
@override
def list_artifact_keys(
self, *, app_name: str, user_id: str, session_id: str
) -> list[str]:
filenames = set()
session_prefix = f"{app_name}/{user_id}/{session_id}/"
session_blobs = self.storage_client.list_blobs(
self.bucket, prefix=session_prefix
)
for blob in session_blobs:
_, _, _, filename, _ = blob.name.split("/")
filenames.add(filename)
user_namespace_prefix = f"{app_name}/{user_id}/user/"
user_namespace_blobs = self.storage_client.list_blobs(
self.bucket, prefix=user_namespace_prefix
)
for blob in user_namespace_blobs:
_, _, _, filename, _ = blob.name.split("/")
filenames.add(filename)
return sorted(list(filenames))
@override
def delete_artifact(
self, *, app_name: str, user_id: str, session_id: str, filename: str
) -> None:
versions = self.list_versions(
app_name=app_name,
user_id=user_id,
session_id=session_id,
filename=filename,
)
for version in versions:
blob_name = self._get_blob_name(
app_name, user_id, session_id, filename, version
)
blob = self.bucket.blob(blob_name)
blob.delete()
return
@override
def list_versions(
self, *, app_name: str, user_id: str, session_id: str, filename: str
) -> list[int]:
prefix = self._get_blob_name(app_name, user_id, session_id, filename, "")
blobs = self.storage_client.list_blobs(self.bucket, prefix=prefix)
versions = []
for blob in blobs:
_, _, _, _, version = blob.name.split("/")
versions.append(int(version))
return versions
```
## /src/google/adk/artifacts/in_memory_artifact_service.py
```py path="/src/google/adk/artifacts/in_memory_artifact_service.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An in-memory implementation of the artifact service."""
import logging
from typing import Optional
from google.genai import types
from pydantic import BaseModel
from pydantic import Field
from typing_extensions import override
from .base_artifact_service import BaseArtifactService
logger = logging.getLogger(__name__)
class InMemoryArtifactService(BaseArtifactService, BaseModel):
"""An in-memory implementation of the artifact service."""
artifacts: dict[str, list[types.Part]] = Field(default_factory=dict)
def _file_has_user_namespace(self, filename: str) -> bool:
"""Checks if the filename has a user namespace.
Args:
filename: The filename to check.
Returns:
True if the filename has a user namespace (starts with "user:"),
False otherwise.
"""
return filename.startswith("user:")
def _artifact_path(
self, app_name: str, user_id: str, session_id: str, filename: str
) -> str:
"""Constructs the artifact path.
Args:
app_name: The name of the application.
user_id: The ID of the user.
session_id: The ID of the session.
filename: The name of the artifact file.
Returns:
The constructed artifact path.
"""
if self._file_has_user_namespace(filename):
return f"{app_name}/{user_id}/user/{filename}"
return f"{app_name}/{user_id}/{session_id}/{filename}"
@override
def save_artifact(
self,
*,
app_name: str,
user_id: str,
session_id: str,
filename: str,
artifact: types.Part,
) -> int:
path = self._artifact_path(app_name, user_id, session_id, filename)
if path not in self.artifacts:
self.artifacts[path] = []
version = len(self.artifacts[path])
self.artifacts[path].append(artifact)
return version
@override
def load_artifact(
self,
*,
app_name: str,
user_id: str,
session_id: str,
filename: str,
version: Optional[int] = None,
) -> Optional[types.Part]:
path = self._artifact_path(app_name, user_id, session_id, filename)
versions = self.artifacts.get(path)
if not versions:
return None
if version is None:
version = -1
return versions[version]
@override
def list_artifact_keys(
self, *, app_name: str, user_id: str, session_id: str
) -> list[str]:
session_prefix = f"{app_name}/{user_id}/{session_id}/"
usernamespace_prefix = f"{app_name}/{user_id}/user/"
filenames = []
for path in self.artifacts:
if path.startswith(session_prefix):
filename = path.removeprefix(session_prefix)
filenames.append(filename)
elif path.startswith(usernamespace_prefix):
filename = path.removeprefix(usernamespace_prefix)
filenames.append(filename)
return sorted(filenames)
@override
def delete_artifact(
self, *, app_name: str, user_id: str, session_id: str, filename: str
) -> None:
path = self._artifact_path(app_name, user_id, session_id, filename)
if not self.artifacts.get(path):
return None
self.artifacts.pop(path, None)
@override
def list_versions(
self, *, app_name: str, user_id: str, session_id: str, filename: str
) -> list[int]:
path = self._artifact_path(app_name, user_id, session_id, filename)
versions = self.artifacts.get(path)
if not versions:
return []
return list(range(len(versions)))
```
## /src/google/adk/auth/__init__.py
```py path="/src/google/adk/auth/__init__.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .auth_credential import AuthCredential
from .auth_credential import AuthCredentialTypes
from .auth_credential import OAuth2Auth
from .auth_handler import AuthHandler
from .auth_schemes import AuthScheme
from .auth_schemes import AuthSchemeType
from .auth_schemes import OpenIdConnectWithConfig
from .auth_tool import AuthConfig
```
## /src/google/adk/auth/auth_credential.py
```py path="/src/google/adk/auth/auth_credential.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from pydantic import BaseModel
from pydantic import Field
class BaseModelWithConfig(BaseModel):
model_config = {"extra": "allow"}
class HttpCredentials(BaseModelWithConfig):
"""Represents the secret token value for HTTP authentication, like user name, password, oauth token, etc."""
username: Optional[str] = None
password: Optional[str] = None
token: Optional[str] = None
@classmethod
def model_validate(cls, data: Dict[str, Any]) -> "HttpCredentials":
return cls(
username=data.get("username"),
password=data.get("password"),
token=data.get("token"),
)
class HttpAuth(BaseModelWithConfig):
"""The credentials and metadata for HTTP authentication."""
# The name of the HTTP Authorization scheme to be used in the Authorization
# header as defined in RFC7235. The values used SHOULD be registered in the
# IANA Authentication Scheme registry.
# Examples: 'basic', 'bearer'
scheme: str
credentials: HttpCredentials
class OAuth2Auth(BaseModelWithConfig):
"""Represents credential value and its metadata for a OAuth2 credential."""
client_id: Optional[str] = None
client_secret: Optional[str] = None
# tool or adk can generate the auth_uri with the state info thus client
# can verify the state
auth_uri: Optional[str] = None
state: Optional[str] = None
# tool or adk can decide the redirect_uri if they don't want client to decide
redirect_uri: Optional[str] = None
auth_response_uri: Optional[str] = None
auth_code: Optional[str] = None
access_token: Optional[str] = None
refresh_token: Optional[str] = None
class ServiceAccountCredential(BaseModelWithConfig):
"""Represents Google Service Account configuration.
Attributes:
type: The type should be "service_account".
project_id: The project ID.
private_key_id: The ID of the private key.
private_key: The private key.
client_email: The client email.
client_id: The client ID.
auth_uri: The authorization URI.
token_uri: The token URI.
auth_provider_x509_cert_url: URL for auth provider's X.509 cert.
client_x509_cert_url: URL for the client's X.509 cert.
universe_domain: The universe domain.
Example:
config = ServiceAccountCredential(
type_="service_account",
project_id="your_project_id",
private_key_id="your_private_key_id",
private_key="-----BEGIN PRIVATE KEY-----...",
client_email="...@....iam.gserviceaccount.com",
client_id="your_client_id",
auth_uri="https://accounts.google.com/o/oauth2/auth",
token_uri="https://oauth2.googleapis.com/token",
auth_provider_x509_cert_url="https://www.googleapis.com/oauth2/v1/certs",
client_x509_cert_url="https://www.googleapis.com/robot/v1/metadata/x509/...",
universe_domain="googleapis.com"
)
config = ServiceAccountConfig.model_construct(**{
...service account config dict
})
"""
type_: str = Field("", alias="type")
project_id: str
private_key_id: str
private_key: str
client_email: str
client_id: str
auth_uri: str
token_uri: str
auth_provider_x509_cert_url: str
client_x509_cert_url: str
universe_domain: str
class ServiceAccount(BaseModelWithConfig):
"""Represents Google Service Account configuration."""
service_account_credential: Optional[ServiceAccountCredential] = None
scopes: List[str]
use_default_credential: Optional[bool] = False
class AuthCredentialTypes(str, Enum):
"""Represents the type of authentication credential."""
# API Key credential:
# https://swagger.io/docs/specification/v3_0/authentication/api-keys/
API_KEY = "apiKey"
# Credentials for HTTP Auth schemes:
# https://www.iana.org/assignments/http-authschemes/http-authschemes.xhtml
HTTP = "http"
# OAuth2 credentials:
# https://swagger.io/docs/specification/v3_0/authentication/oauth2/
OAUTH2 = "oauth2"
# OpenID Connect credentials:
# https://swagger.io/docs/specification/v3_0/authentication/openid-connect-discovery/
OPEN_ID_CONNECT = "openIdConnect"
# Service Account credentials:
# https://cloud.google.com/iam/docs/service-account-creds
SERVICE_ACCOUNT = "serviceAccount"
class AuthCredential(BaseModelWithConfig):
"""Data class representing an authentication credential.
To exchange for the actual credential, please use
CredentialExchanger.exchange_credential().
Examples: API Key Auth
AuthCredential(
auth_type=AuthCredentialTypes.API_KEY,
api_key="1234",
)
Example: HTTP Auth
AuthCredential(
auth_type=AuthCredentialTypes.HTTP,
http=HttpAuth(
scheme="basic",
credentials=HttpCredentials(username="user", password="password"),
),
)
Example: OAuth2 Bearer Token in HTTP Header
AuthCredential(
auth_type=AuthCredentialTypes.HTTP,
http=HttpAuth(
scheme="bearer",
credentials=HttpCredentials(token="eyAkaknabna...."),
),
)
Example: OAuth2 Auth with Authorization Code Flow
AuthCredential(
auth_type=AuthCredentialTypes.OAUTH2,
oauth2=OAuth2Auth(
client_id="1234",
client_secret="secret",
),
)
Example: OpenID Connect Auth
AuthCredential(
auth_type=AuthCredentialTypes.OPEN_ID_CONNECT,
oauth2=OAuth2Auth(
client_id="1234",
client_secret="secret",
redirect_uri="https://example.com",
scopes=["scope1", "scope2"],
),
)
Example: Auth with resource reference
AuthCredential(
auth_type=AuthCredentialTypes.API_KEY,
resource_ref="projects/1234/locations/us-central1/resources/resource1",
)
"""
auth_type: AuthCredentialTypes
# Resource reference for the credential.
# This will be supported in the future.
resource_ref: Optional[str] = None
api_key: Optional[str] = None
http: Optional[HttpAuth] = None
service_account: Optional[ServiceAccount] = None
oauth2: Optional[OAuth2Auth] = None
```
## /src/google/adk/auth/auth_handler.py
```py path="/src/google/adk/auth/auth_handler.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import TYPE_CHECKING
from fastapi.openapi.models import OAuth2
from fastapi.openapi.models import SecurityBase
from .auth_credential import AuthCredential
from .auth_credential import AuthCredentialTypes
from .auth_credential import OAuth2Auth
from .auth_schemes import AuthSchemeType
from .auth_schemes import OAuthGrantType
from .auth_schemes import OpenIdConnectWithConfig
from .auth_tool import AuthConfig
if TYPE_CHECKING:
from ..sessions.state import State
try:
from authlib.integrations.requests_client import OAuth2Session
SUPPORT_TOKEN_EXCHANGE = True
except ImportError:
SUPPORT_TOKEN_EXCHANGE = False
class AuthHandler:
def __init__(self, auth_config: AuthConfig):
self.auth_config = auth_config
def exchange_auth_token(
self,
) -> AuthCredential:
"""Generates an auth token from the authorization response.
Returns:
An AuthCredential object containing the access token.
Raises:
ValueError: If the token endpoint is not configured in the auth
scheme.
AuthCredentialMissingError: If the access token cannot be retrieved
from the token endpoint.
"""
auth_scheme = self.auth_config.auth_scheme
auth_credential = self.auth_config.exchanged_auth_credential
if not SUPPORT_TOKEN_EXCHANGE:
return auth_credential
if isinstance(auth_scheme, OpenIdConnectWithConfig):
if not hasattr(auth_scheme, "token_endpoint"):
return self.auth_config.exchanged_auth_credential
token_endpoint = auth_scheme.token_endpoint
scopes = auth_scheme.scopes
elif isinstance(auth_scheme, OAuth2):
if (
not auth_scheme.flows.authorizationCode
or not auth_scheme.flows.authorizationCode.tokenUrl
):
return self.auth_config.exchanged_auth_credential
token_endpoint = auth_scheme.flows.authorizationCode.tokenUrl
scopes = list(auth_scheme.flows.authorizationCode.scopes.keys())
else:
return self.auth_config.exchanged_auth_credential
if (
not auth_credential
or not auth_credential.oauth2
or not auth_credential.oauth2.client_id
or not auth_credential.oauth2.client_secret
or auth_credential.oauth2.access_token
or auth_credential.oauth2.refresh_token
):
return self.auth_config.exchanged_auth_credential
client = OAuth2Session(
auth_credential.oauth2.client_id,
auth_credential.oauth2.client_secret,
scope=" ".join(scopes),
redirect_uri=auth_credential.oauth2.redirect_uri,
state=auth_credential.oauth2.state,
)
tokens = client.fetch_token(
token_endpoint,
authorization_response=auth_credential.oauth2.auth_response_uri,
code=auth_credential.oauth2.auth_code,
grant_type=OAuthGrantType.AUTHORIZATION_CODE,
)
updated_credential = AuthCredential(
auth_type=AuthCredentialTypes.OAUTH2,
oauth2=OAuth2Auth(
access_token=tokens.get("access_token"),
refresh_token=tokens.get("refresh_token"),
),
)
return updated_credential
def parse_and_store_auth_response(self, state: State) -> None:
credential_key = self.get_credential_key()
state[credential_key] = self.auth_config.exchanged_auth_credential
if not isinstance(
self.auth_config.auth_scheme, SecurityBase
) or self.auth_config.auth_scheme.type_ not in (
AuthSchemeType.oauth2,
AuthSchemeType.openIdConnect,
):
return
state[credential_key] = self.exchange_auth_token()
def _validate(self) -> None:
if not self.auth_scheme:
raise ValueError("auth_scheme is empty.")
def get_auth_response(self, state: State) -> AuthCredential:
credential_key = self.get_credential_key()
return state.get(credential_key, None)
def generate_auth_request(self) -> AuthConfig:
if not isinstance(
self.auth_config.auth_scheme, SecurityBase
) or self.auth_config.auth_scheme.type_ not in (
AuthSchemeType.oauth2,
AuthSchemeType.openIdConnect,
):
return self.auth_config.model_copy(deep=True)
# auth_uri already in exchanged credential
if (
self.auth_config.exchanged_auth_credential
and self.auth_config.exchanged_auth_credential.oauth2
and self.auth_config.exchanged_auth_credential.oauth2.auth_uri
):
return self.auth_config.model_copy(deep=True)
# Check if raw_auth_credential exists
if not self.auth_config.raw_auth_credential:
raise ValueError(
f"Auth Scheme {self.auth_config.auth_scheme.type_} requires"
" auth_credential."
)
# Check if oauth2 exists in raw_auth_credential
if not self.auth_config.raw_auth_credential.oauth2:
raise ValueError(
f"Auth Scheme {self.auth_config.auth_scheme.type_} requires oauth2 in"
" auth_credential."
)
# auth_uri in raw credential
if self.auth_config.raw_auth_credential.oauth2.auth_uri:
return AuthConfig(
auth_scheme=self.auth_config.auth_scheme,
raw_auth_credential=self.auth_config.raw_auth_credential,
exchanged_auth_credential=self.auth_config.raw_auth_credential.model_copy(
deep=True
),
)
# Check for client_id and client_secret
if (
not self.auth_config.raw_auth_credential.oauth2.client_id
or not self.auth_config.raw_auth_credential.oauth2.client_secret
):
raise ValueError(
f"Auth Scheme {self.auth_config.auth_scheme.type_} requires both"
" client_id and client_secret in auth_credential.oauth2."
)
# Generate new auth URI
exchanged_credential = self.generate_auth_uri()
return AuthConfig(
auth_scheme=self.auth_config.auth_scheme,
raw_auth_credential=self.auth_config.raw_auth_credential,
exchanged_auth_credential=exchanged_credential,
)
def get_credential_key(self) -> str:
"""Generates a unique key for the given auth scheme and credential."""
auth_scheme = self.auth_config.auth_scheme
auth_credential = self.auth_config.raw_auth_credential
if auth_scheme.model_extra:
auth_scheme = auth_scheme.model_copy(deep=True)
auth_scheme.model_extra.clear()
scheme_name = (
f"{auth_scheme.type_.name}_{hash(auth_scheme.model_dump_json())}"
if auth_scheme
else ""
)
if auth_credential.model_extra:
auth_credential = auth_credential.model_copy(deep=True)
auth_credential.model_extra.clear()
credential_name = (
f"{auth_credential.auth_type.value}_{hash(auth_credential.model_dump_json())}"
if auth_credential
else ""
)
return f"temp:adk_{scheme_name}_{credential_name}"
def generate_auth_uri(
self,
) -> AuthCredential:
"""Generates an response containing the auth uri for user to sign in.
Returns:
An AuthCredential object containing the auth URI and state.
Raises:
ValueError: If the authorization endpoint is not configured in the auth
scheme.
"""
auth_scheme = self.auth_config.auth_scheme
auth_credential = self.auth_config.raw_auth_credential
if isinstance(auth_scheme, OpenIdConnectWithConfig):
authorization_endpoint = auth_scheme.authorization_endpoint
scopes = auth_scheme.scopes
else:
authorization_endpoint = (
auth_scheme.flows.implicit
and auth_scheme.flows.implicit.authorizationUrl
or auth_scheme.flows.authorizationCode
and auth_scheme.flows.authorizationCode.authorizationUrl
or auth_scheme.flows.clientCredentials
and auth_scheme.flows.clientCredentials.tokenUrl
or auth_scheme.flows.password
and auth_scheme.flows.password.tokenUrl
)
scopes = (
auth_scheme.flows.implicit
and auth_scheme.flows.implicit.scopes
or auth_scheme.flows.authorizationCode
and auth_scheme.flows.authorizationCode.scopes
or auth_scheme.flows.clientCredentials
and auth_scheme.flows.clientCredentials.scopes
or auth_scheme.flows.password
and auth_scheme.flows.password.scopes
)
scopes = list(scopes.keys())
client = OAuth2Session(
auth_credential.oauth2.client_id,
auth_credential.oauth2.client_secret,
scope=" ".join(scopes),
redirect_uri=auth_credential.oauth2.redirect_uri,
)
uri, state = client.create_authorization_url(
url=authorization_endpoint, access_type="offline", prompt="consent"
)
exchanged_auth_credential = auth_credential.model_copy(deep=True)
exchanged_auth_credential.oauth2.auth_uri = uri
exchanged_auth_credential.oauth2.state = state
return exchanged_auth_credential
```
## /src/google/adk/auth/auth_preprocessor.py
```py path="/src/google/adk/auth/auth_preprocessor.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import AsyncGenerator
from typing import TYPE_CHECKING
from typing_extensions import override
from ..agents.invocation_context import InvocationContext
from ..events.event import Event
from ..flows.llm_flows import functions
from ..flows.llm_flows._base_llm_processor import BaseLlmRequestProcessor
from ..flows.llm_flows.functions import REQUEST_EUC_FUNCTION_CALL_NAME
from ..models.llm_request import LlmRequest
from .auth_handler import AuthHandler
from .auth_tool import AuthConfig
from .auth_tool import AuthToolArguments
if TYPE_CHECKING:
from ..agents.llm_agent import LlmAgent
class _AuthLlmRequestProcessor(BaseLlmRequestProcessor):
"""Handles auth information to build the LLM request."""
@override
async def run_async(
self, invocation_context: InvocationContext, llm_request: LlmRequest
) -> AsyncGenerator[Event, None]:
from ..agents.llm_agent import LlmAgent
agent = invocation_context.agent
if not isinstance(agent, LlmAgent):
return
events = invocation_context.session.events
if not events:
return
request_euc_function_call_ids = set()
for k in range(len(events) - 1, -1, -1):
event = events[k]
# look for first event authored by user
if not event.author or event.author != 'user':
continue
responses = event.get_function_responses()
if not responses:
return
for function_call_response in responses:
if function_call_response.name != REQUEST_EUC_FUNCTION_CALL_NAME:
continue
# found the function call response for the system long running request euc
# function call
request_euc_function_call_ids.add(function_call_response.id)
auth_config = AuthConfig.model_validate(function_call_response.response)
AuthHandler(auth_config=auth_config).parse_and_store_auth_response(
state=invocation_context.session.state
)
break
if not request_euc_function_call_ids:
return
for i in range(len(events) - 2, -1, -1):
event = events[i]
# looking for the system long running request euc function call
function_calls = event.get_function_calls()
if not function_calls:
continue
tools_to_resume = set()
for function_call in function_calls:
if function_call.id not in request_euc_function_call_ids:
continue
args = AuthToolArguments.model_validate(function_call.args)
tools_to_resume.add(args.function_call_id)
if not tools_to_resume:
continue
# found the the system long running request euc function call
# looking for original function call that requests euc
for j in range(i - 1, -1, -1):
event = events[j]
function_calls = event.get_function_calls()
if not function_calls:
continue
for function_call in function_calls:
function_response_event = None
if function_call.id in tools_to_resume:
function_response_event = await functions.handle_function_calls_async(
invocation_context,
event,
{tool.name: tool for tool in agent.canonical_tools},
# there could be parallel function calls that require auth
# auth response would be a dict keyed by function call id
tools_to_resume,
)
if function_response_event:
yield function_response_event
return
return
request_processor = _AuthLlmRequestProcessor()
```
## /src/google/adk/auth/auth_schemes.py
```py path="/src/google/adk/auth/auth_schemes.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from typing import List
from typing import Optional
from typing import Union
from fastapi.openapi.models import OAuthFlows
from fastapi.openapi.models import SecurityBase
from fastapi.openapi.models import SecurityScheme
from fastapi.openapi.models import SecuritySchemeType
from pydantic import Field
class OpenIdConnectWithConfig(SecurityBase):
type_: SecuritySchemeType = Field(
default=SecuritySchemeType.openIdConnect, alias="type"
)
authorization_endpoint: str
token_endpoint: str
userinfo_endpoint: Optional[str] = None
revocation_endpoint: Optional[str] = None
token_endpoint_auth_methods_supported: Optional[List[str]] = None
grant_types_supported: Optional[List[str]] = None
scopes: Optional[List[str]] = None
# AuthSchemes contains SecuritySchemes from OpenAPI 3.0 and an extra flattened OpenIdConnectWithConfig.
AuthScheme = Union[SecurityScheme, OpenIdConnectWithConfig]
class OAuthGrantType(str, Enum):
"""Represents the OAuth2 flow (or grant type)."""
CLIENT_CREDENTIALS = "client_credentials"
AUTHORIZATION_CODE = "authorization_code"
IMPLICIT = "implicit"
PASSWORD = "password"
@staticmethod
def from_flow(flow: OAuthFlows) -> "OAuthGrantType":
"""Converts an OAuthFlows object to a OAuthGrantType."""
if flow.clientCredentials:
return OAuthGrantType.CLIENT_CREDENTIALS
if flow.authorizationCode:
return OAuthGrantType.AUTHORIZATION_CODE
if flow.implicit:
return OAuthGrantType.IMPLICIT
if flow.password:
return OAuthGrantType.PASSWORD
return None
# AuthSchemeType re-exports SecuritySchemeType from OpenAPI 3.0.
AuthSchemeType = SecuritySchemeType
```
## /src/google/adk/auth/auth_tool.py
```py path="/src/google/adk/auth/auth_tool.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pydantic import BaseModel
from .auth_credential import AuthCredential
from .auth_schemes import AuthScheme
class AuthConfig(BaseModel):
"""The auth config sent by tool asking client to collect auth credentials and
adk and client will help to fill in the response
"""
auth_scheme: AuthScheme
"""The auth scheme used to collect credentials"""
raw_auth_credential: AuthCredential = None
"""The raw auth credential used to collect credentials. The raw auth
credentials are used in some auth scheme that needs to exchange auth
credentials. e.g. OAuth2 and OIDC. For other auth scheme, it could be None.
"""
exchanged_auth_credential: AuthCredential = None
"""The exchanged auth credential used to collect credentials. adk and client
will work together to fill it. For those auth scheme that doesn't need to
exchange auth credentials, e.g. API key, service account etc. It's filled by
client directly. For those auth scheme that need to exchange auth credentials,
e.g. OAuth2 and OIDC, it's first filled by adk. If the raw credentials
passed by tool only has client id and client credential, adk will help to
generate the corresponding authorization uri and state and store the processed
credential in this field. If the raw credentials passed by tool already has
authorization uri, state, etc. then it's copied to this field. Client will use
this field to guide the user through the OAuth2 flow and fill auth response in
this field"""
class AuthToolArguments(BaseModel):
"""the arguments for the special long running function tool that is used to
request end user credentials.
"""
function_call_id: str
auth_config: AuthConfig
```
## /src/google/adk/cli/__init__.py
```py path="/src/google/adk/cli/__init__.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .cli_tools_click import main
```
## /src/google/adk/cli/__main__.py
```py path="/src/google/adk/cli/__main__.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .cli_tools_click import main
if __name__ == '__main__':
main()
```
## /src/google/adk/cli/agent_graph.py
```py path="/src/google/adk/cli/agent_graph.py"
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
from typing import Union
import graphviz
from ..agents import BaseAgent
from ..agents.llm_agent import LlmAgent
from ..tools.agent_tool import AgentTool
from ..tools.base_tool import BaseTool
from ..tools.function_tool import FunctionTool
logger = logging.getLogger(__name__)
try:
from ..tools.retrieval.base_retrieval_tool import BaseRetrievalTool
except ModuleNotFoundError:
retrieval_tool_module_loaded = False
else:
retrieval_tool_module_loaded = True
def build_graph(graph, agent: BaseAgent, highlight_pairs):
dark_green = '#0F5223'
light_green = '#69CB87'
light_gray = '#cccccc'
def get_node_name(tool_or_agent: Union[BaseAgent, BaseTool]):
if isinstance(tool_or_agent, BaseAgent):
return tool_or_agent.name
elif isinstance(tool_or_agent, BaseTool):
return tool_or_agent.name
else:
raise ValueError(f'Unsupported tool type: {tool_or_agent}')
def get_node_caption(tool_or_agent: Union[BaseAgent, BaseTool]):
if isinstance(tool_or_agent, BaseAgent):
return '🤖 ' + tool_or_agent.name
elif retrieval_tool_module_loaded and isinstance(
tool_or_agent, BaseRetrievalTool
):
return '🔎 ' + tool_or_agent.name
elif isinstance(tool_or_agent, FunctionTool):
return '🔧 ' + tool_or_agent.name
elif isinstance(tool_or_agent, AgentTool):
return '🤖 ' + tool_or_agent.name
elif isinstance(tool_or_agent, BaseTool):
return '🔧 ' + tool_or_agent.name
else:
logger.warning(
'Unsupported tool, type: %s, obj: %s',
type(tool_or_agent),
tool_or_agent,
)
return f'❓ Unsupported tool type: {type(tool_or_agent)}'
def get_node_shape(tool_or_agent: Union[BaseAgent, BaseTool]):
if isinstance(tool_or_agent, BaseAgent):
return 'ellipse'
elif retrieval_tool_module_loaded and isinstance(
tool_or_agent, BaseRetrievalTool
):
return 'cylinder'
elif isinstance(tool_or_agent, FunctionTool):
return 'box'
elif isinstance(tool_or_agent, BaseTool):
return 'box'
else:
logger.warning(
'Unsupported tool, type: %s, obj: %s',
type(tool_or_agent),
tool_or_agent,
)
return 'cylinder'
def draw_node(tool_or_agent: Union[BaseAgent, BaseTool]):
name = get_node_name(tool_or_agent)
shape = get_node_shape(tool_or_agent)
caption = get_node_caption(tool_or_agent)
if highlight_pairs:
for highlight_tuple in highlight_pairs:
if name in highlight_tuple:
graph.node(
name,
caption,
style='filled,rounded',
fillcolor=dark_green,
color=dark_green,
shape=shape,
fontcolor=light_gray,
)
return
# if not in highlight, draw non-highliht node
graph.node(
name,
caption,
shape=shape,
style='rounded',
color=light_gray,
fontcolor=light_gray,
)
def draw_edge(from_name, to_name):
if highlight_pairs:
for highlight_from, highlight_to in highlight_pairs:
if from_name == highlight_from and to_name == highlight_to:
graph.edge(from_name, to_name, color=light_green)
return
elif from_name == highlight_to and to_name == highlight_from:
graph.edge(from_name, to_name, color=light_green, dir='back')
return
# if no need to highlight, color gray
graph.edge(from_name, to_name, arrowhead='none', color=light_gray)
draw_node(agent)
for sub_agent in agent.sub_agents:
build_graph(graph, sub_agent, highlight_pairs)
draw_edge(agent.name, sub_agent.name)
if isinstance(agent, LlmAgent):
for tool in agent.canonical_tools:
draw_node(tool)
draw_edge(agent.name, get_node_name(tool))
def get_agent_graph(root_agent, highlights_pairs, image=False):
print('build graph')
graph = graphviz.Digraph(graph_attr={'rankdir': 'LR', 'bgcolor': '#333537'})
build_graph(graph, root_agent, highlights_pairs)
if image:
return graph.pipe(format='png')
else:
return graph
```
## /src/google/adk/cli/browser/adk_favicon.svg
```svg path="/src/google/adk/cli/browser/adk_favicon.svg"
```
## /src/google/adk/cli/browser/assets/audio-processor.js
```js path="/src/google/adk/cli/browser/assets/audio-processor.js"
/**
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
class AudioProcessor extends AudioWorkletProcessor {
constructor() {
super();
this.targetSampleRate = 22000; // Change to your desired rate
this.originalSampleRate = sampleRate; // Browser's sample rate
this.resampleRatio = this.originalSampleRate / this.targetSampleRate;
}
process(inputs, outputs, parameters) {
const input = inputs[0];
if (input.length > 0) {
let audioData = input[0]; // Get first channel's data
if (this.resampleRatio !== 1) {
audioData = this.resample(audioData);
}
this.port.postMessage(audioData);
}
return true; // Keep processor alive
}
resample(audioData) {
const newLength = Math.round(audioData.length / this.resampleRatio);
const resampled = new Float32Array(newLength);
for (let i = 0; i < newLength; i++) {
const srcIndex = Math.floor(i * this.resampleRatio);
resampled[i] = audioData[srcIndex]; // Nearest neighbor resampling
}
return resampled;
}
}
registerProcessor('audio-processor', AudioProcessor);
```
## /src/google/adk/cli/browser/assets/config/runtime-config.json
```json path="/src/google/adk/cli/browser/assets/config/runtime-config.json"
{
"backendUrl": ""
}
```
## /src/google/adk/cli/browser/index.html
```html path="/src/google/adk/cli/browser/index.html"
Agent Development Kit Dev UI
```
The content has been capped at 50000 tokens, and files over NaN bytes have been omitted. The user could consider applying other filters to refine the result. The better and more specific the context, the better the LLM can follow instructions. If the context seems verbose, the user can refine the filter using uithub. Thank you for using https://uithub.com - Perfect LLM context for any GitHub repo.