```
├── .github/
├── ISSUE_TEMPLATE/
├── bug_report.md (100 tokens)
├── feature_request.md (100 tokens)
├── .gitignore (900 tokens)
├── .pre-commit-config.yaml (100 tokens)
├── CODE_OF_CONDUCT.md (1000 tokens)
├── CONTRIBUTING.md (1500 tokens)
├── LICENSE (omitted)
├── Makefile (100 tokens)
├── README.md (2.5k tokens)
├── datapizza-ai-cache/
├── redis/
├── README.md
├── datapizza/
├── cache/
├── redis/
├── __init__.py
├── cache.py (200 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_redis_cache.py
├── datapizza-ai-clients/
├── datapizza-ai-clients-anthropic/
├── README.md
├── datapizza/
├── clients/
├── anthropic/
├── __init__.py
├── anthropic_client.py (2.8k tokens)
├── memory_adapter.py (1000 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_anthropic_memory_adapter.py (200 tokens)
├── datapizza-ai-clients-azure-openai/
├── datapizza/
├── clients/
├── azure_openai/
├── __init__.py
├── azure_openai_client.py (300 tokens)
├── pyproject.toml (300 tokens)
├── datapizza-ai-clients-bedrock/
├── README.md (1800 tokens)
├── datapizza/
├── clients/
├── bedrock/
├── __init__.py
├── bedrock_client.py (3.8k tokens)
├── memory_adapter.py (1100 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_bedrock_async.py (300 tokens)
├── test_bedrock_memory_adapter.py (400 tokens)
├── datapizza-ai-clients-google/
├── README.md
├── datapizza/
├── clients/
├── google/
├── __init__.py
├── google_client.py (4.6k tokens)
├── memory_adapter.py (1000 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_memory_adapter.py (400 tokens)
├── datapizza-ai-clients-mistral/
├── README.md
├── datapizza/
├── clients/
├── mistral/
├── __init__.py
├── memory_adapter.py (700 tokens)
├── mistral_client.py (3.4k tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_mistral_client.py
├── datapizza-ai-clients-openai-like/
├── README.md (1100 tokens)
├── datapizza/
├── clients/
├── openai_like/
├── __init__.py
├── memory_adapter.py (1200 tokens)
├── openai_completion_client.py (3.1k tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_openai_completion.py (100 tokens)
├── datapizza-ai-clients-openai/
├── README.md
├── datapizza/
├── clients/
├── openai/
├── __init__.py
├── memory_adapter.py (1300 tokens)
├── openai_client.py (3.6k tokens)
├── pyproject.toml (200 tokens)
├── tests/
├── __init__.py
├── test_base_client.py (600 tokens)
├── test_memory_adapter.py (1200 tokens)
├── datapizza-ai-core/
├── README.md
├── datapizza/
├── agents/
├── __init__.py
├── __version__.py
├── agent.py (4.7k tokens)
├── client_manager.py (200 tokens)
├── logger.py (500 tokens)
├── tests/
├── test_base_agents.py (1500 tokens)
├── cache/
├── __init__.py
├── clients/
├── __init__.py
├── factory.py (900 tokens)
├── mock_client.py (1500 tokens)
├── tests/
├── test_client_factory.py (100 tokens)
├── core/
├── __init__.py
├── __version__.py
├── cache/
├── __init__.py
├── cache.py (700 tokens)
├── clients/
├── __init__.py
├── client.py (5.8k tokens)
├── models.py (1000 tokens)
├── tests/
├── test_mock_client.py (700 tokens)
├── test_token_usage.py (200 tokens)
├── constants.py (100 tokens)
├── embedder/
├── __init__.py
├── base.py (200 tokens)
├── executors/
├── async_executor.py (500 tokens)
├── models.py (700 tokens)
├── modules/
├── captioner.py (200 tokens)
├── metatagger.py (100 tokens)
├── parser.py (200 tokens)
├── prompt.py (100 tokens)
├── reranker.py (100 tokens)
├── rewriter.py (200 tokens)
├── splitter.py (100 tokens)
├── utils.py (1100 tokens)
├── vectorstore/
├── __init__.py
├── tests/
├── test_vectorstore_models.py (100 tokens)
├── vectorstore.py (600 tokens)
├── embedders/
├── __init__.py
├── embedders.py (900 tokens)
├── memory/
├── __init__.py
├── __version__.py
├── memory.py (1200 tokens)
├── memory_adapter.py (400 tokens)
├── tests/
├── __init__.py
├── test_memory.py (1600 tokens)
├── modules/
├── captioners/
├── __init__.py
├── llm_captioner.py (1500 tokens)
├── metatagger/
├── __init__.py
├── keyword_metatagger.py (600 tokens)
├── tests/
├── test_keyword_metagger.py (700 tokens)
├── parsers/
├── __init__.py
├── tests/
├── test_base_parser.py (200 tokens)
├── text_parser.py (600 tokens)
├── prompt/
├── __init__.py
├── image_rag.py (700 tokens)
├── prompt.py (600 tokens)
├── tests/
├── test_chat_prompt_template.py (500 tokens)
├── rewriters/
├── __init__.py
├── tests/
├── test_tool_rewriter.py (100 tokens)
├── tool_rewriter.py (700 tokens)
├── splitters/
├── __init__.py (100 tokens)
├── bbox_merger.py (700 tokens)
├── node_splitter.py (300 tokens)
├── pdf_image_splitter.py (1000 tokens)
├── recursive_splitter.py (700 tokens)
├── tests/
├── test_node_splitter.py (300 tokens)
├── test_recursive_splitter.py (100 tokens)
├── test_text_splitter.py (100 tokens)
├── text_splitter.py (400 tokens)
├── treebuilder/
├── __init__.py
├── llm_treebuilder.py (2k tokens)
├── test/
├── test_llm_treebuilder.py (4.5k tokens)
├── pipeline/
├── __init__.py
├── dag_pipeline.py (2.2k tokens)
├── functional_pipeline.py (5.6k tokens)
├── pipeline.py (1900 tokens)
├── tests/
├── config.yaml (200 tokens)
├── dag_config.yaml (200 tokens)
├── functional_pipeline_config.yaml (200 tokens)
├── test_functional_pipeline.py (1000 tokens)
├── test_graph_pipeline.py (800 tokens)
├── test_pipeline.py (1200 tokens)
├── tools/
├── __init__.py
├── google.py (100 tokens)
├── mcp_client.py (1500 tokens)
├── tests/
├── __init__.py
├── test_tools.py (500 tokens)
├── tools.py (900 tokens)
├── utils.py (700 tokens)
├── tracing/
├── __init__.py
├── memory_exporter.py (600 tokens)
├── tests/
├── test_tracing.py (3.2k tokens)
├── tracing.py (1000 tokens)
├── type/
├── __init__.py (100 tokens)
├── tests/
├── test_type.py (500 tokens)
├── type.py (2.7k tokens)
├── pyproject.toml (300 tokens)
├── datapizza-ai-embedders/
├── cohere/
├── README.md
├── datapizza/
├── embedders/
├── cohere/
├── __init__.py
├── cohere.py (400 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_base.py
├── fastembedder/
├── README.md
├── datapizza/
├── embedders/
├── fastembedder/
├── __init__.py
├── fastembedder.py (400 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_fastembedder.py
├── google/
├── README.md (100 tokens)
├── datapizza/
├── embedders/
├── google/
├── __init__.py
├── google.py (400 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_google_embedder.py (100 tokens)
├── image_embedder.py (3.7k tokens)
├── openai/
├── README.md (100 tokens)
├── datapizza/
├── embedders/
├── openai/
├── __init__.py
├── openai.py (400 tokens)
├── pyproject.toml (200 tokens)
├── tests/
├── test_openai_embedder.py (100 tokens)
├── datapizza-ai-eval/
├── metrics.py (5.6k tokens)
├── datapizza-ai-modules/
├── parsers/
├── azure/
├── README.md
├── datapizza/
├── modules/
├── parsers/
├── azure/
├── __init__.py
├── azure_parser.py (3.2k tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── attention_wikipedia_test.json (413.5k tokens)
├── attention_wikipedia_test.pdf
├── test_azure_parser.py (900 tokens)
├── docling/
├── README.md
├── datapizza/
├── modules/
├── parsers/
├── docling/
├── __init__.py
├── docling_parser.py (7.4k tokens)
├── ocr_options.py (700 tokens)
├── tests/
├── conftest.py (200 tokens)
├── test_docling_parser.py (1500 tokens)
├── utils.py (600 tokens)
├── mypy.ini
├── pyproject.toml (300 tokens)
├── rerankers/
├── cohere/
├── README.md
├── datapizza/
├── modules/
├── rerankers/
├── cohere/
├── __init__.py
├── cohere_reranker.py (900 tokens)
├── pyproject.toml (300 tokens)
├── together/
├── README.md
├── datapizza/
├── modules/
├── rerankers/
├── together/
├── __init__.py
├── together_reranker.py (500 tokens)
├── pyproject.toml (300 tokens)
├── datapizza-ai-tools/
├── SQLDatabase/
├── README.md (1400 tokens)
├── datapizza/
├── tools/
├── SQLDatabase/
├── __init__.py
├── base.py (500 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_sql_database_tool.py (500 tokens)
├── duckduckgo/
├── README.md
├── datapizza/
├── tools/
├── duckduckgo/
├── __init__.py
├── base.py (200 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_ddgs_tools.py
├── filesystem/
├── README.md (1700 tokens)
├── datapizza/
├── tools/
├── filesystem/
├── __init__.py
├── filesystem.py (2.1k tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_file_path_matches_pattern.py (500 tokens)
├── test_filesystem.py (1700 tokens)
├── web_fetch/
├── README.md (600 tokens)
├── datapizza/
├── tools/
├── web_fetch/
├── __init__.py
├── base.py (500 tokens)
├── pyproject.toml (300 tokens)
├── tests/
├── test_web_fetch.py (500 tokens)
├── datapizza-ai-vectorstores/
├── datapizza-ai-vectorstores-milvus/
├── README.md
├── datapizza/
├── vectorstores/
├── milvus/
├── __init__.py
├── milvus_vectorstore.py (4.1k tokens)
├── tests/
├── test_milvus_vectorstore.py (900 tokens)
├── pyproject.toml (300 tokens)
├── datapizza-ai-vectorstores-qdrant/
├── README.md
├── datapizza/
├── vectorstores/
├── qdrant/
├── __init__.py
├── qdrant_vectorstore.py (2.9k tokens)
├── tests/
├── test_qdrant_vectorstore.py (900 tokens)
├── pyproject.toml (300 tokens)
├── docs/
├── .pages
├── API Reference/
├── .pages
├── Agents/
├── agent.md
├── Clients/
├── .pages
├── Avaiable_Clients/
├── .pages
├── anthropic.md (200 tokens)
├── google.md (100 tokens)
├── mistral.md (100 tokens)
├── openai-like.md (200 tokens)
├── openai.md (100 tokens)
├── cache.md
├── client_factory.md (300 tokens)
├── clients.md
├── models.md
├── Embedders/
├── chunk_embedder.md (400 tokens)
├── cohere_embedder.md (600 tokens)
├── fast_embedder.md (200 tokens)
├── google_embedder.md (300 tokens)
├── ollama_embedder.md (100 tokens)
├── openai_embedder.md (300 tokens)
├── Modules/
├── .pages
├── Parsers/
├── .pages
├── azure_parser.md (400 tokens)
├── docling_parser.md (1100 tokens)
├── index.md (300 tokens)
├── text_parser.md (600 tokens)
├── Prompt/
├── .pages
├── ChatPromptTemplate.md (300 tokens)
├── ImageRAGPrompt.md (200 tokens)
├── Rerankers/
├── .pages
├── cohere_reranker.md (400 tokens)
├── index.md (500 tokens)
├── together_reranker.md (500 tokens)
├── Splitters/
├── .pages
├── index.md (400 tokens)
├── node_splitter.md (300 tokens)
├── pdf_image_splitter.md (300 tokens)
├── recursive_splitter.md (200 tokens)
├── text_splitter.md (200 tokens)
├── captioners.md (400 tokens)
├── index.md (200 tokens)
├── metatagger.md (500 tokens)
├── rewriters.md (300 tokens)
├── treebuilder.md (700 tokens)
├── Pipelines/
├── dag.md
├── functional.md
├── ingestion.md
├── Tools/
├── .pages
├── SQLDatabase.md (300 tokens)
├── duckduckgo.md (300 tokens)
├── filesystem.md (700 tokens)
├── mcp.md
├── web_fetch.md (300 tokens)
├── Type/
├── block.md (100 tokens)
├── chunk.md (100 tokens)
├── media.md
├── node.md
├── tool.md
├── Vectorstore/
├── milvus_vectorstore.md (1300 tokens)
├── qdrant_vectorstore.md (500 tokens)
├── index.md
├── memory.md
├── Guides/
├── .pages
├── Agents/
├── agent.md (1600 tokens)
├── mcp.md (400 tokens)
├── Clients/
├── .pages
├── chatbot.md (300 tokens)
├── local_model.md (300 tokens)
├── multimodality.md (700 tokens)
├── quick_start.md (700 tokens)
├── streaming.md (200 tokens)
├── structured_responses.md (500 tokens)
├── tools.md (500 tokens)
├── Monitoring/
├── .pages
├── log.md (100 tokens)
├── tracing.md (1000 tokens)
├── Pipeline/
├── .pages
├── functional_pipeline.md (1400 tokens)
├── ingestion_pipeline.md (1200 tokens)
├── retrieval_pipeline.md (1700 tokens)
├── RAG/
├── .pages
├── rag.md (1200 tokens)
├── assets/
├── logo.png
├── logo_bg_dark.png
├── pool.png
├── workflow.png
├── index.md (300 tokens)
├── pyproject.toml (700 tokens)
├── pyrightconfig.json (100 tokens)
```
## /.github/ISSUE_TEMPLATE/bug_report.md
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**Environment**
OS, Python version, datapizza-ai version
**To Reproduce**
Steps to reproduce the behavior
**Expected behavior**
A clear and concise description of what you expected to happen.
**Logs**
If applicable, attach logs to help explain your problem.
**Additional context**
Add any other context about the problem here.
## /.github/ISSUE_TEMPLATE/feature_request.md
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.
## /.gitignore
```gitignore path="/.gitignore"
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Milvus
milvus.db
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
#poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
#pdm.lock
#pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
#pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Cursor
# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to
# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data
# refer to https://docs.cursor.com/context/ignore-files
.cursorignore
.cursorindexingignore
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
```
## /.pre-commit-config.yaml
```yaml path="/.pre-commit-config.yaml"
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.1
hooks:
- id: ruff-check
args: [ --fix ]
- id: ruff-format
```
## /CODE_OF_CONDUCT.md
# Contributor Covenant Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the
overall community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or
advances of any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email
address, without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at
ai.support@datapizza.tech.
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series
of actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within
the community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.0, available at
https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.
Community Impact Guidelines were inspired by [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see the FAQ at
https://www.contributor-covenant.org/faq. Translations are available at
https://www.contributor-covenant.org/translations.
## /CONTRIBUTING.md
# Contributing to datapizza-ai
First off, thanks for taking the time to contribute! ❤️
All types of contributions are encouraged and valued. See the [Table of Contents](#table-of-contents) for different ways to help and details about how this project handles them. Please make sure to read the relevant section before making your contribution. It will make it a lot easier for us maintainers and smooth out the experience for all involved. The community looks forward to your contributions. 🎉
> And if you like the project, but just don't have time to contribute, that's fine. There are other easy ways to support the project and show your appreciation, which we would also be very happy about:
> - Star the project
> - Tweet about it
> - Refer this project in your project's readme
> - Mention the project at local meetups and tell your friends/colleagues
## Table of Contents
- [Code of Conduct](#code-of-conduct)
- [I Have a Question](#i-have-a-question)
- [I Want To Contribute](#i-want-to-contribute)
- [Reporting Bugs](#reporting-bugs)
- [Suggesting Enhancements](#suggesting-enhancements)
## Code of Conduct
This project and everyone participating in it is governed by the
[Datapizza Code of Conduct](https://github.com/datapizza-labs/datapizza-ai/blob/main/CODE_OF_CONDUCT.md).
By participating, you are expected to uphold this code. Please report unacceptable behavior
to <ai.support@datapizza.tech>.
## I Have a Question
> If you want to ask a question, we assume that you have read the available [Documentation](https://docs.datapizza.ai).
Before you ask a question, it is best to search for existing [Issues](https://github.com/datapizza-labs/datapizza-ai/issues) that might help you. In case you have found a suitable issue and still need clarification, you can write your question in this issue. It is also advisable to search the internet for answers first.
If you then still feel the need to ask a question and need clarification, we recommend the following:
- Open an [Issue](https://github.com/datapizza-labs/datapizza-ai/issues/new).
- Provide as much context as you can about what you're running into.
- Provide project and platform versions (docker, datapizza-ai, etc), depending on what seems relevant.
We will then take care of the issue as soon as possible.
## I Want To Contribute
> ### Legal Notice <!-- omit in toc -->
> When contributing to this project, you must agree that you have authored 100% of the content, that you have the necessary rights to the content and that the content you contribute may be provided under the project license.
### Reporting Bugs
#### Before Submitting a Bug Report
A good bug report shouldn't leave others needing to chase you up for more information. Therefore, we ask you to investigate carefully, collect information and describe the issue in detail in your report. Please complete the following steps in advance to help us fix any potential bug as fast as possible.
- Make sure that you are using the latest version.
- Determine if your bug is really a bug and not an error on your side e.g. using incompatible environment components/versions (Make sure that you have read the [documentation](https://docs.datapizza.ai). If you are looking for support, you might want to check [this section](#i-have-a-question)).
- To see if other users have experienced (and potentially already solved) the same issue you are having, check if there is not already a bug report existing for your bug or error in the [bug tracker](https://github.com/datapizza-labs/datapizza-ai/issues?q=is%3Aissue%20state%3Aopen%20label%3Abug).
- Also make sure to search the internet (including Stack Overflow) to see if users outside of the GitHub community have discussed the issue.
- Collect information about the bug:
- Stack trace (Traceback)
- OS, Platform and Version (Windows, Linux, macOS, x86, ARM)
- Version of the interpreter, compiler, SDK, runtime environment, package manager, depending on what seems relevant.
- Possibly your input and the output
- Can you reliably reproduce the issue? And can you also reproduce it with older versions?
#### How Do I Submit a Good Bug Report?
> You must never report security related issues, vulnerabilities or bugs including sensitive information to the issue tracker, or elsewhere in public. Instead sensitive bugs must be sent by email to <ai.support@datapizza.tech>.
We use GitHub issues to track bugs and errors. If you run into an issue with the project:
- Open an [Issue](https://github.com/datapizza-labs/datapizza-ai/issues/new?template=bug_report.md). (Since we can't be sure at this point whether it is a bug or not, we ask you not to talk about a bug yet and not to label the issue.)
- Explain the behavior you would expect and the actual behavior.
- Please provide as much context as possible and describe the *reproduction steps* that someone else can follow to recreate the issue on their own. This usually includes your code. For good bug reports you should isolate the problem and create a reduced test case.
- Provide the information you collected in the previous section.
Once it's filed:
- The project team will label the issue accordingly.
- A team member will try to reproduce the issue with your provided steps. If there are no reproduction steps or no obvious way to reproduce the issue, the team will ask you for those steps and mark the issue as `needs-repro`. Bugs with the `needs-repro` tag will not be addressed until they are reproduced.
<!-- You might want to create an issue template for bugs and errors that can be used as a guide and that defines the structure of the information to be included. If you do so, reference it here in the description. -->
### Suggesting Enhancements
This section guides you through submitting an enhancement suggestion for datapizza-ai, **including completely new features and minor improvements to existing functionality**. Following these guidelines will help maintainers and the community to understand your suggestion and find related suggestions.
<!-- omit in toc -->
#### Before Submitting an Enhancement
- Make sure that you are using the latest version.
- Read the [documentation](https://docs.datapizza.ai) carefully and find out if the functionality is already covered, maybe by an individual configuration.
- Perform a [search](https://github.com/datapizza-labs/datapizza-ai/issues) to see if the enhancement has already been suggested. If it has, add a comment to the existing issue instead of opening a new one.
- Find out whether your idea fits with the scope and aims of the project. It's up to you to make a strong case to convince the project's developers of the merits of this feature. Keep in mind that we want features that will be useful to the majority of our users and not just a small subset. If you're just targeting a minority of users, consider writing an add-on/plugin library.
#### How Do I Submit a Good Enhancement Suggestion?
Enhancement suggestions are tracked as [GitHub issues](https://github.com/datapizza-labs/datapizza-ai/issues).
- Use a **clear and descriptive title** for the issue to identify the suggestion.
- Provide a **step-by-step description of the suggested enhancement** in as many details as possible.
- **Describe the current behavior** and **explain which behavior you expected to see instead** and why. At this point you can also tell which alternatives do not work for you.
- **Explain why this enhancement would be useful** to most datapizza-ai users. You may also want to point out the other projects that solved it better and which could serve as inspiration.
## /Makefile
``` path="/Makefile"
test:
uv run pytest --tb=short -v
watch-tests:
find . -name "*.py" -not -path "*/site-packages/*" | entr uv run pytest --tb=short -v
format:
uvx ruff format .
linter-check:
uvx ruff check .
linter-fix:
uvx ruff check --fix
linter-force-fix:
uvx ruff check --fix --unsafe-fixes
dependency-check:
uv run deptry .
run_docs:
uv pip install mkdocs-material pymdown-extensions mkdocs-awesome-pages-plugin mkdocstrings-python
uv run mkdocs serve --livereload
```
## /README.md
<div align="center">
<img src="docs/assets/logo_bg_dark.png" alt="Datapizza AI Logo" width="200" height="200">
**Build reliable Gen AI solutions without overhead**
*Written in Python. Designed for speed. A no-fluff GenAI framework that gets your agents from dev to prod, fast*
[License: MIT](https://opensource.org/licenses/MIT) • [PyPI](https://pypi.org/project/datapizza-ai/) • [Python](https://www.python.org/downloads/) • [GitHub](https://github.com/datapizza-labs/datapizza-ai)
[🏠Homepage](https://datapizza.tech/en/ai-framework/) • [🚀 Quick Start](#-quick-start) • [📖 Documentation](https://docs.datapizza.ai) • [🎯 Examples](#-examples) • [🤝 Community](#-community)
</div>
---
## 🌟 Why Datapizza AI?
A framework that keeps your agents predictable, your debugging fast, and your code trusted in production. Built by Engineers, trusted by Engineers.
<div align="center">
### ⚡ **Less abstraction, more control** | 🚀 **API-first design** | 🔧 **Observable by design**
</div>
## How to install
```sh
pip install datapizza-ai
```
## Client invoke
```python
from datapizza.clients.openai import OpenAIClient
client = OpenAIClient(api_key="YOUR_API_KEY")
result = client.invoke("Hi, how are u?")
print(result.text)
```
## ✨ Key Features
<table>
<tr>
<td width="50%" valign="top">
### 🎯 **API-first**
- **Multi-Provider Support**: OpenAI, Google Gemini, Anthropic, Mistral, Azure
- **Tool Integration**: Built-in web search, document processing, custom tools
- **Memory Management**: Persistent conversations and context awareness
</td>
<td width="50%" valign="top">
### 🔍 **Composable**
- **Reusable blocks**: Declarative configuration, easy overrides
- **Document Processing**: PDF, DOCX, images with Azure AI & Docling
- **Smart Chunking**: Context-aware text splitting and embedding
- **Built-in reranking**: Add a reranker (e.g., Cohere) to boost relevance
</td>
</tr>
<tr>
<td width="50%" valign="top">
### 🔧 **Observable**
- **OpenTelemetry tracing**: Standards-based instrumentation
- **Client I/O tracing**: Optional toggle to log inputs, outputs, and in-memory context
- **Custom spans**: Trace fine-grained phases and sub-steps to pinpoint bottlenecks
</td>
<td width="50%" valign="top">
### 🚀 **Vendor-Agnostic**
- **Swap models**: Change providers without rewiring business logic
- **Clear Interfaces**: Predictable APIs across all components
- **Rich Ecosystem**: Modular design with optional components
- **Migration-friendly**: Quick migration from other frameworks
</td>
</tr>
</table>
## 🚀 Quick Start
### Installation
```bash
# Core framework
pip install datapizza-ai
# With specific providers (optional)
pip install datapizza-ai-clients-openai
pip install datapizza-ai-clients-google
pip install datapizza-ai-clients-anthropic
```
### Start with Agent
```python
from datapizza.agents import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools import tool
@tool
def get_weather(city: str) -> str:
return f"The weather in {city} is sunny"
client = OpenAIClient(api_key="YOUR_API_KEY")
agent = Agent(name="assistant", client=client, tools=[get_weather])
response = agent.run("What is the weather in Rome?")
# output: The weather in Rome is sunny
```
## 📊 Detailed Tracing
A key requirement for the principled development of LLM applications over your data (RAG systems, agents) is the ability to observe and debug them.
Datapizza-ai provides built-in observability with OpenTelemetry tracing to help you monitor performance and understand execution flow.
**🔍 Trace Your AI Operations**
```sh
pip install datapizza-ai-tools-duckduckgo
```
```python
from datapizza.agents import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools.duckduckgo import DuckDuckGoSearchTool
from datapizza.tracing import ContextTracing
client = OpenAIClient(api_key="OPENAI_API_KEY")
agent = Agent(name="assistant", client=client, tools=[DuckDuckGoSearchTool()])
with ContextTracing().trace("my_ai_operation"):
response = agent.run("Tell me some news about Bitcoin")
# Output shows:
# ╭─ Trace Summary of my_ai_operation ──────────────────────────────────╮
# │ Total Spans: 3 │
# │ Duration: 2.45s │
# │ ┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ |
# │ ┃ Model ┃ Prompt Tokens ┃ Completion Tokens ┃ Cached Tokens ┃ |
# │ ┡━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ |
# │ │ gpt-4o-mini │ 31 │ 27 │ 0 │ |
# │ └─────────────┴───────────────┴───────────────────┴───────────────┘ |
# ╰─────────────────────────────────────────────────────────────────────╯
```

## 🎯 Examples
### 🌐 Multi-Agent System
Build sophisticated AI systems where multiple specialized agents collaborate to solve complex tasks. This example shows how to create a trip planning system with dedicated agents for weather information, web search, and planning coordination.
```sh
# Install DuckDuckGo tool
pip install datapizza-ai-tools-duckduckgo
```
```python
from datapizza.agents.agent import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools import tool
from datapizza.tools.duckduckgo import DuckDuckGoSearchTool
client = OpenAIClient(api_key="YOUR_API_KEY", model="gpt-4.1")
@tool
def get_weather(city: str) -> str:
return f""" it's sunny all the week in {city}"""
weather_agent = Agent(
name="weather_expert",
client=client,
system_prompt="You are a weather expert. Provide detailed weather information and forecasts.",
tools=[get_weather]
)
web_search_agent = Agent(
name="web_search_expert",
client=client,
system_prompt="You are a web search expert. You can search the web for information.",
tools=[DuckDuckGoSearchTool()]
)
planner_agent = Agent(
name="planner",
client=client,
system_prompt="You are a trip planner. You should provide a plan for the user. Make sure to provide a detailed plan with the best places to visit and the best time to visit them."
)
planner_agent.can_call([weather_agent, web_search_agent])
response = planner_agent.run(
"I need to plan a hiking trip in Seattle next week. I want to see some waterfalls and a forest."
)
print(response.text)
```
### 📊 Document Ingestion
Process and index documents for retrieval-augmented generation (RAG). This pipeline automatically parses PDFs, splits them into chunks, generates embeddings, and stores them in a vector database for efficient similarity search.
```sh
pip install datapizza-ai-parsers-docling
```
```python
from datapizza.core.vectorstore import VectorConfig
from datapizza.embedders import ChunkEmbedder
from datapizza.embedders.openai import OpenAIEmbedder
from datapizza.modules.parsers.docling import DoclingParser
from datapizza.modules.splitters import NodeSplitter
from datapizza.pipeline import IngestionPipeline
from datapizza.vectorstores.qdrant import QdrantVectorstore
vectorstore = QdrantVectorstore(location=":memory:")
embedder = ChunkEmbedder(client=OpenAIEmbedder(api_key="YOUR_API_KEY", model_name="text-embedding-3-small"))
vectorstore.create_collection("my_documents", vector_config=[VectorConfig(name="embedding", dimensions=1536)])
pipeline = IngestionPipeline(
modules=[
DoclingParser(),
NodeSplitter(max_char=1024),
embedder,
],
vector_store=vectorstore,
collection_name="my_documents"
)
pipeline.run("sample.pdf")
results = vectorstore.search(query_vector=[0.0] * 1536, collection_name="my_documents", k=5)
print(results)
```
### 📊 RAG (Retrieval-Augmented Generation)
Create a complete RAG pipeline that enhances AI responses with relevant document context. This example demonstrates query rewriting, embedding generation, document retrieval, and response generation in a connected workflow.
```python
from datapizza.clients.openai import OpenAIClient
from datapizza.embedders.openai import OpenAIEmbedder
from datapizza.modules.prompt import ChatPromptTemplate
from datapizza.modules.rewriters import ToolRewriter
from datapizza.pipeline import DagPipeline
from datapizza.vectorstores.qdrant import QdrantVectorstore
openai_client = OpenAIClient(
model="gpt-4o-mini",
api_key="YOUR_API_KEY"
)
dag_pipeline = DagPipeline()
dag_pipeline.add_module("rewriter", ToolRewriter(client=openai_client, system_prompt="Rewrite user queries to improve retrieval accuracy."))
dag_pipeline.add_module("embedder", OpenAIEmbedder(api_key= "YOUR_API_KEY", model_name="text-embedding-3-small"))
dag_pipeline.add_module("retriever", QdrantVectorstore(host="localhost", port=6333).as_retriever(collection_name="my_documents", k=5))
dag_pipeline.add_module("prompt", ChatPromptTemplate(user_prompt_template="User question: {{user_prompt}}\n:", retrieval_prompt_template="Retrieved content:\n{% for chunk in chunks %}{{ chunk.text }}\n{% endfor %}"))
dag_pipeline.add_module("generator", openai_client)
dag_pipeline.connect("rewriter", "embedder", target_key="text")
dag_pipeline.connect("embedder", "retriever", target_key="query_vector")
dag_pipeline.connect("retriever", "prompt", target_key="chunks")
dag_pipeline.connect("prompt", "generator", target_key="memory")
query = "tell me something about this document"
result = dag_pipeline.run({
"rewriter": {"user_prompt": query},
"prompt": {"user_prompt": query},
"retriever": {"collection_name": "my_documents", "k": 3},
"generator":{"input": query}
})
print(f"Generated response: {result['generator']}")
```
## 🌐 Ecosystem
### 🤖 Supported AI Providers
<table >
<tr>
<td align="center"><img src="https://logosandtypes.com/wp-content/uploads/2022/07/OpenAI.png" width="32" style="border-radius: 50%"><br><b>OpenAI</b></td>
<td align="center"><img src="https://www.google.com/favicon.ico" width="32"><br><b>Google Gemini</b></td>
<td align="center"><img src="https://anthropic.com/favicon.ico" width="32"><br><b>Anthropic</b></td>
<td align="center"><img src="https://mistral.ai/favicon.ico" width="32"><br><b>Mistral</b></td>
<td align="center"><img src="https://azure.microsoft.com/favicon.ico" width="32"><br><b>Azure OpenAI</b></td>
</tr>
</table>
### 🔧 Tools & Integrations
| Category | Components |
|----------|------------|
| **📄 Document Parsers** | Azure AI Document Intelligence, Docling |
| **🔍 Vector Stores** | Qdrant |
| **🎯 Rerankers** | Cohere, Together AI |
| **🌐 Tools** | DuckDuckGo Search, Custom Tools |
| **💾 Caching** | Redis integration for performance optimization |
| **📊 Embedders** | OpenAI, Google, Cohere, FastEmbed |
## 🎓 Learning Resources
- 📖 **[Complete Documentation](https://docs.datapizza.ai)** - Comprehensive guides and API reference
- 🎯 **[RAG Tutorial](https://docs.datapizza.ai/latest/Guides/RAG/rag/)** - Build production RAG systems
- 🤖 **[Agent Examples](https://docs.datapizza.ai/latest/Guides/agent/)** - Real-world agent implementations
## 🤝 Community
- 💬 **[Discord Community](https://discord.gg/s5sJNHz2C8)**
- 📚 **[Documentation](https://docs.datapizza.ai)**
- 📧 **[GitHub Issues](https://github.com/datapizza-labs/datapizza-ai/issues)**
- 🐦 **[Twitter](https://x.com/datapizza_ai)**
### 🌟 Contributing
We love contributions! Whether it's:
- 🐛 **Bug Reports** - Help us improve
- 💡 **Feature Requests** - Share your ideas
- 📝 **Documentation** - Make it better for everyone
- 🔧 **Code Contributions** - Build the future together
Check out our [Contributing Guide](CONTRIBUTING.md) to get started.
## 📄 License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
---
<div align="center">
**Built by Datapizza, the AI native company**
*A framework made to be easy to learn, easy to maintain and ready for production* 🍕
[⭐ Star us on GitHub](https://github.com/datapizza-labs/datapizza-ai) • [🚀 Get Started](https://docs.datapizza.ai) • [💬 Join Discord](https://discord.gg/s5sJNHz2C8)
## Star History
[Star History Chart](https://www.star-history.com/#datapizza-labs/datapizza-ai&Date)
</div>
## /datapizza-ai-cache/redis/README.md
## /datapizza-ai-cache/redis/datapizza/cache/redis/__init__.py
```py path="/datapizza-ai-cache/redis/datapizza/cache/redis/__init__.py"
# Import Redis cache implementation
from .cache import RedisCache
__all__ = ["RedisCache"]
```
## /datapizza-ai-cache/redis/datapizza/cache/redis/cache.py
```py path="/datapizza-ai-cache/redis/datapizza/cache/redis/cache.py"
import logging
import pickle
from datapizza.core.cache import Cache
import redis
log = logging.getLogger(__name__)
class RedisCache(Cache):
"""
A Redis-based cache implementation.
"""
def __init__(
self, host="localhost", port=6379, db=0, expiration_time=3600
): # 1 hour default
self.redis = redis.Redis(host=host, port=port, db=db)
self.expiration_time = expiration_time
def get(self, key: str) -> str | None:
"""Retrieve and deserialize object"""
pickled_obj = self.redis.get(key)
if pickled_obj is None:
return None
return pickle.loads(pickled_obj) # type: ignore
def set(self, key: str, obj):
"""Serialize and store object"""
pickled_obj = pickle.dumps(obj)
self.redis.set(key, pickled_obj, ex=self.expiration_time)
```
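For orientation, here is a minimal usage sketch of `RedisCache` as defined above. It assumes a Redis server is reachable on `localhost:6379` and uses only the `get`/`set` surface shown in `cache.py`.
```python
from datapizza.cache.redis import RedisCache

# Assumes a local Redis instance on the default port.
cache = RedisCache(host="localhost", port=6379, db=0, expiration_time=600)

# Values are pickled on set and unpickled on get; a miss returns None.
cache.set("greeting", {"text": "hello", "tokens": 3})
print(cache.get("greeting"))  # {'text': 'hello', 'tokens': 3}
print(cache.get("missing"))   # None
```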
## /datapizza-ai-cache/redis/pyproject.toml
```toml path="/datapizza-ai-cache/redis/pyproject.toml"
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# Project metadata
[project]
name = "datapizza-ai-cache-redis"
version = "0.0.3"
description = "An implementation using Redis for datapizza-ai cache"
readme = "README.md"
license = {text = "MIT"}
authors = [
{name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"datapizza-ai-core>=0.0.0,<0.1.0",
"redis>=5.2.1,<6.0.0",
]
# Development dependencies
[dependency-groups]
dev = [
"deptry>=0.23.0",
"pytest",
"ruff>=0.11.5",
]
# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]
[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]
# Ruff configuration
[tool.ruff]
line-length = 88
[tool.ruff.lint]
select = [
# "E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"B", # flake8-bugbear
"I", # isort
"UP", # pyupgrade
"SIM", # flake8-simplify
"RUF", # Ruff-specific rules
"C4", # flake8-comprehensions
]
```
## /datapizza-ai-cache/redis/tests/test_redis_cache.py
```py path="/datapizza-ai-cache/redis/tests/test_redis_cache.py"
from datapizza.cache.redis import RedisCache
def test_redis_cache():
cache = RedisCache(host="localhost", port=6379, db=0)
assert cache is not None
```
## /datapizza-ai-clients/datapizza-ai-clients-anthropic/README.md
## /datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/__init__.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/__init__.py"
from .anthropic_client import AnthropicClient
__all__ = ["AnthropicClient"]
```
## /datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/anthropic_client.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/anthropic_client.py"
from collections.abc import AsyncIterator, Iterator
from typing import Any, Literal
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import FunctionCallBlock, TextBlock, ThoughtBlock
from anthropic import Anthropic, AsyncAnthropic
from .memory_adapter import AnthropicMemoryAdapter
class AnthropicClient(Client):
"""A client for interacting with the Anthropic API (Claude).
This class provides methods for invoking the Anthropic API to generate responses
based on given input data. It extends the Client class.
"""
def __init__(
self,
api_key: str,
model: str = "claude-3-5-sonnet-latest",
system_prompt: str = "",
temperature: float | None = None,
cache: Cache | None = None,
):
"""
Args:
api_key: The API key for the Anthropic API.
model: The model to use for the Anthropic API.
system_prompt: The system prompt to use for the Anthropic API.
temperature: The temperature to use for the Anthropic API.
cache: The cache to use for the Anthropic API.
"""
if temperature and not 0 <= temperature <= 2:
raise ValueError("Temperature must be between 0 and 2")
super().__init__(
model_name=model,
system_prompt=system_prompt,
temperature=temperature,
cache=cache,
)
self.api_key = api_key
self.memory_adapter = AnthropicMemoryAdapter()
self._set_client()
def _set_client(self):
if not self.client:
self.client = Anthropic(api_key=self.api_key)
def _set_a_client(self):
if not self.a_client:
self.a_client = AsyncAnthropic(api_key=self.api_key)
def _convert_tools(self, tools: list[Tool]) -> list[dict[str, Any]]:
"""Convert tools to Anthropic tool format"""
anthropic_tools = []
for tool in tools:
anthropic_tool = {
"name": tool.name,
"description": tool.description or "",
"input_schema": {
"type": "object",
"properties": tool.properties,
"required": tool.required,
},
}
anthropic_tools.append(anthropic_tool)
return anthropic_tools
def _convert_tool_choice(
self, tool_choice: Literal["auto", "required", "none"] | list[str]
) -> dict | Literal["auto", "required", "none"]:
if isinstance(tool_choice, list) and len(tool_choice) > 1:
raise NotImplementedError(
"multiple function names is not supported by Anthropic"
)
elif isinstance(tool_choice, list):
return {
"type": "tool",
"name": tool_choice[0],
}
elif tool_choice == "required":
return {"type": "any"}
elif tool_choice == "auto":
return {"type": "auto"}
else:
return tool_choice
def _response_to_client_response(
self, response, tool_map: dict[str, Tool] | None = None
) -> ClientResponse:
"""Convert Anthropic response to ClientResponse"""
blocks = []
if hasattr(response, "content") and response.content:
if isinstance(
response.content, list
): # Claude 3 returns a list of content blocks
for content_block in response.content:
if content_block.type == "text":
blocks.append(TextBlock(content=content_block.text))
elif content_block.type == "thinking":
# Summarized thinking content
blocks.append(ThoughtBlock(content=content_block.thinking))
elif content_block.type == "tool_use":
tool = tool_map.get(content_block.name) if tool_map else None
if not tool:
raise ValueError(f"Tool {content_block.name} not found")
blocks.append(
FunctionCallBlock(
id=content_block.id,
name=content_block.name,
arguments=content_block.input,
tool=tool,
)
)
else: # Handle as string for compatibility
blocks.append(TextBlock(content=str(response.content)))
stop_reason = response.stop_reason if hasattr(response, "stop_reason") else None
return ClientResponse(
content=blocks,
stop_reason=stop_reason,
usage=TokenUsage(
prompt_tokens=response.usage.input_tokens,
completion_tokens=response.usage.output_tokens,
cached_tokens=response.usage.cache_read_input_tokens,
),
)
def _invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int | None,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
"""Implementation of the abstract _invoke method for Anthropic"""
if tools is None:
tools = []
client = self._get_client()
messages = self._memory_to_contents(None, input, memory)
        # drop any messages with the "model" role, which Anthropic does not accept
messages = [message for message in messages if message.get("role") != "model"]
tool_map = {tool.name: tool for tool in tools}
request_params = {
"model": self.model_name,
"messages": messages,
"max_tokens": max_tokens or 2048,
**kwargs,
}
if temperature:
request_params["temperature"] = temperature
if system_prompt:
request_params["system"] = system_prompt
if tools:
request_params["tools"] = self._convert_tools(tools)
request_params["tool_choice"] = self._convert_tool_choice(tool_choice)
response = client.messages.create(**request_params)
return self._response_to_client_response(response, tool_map)
async def _a_invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int | None,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
if tools is None:
tools = []
client = self._get_a_client()
messages = self._memory_to_contents(None, input, memory)
        # drop any messages with the "model" role, which Anthropic does not accept
messages = [message for message in messages if message.get("role") != "model"]
tool_map = {tool.name: tool for tool in tools}
request_params = {
"model": self.model_name,
"messages": messages,
"max_tokens": max_tokens or 2048,
**kwargs,
}
if temperature:
request_params["temperature"] = temperature
if system_prompt:
request_params["system"] = system_prompt
if tools:
request_params["tools"] = self._convert_tools(tools)
request_params["tool_choice"] = self._convert_tool_choice(tool_choice)
response = await client.messages.create(**request_params)
return self._response_to_client_response(response, tool_map)
def _stream_invoke(
self,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int | None,
system_prompt: str | None,
**kwargs,
) -> Iterator[ClientResponse]:
"""Implementation of the abstract _stream_invoke method for Anthropic"""
if tools is None:
tools = []
messages = self._memory_to_contents(None, input, memory)
client = self._get_client()
request_params = {
"model": self.model_name,
"messages": messages,
"stream": True,
"max_tokens": max_tokens or 2048,
**kwargs,
}
if temperature:
request_params["temperature"] = temperature
if system_prompt:
request_params["system"] = system_prompt
if tools:
request_params["tools"] = self._convert_tools(tools)
request_params["tool_choice"] = self._convert_tool_choice(tool_choice)
stream = client.messages.create(**request_params)
input_tokens = 0
output_tokens = 0
message_text = ""
thought_text = ""
for chunk in stream:
if (
chunk.type == "content_block_delta"
and hasattr(chunk, "delta")
and chunk.delta
):
if hasattr(chunk.delta, "text") and chunk.delta.text:
message_text += chunk.delta.text
yield ClientResponse(
content=[
ThoughtBlock(content=thought_text),
TextBlock(content=message_text),
],
delta=chunk.delta.text,
)
elif hasattr(chunk.delta, "thinking") and chunk.delta.thinking:
thought_text += chunk.delta.thinking
if chunk.type == "message_start":
input_tokens = (
chunk.message.usage.input_tokens if chunk.message.usage else 0
)
if chunk.type == "message_delta":
output_tokens = max(
output_tokens, chunk.usage.output_tokens if chunk.usage else 0
)
yield ClientResponse(
content=[
ThoughtBlock(content=thought_text),
TextBlock(content=message_text),
],
delta="",
stop_reason="end_turn",
usage=TokenUsage(
prompt_tokens=input_tokens,
completion_tokens=output_tokens,
cached_tokens=0,
),
)
async def _a_stream_invoke(
self,
input: str,
tools: list[Tool] | None = None,
memory: Memory | None = None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
temperature: float | None = None,
max_tokens: int | None = None,
system_prompt: str | None = None,
**kwargs,
) -> AsyncIterator[ClientResponse]:
"""Implementation of the abstract _a_stream_invoke method for Anthropic"""
if tools is None:
tools = []
messages = self._memory_to_contents(None, input, memory)
client = self._get_a_client()
request_params = {
"model": self.model_name,
"messages": messages,
"stream": True,
"max_tokens": max_tokens or 2048,
**kwargs,
}
if temperature:
request_params["temperature"] = temperature
if system_prompt:
request_params["system"] = system_prompt
if max_tokens:
request_params["max_tokens"] = max_tokens
if tools:
request_params["tools"] = self._convert_tools(tools)
request_params["tool_choice"] = self._convert_tool_choice(tool_choice)
stream = await client.messages.create(**request_params)
input_tokens = 0
output_tokens = 0
message_text = ""
thought_text = ""
async for chunk in stream:
if (
chunk.type == "content_block_delta"
and hasattr(chunk, "delta")
and chunk.delta
):
if hasattr(chunk.delta, "text") and chunk.delta.text:
message_text += chunk.delta.text
yield ClientResponse(
content=[
ThoughtBlock(content=thought_text),
TextBlock(content=message_text),
],
delta=chunk.delta.text,
)
elif hasattr(chunk.delta, "thinking") and chunk.delta.thinking:
thought_text += chunk.delta.thinking
if chunk.type == "message_start":
input_tokens = (
chunk.message.usage.input_tokens if chunk.message.usage else 0
)
if chunk.type == "message_delta":
output_tokens = max(
output_tokens, chunk.usage.output_tokens if chunk.usage else 0
)
yield ClientResponse(
content=[
ThoughtBlock(content=thought_text),
TextBlock(content=message_text),
],
delta="",
stop_reason="end_turn",
usage=TokenUsage(
prompt_tokens=input_tokens,
completion_tokens=output_tokens,
cached_tokens=0,
),
)
def _structured_response(
self,
*args,
**kwargs,
) -> ClientResponse:
raise NotImplementedError("Anthropic does not support structured responses")
async def _a_structured_response(self, *args, **kwargs):
raise NotImplementedError("Anthropic does not support structured responses")
```
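As a quick orientation, a minimal invocation sketch for the client above. It mirrors the `OpenAIClient` usage in the README and assumes the base `Client` exposes the same public `invoke()` returning a response with a `.text` attribute.
```python
from datapizza.clients.anthropic import AnthropicClient

# Constructor arguments match AnthropicClient.__init__ above.
client = AnthropicClient(
    api_key="YOUR_ANTHROPIC_API_KEY",
    model="claude-3-5-sonnet-latest",
    temperature=0.2,
)

# invoke()/.text is the surface shown in the README for OpenAIClient and is
# assumed here to be shared by all Client subclasses.
result = client.invoke("Summarize what a memory adapter does in one sentence.")
print(result.text)
```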
## /datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/memory_adapter.py"
import base64
import json
from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
ROLE,
FunctionCallBlock,
FunctionCallResultBlock,
MediaBlock,
StructuredBlock,
TextBlock,
)
class AnthropicMemoryAdapter(MemoryAdapter):
"""Adapter for converting Memory objects to Anthropic API message format"""
def _turn_to_message(self, turn: Turn) -> dict:
content = []
for block in turn:
block_dict = {}
match block:
case TextBlock():
block_dict = {"type": "text", "text": block.content}
case FunctionCallBlock():
block_dict = json.dumps(
{
"type": "tool_call",
"id": block.id,
"tool_name": block.name,
"tool_args": block.arguments,
}
)
case FunctionCallResultBlock():
block_dict = json.dumps(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": block.result,
}
)
case StructuredBlock():
block_dict = {
"type": "text",
"text": str(block.content),
}
case MediaBlock():
match block.media.media_type:
case "image":
block_dict = self._process_image_block(block)
case "pdf":
block_dict = self._process_pdf_block(block)
case _:
raise NotImplementedError(
f"Unsupported media type: {block.media.media_type}"
)
content.append(block_dict)
if all(isinstance(block, dict) for block in content) and all(
list(block.keys()) == ["type", "text"] for block in content
):
content = "".join([block["text"] for block in content])
if len(content) == 1:
content = content[0]
return {
"role": turn.role.anthropic_role,
"content": (content),
}
def _text_to_message(self, text: str, role: ROLE) -> dict:
"""Convert text and role to Anthropic message format"""
        # Anthropic messages use 'user' and 'assistant' roles; the system prompt is passed separately
return {"role": role.anthropic_role, "content": text}
def _process_pdf_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "url":
return {
"type": "document",
"source": {
"type": "url",
"url": block.media.source,
},
}
case "base64":
return {
"type": "document",
"source": {
"type": "base64",
"media_type": "application/pdf",
"data": block.media.source,
},
}
case "path":
with open(block.media.source, "rb") as f:
base64_pdf = base64.b64encode(f.read()).decode("utf-8")
return {
"type": "document",
"source": {
"type": "base64",
"media_type": "application/pdf",
"data": base64_pdf,
},
}
case _:
raise NotImplementedError("Source type not supported")
def _process_image_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "url":
return {
"type": "image",
"source": {
"type": "url",
"url": block.media.source,
},
}
case "base64":
return {
"type": "image",
"source": {
"type": "base64",
"media_type": f"image/{block.media.extension}",
"data": block.media.source,
},
}
case "path":
with open(block.media.source, "rb") as image_file:
base64_image = base64.b64encode(image_file.read()).decode("utf-8")
return {
"type": "image",
"source": {
"type": "base64",
"media_type": f"image/{block.media.extension}",
"data": base64_image,
},
}
case _:
raise NotImplementedError(
f"Unsupported media type: {block.media.media_type}"
)
```
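One detail worth calling out in `_turn_to_message` above: when every block in a turn is plain text, the content list is collapsed into a single concatenated string. A short sketch of that normalization (it mirrors the adapter code and the test below, not a public contract):

```python
from datapizza.clients.anthropic.memory_adapter import AnthropicMemoryAdapter
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, TextBlock

memory = Memory()
memory.add_turn(
    blocks=[TextBlock(content="Hello, "), TextBlock(content="world!")],
    role=ROLE.USER,
)

# Both text blocks collapse into one string-valued "content" field
messages = AnthropicMemoryAdapter().memory_to_messages(memory)
print(messages)  # [{'role': 'user', 'content': 'Hello, world!'}]
```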
## /datapizza-ai-clients/datapizza-ai-clients-anthropic/pyproject.toml
```toml path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/pyproject.toml"
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# Project metadata
[project]
name = "datapizza-ai-clients-anthropic"
version = "0.0.4"
description = "Anthropic (Claude) client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10.0,<4"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = [
"datapizza-ai-core>=0.0.7,<0.1.0",
"anthropic>=0.40.0,<1.0.0",
]
# Development dependencies
[dependency-groups]
dev = [
"deptry>=0.23.0",
"pytest",
"ruff>=0.11.5",
]
# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]
[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]
# Ruff configuration
[tool.ruff]
line-length = 88
[tool.ruff.lint]
select = [
"W", # pycodestyle warnings
"F", # pyflakes
"B", # flake8-bugbear
"I", # isort
"UP", # pyupgrade
"SIM", # flake8-simplify
"RUF", # Ruff-specific rules
"C4", # flake8-comprehensions
]
```
## /datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_memory_adapter.py"
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, TextBlock
from datapizza.clients.anthropic import AnthropicClient
from datapizza.clients.anthropic.memory_adapter import AnthropicMemoryAdapter
def test_init_anthropic_client():
client = AnthropicClient(api_key="test")
assert client is not None
def test_anthropic_memory_adapter():
memory_adapter = AnthropicMemoryAdapter()
memory = Memory()
memory.add_turn(blocks=[TextBlock(content="Hello, world!")], role=ROLE.USER)
memory.add_turn(blocks=[TextBlock(content="Hello, world!")], role=ROLE.ASSISTANT)
messages = memory_adapter.memory_to_messages(memory)
assert messages == [
{
"role": "user",
"content": "Hello, world!",
},
{
"role": "assistant",
"content": "Hello, world!",
},
]
```
## /datapizza-ai-clients/datapizza-ai-clients-azure-openai/datapizza/clients/azure_openai/__init__.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-azure-openai/datapizza/clients/azure_openai/__init__.py"
from datapizza.clients.azure_openai.azure_openai_client import AzureOpenAIClient
__all__ = ["AzureOpenAIClient"]
```
## /datapizza-ai-clients/datapizza-ai-clients-azure-openai/datapizza/clients/azure_openai/azure_openai_client.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-azure-openai/datapizza/clients/azure_openai/azure_openai_client.py"
from datapizza.clients.openai.openai_client import OpenAIClient
from datapizza.core.cache import Cache
from openai import AsyncAzureOpenAI, AzureOpenAI
class AzureOpenAIClient(OpenAIClient):
def __init__(
self,
api_key: str,
azure_endpoint: str,
*,
model: str = "gpt-4o-mini",
system_prompt: str = "",
temperature: float | None = None,
cache: Cache | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
):
self.azure_endpoint = azure_endpoint
self.azure_deployment = azure_deployment
self.api_version = api_version
super().__init__(
api_key=api_key,
model=model,
system_prompt=system_prompt,
temperature=temperature,
cache=cache,
)
self._set_client()
def _set_client(self):
self.client = AzureOpenAI(
api_key=self.api_key,
api_version=self.api_version,
azure_endpoint=self.azure_endpoint,
azure_deployment=self.azure_deployment,
)
def _set_a_client(self):
self.a_client = AsyncAzureOpenAI(
api_key=self.api_key,
api_version=self.api_version,
azure_endpoint=self.azure_endpoint,
azure_deployment=self.azure_deployment,
)
```
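A hedged usage sketch for the Azure client above; the endpoint, deployment name, and API version are placeholders, and `invoke()` is assumed to be the public wrapper inherited from `OpenAIClient`:

```python
import os

from datapizza.clients.azure_openai import AzureOpenAIClient

client = AzureOpenAIClient(
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    azure_endpoint="https://my-resource.openai.azure.com",  # placeholder endpoint
    azure_deployment="gpt-4o-mini",                         # placeholder deployment name
    api_version="2024-06-01",                               # placeholder API version
    model="gpt-4o-mini",
)

response = client.invoke("Say hello in one word")
print(response.content[0].content)
```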
## /datapizza-ai-clients/datapizza-ai-clients-azure-openai/pyproject.toml
```toml path="/datapizza-ai-clients/datapizza-ai-clients-azure-openai/pyproject.toml"
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# Project metadata
[project]
name = "datapizza-ai-clients-azure-openai"
version = "0.0.4"
description = "Azure OpenAI client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
authors = [
{name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"datapizza-ai-core>=0.0.7,<0.1.0",
"datapizza-ai-clients-openai>=0.0.1",
"openai>=1.63.2,<2.0.0",
]
# Development dependencies
[dependency-groups]
dev = [
"deptry>=0.23.0",
"pytest",
"ruff>=0.11.5",
]
# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]
[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]
# Ruff configuration
[tool.ruff]
line-length = 88
[tool.ruff.lint]
select = [
"W", # pycodestyle warnings
"F", # pyflakes
"B", # flake8-bugbear
"I", # isort
"UP", # pyupgrade
"SIM", # flake8-simplify
"RUF", # Ruff-specific rules
"C4", # flake8-comprehensions
]
```
## /datapizza-ai-clients/datapizza-ai-clients-bedrock/README.md
# DataPizza AI - AWS Bedrock Client
AWS Bedrock client implementation for the datapizza-ai framework. This client provides seamless integration with AWS Bedrock's Converse API, supporting various foundation models including Anthropic's Claude models.
## Features
- Full support for AWS Bedrock Converse API
- Multiple authentication methods (AWS Profile, Access Keys, Environment Variables)
- Streaming and non-streaming responses
- Tool/function calling support
- Memory/conversation history management
- Image and document (PDF) support
- Async support
## Installation
```bash
pip install datapizza-ai-clients-bedrock
```
Or install from source in editable mode:
```bash
cd datapizza-ai/datapizza-ai-clients/datapizza-ai-clients-bedrock
pip install -e .
```
## Quick Start
### Basic Usage
```python
from datapizza.clients.bedrock import BedrockClient
# Using AWS Profile
client = BedrockClient(
profile_name="my-aws-profile",
region_name="us-east-1"
)
# Or using access keys
client = BedrockClient(
aws_access_key_id="YOUR_ACCESS_KEY",
aws_secret_access_key="YOUR_SECRET_KEY",
region_name="us-east-1"
)
# Simple invocation
result = client.invoke("What is AWS Bedrock?")
# Extract text from response
for block in result.content:
if hasattr(block, 'content'):
print(block.content)
```
## Authentication Methods
The client supports multiple authentication methods in the following priority order:
### 1. Explicit Credentials
```python
client = BedrockClient(
aws_access_key_id="YOUR_ACCESS_KEY",
aws_secret_access_key="YOUR_SECRET_KEY",
aws_session_token="YOUR_SESSION_TOKEN", # Optional, for temporary credentials
region_name="us-east-1"
)
```
### 2. AWS Profile
```python
client = BedrockClient(
profile_name="my-aws-profile",
region_name="us-east-1"
)
```
### 3. Environment Variables
Set these environment variables:
```bash
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_SESSION_TOKEN="your-session-token" # Optional
export AWS_PROFILE="my-aws-profile" # Or use a named profile instead of access keys
```
Then initialize without parameters:
```python
client = BedrockClient(region_name="us-east-1")
```
### 4. Default AWS Credentials Chain
If no credentials are provided, boto3 falls back to the default credentials chain (IAM roles, `~/.aws/credentials`, and so on).
```python
client = BedrockClient(region_name="us-east-1")
```
## Available Models
The client works with any Bedrock model that supports the Converse API. Popular models include:
- `anthropic.claude-3-5-sonnet-20241022-v2:0` (default)
- `anthropic.claude-3-5-sonnet-20240620-v1:0`
- `anthropic.claude-3-opus-20240229-v1:0`
- `anthropic.claude-3-sonnet-20240229-v1:0`
- `anthropic.claude-3-haiku-20240307-v1:0`
- `meta.llama3-70b-instruct-v1:0`
- `mistral.mistral-large-2402-v1:0`
- And many more...
```python
client = BedrockClient(
model="anthropic.claude-3-opus-20240229-v1:0",
region_name="us-east-1"
)
```
## Usage Examples
### With System Prompt
```python
client = BedrockClient(
system_prompt="You are a helpful coding assistant specialized in Python.",
region_name="us-east-1"
)
result = client.invoke("How do I read a CSV file?")
```
### Streaming Responses
```python
for chunk in client.stream_invoke("Tell me a long story"):
if chunk.delta:
print(chunk.delta, end="", flush=True)
print()
```
### With Memory (Conversation History)
```python
from datapizza.memory import Memory
memory = Memory()
client = BedrockClient(region_name="us-east-1")
# First message
result1 = client.invoke("My favorite color is blue", memory=memory)
# The conversation is tracked in memory
result2 = client.invoke("What's my favorite color?", memory=memory)
# Response: "Your favorite color is blue."
```
### With Temperature and Max Tokens
```python
result = client.invoke(
"Write a creative story",
temperature=0.9, # Higher = more creative (0-1)
max_tokens=1000
)
```
### With Tools/Function Calling
```python
from datapizza.tools import Tool
from datapizza.type import FunctionCallBlock
def get_weather(location: str, unit: str = "celsius") -> str:
"""Get the weather for a location"""
return f"The weather in {location} is 22°{unit[0].upper()}"
weather_tool = Tool(
name="get_weather",
description="Get the current weather for a location",
function=get_weather,
properties={
"location": {
"type": "string",
"description": "The city name"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Temperature unit"
}
},
required=["location"]
)
result = client.invoke(
"What's the weather in Paris?",
tools=[weather_tool]
)
# Check for function calls
for block in result.content:
if isinstance(block, FunctionCallBlock):
print(f"Function: {block.name}")
print(f"Arguments: {block.arguments}")
```
### Async Support
```python
import asyncio
async def main():
client = BedrockClient(region_name="us-east-1")
result = await client.a_invoke("Hello!")
print(result.content[0].content)
asyncio.run(main())
```
### Async Streaming
```python
async def stream_example():
client = BedrockClient(region_name="us-east-1")
async for chunk in client.a_stream_invoke("Count to 10"):
if chunk.delta:
print(chunk.delta, end="", flush=True)
asyncio.run(stream_example())
```
## Configuration
### Constructor Parameters
```python
BedrockClient(
model: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
system_prompt: str = "",
temperature: float | None = None, # 0-1 for most models
cache: Cache | None = None,
region_name: str = "us-east-1",
aws_access_key_id: str | None = None,
aws_secret_access_key: str | None = None,
aws_session_token: str | None = None,
profile_name: str | None = None,
)
```
### Invoke Parameters
```python
client.invoke(
input: str, # The user message
tools: list[Tool] | None = None,
memory: Memory | None = None,
    tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
temperature: float | None = None,
max_tokens: int = 2048,
system_prompt: str | None = None, # Override instance system_prompt
)
```
## Response Format
All methods return a `ClientResponse` object:
```python
from datapizza.type import FunctionCallBlock, TextBlock

response = client.invoke("Hello")
# Access content blocks
for block in response.content:
if isinstance(block, TextBlock):
print(block.content) # The text
elif isinstance(block, FunctionCallBlock):
print(block.name) # Function name
print(block.arguments) # Function arguments
# Token usage
print(f"Prompt tokens: {response.prompt_tokens_used}")
print(f"Completion tokens: {response.completion_tokens_used}")
print(f"Stop reason: {response.stop_reason}")
```
## IAM Permissions
Your AWS credentials need the following permissions:
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream"
],
"Resource": [
"arn:aws:bedrock:*::foundation-model/*"
]
}
]
}
```
## Model Access
Before using a model, you need to request access in the AWS Bedrock console:
1. Go to AWS Bedrock console
2. Navigate to "Model access"
3. Request access to the models you want to use
4. Wait for approval (usually instant for most models)
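You can also check programmatically which foundation models are visible in a region (a sketch using boto3's control-plane `bedrock` client, which is separate from the `bedrock-runtime` client this package uses; access grants themselves are still managed in the console):

```python
import boto3

bedrock = boto3.client("bedrock", region_name="us-east-1")
for summary in bedrock.list_foundation_models()["modelSummaries"]:
    print(summary["modelId"])
```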
## Limitations
- Structured responses are not natively supported (unlike OpenAI's structured output); a workaround sketch follows this list
- Some advanced features may vary by model
- Token usage metrics may not include caching information
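Since structured output is not built in, a common workaround is to prompt the model for JSON and validate it yourself. A minimal sketch, assuming `pydantic` is available in your environment (it is not a declared dependency of this package):

```python
import json

from pydantic import BaseModel  # assumption: pydantic is installed in your environment


class City(BaseModel):
    name: str
    country: str


result = client.invoke(
    "Return only a JSON object with keys 'name' and 'country' for the capital of France.",
    temperature=0,
)
raw = result.content[0].content               # text of the first response block
city = City.model_validate(json.loads(raw))   # raises if the JSON does not match the schema
print(city)
```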
## Error Handling
```python
from botocore.exceptions import BotoCoreError, ClientError
try:
result = client.invoke("Hello")
except ClientError as e:
if e.response['Error']['Code'] == 'AccessDeniedException':
print("Model access not granted. Check Bedrock console.")
elif e.response['Error']['Code'] == 'ResourceNotFoundException':
print("Model not found in this region.")
else:
print(f"AWS Error: {e}")
except BotoCoreError as e:
print(f"Boto3 Error: {e}")
```
## Development
### Running Tests
```bash
pip install -e ".[dev]"
pytest tests/
```
### Code Formatting
```bash
ruff check .
ruff format .
```
## License
MIT License - see LICENSE file for details
## Contributing
Contributions are welcome! Please see the main datapizza-ai repository for contribution guidelines.
## Support
For issues and questions:
- GitHub Issues: [datapizza-ai repository](https://github.com/datapizza/datapizza-ai)
- Documentation: [DataPizza AI Docs](https://docs.datapizza.ai)
## /datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/__init__.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/__init__.py"
from .bedrock_client import BedrockClient
__all__ = ["BedrockClient"]
```
## /datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/bedrock_client.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/bedrock_client.py"
import os
from collections.abc import AsyncIterator, Iterator
from typing import Any, Literal
import aioboto3
import boto3
from botocore.config import Config
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import FunctionCallBlock, TextBlock
from .memory_adapter import BedrockMemoryAdapter
class BedrockClient(Client):
"""A client for interacting with AWS Bedrock API.
This class provides methods for invoking AWS Bedrock models (Claude, etc.)
to generate responses. It extends the Client class and supports authentication
via AWS profile or access keys.
"""
def __init__(
self,
model: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
system_prompt: str = "",
temperature: float | None = None,
cache: Cache | None = None,
region_name: str = "us-east-1",
aws_access_key_id: str | None = None,
aws_secret_access_key: str | None = None,
aws_session_token: str | None = None,
profile_name: str | None = None,
):
"""
Args:
model: The Bedrock model to use (e.g., 'anthropic.claude-3-5-sonnet-20241022-v2:0').
system_prompt: The system prompt to use for the model.
temperature: The temperature to use for generation (0-1 for most models).
cache: The cache to use for responses.
region_name: AWS region name (default: us-east-1).
aws_access_key_id: AWS access key ID (optional, can use AWS_ACCESS_KEY_ID env var).
aws_secret_access_key: AWS secret access key (optional, can use AWS_SECRET_ACCESS_KEY env var).
aws_session_token: AWS session token (optional, for temporary credentials).
profile_name: AWS profile name (optional, can use AWS_PROFILE env var).
"""
if temperature and not 0 <= temperature <= 1:
raise ValueError("Temperature must be between 0 and 1 for Bedrock models")
super().__init__(
model_name=model,
system_prompt=system_prompt,
temperature=temperature,
cache=cache,
)
self.region_name = region_name
self.aws_access_key_id = aws_access_key_id or os.getenv("AWS_ACCESS_KEY_ID")
self.aws_secret_access_key = aws_secret_access_key or os.getenv(
"AWS_SECRET_ACCESS_KEY"
)
self.aws_session_token = aws_session_token or os.getenv("AWS_SESSION_TOKEN")
self.profile_name = profile_name or os.getenv("AWS_PROFILE")
self.memory_adapter = BedrockMemoryAdapter()
self._set_client()
def _set_client(self):
if not self.client:
session_kwargs = {}
# Priority: explicit credentials > profile > default credentials
if self.aws_access_key_id and self.aws_secret_access_key:
session_kwargs["aws_access_key_id"] = self.aws_access_key_id
session_kwargs["aws_secret_access_key"] = self.aws_secret_access_key
if self.aws_session_token:
session_kwargs["aws_session_token"] = self.aws_session_token
elif self.profile_name:
session_kwargs["profile_name"] = self.profile_name
session = boto3.Session(**session_kwargs)
# Create bedrock-runtime client with retry configuration
config = Config(
retries={"max_attempts": 3, "mode": "adaptive"},
read_timeout=300,
)
self.client = session.client(
service_name="bedrock-runtime",
region_name=self.region_name,
config=config,
)
def _set_a_client(self):
"""Initialize async bedrock-runtime client using aioboto3"""
if not self.a_client:
session_kwargs = {}
# Priority: explicit credentials > profile > default credentials
if self.aws_access_key_id and self.aws_secret_access_key:
session_kwargs["aws_access_key_id"] = self.aws_access_key_id
session_kwargs["aws_secret_access_key"] = self.aws_secret_access_key
if self.aws_session_token:
session_kwargs["aws_session_token"] = self.aws_session_token
elif self.profile_name:
session_kwargs["profile_name"] = self.profile_name
# Create async session
session = aioboto3.Session(**session_kwargs)
# Create bedrock-runtime client with retry configuration
config = Config(
retries={"max_attempts": 3, "mode": "adaptive"},
read_timeout=300,
)
# Store the session and config for async client creation
self.a_session = session
self.a_config = config
self.a_region_name = self.region_name
def _convert_tools(self, tools: list[Tool]) -> list[dict[str, Any]]:
"""Convert tools to Bedrock tool format (similar to Anthropic)"""
bedrock_tools = []
for tool in tools:
bedrock_tool = {
"toolSpec": {
"name": tool.name,
"description": tool.description or "",
"inputSchema": {
"json": {
"type": "object",
"properties": tool.properties,
"required": tool.required,
}
},
}
}
bedrock_tools.append(bedrock_tool)
return bedrock_tools
def _convert_tool_choice(
self, tool_choice: Literal["auto", "required", "none"] | list[str]
) -> dict:
"""Convert tool choice to Bedrock format"""
if isinstance(tool_choice, list):
if len(tool_choice) > 1:
raise NotImplementedError(
"Multiple function names not supported by Bedrock"
)
return {"tool": {"name": tool_choice[0]}}
elif tool_choice == "required":
return {"any": {}}
elif tool_choice == "auto":
return {"auto": {}}
else: # none
return {}
def _response_to_client_response(
self, response: dict, tool_map: dict[str, Tool] | None = None
) -> ClientResponse:
"""Convert Bedrock response to ClientResponse"""
blocks = []
# Parse the response body
response_body = response.get("output", {})
# Handle message content
message = response_body.get("message", {})
content_items = message.get("content", [])
for content_item in content_items:
if "text" in content_item:
blocks.append(TextBlock(content=content_item["text"]))
elif "toolUse" in content_item:
tool_use = content_item["toolUse"]
tool = tool_map.get(tool_use["name"]) if tool_map else None
if not tool:
raise ValueError(f"Tool {tool_use['name']} not found")
blocks.append(
FunctionCallBlock(
id=tool_use["toolUseId"],
name=tool_use["name"],
arguments=tool_use["input"],
tool=tool,
)
)
# Extract usage information
usage = response.get("usage", {})
prompt_tokens = usage.get("inputTokens", 0)
completion_tokens = usage.get("outputTokens", 0)
# Extract stop reason
stop_reason = response.get("stopReason")
return ClientResponse(
content=blocks,
stop_reason=stop_reason,
usage=TokenUsage(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
cached_tokens=0, # Bedrock doesn't expose cache metrics in the same way
),
)
def _invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
"""Implementation of the abstract _invoke method for Bedrock"""
if tools is None:
tools = []
client = self._get_client()
messages = self._memory_to_contents(None, input, memory)
# Remove model role messages (Bedrock doesn't support this)
messages = [message for message in messages if message.get("role") != "model"]
tool_map = {tool.name: tool for tool in tools}
# Build the request body according to Bedrock Converse API
request_body = {
"modelId": self.model_name,
"messages": messages,
"inferenceConfig": {
"maxTokens": max_tokens or 2048,
},
}
if temperature is not None:
request_body["inferenceConfig"]["temperature"] = temperature
# Add system prompt if provided
if system_prompt:
request_body["system"] = [{"text": system_prompt}]
# Add tools if provided
if tools:
request_body["toolConfig"] = {
"tools": self._convert_tools(tools),
"toolChoice": self._convert_tool_choice(tool_choice),
}
# Add any additional kwargs
request_body.update(kwargs)
# Call Bedrock Converse API
response = client.converse(**request_body)
return self._response_to_client_response(response, tool_map)
async def _a_invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
"""Async implementation using aioboto3"""
if tools is None:
tools = []
# Ensure async client is initialized
if not hasattr(self, "a_session"):
self._set_a_client()
messages = self._memory_to_contents(None, input, memory)
# Remove model role messages (Bedrock doesn't support this)
messages = [message for message in messages if message.get("role") != "model"]
tool_map = {tool.name: tool for tool in tools}
# Build the request body according to Bedrock Converse API
request_body = {
"modelId": self.model_name,
"messages": messages,
"inferenceConfig": {
"maxTokens": max_tokens or 2048,
},
}
if temperature is not None:
request_body["inferenceConfig"]["temperature"] = temperature
# Add system prompt if provided
if system_prompt:
request_body["system"] = [{"text": system_prompt}]
# Add tools if provided
if tools:
request_body["toolConfig"] = {
"tools": self._convert_tools(tools),
"toolChoice": self._convert_tool_choice(tool_choice),
}
# Add any additional kwargs
request_body.update(kwargs)
# Call Bedrock Converse API asynchronously
async with self.a_session.client(
service_name="bedrock-runtime",
region_name=self.a_region_name,
config=self.a_config,
) as client:
response = await client.converse(**request_body)
return self._response_to_client_response(response, tool_map)
def _stream_invoke(
self,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> Iterator[ClientResponse]:
"""Implementation of streaming for Bedrock"""
if tools is None:
tools = []
client = self._get_client()
messages = self._memory_to_contents(None, input, memory)
# Remove model role messages
messages = [message for message in messages if message.get("role") != "model"]
# Build the request body
request_body = {
"modelId": self.model_name,
"messages": messages,
"inferenceConfig": {
"maxTokens": max_tokens or 2048,
},
}
if temperature is not None:
request_body["inferenceConfig"]["temperature"] = temperature
if system_prompt:
request_body["system"] = [{"text": system_prompt}]
if tools:
request_body["toolConfig"] = {
"tools": self._convert_tools(tools),
"toolChoice": self._convert_tool_choice(tool_choice),
}
request_body.update(kwargs)
# Call Bedrock ConverseStream API
response = client.converse_stream(**request_body)
# Process streaming response
message_text = ""
input_tokens = 0
output_tokens = 0
stop_reason = None
stream = response.get("stream")
if stream:
for event in stream:
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"].get("delta", {})
if "text" in delta:
text_delta = delta["text"]
message_text += text_delta
yield ClientResponse(
content=[TextBlock(content=message_text)],
delta=text_delta,
stop_reason=None,
)
elif "metadata" in event:
metadata = event["metadata"]
usage = metadata.get("usage", {})
input_tokens = usage.get("inputTokens", 0)
output_tokens = usage.get("outputTokens", 0)
elif "messageStop" in event:
stop_reason = event["messageStop"].get("stopReason")
# Final response with complete information
yield ClientResponse(
content=[TextBlock(content=message_text)],
delta="",
stop_reason=stop_reason,
usage=TokenUsage(
prompt_tokens=input_tokens,
completion_tokens=output_tokens,
cached_tokens=0,
),
)
async def _a_stream_invoke(
self,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> AsyncIterator[ClientResponse]:
"""Async streaming implementation using aioboto3"""
if tools is None:
tools = []
# Ensure async client is initialized
if not hasattr(self, "a_session"):
self._set_a_client()
messages = self._memory_to_contents(None, input, memory)
# Remove model role messages
messages = [message for message in messages if message.get("role") != "model"]
# Build the request body
request_body = {
"modelId": self.model_name,
"messages": messages,
"inferenceConfig": {
"maxTokens": max_tokens or 2048,
},
}
if temperature is not None:
request_body["inferenceConfig"]["temperature"] = temperature
if system_prompt:
request_body["system"] = [{"text": system_prompt}]
if tools:
request_body["toolConfig"] = {
"tools": self._convert_tools(tools),
"toolChoice": self._convert_tool_choice(tool_choice),
}
request_body.update(kwargs)
# Call Bedrock ConverseStream API asynchronously
async with self.a_session.client(
service_name="bedrock-runtime",
region_name=self.a_region_name,
config=self.a_config,
) as client:
response = await client.converse_stream(**request_body)
# Process streaming response
message_text = ""
input_tokens = 0
output_tokens = 0
stop_reason = None
stream = response.get("stream")
if stream:
async for event in stream:
if "contentBlockDelta" in event:
delta = event["contentBlockDelta"].get("delta", {})
if "text" in delta:
text_delta = delta["text"]
message_text += text_delta
yield ClientResponse(
content=[TextBlock(content=message_text)],
delta=text_delta,
stop_reason=None,
)
elif "metadata" in event:
metadata = event["metadata"]
usage = metadata.get("usage", {})
input_tokens = usage.get("inputTokens", 0)
output_tokens = usage.get("outputTokens", 0)
elif "messageStop" in event:
stop_reason = event["messageStop"].get("stopReason")
# Final response with complete information
yield ClientResponse(
content=[TextBlock(content=message_text)],
delta="",
stop_reason=stop_reason,
usage=TokenUsage(
prompt_tokens=input_tokens,
completion_tokens=output_tokens,
cached_tokens=0,
),
)
def _structured_response(
self,
input: str,
output_cls: type,
memory: Memory | None,
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
tools: list[Tool] | None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
**kwargs,
) -> ClientResponse:
"""Bedrock doesn't natively support structured output like OpenAI
This would require prompting techniques or using Anthropic's prompt caching
"""
raise NotImplementedError(
"Bedrock doesn't natively support structured responses. "
"Consider using prompt engineering or JSON mode with validation."
)
async def _a_structured_response(self, *args, **kwargs):
raise NotImplementedError(
"Bedrock doesn't natively support structured responses"
)
```
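A small illustration of how the `_convert_tool_choice` helper above maps framework-level `tool_choice` values onto the Converse API's `toolChoice` field. It exercises a private helper purely for demonstration and assumes the client can be constructed with boto3's default credential chain:

```python
from datapizza.clients.bedrock import BedrockClient

client = BedrockClient(region_name="us-east-1")  # default boto3 credential chain

assert client._convert_tool_choice("auto") == {"auto": {}}
assert client._convert_tool_choice("required") == {"any": {}}
assert client._convert_tool_choice("none") == {}
assert client._convert_tool_choice(["get_weather"]) == {"tool": {"name": "get_weather"}}
```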
## /datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/memory_adapter.py"
import base64
import json
from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
ROLE,
FunctionCallBlock,
FunctionCallResultBlock,
MediaBlock,
StructuredBlock,
TextBlock,
)
class BedrockMemoryAdapter(MemoryAdapter):
"""Adapter for converting Memory objects to AWS Bedrock Converse API message format
The Bedrock Converse API uses a message format similar to Anthropic's Claude API.
"""
def _turn_to_message(self, turn: Turn) -> dict:
"""Convert a Turn to Bedrock message format"""
content = []
for block in turn:
block_dict = {}
match block:
case TextBlock():
block_dict = {"text": block.content}
case FunctionCallBlock():
# Bedrock uses toolUse format
block_dict = {
"toolUse": {
"toolUseId": block.id,
"name": block.name,
"input": block.arguments,
}
}
case FunctionCallResultBlock():
# Bedrock uses toolResult format
block_dict = {
"toolResult": {
"toolUseId": block.id,
"content": [{"text": str(block.result)}],
}
}
case StructuredBlock():
# Convert structured content to text
block_dict = {
"text": json.dumps(block.content)
if not isinstance(block.content, str)
else block.content,
}
case MediaBlock():
match block.media.media_type:
case "image":
block_dict = self._process_image_block(block)
case "pdf":
block_dict = self._process_document_block(block)
case _:
raise NotImplementedError(
f"Unsupported media type: {block.media.media_type}"
)
content.append(block_dict)
# Bedrock expects content as a list
return {
"role": self._convert_role(turn.role),
"content": content,
}
def _text_to_message(self, text: str, role: ROLE) -> dict:
"""Convert text and role to Bedrock message format"""
return {
"role": self._convert_role(role),
"content": [{"text": text}],
}
def _convert_role(self, role: ROLE) -> str:
"""Convert ROLE to Bedrock role string
Bedrock Converse API supports 'user' and 'assistant' roles
"""
if role.value == "user":
return "user"
elif role.value == "assistant":
return "assistant"
else:
# Default to user for system or other roles
return "user"
def _process_document_block(self, block: MediaBlock) -> dict:
"""Process document (PDF) blocks for Bedrock"""
match block.media.source_type:
case "base64":
return {
"document": {
"format": "pdf",
"name": "document.pdf",
"source": {"bytes": base64.b64decode(block.media.source)},
}
}
case "path":
with open(block.media.source, "rb") as f:
pdf_bytes = f.read()
return {
"document": {
"format": "pdf",
"name": block.media.source.split("/")[-1],
"source": {"bytes": pdf_bytes},
}
}
case "url":
raise NotImplementedError(
"Bedrock Converse API does not support document URLs directly. "
"Please download and provide as bytes."
)
case _:
raise NotImplementedError(
f"Unsupported source type: {block.media.source_type}"
)
def _process_image_block(self, block: MediaBlock) -> dict:
"""Process image blocks for Bedrock"""
match block.media.source_type:
case "base64":
return {
"image": {
"format": block.media.extension or "png",
"source": {"bytes": base64.b64decode(block.media.source)},
}
}
case "path":
with open(block.media.source, "rb") as image_file:
image_bytes = image_file.read()
return {
"image": {
"format": block.media.extension or "png",
"source": {"bytes": image_bytes},
}
}
case "url":
raise NotImplementedError(
"Bedrock Converse API does not support image URLs directly. "
"Please download and provide as bytes."
)
case _:
raise NotImplementedError(
f"Unsupported source type: {block.media.source_type}"
)
```
## /datapizza-ai-clients/datapizza-ai-clients-bedrock/pyproject.toml
```toml path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/pyproject.toml"
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# Project metadata
[project]
name = "datapizza-ai-clients-bedrock"
version = "0.0.4"
description = "AWS Bedrock client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10.0,<4"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = [
"datapizza-ai-core>=0.0.7,<0.1.0",
"boto3>=1.35.0",
"botocore>=1.35.0",
"aioboto3>=13.0.0",
]
# Development dependencies
[dependency-groups]
dev = [
"deptry>=0.23.0",
"pytest",
"ruff>=0.11.5",
"moto>=5.0.0", # For mocking AWS services in tests
]
# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]
[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]
# Ruff configuration
[tool.ruff]
line-length = 88
[tool.ruff.lint]
select = [
"W", # pycodestyle warnings
"F", # pyflakes
"B", # flake8-bugbear
"I", # isort
"UP", # pyupgrade
"SIM", # flake8-simplify
"RUF", # Ruff-specific rules
"C4", # flake8-comprehensions
]
```
## /datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_async.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_async.py"
from datapizza.clients.bedrock import BedrockClient
def test_async_client_initialization():
"""Test that async client can be initialized"""
client = BedrockClient(
aws_access_key_id="test_key",
aws_secret_access_key="test_secret",
region_name="us-east-1",
)
# Initialize async client
client._set_a_client()
# Verify async session is created
assert hasattr(client, "a_session")
assert hasattr(client, "a_config")
assert hasattr(client, "a_region_name")
assert client.a_region_name == "us-east-1"
def test_a_invoke_method_exists():
"""Test that _a_invoke method is implemented and doesn't raise NotImplementedError"""
client = BedrockClient(
aws_access_key_id="test_key",
aws_secret_access_key="test_secret",
region_name="us-east-1",
)
# Verify the method exists and is async
assert hasattr(client, "_a_invoke")
assert callable(client._a_invoke)
def test_a_stream_invoke_method_exists():
"""Test that _a_stream_invoke method is implemented and doesn't raise NotImplementedError"""
client = BedrockClient(
aws_access_key_id="test_key",
aws_secret_access_key="test_secret",
region_name="us-east-1",
)
# Verify the method exists and is async
assert hasattr(client, "_a_stream_invoke")
assert callable(client._a_stream_invoke)
```
## /datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_memory_adapter.py"
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, TextBlock
from datapizza.clients.bedrock import BedrockClient
from datapizza.clients.bedrock.memory_adapter import BedrockMemoryAdapter
def test_init_bedrock_client():
"""Test that BedrockClient can be initialized"""
client = BedrockClient()
assert client is not None
assert client.model_name == "anthropic.claude-3-5-sonnet-20241022-v2:0"
def test_init_bedrock_client_with_credentials():
"""Test BedrockClient initialization with explicit credentials"""
client = BedrockClient(
aws_access_key_id="test_key",
aws_secret_access_key="test_secret",
region_name="us-west-2",
)
assert client is not None
assert client.aws_access_key_id == "test_key"
assert client.region_name == "us-west-2"
def test_bedrock_memory_adapter():
"""Test that the memory adapter converts memory to Bedrock message format"""
memory_adapter = BedrockMemoryAdapter()
memory = Memory()
memory.add_turn(blocks=[TextBlock(content="Hello, world!")], role=ROLE.USER)
memory.add_turn(
blocks=[TextBlock(content="Hello! How can I help you?")], role=ROLE.ASSISTANT
)
messages = memory_adapter.memory_to_messages(memory)
assert messages == [
{
"role": "user",
"content": [{"text": "Hello, world!"}],
},
{
"role": "assistant",
"content": [{"text": "Hello! How can I help you?"}],
},
]
def test_bedrock_memory_adapter_multiple_blocks():
"""Test memory adapter with multiple text blocks in a single turn"""
memory_adapter = BedrockMemoryAdapter()
memory = Memory()
memory.add_turn(
blocks=[
TextBlock(content="First message."),
TextBlock(content="Second message."),
],
role=ROLE.USER,
)
messages = memory_adapter.memory_to_messages(memory)
assert messages == [
{
"role": "user",
"content": [{"text": "First message."}, {"text": "Second message."}],
},
]
```
## /datapizza-ai-clients/datapizza-ai-clients-google/README.md
## /datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/__init__.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/__init__.py"
from .google_client import GoogleClient
__all__ = ["GoogleClient"]
```
## /datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/google_client.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/google_client.py"
from collections.abc import AsyncIterator, Iterator
from typing import Literal
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import (
FunctionCallBlock,
Model,
StructuredBlock,
TextBlock,
ThoughtBlock,
)
from google import genai
from google.genai import types
from google.oauth2 import service_account
from .memory_adapter import GoogleMemoryAdapter
class GoogleClient(Client):
"""A client for interacting with Google's Generative AI APIs.
This class provides methods for invoking the Google GenAI API to generate responses
based on given input data. It extends the Client class.
"""
def __init__(
self,
api_key: str | None = None,
model: str = "gemini-2.0-flash",
system_prompt: str = "",
temperature: float | None = None,
cache: Cache | None = None,
project_id: str | None = None,
location: str | None = None,
credentials_path: str | None = None,
use_vertexai: bool = False,
):
"""
Args:
api_key: The API key for the Google API.
model: The model to use for the Google API.
system_prompt: The system prompt to use for the Google API.
temperature: The temperature to use for the Google API.
cache: The cache to use for the Google API.
project_id: The project ID for the Google API.
location: The location for the Google API.
credentials_path: The path to the credentials for the Google API.
use_vertexai: Whether to use Vertex AI for the Google API.
"""
if temperature and not 0 <= temperature <= 2:
raise ValueError("Temperature must be between 0 and 2")
super().__init__(
model_name=model,
system_prompt=system_prompt,
temperature=temperature,
cache=cache,
)
self.memory_adapter = GoogleMemoryAdapter()
try:
if use_vertexai:
if not credentials_path:
raise ValueError("credentials_path must be provided")
if not project_id:
raise ValueError("project_id must be provided")
if not location:
raise ValueError("location must be provided")
credentials = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
self.client = genai.Client(
vertexai=True,
project=project_id,
location=location,
credentials=credentials,
)
else:
if not api_key:
raise ValueError("api_key must be provided")
self.client = genai.Client(api_key=api_key)
except Exception as e:
raise RuntimeError(
f"Failed to initialize Google GenAI client: {e!s}"
) from None
def _convert_tool(self, tool: Tool) -> dict:
"""Convert tools to Google function format"""
parameters = {
"type": tool.schema["parameters"]["type"],
"properties": tool.schema["parameters"]["properties"],
"required": tool.schema["parameters"]["required"],
}
return {
"name": tool.schema["name"],
"description": tool.schema["description"],
"parameters": parameters,
}
def _prepare_tools(self, tools: list[Tool] | None) -> list[types.Tool] | None:
if not tools:
return None
google_tools = []
function_declarations = []
has_google_search = False
for tool in tools:
# Check if tool has google search capability
if hasattr(tool, "name") and "google_search" in tool.name.lower():
has_google_search = True
elif isinstance(tool, Tool):
function_declarations.append(self._convert_tool(tool))
elif isinstance(tool, dict):
google_tools.append(tool)
else:
raise ValueError(f"Unknown tool type: {type(tool)}")
if function_declarations:
google_tools.append(types.Tool(function_declarations=function_declarations))
if has_google_search:
google_tools.append(types.Tool(google_search=types.GoogleSearch()))
return google_tools if google_tools else None
def _convert_tool_choice(
self, tool_choice: Literal["auto", "required", "none"] | list[str]
) -> types.ToolConfig:
adjusted_tool_choice: types.ToolConfig
if isinstance(tool_choice, list):
adjusted_tool_choice = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(
mode="ANY", # type: ignore
allowed_function_names=tool_choice,
)
)
elif tool_choice == "required":
adjusted_tool_choice = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(mode="ANY") # type: ignore
)
elif tool_choice == "none":
adjusted_tool_choice = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(mode="NONE") # type: ignore
)
elif tool_choice == "auto":
adjusted_tool_choice = types.ToolConfig(
function_calling_config=types.FunctionCallingConfig(mode="AUTO") # type: ignore
)
return adjusted_tool_choice
def _invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
"""Implementation of the abstract _invoke method"""
if tools is None:
tools = []
contents = self._memory_to_contents(None, input, memory)
tool_map = {tool.name: tool for tool in tools if isinstance(tool, Tool)}
prepared_tools = self._prepare_tools(tools)
config = types.GenerateContentConfig(
temperature=temperature or self.temperature,
system_instruction=system_prompt or self.system_prompt,
max_output_tokens=max_tokens or None,
tools=prepared_tools, # type: ignore
tool_config=self._convert_tool_choice(tool_choice)
if tools and any(isinstance(tool, Tool) for tool in tools)
else None,
**kwargs,
)
response = self.client.models.generate_content(
model=self.model_name,
contents=contents, # type: ignore
config=config, # type: ignore
)
return self._response_to_client_response(response, tool_map)
async def _a_invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
"""Implementation of the abstract _invoke method"""
if tools is None:
tools = []
contents = self._memory_to_contents(None, input, memory)
tool_map = {tool.name: tool for tool in tools if isinstance(tool, Tool)}
prepared_tools = self._prepare_tools(tools)
config = types.GenerateContentConfig(
temperature=temperature or self.temperature,
system_instruction=system_prompt or self.system_prompt,
max_output_tokens=max_tokens or None,
tools=prepared_tools, # type: ignore
tool_config=self._convert_tool_choice(tool_choice)
if tools and any(isinstance(tool, Tool) for tool in tools)
else None,
**kwargs,
)
response = await self.client.aio.models.generate_content(
model=self.model_name,
contents=contents, # type: ignore
config=config, # type: ignore
)
return self._response_to_client_response(response, tool_map)
def _stream_invoke(
self,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> Iterator[ClientResponse]:
"""Implementation of the abstract _stream_invoke method"""
if tools is None:
tools = []
contents = self._memory_to_contents(None, input, memory)
prepared_tools = self._prepare_tools(tools)
config = types.GenerateContentConfig(
temperature=temperature or self.temperature,
system_instruction=system_prompt or self.system_prompt,
max_output_tokens=max_tokens or None,
tools=prepared_tools, # type: ignore
tool_config=self._convert_tool_choice(tool_choice)
if tools and any(isinstance(tool, Tool) for tool in tools)
else None,
**kwargs,
)
message_text = ""
thought_block = ThoughtBlock(content="")
usage = TokenUsage()
for chunk in self.client.models.generate_content_stream(
model=self.model_name,
contents=contents, # type: ignore
config=config,
):
usage += TokenUsage(
prompt_tokens=chunk.usage_metadata.prompt_token_count or 0,
completion_tokens=chunk.usage_metadata.candidates_token_count or 0,
cached_tokens=chunk.usage_metadata.cached_content_token_count or 0,
)
if not chunk.candidates:
raise ValueError("No candidates in response")
finish_reason = chunk.candidates[0].finish_reason
stop_reason = (
finish_reason.value.lower()
if finish_reason is not None
else finish_reason
)
if not chunk.candidates[0].content:
raise ValueError("No content in response")
if not chunk.candidates[0].content.parts:
yield ClientResponse(
content=[],
delta=chunk.text or "",
stop_reason=stop_reason,
usage=usage,
)
continue
for part in chunk.candidates[0].content.parts:
if not part.text:
continue
elif hasattr(part, "thought") and part.thought:
thought_block.content += part.text
else: # If it's not a thought, it's a message
if part.text:
message_text += str(chunk.text or "")
yield ClientResponse(
content=[],
delta=chunk.text or "",
stop_reason=stop_reason,
)
yield ClientResponse(
content=[TextBlock(content=message_text)],
stop_reason=stop_reason,
usage=usage,
)
async def _a_stream_invoke(
self,
input: str,
tools: list[Tool] | None = None,
memory: Memory | None = None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
temperature: float | None = None,
max_tokens: int | None = None,
system_prompt: str | None = None,
**kwargs,
) -> AsyncIterator[ClientResponse]:
"""Implementation of the abstract _a_stream_invoke method for Google"""
if tools is None:
tools = []
contents = self._memory_to_contents(None, input, memory)
prepared_tools = self._prepare_tools(tools)
config = types.GenerateContentConfig(
temperature=temperature or self.temperature,
system_instruction=system_prompt or self.system_prompt,
max_output_tokens=max_tokens or None,
tools=prepared_tools, # type: ignore
tool_config=self._convert_tool_choice(tool_choice)
if tools and any(isinstance(tool, Tool) for tool in tools)
else None,
**kwargs,
)
usage = TokenUsage()
message_text = ""
thought_block = ThoughtBlock(content="")
async for chunk in await self.client.aio.models.generate_content_stream(
model=self.model_name,
contents=contents, # type: ignore
config=config,
): # type: ignore
usage += TokenUsage(
prompt_tokens=chunk.usage_metadata.prompt_token_count or 0,
completion_tokens=chunk.usage_metadata.candidates_token_count or 0,
cached_tokens=chunk.usage_metadata.cached_content_token_count or 0,
)
finish_reason = chunk.candidates[0].finish_reason
stop_reason = (
finish_reason.value.lower()
if finish_reason is not None
else finish_reason
)
# Handle the case where the response has no parts
if not chunk.candidates[0].content.parts:
yield ClientResponse(
content=[],
delta=chunk.text or "",
stop_reason=stop_reason,
)
continue
for part in chunk.candidates[0].content.parts:
if not part.text:
continue
elif hasattr(part, "thought") and part.thought:
thought_block.content += part.text
else: # If it's not a thought, it's a message
if part.text:
message_text += chunk.text or ""
yield ClientResponse(
content=[],
delta=chunk.text or "",
stop_reason=stop_reason,
)
yield ClientResponse(
content=[TextBlock(content=message_text)],
stop_reason=stop_reason,
usage=usage,
)
def _structured_response(
self,
input: str,
output_cls: type[Model],
memory: Memory | None,
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
tools: list[Tool] | None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
**kwargs,
) -> ClientResponse:
"""Implementation of the abstract _structured_response method"""
contents = self._memory_to_contents(self.system_prompt, input, memory)
prepared_tools = self._prepare_tools(tools)
response = self.client.models.generate_content(
model=self.model_name,
contents=contents, # type: ignore
config=types.GenerateContentConfig(
system_instruction=system_prompt,
temperature=temperature,
max_output_tokens=max_tokens,
response_mime_type="application/json",
tools=prepared_tools, # type: ignore
tool_config=self._convert_tool_choice(tool_choice)
if tools and any(isinstance(tool, Tool) for tool in tools)
else None,
response_schema=(
output_cls.model_json_schema()
if hasattr(output_cls, "model_json_schema")
else output_cls
),
),
)
if not response or not response.candidates:
raise ValueError("No response from Google GenAI")
structured_data = output_cls.model_validate_json(str(response.text))
return ClientResponse(
content=[StructuredBlock(content=structured_data)],
stop_reason=response.candidates[0].finish_reason.value.lower()
if response.candidates[0].finish_reason
else None,
usage=TokenUsage(
prompt_tokens=response.usage_metadata.prompt_token_count or 0,
completion_tokens=response.usage_metadata.candidates_token_count or 0,
cached_tokens=response.usage_metadata.cached_content_token_count or 0,
),
)
async def _a_structured_response(
self,
input: str,
output_cls: type[Model],
memory: Memory | None,
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
tools: list[Tool] | None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
**kwargs,
) -> ClientResponse:
"""Implementation of the abstract _structured_response method"""
contents = self._memory_to_contents(self.system_prompt, input, memory)
prepared_tools = self._prepare_tools(tools)
response = await self.client.aio.models.generate_content(
model=self.model_name,
contents=contents, # type: ignore
config=types.GenerateContentConfig(
system_instruction=system_prompt,
temperature=temperature,
max_output_tokens=max_tokens,
response_mime_type="application/json",
tools=prepared_tools, # type: ignore
tool_config=self._convert_tool_choice(tool_choice)
if tools and any(isinstance(tool, Tool) for tool in tools)
else None,
response_schema=(
output_cls.model_json_schema()
if hasattr(output_cls, "model_json_schema")
else output_cls
),
),
)
if not response or not response.candidates:
raise ValueError("No response from Google GenAI")
structured_data = output_cls.model_validate_json(str(response.text))
return ClientResponse(
content=[StructuredBlock(content=structured_data)],
stop_reason=response.candidates[0].finish_reason.value.lower()
if response.candidates[0].finish_reason
else None,
usage=TokenUsage(
prompt_tokens=response.usage_metadata.prompt_token_count or 0,
completion_tokens=response.usage_metadata.candidates_token_count or 0,
cached_tokens=response.usage_metadata.cached_content_token_count or 0,
),
)
def _embed(
self,
text: str | list[str],
model_name: str | None,
task_type: str = "RETRIEVAL_DOCUMENT",
output_dimensionality: int = 768,
title: str | None = None,
**kwargs,
) -> list[float] | list[list[float] | None]:
"""Embed a text using the model"""
response = self.client.models.embed_content(
model=model_name or self.model_name,
contents=text, # type: ignore
config=types.EmbedContentConfig(
task_type=task_type,
output_dimensionality=output_dimensionality,
title=title,
**kwargs,
),
)
# Extract the embedding values from the response
if not response.embeddings:
return []
embeddings = [embedding.values for embedding in response.embeddings]
if isinstance(text, str) and embeddings[0]:
return embeddings[0]
return embeddings
async def _a_embed(
self,
text: str | list[str],
model_name: str | None,
task_type: str = "RETRIEVAL_DOCUMENT",
output_dimensionality: int = 768,
title: str | None = None,
**kwargs,
) -> list[float] | list[list[float] | None]:
"""Embed a text using the model"""
response = await self.client.aio.models.embed_content(
model=model_name or self.model_name,
contents=text, # type: ignore
config=types.EmbedContentConfig(
task_type=task_type,
output_dimensionality=output_dimensionality,
title=title,
**kwargs,
),
)
# Extract the embedding values from the response
if not response.embeddings:
return []
embeddings = [embedding.values for embedding in response.embeddings]
if isinstance(text, str) and embeddings[0]:
return embeddings[0]
return embeddings
def _response_to_client_response(
self, response, tool_map: dict[str, Tool] | None = None
) -> ClientResponse:
blocks = []
# Handle function calls if present
if hasattr(response, "function_calls") and response.function_calls:
for fc in response.function_calls:
if not tool_map:
raise ValueError("Tool map is required")
tool = tool_map.get(fc.name, None)
if not tool:
raise ValueError(f"Tool {fc.name} not found in tool map")
blocks.append(
FunctionCallBlock(
name=fc.name,
arguments=fc.args,
id=f"fc_{id(fc)}",
tool=tool,
)
)
else:
if hasattr(response, "text") and response.text:
blocks.append(TextBlock(content=response.text))
if hasattr(response, "candidates") and response.candidates:
for part in response.candidates[0].content.parts:
if not part.text:
continue
if hasattr(part, "thought") and part.thought:
blocks.append(ThoughtBlock(content=part.text))
usage_metadata = getattr(response, "usage_metadata", None)
return ClientResponse(
content=blocks,
stop_reason=(response.candidates[0].finish_reason.value.lower())
if hasattr(response, "candidates") and response.candidates
else None,
usage=TokenUsage(
prompt_tokens=usage_metadata.prompt_token_count or 0,
completion_tokens=usage_metadata.candidates_token_count or 0,
cached_tokens=usage_metadata.cached_content_token_count or 0,
),
)
```
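A hedged usage sketch for the Google client above. It assumes the public `invoke()` and `embed()` wrappers that the base `Client` provides around the private `_invoke`/`_embed` implementations, a `GOOGLE_API_KEY` environment variable, and a placeholder embedding model name:

```python
import os

from datapizza.clients.google import GoogleClient

client = GoogleClient(api_key=os.environ["GOOGLE_API_KEY"], model="gemini-2.0-flash")

# Plain text generation
response = client.invoke("Name one practical use case for text embeddings")
print(response.content[0].content)

# Embeddings: task_type and output_dimensionality default to the values shown
# in _embed above; "text-embedding-004" is a placeholder model name.
vector = client.embed("datapizza-ai", model_name="text-embedding-004")
print(len(vector))
```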
## /datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/memory_adapter.py"
import base64
from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
ROLE,
FunctionCallBlock,
FunctionCallResultBlock,
MediaBlock,
StructuredBlock,
TextBlock,
)
from google.genai import types
class GoogleMemoryAdapter(MemoryAdapter):
def _turn_to_message(self, turn: Turn) -> dict:
content = []
for block in turn:
block_dict = {}
match block:
case TextBlock():
block_dict = {"text": block.content}
case FunctionCallBlock():
block_dict = {
"function_call": {"name": block.name, "args": block.arguments}
}
case FunctionCallResultBlock():
block_dict = types.Part.from_function_response(
name=block.tool.name,
response={"result": block.result},
)
case StructuredBlock():
block_dict = {"text": str(block.content)}
case MediaBlock():
match block.media.media_type:
case "image":
block_dict = self._process_image_block(block)
case "pdf":
block_dict = self._process_pdf_block(block)
case "audio":
block_dict = self._process_audio_block(block)
case _:
raise NotImplementedError(
f"Unsupported media type: {block.media.media_type}"
)
content.append(block_dict)
return {
"role": turn.role.google_role,
"parts": (content),
}
def _process_audio_block(self, block: MediaBlock) -> types.Part:
match block.media.source_type:
case "raw":
return types.Part.from_bytes(
data=block.media.source,
mime_type="audio/mp3",
)
case "path":
with open(block.media.source, "rb") as f:
audio_bytes = f.read()
return types.Part.from_bytes(
data=audio_bytes,
mime_type="audio/mp3",
)
case _:
raise NotImplementedError(
f"Unsupported media source type: {block.media.source_type} for audio, source type supported: raw, path"
)
def _process_pdf_block(self, block: MediaBlock) -> types.Part | dict:
match block.media.source_type:
case "raw":
return types.Part.from_bytes(
data=block.media.source,
mime_type="application/pdf",
)
case "base64":
return {
"inline_data": {
"mime_type": "application/pdf",
"data": block.media.source,
}
}
case "path":
with open(block.media.source, "rb") as f:
pdf_bytes = f.read()
return {
"inline_data": {
"mime_type": "application/pdf",
"data": pdf_bytes,
}
}
case _:
raise NotImplementedError(
f"Unsupported media source type: {block.media.source_type} only supported: raw, base64, path"
)
def _process_image_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "url":
return types.Part.from_uri(
file_uri=block.media.source,
mime_type=f"image/{block.media.extension}",
) # type: ignore
case "base64":
return {
"inline_data": {
"mime_type": f"image/{block.media.extension}",
"data": block.media.source,
}
}
case "path":
with open(block.media.source, "rb") as image_file:
base64_image = base64.b64encode(image_file.read()).decode("utf-8")
return {
"inline_data": {
"mime_type": f"image/{block.media.extension}",
"data": base64_image,
}
}
case _:
raise NotImplementedError(
f"Unsupported media source type: {block.media.source_type} for image, only url, base64, path are supported"
)
def _text_to_message(self, text: str, role: ROLE) -> dict:
return {"role": role.google_role, "parts": [{"text": text}]}
```
## /datapizza-ai-clients/datapizza-ai-clients-google/pyproject.toml
```toml path="/datapizza-ai-clients/datapizza-ai-clients-google/pyproject.toml"
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# Project metadata
[project]
name = "datapizza-ai-clients-google"
version = "0.0.5"
description = "Google (Gemini) client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10.0,<4"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = [
"datapizza-ai-core>=0.0.7,<0.1.0",
"google-genai>=1.3.0,<2.0.0",
]
# Development dependencies
[dependency-groups]
dev = [
"deptry>=0.23.0",
"pytest",
"ruff>=0.11.5",
]
# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]
[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]
# Ruff configuration
[tool.ruff]
line-length = 88
[tool.ruff.lint]
select = [
"W", # pycodestyle warnings
"F", # pyflakes
"B", # flake8-bugbear
"I", # isort
"UP", # pyupgrade
"SIM", # flake8-simplify
"RUF", # Ruff-specific rules
"C4", # flake8-comprehensions
]
```
## /datapizza-ai-clients/datapizza-ai-clients-google/tests/test_memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-google/tests/test_memory_adapter.py"
import pytest
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, StructuredBlock, TextBlock
from datapizza.clients.google.memory_adapter import GoogleMemoryAdapter
def test_google_memory_to_messages_structured_block():
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(StructuredBlock(content={"key": "value"}))
messages = GoogleMemoryAdapter().memory_to_messages(memory)
# Google adapter may serialize as string or dict in "parts"
assert "key" in str(messages[0]["parts"][0])
def test_google_memory_to_messages_with_system_prompt():
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(TextBlock(content="Hello!"))
system_prompt = "You are a helpful assistant."
messages = GoogleMemoryAdapter().memory_to_messages(
memory, system_prompt=system_prompt
)
assert messages[0]["role"] == "model"
assert system_prompt in str(messages[0]["parts"])
assert messages[1]["role"] == "user"
def test_google_memory_to_messages_with_input_str():
memory = Memory()
input_str = "What is the weather?"
messages = GoogleMemoryAdapter().memory_to_messages(memory, input=input_str)
assert messages[-1]["role"] == "user"
assert input_str in str(messages[-1]["parts"])
def test_google_memory_to_messages_with_input_block():
memory = Memory()
input_block = TextBlock(content="This is a block input.")
messages = GoogleMemoryAdapter().memory_to_messages(memory, input=input_block)
assert messages[-1]["role"] == "user"
assert "block input" in str(messages[-1]["parts"])
def test_google_memory_to_messages_unsupported_input():
memory = Memory()
class Dummy:
pass
with pytest.raises(ValueError):
GoogleMemoryAdapter().memory_to_messages(memory, input=Dummy())
```
## /datapizza-ai-clients/datapizza-ai-clients-mistral/README.md
## /datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/__init__.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/__init__.py"
from datapizza.clients.mistral.mistral_client import MistralClient
__all__ = ["MistralClient"]
```
## /datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/memory_adapter.py"
import base64
import json
from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
ROLE,
FunctionCallBlock,
FunctionCallResultBlock,
MediaBlock,
StructuredBlock,
TextBlock,
)
class MistralMemoryAdapter(MemoryAdapter):
def _turn_to_message(self, turn: Turn) -> dict:
content = []
tool_calls = []
tool_call_id = None
for block in turn:
block_dict = {}
match block:
case TextBlock():
block_dict = {"type": "text", "text": block.content}
case FunctionCallBlock():
tool_calls.append(
{
"id": block.id,
"function": {
"name": block.name,
"arguments": json.dumps(block.arguments),
},
"type": "function",
}
)
case FunctionCallResultBlock():
tool_call_id = block.id
block_dict = {"type": "text", "text": block.result}
case StructuredBlock():
block_dict = {"type": "text", "text": str(block.content)}
case MediaBlock():
match block.media.media_type:
case "image":
block_dict = self._process_image_block(block)
# case "pdf":
# block_dict = self._process_pdf_block(block)
case _:
raise NotImplementedError(
f"Unsupported media type: {block.media.media_type}, only image are supported"
)
if block_dict:
content.append(block_dict)
messages: dict = {
"role": turn.role.value,
}
if content:
messages["content"] = content
if tool_calls:
messages["tool_calls"] = tool_calls
if tool_call_id:
messages["tool_call_id"] = tool_call_id
return messages
def _text_to_message(self, text: str, role: ROLE) -> dict:
return {"role": role.value, "content": text}
def _process_image_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "url":
return {
"type": "image_url",
"image_url": {"url": block.media.source},
}
case "base64":
return {
"type": "image_url",
"image_url": {
"url": f"data:image/{block.media.extension};base64,{block.media.source}"
},
}
case "path":
with open(block.media.source, "rb") as image_file:
base64_image = base64.b64encode(image_file.read()).decode("utf-8")
return {
"type": "image_url",
"image_url": {
"url": f"data:image/{block.media.extension};base64,{base64_image}"
},
}
case _:
raise NotImplementedError(
f"Unsupported media source type: {block.media.source_type}, only url, base64, path are supported"
)
```
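For orientation, here is a minimal sketch of how this adapter turns a `Memory` turn into Mistral-style chat messages. It assumes the same `memory_to_messages` entry point that the Google adapter tests above exercise; the prompt text is just an example.
```python
from datapizza.clients.mistral.memory_adapter import MistralMemoryAdapter
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, TextBlock
# Build a one-turn conversation and convert it to Mistral chat messages.
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(TextBlock(content="What is the weather?"))
messages = MistralMemoryAdapter().memory_to_messages(memory)
# Expected shape (roughly):
# [{"role": "user", "content": [{"type": "text", "text": "What is the weather?"}]}]
print(messages)
```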
## /datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/mistral_client.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/mistral_client.py"
import json
import logging
import os
from collections.abc import AsyncIterator, Iterator
from typing import Literal
import requests
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import (
FunctionCallBlock,
Media,
MediaBlock,
Model,
StructuredBlock,
TextBlock,
)
from mistralai import Mistral
from mistralai.models.ocrresponse import OCRResponse
from .memory_adapter import MistralMemoryAdapter
log = logging.getLogger(__name__)
class MistralClient(Client):
"""A client for interacting with the Mistral API.
This class provides methods for invoking the Mistral API to generate responses
based on given input data. It extends the Client class.
"""
def __init__(
self,
api_key: str,
model: str = "mistral-large-latest",
system_prompt: str = "",
temperature: float | None = None,
cache: Cache | None = None,
):
"""
Args:
api_key: The API key for the Mistral API.
model: The model to use for the Mistral API.
system_prompt: The system prompt to use for the Mistral API.
temperature: The temperature to use for the Mistral API.
cache: The cache to use for the Mistral API.
"""
if temperature and not 0 <= temperature <= 2:
raise ValueError("Temperature must be between 0 and 2")
super().__init__(
model_name=model,
system_prompt=system_prompt,
temperature=temperature,
cache=cache,
)
self.api_key = api_key
self.memory_adapter = MistralMemoryAdapter()
self._set_client()
def _set_client(self):
self.client = Mistral(api_key=self.api_key)
def _response_to_client_response(
self, response, tool_map: dict[str, Tool] | None = None
) -> ClientResponse:
blocks = []
for choice in response.choices:
if choice.message.content:
blocks.append(TextBlock(content=choice.message.content))
if choice.message.tool_calls:
for tool_call in choice.message.tool_calls:
tool = tool_map.get(tool_call.function.name) if tool_map else None
if tool is None:
raise ValueError(f"Tool {tool_call.function.name} not found")
blocks.append(
FunctionCallBlock(
id=tool_call.id,
name=tool_call.function.name,
arguments=json.loads(tool_call.function.arguments),
tool=tool,
)
)
# Handle media content if present
if hasattr(choice.message, "media") and choice.message.media:
for media_item in choice.message.media:
media = Media(
media_type=media_item.type,
source_type="url" if media_item.source_url else "base64",
source=media_item.source_url or media_item.data,
detail=getattr(media_item, "detail", "high"),
)
blocks.append(MediaBlock(media=media))
log.debug(f"{self.__class__.__name__} response = {response}")
return ClientResponse(
content=blocks,
stop_reason=response.choices[0].finish_reason,
usage=TokenUsage(
prompt_tokens=response.usage.prompt_tokens or 0,
completion_tokens=response.usage.completion_tokens or 0,
cached_tokens=0,
),
)
def _convert_tools(self, tools: Tool) -> dict:
"""Convert tools to Mistral function format"""
return {"type": "function", "function": tools.schema}
def _convert_tool_choice(
self, tool_choice: Literal["auto", "required", "none"] | list[str]
) -> dict | Literal["auto", "required", "none"]:
if isinstance(tool_choice, list) and len(tool_choice) > 1:
raise NotImplementedError(
"multiple function names is not supported by Mistral"
)
elif isinstance(tool_choice, list):
return {
"type": "function",
"function": {"name": tool_choice[0]},
}
else:
return tool_choice
def _invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
if tools is None:
tools = []
log.debug(f"{self.__class__.__name__} input = {input}")
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
request_params = {
"model": self.model_name,
"messages": messages,
"stream": False,
"max_tokens": max_tokens,
**kwargs,
}
if temperature:
request_params["temperature"] = temperature
if tools:
request_params["tools"] = [self._convert_tools(tool) for tool in tools]
request_params["tool_choice"] = self._convert_tool_choice(tool_choice)
response = self.client.chat.complete(**request_params)
return self._response_to_client_response(response, tool_map)
async def _a_invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
if tools is None:
tools = []
log.debug(f"{self.__class__.__name__} input = {input}")
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
request_params = {
"model": self.model_name,
"messages": messages,
"stream": False,
"max_tokens": max_tokens,
**kwargs,
}
if temperature:
request_params["temperature"] = temperature
if tools:
request_params["tools"] = [self._convert_tools(tool) for tool in tools]
request_params["tool_choice"] = self._convert_tool_choice(tool_choice)
response = await self.client.chat.complete_async(**request_params)
return self._response_to_client_response(response, tool_map)
def _stream_invoke(
self,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> Iterator[ClientResponse]:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
request_params = {
"model": self.model_name,
"messages": messages,
"max_tokens": max_tokens,
**kwargs,
}
if temperature:
request_params["temperature"] = temperature
if tools:
request_params["tools"] = [self._convert_tools(tool) for tool in tools]
request_params["tool_choice"] = self._convert_tool_choice(tool_choice)
response = self.client.chat.stream(**request_params)
text = ""
usage = TokenUsage()
stop_reason = None
for chunk in response:
            usage += TokenUsage(
                prompt_tokens=(chunk.data.usage.prompt_tokens or 0)
                if chunk.data.usage
                else 0,
                completion_tokens=(chunk.data.usage.completion_tokens or 0)
                if chunk.data.usage
                else 0,
                cached_tokens=0,
            )
stop_reason = chunk.data.choices[0].finish_reason
delta = chunk.data.choices[0].delta.content or ""
text += delta
yield ClientResponse(
content=[],
delta=str(delta),
stop_reason=stop_reason,
)
yield ClientResponse(
content=[TextBlock(content=text)],
stop_reason=stop_reason,
usage=usage,
)
async def _a_stream_invoke(
self,
input: str,
tools: list[Tool] | None = None,
memory: Memory | None = None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
temperature: float | None = None,
max_tokens: int | None = None,
system_prompt: str | None = None,
**kwargs,
) -> AsyncIterator[ClientResponse]:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
request_params = {
"model": self.model_name,
"messages": messages,
"max_tokens": max_tokens or 1024,
**kwargs,
}
if temperature:
request_params["temperature"] = temperature
if tools:
request_params["tools"] = [self._convert_tools(tool) for tool in tools]
request_params["tool_choice"] = self._convert_tool_choice(tool_choice)
response = await self.client.chat.stream_async(**request_params)
text = ""
usage = TokenUsage()
stop_reason = None
async for chunk in response:
            usage += TokenUsage(
                prompt_tokens=(chunk.data.usage.prompt_tokens or 0)
                if chunk.data.usage
                else 0,
                completion_tokens=(chunk.data.usage.completion_tokens or 0)
                if chunk.data.usage
                else 0,
                cached_tokens=0,
            )
stop_reason = chunk.data.choices[0].finish_reason
delta = chunk.data.choices[0].delta.content or ""
text += delta
yield ClientResponse(
content=[],
delta=str(delta),
stop_reason=stop_reason,
)
yield ClientResponse(
content=[TextBlock(content=text)],
stop_reason=stop_reason,
usage=usage,
)
def _structured_response(
self,
input: str,
output_cls: type[Model],
memory: Memory | None,
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
tools: list[Tool] | None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
**kwargs,
) -> ClientResponse:
# Add system message to enforce JSON output
messages = self._memory_to_contents(system_prompt, input, memory)
if not tools:
tools = []
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
response = self.client.chat.parse(
model=self.model_name,
messages=messages,
response_format=output_cls,
temperature=temperature,
max_tokens=max_tokens,
**kwargs,
)
if not response.choices:
raise ValueError("No response from Mistral")
log.debug(f"{self.__class__.__name__} structured response: {response}")
stop_reason = response.choices[0].finish_reason if response.choices else None
if hasattr(output_cls, "model_validate_json"):
structured_data = output_cls.model_validate_json(
str(response.choices[0].message.content) # type: ignore
)
else:
structured_data = json.loads(str(response.choices[0].message.content)) # type: ignore
return ClientResponse(
content=[StructuredBlock(content=structured_data)],
stop_reason=stop_reason,
usage=TokenUsage(
prompt_tokens=response.usage.prompt_tokens or 0,
completion_tokens=response.usage.completion_tokens or 0,
cached_tokens=0,
),
)
async def _a_structured_response(
self,
input: str,
output_cls: type[Model],
memory: Memory | None,
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
tools: list[Tool] | None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
**kwargs,
) -> ClientResponse:
# Add system message to enforce JSON output
messages = self._memory_to_contents(system_prompt, input, memory)
if not tools:
tools = []
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
response = await self.client.chat.parse_async(
model=self.model_name,
messages=messages,
response_format=output_cls,
temperature=temperature,
max_tokens=max_tokens,
**kwargs,
)
if not response.choices:
raise ValueError("No response from Mistral")
log.debug(f"{self.__class__.__name__} structured response: {response}")
stop_reason = response.choices[0].finish_reason if response.choices else None
if hasattr(output_cls, "model_validate_json"):
structured_data = output_cls.model_validate_json(
str(response.choices[0].message.content) # type: ignore
)
else:
structured_data = json.loads(str(response.choices[0].message.content)) # type: ignore
return ClientResponse(
content=[StructuredBlock(content=structured_data)],
stop_reason=stop_reason,
usage=TokenUsage(
prompt_tokens=response.usage.prompt_tokens or 0,
completion_tokens=response.usage.completion_tokens or 0,
cached_tokens=0,
),
)
def _embed(
self, text: str | list[str], model_name: str | None, **kwargs
) -> list[float] | list[list[float]]:
"""Embed a text using the model"""
response = self.client.embeddings.create(
inputs=text, model=model_name or self.model_name, **kwargs
)
embeddings = [item.embedding for item in response.data]
if not embeddings:
return []
if isinstance(text, str) and embeddings[0]:
return embeddings[0]
return embeddings
async def _a_embed(
self, text: str | list[str], model_name: str | None, **kwargs
) -> list[float] | list[list[float]]:
"""Embed a text using the model"""
response = await self.client.embeddings.create_async(
inputs=text, model=model_name or self.model_name, **kwargs
)
embeddings = [item.embedding for item in response.data]
if not embeddings:
return []
if isinstance(text, str) and embeddings[0]:
return embeddings[0]
return embeddings or []
def parse_document(
self,
document_path: str,
autodelete: bool = True,
include_image_base64: bool = True,
) -> OCRResponse:
filename = os.path.basename(document_path)
with open(document_path, "rb") as f:
uploaded_pdf = self.client.files.upload(
file={"file_name": filename, "content": f}, purpose="ocr"
)
signed_url = self.client.files.get_signed_url(file_id=uploaded_pdf.id)
response = self.client.ocr.process(
model="mistral-ocr-latest",
document={
"type": "document_url",
"document_url": signed_url.url,
},
include_image_base64=include_image_base64,
)
if autodelete:
url = f"https://api.mistral.ai/v1/files/{uploaded_pdf.id}"
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
requests.delete(url, headers=headers, timeout=30)
return response
```
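A minimal usage sketch of `MistralClient` follows. The constructor arguments mirror the `__init__` above; `invoke` is assumed to be the public entry point inherited from the shared `Client` base (as used in the other client READMEs), and the file name is hypothetical.
```python
import os
from datapizza.clients.mistral import MistralClient
client = MistralClient(
    api_key=os.getenv("MISTRAL_API_KEY", ""),
    model="mistral-large-latest",
    system_prompt="You are a helpful assistant.",
)
# Plain chat completion through the shared Client interface.
response = client.invoke("Summarize what OCR does in one sentence.")
print(response.content)
# OCR a local document with mistral-ocr-latest; the uploaded file is deleted
# afterwards because autodelete defaults to True.
ocr_result = client.parse_document("invoice.pdf", include_image_base64=False)
print(ocr_result)
```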
## /datapizza-ai-clients/datapizza-ai-clients-mistral/pyproject.toml
```toml path="/datapizza-ai-clients/datapizza-ai-clients-mistral/pyproject.toml"
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# Project metadata
[project]
name = "datapizza-ai-clients-mistral"
version = "0.0.5"
description = "Mistral AI client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
authors = [
{name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"datapizza-ai-core>=0.0.7,<0.1.0",
"mistralai>=1.2.0,<2.0.0",
"requests>=2.25.0,<3.0.0",
]
# Development dependencies
[dependency-groups]
dev = [
"deptry>=0.23.0",
"pytest",
"ruff>=0.11.5",
]
# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]
[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]
# Ruff configuration
[tool.ruff]
line-length = 88
[tool.ruff.lint]
select = [
"W", # pycodestyle warnings
"F", # pyflakes
"B", # flake8-bugbear
"I", # isort
"UP", # pyupgrade
"SIM", # flake8-simplify
"RUF", # Ruff-specific rules
"C4", # flake8-comprehensions
]
```
## /datapizza-ai-clients/datapizza-ai-clients-mistral/tests/test_mistral_client.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/tests/test_mistral_client.py"
from datapizza.clients.mistral import MistralClient
def test_init_mistral_client():
client = MistralClient(api_key="test")
assert client is not None
```
## /datapizza-ai-clients/datapizza-ai-clients-openai-like/README.md
# DataPizza AI - OpenAI-Like Client
A versatile client for DataPizza AI that works with any OpenAI-compatible API, including local models served through Ollama, hosted providers such as Together AI, and other compatible services.
## Installation
```bash
pip install datapizza-ai-clients-openai-like
```
## Quick Start
### With Ollama (Local Models)
```python
from datapizza.clients.openai_like import OpenAILikeClient
# Create client for Ollama
client = OpenAILikeClient(
api_key="", # Ollama doesn't require an API key
model="gemma2:2b",
system_prompt="You are a helpful assistant.",
base_url="http://localhost:11434/v1",
)
response = client.invoke("What is the capital of France?")
print(response.content)
```
### With Together AI
```python
import os
from datapizza.clients.openai_like import OpenAILikeClient
client = OpenAILikeClient(
api_key=os.getenv("TOGETHER_API_KEY"),
model="meta-llama/Llama-2-7b-chat-hf",
system_prompt="You are a helpful assistant.",
base_url="https://api.together.xyz/v1",
)
response = client.invoke("Explain quantum computing")
print(response.content)
```
### With OpenRouter
```python
import os
from datapizza.clients.openai_like import OpenAILikeClient
client = OpenAILikeClient(
api_key=os.getenv("OPENROUTER_API_KEY"),
model="google/gemma-7b-it",
system_prompt="You are a helpful assistant.",
base_url="https://openrouter.ai/api/v1",
)
response = client.invoke("What is OpenRouter?")
print(response.content)
```
### With Other OpenAI-Compatible Services
```python
import os
from datapizza.clients.openai_like import OpenAILikeClient
client = OpenAILikeClient(
api_key=os.getenv("YOUR_API_KEY"),
model="your-model-name",
system_prompt="You are a helpful assistant.",
base_url="https://your-service-url/v1",
)
response = client.invoke("Your question here")
print(response.content)
```
## Features
- **OpenAI-Compatible**: Works with any service that implements the OpenAI API standard
- **Local Models**: Perfect for running with Ollama for privacy and cost control
- **Memory Support**: Built-in memory adapter for conversation history
- **Streaming**: Support for real-time streaming responses
- **Structured Outputs**: Generate structured data with Pydantic models
- **Tool Calling**: Function calling capabilities where supported
## Supported Services
- **Ollama** - Local model inference
- **Together AI** - Cloud-based model hosting
- **OpenRouter** - Access a variety of models through a single API
- **Perplexity AI** - Search-augmented models
- **Groq** - Fast inference API
- **Any OpenAI-compatible API**
## Advanced Usage
### With Memory
```python
from datapizza.clients.openai_like import OpenAILikeClient
from datapizza.memory import Memory
client = OpenAILikeClient(
api_key="",
model="llama3.1:8b",
base_url="http://localhost:11434/v1",
)
memory = Memory(client=client)
memory.add("I'm working on a Python project about machine learning.")
response = memory.query("What libraries should I use?")
```
### Streaming Responses
```python
client = OpenAILikeClient(
api_key="",
model="gemma2:7b",
base_url="http://localhost:11434/v1",
)
for chunk in client.stream("Tell me a story about AI"):
print(chunk.content, end="", flush=True)
```
### Structured Outputs
```python
from pydantic import BaseModel
from datapizza.clients.openai_like import OpenAILikeClient
class Person(BaseModel):
name: str
age: int
occupation: str
client = OpenAILikeClient(
api_key="",
model="llama3.1:8b",
base_url="http://localhost:11434/v1",
)
response = client.invoke(
"Generate a person profile",
response_format=Person
)
print(response.parsed) # Person object
```
## Configuration Options
| Parameter | Description | Default |
|-----------|-------------|---------|
| `api_key` | API key for the service | Required (empty string for Ollama) |
| `model` | Model name to use | Required |
| `base_url` | Base URL for the API | Required |
| `system_prompt` | System message for the model | None |
| `temperature` | Sampling temperature (0-2) | 0.7 |
| `max_tokens` | Maximum tokens in response | None |
| `timeout` | Request timeout in seconds | 30 |
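A minimal sketch combining the options above (only the parameters accepted by `OpenAILikeClient`'s constructor are passed here; generation-time settings such as `max_tokens` are assumed to be per-call options):
```python
from datapizza.clients.openai_like import OpenAILikeClient
client = OpenAILikeClient(
    api_key="",  # Ollama ignores the key
    model="gemma2:2b",
    base_url="http://localhost:11434/v1",
    system_prompt="You are a concise assistant.",
    temperature=0.2,  # must be between 0 and 2
)
response = client.invoke("Name three Italian cheeses.")
print(response.content)
```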
## Ollama Setup
1. Install Ollama from [ollama.ai](https://ollama.ai)
2. Pull a model: `ollama pull gemma2:2b`
3. Start Ollama: `ollama serve`
4. Use with DataPizza AI as shown in the examples above
## Popular Ollama Models
- `gemma2:2b` - Lightweight, fast responses
- `gemma2:7b` - Balanced performance
- `llama3.1:8b` - High quality, more resource intensive
- `codellama:7b` - Specialized for coding tasks
- `mistral:7b` - Good general purpose model
## Error Handling
```python
from datapizza.clients.openai_like import OpenAILikeClient
from datapizza.core.clients.exceptions import ClientError
try:
client = OpenAILikeClient(
api_key="",
model="nonexistent-model",
base_url="http://localhost:11434/v1",
)
response = client.invoke("Hello")
except ClientError as e:
print(f"Client error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
```
## Contributing
Contributions are welcome! Please see our [Contributing Guide](../../CONTRIBUTING.md) for details.
## License
This project is licensed under the MIT License - see the [LICENSE](../../LICENSE) file for details.
## /datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/__init__.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/__init__.py"
from .openai_completion_client import OpenAILikeClient
__all__ = ["OpenAILikeClient"]
```
## /datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/memory_adapter.py"
import base64
import json
from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
ROLE,
FunctionCallBlock,
FunctionCallResultBlock,
MediaBlock,
StructuredBlock,
TextBlock,
)
class OpenAILikeMemoryAdapter(MemoryAdapter):
def _turn_to_message(self, turn: Turn) -> dict:
content = []
tool_calls = []
tool_call_id = None
for block in turn:
block_dict = {}
match block:
case TextBlock():
block_dict = {"type": "text", "text": block.content}
case FunctionCallBlock():
tool_calls.append(
{
"id": block.id,
"function": {
"name": block.name,
"arguments": json.dumps(block.arguments),
},
"type": "function",
}
)
case FunctionCallResultBlock():
tool_call_id = block.id
block_dict = {"type": "text", "text": block.result}
case StructuredBlock():
block_dict = {"type": "text", "text": str(block.content)}
case MediaBlock():
match block.media.media_type:
case "image":
block_dict = self._process_image_block(block)
case "pdf":
block_dict = self._process_pdf_block(block)
case "audio":
block_dict = self._process_audio_block(block)
case _:
raise NotImplementedError(
f"Unsupported media type: {block.media.media_type}"
)
if block_dict:
content.append(block_dict)
messages = {
"role": turn.role.value,
"content": (content),
}
if tool_calls:
messages["tool_calls"] = tool_calls
if tool_call_id:
messages["tool_call_id"] = tool_call_id
return messages
def _process_audio_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "path":
with open(block.media.source, "rb") as f:
base64_audio = base64.b64encode(f.read()).decode("utf-8")
return {
"type": "input_audio",
"input_audio": {
"data": base64_audio,
"format": block.media.extension,
},
}
case "base_64":
return {
"type": "input_audio",
"input_audio": {
"data": block.media.source,
"format": block.media.extension,
},
}
case "raw":
base64_audio = base64.b64encode(block.media.source).decode("utf-8")
return {
"type": "input_audio",
"input_audio": {
"data": base64_audio,
"format": block.media.extension,
},
}
case _:
raise NotImplementedError(
f"Unsupported media source type: {block.media.source_type} for audio, source type supported: raw, path"
)
def _text_to_message(self, text: str, role: ROLE) -> dict:
return {"role": role.value, "content": text}
def _process_pdf_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "base64":
return {
"type": "file",
"file": {
"filename": "file.pdf",
"file_data": f"data:application/{block.media.extension};base64,{block.media.source}",
},
}
case "path":
with open(block.media.source, "rb") as f:
base64_pdf = base64.b64encode(f.read()).decode("utf-8")
return {
"type": "file",
"file": {
"filename": "file.pdf",
"file_data": f"data:application/{block.media.extension};base64,{base64_pdf}",
},
}
case _:
raise NotImplementedError(
f"Unsupported media source type: {block.media.source_type}"
)
def _process_image_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "url":
return {
"type": "image_url",
"image_url": {"url": block.media.source},
}
case "base64":
return {
"type": "image_url",
"image_url": {
"url": f"data:image/{block.media.extension};base64,{block.media.source}"
},
}
case "path":
with open(block.media.source, "rb") as image_file:
base64_image = base64.b64encode(image_file.read()).decode("utf-8")
return {
"type": "image_url",
"image_url": {
"url": f"data:image/{block.media.extension};base64,{base64_image}"
},
}
case _:
raise ValueError(
f"Unsupported media source type: {block.media.source_type}"
)
```
## /datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/openai_completion_client.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/openai_completion_client.py"
import json
from collections.abc import AsyncIterator, Iterator
from typing import Literal
import httpx
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools.tools import Tool
from datapizza.type import (
FunctionCallBlock,
Media,
MediaBlock,
Model,
StructuredBlock,
TextBlock,
)
from openai import AsyncOpenAI, OpenAI
from .memory_adapter import OpenAILikeMemoryAdapter
class OpenAILikeClient(Client):
"""A client for interacting with the OpenAI API.
This class provides methods for invoking the OpenAI API to generate responses
based on given input data. It extends the Client class.
"""
def __init__(
self,
api_key: str,
model: str = "gpt-4o-mini",
system_prompt: str = "",
temperature: float | None = None,
cache: Cache | None = None,
base_url: str | httpx.URL | None = None,
):
if temperature and not 0 <= temperature <= 2:
raise ValueError("Temperature must be between 0 and 2")
super().__init__(
model_name=model,
system_prompt=system_prompt,
temperature=temperature,
cache=cache,
)
self.base_url = base_url
self.api_key = api_key
self.memory_adapter = OpenAILikeMemoryAdapter()
self._set_client()
def _set_client(self):
if not self.client:
self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
def _set_a_client(self):
if not self.a_client:
self.a_client = AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)
def _response_to_client_response(
self, response, tool_map: dict[str, Tool] | None
) -> ClientResponse:
blocks = []
for choice in response.choices:
if choice.message.content:
blocks.append(TextBlock(content=choice.message.content))
if choice.message.tool_calls and tool_map:
for tool_call in choice.message.tool_calls:
tool = tool_map.get(tool_call.function.name)
if not tool:
raise ValueError(f"Tool {tool_call.function.name} not found")
blocks.append(
FunctionCallBlock(
id=tool_call.id,
name=tool_call.function.name,
arguments=json.loads(tool_call.function.arguments),
tool=tool,
)
)
# Handle media content if present
if hasattr(choice.message, "media") and choice.message.media:
for media_item in choice.message.media:
media = Media(
media_type=media_item.type,
source_type="url" if media_item.source_url else "base64",
source=media_item.source_url or media_item.data,
detail=getattr(media_item, "detail", "high"),
)
blocks.append(MediaBlock(media=media))
return ClientResponse(
content=blocks,
stop_reason=response.choices[0].finish_reason,
usage=TokenUsage(
prompt_tokens=response.usage.prompt_tokens or 0,
completion_tokens=response.usage.completion_tokens or 0,
                cached_tokens=(response.usage.prompt_tokens_details.cached_tokens or 0)
                if response.usage.prompt_tokens_details
                else 0,
),
)
def _convert_tools(self, tools: Tool) -> dict:
"""Convert tools to OpenAI function format"""
return {"type": "function", "function": tools.schema}
def _convert_tool_choice(
self, tool_choice: Literal["auto", "required", "none"] | list[str]
) -> dict | Literal["auto", "required", "none"]:
if isinstance(tool_choice, list) and len(tool_choice) > 1:
raise NotImplementedError(
"multiple function names is not supported by OpenAI"
)
elif isinstance(tool_choice, list):
return {
"type": "function",
"function": {"name": tool_choice[0]},
}
else:
return tool_choice
def _invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
kwargs = {
"model": self.model_name,
"messages": messages,
"stream": False,
"max_completion_tokens": max_tokens,
**kwargs,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
client: OpenAI = self._get_client()
response = client.chat.completions.create(**kwargs)
return self._response_to_client_response(response, tool_map)
async def _a_invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
kwargs = {
"model": self.model_name,
"messages": messages,
"stream": False,
"max_completion_tokens": max_tokens,
**kwargs,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
a_client = self._get_a_client()
response = await a_client.chat.completions.create(**kwargs)
return self._response_to_client_response(response, tool_map)
def _stream_invoke(
self,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> Iterator[ClientResponse]:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
kwargs = {
"model": self.model_name,
"messages": messages,
"stream": True,
"max_completion_tokens": max_tokens,
"stream_options": {"include_usage": True},
**kwargs,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
response = self.client.chat.completions.create(**kwargs)
message_content = ""
usage = TokenUsage()
finish_reason = None
for chunk in response:
            usage += TokenUsage(
                prompt_tokens=(chunk.usage.prompt_tokens or 0) if chunk.usage else 0,
                completion_tokens=(chunk.usage.completion_tokens or 0)
                if chunk.usage
                else 0,
                cached_tokens=getattr(
                    getattr(chunk.usage, "prompt_tokens_details", None),
                    "cached_tokens",
                    0,
                )
                or 0,
            )
if len(chunk.choices) > 0:
delta = chunk.choices[0].delta
finish_reason = chunk.choices[0].finish_reason
delta_content = delta.content if delta and delta.content else ""
message_content = message_content + delta_content
yield ClientResponse(
content=[TextBlock(content=message_content)],
delta=delta_content,
stop_reason=finish_reason or None,
)
yield ClientResponse(
content=[TextBlock(content=message_content)],
stop_reason=finish_reason or None,
usage=usage,
)
async def _a_stream_invoke(
self,
input: str,
tools: list[Tool] | None = None,
memory: Memory | None = None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
temperature: float | None = None,
max_tokens: int | None = None,
system_prompt: str | None = None,
**kwargs,
) -> AsyncIterator[ClientResponse]:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
kwargs = {
"model": self.model_name,
"messages": messages,
"stream": True,
"max_completion_tokens": max_tokens,
"stream_options": {"include_usage": True},
**kwargs,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
a_client = self._get_a_client()
message_content = ""
usage = TokenUsage()
finish_reason = None
async for chunk in await a_client.chat.completions.create(**kwargs):
            usage += TokenUsage(
                prompt_tokens=(chunk.usage.prompt_tokens or 0) if chunk.usage else 0,
                completion_tokens=(chunk.usage.completion_tokens or 0)
                if chunk.usage
                else 0,
                cached_tokens=getattr(
                    getattr(chunk.usage, "prompt_tokens_details", None),
                    "cached_tokens",
                    0,
                )
                or 0,
            )
if len(chunk.choices) > 0:
delta = chunk.choices[0].delta
finish_reason = chunk.choices[0].finish_reason
delta_content = delta.content if delta and delta.content else ""
message_content = message_content + delta_content
yield ClientResponse(
content=[TextBlock(content=message_content)],
delta=delta_content,
stop_reason=finish_reason or None,
)
yield ClientResponse(
content=[TextBlock(content=message_content)],
stop_reason=finish_reason or None,
usage=usage,
)
def _structured_response(
self,
input: str,
output_cls: type[Model],
memory: Memory | None,
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
tools: list[Tool] | None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
**kwargs,
) -> ClientResponse:
# Add system message to enforce JSON output
messages = self._memory_to_contents(system_prompt, input, memory)
kwargs = {
"model": self.model_name,
"messages": messages,
"response_format": output_cls,
"max_completion_tokens": max_tokens,
**kwargs,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
# Structured response needs strict mode and no additional properties
for tool in kwargs["tools"]:
tool["function"]["strict"] = True
tool["function"]["parameters"]["additionalProperties"] = False
response = self.client.beta.chat.completions.parse(**kwargs)
stop_reason = response.choices[0].finish_reason
if not response.choices[0].message.content:
raise ValueError("No content in response")
if hasattr(output_cls, "model_validate_json"):
structured_data = output_cls.model_validate_json(
response.choices[0].message.content
)
else:
structured_data = json.loads(response.choices[0].message.content)
return ClientResponse(
content=[StructuredBlock(content=structured_data)],
stop_reason=stop_reason,
usage=TokenUsage(
prompt_tokens=response.usage.prompt_tokens or 0,
completion_tokens=response.usage.completion_tokens or 0,
                cached_tokens=(response.usage.prompt_tokens_details.cached_tokens or 0)
                if response.usage.prompt_tokens_details
                else 0,
),
)
async def _a_structured_response(
self,
input: str,
output_cls: type[Model],
memory: Memory | None,
temperature: float,
max_tokens: int,
system_prompt: str | None = None,
tools: list[Tool] | None = None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
**kwargs,
):
messages = self._memory_to_contents(system_prompt, input, memory)
kwargs = {
"model": self.model_name,
"messages": messages,
"response_format": output_cls,
"max_completion_tokens": max_tokens,
**kwargs,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
# Structured response needs strict mode and no additional properties
for tool in kwargs["tools"]:
tool["function"]["strict"] = True
tool["function"]["parameters"]["additionalProperties"] = False
a_client = self._get_a_client()
response = await a_client.beta.chat.completions.parse(**kwargs)
stop_reason = response.choices[0].finish_reason
if hasattr(output_cls, "model_validate_json"):
structured_data = output_cls.model_validate_json(
response.choices[0].message.content
)
else:
structured_data = json.loads(response.choices[0].message.content)
return ClientResponse(
content=[StructuredBlock(content=structured_data)],
stop_reason=stop_reason,
usage=TokenUsage(
prompt_tokens=response.usage.prompt_tokens or 0,
completion_tokens=response.usage.completion_tokens or 0,
                cached_tokens=(response.usage.prompt_tokens_details.cached_tokens or 0)
                if response.usage.prompt_tokens_details
                else 0,
),
)
```
## /datapizza-ai-clients/datapizza-ai-clients-openai-like/pyproject.toml
```toml path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/pyproject.toml"
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# Project metadata
[project]
name = "datapizza-ai-clients-openai-like"
version = "0.0.8"
description = "OpenAI client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
authors = [
{name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"datapizza-ai-core>=0.0.7,<0.1.0",
"httpx>=0.28.1",
"openai>=2.0.0,<3.0.0",
]
# Development dependencies
[dependency-groups]
dev = [
"deptry>=0.23.0",
"pytest",
"ruff>=0.11.5",
]
# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]
[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]
# Ruff configuration
[tool.ruff]
line-length = 88
[tool.ruff.lint]
select = [
# "E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"B", # flake8-bugbear
"I", # isort
"UP", # pyupgrade
"SIM", # flake8-simplify
"RUF", # Ruff-specific rules
"C4", # flake8-comprehensions
]
```
## /datapizza-ai-clients/datapizza-ai-clients-openai-like/tests/test_openai_completion.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/tests/test_openai_completion.py"
from datapizza.clients.openai_like import OpenAILikeClient
def test_init():
client = OpenAILikeClient(
api_key="test_api_key",
model="gpt-4o-mini",
system_prompt="You are a helpful assistant that can answer questions about piadina only in italian.",
)
assert client is not None
```
## /datapizza-ai-clients/datapizza-ai-clients-openai/README.md
## /datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/__init__.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/__init__.py"
from .openai_client import OpenAIClient
__all__ = ["OpenAIClient"]
```
## /datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/memory_adapter.py"
import base64
import json
from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
ROLE,
FunctionCallBlock,
FunctionCallResultBlock,
MediaBlock,
StructuredBlock,
TextBlock,
)
from openai.types.responses import ResponseFunctionToolCall
class OpenAIMemoryAdapter(MemoryAdapter):
def _turn_to_message(self, turn: Turn) -> dict:
content = []
tool_calls = []
tool_call_id = None
response_function_tool_call = None
for block in turn:
block_dict = {}
match block:
case TextBlock():
block_dict = {
"type": "input_text"
if turn.role == ROLE.USER
else "output_text",
"text": block.content,
}
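                # Function calls and their results map to standalone Responses API
                # items, so the next two cases return immediately instead of
                # appending to this turn's content parts.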
case FunctionCallBlock():
return ResponseFunctionToolCall(
call_id=block.id,
name=block.name,
arguments=json.dumps(block.arguments),
type="function_call",
# id="fc_" + block.id,
status="completed",
)
# block_dict = {
# "id": block.id,
# "name": block.name,
# "arguments": json.dumps(block.arguments),
# "type": "function_call",
# }
case FunctionCallResultBlock():
tool_call_id = block.id
return {
"type": "function_call_output",
"call_id": block.id,
"output": block.result,
}
case StructuredBlock():
block_dict = {"type": "text", "text": str(block.content)}
case MediaBlock():
match block.media.media_type:
case "image":
block_dict = self._process_image_block(block)
case "pdf":
block_dict = self._process_pdf_block(block)
case "audio":
block_dict = self._process_audio_block(block)
case _:
raise NotImplementedError(
f"Unsupported media type: {block.media.media_type}"
)
if block_dict:
content.append(block_dict)
messages = {
"role": turn.role.value,
"content": (content),
}
if tool_calls:
messages["tool_calls"] = tool_calls
if tool_call_id:
messages["tool_call_id"] = tool_call_id
if response_function_tool_call:
return response_function_tool_call
return messages
def _process_audio_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "path":
with open(block.media.source, "rb") as f:
base64_audio = base64.b64encode(f.read()).decode("utf-8")
return {
"type": "input_audio",
"input_audio": {
"data": base64_audio,
"format": block.media.extension,
},
}
case "base_64":
return {
"type": "input_audio",
"input_audio": {
"data": block.media.source,
"format": block.media.extension,
},
}
case "raw":
base64_audio = base64.b64encode(block.media.source).decode("utf-8")
return {
"type": "input_audio",
"input_audio": {
"data": base64_audio,
"format": block.media.extension,
},
}
case _:
raise NotImplementedError(
f"Unsupported media source type: {block.media.source_type} for audio, source type supported: raw, path"
)
def _text_to_message(self, text: str, role: ROLE) -> dict:
return {"role": role.value, "content": text}
def _process_pdf_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "base64":
return {
"type": "input_file",
"filename": "file.pdf",
"file_data": f"data:application/{block.media.extension};base64,{block.media.source}",
}
case "path":
with open(block.media.source, "rb") as f:
base64_pdf = base64.b64encode(f.read()).decode("utf-8")
return {
"type": "input_file",
"filename": "file.pdf",
"file_data": f"data:application/{block.media.extension};base64,{base64_pdf}",
}
case _:
raise NotImplementedError(
f"Unsupported media source type: {block.media.source_type}"
)
def _process_image_block(self, block: MediaBlock) -> dict:
match block.media.source_type:
case "url":
return {
"type": "input_image",
"image_url": block.media.source,
}
case "base64":
return {
"type": "input_image",
"image_url": f"data:image/{block.media.extension};base64,{block.media.source}",
}
case "path":
with open(block.media.source, "rb") as image_file:
base64_image = base64.b64encode(image_file.read()).decode("utf-8")
return {
"type": "input_image",
"image_url": f"data:image/{block.media.extension};base64,{base64_image}",
}
case _:
raise ValueError(
f"Unsupported media source type: {block.media.source_type}"
)
```
## /datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/openai_client.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/openai_client.py"
import json
from collections.abc import AsyncIterator, Iterator
from typing import Literal
import httpx
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import (
FunctionCallBlock,
Model,
StructuredBlock,
TextBlock,
ThoughtBlock,
)
from openai import (
AsyncOpenAI,
AzureOpenAI,
OpenAI,
)
from openai.types.responses import (
ParsedResponseOutputMessage,
ResponseCompletedEvent,
ResponseFunctionToolCall,
ResponseOutputMessage,
ResponseReasoningItem,
ResponseTextDeltaEvent,
)
from .memory_adapter import OpenAIMemoryAdapter
class OpenAIClient(Client):
"""A client for interacting with the OpenAI API.
This class provides methods for invoking the OpenAI API to generate responses
based on given input data. It extends the Client class.
"""
def __init__(
self,
api_key: str,
model: str = "gpt-4o-mini",
system_prompt: str = "",
temperature: float | None = None,
cache: Cache | None = None,
base_url: str | httpx.URL | None = None,
organization: str | None = None,
project: str | None = None,
webhook_secret: str | None = None,
websocket_base_url: str | httpx.URL | None = None,
timeout: float | httpx.Timeout | None = None,
max_retries: int = 2,
default_headers: dict[str, str] | None = None,
default_query: dict[str, object] | None = None,
http_client: httpx.Client | None = None,
):
"""
Args:
api_key: The API key for the OpenAI API.
model: The model to use for the OpenAI API.
system_prompt: The system prompt to use for the OpenAI API.
temperature: The temperature to use for the OpenAI API.
cache: The cache to use for the OpenAI API.
base_url: The base URL for the OpenAI API.
organization: The organization ID for the OpenAI API.
project: The project ID for the OpenAI API.
webhook_secret: The webhook secret for the OpenAI API.
websocket_base_url: The websocket base URL for the OpenAI API.
timeout: The timeout for the OpenAI API.
max_retries: The max retries for the OpenAI API.
default_headers: The default headers for the OpenAI API.
default_query: The default query for the OpenAI API.
http_client: The http_client for the OpenAI API.
"""
if temperature and not 0 <= temperature <= 2:
raise ValueError("Temperature must be between 0 and 2")
super().__init__(
model_name=model,
system_prompt=system_prompt,
temperature=temperature,
cache=cache,
)
self.api_key = api_key
self.base_url = base_url
self.organization = organization
self.project = project
self.webhook_secret = webhook_secret
self.websocket_base_url = websocket_base_url
self.timeout = timeout
self.max_retries = max_retries
self.default_headers = default_headers
self.default_query = default_query
self.http_client = http_client
self.memory_adapter = OpenAIMemoryAdapter()
self._set_client()
def _set_client(self):
if not self.client:
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url,
organization=self.organization,
project=self.project,
webhook_secret=self.webhook_secret,
websocket_base_url=self.websocket_base_url,
timeout=self.timeout,
max_retries=self.max_retries,
default_headers=self.default_headers,
default_query=self.default_query,
http_client=self.http_client,
)
def _set_a_client(self):
if not self.a_client:
self.a_client = AsyncOpenAI(
api_key=self.api_key,
base_url=self.base_url,
organization=self.organization,
project=self.project,
webhook_secret=self.webhook_secret,
websocket_base_url=self.websocket_base_url,
timeout=self.timeout,
max_retries=self.max_retries,
default_headers=self.default_headers,
default_query=self.default_query,
)
def _response_to_client_response(
self, response, tool_map: dict[str, Tool] | None
) -> ClientResponse:
blocks = []
# Handle new response format with direct content array
if hasattr(response, "output_parsed"):
blocks.append(StructuredBlock(content=response.output_parsed))
if hasattr(response, "output") and response.output:
for content_item in response.output:
if isinstance(content_item, ResponseOutputMessage) and not isinstance(
content_item, ParsedResponseOutputMessage
):
for content in content_item.content:
if content.type == "output_text":
blocks.append(TextBlock(content=content.text))
elif isinstance(content_item, ResponseReasoningItem):
if content_item.summary:
blocks.append(
ThoughtBlock(content=content_item.summary[0].text)
)
elif isinstance(content_item, ResponseFunctionToolCall):
if not tool_map:
raise ValueError("Tool map is required")
tool = tool_map.get(content_item.name)
if not tool:
raise ValueError(f"Tool {content_item.name} not found")
blocks.append(
FunctionCallBlock(
id=content_item.call_id,
name=content_item.name,
arguments=json.loads(content_item.arguments)
if isinstance(content_item.arguments, str)
else content_item.arguments,
tool=tool,
)
)
# Handle usage from new format
        usage = getattr(response, "usage", None)
        prompt_tokens = completion_tokens = cached_tokens = 0
        if usage:
            prompt_tokens = getattr(usage, "input_tokens", 0)
            completion_tokens = getattr(usage, "output_tokens", 0)
# Handle input_tokens_details for cached tokens
if hasattr(usage, "input_tokens_details") and usage.input_tokens_details:
cached_tokens = getattr(usage.input_tokens_details, "cached_tokens", 0)
# Handle stop reason - use status from new format
stop_reason = getattr(response, "status", None)
if not stop_reason and hasattr(response, "choices") and response.choices:
stop_reason = response.choices[0].finish_reason
return ClientResponse(
content=blocks,
stop_reason=stop_reason,
usage=TokenUsage(
prompt_tokens=prompt_tokens or 0,
completion_tokens=completion_tokens or 0,
cached_tokens=cached_tokens or 0,
),
)
def _convert_tools(self, tool: Tool) -> dict:
"""Convert tools to OpenAI function format"""
return {
"type": "function",
"name": tool.name,
"description": tool.description,
"parameters": {
"type": "object",
"properties": tool.properties,
"required": tool.required,
},
}
def _convert_tool_choice(
self, tool_choice: Literal["auto", "required", "none"] | list[str]
) -> dict | Literal["auto", "required", "none"]:
if isinstance(tool_choice, list) and len(tool_choice) > 1:
raise NotImplementedError(
"multiple function names is not supported by OpenAI"
)
elif isinstance(tool_choice, list):
return {
"type": "function",
"name": tool_choice[0],
}
else:
return tool_choice
def _invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
kwargs = {
**kwargs,
"model": self.model_name,
"input": messages,
"stream": False,
"max_output_tokens": max_tokens,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
client: OpenAI = self._get_client()
response = client.responses.create(**kwargs)
return self._response_to_client_response(response, tool_map)
async def _a_invoke(
self,
*,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> ClientResponse:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
kwargs = {
**kwargs,
"model": self.model_name,
"input": messages,
"stream": False,
"max_output_tokens": max_tokens,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
a_client = self._get_a_client()
response = await a_client.responses.create(**kwargs)
return self._response_to_client_response(response, tool_map)
def _stream_invoke(
self,
input: str,
tools: list[Tool] | None,
memory: Memory | None,
tool_choice: Literal["auto", "required", "none"] | list[str],
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
**kwargs,
) -> Iterator[ClientResponse]:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
kwargs = {
**kwargs,
"model": self.model_name,
"input": messages,
"stream": True,
"max_output_tokens": max_tokens,
# "stream_options": {"include_usage": True},
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
response = self.client.responses.create(**kwargs)
for chunk in response:
if isinstance(chunk, ResponseTextDeltaEvent):
yield ClientResponse(
content=[],
delta=chunk.delta,
stop_reason=None,
usage=TokenUsage(
prompt_tokens=0,
completion_tokens=0,
cached_tokens=0,
),
)
if isinstance(chunk, ResponseCompletedEvent):
yield self._response_to_client_response(chunk.response, tool_map)
async def _a_stream_invoke(
self,
input: str,
tools: list[Tool] | None = None,
memory: Memory | None = None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
temperature: float | None = None,
max_tokens: int | None = None,
system_prompt: str | None = None,
**kwargs,
) -> AsyncIterator[ClientResponse]:
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
kwargs = {
**kwargs,
"model": self.model_name,
"input": messages,
"stream": True,
"max_output_tokens": max_tokens,
# "stream_options": {"include_usage": True},
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
a_client = self._get_a_client()
async for chunk in await a_client.responses.create(**kwargs):
if isinstance(chunk, ResponseTextDeltaEvent):
yield ClientResponse(
content=[],
delta=chunk.delta,
stop_reason=None,
usage=TokenUsage(
prompt_tokens=0,
completion_tokens=0,
cached_tokens=0,
),
)
if isinstance(chunk, ResponseCompletedEvent):
yield self._response_to_client_response(chunk.response, tool_map)
def _structured_response(
self,
input: str,
output_cls: type[Model],
memory: Memory | None,
temperature: float | None,
max_tokens: int,
system_prompt: str | None,
tools: list[Tool] | None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
**kwargs,
) -> ClientResponse:
        # Build the request; structured output is enforced via text_format and responses.parse
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
kwargs = {
"model": self.model_name,
"input": messages,
"text_format": output_cls,
"max_output_tokens": max_tokens,
**kwargs,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
        # Structured response needs strict mode and no additional properties
        for tool in kwargs.get("tools", []):
            tool["strict"] = True
            tool["parameters"]["additionalProperties"] = False
response = self.client.responses.parse(**kwargs)
return self._response_to_client_response(response, tool_map)
async def _a_structured_response(
self,
input: str,
output_cls: type[Model],
memory: Memory | None,
temperature: float,
max_tokens: int,
system_prompt: str | None = None,
tools: list[Tool] | None = None,
tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
**kwargs,
):
if tools is None:
tools = []
messages = self._memory_to_contents(system_prompt, input, memory)
tool_map = {tool.name: tool for tool in tools}
kwargs = {
"model": self.model_name,
"input": messages,
"text_format": output_cls,
"max_output_tokens": max_tokens,
**kwargs,
}
if temperature:
kwargs["temperature"] = temperature
if tools:
kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
        # Structured response needs strict mode and no additional properties
        for tool in kwargs.get("tools", []):
            tool["strict"] = True
            tool["parameters"]["additionalProperties"] = False
a_client = self._get_a_client()
response = await a_client.responses.parse(**kwargs)
return self._response_to_client_response(response, tool_map)
def _embed(
self, text: str | list[str], model_name: str | None, **kwargs
) -> list[float] | list[list[float]]:
"""Embed a text using the model"""
response = self.client.embeddings.create(
input=text, model=model_name or self.model_name, **kwargs
)
embeddings = [item.embedding for item in response.data]
if isinstance(text, str):
return embeddings[0] if embeddings else []
return embeddings or []
async def _a_embed(
self, text: str | list[str], model_name: str | None, **kwargs
) -> list[float] | list[list[float]]:
"""Embed a text using the model"""
a_client = self._get_a_client()
response = await a_client.embeddings.create(
input=text, model=model_name or self.model_name, **kwargs
)
embeddings = [item.embedding for item in response.data]
if isinstance(text, str):
return embeddings[0] if embeddings else []
return embeddings or []
def _is_azure_client(self) -> bool:
return isinstance(self._get_client(), AzureOpenAI)
```
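For orientation, here is a minimal usage sketch of the client above — it is not part of the repository. It assumes that the public `invoke`, `stream_invoke`, and `embed` methods inherited from the base `Client` class (not shown here) delegate to the private `_invoke`, `_stream_invoke`, and `_embed` methods; the model names and API key are placeholders.
```py
# Hypothetical sketch, not part of the repository.
from datapizza.clients.openai import OpenAIClient

client = OpenAIClient(model="gpt-4o-mini", api_key="sk-placeholder")

# Non-streaming call through the Responses API
response = client.invoke("Summarize the datapizza-ai framework in one sentence.")
print(response.text)

# Streaming: text deltas arrive as ClientResponse objects with `delta` set
for chunk in client.stream_invoke("Tell me a short joke."):
    if chunk.delta:
        print(chunk.delta, end="")

# Embeddings (assumed public wrapper around _embed):
# a str returns one vector, a list of str returns a list of vectors
vector = client.embed("hello world", model_name="text-embedding-3-small")
```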
## /datapizza-ai-clients/datapizza-ai-clients-openai/pyproject.toml
```toml path="/datapizza-ai-clients/datapizza-ai-clients-openai/pyproject.toml"
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
# Project metadata
[project]
name = "datapizza-ai-clients-openai"
version = "0.0.9"
description = "OpenAI client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10.0,<4"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"datapizza-ai-core>=0.0.7,<0.1.0",
"openai>=2,<3.0.0",
]
# Development dependencies
[dependency-groups]
dev = [
"deptry>=0.23.0",
"pytest",
"ruff>=0.11.5",
]
# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]
[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]
# Ruff configuration
[tool.ruff]
line-length = 88
[tool.ruff.lint]
select = [
# "E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"B", # flake8-bugbear
"I", # isort
"UP", # pyupgrade
"SIM", # flake8-simplify
"RUF", # Ruff-specific rules
"C4", # flake8-comprehensions
]
```
## /datapizza-ai-clients/datapizza-ai-clients-openai/tests/__init__.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/tests/__init__.py"
```
## /datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_base_client.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_base_client.py"
from unittest.mock import MagicMock, patch
import httpx
from datapizza.core.clients import ClientResponse
from datapizza.clients.openai import (
OpenAIClient,
)
def test_client_init():
client = OpenAIClient(
model="gpt-4o-mini",
api_key="test_api_key",
)
assert client is not None
@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_client_init_with_extra_args(mock_openai):
"""Tests that extra arguments are passed to the OpenAI client."""
OpenAIClient(
api_key="test_api_key",
organization="test-org",
project="test-project",
timeout=30.0,
max_retries=3,
)
mock_openai.assert_called_once_with(
api_key="test_api_key",
base_url=None,
organization="test-org",
project="test-project",
webhook_secret=None,
websocket_base_url=None,
timeout=30.0,
max_retries=3,
default_headers=None,
default_query=None,
http_client=None,
)
@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_client_init_with_http_client(mock_openai):
"""Tests that a custom http_client is passed to the OpenAI client."""
custom_http_client = httpx.Client()
OpenAIClient(
api_key="test_api_key",
http_client=custom_http_client,
)
mock_openai.assert_called_once_with(
api_key="test_api_key",
base_url=None,
organization=None,
project=None,
webhook_secret=None,
websocket_base_url=None,
timeout=None,
max_retries=2,
default_headers=None,
default_query=None,
http_client=custom_http_client,
)
@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_invoke_kwargs_override(mock_openai_class):
"""
Tests that kwargs like 'stream' are not overridden by user input
in non-streaming methods, but other kwargs are passed through.
"""
mock_openai_instance = mock_openai_class.return_value
mock_openai_instance.responses.create.return_value = MagicMock()
client = OpenAIClient(api_key="test")
client._response_to_client_response = MagicMock(
return_value=ClientResponse(content=[])
)
client.invoke("hello", stream=True, top_p=0.5)
mock_openai_instance.responses.create.assert_called_once()
called_kwargs = mock_openai_instance.responses.create.call_args.kwargs
assert called_kwargs.get("top_p") == 0.5
assert called_kwargs.get("stream") is False
@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_stream_invoke_kwargs_override(mock_openai_class):
"""
Tests that kwargs like 'stream' are not overridden by user input
in streaming methods.
"""
mock_openai_instance = mock_openai_class.return_value
mock_openai_instance.responses.create.return_value = []
client = OpenAIClient(api_key="test")
list(client.stream_invoke("hello", stream=False, top_p=0.5))
mock_openai_instance.responses.create.assert_called_once()
called_kwargs = mock_openai_instance.responses.create.call_args.kwargs
assert called_kwargs.get("top_p") == 0.5
assert called_kwargs.get("stream") is True
```
## /datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_memory_adapter.py
```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_memory_adapter.py"
import json
import pytest
from datapizza.memory.memory import Memory
from datapizza.tools.tools import tool
from datapizza.type.type import (
ROLE,
FunctionCallBlock,
Media,
MediaBlock,
StructuredBlock,
TextBlock,
)
from openai.types.responses import ResponseFunctionToolCall
from datapizza.clients.openai.memory_adapter import OpenAIMemoryAdapter
@pytest.fixture(
params=[
OpenAIMemoryAdapter(),
]
)
def adapter(request):
"""Parameterized fixture that provides different memory adapter implementations.
Each test using this fixture will run once for each adapter in the params list.
"""
return request.param
@pytest.fixture
def memory():
return Memory()
def test_empty_memory_to_messages(adapter, memory):
"""Test that an empty memory converts to an empty list of messages."""
messages = adapter.memory_to_messages(memory)
assert messages == []
def test_turn_with_some_text():
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(TextBlock(content="Hello!"))
memory.add_to_last_turn(TextBlock(content="Hi, how are u?"))
messages = OpenAIMemoryAdapter().memory_to_messages(memory)
assert messages == [
{
"role": "user",
"content": [
{"type": "input_text", "text": "Hello!"},
{"type": "input_text", "text": "Hi, how are u?"},
],
}
]
def test_memory_to_messages_multiple_turns():
"""Test conversion of a memory with multiple turns to messages."""
# First turn: user asks a question
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(TextBlock(content="What's 2+2?"))
# Second turn: assistant responds
memory.new_turn(role=ROLE.ASSISTANT)
memory.add_to_last_turn(TextBlock(content="The answer is 4."))
# Third turn: user follows up
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(TextBlock(content="Thanks!"))
messages = OpenAIMemoryAdapter().memory_to_messages(memory)
expected = [
{"role": "user", "content": [{"type": "input_text", "text": "What's 2+2?"}]},
{
"role": "assistant",
"content": [{"type": "output_text", "text": "The answer is 4."}],
},
{"role": "user", "content": [{"type": "input_text", "text": "Thanks!"}]},
]
assert messages == expected
def test_memory_to_messages_function_call():
@tool
def add(a: int, b: int) -> int:
return a + b
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(TextBlock(content="Call the add function."))
memory.new_turn(role=ROLE.ASSISTANT)
memory.add_to_last_turn(
FunctionCallBlock(id="call_1", name="add", arguments={"a": 2, "b": 2}, tool=add)
)
messages = OpenAIMemoryAdapter().memory_to_messages(memory)
assert messages[0]["role"] == "user"
assert isinstance(messages[1], ResponseFunctionToolCall)
assert json.loads(messages[1].arguments) == {
"a": 2,
"b": 2,
}
def test_memory_to_messages_media_blocks():
image = Media(
media_type="image",
source_type="url",
source="http://example.com/image.png",
extension="png",
)
pdf = Media(
media_type="pdf",
source_type="base64",
source="THIS_IS_A_PDF_BASE64",
extension="pdf",
)
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(MediaBlock(media=image))
memory.add_to_last_turn(MediaBlock(media=pdf))
messages = OpenAIMemoryAdapter().memory_to_messages(memory)
assert messages[0]["role"] == "user"
# Should contain both image and pdf blocks
# TODO: Check if the image and pdf blocks are correct
assert messages[0]["content"][1] == {
"type": "input_file",
"filename": "file.pdf",
"file_data": "data:application/pdf;base64,THIS_IS_A_PDF_BASE64",
}
def test_memory_to_messages_structured_block():
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(StructuredBlock(content={"key": "value"}))
messages = OpenAIMemoryAdapter().memory_to_messages(memory)
assert messages[0]["content"] == "{'key': 'value'}" or messages[0]["content"] == [
{
"type": "text",
"text": "{'key': 'value'}",
}
]
def test_memory_to_messages_with_system_prompt():
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(TextBlock(content="Hello!"))
system_prompt = "You are a helpful assistant."
messages = OpenAIMemoryAdapter().memory_to_messages(
memory, system_prompt=system_prompt
)
assert messages[0]["role"] == "system"
assert messages[0]["content"] == system_prompt
assert messages[1]["role"] == "user"
def test_memory_to_messages_with_input_str():
memory = Memory()
input_str = "What is the weather?"
messages = OpenAIMemoryAdapter().memory_to_messages(memory, input=input_str)
assert messages[-1]["role"] == "user"
assert messages[-1]["content"] == input_str
def test_memory_to_messages_with_input_block():
memory = Memory()
input_block = TextBlock(content="This is a block input.")
messages = OpenAIMemoryAdapter().memory_to_messages(memory, input=input_block)
assert messages[-1]["role"] == "user"
assert "block input" in str(messages[-1]["content"])
def test_openai_empty_memory_to_messages():
messages = OpenAIMemoryAdapter().memory_to_messages(Memory())
assert messages == []
def test_openai_memory_to_messages_multiple_turns():
memory = Memory()
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(TextBlock(content="What's 2+2?"))
memory.new_turn(role=ROLE.ASSISTANT)
memory.add_to_last_turn(TextBlock(content="The answer is 4."))
memory.new_turn(role=ROLE.USER)
memory.add_to_last_turn(TextBlock(content="Thanks!"))
messages = OpenAIMemoryAdapter().memory_to_messages(memory)
assert messages[0]["role"] == "user"
assert messages[1]["role"] == "assistant"
assert messages[2]["role"] == "user"
```
## /datapizza-ai-core/README.md
This is the core of the datapizza-ai framework.
## /datapizza-ai-core/datapizza/agents/__init__.py
```py path="/datapizza-ai-core/datapizza/agents/__init__.py"
from .agent import Agent, StepResult
from .client_manager import ClientManager
__all__ = [
"Agent",
"ClientManager",
"StepResult",
]
```
## /datapizza-ai-core/datapizza/agents/__version__.py
```py path="/datapizza-ai-core/datapizza/agents/__version__.py"
VERSION = (3, 0, 8)
__version__ = ".".join(map(str, VERSION))
```
## /datapizza-ai-core/datapizza/agents/agent.py
```py path="/datapizza-ai-core/datapizza/agents/agent.py"
import inspect
from collections.abc import AsyncGenerator, Callable, Generator
from functools import wraps
from threading import Lock
from typing import Any, Literal, Union, cast
from pydantic import BaseModel
from datapizza.agents.logger import AgentLogger
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.core.executors.async_executor import AsyncExecutor
from datapizza.core.utils import sum_token_usage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.tracing.tracing import agent_span, tool_span
from datapizza.type import (
ROLE,
Block,
FunctionCallBlock,
FunctionCallResultBlock,
TextBlock,
)
PLANNING_PROMT = """in this moment you just tell me what you are going to do.
You need to define the next steps to solve the task.
Do not use tools to solve the task.
Do not solve the task, just plan the next steps.
"""
class StepResult:
def __init__(
self,
index: int,
content: list[Block],
usage: TokenUsage | None = None,
):
self.index = index
self.content = content
self.usage = usage or TokenUsage()
@property
def text(self) -> str:
return "\n".join(
block.content for block in self.content if isinstance(block, TextBlock)
)
@property
def tools_used(self) -> list[FunctionCallBlock]:
return [block for block in self.content if isinstance(block, FunctionCallBlock)]
class Plan(BaseModel):
task: str
steps: list[str]
def __str__(self):
separator = "\n - "
return f"I need to solve the task:\n\n{self.task}\n\nHere is the plan:\n\n - {separator.join(self.steps)}"
class Agent:
name: str
system_prompt: str = "You are a helpful assistant."
def __init__(
self,
name: str | None = None,
client: Client | None = None,
*,
system_prompt: str | None = None,
tools: list[Tool] | None = None,
max_steps: int | None = None,
terminate_on_text: bool | None = True,
stateless: bool = True,
gen_args: dict[str, Any] | None = None,
memory: Memory | None = None,
stream: bool | None = None,
# action_on_stop_reason: dict[str, Action] | None = None,
can_call: list["Agent"] | None = None,
logger: AgentLogger | None = None,
planning_interval: int = 0,
planning_prompt: str = PLANNING_PROMT,
):
"""
Initialize the agent.
Args:
name (str, optional): The name of the agent. Defaults to None.
client (Client): The client to use for the agent. Defaults to None.
system_prompt (str, optional): The system prompt to use for the agent. Defaults to None.
tools (list[Tool], optional): A list of tools to use with the agent. Defaults to None.
max_steps (int, optional): The maximum number of steps to execute. Defaults to None.
terminate_on_text (bool, optional): Whether to terminate the agent on text. Defaults to True.
stateless (bool, optional): Whether to use stateless execution. Defaults to True.
gen_args (dict[str, Any], optional): Additional arguments to pass to the agent's execution. Defaults to None.
memory (Memory, optional): The memory to use for the agent. Defaults to None.
stream (bool, optional): Whether to stream the agent's execution. Defaults to None.
            can_call (list[Agent], optional): A list of agents this agent can call; each is wrapped via as_tool() and added to the agent's tools. Defaults to None.
logger (AgentLogger, optional): The logger to use for the agent. Defaults to None.
planning_interval (int, optional): The planning interval to use for the agent. Defaults to 0.
planning_prompt (str, optional): The planning prompt to use for the agent planning steps. Defaults to PLANNING_PROMT.
"""
if not client:
raise ValueError("Client is required")
if not name and not getattr(self, "name", None):
raise ValueError(
"Name is required, you can pass it as a parameter or set it in the agent class"
)
if not system_prompt and not getattr(self, "system_prompt", None):
raise ValueError(
"System prompt is required, you can pass it as a parameter or set it in the agent class"
)
self.name = name or self.name
if not isinstance(self.name, str):
raise ValueError("Name must be a string")
self.system_prompt = system_prompt or self.system_prompt
if not isinstance(self.system_prompt, str):
raise ValueError("System prompt must be a string")
self._client = client
self._tools = tools or []
self._planning_interval = planning_interval
self._planning_prompt = planning_prompt
self._memory = memory or Memory()
self._stateless = stateless
if can_call:
self.can_call(can_call)
self._max_steps = max_steps
self._terminate_on_text = terminate_on_text
self._stream = stream
if not logger:
self._logger = AgentLogger(agent_name=self.name)
else:
self._logger = logger
for tool in self._decorator_tools():
self._add_tool(tool)
self._lock = Lock()
def can_call(self, agent: Union[list["Agent"], "Agent"]):
if isinstance(agent, Agent):
agent = [agent]
for a in agent:
self._tools.append(a.as_tool())
@classmethod
def _tool_from_agent(cls, agent: "Agent"):
async def invoke_agent(input_task: str):
return cast(StepResult, await agent.a_run(input_task)).text
a_tool = Tool(
func=invoke_agent,
name=agent.name,
description=agent.__doc__,
)
return a_tool
@staticmethod
def _lock_if_not_stateless(func: Callable):
@wraps(func)
def decorated(self, *args, **kwargs):
if not self._stateless and inspect.isgeneratorfunction(func):
# For generators, we need a locking wrapper
def locking_generator():
with self._lock:
yield from func(self, *args, **kwargs)
return locking_generator()
elif not self._stateless:
with self._lock:
return func(self, *args, **kwargs)
else:
return func(self, *args, **kwargs)
return decorated
@staticmethod
def _contains_ending_tool(step: StepResult) -> bool:
content = step.content
return any(
block.tool.end_invoke
for block in content
if isinstance(block, FunctionCallBlock)
)
def as_tool(self):
return Agent._tool_from_agent(self)
def _add_tool(self, tool: Tool):
self._tools.append(tool)
def _decorator_tools(self):
tools = []
for attr_name in dir(self):
attr = getattr(self, attr_name)
# Check for tool methods
if isinstance(attr, Tool):
tools.append(attr)
return tools
@_lock_if_not_stateless
def stream_invoke(
self,
task_input: str,
tool_choice: Literal["auto", "required", "none", "required_first"]
| list[str] = "auto",
**gen_kwargs,
) -> Generator[ClientResponse | StepResult | Plan | None, None]:
"""
Stream the agent's execution, yielding intermediate steps and final result.
Args:
task_input (str): The input text/prompt to send to the model
tool_choice (Literal["auto", "required", "none", "required_first"] | list[str], optional): Controls which tool to use ("auto" by default)
**gen_kwargs: Additional keyword arguments to pass to the agent's execution
Yields:
The intermediate steps and final result of the agent's execution
"""
yield from self._invoke_stream(task_input, tool_choice, **gen_kwargs)
@_lock_if_not_stateless
async def a_stream_invoke(
self,
task_input: str,
tool_choice: Literal["auto", "required", "none", "required_first"]
| list[str] = "auto",
**gen_kwargs,
) -> AsyncGenerator[ClientResponse | StepResult | Plan | None]:
"""
Stream the agent's execution asynchronously, yielding intermediate steps and final result.
Args:
task_input (str): The input text/prompt to send to the model
tool_choice (Literal["auto", "required", "none", "required_first"] | list[str], optional): Controls which tool to use ("auto" by default)
**gen_kwargs: Additional keyword arguments to pass to the agent's execution
Yields:
The intermediate steps and final result of the agent's execution
"""
async for step in self._a_invoke_stream(task_input, tool_choice, **gen_kwargs):
yield step
def _invoke_stream(
self, task_input: str, tool_choice, **kwargs
) -> Generator[ClientResponse | StepResult | Plan | None, None]:
self._logger.debug("STARTING AGENT")
final_answer = None
current_steps = 1
memory = self._memory.copy()
original_task = task_input
while final_answer is None and (
self._max_steps is None
or (self._max_steps and current_steps <= self._max_steps)
):
kwargs["tool_choice"] = tool_choice
if tool_choice == "required_first":
if current_steps == 1:
kwargs["tool_choice"] = "required"
else:
kwargs["tool_choice"] = "auto"
self._logger.debug(f"--- STEP {current_steps} ---")
# Planning step if interval is set
if self._planning_interval and (
current_steps == 1 or (current_steps - 1) % self._planning_interval == 0
):
plan = self._create_planning_prompt(
original_task, memory, current_steps
)
assert isinstance(plan, Plan)
memory.add_turn(
TextBlock(content=str(plan)),
role=ROLE.ASSISTANT,
)
memory.add_turn(
TextBlock(content="Ok, go ahead and now execute the plan."),
role=ROLE.USER,
)
yield plan
self._logger.log_panel(str(plan), title="PLAN")
# Execute planning step
step_output = None
for result in self._execute_planning_step(
current_steps, original_task, memory, **kwargs
):
if isinstance(result, ClientResponse):
yield result
elif isinstance(result, StepResult):
step_output = result.text
yield result
if step_output and self._terminate_on_text:
final_answer = step_output
break
if (
result
and isinstance(result, StepResult)
and Agent._contains_ending_tool(result)
):
self._logger.debug("ending tool found, ending agent")
break
current_steps += 1
original_task = ""
# Yield final answer if we have one
if final_answer:
self._logger.log_panel(final_answer, title="FINAL ANSWER")
if not self._stateless:
self._memory = memory
async def _a_invoke_stream(
self, task_input: str, tool_choice, **kwargs
) -> AsyncGenerator[ClientResponse | StepResult | Plan | None]:
self._logger.debug("STARTING AGENT")
final_answer = None
current_steps = 1
memory = self._memory.copy()
original_task = task_input
while final_answer is None and (
self._max_steps is None
or (self._max_steps and current_steps <= self._max_steps)
):
kwargs["tool_choice"] = tool_choice
if tool_choice == "required_first":
if current_steps == 1:
kwargs["tool_choice"] = "required"
else:
kwargs["tool_choice"] = "auto"
# step_action = StepResult(index=current_steps)
self._logger.debug(f"--- STEP {current_steps} ---")
# yield step_action
# Planning step if interval is set
if self._planning_interval and (
current_steps == 1 or (current_steps - 1) % self._planning_interval == 0
):
plan = await self._a_create_planning_prompt(
original_task, memory, current_steps
)
assert isinstance(plan, Plan)
memory.add_turn(
TextBlock(content=str(plan)),
role=ROLE.ASSISTANT,
)
memory.add_turn(
TextBlock(content="Ok, go ahead and now execute the plan."),
role=ROLE.USER,
)
yield plan
self._logger.log_panel(str(plan), title="PLAN")
# Execute planning step
step_output = None
async for result in self._a_execute_planning_step(
current_steps, original_task, memory, **kwargs
):
if isinstance(result, ClientResponse):
yield result
elif isinstance(result, StepResult):
step_output = result.text
yield result
if step_output and self._terminate_on_text:
final_answer = step_output
break
if (
result
and isinstance(result, StepResult)
and Agent._contains_ending_tool(result)
):
self._logger.debug("ending tool found, ending agent")
break
current_steps += 1
original_task = ""
# Yield final answer if we have one
if final_answer:
self._logger.log_panel(final_answer, title="FINAL ANSWER")
if not self._stateless:
self._memory = memory
def _create_planning_prompt(
self, original_task: str, memory: Memory, step_number: int
) -> Plan:
"""Create a planning prompt that asks the agent to define next steps."""
prompt = self.system_prompt + self._planning_prompt
client_response = self._client.structured_response(
input=original_task,
tools=self._tools,
tool_choice="none",
memory=memory,
system_prompt=prompt,
output_cls=Plan,
)
return Plan(**client_response.structured_data[0].model_dump())
async def _a_create_planning_prompt(
self, original_task: str, memory: Memory, step_number: int
) -> Plan:
"""Create a planning prompt that asks the agent to define next steps."""
prompt = self.system_prompt + self._planning_prompt
client_response = await self._client.a_structured_response(
input=original_task,
tools=self._tools,
tool_choice="none",
memory=memory,
system_prompt=prompt,
output_cls=Plan,
)
return Plan(**client_response.structured_data[0].model_dump())
def _execute_planning_step(
self, current_step, planning_prompt: str, memory: Memory, **kwargs
) -> Generator[StepResult | ClientResponse, None, None]:
"""Execute a planning step with streaming support."""
tool_results = []
step_usage = TokenUsage()
# Check if streaming is enabled
response: ClientResponse
if self._stream:
for chunk in self._client.stream_invoke(
input=planning_prompt,
tools=self._tools,
memory=memory,
system_prompt=self.system_prompt,
**kwargs,
):
step_usage += chunk.usage
response = chunk
if chunk.delta:
yield chunk
else:
# Use regular non-streaming generation
response = self._client.invoke(
input=planning_prompt,
tools=self._tools,
memory=memory,
system_prompt=self.system_prompt,
**kwargs,
)
step_usage += response.usage
if not response:
raise RuntimeError("No response from client")
if planning_prompt:
memory.add_turn(TextBlock(content=planning_prompt), role=ROLE.USER)
if response and response.text:
memory.add_turn(TextBlock(content=response.text), role=ROLE.ASSISTANT)
if response and response.function_calls:
memory.add_turn(response.function_calls, role=ROLE.ASSISTANT)
for tool_call in response.function_calls:
tool_results.append(self._execute_tool(tool_call))
if tool_results:
for x in tool_results:
memory.add_turn(x, role=ROLE.TOOL)
step_action = StepResult(
index=current_step,
content=response.content + tool_results,
usage=response.usage,
)
yield step_action
async def _a_execute_planning_step(
self, current_step, planning_prompt: str, memory: Memory, **kwargs
) -> AsyncGenerator[StepResult | ClientResponse, None]:
"""Execute a planning step with streaming support."""
tool_results = []
step_usage = TokenUsage()
# Check if streaming is enabled
response: ClientResponse
if self._stream:
async for chunk in self._client.a_stream_invoke(
input=planning_prompt,
tools=self._tools,
memory=memory,
system_prompt=self.system_prompt,
**kwargs,
):
step_usage += chunk.usage
response = chunk
if chunk.delta:
yield chunk
else:
# Use regular non-streaming generation
response = await self._client.a_invoke(
input=planning_prompt,
tools=self._tools,
memory=memory,
system_prompt=self.system_prompt,
**kwargs,
)
step_usage += response.usage
if planning_prompt:
memory.add_turn(TextBlock(content=planning_prompt), role=ROLE.USER)
if response.text:
memory.add_turn(TextBlock(content=response.text), role=ROLE.ASSISTANT)
if response.function_calls:
memory.add_turn(response.function_calls, role=ROLE.ASSISTANT)
for tool_call in response.function_calls:
tool_results.append(await self._a_execute_tool(tool_call))
if tool_results:
for x in tool_results:
memory.add_turn(x, role=ROLE.TOOL)
step_action = StepResult(
index=current_step,
content=response.content + tool_results,
usage=response.usage,
)
yield step_action
def _execute_tool(
self, function_call: FunctionCallBlock
) -> FunctionCallResultBlock:
with tool_span(f"Tool {function_call.tool.name}"):
result = function_call.tool(**function_call.arguments)
if inspect.iscoroutine(result):
result = AsyncExecutor.get_instance().run(result)
if result:
self._logger.log_panel(
result,
title=f"TOOL {function_call.tool.name.upper()} RESULT",
subtitle="args: " + str(function_call.arguments),
)
return FunctionCallResultBlock(
id=function_call.id,
tool=function_call.tool,
result=result,
)
async def _a_execute_tool(
self, function_call: FunctionCallBlock
) -> FunctionCallResultBlock:
with tool_span(f"Tool {function_call.tool.name}"):
result = function_call.tool(**function_call.arguments)
if inspect.iscoroutine(result):
result = await result
if result:
self._logger.log_panel(
result,
title=f"TOOL {function_call.tool.name.upper()} RESULT",
subtitle="args: " + str(function_call.arguments),
)
return FunctionCallResultBlock(
id=function_call.id,
tool=function_call.tool,
result=result,
)
@_lock_if_not_stateless
def run(
self,
task_input: str,
tool_choice: Literal["auto", "required", "none", "required_first"]
| list[str] = "auto",
**gen_kwargs,
) -> StepResult | None:
"""
Run the agent on a task input.
Args:
task_input (str): The input text/prompt to send to the model
tool_choice (Literal["auto", "required", "none", "required_first"] | list[str], optional): Controls which tool to use ("auto" by default)
**gen_kwargs: Additional keyword arguments to pass to the agent's execution
Returns:
The final result of the agent's execution
"""
with agent_span(f"Agent {self.name}"):
usage = TokenUsage()
steps = list[ClientResponse | StepResult | Plan | None](
self._invoke_stream(task_input, tool_choice, **gen_kwargs)
)
usage += sum_token_usage(
[step.usage for step in steps if isinstance(step, StepResult)]
)
last_step = cast(
StepResult,
steps[-1],
)
last_step.usage = usage
return last_step
@_lock_if_not_stateless
async def a_run(
self,
task_input: str,
tool_choice: Literal["auto", "required", "none", "required_first"]
| list[str] = "auto",
**gen_kwargs,
) -> StepResult | None:
"""
Run the agent on a task input asynchronously.
Args:
task_input (str): The input text/prompt to send to the model
tool_choice (Literal["auto", "required", "none", "required_first"] | list[str], optional): Controls which tool to use ("auto" by default)
**gen_kwargs: Additional keyword arguments to pass to the agent's execution
Returns:
The final result of the agent's execution
"""
with agent_span(f"Agent {self.name}"):
total_usage = TokenUsage()
results = []
async for result in self._a_invoke_stream(
task_input, tool_choice, **gen_kwargs
):
results.append(result)
total_usage += sum_token_usage(
[result.usage for result in results if isinstance(result, StepResult)]
)
last_result = results[-1] if results else None
if last_result:
last_result.usage = total_usage
return last_result
```
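Before moving on to the supporting modules, a minimal sketch of wiring the `Agent` above to a client and a tool — not part of the repository. It reuses the `OpenAIClient` and the `@tool` decorator imported in the tests earlier; the model name and API key are placeholders, and a real key is needed to actually run it.
```py
# Hypothetical sketch, not part of the repository.
from datapizza.agents import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools.tools import tool


@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


client = OpenAIClient(model="gpt-4o-mini", api_key="sk-placeholder")
agent = Agent(
    name="calculator",
    client=client,
    system_prompt="You are a helpful assistant that uses tools for arithmetic.",
    tools=[add],
    max_steps=5,
)

result = agent.run("What is 21 + 21?")
if result:
    print(result.text)   # final text answer
    print(result.usage)  # aggregated TokenUsage across the run
```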
## /datapizza-ai-core/datapizza/agents/client_manager.py
```py path="/datapizza-ai-core/datapizza/agents/client_manager.py"
from threading import Lock
from datapizza.core.clients import Client
class ClientManager:
_instance: Client | None = None
_lock = Lock()
@classmethod
def set_global_client(cls, client: Client) -> None:
"""Set the global Client instance.
Args:
            client: Client instance to be used globally
"""
with cls._lock:
cls._instance = client
@classmethod
def get_global_client(cls) -> Client | None:
"""Get the current global Client instance.
Returns:
The global client instance if set, None otherwise
"""
return cls._instance
@classmethod
def clear_global_client(cls) -> None:
"""Clear the global Client instance."""
with cls._lock:
cls._instance = None
```
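A short sketch of the singleton-style usage this manager enables — not part of the repository; the client below is a placeholder.
```py
# Hypothetical sketch, not part of the repository.
from datapizza.agents import ClientManager
from datapizza.clients.openai import OpenAIClient

# Register one client for the whole process...
ClientManager.set_global_client(OpenAIClient(model="gpt-4o-mini", api_key="sk-placeholder"))

# ...and retrieve it anywhere later.
client = ClientManager.get_global_client()
assert client is not None

# Clear it when a global client is no longer wanted.
ClientManager.clear_global_client()
```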
## /datapizza-ai-core/datapizza/cache/__init__.py
```py path="/datapizza-ai-core/datapizza/cache/__init__.py"
# Import MemoryCache from core implementation
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
from datapizza.core.cache import MemoryCache
__all__ = ["MemoryCache"]
```
## /datapizza-ai-core/datapizza/clients/__init__.py
```py path="/datapizza-ai-core/datapizza/clients/__init__.py"
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
from .factory import ClientFactory
from .mock_client import MockClient
__all__ = ["ClientFactory", "MockClient"]
```
## /datapizza-ai-core/datapizza/clients/tests/test_client_factory.py
```py path="/datapizza-ai-core/datapizza/clients/tests/test_client_factory.py"
from datapizza.clients.openai import OpenAIClient
from datapizza.clients import ClientFactory
def test_client_factory_openai():
client = ClientFactory.create(
provider="openai",
api_key="test_api_key",
model="gpt-3.5-turbo",
system_prompt="You are a helpful assistant that can answer questions about piadina only in italian.",
)
assert client is not None
assert isinstance(client, OpenAIClient)
```
## /datapizza-ai-core/datapizza/core/__init__.py
```py path="/datapizza-ai-core/datapizza/core/__init__.py"
import logging
from datapizza.core.utils import _basic_config
# Setup base logging
# Create and configure the main package logger
log = logging.getLogger("datapizza")
_basic_config(log)
log.setLevel(logging.DEBUG)
```