datapizza-labs/datapizza-ai (main), 602k tokens
```
├── .github/
   ├── ISSUE_TEMPLATE/
      ├── bug_report.md (100 tokens)
      ├── feature_request.md (100 tokens)
├── .gitignore (900 tokens)
├── .pre-commit-config.yaml (100 tokens)
├── CODE_OF_CONDUCT.md (1000 tokens)
├── CONTRIBUTING.md (1500 tokens)
├── LICENSE (omitted)
├── Makefile (100 tokens)
├── README.md (2.5k tokens)
├── datapizza-ai-cache/
   ├── redis/
      ├── README.md
      ├── datapizza/
         ├── cache/
            ├── redis/
               ├── __init__.py
               ├── cache.py (200 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_redis_cache.py
├── datapizza-ai-clients/
   ├── datapizza-ai-clients-anthropic/
      ├── README.md
      ├── datapizza/
         ├── clients/
            ├── anthropic/
               ├── __init__.py
               ├── anthropic_client.py (2.8k tokens)
               ├── memory_adapter.py (1000 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_anthropic_memory_adapter.py (200 tokens)
   ├── datapizza-ai-clients-azure-openai/
      ├── datapizza/
         ├── clients/
            ├── azure_openai/
               ├── __init__.py
               ├── azure_openai_client.py (300 tokens)
      ├── pyproject.toml (300 tokens)
   ├── datapizza-ai-clients-bedrock/
      ├── README.md (1800 tokens)
      ├── datapizza/
         ├── clients/
            ├── bedrock/
               ├── __init__.py
               ├── bedrock_client.py (3.8k tokens)
               ├── memory_adapter.py (1100 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_bedrock_async.py (300 tokens)
         ├── test_bedrock_memory_adapter.py (400 tokens)
   ├── datapizza-ai-clients-google/
      ├── README.md
      ├── datapizza/
         ├── clients/
            ├── google/
               ├── __init__.py
               ├── google_client.py (4.6k tokens)
               ├── memory_adapter.py (1000 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_memory_adapter.py (400 tokens)
   ├── datapizza-ai-clients-mistral/
      ├── README.md
      ├── datapizza/
         ├── clients/
            ├── mistral/
               ├── __init__.py
               ├── memory_adapter.py (700 tokens)
               ├── mistral_client.py (3.4k tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_mistral_client.py
   ├── datapizza-ai-clients-openai-like/
      ├── README.md (1100 tokens)
      ├── datapizza/
         ├── clients/
            ├── openai_like/
               ├── __init__.py
               ├── memory_adapter.py (1200 tokens)
               ├── openai_completion_client.py (3.1k tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_openai_completion.py (100 tokens)
   ├── datapizza-ai-clients-openai/
      ├── README.md
      ├── datapizza/
         ├── clients/
            ├── openai/
               ├── __init__.py
               ├── memory_adapter.py (1300 tokens)
               ├── openai_client.py (3.6k tokens)
      ├── pyproject.toml (200 tokens)
      ├── tests/
         ├── __init__.py
         ├── test_base_client.py (600 tokens)
         ├── test_memory_adapter.py (1200 tokens)
├── datapizza-ai-core/
   ├── README.md
   ├── datapizza/
      ├── agents/
         ├── __init__.py
         ├── __version__.py
         ├── agent.py (4.7k tokens)
         ├── client_manager.py (200 tokens)
         ├── logger.py (500 tokens)
         ├── tests/
            ├── test_base_agents.py (1500 tokens)
      ├── cache/
         ├── __init__.py
      ├── clients/
         ├── __init__.py
         ├── factory.py (900 tokens)
         ├── mock_client.py (1500 tokens)
         ├── tests/
            ├── test_client_factory.py (100 tokens)
      ├── core/
         ├── __init__.py
         ├── __version__.py
         ├── cache/
            ├── __init__.py
            ├── cache.py (700 tokens)
         ├── clients/
            ├── __init__.py
            ├── client.py (5.8k tokens)
            ├── models.py (1000 tokens)
            ├── tests/
               ├── test_mock_client.py (700 tokens)
               ├── test_token_usage.py (200 tokens)
         ├── constants.py (100 tokens)
         ├── embedder/
            ├── __init__.py
            ├── base.py (200 tokens)
         ├── executors/
            ├── async_executor.py (500 tokens)
         ├── models.py (700 tokens)
         ├── modules/
            ├── captioner.py (200 tokens)
            ├── metatagger.py (100 tokens)
            ├── parser.py (200 tokens)
            ├── prompt.py (100 tokens)
            ├── reranker.py (100 tokens)
            ├── rewriter.py (200 tokens)
            ├── splitter.py (100 tokens)
         ├── utils.py (1100 tokens)
         ├── vectorstore/
            ├── __init__.py
            ├── tests/
               ├── test_vectorstore_models.py (100 tokens)
            ├── vectorstore.py (600 tokens)
      ├── embedders/
         ├── __init__.py
         ├── embedders.py (900 tokens)
      ├── memory/
         ├── __init__.py
         ├── __version__.py
         ├── memory.py (1200 tokens)
         ├── memory_adapter.py (400 tokens)
         ├── tests/
            ├── __init__.py
            ├── test_memory.py (1600 tokens)
      ├── modules/
         ├── captioners/
            ├── __init__.py
            ├── llm_captioner.py (1500 tokens)
         ├── metatagger/
            ├── __init__.py
            ├── keyword_metatagger.py (600 tokens)
            ├── tests/
               ├── test_keyword_metagger.py (700 tokens)
         ├── parsers/
            ├── __init__.py
            ├── tests/
               ├── test_base_parser.py (200 tokens)
            ├── text_parser.py (600 tokens)
         ├── prompt/
            ├── __init__.py
            ├── image_rag.py (700 tokens)
            ├── prompt.py (600 tokens)
            ├── tests/
               ├── test_chat_prompt_template.py (500 tokens)
         ├── rewriters/
            ├── __init__.py
            ├── tests/
               ├── test_tool_rewriter.py (100 tokens)
            ├── tool_rewriter.py (700 tokens)
         ├── splitters/
            ├── __init__.py (100 tokens)
            ├── bbox_merger.py (700 tokens)
            ├── node_splitter.py (300 tokens)
            ├── pdf_image_splitter.py (1000 tokens)
            ├── recursive_splitter.py (700 tokens)
            ├── tests/
               ├── test_node_splitter.py (300 tokens)
               ├── test_recursive_splitter.py (100 tokens)
               ├── test_text_splitter.py (100 tokens)
            ├── text_splitter.py (400 tokens)
         ├── treebuilder/
            ├── __init__.py
            ├── llm_treebuilder.py (2k tokens)
            ├── test/
               ├── test_llm_treebuilder.py (4.5k tokens)
      ├── pipeline/
         ├── __init__.py
         ├── dag_pipeline.py (2.2k tokens)
         ├── functional_pipeline.py (5.6k tokens)
         ├── pipeline.py (1900 tokens)
         ├── tests/
            ├── config.yaml (200 tokens)
            ├── dag_config.yaml (200 tokens)
            ├── functional_pipeline_config.yaml (200 tokens)
            ├── test_functional_pipeline.py (1000 tokens)
            ├── test_graph_pipeline.py (800 tokens)
            ├── test_pipeline.py (1200 tokens)
      ├── tools/
         ├── __init__.py
         ├── google.py (100 tokens)
         ├── mcp_client.py (1500 tokens)
         ├── tests/
            ├── __init__.py
            ├── test_tools.py (500 tokens)
         ├── tools.py (900 tokens)
         ├── utils.py (700 tokens)
      ├── tracing/
         ├── __init__.py
         ├── memory_exporter.py (600 tokens)
         ├── tests/
            ├── test_tracing.py (3.2k tokens)
         ├── tracing.py (1000 tokens)
      ├── type/
         ├── __init__.py (100 tokens)
         ├── tests/
            ├── test_type.py (500 tokens)
         ├── type.py (2.7k tokens)
   ├── pyproject.toml (300 tokens)
├── datapizza-ai-embedders/
   ├── cohere/
      ├── README.md
      ├── datapizza/
         ├── embedders/
            ├── cohere/
               ├── __init__.py
               ├── cohere.py (400 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_base.py
   ├── fastembedder/
      ├── README.md
      ├── datapizza/
         ├── embedders/
            ├── fastembedder/
               ├── __init__.py
               ├── fastembedder.py (400 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_fastembedder.py
   ├── google/
      ├── README.md (100 tokens)
      ├── datapizza/
         ├── embedders/
            ├── google/
               ├── __init__.py
               ├── google.py (400 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_google_embedder.py (100 tokens)
   ├── image_embedder.py (3.7k tokens)
   ├── openai/
      ├── README.md (100 tokens)
      ├── datapizza/
         ├── embedders/
            ├── openai/
               ├── __init__.py
               ├── openai.py (400 tokens)
      ├── pyproject.toml (200 tokens)
      ├── tests/
         ├── test_openai_embedder.py (100 tokens)
├── datapizza-ai-eval/
   ├── metrics.py (5.6k tokens)
├── datapizza-ai-modules/
   ├── parsers/
      ├── azure/
         ├── README.md
         ├── datapizza/
            ├── modules/
               ├── parsers/
                  ├── azure/
                     ├── __init__.py
                     ├── azure_parser.py (3.2k tokens)
         ├── pyproject.toml (300 tokens)
         ├── tests/
            ├── attention_wikipedia_test.json (413.5k tokens)
            ├── attention_wikipedia_test.pdf
            ├── test_azure_parser.py (900 tokens)
      ├── docling/
         ├── README.md
         ├── datapizza/
            ├── modules/
               ├── parsers/
                  ├── docling/
                     ├── __init__.py
                     ├── docling_parser.py (7.4k tokens)
                     ├── ocr_options.py (700 tokens)
                     ├── tests/
                        ├── conftest.py (200 tokens)
                        ├── test_docling_parser.py (1500 tokens)
                     ├── utils.py (600 tokens)
         ├── mypy.ini
         ├── pyproject.toml (300 tokens)
   ├── rerankers/
      ├── cohere/
         ├── README.md
         ├── datapizza/
            ├── modules/
               ├── rerankers/
                  ├── cohere/
                     ├── __init__.py
                     ├── cohere_reranker.py (900 tokens)
         ├── pyproject.toml (300 tokens)
      ├── together/
         ├── README.md
         ├── datapizza/
            ├── modules/
               ├── rerankers/
                  ├── together/
                     ├── __init__.py
                     ├── together_reranker.py (500 tokens)
         ├── pyproject.toml (300 tokens)
├── datapizza-ai-tools/
   ├── SQLDatabase/
      ├── README.md (1400 tokens)
      ├── datapizza/
         ├── tools/
            ├── SQLDatabase/
               ├── __init__.py
               ├── base.py (500 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_sql_database_tool.py (500 tokens)
   ├── duckduckgo/
      ├── README.md
      ├── datapizza/
         ├── tools/
            ├── duckduckgo/
               ├── __init__.py
               ├── base.py (200 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_ddgs_tools.py
   ├── filesystem/
      ├── README.md (1700 tokens)
      ├── datapizza/
         ├── tools/
            ├── filesystem/
               ├── __init__.py
               ├── filesystem.py (2.1k tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_file_path_matches_pattern.py (500 tokens)
         ├── test_filesystem.py (1700 tokens)
   ├── web_fetch/
      ├── README.md (600 tokens)
      ├── datapizza/
         ├── tools/
            ├── web_fetch/
               ├── __init__.py
               ├── base.py (500 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_web_fetch.py (500 tokens)
├── datapizza-ai-vectorstores/
   ├── datapizza-ai-vectorstores-milvus/
      ├── README.md
      ├── datapizza/
         ├── vectorstores/
            ├── milvus/
               ├── __init__.py
               ├── milvus_vectorstore.py (4.1k tokens)
               ├── tests/
                  ├── test_milvus_vectorstore.py (900 tokens)
      ├── pyproject.toml (300 tokens)
   ├── datapizza-ai-vectorstores-qdrant/
      ├── README.md
      ├── datapizza/
         ├── vectorstores/
            ├── qdrant/
               ├── __init__.py
               ├── qdrant_vectorstore.py (2.9k tokens)
               ├── tests/
                  ├── test_qdrant_vectorstore.py (900 tokens)
      ├── pyproject.toml (300 tokens)
├── docs/
   ├── .pages
   ├── API Reference/
      ├── .pages
      ├── Agents/
         ├── agent.md
      ├── Clients/
         ├── .pages
         ├── Avaiable_Clients/
            ├── .pages
            ├── anthropic.md (200 tokens)
            ├── google.md (100 tokens)
            ├── mistral.md (100 tokens)
            ├── openai-like.md (200 tokens)
            ├── openai.md (100 tokens)
         ├── cache.md
         ├── client_factory.md (300 tokens)
         ├── clients.md
         ├── models.md
      ├── Embedders/
         ├── chunk_embedder.md (400 tokens)
         ├── cohere_embedder.md (600 tokens)
         ├── fast_embedder.md (200 tokens)
         ├── google_embedder.md (300 tokens)
         ├── ollama_embedder.md (100 tokens)
         ├── openai_embedder.md (300 tokens)
      ├── Modules/
         ├── .pages
         ├── Parsers/
            ├── .pages
            ├── azure_parser.md (400 tokens)
            ├── docling_parser.md (1100 tokens)
            ├── index.md (300 tokens)
            ├── text_parser.md (600 tokens)
         ├── Prompt/
            ├── .pages
            ├── ChatPromptTemplate.md (300 tokens)
            ├── ImageRAGPrompt.md (200 tokens)
         ├── Rerankers/
            ├── .pages
            ├── cohere_reranker.md (400 tokens)
            ├── index.md (500 tokens)
            ├── together_reranker.md (500 tokens)
         ├── Splitters/
            ├── .pages
            ├── index.md (400 tokens)
            ├── node_splitter.md (300 tokens)
            ├── pdf_image_splitter.md (300 tokens)
            ├── recursive_splitter.md (200 tokens)
            ├── text_splitter.md (200 tokens)
         ├── captioners.md (400 tokens)
         ├── index.md (200 tokens)
         ├── metatagger.md (500 tokens)
         ├── rewriters.md (300 tokens)
         ├── treebuilder.md (700 tokens)
      ├── Pipelines/
         ├── dag.md
         ├── functional.md
         ├── ingestion.md
      ├── Tools/
         ├── .pages
         ├── SQLDatabase.md (300 tokens)
         ├── duckduckgo.md (300 tokens)
         ├── filesystem.md (700 tokens)
         ├── mcp.md
         ├── web_fetch.md (300 tokens)
      ├── Type/
         ├── block.md (100 tokens)
         ├── chunk.md (100 tokens)
         ├── media.md
         ├── node.md
         ├── tool.md
      ├── Vectorstore/
         ├── milvus_vectorstore.md (1300 tokens)
         ├── qdrant_vectorstore.md (500 tokens)
      ├── index.md
      ├── memory.md
   ├── Guides/
      ├── .pages
      ├── Agents/
         ├── agent.md (1600 tokens)
         ├── mcp.md (400 tokens)
      ├── Clients/
         ├── .pages
         ├── chatbot.md (300 tokens)
         ├── local_model.md (300 tokens)
         ├── multimodality.md (700 tokens)
         ├── quick_start.md (700 tokens)
         ├── streaming.md (200 tokens)
         ├── structured_responses.md (500 tokens)
         ├── tools.md (500 tokens)
      ├── Monitoring/
         ├── .pages
         ├── log.md (100 tokens)
         ├── tracing.md (1000 tokens)
      ├── Pipeline/
         ├── .pages
         ├── functional_pipeline.md (1400 tokens)
         ├── ingestion_pipeline.md (1200 tokens)
         ├── retrieval_pipeline.md (1700 tokens)
      ├── RAG/
         ├── .pages
         ├── rag.md (1200 tokens)
   ├── assets/
      ├── logo.png
      ├── logo_bg_dark.png
      ├── pool.png
      ├── workflow.png
   ├── index.md (300 tokens)
├── pyproject.toml (700 tokens)
├── pyrightconfig.json (100 tokens)
```


## /.github/ISSUE_TEMPLATE/bug_report.md

---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''

---

**Describe the bug**
A clear and concise description of what the bug is.

**Environment**
OS, Python version, datapizza-ai version

**To Reproduce**
Steps to reproduce the behavior

**Expected behavior**
A clear and concise description of what you expected to happen.

**Logs**
If applicable, attach logs to help explain your problem.

**Additional context**
Add any other context about the problem here.


## /.github/ISSUE_TEMPLATE/feature_request.md

---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''

---

**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

**Describe the solution you'd like**
A clear and concise description of what you want to happen.

**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.

**Additional context**
Add any other context or screenshots about the feature request here.


## /.gitignore

```gitignore path="/.gitignore" 
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/

# Milvus
milvus.db

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
#poetry.toml

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#   pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
#   https://pdm-project.org/en/latest/usage/project/#working-with-version-control
#pdm.lock
#pdm.toml
.pdm-python
.pdm-build/

# pixi
#   Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
#pixi.lock
#   Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
#   in the .venv directory. It is recommended not to include this directory in version control.
.pixi

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/

# Visual Studio Code
#  Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
#  that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
#  and can be added to the global gitignore or merged into this file. However, if you prefer,
#  you could uncomment the following to ignore the entire vscode folder
# .vscode/

# Ruff stuff:
.ruff_cache/

# PyPI configuration file
.pypirc

# Cursor
#  Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to
#  exclude from AI features like autocomplete and code analysis. Recommended for sensitive data
#  refer to https://docs.cursor.com/context/ignore-files
.cursorignore
.cursorindexingignore

# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/

```

## /.pre-commit-config.yaml

```yaml path="/.pre-commit-config.yaml" 
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.14.1
    hooks:
      - id: ruff-check
        args: [ --fix ]
      - id: ruff-format

```

## /CODE_OF_CONDUCT.md

# Contributor Covenant Code of Conduct

## Our Pledge

We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.

We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.

## Our Standards

Examples of behavior that contributes to a positive environment for our
community include:

* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
  and learning from the experience
* Focusing on what is best not just for us as individuals, but for the
  overall community

Examples of unacceptable behavior include:

* The use of sexualized language or imagery, and sexual attention or
  advances of any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email
  address, without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
  professional setting

## Enforcement Responsibilities

Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.

Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.

## Scope

This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.

## Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at
ai.support@datapizza.tech.
All complaints will be reviewed and investigated promptly and fairly.

All community leaders are obligated to respect the privacy and security of the
reporter of any incident.

## Enforcement Guidelines

Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:

### 1. Correction

**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.

**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.

### 2. Warning

**Community Impact**: A violation through a single incident or series
of actions.

**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.

### 3. Temporary Ban

**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.

**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.

### 4. Permanent Ban

**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.

**Consequence**: A permanent ban from any sort of public interaction within
the community.

## Attribution

This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.0, available at
https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.

Community Impact Guidelines were inspired by [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).

[homepage]: https://www.contributor-covenant.org

For answers to common questions about this code of conduct, see the FAQ at
https://www.contributor-covenant.org/faq. Translations are available at
https://www.contributor-covenant.org/translations.


## /CONTRIBUTING.md


# Contributing to datapizza-ai

First off, thanks for taking the time to contribute! ❤️

All types of contributions are encouraged and valued. See the [Table of Contents](#table-of-contents) for different ways to help and details about how this project handles them. Please make sure to read the relevant section before making your contribution. It will make it a lot easier for us maintainers and smooth out the experience for all involved. The community looks forward to your contributions. 🎉

> And if you like the project, but just don't have time to contribute, that's fine. There are other easy ways to support the project and show your appreciation, which we would also be very happy about:
> - Star the project
> - Tweet about it
> - Refer this project in your project's readme
> - Mention the project at local meetups and tell your friends/colleagues

## Table of Contents

- [Code of Conduct](#code-of-conduct)
- [I Have a Question](#i-have-a-question)
- [I Want To Contribute](#i-want-to-contribute)
    - [Reporting Bugs](#reporting-bugs)
    - [Suggesting Enhancements](#suggesting-enhancements)

## Code of Conduct

This project and everyone participating in it is governed by the
[Datapizza Code of Conduct](https://github.com/datapizza-labs/datapizza-ai/blob/main/CODE_OF_CONDUCT.md).
By participating, you are expected to uphold this code. Please report unacceptable behavior
to <ai.support@datapizza.tech>.


## I Have a Question

> If you want to ask a question, we assume that you have read the available [Documentation](https://docs.datapizza.ai).

Before you ask a question, it is best to search for existing [Issues](https://github.com/datapizza-labs/datapizza-ai/issues) that might help you. In case you have found a suitable issue and still need clarification, you can write your question in this issue. It is also advisable to search the internet for answers first.

If you then still feel the need to ask a question and need clarification, we recommend the following:

- Open an [Issue](https://github.com/datapizza-labs/datapizza-ai/issues/new).
- Provide as much context as you can about what you're running into.
- Provide project and platform versions (docker, datapizza-ai, etc), depending on what seems relevant.

We will then take care of the issue as soon as possible.

## I Want To Contribute

> ### Legal Notice <!-- omit in toc -->
> When contributing to this project, you must agree that you have authored 100% of the content, that you have the necessary rights to the content and that the content you contribute may be provided under the project license.

### Reporting Bugs

#### Before Submitting a Bug Report

A good bug report shouldn't leave others needing to chase you up for more information. Therefore, we ask you to investigate carefully, collect information and describe the issue in detail in your report. Please complete the following steps in advance to help us fix any potential bug as fast as possible.

- Make sure that you are using the latest version.
- Determine whether your bug is really a bug and not an error on your side, e.g. caused by incompatible environment components/versions (make sure you have read the [documentation](https://docs.datapizza.ai); if you are looking for support, you might want to check [this section](#i-have-a-question)).
- To see whether other users have experienced (and potentially already solved) the same issue, check whether a report for your bug or error already exists in the [bug tracker](https://github.com/datapizza-labs/datapizza-ai/issues?q=is%3Aissue%20state%3Aopen%20label%3Abug).
- Also make sure to search the internet (including Stack Overflow) to see if users outside of the GitHub community have discussed the issue.
- Collect information about the bug:
    - Stack trace (Traceback)
    - OS, Platform and Version (Windows, Linux, macOS, x86, ARM)
    - Version of the interpreter, compiler, SDK, runtime environment, package manager, depending on what seems relevant.
    - Possibly your input and the output
    - Can you reliably reproduce the issue? And can you also reproduce it with older versions?

#### How Do I Submit a Good Bug Report?

> You must never report security-related issues, vulnerabilities, or bugs that include sensitive information to the issue tracker or elsewhere in public. Instead, sensitive bugs must be sent by email to <ai.support@datapizza.tech>.

We use GitHub issues to track bugs and errors. If you run into an issue with the project:

- Open an [Issue](https://github.com/datapizza-labs/datapizza-ai/issues/new?template=bug_report.md). (Since we can't be sure at this point whether it is a bug or not, we ask you not to talk about a bug yet and not to label the issue.)
- Explain the behavior you would expect and the actual behavior.
- Please provide as much context as possible and describe the *reproduction steps* that someone else can follow to recreate the issue on their own. This usually includes your code. For good bug reports you should isolate the problem and create a reduced test case.
- Provide the information you collected in the previous section.

Once it's filed:

- The project team will label the issue accordingly.
- A team member will try to reproduce the issue with your provided steps. If there are no reproduction steps or no obvious way to reproduce the issue, the team will ask you for those steps and mark the issue as `needs-repro`. Bugs with the `needs-repro` tag will not be addressed until they are reproduced.

<!-- You might want to create an issue template for bugs and errors that can be used as a guide and that defines the structure of the information to be included. If you do so, reference it here in the description. -->


### Suggesting Enhancements

This section guides you through submitting an enhancement suggestion for datapizza-ai, **including completely new features and minor improvements to existing functionality**. Following these guidelines will help maintainers and the community to understand your suggestion and find related suggestions.

<!-- omit in toc -->
#### Before Submitting an Enhancement

- Make sure that you are using the latest version.
- Read the [documentation](https://docs.datapizza.ai) carefully and find out if the functionality is already covered, maybe by an individual configuration.
- Perform a [search](https://github.com/datapizza-labs/datapizza-ai/issues) to see if the enhancement has already been suggested. If it has, add a comment to the existing issue instead of opening a new one.
- Find out whether your idea fits with the scope and aims of the project. It's up to you to make a strong case to convince the project's developers of the merits of this feature. Keep in mind that we want features that will be useful to the majority of our users and not just a small subset. If you're just targeting a minority of users, consider writing an add-on/plugin library.

#### How Do I Submit a Good Enhancement Suggestion?

Enhancement suggestions are tracked as [GitHub issues](https://github.com/datapizza-labs/datapizza-ai/issues).

- Use a **clear and descriptive title** for the issue to identify the suggestion.
- Provide a **step-by-step description of the suggested enhancement** in as many details as possible.
- **Describe the current behavior** and **explain which behavior you expected to see instead** and why. At this point you can also tell which alternatives do not work for you.
- **Explain why this enhancement would be useful** to most datapizza-ai users. You may also want to point out other projects that solved it better and could serve as inspiration.


## /Makefile

``` path="/Makefile" 

test:
	uv run pytest --tb=short -v

watch-tests:
	find . -name "*.py" -not -path "*/site-packages/*" | entr uv run pytest --tb=short -v

format:
	uvx ruff format .

linter-check:
	uvx ruff check .

linter-fix:
	uvx ruff check --fix

linter-force-fix:
	uvx ruff check --fix --unsafe-fixes

dependency-check:
	uv run deptry .

run_docs:
	uv pip install mkdocs-material  pymdown-extensions mkdocs-awesome-pages-plugin mkdocstrings-python
	uv run mkdocs serve --livereload

```

## /README.md

<div align="center">

<img src="docs/assets/logo_bg_dark.png" alt="Datapizza AI Logo" width="200" height="200">

**Build reliable Gen AI solutions without overhead**

*Written in Python. Designed for speed. A no-fluff GenAI framework that gets your agents from dev to prod, fast*

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![PyPI version](https://img.shields.io/pypi/v/datapizza-ai.svg)](https://pypi.org/project/datapizza-ai/)
[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![Downloads](https://img.shields.io/pypi/dm/datapizza-ai.svg)](https://pypi.org/project/datapizza-ai/)
[![GitHub stars](https://img.shields.io/github/stars/datapizza-labs/datapizza-ai.svg?style=social&label=Star)](https://github.com/datapizza-labs/datapizza-ai)

[🏠 Homepage](https://datapizza.tech/en/ai-framework/) • [🚀 Quick Start](#-quick-start) • [📖 Documentation](https://docs.datapizza.ai) • [🎯 Examples](#-examples) • [🤝 Community](#-community)

</div>

---

## 🌟 Why Datapizza AI?

A framework that keeps your agents predictable, your debugging fast, and your code trusted in production. Built by Engineers, trusted by Engineers.

<div align="center">

### ⚡ **Less abstraction, more control** | 🚀 **API-first design** | 🔧 **Observable by design**

</div>

## How to install
```sh
pip install datapizza-ai
```

## Client invoke

```python
from datapizza.clients.openai import OpenAIClient

client = OpenAIClient(api_key="YOUR_API_KEY")
result = client.invoke("Hi, how are you?")
print(result.text)
```

## ✨ Key Features

<table>
<tr>
<td width="50%" valign="top">

### 🎯 **API-first**
- **Multi-Provider Support**: OpenAI, Google Gemini, Anthropic, Mistral, Azure
- **Tool Integration**: Built-in web search, document processing, custom tools
- **Memory Management**: Persistent conversations and context awareness

</td>
<td width="50%" valign="top">

### 🔍 **Composable**
- **Reusable blocks**: Declarative configuration, easy overrides
- **Document Processing**: PDF, DOCX, images with Azure AI & Docling
- **Smart Chunking**: Context-aware text splitting and embedding
- **Built-in reranking**: Add a reranker (e.g., Cohere) to boost relevance

</td>
</tr>
<tr>
<td width="50%" valign="top">

### 🔧 **Observable**
- **OpenTelemetry tracing**: Standards-based instrumentation
- **Client I/O tracing**: Optional toggle to log inputs, outputs, and in-memory context
- **Custom spans**: Trace fine-grained phases and sub-steps to pinpoint bottlenecks

</td>
<td width="50%" valign="top">

### 🚀 **Vendor-Agnostic**
- **Swap models**: Change providers without rewiring business logic (see the sketch after this table)
- **Clear Interfaces**: Predictable APIs across all components
- **Rich Ecosystem**: Modular design with optional components
- **Migration-friendly**: Quick migration from other frameworks

</td>
</tr>
</table>
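
Because every client implements the same interface, switching providers is a constructor change rather than a rewrite. Below is a minimal sketch of the vendor-agnostic design, assuming the `datapizza-ai-clients-anthropic` extra from the Quick Start is installed and a valid Anthropic API key is available; the model name is only an example.

```python
from datapizza.agents import Agent
from datapizza.clients.anthropic import AnthropicClient  # swapped in for OpenAIClient
from datapizza.tools import tool

@tool
def get_weather(city: str) -> str:
    return f"The weather in {city} is sunny"

# Same Agent wiring as the OpenAI example in the Quick Start; only the client changes.
client = AnthropicClient(api_key="YOUR_API_KEY", model="claude-3-5-sonnet-latest")
agent = Agent(name="assistant", client=client, tools=[get_weather])

response = agent.run("What is the weather in Rome?")
print(response.text)
```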

## 🚀 Quick Start

### Installation

```bash
# Core framework
pip install datapizza-ai

# With specific providers (optional)
pip install datapizza-ai-clients-openai
pip install datapizza-ai-clients-google
pip install datapizza-ai-clients-anthropic
```

### Start with Agent

```python
from datapizza.agents import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools import tool

@tool
def get_weather(city: str) -> str:
    return f"The weather in {city} is sunny"

client = OpenAIClient(api_key="YOUR_API_KEY")
agent = Agent(name="assistant", client=client, tools=[get_weather])

response = agent.run("What is the weather in Rome?")
print(response.text)
# output: The weather in Rome is sunny
```


## 📊 Detailed Tracing


A key requirement for the principled development of LLM applications over your data (RAG systems, agents) is the ability to observe and debug them.

Datapizza-ai provides built-in observability with OpenTelemetry tracing to help you monitor performance and understand execution flow.

**🔍 Trace Your AI Operations**

```sh
pip install datapizza-ai-tools-duckduckgo
```

```python
from datapizza.agents import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools.duckduckgo import DuckDuckGoSearchTool
from datapizza.tracing import ContextTracing

client = OpenAIClient(api_key="OPENAI_API_KEY")
agent = Agent(name="assistant", client=client, tools=[DuckDuckGoSearchTool()])

with ContextTracing().trace("my_ai_operation"):
    response = agent.run("Tell me some news about Bitcoin")

# Output shows:
# ╭─ Trace Summary of my_ai_operation ──────────────────────────────────╮
# │ Total Spans: 3                                                      │
# │ Duration: 2.45s                                                     │
# │ ┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ |
# │ ┃ Model       ┃ Prompt Tokens ┃ Completion Tokens ┃ Cached Tokens ┃ |
# │ ┡━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ |
# │ │ gpt-4o-mini │ 31            │ 27                │ 0             │ |
# │ └─────────────┴───────────────┴───────────────────┴───────────────┘ |
# ╰─────────────────────────────────────────────────────────────────────╯
```


![Demo](https://github.com/user-attachments/assets/02742e87-aa48-4308-94c8-6f362e3218b4)


## 🎯 Examples

### 🌐 Multi-Agent System

Build sophisticated AI systems where multiple specialized agents collaborate to solve complex tasks. This example shows how to create a trip planning system with dedicated agents for weather information, web search, and planning coordination.

```sh
# Install DuckDuckGo tool
pip install datapizza-ai-tools-duckduckgo
```


```python
from datapizza.agents.agent import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools import tool
from datapizza.tools.duckduckgo import DuckDuckGoSearchTool

client = OpenAIClient(api_key="YOUR_API_KEY", model="gpt-4.1")

@tool
def get_weather(city: str) -> str:
    return f""" it's sunny all the week in {city}"""

weather_agent = Agent(
    name="weather_expert",
    client=client,
    system_prompt="You are a weather expert. Provide detailed weather information and forecasts.",
    tools=[get_weather]
)

web_search_agent = Agent(
    name="web_search_expert",
    client=client,
    system_prompt="You are a web search expert. You can search the web for information.",
    tools=[DuckDuckGoSearchTool()]
)

planner_agent = Agent(
    name="planner",
    client=client,
    system_prompt="You are a trip planner. You should provide a plan for the user. Make sure to provide a detailed plan with the best places to visit and the best time to visit them."
)

planner_agent.can_call([weather_agent, web_search_agent])

response = planner_agent.run(
    "I need to plan a hiking trip in Seattle next week. I want to see some waterfalls and a forest."
)
print(response.text)

```


### 📊 Document Ingestion

Process and index documents for retrieval-augmented generation (RAG). This pipeline automatically parses PDFs, splits them into chunks, generates embeddings, and stores them in a vector database for efficient similarity search.

```sh
pip install datapizza-ai-parsers-docling
```

```python
from datapizza.core.vectorstore import VectorConfig
from datapizza.embedders import ChunkEmbedder
from datapizza.embedders.openai import OpenAIEmbedder
from datapizza.modules.parsers.docling import DoclingParser
from datapizza.modules.splitters import NodeSplitter
from datapizza.pipeline import IngestionPipeline
from datapizza.vectorstores.qdrant import QdrantVectorstore

vectorstore = QdrantVectorstore(location=":memory:")
embedder = ChunkEmbedder(client=OpenAIEmbedder(api_key="YOUR_API_KEY", model_name="text-embedding-3-small"))
vectorstore.create_collection("my_documents", vector_config=[VectorConfig(name="embedding", dimensions=1536)])

pipeline = IngestionPipeline(
    modules=[
        DoclingParser(),
        NodeSplitter(max_char=1024),
        embedder,
    ],
    vector_store=vectorstore,
    collection_name="my_documents"
)

pipeline.run("sample.pdf")

results = vectorstore.search(query_vector=[0.0] * 1536, collection_name="my_documents", k=5)
print(results)
```


### 📊 RAG (Retrieval-Augmented Generation)

Create a complete RAG pipeline that enhances AI responses with relevant document context. This example demonstrates query rewriting, embedding generation, document retrieval, and response generation in a connected workflow.

```python
from datapizza.clients.openai import OpenAIClient
from datapizza.embedders.openai import OpenAIEmbedder
from datapizza.modules.prompt import ChatPromptTemplate
from datapizza.modules.rewriters import ToolRewriter
from datapizza.pipeline import DagPipeline
from datapizza.vectorstores.qdrant import QdrantVectorstore

openai_client = OpenAIClient(
    model="gpt-4o-mini",
    api_key="YOUR_API_KEY"
)

dag_pipeline = DagPipeline()
dag_pipeline.add_module("rewriter", ToolRewriter(client=openai_client, system_prompt="Rewrite user queries to improve retrieval accuracy."))
dag_pipeline.add_module("embedder", OpenAIEmbedder(api_key= "YOUR_API_KEY", model_name="text-embedding-3-small"))
dag_pipeline.add_module("retriever", QdrantVectorstore(host="localhost", port=6333).as_retriever(collection_name="my_documents", k=5))
dag_pipeline.add_module("prompt", ChatPromptTemplate(user_prompt_template="User question: {{user_prompt}}\n:", retrieval_prompt_template="Retrieved content:\n{% for chunk in chunks %}{{ chunk.text }}\n{% endfor %}"))
dag_pipeline.add_module("generator", openai_client)

dag_pipeline.connect("rewriter", "embedder", target_key="text")
dag_pipeline.connect("embedder", "retriever", target_key="query_vector")
dag_pipeline.connect("retriever", "prompt", target_key="chunks")
dag_pipeline.connect("prompt", "generator", target_key="memory")

query = "tell me something about this document"
result = dag_pipeline.run({
    "rewriter": {"user_prompt": query},
    "prompt": {"user_prompt": query},
    "retriever": {"collection_name": "my_documents", "k": 3},
    "generator":{"input": query}
})

print(f"Generated response: {result['generator']}")
```


## 🌐 Ecosystem

### 🤖 Supported AI Providers

<table>
<tr>
<td align="center"><img src="https://logosandtypes.com/wp-content/uploads/2022/07/OpenAI.png" width="32" style="border-radius: 50%"><br><b>OpenAI</b></td>
<td align="center"><img src="https://www.google.com/favicon.ico" width="32"><br><b>Google Gemini</b></td>
<td align="center"><img src="https://anthropic.com/favicon.ico" width="32"><br><b>Anthropic</b></td>
<td align="center"><img src="https://mistral.ai/favicon.ico" width="32"><br><b>Mistral</b></td>
<td align="center"><img src="https://azure.microsoft.com/favicon.ico" width="32"><br><b>Azure OpenAI</b></td>
</tr>
</table>

### 🔧 Tools & Integrations

| Category | Components |
|----------|------------|
| **📄 Document Parsers** | Azure AI Document Intelligence, Docling |
| **🔍 Vector Stores** | Qdrant |
| **🎯 Rerankers** | Cohere, Together AI |
| **🌐 Tools** | DuckDuckGo Search, Custom Tools |
| **💾 Caching** | Redis integration for performance optimization |
| **📊 Embedders** | OpenAI, Google, Cohere, FastEmbed |

## 🎓 Learning Resources

- 📖 **[Complete Documentation](https://docs.datapizza.ai)** - Comprehensive guides and API reference
- 🎯 **[RAG Tutorial](https://docs.datapizza.ai/latest/Guides/RAG/rag/)** - Build production RAG systems
- 🤖 **[Agent Examples](https://docs.datapizza.ai/latest/Guides/agent/)** - Real-world agent implementations

## 🤝 Community


- 💬 **[Discord Community](https://discord.gg/s5sJNHz2C8)**
- 📚 **[Documentation](https://docs.datapizza.ai)**
- 📧 **[GitHub Issues](https://github.com/datapizza-labs/datapizza-ai/issues)**
- 🐦 **[Twitter](https://x.com/datapizza_ai)**

### 🌟 Contributing

We love contributions! Whether it's:

- 🐛 **Bug Reports** - Help us improve
- 💡 **Feature Requests** - Share your ideas
- 📝 **Documentation** - Make it better for everyone
- 🔧 **Code Contributions** - Build the future together

Check out our [Contributing Guide](CONTRIBUTING.md) to get started.

## 📄 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

---

<div align="center">

**Built by Datapizza, the AI-native company**

*A framework made to be easy to learn, easy to maintain and ready for production* 🍕

[⭐ Star us on GitHub](https://github.com/datapizza-labs/datapizza-ai) • [🚀 Get Started](https://docs.datapizza.ai) • [💬 Join Discord](https://discord.gg/s5sJNHz2C8)

## Star History

[![Star History Chart](https://api.star-history.com/svg?repos=datapizza-labs/datapizza-ai&type=Date)](https://www.star-history.com/#datapizza-labs/datapizza-ai&Date)

</div>


## /datapizza-ai-cache/redis/README.md



## /datapizza-ai-cache/redis/datapizza/cache/redis/__init__.py

```py path="/datapizza-ai-cache/redis/datapizza/cache/redis/__init__.py" 
# Import Redis cache implementation
from .cache import RedisCache

__all__ = ["RedisCache"]

```

## /datapizza-ai-cache/redis/datapizza/cache/redis/cache.py

```py path="/datapizza-ai-cache/redis/datapizza/cache/redis/cache.py" 
import logging
import pickle

from datapizza.core.cache import Cache

import redis

log = logging.getLogger(__name__)


class RedisCache(Cache):
    """
    A Redis-based cache implementation.
    """

    def __init__(
        self, host="localhost", port=6379, db=0, expiration_time=3600
    ):  # 1 hour default
        self.redis = redis.Redis(host=host, port=port, db=db)
        self.expiration_time = expiration_time

    def get(self, key: str) -> str | None:
        """Retrieve and deserialize object"""
        pickled_obj = self.redis.get(key)
        if pickled_obj is None:
            return None
        return pickle.loads(pickled_obj)  # type: ignore

    def set(self, key: str, obj):
        """Serialize and store object"""
        pickled_obj = pickle.dumps(obj)
        self.redis.set(key, pickled_obj, ex=self.expiration_time)

```
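
For reference, a minimal usage sketch of the cache above. It assumes a Redis server reachable at `localhost:6379`; the key and value are purely illustrative.

```python
from datapizza.cache.redis import RedisCache

cache = RedisCache(host="localhost", port=6379, db=0, expiration_time=600)

# Values are pickled before storage and expire after `expiration_time` seconds.
cache.set("last_answer", {"text": "hello"})
print(cache.get("last_answer"))  # -> {'text': 'hello'}, or None once the key has expired
```

Since values are round-tripped through `pickle`, only cache data that comes from trusted sources.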

## /datapizza-ai-cache/redis/pyproject.toml

```toml path="/datapizza-ai-cache/redis/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-cache-redis"
version = "0.0.3"
description = "An implementation using Redis for datapizza-ai cache"
readme = "README.md"
license = {text = "MIT"}
authors = [
    {name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.0.0,<0.1.0",
    "redis>=5.2.1,<6.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    # "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-cache/redis/tests/test_redis_cache.py

```py path="/datapizza-ai-cache/redis/tests/test_redis_cache.py" 
from datapizza.cache.redis import RedisCache


def test_redis_cache():
    cache = RedisCache(host="localhost", port=6379, db=0)
    assert cache is not None

```

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/README.md



## /datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/__init__.py" 
from .anthropic_client import AnthropicClient

__all__ = ["AnthropicClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/anthropic_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/anthropic_client.py" 
from collections.abc import AsyncIterator, Iterator
from typing import Any, Literal

from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import FunctionCallBlock, TextBlock, ThoughtBlock

from anthropic import Anthropic, AsyncAnthropic

from .memory_adapter import AnthropicMemoryAdapter


class AnthropicClient(Client):
    """A client for interacting with the Anthropic API (Claude).

    This class provides methods for invoking the Anthropic API to generate responses
    based on given input data. It extends the Client class.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-sonnet-latest",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
    ):
        """
        Args:
            api_key: The API key for the Anthropic API.
            model: The model to use for the Anthropic API.
            system_prompt: The system prompt to use for the Anthropic API.
            temperature: The temperature to use for the Anthropic API.
            cache: The cache to use for the Anthropic API.
        """
        if temperature and not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )
        self.api_key = api_key
        self.memory_adapter = AnthropicMemoryAdapter()
        self._set_client()

    def _set_client(self):
        if not self.client:
            self.client = Anthropic(api_key=self.api_key)

    def _set_a_client(self):
        if not self.a_client:
            self.a_client = AsyncAnthropic(api_key=self.api_key)

    def _convert_tools(self, tools: list[Tool]) -> list[dict[str, Any]]:
        """Convert tools to Anthropic tool format"""
        anthropic_tools = []
        for tool in tools:
            anthropic_tool = {
                "name": tool.name,
                "description": tool.description or "",
                "input_schema": {
                    "type": "object",
                    "properties": tool.properties,
                    "required": tool.required,
                },
            }
            anthropic_tools.append(anthropic_tool)
        return anthropic_tools

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> dict | Literal["auto", "required", "none"]:
        if isinstance(tool_choice, list) and len(tool_choice) > 1:
            raise NotImplementedError(
                "multiple function names is not supported by Anthropic"
            )
        elif isinstance(tool_choice, list):
            return {
                "type": "tool",
                "name": tool_choice[0],
            }
        elif tool_choice == "required":
            return {"type": "any"}
        elif tool_choice == "auto":
            return {"type": "auto"}
        else:
            return tool_choice

    def _response_to_client_response(
        self, response, tool_map: dict[str, Tool] | None = None
    ) -> ClientResponse:
        """Convert Anthropic response to ClientResponse"""
        blocks = []

        if hasattr(response, "content") and response.content:
            if isinstance(
                response.content, list
            ):  # Claude 3 returns a list of content blocks
                for content_block in response.content:
                    if content_block.type == "text":
                        blocks.append(TextBlock(content=content_block.text))
                    elif content_block.type == "thinking":
                        # Summarized thinking content
                        blocks.append(ThoughtBlock(content=content_block.thinking))
                    elif content_block.type == "tool_use":
                        tool = tool_map.get(content_block.name) if tool_map else None
                        if not tool:
                            raise ValueError(f"Tool {content_block.name} not found")

                        blocks.append(
                            FunctionCallBlock(
                                id=content_block.id,
                                name=content_block.name,
                                arguments=content_block.input,
                                tool=tool,
                            )
                        )
            else:  # Handle as string for compatibility
                blocks.append(TextBlock(content=str(response.content)))

        stop_reason = response.stop_reason if hasattr(response, "stop_reason") else None

        return ClientResponse(
            content=blocks,
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=response.usage.input_tokens,
                completion_tokens=response.usage.output_tokens,
                cached_tokens=response.usage.cache_read_input_tokens,
            ),
        )

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int | None,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _invoke method for Anthropic"""
        if tools is None:
            tools = []
        client = self._get_client()
        messages = self._memory_to_contents(None, input, memory)
        # remove messages with the "model" role (not supported by Anthropic)
        messages = [message for message in messages if message.get("role") != "model"]

        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_tokens or 2048,
            **kwargs,
        }

        if temperature is not None:
            request_params["temperature"] = temperature

        if system_prompt:
            request_params["system"] = system_prompt

        if tools:
            request_params["tools"] = self._convert_tools(tools)
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = client.messages.create(**request_params)
        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int | None,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        client = self._get_a_client()
        messages = self._memory_to_contents(None, input, memory)
        # remove messages with the "model" role (not supported by Anthropic)
        messages = [message for message in messages if message.get("role") != "model"]

        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_tokens or 2048,
            **kwargs,
        }

        if temperature is not None:
            request_params["temperature"] = temperature

        if system_prompt:
            request_params["system"] = system_prompt

        if tools:
            request_params["tools"] = self._convert_tools(tools)
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = await client.messages.create(**request_params)
        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int | None,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        """Implementation of the abstract _stream_invoke method for Anthropic"""
        if tools is None:
            tools = []
        messages = self._memory_to_contents(None, input, memory)
        client = self._get_client()

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "stream": True,
            "max_tokens": max_tokens or 2048,
            **kwargs,
        }

        if temperature is not None:
            request_params["temperature"] = temperature

        if system_prompt:
            request_params["system"] = system_prompt

        if tools:
            request_params["tools"] = self._convert_tools(tools)
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        stream = client.messages.create(**request_params)

        input_tokens = 0
        output_tokens = 0
        message_text = ""
        thought_text = ""

        for chunk in stream:
            if (
                chunk.type == "content_block_delta"
                and hasattr(chunk, "delta")
                and chunk.delta
            ):
                if hasattr(chunk.delta, "text") and chunk.delta.text:
                    message_text += chunk.delta.text
                    yield ClientResponse(
                        content=[
                            ThoughtBlock(content=thought_text),
                            TextBlock(content=message_text),
                        ],
                        delta=chunk.delta.text,
                    )
                elif hasattr(chunk.delta, "thinking") and chunk.delta.thinking:
                    thought_text += chunk.delta.thinking

            if chunk.type == "message_start":
                input_tokens = (
                    chunk.message.usage.input_tokens if chunk.message.usage else 0
                )

            if chunk.type == "message_delta":
                output_tokens = max(
                    output_tokens, chunk.usage.output_tokens if chunk.usage else 0
                )

        yield ClientResponse(
            content=[
                ThoughtBlock(content=thought_text),
                TextBlock(content=message_text),
            ],
            delta="",
            stop_reason="end_turn",
            usage=TokenUsage(
                prompt_tokens=input_tokens,
                completion_tokens=output_tokens,
                cached_tokens=0,
            ),
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None = None,
        memory: Memory | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        """Implementation of the abstract _a_stream_invoke method for Anthropic"""
        if tools is None:
            tools = []
        messages = self._memory_to_contents(None, input, memory)
        client = self._get_a_client()

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "stream": True,
            "max_tokens": max_tokens or 2048,
            **kwargs,
        }
        if temperature is not None:
            request_params["temperature"] = temperature
        if system_prompt:
            request_params["system"] = system_prompt

        if tools:
            request_params["tools"] = self._convert_tools(tools)
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        stream = await client.messages.create(**request_params)

        input_tokens = 0
        output_tokens = 0
        message_text = ""
        thought_text = ""

        async for chunk in stream:
            if (
                chunk.type == "content_block_delta"
                and hasattr(chunk, "delta")
                and chunk.delta
            ):
                if hasattr(chunk.delta, "text") and chunk.delta.text:
                    message_text += chunk.delta.text
                    yield ClientResponse(
                        content=[
                            ThoughtBlock(content=thought_text),
                            TextBlock(content=message_text),
                        ],
                        delta=chunk.delta.text,
                    )
                elif hasattr(chunk.delta, "thinking") and chunk.delta.thinking:
                    thought_text += chunk.delta.thinking

            if chunk.type == "message_start":
                input_tokens = (
                    chunk.message.usage.input_tokens if chunk.message.usage else 0
                )

            if chunk.type == "message_delta":
                output_tokens = max(
                    output_tokens, chunk.usage.output_tokens if chunk.usage else 0
                )

        yield ClientResponse(
            content=[
                ThoughtBlock(content=thought_text),
                TextBlock(content=message_text),
            ],
            delta="",
            stop_reason="end_turn",
            usage=TokenUsage(
                prompt_tokens=input_tokens,
                completion_tokens=output_tokens,
                cached_tokens=0,
            ),
        )

    def _structured_response(
        self,
        *args,
        **kwargs,
    ) -> ClientResponse:
        raise NotImplementedError("Anthropic does not support structured responses")

    async def _a_structured_response(self, *args, **kwargs):
        raise NotImplementedError("Anthropic does not support structured responses")

```
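
A minimal usage sketch (not part of the file above) showing how the pieces fit together: it assumes the public `invoke()` wrapper from the shared `Client` base class, an `ANTHROPIC_API_KEY` environment variable, and a toy `get_weather` tool; the model id is only an example.

```python
import os

from datapizza.clients.anthropic import AnthropicClient
from datapizza.tools import Tool


def get_weather(location: str) -> str:
    """Toy tool used only for this sketch."""
    return f"The weather in {location} is sunny."


client = AnthropicClient(
    api_key=os.environ["ANTHROPIC_API_KEY"],
    model="claude-3-5-sonnet-20241022",  # example model id
    system_prompt="You are a concise assistant.",
)

weather_tool = Tool(
    name="get_weather",
    description="Get the current weather for a location",
    function=get_weather,
    properties={"location": {"type": "string", "description": "City name"}},
    required=["location"],
)

# invoke() routes to _invoke(); tools are translated to Anthropic's format
# by _convert_tools() and tool_choice by _convert_tool_choice().
response = client.invoke("What's the weather in Rome?", tools=[weather_tool])
for block in response.content:
    print(block)
```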

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/memory_adapter.py" 
import base64
import json

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)


class AnthropicMemoryAdapter(MemoryAdapter):
    """Adapter for converting Memory objects to Anthropic API message format"""

    def _turn_to_message(self, turn: Turn) -> dict:
        content = []
        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {"type": "text", "text": block.content}
                case FunctionCallBlock():
                    block_dict = json.dumps(
                        {
                            "type": "tool_call",
                            "id": block.id,
                            "tool_name": block.name,
                            "tool_args": block.arguments,
                        }
                    )

                case FunctionCallResultBlock():
                    block_dict = json.dumps(
                        {
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": block.result,
                        }
                    )
                case StructuredBlock():
                    block_dict = {
                        "type": "text",
                        "text": str(block.content),
                    }
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_pdf_block(block)

                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

            content.append(block_dict)

        if all(isinstance(block, dict) for block in content) and all(
            list(block.keys()) == ["type", "text"] for block in content
        ):
            content = "".join([block["text"] for block in content])

        if len(content) == 1:
            content = content[0]

        return {
            "role": turn.role.anthropic_role,
            "content": (content),
        }

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        """Convert text and role to Anthropic message format"""
        # Anthropic messages use only 'user' and 'assistant' roles; the system prompt is passed separately

        return {"role": role.anthropic_role, "content": text}

    def _process_pdf_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "document",
                    "source": {
                        "type": "url",
                        "url": block.media.source,
                    },
                }

            case "base64":
                return {
                    "type": "document",
                    "source": {
                        "type": "base64",
                        "media_type": "application/pdf",
                        "data": block.media.source,
                    },
                }

            case "path":
                with open(block.media.source, "rb") as f:
                    base64_pdf = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "document",
                    "source": {
                        "type": "base64",
                        "media_type": "application/pdf",
                        "data": base64_pdf,
                    },
                }

            case _:
                raise NotImplementedError("Source type not supported")

    def _process_image_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "image",
                    "source": {
                        "type": "url",
                        "url": block.media.source,
                    },
                }

            case "base64":
                return {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": f"image/{block.media.extension}",
                        "data": block.media.source,
                    },
                }

            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                return {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": f"image/{block.media.extension}",
                        "data": base64_image,
                    },
                }
            case _:
                raise NotImplementedError(
                    f"Unsupported media type: {block.media.media_type}"
                )

```

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-anthropic"
version = "0.0.4"
description = "Anthropic (Claude) client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}

requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
    "Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = [
    "datapizza-ai-core>=0.0.7,<0.1.0",
    "anthropic>=0.40.0,<1.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_memory_adapter.py" 
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, TextBlock

from datapizza.clients.anthropic import AnthropicClient
from datapizza.clients.anthropic.memory_adapter import AnthropicMemoryAdapter


def test_init_anthropic_client():
    client = AnthropicClient(api_key="test")
    assert client is not None


def test_anthropic_memory_adapter():
    memory_adapter = AnthropicMemoryAdapter()
    memory = Memory()
    memory.add_turn(blocks=[TextBlock(content="Hello, world!")], role=ROLE.USER)
    memory.add_turn(blocks=[TextBlock(content="Hello, world!")], role=ROLE.ASSISTANT)

    messages = memory_adapter.memory_to_messages(memory)

    assert messages == [
        {
            "role": "user",
            "content": "Hello, world!",
        },
        {
            "role": "assistant",
            "content": "Hello, world!",
        },
    ]

```

## /datapizza-ai-clients/datapizza-ai-clients-azure-openai/datapizza/clients/azure_openai/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-azure-openai/datapizza/clients/azure_openai/__init__.py" 
from datapizza.clients.azure_openai.azure_openai_client import AzureOpenAIClient

__all__ = ["AzureOpenAIClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-azure-openai/datapizza/clients/azure_openai/azure_openai_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-azure-openai/datapizza/clients/azure_openai/azure_openai_client.py" 
from datapizza.clients.openai.openai_client import OpenAIClient
from datapizza.core.cache import Cache
from openai import AsyncAzureOpenAI, AzureOpenAI


class AzureOpenAIClient(OpenAIClient):
    def __init__(
        self,
        api_key: str,
        azure_endpoint: str,
        *,
        model: str = "gpt-4o-mini",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
        azure_deployment: str | None = None,
        api_version: str | None = None,
    ):
        self.azure_endpoint = azure_endpoint
        self.azure_deployment = azure_deployment
        self.api_version = api_version

        super().__init__(
            api_key=api_key,
            model=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )
        self._set_client()

    def _set_client(self):
        self.client = AzureOpenAI(
            api_key=self.api_key,
            api_version=self.api_version,
            azure_endpoint=self.azure_endpoint,
            azure_deployment=self.azure_deployment,
        )

    def _set_a_client(self):
        self.a_client = AsyncAzureOpenAI(
            api_key=self.api_key,
            api_version=self.api_version,
            azure_endpoint=self.azure_endpoint,
            azure_deployment=self.azure_deployment,
        )

```
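
A minimal configuration sketch (not part of the file above), assuming the public `invoke()` wrapper inherited from `OpenAIClient`; the endpoint, deployment name, and API version are placeholders you would replace with your own Azure OpenAI resource settings.

```python
import os

from datapizza.clients.azure_openai import AzureOpenAIClient

client = AzureOpenAIClient(
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    azure_endpoint="https://my-resource.openai.azure.com",  # placeholder endpoint
    azure_deployment="gpt-4o-mini",  # placeholder deployment name
    api_version="2024-06-01",  # example API version
    model="gpt-4o-mini",
)

response = client.invoke("Summarize Azure OpenAI in one sentence.")
print(response.content[0].content)
```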

## /datapizza-ai-clients/datapizza-ai-clients-azure-openai/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-azure-openai/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-azure-openai"
version = "0.0.4"
description = "Azure OpenAI client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
authors = [
    {name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.0.7,<0.1.0",
    "datapizza-ai-clients-openai>=0.0.1",
    "openai>=1.63.2,<2.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/README.md

# DataPizza AI - AWS Bedrock Client

AWS Bedrock client implementation for the datapizza-ai framework. This client provides seamless integration with AWS Bedrock's Converse API, supporting various foundation models including Anthropic's Claude models.

## Features

- Full support for AWS Bedrock Converse API
- Multiple authentication methods (AWS Profile, Access Keys, Environment Variables)
- Streaming and non-streaming responses
- Tool/function calling support
- Memory/conversation history management
- Image and document (PDF) support
- Async support

## Installation

```bash
pip install datapizza-ai-clients-bedrock
```

Or install from source in editable mode:

```bash
cd datapizza-ai/datapizza-ai-clients/datapizza-ai-clients-bedrock
pip install -e .
```

## Quick Start

### Basic Usage

```python
from datapizza.clients.bedrock import BedrockClient

# Using AWS Profile
client = BedrockClient(
    profile_name="my-aws-profile",
    region_name="us-east-1"
)

# Or using access keys
client = BedrockClient(
    aws_access_key_id="YOUR_ACCESS_KEY",
    aws_secret_access_key="YOUR_SECRET_KEY",
    region_name="us-east-1"
)

# Simple invocation
result = client.invoke("What is AWS Bedrock?")

# Extract text from response
for block in result.content:
    if hasattr(block, 'content'):
        print(block.content)
```

## Authentication Methods

The client supports multiple authentication methods in the following priority order:

### 1. Explicit Credentials

```python
client = BedrockClient(
    aws_access_key_id="YOUR_ACCESS_KEY",
    aws_secret_access_key="YOUR_SECRET_KEY",
    aws_session_token="YOUR_SESSION_TOKEN",  # Optional, for temporary credentials
    region_name="us-east-1"
)
```

### 2. AWS Profile

```python
client = BedrockClient(
    profile_name="my-aws-profile",
    region_name="us-east-1"
)
```

### 3. Environment Variables

Set these environment variables:
```bash
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_SESSION_TOKEN="your-session-token"  # Optional
export AWS_PROFILE="my-aws-profile"  # Or use profile
```

Then initialize without parameters:
```python
client = BedrockClient(region_name="us-east-1")
```

### 4. Default AWS Credentials Chain

If no credentials are provided, boto3 will use the default credentials chain (IAM roles, `~/.aws/credentials`, etc.).

```python
client = BedrockClient(region_name="us-east-1")
```

## Available Models

The client works with any Bedrock model that supports the Converse API. Popular models include:

- `anthropic.claude-3-5-sonnet-20241022-v2:0` (default)
- `anthropic.claude-3-5-sonnet-20240620-v1:0`
- `anthropic.claude-3-opus-20240229-v1:0`
- `anthropic.claude-3-sonnet-20240229-v1:0`
- `anthropic.claude-3-haiku-20240307-v1:0`
- `meta.llama3-70b-instruct-v1:0`
- `mistral.mistral-large-2402-v1:0`
- And many more...

```python
client = BedrockClient(
    model="anthropic.claude-3-opus-20240229-v1:0",
    region_name="us-east-1"
)
```

## Usage Examples

### With System Prompt

```python
client = BedrockClient(
    system_prompt="You are a helpful coding assistant specialized in Python.",
    region_name="us-east-1"
)

result = client.invoke("How do I read a CSV file?")
```

### Streaming Responses

```python
for chunk in client.stream_invoke("Tell me a long story"):
    if chunk.delta:
        print(chunk.delta, end="", flush=True)
print()
```

### With Memory (Conversation History)

```python
from datapizza.memory import Memory

memory = Memory()
client = BedrockClient(region_name="us-east-1")

# First message
result1 = client.invoke("My favorite color is blue", memory=memory)

# The conversation is tracked in memory
result2 = client.invoke("What's my favorite color?", memory=memory)
# Response: "Your favorite color is blue."
```
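
Memory can also be seeded manually before the first call; `add_turn`, `TextBlock`, and `ROLE` are used here the same way as in the package's tests:

```python
from datapizza.memory import Memory
from datapizza.type import ROLE, TextBlock

memory = Memory()
memory.add_turn(blocks=[TextBlock(content="My favorite color is blue")], role=ROLE.USER)
memory.add_turn(blocks=[TextBlock(content="Noted!")], role=ROLE.ASSISTANT)

result = client.invoke("What's my favorite color?", memory=memory)
```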

### With Temperature and Max Tokens

```python
result = client.invoke(
    "Write a creative story",
    temperature=0.9,  # Higher = more creative (0-1)
    max_tokens=1000
)
```

### With Tools/Function Calling

```python
from datapizza.tools import Tool
from datapizza.type import FunctionCallBlock

def get_weather(location: str, unit: str = "celsius") -> str:
    """Get the weather for a location"""
    return f"The weather in {location} is 22°{unit[0].upper()}"

weather_tool = Tool(
    name="get_weather",
    description="Get the current weather for a location",
    function=get_weather,
    properties={
        "location": {
            "type": "string",
            "description": "The city name"
        },
        "unit": {
            "type": "string",
            "enum": ["celsius", "fahrenheit"],
            "description": "Temperature unit"
        }
    },
    required=["location"]
)

result = client.invoke(
    "What's the weather in Paris?",
    tools=[weather_tool]
)

# Check for function calls
for block in result.content:
    if isinstance(block, FunctionCallBlock):
        print(f"Function: {block.name}")
        print(f"Arguments: {block.arguments}")
```

### Async Support

```python
import asyncio

async def main():
    client = BedrockClient(region_name="us-east-1")
    result = await client.a_invoke("Hello!")
    print(result.content[0].content)

asyncio.run(main())
```

### Async Streaming

```python
async def stream_example():
    client = BedrockClient(region_name="us-east-1")
    async for chunk in client.a_stream_invoke("Count to 10"):
        if chunk.delta:
            print(chunk.delta, end="", flush=True)

asyncio.run(stream_example())
```

## Configuration

### Constructor Parameters

```python
BedrockClient(
    model: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
    system_prompt: str = "",
    temperature: float | None = None,  # 0-1 for most models
    cache: Cache | None = None,
    region_name: str = "us-east-1",
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    profile_name: str | None = None,
)
```

### Invoke Parameters

```python
client.invoke(
    input: str,                    # The user message
    tools: list[Tool] | None = None,
    memory: Memory | None = None,
    tool_choice: "auto" | "required" | "none" | list[str] = "auto",
    temperature: float | None = None,
    max_tokens: int = 2048,
    system_prompt: str | None = None,  # Override instance system_prompt
)
```
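
For example, several of these parameters can be combined in a single call; `tool_choice` given as a list forces a call to that specific tool, and `system_prompt` overrides the instance-level prompt for just this request (reusing the `weather_tool` defined earlier):

```python
result = client.invoke(
    "What's the weather in Paris?",
    tools=[weather_tool],
    tool_choice=["get_weather"],  # force a call to this specific tool
    temperature=0.2,
    max_tokens=512,
    system_prompt="Answer in a single short sentence.",  # per-call override
)
```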

## Response Format

All methods return a `ClientResponse` object:

```python
response = client.invoke("Hello")

# Access content blocks
for block in response.content:
    if isinstance(block, TextBlock):
        print(block.content)  # The text
    elif isinstance(block, FunctionCallBlock):
        print(block.name)      # Function name
        print(block.arguments) # Function arguments

# Token usage
print(f"Prompt tokens: {response.prompt_tokens_used}")
print(f"Completion tokens: {response.completion_tokens_used}")
print(f"Stop reason: {response.stop_reason}")
```

## IAM Permissions

Your AWS credentials need the following permissions:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                "arn:aws:bedrock:*::foundation-model/*"
            ]
        }
    ]
}
```

## Model Access

Before using a model, you need to request access in the AWS Bedrock console:

1. Go to AWS Bedrock console
2. Navigate to "Model access"
3. Request access to the models you want to use
4. Wait for approval (usually instant for most models)
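
To see which foundation models exist in your region (access still has to be granted in the console), you can query the control-plane `bedrock` API; a minimal sketch with boto3:

```python
import boto3

# Note: this is the "bedrock" control-plane client, not the "bedrock-runtime"
# client that BedrockClient uses for inference.
bedrock = boto3.client("bedrock", region_name="us-east-1")

for model in bedrock.list_foundation_models()["modelSummaries"]:
    print(model["modelId"])
```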

## Limitations

- Structured responses are not natively supported (unlike OpenAI's structured output)
- Some advanced features may vary by model
- Token usage metrics may not include caching information

## Error Handling

```python
from botocore.exceptions import BotoCoreError, ClientError

try:
    result = client.invoke("Hello")
except ClientError as e:
    if e.response['Error']['Code'] == 'AccessDeniedException':
        print("Model access not granted. Check Bedrock console.")
    elif e.response['Error']['Code'] == 'ResourceNotFoundException':
        print("Model not found in this region.")
    else:
        print(f"AWS Error: {e}")
except BotoCoreError as e:
    print(f"Boto3 Error: {e}")
```

## Development

### Running Tests

```bash
pip install -e ".[dev]"
pytest tests/
```

### Code Formatting

```bash
ruff check .
ruff format .
```

## License

MIT License - see LICENSE file for details

## Contributing

Contributions are welcome! Please see the main datapizza-ai repository for contribution guidelines.

## Support

For issues and questions:
- GitHub Issues: [datapizza-ai repository](https://github.com/datapizza-labs/datapizza-ai)
- Documentation: [DataPizza AI Docs](https://docs.datapizza.ai)


## /datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/__init__.py" 
from .bedrock_client import BedrockClient

__all__ = ["BedrockClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/bedrock_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/bedrock_client.py" 
import os
from collections.abc import AsyncIterator, Iterator
from typing import Any, Literal

import aioboto3
import boto3
from botocore.config import Config
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import FunctionCallBlock, TextBlock

from .memory_adapter import BedrockMemoryAdapter


class BedrockClient(Client):
    """A client for interacting with AWS Bedrock API.

    This class provides methods for invoking AWS Bedrock models (Claude, etc.)
    to generate responses. It extends the Client class and supports authentication
    via AWS profile or access keys.
    """

    def __init__(
        self,
        model: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
        region_name: str = "us-east-1",
        aws_access_key_id: str | None = None,
        aws_secret_access_key: str | None = None,
        aws_session_token: str | None = None,
        profile_name: str | None = None,
    ):
        """
        Args:
            model: The Bedrock model to use (e.g., 'anthropic.claude-3-5-sonnet-20241022-v2:0').
            system_prompt: The system prompt to use for the model.
            temperature: The temperature to use for generation (0-1 for most models).
            cache: The cache to use for responses.
            region_name: AWS region name (default: us-east-1).
            aws_access_key_id: AWS access key ID (optional, can use AWS_ACCESS_KEY_ID env var).
            aws_secret_access_key: AWS secret access key (optional, can use AWS_SECRET_ACCESS_KEY env var).
            aws_session_token: AWS session token (optional, for temporary credentials).
            profile_name: AWS profile name (optional, can use AWS_PROFILE env var).
        """
        if temperature and not 0 <= temperature <= 1:
            raise ValueError("Temperature must be between 0 and 1 for Bedrock models")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )

        self.region_name = region_name
        self.aws_access_key_id = aws_access_key_id or os.getenv("AWS_ACCESS_KEY_ID")
        self.aws_secret_access_key = aws_secret_access_key or os.getenv(
            "AWS_SECRET_ACCESS_KEY"
        )
        self.aws_session_token = aws_session_token or os.getenv("AWS_SESSION_TOKEN")
        self.profile_name = profile_name or os.getenv("AWS_PROFILE")

        self.memory_adapter = BedrockMemoryAdapter()
        self._set_client()

    def _set_client(self):
        if not self.client:
            session_kwargs = {}

            # Priority: explicit credentials > profile > default credentials
            if self.aws_access_key_id and self.aws_secret_access_key:
                session_kwargs["aws_access_key_id"] = self.aws_access_key_id
                session_kwargs["aws_secret_access_key"] = self.aws_secret_access_key
                if self.aws_session_token:
                    session_kwargs["aws_session_token"] = self.aws_session_token
            elif self.profile_name:
                session_kwargs["profile_name"] = self.profile_name

            session = boto3.Session(**session_kwargs)

            # Create bedrock-runtime client with retry configuration
            config = Config(
                retries={"max_attempts": 3, "mode": "adaptive"},
                read_timeout=300,
            )

            self.client = session.client(
                service_name="bedrock-runtime",
                region_name=self.region_name,
                config=config,
            )

    def _set_a_client(self):
        """Initialize async bedrock-runtime client using aioboto3"""
        if not self.a_client:
            session_kwargs = {}

            # Priority: explicit credentials > profile > default credentials
            if self.aws_access_key_id and self.aws_secret_access_key:
                session_kwargs["aws_access_key_id"] = self.aws_access_key_id
                session_kwargs["aws_secret_access_key"] = self.aws_secret_access_key
                if self.aws_session_token:
                    session_kwargs["aws_session_token"] = self.aws_session_token
            elif self.profile_name:
                session_kwargs["profile_name"] = self.profile_name

            # Create async session
            session = aioboto3.Session(**session_kwargs)

            # Create bedrock-runtime client with retry configuration
            config = Config(
                retries={"max_attempts": 3, "mode": "adaptive"},
                read_timeout=300,
            )

            # Store the session and config for async client creation
            self.a_session = session
            self.a_config = config
            self.a_region_name = self.region_name

    def _convert_tools(self, tools: list[Tool]) -> list[dict[str, Any]]:
        """Convert tools to Bedrock tool format (similar to Anthropic)"""
        bedrock_tools = []
        for tool in tools:
            bedrock_tool = {
                "toolSpec": {
                    "name": tool.name,
                    "description": tool.description or "",
                    "inputSchema": {
                        "json": {
                            "type": "object",
                            "properties": tool.properties,
                            "required": tool.required,
                        }
                    },
                }
            }
            bedrock_tools.append(bedrock_tool)
        return bedrock_tools

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> dict:
        """Convert tool choice to Bedrock format"""
        if isinstance(tool_choice, list):
            if len(tool_choice) > 1:
                raise NotImplementedError(
                    "Multiple function names not supported by Bedrock"
                )
            return {"tool": {"name": tool_choice[0]}}
        elif tool_choice == "required":
            return {"any": {}}
        elif tool_choice == "auto":
            return {"auto": {}}
        else:  # none
            return {}

    def _response_to_client_response(
        self, response: dict, tool_map: dict[str, Tool] | None = None
    ) -> ClientResponse:
        """Convert Bedrock response to ClientResponse"""
        blocks = []

        # Parse the response body
        response_body = response.get("output", {})

        # Handle message content
        message = response_body.get("message", {})
        content_items = message.get("content", [])

        for content_item in content_items:
            if "text" in content_item:
                blocks.append(TextBlock(content=content_item["text"]))
            elif "toolUse" in content_item:
                tool_use = content_item["toolUse"]
                tool = tool_map.get(tool_use["name"]) if tool_map else None
                if not tool:
                    raise ValueError(f"Tool {tool_use['name']} not found")

                blocks.append(
                    FunctionCallBlock(
                        id=tool_use["toolUseId"],
                        name=tool_use["name"],
                        arguments=tool_use["input"],
                        tool=tool,
                    )
                )

        # Extract usage information
        usage = response.get("usage", {})
        prompt_tokens = usage.get("inputTokens", 0)
        completion_tokens = usage.get("outputTokens", 0)

        # Extract stop reason
        stop_reason = response.get("stopReason")

        return ClientResponse(
            content=blocks,
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                cached_tokens=0,  # Bedrock doesn't expose cache metrics in the same way
            ),
        )

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _invoke method for Bedrock"""
        if tools is None:
            tools = []

        client = self._get_client()
        messages = self._memory_to_contents(None, input, memory)

        # Remove model role messages (Bedrock doesn't support this)
        messages = [message for message in messages if message.get("role") != "model"]

        tool_map = {tool.name: tool for tool in tools}

        # Build the request body according to Bedrock Converse API
        request_body = {
            "modelId": self.model_name,
            "messages": messages,
            "inferenceConfig": {
                "maxTokens": max_tokens or 2048,
            },
        }

        if temperature is not None:
            request_body["inferenceConfig"]["temperature"] = temperature

        # Add system prompt if provided
        if system_prompt:
            request_body["system"] = [{"text": system_prompt}]

        # Add tools if provided
        if tools:
            request_body["toolConfig"] = {
                "tools": self._convert_tools(tools),
                "toolChoice": self._convert_tool_choice(tool_choice),
            }

        # Add any additional kwargs
        request_body.update(kwargs)

        # Call Bedrock Converse API
        response = client.converse(**request_body)

        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Async implementation using aioboto3"""
        if tools is None:
            tools = []

        # Ensure async client is initialized
        if not hasattr(self, "a_session"):
            self._set_a_client()

        messages = self._memory_to_contents(None, input, memory)

        # Remove model role messages (Bedrock doesn't support this)
        messages = [message for message in messages if message.get("role") != "model"]

        tool_map = {tool.name: tool for tool in tools}

        # Build the request body according to Bedrock Converse API
        request_body = {
            "modelId": self.model_name,
            "messages": messages,
            "inferenceConfig": {
                "maxTokens": max_tokens or 2048,
            },
        }

        if temperature is not None:
            request_body["inferenceConfig"]["temperature"] = temperature

        # Add system prompt if provided
        if system_prompt:
            request_body["system"] = [{"text": system_prompt}]

        # Add tools if provided
        if tools:
            request_body["toolConfig"] = {
                "tools": self._convert_tools(tools),
                "toolChoice": self._convert_tool_choice(tool_choice),
            }

        # Add any additional kwargs
        request_body.update(kwargs)

        # Call Bedrock Converse API asynchronously
        async with self.a_session.client(
            service_name="bedrock-runtime",
            region_name=self.a_region_name,
            config=self.a_config,
        ) as client:
            response = await client.converse(**request_body)

        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        """Implementation of streaming for Bedrock"""
        if tools is None:
            tools = []

        client = self._get_client()
        messages = self._memory_to_contents(None, input, memory)

        # Remove model role messages
        messages = [message for message in messages if message.get("role") != "model"]

        # Build the request body
        request_body = {
            "modelId": self.model_name,
            "messages": messages,
            "inferenceConfig": {
                "maxTokens": max_tokens or 2048,
            },
        }

        if temperature is not None:
            request_body["inferenceConfig"]["temperature"] = temperature

        if system_prompt:
            request_body["system"] = [{"text": system_prompt}]

        if tools:
            request_body["toolConfig"] = {
                "tools": self._convert_tools(tools),
                "toolChoice": self._convert_tool_choice(tool_choice),
            }

        request_body.update(kwargs)

        # Call Bedrock ConverseStream API
        response = client.converse_stream(**request_body)

        # Process streaming response
        message_text = ""
        input_tokens = 0
        output_tokens = 0
        stop_reason = None

        stream = response.get("stream")
        if stream:
            for event in stream:
                if "contentBlockDelta" in event:
                    delta = event["contentBlockDelta"].get("delta", {})
                    if "text" in delta:
                        text_delta = delta["text"]
                        message_text += text_delta
                        yield ClientResponse(
                            content=[TextBlock(content=message_text)],
                            delta=text_delta,
                            stop_reason=None,
                        )

                elif "metadata" in event:
                    metadata = event["metadata"]
                    usage = metadata.get("usage", {})
                    input_tokens = usage.get("inputTokens", 0)
                    output_tokens = usage.get("outputTokens", 0)

                elif "messageStop" in event:
                    stop_reason = event["messageStop"].get("stopReason")

        # Final response with complete information
        yield ClientResponse(
            content=[TextBlock(content=message_text)],
            delta="",
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=input_tokens,
                completion_tokens=output_tokens,
                cached_tokens=0,
            ),
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        """Async streaming implementation using aioboto3"""
        if tools is None:
            tools = []

        # Ensure async client is initialized
        if not hasattr(self, "a_session"):
            self._set_a_client()

        messages = self._memory_to_contents(None, input, memory)

        # Remove model role messages
        messages = [message for message in messages if message.get("role") != "model"]

        # Build the request body
        request_body = {
            "modelId": self.model_name,
            "messages": messages,
            "inferenceConfig": {
                "maxTokens": max_tokens or 2048,
            },
        }

        if temperature is not None:
            request_body["inferenceConfig"]["temperature"] = temperature

        if system_prompt:
            request_body["system"] = [{"text": system_prompt}]

        if tools:
            request_body["toolConfig"] = {
                "tools": self._convert_tools(tools),
                "toolChoice": self._convert_tool_choice(tool_choice),
            }

        request_body.update(kwargs)

        # Call Bedrock ConverseStream API asynchronously
        async with self.a_session.client(
            service_name="bedrock-runtime",
            region_name=self.a_region_name,
            config=self.a_config,
        ) as client:
            response = await client.converse_stream(**request_body)

            # Process streaming response
            message_text = ""
            input_tokens = 0
            output_tokens = 0
            stop_reason = None

            stream = response.get("stream")
            if stream:
                async for event in stream:
                    if "contentBlockDelta" in event:
                        delta = event["contentBlockDelta"].get("delta", {})
                        if "text" in delta:
                            text_delta = delta["text"]
                            message_text += text_delta
                            yield ClientResponse(
                                content=[TextBlock(content=message_text)],
                                delta=text_delta,
                                stop_reason=None,
                            )

                    elif "metadata" in event:
                        metadata = event["metadata"]
                        usage = metadata.get("usage", {})
                        input_tokens = usage.get("inputTokens", 0)
                        output_tokens = usage.get("outputTokens", 0)

                    elif "messageStop" in event:
                        stop_reason = event["messageStop"].get("stopReason")

            # Final response with complete information
            yield ClientResponse(
                content=[TextBlock(content=message_text)],
                delta="",
                stop_reason=stop_reason,
                usage=TokenUsage(
                    prompt_tokens=input_tokens,
                    completion_tokens=output_tokens,
                    cached_tokens=0,
                ),
            )

    def _structured_response(
        self,
        input: str,
        output_cls: type,
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        """Bedrock doesn't natively support structured output like OpenAI

        This would require prompting techniques or using Anthropic's prompt caching
        """
        raise NotImplementedError(
            "Bedrock doesn't natively support structured responses. "
            "Consider using prompt engineering or JSON mode with validation."
        )

    async def _a_structured_response(self, *args, **kwargs):
        raise NotImplementedError(
            "Bedrock doesn't natively support structured responses"
        )

```

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/memory_adapter.py" 
import base64
import json

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)


class BedrockMemoryAdapter(MemoryAdapter):
    """Adapter for converting Memory objects to AWS Bedrock Converse API message format

    The Bedrock Converse API uses a message format similar to Anthropic's Claude API.
    """

    def _turn_to_message(self, turn: Turn) -> dict:
        """Convert a Turn to Bedrock message format"""
        content = []

        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {"text": block.content}

                case FunctionCallBlock():
                    # Bedrock uses toolUse format
                    block_dict = {
                        "toolUse": {
                            "toolUseId": block.id,
                            "name": block.name,
                            "input": block.arguments,
                        }
                    }

                case FunctionCallResultBlock():
                    # Bedrock uses toolResult format
                    block_dict = {
                        "toolResult": {
                            "toolUseId": block.id,
                            "content": [{"text": str(block.result)}],
                        }
                    }

                case StructuredBlock():
                    # Convert structured content to text
                    block_dict = {
                        "text": json.dumps(block.content)
                        if not isinstance(block.content, str)
                        else block.content,
                    }

                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_document_block(block)
                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

            content.append(block_dict)

        # Bedrock expects content as a list
        return {
            "role": self._convert_role(turn.role),
            "content": content,
        }

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        """Convert text and role to Bedrock message format"""
        return {
            "role": self._convert_role(role),
            "content": [{"text": text}],
        }

    def _convert_role(self, role: ROLE) -> str:
        """Convert ROLE to Bedrock role string

        Bedrock Converse API supports 'user' and 'assistant' roles
        """
        if role.value == "user":
            return "user"
        elif role.value == "assistant":
            return "assistant"
        else:
            # Default to user for system or other roles
            return "user"

    def _process_document_block(self, block: MediaBlock) -> dict:
        """Process document (PDF) blocks for Bedrock"""
        match block.media.source_type:
            case "base64":
                return {
                    "document": {
                        "format": "pdf",
                        "name": "document.pdf",
                        "source": {"bytes": base64.b64decode(block.media.source)},
                    }
                }

            case "path":
                with open(block.media.source, "rb") as f:
                    pdf_bytes = f.read()
                return {
                    "document": {
                        "format": "pdf",
                        "name": block.media.source.split("/")[-1],
                        "source": {"bytes": pdf_bytes},
                    }
                }

            case "url":
                raise NotImplementedError(
                    "Bedrock Converse API does not support document URLs directly. "
                    "Please download and provide as bytes."
                )

            case _:
                raise NotImplementedError(
                    f"Unsupported source type: {block.media.source_type}"
                )

    def _process_image_block(self, block: MediaBlock) -> dict:
        """Process image blocks for Bedrock"""
        match block.media.source_type:
            case "base64":
                return {
                    "image": {
                        "format": block.media.extension or "png",
                        "source": {"bytes": base64.b64decode(block.media.source)},
                    }
                }

            case "path":
                with open(block.media.source, "rb") as image_file:
                    image_bytes = image_file.read()
                return {
                    "image": {
                        "format": block.media.extension or "png",
                        "source": {"bytes": image_bytes},
                    }
                }

            case "url":
                raise NotImplementedError(
                    "Bedrock Converse API does not support image URLs directly. "
                    "Please download and provide as bytes."
                )

            case _:
                raise NotImplementedError(
                    f"Unsupported source type: {block.media.source_type}"
                )

```

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-bedrock"
version = "0.0.4"
description = "AWS Bedrock client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}

requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
    "Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = [
    "datapizza-ai-core>=0.0.7,<0.1.0",
    "boto3>=1.35.0",
    "botocore>=1.35.0",
    "aioboto3>=13.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
    "moto>=5.0.0",  # For mocking AWS services in tests
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_async.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_async.py" 
from datapizza.clients.bedrock import BedrockClient


def test_async_client_initialization():
    """Test that async client can be initialized"""
    client = BedrockClient(
        aws_access_key_id="test_key",
        aws_secret_access_key="test_secret",
        region_name="us-east-1",
    )

    # Initialize async client
    client._set_a_client()

    # Verify async session is created
    assert hasattr(client, "a_session")
    assert hasattr(client, "a_config")
    assert hasattr(client, "a_region_name")
    assert client.a_region_name == "us-east-1"


def test_a_invoke_method_exists():
    """Test that _a_invoke method is implemented and doesn't raise NotImplementedError"""
    client = BedrockClient(
        aws_access_key_id="test_key",
        aws_secret_access_key="test_secret",
        region_name="us-east-1",
    )

    # Verify the method exists and is callable
    assert hasattr(client, "_a_invoke")
    assert callable(client._a_invoke)


def test_a_stream_invoke_method_exists():
    """Test that _a_stream_invoke method is implemented and doesn't raise NotImplementedError"""
    client = BedrockClient(
        aws_access_key_id="test_key",
        aws_secret_access_key="test_secret",
        region_name="us-east-1",
    )

    # Verify the method exists and is callable
    assert hasattr(client, "_a_stream_invoke")
    assert callable(client._a_stream_invoke)

```

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_memory_adapter.py" 
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, TextBlock

from datapizza.clients.bedrock import BedrockClient
from datapizza.clients.bedrock.memory_adapter import BedrockMemoryAdapter


def test_init_bedrock_client():
    """Test that BedrockClient can be initialized"""
    client = BedrockClient()
    assert client is not None
    assert client.model_name == "anthropic.claude-3-5-sonnet-20241022-v2:0"


def test_init_bedrock_client_with_credentials():
    """Test BedrockClient initialization with explicit credentials"""
    client = BedrockClient(
        aws_access_key_id="test_key",
        aws_secret_access_key="test_secret",
        region_name="us-west-2",
    )
    assert client is not None
    assert client.aws_access_key_id == "test_key"
    assert client.region_name == "us-west-2"


def test_bedrock_memory_adapter():
    """Test that the memory adapter converts memory to Bedrock message format"""
    memory_adapter = BedrockMemoryAdapter()
    memory = Memory()
    memory.add_turn(blocks=[TextBlock(content="Hello, world!")], role=ROLE.USER)
    memory.add_turn(
        blocks=[TextBlock(content="Hello! How can I help you?")], role=ROLE.ASSISTANT
    )

    messages = memory_adapter.memory_to_messages(memory)

    assert messages == [
        {
            "role": "user",
            "content": [{"text": "Hello, world!"}],
        },
        {
            "role": "assistant",
            "content": [{"text": "Hello! How can I help you?"}],
        },
    ]


def test_bedrock_memory_adapter_multiple_blocks():
    """Test memory adapter with multiple text blocks in a single turn"""
    memory_adapter = BedrockMemoryAdapter()
    memory = Memory()
    memory.add_turn(
        blocks=[
            TextBlock(content="First message."),
            TextBlock(content="Second message."),
        ],
        role=ROLE.USER,
    )

    messages = memory_adapter.memory_to_messages(memory)

    assert messages == [
        {
            "role": "user",
            "content": [{"text": "First message."}, {"text": "Second message."}],
        },
    ]

```

## /datapizza-ai-clients/datapizza-ai-clients-google/README.md



## /datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/__init__.py" 
from .google_client import GoogleClient

__all__ = ["GoogleClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/google_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/google_client.py" 
from collections.abc import AsyncIterator, Iterator
from typing import Literal

from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import (
    FunctionCallBlock,
    Model,
    StructuredBlock,
    TextBlock,
    ThoughtBlock,
)

from google import genai
from google.genai import types
from google.oauth2 import service_account

from .memory_adapter import GoogleMemoryAdapter


class GoogleClient(Client):
    """A client for interacting with Google's Generative AI APIs.

    This class provides methods for invoking the Google GenAI API to generate responses
    based on given input data. It extends the Client class.
    """

    def __init__(
        self,
        api_key: str | None = None,
        model: str = "gemini-2.0-flash",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
        project_id: str | None = None,
        location: str | None = None,
        credentials_path: str | None = None,
        use_vertexai: bool = False,
    ):
        """
        Args:
            api_key: The API key for the Google API.
            model: The model to use for the Google API.
            system_prompt: The system prompt to use for the Google API.
            temperature: The temperature to use for the Google API.
            cache: The cache to use for the Google API.
            project_id: The project ID for the Google API.
            location: The location for the Google API.
            credentials_path: The path to the credentials for the Google API.
            use_vertexai: Whether to use Vertex AI for the Google API.
        """
        if temperature and not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )
        self.memory_adapter = GoogleMemoryAdapter()

        try:
            if use_vertexai:
                if not credentials_path:
                    raise ValueError("credentials_path must be provided")
                if not project_id:
                    raise ValueError("project_id must be provided")
                if not location:
                    raise ValueError("location must be provided")

                credentials = service_account.Credentials.from_service_account_file(
                    credentials_path,
                    scopes=["https://www.googleapis.com/auth/cloud-platform"],
                )
                self.client = genai.Client(
                    vertexai=True,
                    project=project_id,
                    location=location,
                    credentials=credentials,
                )
            else:
                if not api_key:
                    raise ValueError("api_key must be provided")

                self.client = genai.Client(api_key=api_key)

        except Exception as e:
            raise RuntimeError(
                f"Failed to initialize Google GenAI client: {e!s}"
            ) from None

    def _convert_tool(self, tool: Tool) -> dict:
        """Convert tools to Google function format"""
        parameters = {
            "type": tool.schema["parameters"]["type"],
            "properties": tool.schema["parameters"]["properties"],
            "required": tool.schema["parameters"]["required"],
        }

        return {
            "name": tool.schema["name"],
            "description": tool.schema["description"],
            "parameters": parameters,
        }

    def _prepare_tools(self, tools: list[Tool] | None) -> list[types.Tool] | None:
        if not tools:
            return None

        google_tools = []
        function_declarations = []
        has_google_search = False

        for tool in tools:
            # Check if tool has google search capability
            if hasattr(tool, "name") and "google_search" in tool.name.lower():
                has_google_search = True
            elif isinstance(tool, Tool):
                function_declarations.append(self._convert_tool(tool))
            elif isinstance(tool, dict):
                google_tools.append(tool)
            else:
                raise ValueError(f"Unknown tool type: {type(tool)}")

        if function_declarations:
            google_tools.append(types.Tool(function_declarations=function_declarations))

        if has_google_search:
            google_tools.append(types.Tool(google_search=types.GoogleSearch()))

        return google_tools if google_tools else None

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> types.ToolConfig:
        adjusted_tool_choice: types.ToolConfig
        if isinstance(tool_choice, list):
            adjusted_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="ANY",  # type: ignore
                    allowed_function_names=tool_choice,
                )
            )
        elif tool_choice == "required":
            adjusted_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(mode="ANY")  # type: ignore
            )
        elif tool_choice == "none":
            adjusted_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(mode="NONE")  # type: ignore
            )
        elif tool_choice == "auto":
            adjusted_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(mode="AUTO")  # type: ignore
            )
        return adjusted_tool_choice

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _invoke method"""
        if tools is None:
            tools = []
        contents = self._memory_to_contents(None, input, memory)

        tool_map = {tool.name: tool for tool in tools if isinstance(tool, Tool)}

        prepared_tools = self._prepare_tools(tools)
        config = types.GenerateContentConfig(
            temperature=temperature or self.temperature,
            system_instruction=system_prompt or self.system_prompt,
            max_output_tokens=max_tokens or None,
            tools=prepared_tools,  # type: ignore
            tool_config=self._convert_tool_choice(tool_choice)
            if tools and any(isinstance(tool, Tool) for tool in tools)
            else None,
            **kwargs,
        )

        response = self.client.models.generate_content(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=config,  # type: ignore
        )
        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _invoke method"""
        if tools is None:
            tools = []
        contents = self._memory_to_contents(None, input, memory)

        tool_map = {tool.name: tool for tool in tools if isinstance(tool, Tool)}

        prepared_tools = self._prepare_tools(tools)
        config = types.GenerateContentConfig(
            temperature=temperature or self.temperature,
            system_instruction=system_prompt or self.system_prompt,
            max_output_tokens=max_tokens or None,
            tools=prepared_tools,  # type: ignore
            tool_config=self._convert_tool_choice(tool_choice)
            if tools and any(isinstance(tool, Tool) for tool in tools)
            else None,
            **kwargs,
        )

        response = await self.client.aio.models.generate_content(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=config,  # type: ignore
        )
        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        """Implementation of the abstract _stream_invoke method"""
        if tools is None:
            tools = []
        contents = self._memory_to_contents(None, input, memory)

        prepared_tools = self._prepare_tools(tools)
        config = types.GenerateContentConfig(
            temperature=temperature or self.temperature,
            system_instruction=system_prompt or self.system_prompt,
            max_output_tokens=max_tokens or None,
            tools=prepared_tools,  # type: ignore
            tool_config=self._convert_tool_choice(tool_choice)
            if tools and any(isinstance(tool, Tool) for tool in tools)
            else None,
            **kwargs,
        )

        message_text = ""
        thought_block = ThoughtBlock(content="")

        usage = TokenUsage()

        for chunk in self.client.models.generate_content_stream(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=config,
        ):
            usage += TokenUsage(
                prompt_tokens=chunk.usage_metadata.prompt_token_count or 0,
                completion_tokens=chunk.usage_metadata.candidates_token_count or 0,
                cached_tokens=chunk.usage_metadata.cached_content_token_count or 0,
            )
            if not chunk.candidates:
                raise ValueError("No candidates in response")

            finish_reason = chunk.candidates[0].finish_reason
            stop_reason = (
                finish_reason.value.lower()
                if finish_reason is not None
                else finish_reason
            )

            if not chunk.candidates[0].content:
                raise ValueError("No content in response")

            if not chunk.candidates[0].content.parts:
                yield ClientResponse(
                    content=[],
                    delta=chunk.text or "",
                    stop_reason=stop_reason,
                    usage=usage,
                )
                continue

            for part in chunk.candidates[0].content.parts:
                if not part.text:
                    continue
                elif hasattr(part, "thought") and part.thought:
                    thought_block.content += part.text
                else:  # If it's not a thought, it's a message
                    if part.text:
                        message_text += str(chunk.text or "")

                        yield ClientResponse(
                            content=[],
                            delta=chunk.text or "",
                            stop_reason=stop_reason,
                        )
        yield ClientResponse(
            content=[TextBlock(content=message_text)],
            stop_reason=stop_reason,
            usage=usage,
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None = None,
        memory: Memory | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        """Implementation of the abstract _a_stream_invoke method for Google"""
        if tools is None:
            tools = []
        contents = self._memory_to_contents(None, input, memory)

        prepared_tools = self._prepare_tools(tools)
        config = types.GenerateContentConfig(
            temperature=temperature or self.temperature,
            system_instruction=system_prompt or self.system_prompt,
            max_output_tokens=max_tokens or None,
            tools=prepared_tools,  # type: ignore
            tool_config=self._convert_tool_choice(tool_choice)
            if tools and any(isinstance(tool, Tool) for tool in tools)
            else None,
            **kwargs,
        )

        usage = TokenUsage()
        message_text = ""
        thought_block = ThoughtBlock(content="")
        async for chunk in await self.client.aio.models.generate_content_stream(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=config,
        ):  # type: ignore
            usage += TokenUsage(
                prompt_tokens=chunk.usage_metadata.prompt_token_count or 0,
                completion_tokens=chunk.usage_metadata.candidates_token_count or 0,
                cached_tokens=chunk.usage_metadata.cached_content_token_count or 0,
            )

            finish_reason = chunk.candidates[0].finish_reason
            stop_reason = (
                finish_reason.value.lower()
                if finish_reason is not None
                else finish_reason
            )

            # Handle the case where the response has no parts
            if not chunk.candidates[0].content.parts:
                yield ClientResponse(
                    content=[],
                    delta=chunk.text or "",
                    stop_reason=stop_reason,
                )
                continue

            for part in chunk.candidates[0].content.parts:
                if not part.text:
                    continue
                elif hasattr(part, "thought") and part.thought:
                    thought_block.content += part.text
                else:  # If it's not a thought, it's a message
                    if part.text:
                        message_text += chunk.text or ""
                        yield ClientResponse(
                            content=[],
                            delta=chunk.text or "",
                            stop_reason=stop_reason,
                        )
        yield ClientResponse(
            content=[TextBlock(content=message_text)],
            stop_reason=stop_reason,
            usage=usage,
        )

    def _structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _structured_response method"""
        contents = self._memory_to_contents(self.system_prompt, input, memory)

        prepared_tools = self._prepare_tools(tools)
        response = self.client.models.generate_content(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=types.GenerateContentConfig(
                system_instruction=system_prompt,
                temperature=temperature,
                max_output_tokens=max_tokens,
                response_mime_type="application/json",
                tools=prepared_tools,  # type: ignore
                tool_config=self._convert_tool_choice(tool_choice)
                if tools and any(isinstance(tool, Tool) for tool in tools)
                else None,
                response_schema=(
                    output_cls.model_json_schema()
                    if hasattr(output_cls, "model_json_schema")
                    else output_cls
                ),
            ),
        )
        if not response or not response.candidates:
            raise ValueError("No response from Google GenAI")

        structured_data = output_cls.model_validate_json(str(response.text))
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=response.candidates[0].finish_reason.value.lower()
            if response.candidates[0].finish_reason
            else None,
            usage=TokenUsage(
                prompt_tokens=response.usage_metadata.prompt_token_count or 0,
                completion_tokens=response.usage_metadata.candidates_token_count or 0,
                cached_tokens=response.usage_metadata.cached_content_token_count or 0,
            ),
        )

    async def _a_structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _structured_response method"""
        contents = self._memory_to_contents(self.system_prompt, input, memory)
        prepared_tools = self._prepare_tools(tools)
        response = await self.client.aio.models.generate_content(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=types.GenerateContentConfig(
                system_instruction=system_prompt,
                temperature=temperature,
                max_output_tokens=max_tokens,
                response_mime_type="application/json",
                tools=prepared_tools,  # type: ignore
                tool_config=self._convert_tool_choice(tool_choice)
                if tools and any(isinstance(tool, Tool) for tool in tools)
                else None,
                response_schema=(
                    output_cls.model_json_schema()
                    if hasattr(output_cls, "model_json_schema")
                    else output_cls
                ),
            ),
        )

        if not response or not response.candidates:
            raise ValueError("No response from Google GenAI")

        structured_data = output_cls.model_validate_json(str(response.text))
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=response.candidates[0].finish_reason.value.lower()
            if response.candidates[0].finish_reason
            else None,
            usage=TokenUsage(
                prompt_tokens=response.usage_metadata.prompt_token_count or 0,
                completion_tokens=response.usage_metadata.candidates_token_count or 0,
                cached_tokens=response.usage_metadata.cached_content_token_count or 0,
            ),
        )

    def _embed(
        self,
        text: str | list[str],
        model_name: str | None,
        task_type: str = "RETRIEVAL_DOCUMENT",
        output_dimensionality: int = 768,
        title: str | None = None,
        **kwargs,
    ) -> list[float] | list[list[float] | None]:
        """Embed a text using the model"""
        response = self.client.models.embed_content(
            model=model_name or self.model_name,
            contents=text,  # type: ignore
            config=types.EmbedContentConfig(
                task_type=task_type,
                output_dimensionality=output_dimensionality,
                title=title,
                **kwargs,
            ),
        )
        # Extract the embedding values from the response
        if not response.embeddings:
            return []

        embeddings = [embedding.values for embedding in response.embeddings]

        if isinstance(text, str) and embeddings[0]:
            return embeddings[0]

        return embeddings

    async def _a_embed(
        self,
        text: str | list[str],
        model_name: str | None,
        task_type: str = "RETRIEVAL_DOCUMENT",
        output_dimensionality: int = 768,
        title: str | None = None,
        **kwargs,
    ) -> list[float] | list[list[float] | None]:
        """Embed a text using the model"""
        response = await self.client.aio.models.embed_content(
            model=model_name or self.model_name,
            contents=text,  # type: ignore
            config=types.EmbedContentConfig(
                task_type=task_type,
                output_dimensionality=output_dimensionality,
                title=title,
                **kwargs,
            ),
        )
        # Extract the embedding values from the response
        if not response.embeddings:
            return []
        embeddings = [embedding.values for embedding in response.embeddings]

        if isinstance(text, str) and embeddings[0]:
            return embeddings[0]

        return embeddings

    def _response_to_client_response(
        self, response, tool_map: dict[str, Tool] | None = None
    ) -> ClientResponse:
        blocks = []
        # Handle function calls if present
        if hasattr(response, "function_calls") and response.function_calls:
            for fc in response.function_calls:
                if not tool_map:
                    raise ValueError("Tool map is required")

                tool = tool_map.get(fc.name, None)
                if not tool:
                    raise ValueError(f"Tool {fc.name} not found in tool map")

                blocks.append(
                    FunctionCallBlock(
                        name=fc.name,
                        arguments=fc.args,
                        id=f"fc_{id(fc)}",
                        tool=tool,
                    )
                )
        else:
            if hasattr(response, "text") and response.text:
                blocks.append(TextBlock(content=response.text))

        if hasattr(response, "candidates") and response.candidates:
            for part in response.candidates[0].content.parts:
                if not part.text:
                    continue
                if hasattr(part, "thought") and part.thought:
                    blocks.append(ThoughtBlock(content=part.text))

        usage_metadata = getattr(response, "usage_metadata", None)
        return ClientResponse(
            content=blocks,
            stop_reason=(response.candidates[0].finish_reason.value.lower())
            if hasattr(response, "candidates") and response.candidates
            else None,
            usage=TokenUsage(
                prompt_tokens=usage_metadata.prompt_token_count or 0,
                completion_tokens=usage_metadata.candidates_token_count or 0,
                cached_tokens=usage_metadata.cached_content_token_count or 0,
            ),
        )

```
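
A minimal quick-start sketch (not from the repo) showing how `GoogleClient` can be invoked directly; it mirrors the usage pattern of the OpenAI-like client's README. The environment variable name is a placeholder, and the public `invoke` entry point is assumed from the base `Client` class.

```python
# Illustrative only: basic GoogleClient usage with an API-key setup.
import os

from datapizza.clients.google import GoogleClient

client = GoogleClient(
    api_key=os.getenv("GOOGLE_API_KEY"),  # placeholder env var
    model="gemini-2.0-flash",
    system_prompt="You are a helpful assistant.",
)

# Returns a ClientResponse whose content holds TextBlock / FunctionCallBlock items
response = client.invoke("What is the capital of France?")
print(response.content)
```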

## /datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/memory_adapter.py" 
import base64

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)

from google.genai import types


class GoogleMemoryAdapter(MemoryAdapter):
    def _turn_to_message(self, turn: Turn) -> dict:
        content = []
        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {"text": block.content}
                case FunctionCallBlock():
                    block_dict = {
                        "function_call": {"name": block.name, "args": block.arguments}
                    }
                case FunctionCallResultBlock():
                    block_dict = types.Part.from_function_response(
                        name=block.tool.name,
                        response={"result": block.result},
                    )
                case StructuredBlock():
                    block_dict = {"text": str(block.content)}
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_pdf_block(block)

                        case "audio":
                            block_dict = self._process_audio_block(block)

                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

            content.append(block_dict)

        return {
            "role": turn.role.google_role,
            "parts": (content),
        }

    def _process_audio_block(self, block: MediaBlock) -> types.Part:
        match block.media.source_type:
            case "raw":
                return types.Part.from_bytes(
                    data=block.media.source,
                    mime_type="audio/mp3",
                )

            case "path":
                with open(block.media.source, "rb") as f:
                    audio_bytes = f.read()

                return types.Part.from_bytes(
                    data=audio_bytes,
                    mime_type="audio/mp3",
                )

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} for audio, source type supported: raw, path"
                )

    def _process_pdf_block(self, block: MediaBlock) -> types.Part | dict:
        match block.media.source_type:
            case "raw":
                return types.Part.from_bytes(
                    data=block.media.source,
                    mime_type="application/pdf",
                )
            case "base64":
                return {
                    "inline_data": {
                        "mime_type": "application/pdf",
                        "data": block.media.source,
                    }
                }
            case "path":
                with open(block.media.source, "rb") as f:
                    pdf_bytes = f.read()

                return {
                    "inline_data": {
                        "mime_type": "application/pdf",
                        "data": pdf_bytes,
                    }
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} only supported: raw, base64, path"
                )

    def _process_image_block(self, block: MediaBlock) -> types.Part | dict:
        match block.media.source_type:
            case "url":
                return types.Part.from_uri(
                    file_uri=block.media.source,
                    mime_type=f"image/{block.media.extension}",
                )  # type: ignore
            case "base64":
                return {
                    "inline_data": {
                        "mime_type": f"image/{block.media.extension}",
                        "data": block.media.source,
                    }
                }
            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                return {
                    "inline_data": {
                        "mime_type": f"image/{block.media.extension}",
                        "data": base64_image,
                    }
                }
            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} for image, only url, base64, path are supported"
                )

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        return {"role": role.google_role, "parts": [{"text": text}]}

```
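
An illustrative sketch (not from the repo) of mapping a user turn with text and a base64 image into Gemini "parts" via `GoogleMemoryAdapter`. The `Media`/`MediaBlock` field names are assumptions based on how they are read in this adapter.

```python
# Hypothetical usage sketch, not repository code.
import base64

from datapizza.memory.memory import Memory
from datapizza.type import ROLE, Media, MediaBlock, TextBlock

from datapizza.clients.google.memory_adapter import GoogleMemoryAdapter

encoded = base64.b64encode(b"fake-image-bytes").decode("utf-8")

memory = Memory()
memory.add_turn(
    blocks=[
        TextBlock(content="What is in this picture?"),
        MediaBlock(
            media=Media(
                media_type="image",
                source_type="base64",
                source=encoded,
                extension="png",  # assumed field, used to build the mime type
            )
        ),
    ],
    role=ROLE.USER,
)

messages = GoogleMemoryAdapter().memory_to_messages(memory)
# messages[0]["parts"] -> [{"text": ...},
#                          {"inline_data": {"mime_type": "image/png", "data": ...}}]
```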

## /datapizza-ai-clients/datapizza-ai-clients-google/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-google/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-google"
version = "0.0.5"
description = "Google (Gemini) client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}

requires-python = ">=3.10.0,<4"

classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
    "Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = [
    "datapizza-ai-core>=0.0.7,<0.1.0",
    "google-genai>=1.3.0,<2.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-google/tests/test_memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/tests/test_memory_adapter.py" 
import pytest
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, StructuredBlock, TextBlock

from datapizza.clients.google.memory_adapter import GoogleMemoryAdapter


def test_google_memory_to_messages_structured_block():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(StructuredBlock(content={"key": "value"}))
    messages = GoogleMemoryAdapter().memory_to_messages(memory)
    # Google adapter may serialize as string or dict in "parts"
    assert "key" in str(messages[0]["parts"][0])


def test_google_memory_to_messages_with_system_prompt():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Hello!"))
    system_prompt = "You are a helpful assistant."
    messages = GoogleMemoryAdapter().memory_to_messages(
        memory, system_prompt=system_prompt
    )
    assert messages[0]["role"] == "model"
    assert system_prompt in str(messages[0]["parts"])
    assert messages[1]["role"] == "user"


def test_google_memory_to_messages_with_input_str():
    memory = Memory()
    input_str = "What is the weather?"
    messages = GoogleMemoryAdapter().memory_to_messages(memory, input=input_str)
    assert messages[-1]["role"] == "user"
    assert input_str in str(messages[-1]["parts"])


def test_google_memory_to_messages_with_input_block():
    memory = Memory()
    input_block = TextBlock(content="This is a block input.")
    messages = GoogleMemoryAdapter().memory_to_messages(memory, input=input_block)
    assert messages[-1]["role"] == "user"
    assert "block input" in str(messages[-1]["parts"])


def test_google_memory_to_messages_unsupported_input():
    memory = Memory()

    class Dummy:
        pass

    with pytest.raises(ValueError):
        GoogleMemoryAdapter().memory_to_messages(memory, input=Dummy())

```

## /datapizza-ai-clients/datapizza-ai-clients-mistral/README.md



## /datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/__init__.py" 
from datapizza.clients.mistral.mistral_client import MistralClient

__all__ = ["MistralClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/memory_adapter.py" 
import base64
import json

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)


class MistralMemoryAdapter(MemoryAdapter):
    def _turn_to_message(self, turn: Turn) -> dict:
        content = []
        tool_calls = []
        tool_call_id = None

        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {"type": "text", "text": block.content}
                case FunctionCallBlock():
                    tool_calls.append(
                        {
                            "id": block.id,
                            "function": {
                                "name": block.name,
                                "arguments": json.dumps(block.arguments),
                            },
                            "type": "function",
                        }
                    )
                case FunctionCallResultBlock():
                    tool_call_id = block.id
                    block_dict = {"type": "text", "text": block.result}
                case StructuredBlock():
                    block_dict = {"type": "text", "text": str(block.content)}
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        # case "pdf":
                        #    block_dict = self._process_pdf_block(block)

                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}, only image are supported"
                            )

            if block_dict:
                content.append(block_dict)

        messages: dict = {
            "role": turn.role.value,
        }

        if content:
            messages["content"] = content

        if tool_calls:
            messages["tool_calls"] = tool_calls

        if tool_call_id:
            messages["tool_call_id"] = tool_call_id

        return messages

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        return {"role": role.value, "content": text}

    def _process_image_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "image_url",
                    "image_url": {"url": block.media.source},
                }
            case "base64":
                return {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/{block.media.extension};base64,{block.media.source}"
                    },
                }
            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                return {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/{block.media.extension};base64,{base64_image}"
                    },
                }
            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type}, only url, base64, path are supported"
                )

```

## /datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/mistral_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/mistral_client.py" 
import json
import logging
import os
from collections.abc import AsyncIterator, Iterator
from typing import Literal

import requests
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import (
    FunctionCallBlock,
    Media,
    MediaBlock,
    Model,
    StructuredBlock,
    TextBlock,
)
from mistralai import Mistral
from mistralai.models.ocrresponse import OCRResponse

from .memory_adapter import MistralMemoryAdapter

log = logging.getLogger(__name__)


class MistralClient(Client):
    """A client for interacting with the Mistral API.

    This class provides methods for invoking the Mistral API to generate responses
    based on given input data. It extends the Client class.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "mistral-large-latest",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
    ):
        """
        Args:
            api_key: The API key for the Mistral API.
            model: The model to use for the Mistral API.
            system_prompt: The system prompt to use for the Mistral API.
            temperature: The temperature to use for the Mistral API.
            cache: The cache to use for the Mistral API.
        """
        if temperature and not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )

        self.api_key = api_key
        self.memory_adapter = MistralMemoryAdapter()
        self._set_client()

    def _set_client(self):
        self.client = Mistral(api_key=self.api_key)

    def _response_to_client_response(
        self, response, tool_map: dict[str, Tool] | None = None
    ) -> ClientResponse:
        blocks = []
        for choice in response.choices:
            if choice.message.content:
                blocks.append(TextBlock(content=choice.message.content))

            if choice.message.tool_calls:
                for tool_call in choice.message.tool_calls:
                    tool = tool_map.get(tool_call.function.name) if tool_map else None

                    if tool is None:
                        raise ValueError(f"Tool {tool_call.function.name} not found")

                    blocks.append(
                        FunctionCallBlock(
                            id=tool_call.id,
                            name=tool_call.function.name,
                            arguments=json.loads(tool_call.function.arguments),
                            tool=tool,
                        )
                    )

            # Handle media content if present
            if hasattr(choice.message, "media") and choice.message.media:
                for media_item in choice.message.media:
                    media = Media(
                        media_type=media_item.type,
                        source_type="url" if media_item.source_url else "base64",
                        source=media_item.source_url or media_item.data,
                        detail=getattr(media_item, "detail", "high"),
                    )
                    blocks.append(MediaBlock(media=media))

        log.debug(f"{self.__class__.__name__} response = {response}")
        return ClientResponse(
            content=blocks,
            stop_reason=response.choices[0].finish_reason,
            usage=TokenUsage(
                prompt_tokens=response.usage.prompt_tokens or 0,
                completion_tokens=response.usage.completion_tokens or 0,
                cached_tokens=0,
            ),
        )

    def _convert_tools(self, tool: Tool) -> dict:
        """Convert a single tool to the Mistral function format"""
        return {"type": "function", "function": tool.schema}

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> dict | Literal["auto", "required", "none"]:
        if isinstance(tool_choice, list) and len(tool_choice) > 1:
            raise NotImplementedError(
                "multiple function names is not supported by Mistral"
            )
        elif isinstance(tool_choice, list):
            return {
                "type": "function",
                "function": {"name": tool_choice[0]},
            }
        else:
            return tool_choice

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        log.debug(f"{self.__class__.__name__} input = {input}")
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "stream": False,
            "max_tokens": max_tokens,
            **kwargs,
        }

        if temperature:
            request_params["temperature"] = temperature

        if tools:
            request_params["tools"] = [self._convert_tools(tool) for tool in tools]
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = self.client.chat.complete(**request_params)
        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        log.debug(f"{self.__class__.__name__} input = {input}")
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "stream": False,
            "max_tokens": max_tokens,
            **kwargs,
        }

        if temperature:
            request_params["temperature"] = temperature

        if tools:
            request_params["tools"] = [self._convert_tools(tool) for tool in tools]
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = await self.client.chat.complete_async(**request_params)
        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)
        request_params = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_tokens,
            **kwargs,
        }

        if temperature:
            request_params["temperature"] = temperature

        if tools:
            request_params["tools"] = [self._convert_tools(tool) for tool in tools]
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = self.client.chat.stream(**request_params)
        text = ""
        usage = TokenUsage()
        stop_reason = None
        for chunk in response:
            usage += TokenUsage(
                prompt_tokens=(chunk.data.usage.prompt_tokens or 0)
                if chunk.data.usage
                else 0,
                completion_tokens=(chunk.data.usage.completion_tokens or 0)
                if chunk.data.usage
                else 0,
                cached_tokens=0,
            )
            stop_reason = chunk.data.choices[0].finish_reason
            delta = chunk.data.choices[0].delta.content or ""
            text += delta
            yield ClientResponse(
                content=[],
                delta=str(delta),
                stop_reason=stop_reason,
            )

        yield ClientResponse(
            content=[TextBlock(content=text)],
            stop_reason=stop_reason,
            usage=usage,
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None = None,
        memory: Memory | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)
        request_params = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_tokens or 1024,
            **kwargs,
        }

        if temperature:
            request_params["temperature"] = temperature

        if tools:
            request_params["tools"] = [self._convert_tools(tool) for tool in tools]
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = await self.client.chat.stream_async(**request_params)
        text = ""
        usage = TokenUsage()
        stop_reason = None
        async for chunk in response:
            usage += TokenUsage(
                prompt_tokens=(chunk.data.usage.prompt_tokens or 0)
                if chunk.data.usage
                else 0,
                completion_tokens=(chunk.data.usage.completion_tokens or 0)
                if chunk.data.usage
                else 0,
                cached_tokens=0,
            )
            stop_reason = chunk.data.choices[0].finish_reason
            delta = chunk.data.choices[0].delta.content or ""
            text += delta
            yield ClientResponse(
                content=[],
                delta=str(delta),
                stop_reason=stop_reason,
            )

        yield ClientResponse(
            content=[TextBlock(content=text)],
            stop_reason=stop_reason,
            usage=usage,
        )

    def _structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        # Build the messages from the system prompt, input, and memory
        messages = self._memory_to_contents(system_prompt, input, memory)

        if not tools:
            tools = []

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = self.client.chat.parse(
            model=self.model_name,
            messages=messages,
            response_format=output_cls,
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs,
        )

        if not response.choices:
            raise ValueError("No response from Mistral")

        log.debug(f"{self.__class__.__name__} structured response: {response}")
        stop_reason = response.choices[0].finish_reason if response.choices else None
        if hasattr(output_cls, "model_validate_json"):
            structured_data = output_cls.model_validate_json(
                str(response.choices[0].message.content)  # type: ignore
            )
        else:
            structured_data = json.loads(str(response.choices[0].message.content))  # type: ignore
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=response.usage.prompt_tokens or 0,
                completion_tokens=response.usage.completion_tokens or 0,
                cached_tokens=0,
            ),
        )

    async def _a_structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        # Build the messages from the system prompt, input, and memory
        messages = self._memory_to_contents(system_prompt, input, memory)

        if not tools:
            tools = []

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = await self.client.chat.parse_async(
            model=self.model_name,
            messages=messages,
            response_format=output_cls,
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs,
        )

        if not response.choices:
            raise ValueError("No response from Mistral")

        log.debug(f"{self.__class__.__name__} structured response: {response}")
        stop_reason = response.choices[0].finish_reason if response.choices else None
        if hasattr(output_cls, "model_validate_json"):
            structured_data = output_cls.model_validate_json(
                str(response.choices[0].message.content)  # type: ignore
            )
        else:
            structured_data = json.loads(str(response.choices[0].message.content))  # type: ignore
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=response.usage.prompt_tokens or 0,
                completion_tokens=response.usage.completion_tokens or 0,
                cached_tokens=0,
            ),
        )

    def _embed(
        self, text: str | list[str], model_name: str | None, **kwargs
    ) -> list[float] | list[list[float]]:
        """Embed a text using the model"""
        response = self.client.embeddings.create(
            inputs=text, model=model_name or self.model_name, **kwargs
        )

        embeddings = [item.embedding for item in response.data]

        if not embeddings:
            return []

        if isinstance(text, str) and embeddings[0]:
            return embeddings[0]

        return embeddings

    async def _a_embed(
        self, text: str | list[str], model_name: str | None, **kwargs
    ) -> list[float] | list[list[float]]:
        """Embed a text using the model"""
        response = await self.client.embeddings.create_async(
            inputs=text, model=model_name or self.model_name, **kwargs
        )

        embeddings = [item.embedding for item in response.data]

        if not embeddings:
            return []

        if isinstance(text, str) and embeddings[0]:
            return embeddings[0]

        return embeddings or []

    def parse_document(
        self,
        document_path: str,
        autodelete: bool = True,
        include_image_base64: bool = True,
    ) -> OCRResponse:
        filename = os.path.basename(document_path)
        with open(document_path, "rb") as f:
            uploaded_pdf = self.client.files.upload(
                file={"file_name": filename, "content": f}, purpose="ocr"
            )

        signed_url = self.client.files.get_signed_url(file_id=uploaded_pdf.id)

        response = self.client.ocr.process(
            model="mistral-ocr-latest",
            document={
                "type": "document_url",
                "document_url": signed_url.url,
            },
            include_image_base64=include_image_base64,
        )

        if autodelete:
            url = f"https://api.mistral.ai/v1/files/{uploaded_pdf.id}"
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            }

            requests.delete(url, headers=headers, timeout=30)

        return response

```
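
The `parse_document` helper above uploads a local file to Mistral's file store, runs OCR against the signed URL, and (with `autodelete=True`) deletes the uploaded file afterwards. A minimal usage sketch, assuming a valid `MISTRAL_API_KEY` environment variable and a local `invoice.pdf` (both names are illustrative):

```python
import os

from datapizza.clients.mistral import MistralClient

client = MistralClient(api_key=os.environ["MISTRAL_API_KEY"])

# Upload the PDF, run OCR with "mistral-ocr-latest", then delete the upload.
ocr_response = client.parse_document("invoice.pdf", autodelete=True)

# The return value is the mistralai OCRResponse; its exact fields depend on
# the installed mistralai SDK version.
print(ocr_response)
```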

## /datapizza-ai-clients/datapizza-ai-clients-mistral/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-mistral/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-mistral"
version = "0.0.5"
description = "Mistral AI client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
authors = [
    {name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.0.7,<0.1.0",
    "mistralai>=1.2.0,<2.0.0",
    "requests>=2.25.0,<3.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-mistral/tests/test_mistral_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/tests/test_mistral_client.py" 
from datapizza.clients.mistral import MistralClient


def test_init_mistral_client():
    client = MistralClient(api_key="test")
    assert client is not None

```

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/README.md

# DataPizza AI - OpenAI-Like Client

A versatile DataPizza AI client for any OpenAI-compatible API, including local models served through Ollama as well as hosted services such as Together AI and OpenRouter.

## Installation

```bash
pip install datapizza-ai-clients-openai-like
```

## Quick Start

### With Ollama (Local Models)

```python
from datapizza.clients.openai_like import OpenAILikeClient

# Create client for Ollama
client = OpenAILikeClient(
    api_key="",  # Ollama doesn't require an API key
    model="gemma2:2b",
    system_prompt="You are a helpful assistant.",
    base_url="http://localhost:11434/v1",
)

response = client.invoke("What is the capital of France?")
print(response.content)
```

### With Together AI

```python
import os
from datapizza.clients.openai_like import OpenAILikeClient

client = OpenAILikeClient(
    api_key=os.getenv("TOGETHER_API_KEY"),
    model="meta-llama/Llama-2-7b-chat-hf",
    system_prompt="You are a helpful assistant.",
    base_url="https://api.together.xyz/v1",
)

response = client.invoke("Explain quantum computing")
print(response.content)
```

### With OpenRouter

```python
import os
from datapizza.clients.openai_like import OpenAILikeClient

client = OpenAILikeClient(
    api_key=os.getenv("OPENROUTER_API_KEY"),
    model="google/gemma-7b-it",
    system_prompt="You are a helpful assistant.",
    base_url="https://openrouter.ai/api/v1",
)

response = client.invoke("What is OpenRouter?")
print(response.content)
```

### With Other OpenAI-Compatible Services

```python
import os
from datapizza.clients.openai_like import OpenAILikeClient

client = OpenAILikeClient(
    api_key=os.getenv("YOUR_API_KEY"),
    model="your-model-name",
    system_prompt="You are a helpful assistant.",
    base_url="https://your-service-url/v1",
)

response = client.invoke("Your question here")
print(response.content)
```

## Features

- **OpenAI-Compatible**: Works with any service that implements the OpenAI API standard
- **Local Models**: Perfect for running with Ollama for privacy and cost control
- **Memory Support**: Built-in memory adapter for conversation history
- **Streaming**: Support for real-time streaming responses
- **Structured Outputs**: Generate structured data with Pydantic models
- **Tool Calling**: Function calling capabilities where supported

## Supported Services

- **Ollama** - Local model inference
- **Together AI** - Cloud-based model hosting
- **OpenRouter** - Access a variety of models through a single API
- **Perplexity AI** - Search-augmented models
- **Groq** - Fast inference API
- **Any OpenAI-compatible API**

## Advanced Usage

### With Memory

```python
from datapizza.clients.openai_like import OpenAILikeClient
from datapizza.memory import Memory

client = OpenAILikeClient(
    api_key="",
    model="llama3.1:8b",
    base_url="http://localhost:11434/v1",
)

memory = Memory(client=client)
memory.add("I'm working on a Python project about machine learning.")
response = memory.query("What libraries should I use?")
```

### Streaming Responses

```python
client = OpenAILikeClient(
    api_key="",
    model="gemma2:7b",
    base_url="http://localhost:11434/v1",
)

for chunk in client.stream("Tell me a story about AI"):
    print(chunk.content, end="", flush=True)
```

### Structured Outputs

```python
from pydantic import BaseModel
from datapizza.clients.openai_like import OpenAILikeClient

class Person(BaseModel):
    name: str
    age: int
    occupation: str

client = OpenAILikeClient(
    api_key="",
    model="llama3.1:8b",
    base_url="http://localhost:11434/v1",
)

response = client.invoke(
    "Generate a person profile",
    response_format=Person
)
print(response.parsed)  # Person object
```

## Configuration Options

| Parameter | Description | Default |
|-----------|-------------|---------|
| `api_key` | API key for the service (empty string for Ollama) | Required |
| `model` | Model name to use | `gpt-4o-mini` |
| `base_url` | Base URL of the OpenAI-compatible API | `None` |
| `system_prompt` | System message for the model | `""` |
| `temperature` | Sampling temperature (0-2) | `None` |
| `cache` | Optional response cache | `None` |

Per-request generation options such as `max_tokens` are handled by the individual client calls rather than the constructor.
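
For example, a client configured for more deterministic output against a local Ollama endpoint (model name is illustrative):

```python
from datapizza.clients.openai_like import OpenAILikeClient

# Temperature must be between 0 and 2; lower values give more deterministic output.
client = OpenAILikeClient(
    api_key="",
    model="gemma2:2b",
    base_url="http://localhost:11434/v1",
    temperature=0.2,
)

response = client.invoke("Name three uses of a Raspberry Pi.")
print(response.content)
```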

## Ollama Setup

1. Install Ollama from [ollama.ai](https://ollama.ai)
2. Pull a model: `ollama pull gemma2:2b`
3. Start Ollama: `ollama serve`
4. Use with DataPizza AI as shown in the examples above

## Popular Ollama Models

- `gemma2:2b` - Lightweight, fast responses
- `gemma2:7b` - Balanced performance
- `llama3.1:8b` - High quality, more resource intensive
- `codellama:7b` - Specialized for coding tasks
- `mistral:7b` - Good general purpose model

## Error Handling

```python
from datapizza.clients.openai_like import OpenAILikeClient
from datapizza.core.clients.exceptions import ClientError

try:
    client = OpenAILikeClient(
        api_key="",
        model="nonexistent-model",
        base_url="http://localhost:11434/v1",
    )
    response = client.invoke("Hello")
except ClientError as e:
    print(f"Client error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
```

## Contributing

Contributions are welcome! Please see our [Contributing Guide](../../CONTRIBUTING.md) for details.

## License

This project is licensed under the MIT License - see the [LICENSE](../../LICENSE) file for details.


## /datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/__init__.py" 
from .openai_completion_client import OpenAILikeClient

__all__ = ["OpenAILikeClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/memory_adapter.py" 
import base64
import json

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)


class OpenAILikeMemoryAdapter(MemoryAdapter):
    def _turn_to_message(self, turn: Turn) -> dict:
        content = []
        tool_calls = []
        tool_call_id = None

        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {"type": "text", "text": block.content}
                case FunctionCallBlock():
                    tool_calls.append(
                        {
                            "id": block.id,
                            "function": {
                                "name": block.name,
                                "arguments": json.dumps(block.arguments),
                            },
                            "type": "function",
                        }
                    )
                case FunctionCallResultBlock():
                    tool_call_id = block.id
                    block_dict = {"type": "text", "text": block.result}

                case StructuredBlock():
                    block_dict = {"type": "text", "text": str(block.content)}
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_pdf_block(block)
                        case "audio":
                            block_dict = self._process_audio_block(block)

                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

            if block_dict:
                content.append(block_dict)

        messages = {
            "role": turn.role.value,
            "content": (content),
        }

        if tool_calls:
            messages["tool_calls"] = tool_calls

        if tool_call_id:
            messages["tool_call_id"] = tool_call_id

        return messages

    def _process_audio_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "path":
                with open(block.media.source, "rb") as f:
                    base64_audio = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64_audio,
                        "format": block.media.extension,
                    },
                }

            case "base_64":
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": block.media.source,
                        "format": block.media.extension,
                    },
                }
            case "raw":
                base64_audio = base64.b64encode(block.media.source).decode("utf-8")
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64_audio,
                        "format": block.media.extension,
                    },
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} for audio; supported source types: path, base_64, raw"
                )

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        return {"role": role.value, "content": text}

    def _process_pdf_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "base64":
                return {
                    "type": "file",
                    "file": {
                        "filename": "file.pdf",
                        "file_data": f"data:application/{block.media.extension};base64,{block.media.source}",
                    },
                }
            case "path":
                with open(block.media.source, "rb") as f:
                    base64_pdf = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "file",
                    "file": {
                        "filename": "file.pdf",
                        "file_data": f"data:application/{block.media.extension};base64,{base64_pdf}",
                    },
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type}"
                )

    def _process_image_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "image_url",
                    "image_url": {"url": block.media.source},
                }

            case "base64":
                return {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/{block.media.extension};base64,{block.media.source}"
                    },
                }

            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                    return {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/{block.media.extension};base64,{base64_image}"
                        },
                    }

            case _:
                raise ValueError(
                    f"Unsupported media source type: {block.media.source_type}"
                )

```
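
For reference, the adapter above converts a memory `Turn` into a standard chat-completions message dict. A sketch of the shapes it emits, with illustrative values, for a user turn containing text plus an image URL and for an assistant turn that issued a function call:

```python
# User turn with a TextBlock and a MediaBlock(image, source_type="url"):
user_message = {
    "role": "user",
    "content": [
        {"type": "text", "text": "What is in this picture?"},
        {"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}},
    ],
}

# Assistant turn with a FunctionCallBlock: the call is placed under "tool_calls"
# and its arguments are serialized to a JSON string.
assistant_message = {
    "role": "assistant",
    "content": [],
    "tool_calls": [
        {
            "id": "call_123",
            "type": "function",
            "function": {"name": "get_weather", "arguments": '{"city": "Rome"}'},
        }
    ],
}
```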

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/openai_completion_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/openai_completion_client.py" 
import json
from collections.abc import AsyncIterator, Iterator
from typing import Literal

import httpx
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools.tools import Tool
from datapizza.type import (
    FunctionCallBlock,
    Media,
    MediaBlock,
    Model,
    StructuredBlock,
    TextBlock,
)
from openai import AsyncOpenAI, OpenAI

from .memory_adapter import OpenAILikeMemoryAdapter


class OpenAILikeClient(Client):
    """A client for interacting with OpenAI-compatible APIs.

    This class provides methods for invoking any service that implements the
    OpenAI chat-completions API (e.g. Ollama, Together AI, OpenRouter) to
    generate responses based on given input data. It extends the Client class.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "gpt-4o-mini",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
        base_url: str | httpx.URL | None = None,
    ):
        if temperature and not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )

        self.base_url = base_url
        self.api_key = api_key
        self.memory_adapter = OpenAILikeMemoryAdapter()
        self._set_client()

    def _set_client(self):
        if not self.client:
            self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)

    def _set_a_client(self):
        if not self.a_client:
            self.a_client = AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)

    def _response_to_client_response(
        self, response, tool_map: dict[str, Tool] | None
    ) -> ClientResponse:
        blocks = []
        for choice in response.choices:
            if choice.message.content:
                blocks.append(TextBlock(content=choice.message.content))

            if choice.message.tool_calls and tool_map:
                for tool_call in choice.message.tool_calls:
                    tool = tool_map.get(tool_call.function.name)

                    if not tool:
                        raise ValueError(f"Tool {tool_call.function.name} not found")

                    blocks.append(
                        FunctionCallBlock(
                            id=tool_call.id,
                            name=tool_call.function.name,
                            arguments=json.loads(tool_call.function.arguments),
                            tool=tool,
                        )
                    )

            # Handle media content if present
            if hasattr(choice.message, "media") and choice.message.media:
                for media_item in choice.message.media:
                    media = Media(
                        media_type=media_item.type,
                        source_type="url" if media_item.source_url else "base64",
                        source=media_item.source_url or media_item.data,
                        detail=getattr(media_item, "detail", "high"),
                    )
                    blocks.append(MediaBlock(media=media))

        return ClientResponse(
            content=blocks,
            stop_reason=response.choices[0].finish_reason,
            usage=TokenUsage(
                prompt_tokens=response.usage.prompt_tokens or 0,
                completion_tokens=response.usage.completion_tokens or 0,
                cached_tokens=(
                    response.usage.prompt_tokens_details.cached_tokens or 0
                    if response.usage.prompt_tokens_details
                    else 0
                ),
            ),
        )

    def _convert_tools(self, tools: Tool) -> dict:
        """Convert tools to OpenAI function format"""
        return {"type": "function", "function": tools.schema}

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> dict | Literal["auto", "required", "none"]:
        if isinstance(tool_choice, list) and len(tool_choice) > 1:
            raise NotImplementedError(
                "multiple function names is not supported by OpenAI"
            )
        elif isinstance(tool_choice, list):
            return {
                "type": "function",
                "function": {"name": tool_choice[0]},
            }
        else:
            return tool_choice

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "stream": False,
            "max_completion_tokens": max_tokens,
            **kwargs,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        client: OpenAI = self._get_client()
        response = client.chat.completions.create(**kwargs)
        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "stream": False,
            "max_completion_tokens": max_tokens,
            **kwargs,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        a_client = self._get_a_client()
        response = await a_client.chat.completions.create(**kwargs)
        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)
        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "stream": True,
            "max_completion_tokens": max_tokens,
            "stream_options": {"include_usage": True},
            **kwargs,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = self.client.chat.completions.create(**kwargs)
        message_content = ""
        usage = TokenUsage()
        finish_reason = None

        for chunk in response:
            usage += TokenUsage(
                prompt_tokens=(chunk.usage.prompt_tokens or 0) if chunk.usage else 0,
                completion_tokens=(chunk.usage.completion_tokens or 0)
                if chunk.usage
                else 0,
                cached_tokens=getattr(
                    getattr(chunk.usage, "prompt_tokens_details", None),
                    "cached_tokens",
                    0,
                )
                or 0,
            )

            # Usage-only chunks (e.g. the final chunk when include_usage is set)
            # carry no choices, so guard against an empty choices list.
            delta = chunk.choices[0].delta if chunk.choices else None
            if chunk.choices:
                finish_reason = chunk.choices[0].finish_reason

            delta_content = delta.content if delta and delta.content else ""
            message_content = message_content + delta_content
            yield ClientResponse(
                content=[TextBlock(content=message_content)],
                delta=delta_content,
                stop_reason=finish_reason or None,
            )
        yield ClientResponse(
            content=[TextBlock(content=message_content)],
            stop_reason=finish_reason or None,
            usage=usage,
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None = None,
        memory: Memory | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)
        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "stream": True,
            "max_completion_tokens": max_tokens,
            "stream_options": {"include_usage": True},
            **kwargs,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        a_client = self._get_a_client()
        message_content = ""
        usage = TokenUsage()
        finish_reason = None

        async for chunk in await a_client.chat.completions.create(**kwargs):
            usage += TokenUsage(
                prompt_tokens=(chunk.usage.prompt_tokens or 0) if chunk.usage else 0,
                completion_tokens=(chunk.usage.completion_tokens or 0)
                if chunk.usage
                else 0,
                cached_tokens=getattr(
                    getattr(chunk.usage, "prompt_tokens_details", None),
                    "cached_tokens",
                    0,
                )
                or 0,
            )

            # Usage-only chunks (e.g. the final chunk when include_usage is set)
            # carry no choices, so guard against an empty choices list.
            delta = chunk.choices[0].delta if chunk.choices else None
            if chunk.choices:
                finish_reason = chunk.choices[0].finish_reason

            delta_content = delta.content if delta and delta.content else ""
            message_content = message_content + delta_content

            yield ClientResponse(
                content=[TextBlock(content=message_content)],
                delta=delta_content,
                stop_reason=finish_reason or None,
            )
        yield ClientResponse(
            content=[TextBlock(content=message_content)],
            stop_reason=finish_reason or None,
            usage=usage,
        )

    def _structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        # Build the message list from the system prompt, input, and memory
        messages = self._memory_to_contents(system_prompt, input, memory)

        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "response_format": output_cls,
            "max_completion_tokens": max_tokens,
            **kwargs,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
            # Structured response needs strict mode and no additional properties
            for tool in kwargs["tools"]:
                tool["function"]["strict"] = True
                tool["function"]["parameters"]["additionalProperties"] = False

        response = self.client.beta.chat.completions.parse(**kwargs)

        stop_reason = response.choices[0].finish_reason

        if not response.choices[0].message.content:
            raise ValueError("No content in response")

        if hasattr(output_cls, "model_validate_json"):
            structured_data = output_cls.model_validate_json(
                response.choices[0].message.content
            )
        else:
            structured_data = json.loads(response.choices[0].message.content)
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=response.usage.prompt_tokens or 0,
                completion_tokens=response.usage.completion_tokens or 0,
                cached_tokens=(
                    response.usage.prompt_tokens_details.cached_tokens or 0
                    if response.usage.prompt_tokens_details
                    else 0
                ),
            ),
        )

    async def _a_structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float,
        max_tokens: int,
        system_prompt: str | None = None,
        tools: list[Tool] | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ):
        messages = self._memory_to_contents(system_prompt, input, memory)

        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "response_format": output_cls,
            "max_completion_tokens": max_tokens,
            **kwargs,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
            # Structured response needs strict mode and no additional properties
            for tool in kwargs["tools"]:
                tool["function"]["strict"] = True
                tool["function"]["parameters"]["additionalProperties"] = False

        a_client = self._get_a_client()
        response = await a_client.beta.chat.completions.parse(**kwargs)

        stop_reason = response.choices[0].finish_reason
        if hasattr(output_cls, "model_validate_json"):
            structured_data = output_cls.model_validate_json(
                response.choices[0].message.content
            )
        else:
            structured_data = json.loads(response.choices[0].message.content)
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=response.usage.prompt_tokens or 0,
                completion_tokens=response.usage.completion_tokens or 0,
                cached_tokens=(
                    response.usage.prompt_tokens_details.cached_tokens or 0
                    if response.usage.prompt_tokens_details
                    else 0
                ),
            ),
        )

```
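
The streaming methods above yield one `ClientResponse` per chunk, carrying the incremental `delta` and the text accumulated so far, followed by a final response with the aggregated `TokenUsage`. A minimal consumption sketch, assuming a local Ollama endpoint as in the README (model name is illustrative):

```python
from datapizza.clients.openai_like import OpenAILikeClient

client = OpenAILikeClient(
    api_key="",
    model="gemma2:2b",
    base_url="http://localhost:11434/v1",
)

last = None
for chunk in client.stream("Summarize what a vector database is in one sentence."):
    print(chunk.delta, end="", flush=True)  # incremental text
    last = chunk

# The final yielded response aggregates token usage across all chunks.
if last and last.usage:
    print("\nprompt tokens:", last.usage.prompt_tokens)
```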

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-openai-like"
version = "0.0.8"
description = "OpenAI-compatible client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
authors = [
    {name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.0.7,<0.1.0",
    "httpx>=0.28.1",
    "openai>=2.0.0,<3.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    # "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/tests/test_openai_completion.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/tests/test_openai_completion.py" 
from datapizza.clients.openai_like import OpenAILikeClient


def test_init():
    client = OpenAILikeClient(
        api_key="test_api_key",
        model="gpt-4o-mini",
        system_prompt="You are a helpful assistant that can answer questions about piadina only in italian.",
    )
    assert client is not None

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/README.md



## /datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/__init__.py" 
from .openai_client import OpenAIClient

__all__ = ["OpenAIClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/memory_adapter.py" 
import base64
import json

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)

from openai.types.responses import ResponseFunctionToolCall


class OpenAIMemoryAdapter(MemoryAdapter):
    def _turn_to_message(self, turn: Turn) -> dict:
        content = []
        tool_calls = []
        tool_call_id = None

        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {
                        "type": "input_text"
                        if turn.role == ROLE.USER
                        else "output_text",
                        "text": block.content,
                    }
                case FunctionCallBlock():
                    return ResponseFunctionToolCall(
                        call_id=block.id,
                        name=block.name,
                        arguments=json.dumps(block.arguments),
                        type="function_call",
                        # id="fc_" + block.id,
                        status="completed",
                    )
                case FunctionCallResultBlock():
                    tool_call_id = block.id
                    return {
                        "type": "function_call_output",
                        "call_id": block.id,
                        "output": block.result,
                    }

                case StructuredBlock():
                    block_dict = {"type": "text", "text": str(block.content)}
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_pdf_block(block)
                        case "audio":
                            block_dict = self._process_audio_block(block)

                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

            if block_dict:
                content.append(block_dict)

        messages = {
            "role": turn.role.value,
            "content": (content),
        }

        if tool_calls:
            messages["tool_calls"] = tool_calls

        if tool_call_id:
            messages["tool_call_id"] = tool_call_id

        return messages

    def _process_audio_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "path":
                with open(block.media.source, "rb") as f:
                    base64_audio = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64_audio,
                        "format": block.media.extension,
                    },
                }

            case "base_64":
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": block.media.source,
                        "format": block.media.extension,
                    },
                }
            case "raw":
                base64_audio = base64.b64encode(block.media.source).decode("utf-8")
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64_audio,
                        "format": block.media.extension,
                    },
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} for audio; supported source types: path, base_64, raw"
                )

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        return {"role": role.value, "content": text}

    def _process_pdf_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "base64":
                return {
                    "type": "input_file",
                    "filename": "file.pdf",
                    "file_data": f"data:application/{block.media.extension};base64,{block.media.source}",
                }
            case "path":
                with open(block.media.source, "rb") as f:
                    base64_pdf = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "input_file",
                    "filename": "file.pdf",
                    "file_data": f"data:application/{block.media.extension};base64,{base64_pdf}",
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type}"
                )

    def _process_image_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "input_image",
                    "image_url": block.media.source,
                }

            case "base64":
                return {
                    "type": "input_image",
                    "image_url": f"data:image/{block.media.extension};base64,{block.media.source}",
                }

            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                    return {
                        "type": "input_image",
                        "image_url": f"data:image/{block.media.extension};base64,{base64_image}",
                    }

            case _:
                raise ValueError(
                    f"Unsupported media source type: {block.media.source_type}"
                )

```
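
Unlike the chat-completions adapter above, `OpenAIMemoryAdapter` targets the Responses API: text becomes `input_text`/`output_text` parts, images become `input_image` parts, and tool results are emitted as standalone `function_call_output` items rather than role/content messages. A sketch of the shapes it produces, with illustrative values:

```python
# User turn with a TextBlock and an image URL (Responses API input items):
user_item = {
    "role": "user",
    "content": [
        {"type": "input_text", "text": "Describe this image"},
        {"type": "input_image", "image_url": "https://example.com/cat.png"},
    ],
}

# A FunctionCallResultBlock becomes a standalone tool-output item:
tool_output_item = {
    "type": "function_call_output",
    "call_id": "call_123",
    "output": '{"temperature": 21}',
}
```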

## /datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/openai_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/openai_client.py" 
import json
from collections.abc import AsyncIterator, Iterator
from typing import Literal

import httpx
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import (
    FunctionCallBlock,
    Model,
    StructuredBlock,
    TextBlock,
    ThoughtBlock,
)

from openai import (
    AsyncOpenAI,
    AzureOpenAI,
    OpenAI,
)
from openai.types.responses import (
    ParsedResponseOutputMessage,
    ResponseCompletedEvent,
    ResponseFunctionToolCall,
    ResponseOutputMessage,
    ResponseReasoningItem,
    ResponseTextDeltaEvent,
)

from .memory_adapter import OpenAIMemoryAdapter


class OpenAIClient(Client):
    """A client for interacting with the OpenAI API.

    This class provides methods for invoking the OpenAI API to generate responses
    based on given input data. It extends the Client class.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "gpt-4o-mini",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
        base_url: str | httpx.URL | None = None,
        organization: str | None = None,
        project: str | None = None,
        webhook_secret: str | None = None,
        websocket_base_url: str | httpx.URL | None = None,
        timeout: float | httpx.Timeout | None = None,
        max_retries: int = 2,
        default_headers: dict[str, str] | None = None,
        default_query: dict[str, object] | None = None,
        http_client: httpx.Client | None = None,
    ):
        """
        Args:
            api_key: The API key for the OpenAI API.
            model: The model to use for the OpenAI API.
            system_prompt: The system prompt to use for the OpenAI API.
            temperature: The temperature to use for the OpenAI API.
            cache: The cache to use for the OpenAI API.
            base_url: The base URL for the OpenAI API.
            organization: The organization ID for the OpenAI API.
            project: The project ID for the OpenAI API.
            webhook_secret: The webhook secret for the OpenAI API.
            websocket_base_url: The websocket base URL for the OpenAI API.
            timeout: The timeout for the OpenAI API.
            max_retries: The max retries for the OpenAI API.
            default_headers: The default headers for the OpenAI API.
            default_query: The default query for the OpenAI API.
            http_client: The http_client for the OpenAI API.
        """

        if temperature and not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )

        self.api_key = api_key
        self.base_url = base_url
        self.organization = organization
        self.project = project
        self.webhook_secret = webhook_secret
        self.websocket_base_url = websocket_base_url
        self.timeout = timeout
        self.max_retries = max_retries
        self.default_headers = default_headers
        self.default_query = default_query
        self.http_client = http_client

        self.memory_adapter = OpenAIMemoryAdapter()
        self._set_client()

    def _set_client(self):
        if not self.client:
            self.client = OpenAI(
                api_key=self.api_key,
                base_url=self.base_url,
                organization=self.organization,
                project=self.project,
                webhook_secret=self.webhook_secret,
                websocket_base_url=self.websocket_base_url,
                timeout=self.timeout,
                max_retries=self.max_retries,
                default_headers=self.default_headers,
                default_query=self.default_query,
                http_client=self.http_client,
            )

    def _set_a_client(self):
        if not self.a_client:
            self.a_client = AsyncOpenAI(
                api_key=self.api_key,
                base_url=self.base_url,
                organization=self.organization,
                project=self.project,
                webhook_secret=self.webhook_secret,
                websocket_base_url=self.websocket_base_url,
                timeout=self.timeout,
                max_retries=self.max_retries,
                default_headers=self.default_headers,
                default_query=self.default_query,
            )

    def _response_to_client_response(
        self, response, tool_map: dict[str, Tool] | None
    ) -> ClientResponse:
        blocks = []

        # Handle new response format with direct content array
        if getattr(response, "output_parsed", None) is not None:
            blocks.append(StructuredBlock(content=response.output_parsed))

        if hasattr(response, "output") and response.output:
            for content_item in response.output:
                if isinstance(content_item, ResponseOutputMessage) and not isinstance(
                    content_item, ParsedResponseOutputMessage
                ):
                    for content in content_item.content:
                        if content.type == "output_text":
                            blocks.append(TextBlock(content=content.text))
                elif isinstance(content_item, ResponseReasoningItem):
                    if content_item.summary:
                        blocks.append(
                            ThoughtBlock(content=content_item.summary[0].text)
                        )

                elif isinstance(content_item, ResponseFunctionToolCall):
                    if not tool_map:
                        raise ValueError("Tool map is required")

                    tool = tool_map.get(content_item.name)
                    if not tool:
                        raise ValueError(f"Tool {content_item.name} not found")
                    blocks.append(
                        FunctionCallBlock(
                            id=content_item.call_id,
                            name=content_item.name,
                            arguments=json.loads(content_item.arguments)
                            if isinstance(content_item.arguments, str)
                            else content_item.arguments,
                            tool=tool,
                        )
                    )

        # Handle usage from new format; default to zero when usage is missing
        prompt_tokens = completion_tokens = cached_tokens = 0
        usage = getattr(response, "usage", None)
        if usage:
            prompt_tokens = getattr(usage, "input_tokens", 0)
            completion_tokens = getattr(usage, "output_tokens", 0)
            # Handle input_tokens_details for cached tokens
            if hasattr(usage, "input_tokens_details") and usage.input_tokens_details:
                cached_tokens = getattr(usage.input_tokens_details, "cached_tokens", 0)

        # Handle stop reason - use status from new format
        stop_reason = getattr(response, "status", None)
        if not stop_reason and hasattr(response, "choices") and response.choices:
            stop_reason = response.choices[0].finish_reason

        return ClientResponse(
            content=blocks,
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=prompt_tokens or 0,
                completion_tokens=completion_tokens or 0,
                cached_tokens=cached_tokens or 0,
            ),
        )

    def _convert_tools(self, tool: Tool) -> dict:
        """Convert tools to OpenAI function format"""
        return {
            "type": "function",
            "name": tool.name,
            "description": tool.description,
            "parameters": {
                "type": "object",
                "properties": tool.properties,
                "required": tool.required,
            },
        }

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> dict | Literal["auto", "required", "none"]:
        if isinstance(tool_choice, list) and len(tool_choice) > 1:
            raise NotImplementedError(
                "multiple function names is not supported by OpenAI"
            )
        elif isinstance(tool_choice, list):
            return {
                "type": "function",
                "name": tool_choice[0],
            }
        else:
            return tool_choice

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        kwargs = {
            **kwargs,
            "model": self.model_name,
            "input": messages,
            "stream": False,
            "max_output_tokens": max_tokens,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        client: OpenAI = self._get_client()
        response = client.responses.create(**kwargs)
        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        kwargs = {
            **kwargs,
            "model": self.model_name,
            "input": messages,
            "stream": False,
            "max_output_tokens": max_tokens,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        a_client = self._get_a_client()
        response = await a_client.responses.create(**kwargs)
        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        kwargs = {
            **kwargs,
            "model": self.model_name,
            "input": messages,
            "stream": True,
            "max_output_tokens": max_tokens,
            # "stream_options": {"include_usage": True},
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = self.client.responses.create(**kwargs)
        for chunk in response:
            if isinstance(chunk, ResponseTextDeltaEvent):
                yield ClientResponse(
                    content=[],
                    delta=chunk.delta,
                    stop_reason=None,
                    usage=TokenUsage(
                        prompt_tokens=0,
                        completion_tokens=0,
                        cached_tokens=0,
                    ),
                )

            if isinstance(chunk, ResponseCompletedEvent):
                yield self._response_to_client_response(chunk.response, tool_map)

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None = None,
        memory: Memory | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}
        kwargs = {
            **kwargs,
            "model": self.model_name,
            "input": messages,
            "stream": True,
            "max_output_tokens": max_tokens,
            # "stream_options": {"include_usage": True},
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        a_client = self._get_a_client()

        async for chunk in await a_client.responses.create(**kwargs):
            if isinstance(chunk, ResponseTextDeltaEvent):
                yield ClientResponse(
                    content=[],
                    delta=chunk.delta,
                    stop_reason=None,
                    usage=TokenUsage(
                        prompt_tokens=0,
                        completion_tokens=0,
                        cached_tokens=0,
                    ),
                )

            if isinstance(chunk, ResponseCompletedEvent):
                yield self._response_to_client_response(chunk.response, tool_map)

    def _structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        # Build the message list and request a parsed (structured) response

        if tools is None:
            tools = []

        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}
        kwargs = {
            "model": self.model_name,
            "input": messages,
            "text_format": output_cls,
            "max_output_tokens": max_tokens,
            **kwargs,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
            # Structured response needs strict mode and no additional properties
            for tool in kwargs["tools"]:
                tool["strict"] = True
                tool["parameters"]["additionalProperties"] = False

        response = self.client.responses.parse(**kwargs)

        return self._response_to_client_response(response, tool_map)

    async def _a_structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float,
        max_tokens: int,
        system_prompt: str | None = None,
        tools: list[Tool] | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ):
        if tools is None:
            tools = []

        messages = self._memory_to_contents(system_prompt, input, memory)
        tool_map = {tool.name: tool for tool in tools}

        kwargs = {
            "model": self.model_name,
            "input": messages,
            "text_format": output_cls,
            "max_output_tokens": max_tokens,
            **kwargs,
        }
        if temperature:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
            # Structured response needs strict mode and no additional properties
            for tool in kwargs["tools"]:
                tool["strict"] = True
                tool["parameters"]["additionalProperties"] = False

        a_client = self._get_a_client()
        response = await a_client.responses.parse(**kwargs)

        return self._response_to_client_response(response, tool_map)

    def _embed(
        self, text: str | list[str], model_name: str | None, **kwargs
    ) -> list[float] | list[list[float]]:
        """Embed a text using the model"""
        response = self.client.embeddings.create(
            input=text, model=model_name or self.model_name, **kwargs
        )

        embeddings = [item.embedding for item in response.data]

        if isinstance(text, str):
            return embeddings[0] if embeddings else []

        return embeddings or []

    async def _a_embed(
        self, text: str | list[str], model_name: str | None, **kwargs
    ) -> list[float] | list[list[float]]:
        """Embed a text using the model"""

        a_client = self._get_a_client()
        response = await a_client.embeddings.create(
            input=text, model=model_name or self.model_name, **kwargs
        )

        embeddings = [item.embedding for item in response.data]

        if isinstance(text, str):
            return embeddings[0] if embeddings else []

        return embeddings or []

    def _is_azure_client(self) -> bool:
        return isinstance(self._get_client(), AzureOpenAI)

```
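
These private helpers back the client's public surface. Below is a minimal usage sketch, assuming the base `Client` exposes public `invoke`, `structured_response`, and `embed` wrappers that delegate to the private methods above; the API key and model names are placeholders.

```py
# Hedged sketch: assumes public invoke/structured_response/embed wrappers on the base
# Client delegate to the private methods shown above; key and model names are placeholders.
from pydantic import BaseModel

from datapizza.clients.openai import OpenAIClient


class CityInfo(BaseModel):
    name: str
    country: str


client = OpenAIClient(model="gpt-4o-mini", api_key="sk-...")

# Plain text generation through the Responses API
print(client.invoke("Name one city in Italy.").text)

# Structured output: responses.parse() fills the Pydantic model
parsed = client.structured_response(
    input="Give me a city and its country.",
    output_cls=CityInfo,
)
print(parsed.structured_data[0])

# Embeddings: a single string yields one vector, a list of strings yields a list of vectors
vector = client.embed("datapizza", model_name="text-embedding-3-small")
print(len(vector))
```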

## /datapizza-ai-clients/datapizza-ai-clients-openai/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-openai/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-openai"
version = "0.0.9"
description = "OpenAI client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}

requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.0.7,<0.1.0",
    "openai>=2,<3.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    # "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/tests/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/tests/__init__.py" 

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_base_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_base_client.py" 
from unittest.mock import MagicMock, patch

import httpx
from datapizza.core.clients import ClientResponse

from datapizza.clients.openai import (
    OpenAIClient,
)


def test_client_init():
    client = OpenAIClient(
        model="gpt-4o-mini",
        api_key="test_api_key",
    )
    assert client is not None


@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_client_init_with_extra_args(mock_openai):
    """Tests that extra arguments are passed to the OpenAI client."""
    OpenAIClient(
        api_key="test_api_key",
        organization="test-org",
        project="test-project",
        timeout=30.0,
        max_retries=3,
    )
    mock_openai.assert_called_once_with(
        api_key="test_api_key",
        base_url=None,
        organization="test-org",
        project="test-project",
        webhook_secret=None,
        websocket_base_url=None,
        timeout=30.0,
        max_retries=3,
        default_headers=None,
        default_query=None,
        http_client=None,
    )


@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_client_init_with_http_client(mock_openai):
    """Tests that a custom http_client is passed to the OpenAI client."""
    custom_http_client = httpx.Client()
    OpenAIClient(
        api_key="test_api_key",
        http_client=custom_http_client,
    )
    mock_openai.assert_called_once_with(
        api_key="test_api_key",
        base_url=None,
        organization=None,
        project=None,
        webhook_secret=None,
        websocket_base_url=None,
        timeout=None,
        max_retries=2,
        default_headers=None,
        default_query=None,
        http_client=custom_http_client,
    )


@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_invoke_kwargs_override(mock_openai_class):
    """
    Tests that kwargs like 'stream' are not overridden by user input
    in non-streaming methods, but other kwargs are passed through.
    """
    mock_openai_instance = mock_openai_class.return_value
    mock_openai_instance.responses.create.return_value = MagicMock()

    client = OpenAIClient(api_key="test")
    client._response_to_client_response = MagicMock(
        return_value=ClientResponse(content=[])
    )

    client.invoke("hello", stream=True, top_p=0.5)

    mock_openai_instance.responses.create.assert_called_once()
    called_kwargs = mock_openai_instance.responses.create.call_args.kwargs

    assert called_kwargs.get("top_p") == 0.5
    assert called_kwargs.get("stream") is False


@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_stream_invoke_kwargs_override(mock_openai_class):
    """
    Tests that kwargs like 'stream' are not overridden by user input
    in streaming methods.
    """
    mock_openai_instance = mock_openai_class.return_value
    mock_openai_instance.responses.create.return_value = []

    client = OpenAIClient(api_key="test")

    list(client.stream_invoke("hello", stream=False, top_p=0.5))

    mock_openai_instance.responses.create.assert_called_once()
    called_kwargs = mock_openai_instance.responses.create.call_args.kwargs

    assert called_kwargs.get("top_p") == 0.5
    assert called_kwargs.get("stream") is True

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_memory_adapter.py" 
import json

import pytest
from datapizza.memory.memory import Memory
from datapizza.tools.tools import tool
from datapizza.type.type import (
    ROLE,
    FunctionCallBlock,
    Media,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)
from openai.types.responses import ResponseFunctionToolCall

from datapizza.clients.openai.memory_adapter import OpenAIMemoryAdapter


@pytest.fixture(
    params=[
        OpenAIMemoryAdapter(),
    ]
)
def adapter(request):
    """Parameterized fixture that provides different memory adapter implementations.

    Each test using this fixture will run once for each adapter in the params list.
    """
    return request.param


@pytest.fixture
def memory():
    return Memory()


def test_empty_memory_to_messages(adapter, memory):
    """Test that an empty memory converts to an empty list of messages."""
    messages = adapter.memory_to_messages(memory)
    assert messages == []


def test_turn_with_some_text():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Hello!"))
    memory.add_to_last_turn(TextBlock(content="Hi, how are u?"))
    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages == [
        {
            "role": "user",
            "content": [
                {"type": "input_text", "text": "Hello!"},
                {"type": "input_text", "text": "Hi, how are u?"},
            ],
        }
    ]


def test_memory_to_messages_multiple_turns():
    """Test conversion of a memory with multiple turns to messages."""
    # First turn: user asks a question
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="What's 2+2?"))

    # Second turn: assistant responds
    memory.new_turn(role=ROLE.ASSISTANT)
    memory.add_to_last_turn(TextBlock(content="The answer is 4."))

    # Third turn: user follows up
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Thanks!"))

    messages = OpenAIMemoryAdapter().memory_to_messages(memory)

    expected = [
        {"role": "user", "content": [{"type": "input_text", "text": "What's 2+2?"}]},
        {
            "role": "assistant",
            "content": [{"type": "output_text", "text": "The answer is 4."}],
        },
        {"role": "user", "content": [{"type": "input_text", "text": "Thanks!"}]},
    ]
    assert messages == expected


def test_memory_to_messages_function_call():
    @tool
    def add(a: int, b: int) -> int:
        return a + b

    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Call the add function."))
    memory.new_turn(role=ROLE.ASSISTANT)
    memory.add_to_last_turn(
        FunctionCallBlock(id="call_1", name="add", arguments={"a": 2, "b": 2}, tool=add)
    )

    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages[0]["role"] == "user"
    assert isinstance(messages[1], ResponseFunctionToolCall)
    assert json.loads(messages[1].arguments) == {
        "a": 2,
        "b": 2,
    }


def test_memory_to_messages_media_blocks():
    image = Media(
        media_type="image",
        source_type="url",
        source="http://example.com/image.png",
        extension="png",
    )
    pdf = Media(
        media_type="pdf",
        source_type="base64",
        source="THIS_IS_A_PDF_BASE64",
        extension="pdf",
    )
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(MediaBlock(media=image))
    memory.add_to_last_turn(MediaBlock(media=pdf))
    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages[0]["role"] == "user"
    # Should contain both image and pdf blocks

    # TODO: Check if the image and pdf blocks are correct
    assert messages[0]["content"][1] == {
        "type": "input_file",
        "filename": "file.pdf",
        "file_data": "data:application/pdf;base64,THIS_IS_A_PDF_BASE64",
    }


def test_memory_to_messages_structured_block():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(StructuredBlock(content={"key": "value"}))
    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages[0]["content"] == "{'key': 'value'}" or messages[0]["content"] == [
        {
            "type": "text",
            "text": "{'key': 'value'}",
        }
    ]


def test_memory_to_messages_with_system_prompt():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Hello!"))
    system_prompt = "You are a helpful assistant."
    messages = OpenAIMemoryAdapter().memory_to_messages(
        memory, system_prompt=system_prompt
    )
    assert messages[0]["role"] == "system"
    assert messages[0]["content"] == system_prompt
    assert messages[1]["role"] == "user"


def test_memory_to_messages_with_input_str():
    memory = Memory()
    input_str = "What is the weather?"
    messages = OpenAIMemoryAdapter().memory_to_messages(memory, input=input_str)
    assert messages[-1]["role"] == "user"
    assert messages[-1]["content"] == input_str


def test_memory_to_messages_with_input_block():
    memory = Memory()
    input_block = TextBlock(content="This is a block input.")
    messages = OpenAIMemoryAdapter().memory_to_messages(memory, input=input_block)
    assert messages[-1]["role"] == "user"
    assert "block input" in str(messages[-1]["content"])


def test_google_empty_memory_to_messages():
    messages = OpenAIMemoryAdapter().memory_to_messages(Memory())
    assert messages == []


def test_google_memory_to_messages_multiple_turns():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="What's 2+2?"))
    memory.new_turn(role=ROLE.ASSISTANT)
    memory.add_to_last_turn(TextBlock(content="The answer is 4."))
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Thanks!"))
    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages[0]["role"] == "user"
    assert messages[1]["role"] == "assistant"
    assert messages[2]["role"] == "user"

```

## /datapizza-ai-core/README.md


This is the core of the datapizza-ai framework.


## /datapizza-ai-core/datapizza/agents/__init__.py

```py path="/datapizza-ai-core/datapizza/agents/__init__.py" 
from .agent import Agent, StepResult
from .client_manager import ClientManager

__all__ = [
    "Agent",
    "ClientManager",
    "StepResult",
]

```

## /datapizza-ai-core/datapizza/agents/__version__.py

```py path="/datapizza-ai-core/datapizza/agents/__version__.py" 
VERSION = (3, 0, 8)

__version__ = ".".join(map(str, VERSION))

```

## /datapizza-ai-core/datapizza/agents/agent.py

```py path="/datapizza-ai-core/datapizza/agents/agent.py" 
import inspect
from collections.abc import AsyncGenerator, Callable, Generator
from functools import wraps
from threading import Lock
from typing import Any, Literal, Union, cast

from pydantic import BaseModel

from datapizza.agents.logger import AgentLogger
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.core.executors.async_executor import AsyncExecutor
from datapizza.core.utils import sum_token_usage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.tracing.tracing import agent_span, tool_span
from datapizza.type import (
    ROLE,
    Block,
    FunctionCallBlock,
    FunctionCallResultBlock,
    TextBlock,
)

PLANNING_PROMT = """in this moment you just tell me what you are going to do.
You need to define the next steps to solve the task.
Do not use tools to solve the task.
Do not solve the task, just plan the next steps.
"""


class StepResult:
    def __init__(
        self,
        index: int,
        content: list[Block],
        usage: TokenUsage | None = None,
    ):
        self.index = index
        self.content = content
        self.usage = usage or TokenUsage()

    @property
    def text(self) -> str:
        return "\n".join(
            block.content for block in self.content if isinstance(block, TextBlock)
        )

    @property
    def tools_used(self) -> list[FunctionCallBlock]:
        return [block for block in self.content if isinstance(block, FunctionCallBlock)]


class Plan(BaseModel):
    task: str
    steps: list[str]

    def __str__(self):
        separator = "\n - "
        return f"I need to solve the task:\n\n{self.task}\n\nHere is the plan:\n\n - {separator.join(self.steps)}"


class Agent:
    name: str
    system_prompt: str = "You are a helpful assistant."

    def __init__(
        self,
        name: str | None = None,
        client: Client | None = None,
        *,
        system_prompt: str | None = None,
        tools: list[Tool] | None = None,
        max_steps: int | None = None,
        terminate_on_text: bool | None = True,
        stateless: bool = True,
        gen_args: dict[str, Any] | None = None,
        memory: Memory | None = None,
        stream: bool | None = None,
        # action_on_stop_reason: dict[str, Action] | None = None,
        can_call: list["Agent"] | None = None,
        logger: AgentLogger | None = None,
        planning_interval: int = 0,
        planning_prompt: str = PLANNING_PROMT,
    ):
        """
        Initialize the agent.

        Args:
            name (str, optional): The name of the agent. Defaults to None.
            client (Client): The client to use for the agent. Required; a ValueError is raised if omitted.
            system_prompt (str, optional): The system prompt to use for the agent. Defaults to None.
            tools (list[Tool], optional): A list of tools to use with the agent. Defaults to None.
            max_steps (int, optional): The maximum number of steps to execute. Defaults to None.
            terminate_on_text (bool, optional): Whether to terminate the agent on text. Defaults to True.
            stateless (bool, optional): Whether to use stateless execution. Defaults to True.
            gen_args (dict[str, Any], optional): Additional arguments to pass to the agent's execution. Defaults to None.
            memory (Memory, optional): The memory to use for the agent. Defaults to None.
            stream (bool, optional): Whether to stream the agent's execution. Defaults to None.
            can_call (list[Agent], optional): A list of agents that this agent is allowed to call as tools. Defaults to None.
            logger (AgentLogger, optional): The logger to use for the agent. Defaults to None.
            planning_interval (int, optional): The planning interval to use for the agent. Defaults to 0.
            planning_prompt (str, optional): The planning prompt to use for the agent planning steps. Defaults to PLANNING_PROMT.

        """
        if not client:
            raise ValueError("Client is required")

        if not name and not getattr(self, "name", None):
            raise ValueError(
                "Name is required, you can pass it as a parameter or set it in the agent class"
            )

        if not system_prompt and not getattr(self, "system_prompt", None):
            raise ValueError(
                "System prompt is required, you can pass it as a parameter or set it in the agent class"
            )

        self.name = name or self.name
        if not isinstance(self.name, str):
            raise ValueError("Name must be a string")

        self.system_prompt = system_prompt or self.system_prompt
        if not isinstance(self.system_prompt, str):
            raise ValueError("System prompt must be a string")

        self._client = client
        self._tools = tools or []
        self._planning_interval = planning_interval
        self._planning_prompt = planning_prompt
        self._memory = memory or Memory()
        self._stateless = stateless

        if can_call:
            self.can_call(can_call)

        self._max_steps = max_steps
        self._terminate_on_text = terminate_on_text
        self._stream = stream

        if not logger:
            self._logger = AgentLogger(agent_name=self.name)
        else:
            self._logger = logger

        for tool in self._decorator_tools():
            self._add_tool(tool)

        self._lock = Lock()

    def can_call(self, agent: Union[list["Agent"], "Agent"]):
        if isinstance(agent, Agent):
            agent = [agent]

        for a in agent:
            self._tools.append(a.as_tool())

    @classmethod
    def _tool_from_agent(cls, agent: "Agent"):
        async def invoke_agent(input_task: str):
            return cast(StepResult, await agent.a_run(input_task)).text

        a_tool = Tool(
            func=invoke_agent,
            name=agent.name,
            description=agent.__doc__,
        )
        return a_tool

    @staticmethod
    def _lock_if_not_stateless(func: Callable):
        @wraps(func)
        def decorated(self, *args, **kwargs):
            if not self._stateless and inspect.isgeneratorfunction(func):
                # For generators, we need a locking wrapper
                def locking_generator():
                    with self._lock:
                        yield from func(self, *args, **kwargs)

                return locking_generator()
            elif not self._stateless:
                with self._lock:
                    return func(self, *args, **kwargs)
            else:
                return func(self, *args, **kwargs)

        return decorated

    @staticmethod
    def _contains_ending_tool(step: StepResult) -> bool:
        content = step.content
        return any(
            block.tool.end_invoke
            for block in content
            if isinstance(block, FunctionCallBlock)
        )

    def as_tool(self):
        return Agent._tool_from_agent(self)

    def _add_tool(self, tool: Tool):
        self._tools.append(tool)

    def _decorator_tools(self):
        tools = []
        for attr_name in dir(self):
            attr = getattr(self, attr_name)
            # Check for tool methods
            if isinstance(attr, Tool):
                tools.append(attr)

        return tools

    @_lock_if_not_stateless
    def stream_invoke(
        self,
        task_input: str,
        tool_choice: Literal["auto", "required", "none", "required_first"]
        | list[str] = "auto",
        **gen_kwargs,
    ) -> Generator[ClientResponse | StepResult | Plan | None, None]:
        """
        Stream the agent's execution, yielding intermediate steps and final result.

        Args:
            task_input (str): The input text/prompt to send to the model
            tool_choice (Literal["auto", "required", "none", "required_first"] | list[str], optional): Controls which tool to use ("auto" by default)
            **gen_kwargs: Additional keyword arguments to pass to the agent's execution

        Yields:
            The intermediate steps and final result of the agent's execution

        """
        yield from self._invoke_stream(task_input, tool_choice, **gen_kwargs)

    @_lock_if_not_stateless
    async def a_stream_invoke(
        self,
        task_input: str,
        tool_choice: Literal["auto", "required", "none", "required_first"]
        | list[str] = "auto",
        **gen_kwargs,
    ) -> AsyncGenerator[ClientResponse | StepResult | Plan | None]:
        """
        Stream the agent's execution asynchronously, yielding intermediate steps and final result.

        Args:
            task_input (str): The input text/prompt to send to the model
            tool_choice (Literal["auto", "required", "none", "required_first"] | list[str], optional): Controls which tool to use ("auto" by default)
            **gen_kwargs: Additional keyword arguments to pass to the agent's execution

        Yields:
            The intermediate steps and final result of the agent's execution

        """
        async for step in self._a_invoke_stream(task_input, tool_choice, **gen_kwargs):
            yield step

    def _invoke_stream(
        self, task_input: str, tool_choice, **kwargs
    ) -> Generator[ClientResponse | StepResult | Plan | None, None]:
        self._logger.debug("STARTING AGENT")
        final_answer = None
        current_steps = 1
        memory = self._memory.copy()
        original_task = task_input

        while final_answer is None and (
            self._max_steps is None
            or (self._max_steps and current_steps <= self._max_steps)
        ):
            kwargs["tool_choice"] = tool_choice
            if tool_choice == "required_first":
                if current_steps == 1:
                    kwargs["tool_choice"] = "required"
                else:
                    kwargs["tool_choice"] = "auto"

            self._logger.debug(f"--- STEP {current_steps} ---")

            # Planning step if interval is set
            if self._planning_interval and (
                current_steps == 1 or (current_steps - 1) % self._planning_interval == 0
            ):
                plan = self._create_planning_prompt(
                    original_task, memory, current_steps
                )
                assert isinstance(plan, Plan)
                memory.add_turn(
                    TextBlock(content=str(plan)),
                    role=ROLE.ASSISTANT,
                )
                memory.add_turn(
                    TextBlock(content="Ok, go ahead and now execute the plan."),
                    role=ROLE.USER,
                )

                yield plan

                self._logger.log_panel(str(plan), title="PLAN")

            # Execute planning step
            step_output = None
            for result in self._execute_planning_step(
                current_steps, original_task, memory, **kwargs
            ):
                if isinstance(result, ClientResponse):
                    yield result
                elif isinstance(result, StepResult):
                    step_output = result.text
                    yield result

            if step_output and self._terminate_on_text:
                final_answer = step_output
                break

            if (
                result
                and isinstance(result, StepResult)
                and Agent._contains_ending_tool(result)
            ):
                self._logger.debug("ending tool found, ending agent")
                break

            current_steps += 1
            original_task = ""

        # Yield final answer if we have one
        if final_answer:
            self._logger.log_panel(final_answer, title="FINAL ANSWER")

        if not self._stateless:
            self._memory = memory

    async def _a_invoke_stream(
        self, task_input: str, tool_choice, **kwargs
    ) -> AsyncGenerator[ClientResponse | StepResult | Plan | None]:
        self._logger.debug("STARTING AGENT")
        final_answer = None
        current_steps = 1
        memory = self._memory.copy()
        original_task = task_input

        while final_answer is None and (
            self._max_steps is None
            or (self._max_steps and current_steps <= self._max_steps)
        ):
            kwargs["tool_choice"] = tool_choice
            if tool_choice == "required_first":
                if current_steps == 1:
                    kwargs["tool_choice"] = "required"
                else:
                    kwargs["tool_choice"] = "auto"

            # step_action = StepResult(index=current_steps)
            self._logger.debug(f"--- STEP {current_steps} ---")
            # yield step_action

            # Planning step if interval is set
            if self._planning_interval and (
                current_steps == 1 or (current_steps - 1) % self._planning_interval == 0
            ):
                plan = await self._a_create_planning_prompt(
                    original_task, memory, current_steps
                )
                assert isinstance(plan, Plan)
                memory.add_turn(
                    TextBlock(content=str(plan)),
                    role=ROLE.ASSISTANT,
                )
                memory.add_turn(
                    TextBlock(content="Ok, go ahead and now execute the plan."),
                    role=ROLE.USER,
                )

                yield plan

                self._logger.log_panel(str(plan), title="PLAN")

            # Execute planning step
            step_output = None
            async for result in self._a_execute_planning_step(
                current_steps, original_task, memory, **kwargs
            ):
                if isinstance(result, ClientResponse):
                    yield result
                elif isinstance(result, StepResult):
                    step_output = result.text
                    yield result

            if step_output and self._terminate_on_text:
                final_answer = step_output
                break

            if (
                result
                and isinstance(result, StepResult)
                and Agent._contains_ending_tool(result)
            ):
                self._logger.debug("ending tool found, ending agent")
                break

            current_steps += 1
            original_task = ""

        # Yield final answer if we have one
        if final_answer:
            self._logger.log_panel(final_answer, title="FINAL ANSWER")

        if not self._stateless:
            self._memory = memory

    def _create_planning_prompt(
        self, original_task: str, memory: Memory, step_number: int
    ) -> Plan:
        """Create a planning prompt that asks the agent to define next steps."""

        prompt = self.system_prompt + self._planning_prompt

        client_response = self._client.structured_response(
            input=original_task,
            tools=self._tools,
            tool_choice="none",
            memory=memory,
            system_prompt=prompt,
            output_cls=Plan,
        )
        return Plan(**client_response.structured_data[0].model_dump())

    async def _a_create_planning_prompt(
        self, original_task: str, memory: Memory, step_number: int
    ) -> Plan:
        """Create a planning prompt that asks the agent to define next steps."""
        prompt = self.system_prompt + self._planning_prompt

        client_response = await self._client.a_structured_response(
            input=original_task,
            tools=self._tools,
            tool_choice="none",
            memory=memory,
            system_prompt=prompt,
            output_cls=Plan,
        )
        return Plan(**client_response.structured_data[0].model_dump())

    def _execute_planning_step(
        self, current_step, planning_prompt: str, memory: Memory, **kwargs
    ) -> Generator[StepResult | ClientResponse, None, None]:
        """Execute a planning step with streaming support."""
        tool_results = []
        step_usage = TokenUsage()

        # Check if streaming is enabled
        response: ClientResponse | None = None
        if self._stream:
            for chunk in self._client.stream_invoke(
                input=planning_prompt,
                tools=self._tools,
                memory=memory,
                system_prompt=self.system_prompt,
                **kwargs,
            ):
                step_usage += chunk.usage
                response = chunk
                if chunk.delta:
                    yield chunk

        else:
            # Use regular non-streaming generation
            response = self._client.invoke(
                input=planning_prompt,
                tools=self._tools,
                memory=memory,
                system_prompt=self.system_prompt,
                **kwargs,
            )
            step_usage += response.usage

        if not response:
            raise RuntimeError("No response from client")

        if planning_prompt:
            memory.add_turn(TextBlock(content=planning_prompt), role=ROLE.USER)

        if response and response.text:
            memory.add_turn(TextBlock(content=response.text), role=ROLE.ASSISTANT)

        if response and response.function_calls:
            memory.add_turn(response.function_calls, role=ROLE.ASSISTANT)

        for tool_call in response.function_calls:
            tool_results.append(self._execute_tool(tool_call))

        if tool_results:
            for x in tool_results:
                memory.add_turn(x, role=ROLE.TOOL)

        step_action = StepResult(
            index=current_step,
            content=response.content + tool_results,
            usage=response.usage,
        )

        yield step_action

    async def _a_execute_planning_step(
        self, current_step, planning_prompt: str, memory: Memory, **kwargs
    ) -> AsyncGenerator[StepResult | ClientResponse, None]:
        """Execute a planning step with streaming support."""
        tool_results = []
        step_usage = TokenUsage()
        # Check if streaming is enabled
        response: ClientResponse
        if self._stream:
            async for chunk in self._client.a_stream_invoke(
                input=planning_prompt,
                tools=self._tools,
                memory=memory,
                system_prompt=self.system_prompt,
                **kwargs,
            ):
                step_usage += chunk.usage
                response = chunk
                if chunk.delta:
                    yield chunk

        else:
            # Use regular non-streaming generation
            response = await self._client.a_invoke(
                input=planning_prompt,
                tools=self._tools,
                memory=memory,
                system_prompt=self.system_prompt,
                **kwargs,
            )
            step_usage += response.usage

        if planning_prompt:
            memory.add_turn(TextBlock(content=planning_prompt), role=ROLE.USER)

        if response.text:
            memory.add_turn(TextBlock(content=response.text), role=ROLE.ASSISTANT)

        if response.function_calls:
            memory.add_turn(response.function_calls, role=ROLE.ASSISTANT)

        for tool_call in response.function_calls:
            tool_results.append(await self._a_execute_tool(tool_call))

        if tool_results:
            for x in tool_results:
                memory.add_turn(x, role=ROLE.TOOL)

        step_action = StepResult(
            index=current_step,
            content=response.content + tool_results,
            usage=response.usage,
        )

        yield step_action

    def _execute_tool(
        self, function_call: FunctionCallBlock
    ) -> FunctionCallResultBlock:
        with tool_span(f"Tool {function_call.tool.name}"):
            result = function_call.tool(**function_call.arguments)

            if inspect.iscoroutine(result):
                result = AsyncExecutor.get_instance().run(result)

            if result:
                self._logger.log_panel(
                    result,
                    title=f"TOOL {function_call.tool.name.upper()} RESULT",
                    subtitle="args: " + str(function_call.arguments),
                )
            return FunctionCallResultBlock(
                id=function_call.id,
                tool=function_call.tool,
                result=result,
            )

    async def _a_execute_tool(
        self, function_call: FunctionCallBlock
    ) -> FunctionCallResultBlock:
        with tool_span(f"Tool {function_call.tool.name}"):
            result = function_call.tool(**function_call.arguments)

            if inspect.iscoroutine(result):
                result = await result

            if result:
                self._logger.log_panel(
                    result,
                    title=f"TOOL {function_call.tool.name.upper()} RESULT",
                    subtitle="args: " + str(function_call.arguments),
                )
            return FunctionCallResultBlock(
                id=function_call.id,
                tool=function_call.tool,
                result=result,
            )

    @_lock_if_not_stateless
    def run(
        self,
        task_input: str,
        tool_choice: Literal["auto", "required", "none", "required_first"]
        | list[str] = "auto",
        **gen_kwargs,
    ) -> StepResult | None:
        """
        Run the agent on a task input.

        Args:
            task_input (str): The input text/prompt to send to the model
            tool_choice (Literal["auto", "required", "none", "required_first"] | list[str], optional): Controls which tool to use ("auto" by default)
            **gen_kwargs: Additional keyword arguments to pass to the agent's execution

        Returns:
            The final result of the agent's execution
        """
        with agent_span(f"Agent {self.name}"):
            usage = TokenUsage()
            steps = list[ClientResponse | StepResult | Plan | None](
                self._invoke_stream(task_input, tool_choice, **gen_kwargs)
            )
            usage += sum_token_usage(
                [step.usage for step in steps if isinstance(step, StepResult)]
            )

            last_step = cast(
                StepResult,
                steps[-1],
            )
            last_step.usage = usage
            return last_step

    @_lock_if_not_stateless
    async def a_run(
        self,
        task_input: str,
        tool_choice: Literal["auto", "required", "none", "required_first"]
        | list[str] = "auto",
        **gen_kwargs,
    ) -> StepResult | None:
        """
        Run the agent on a task input asynchronously.

        Args:
            task_input (str): The input text/prompt to send to the model
            tool_choice (Literal["auto", "required", "none", "required_first"] | list[str], optional): Controls which tool to use ("auto" by default)
            **gen_kwargs: Additional keyword arguments to pass to the agent's execution

        Returns:
            The final result of the agent's execution
        """
        with agent_span(f"Agent {self.name}"):
            total_usage = TokenUsage()
            results = []
            async for result in self._a_invoke_stream(
                task_input, tool_choice, **gen_kwargs
            ):
                results.append(result)

            total_usage += sum_token_usage(
                [result.usage for result in results if isinstance(result, StepResult)]
            )
            last_result = results[-1] if results else None
            if last_result:
                last_result.usage = total_usage
            return last_result

```
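
To make the control flow above concrete, here is a hedged usage sketch: an `Agent` wired to a client and a single `Tool`, run for a bounded number of steps. The `get_weather` helper, model name, and API key are illustrative only.

```py
# Hedged sketch of Agent usage; the get_weather helper, model, and key are illustrative.
from datapizza.agents import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools import Tool


def get_weather(city: str) -> str:
    """Return a canned weather report for a city."""
    return f"It is sunny in {city}."


client = OpenAIClient(model="gpt-4o-mini", api_key="sk-...")

agent = Agent(
    name="weather_agent",
    client=client,
    system_prompt="You are a helpful weather assistant.",
    tools=[Tool(func=get_weather, name="get_weather", description="Get the weather for a city")],
    max_steps=3,                # stop after at most three reasoning/tool steps
    terminate_on_text=True,     # a plain-text answer ends the loop
)

result = agent.run("What is the weather in Rome?", tool_choice="required_first")
if result:
    print(result.text)          # final answer from the last step
    print(result.tools_used)    # tool calls recorded in the final step (if any)
    print(result.usage)         # aggregated TokenUsage across steps
```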

## /datapizza-ai-core/datapizza/agents/client_manager.py

```py path="/datapizza-ai-core/datapizza/agents/client_manager.py" 
from threading import Lock

from datapizza.core.clients import Client


class ClientManager:
    _instance: Client | None = None
    _lock = Lock()

    @classmethod
    def set_global_client(cls, client: Client) -> None:
        """Set the global Client instance.

        Args:
            client: Client instance to be used globally
        """
        with cls._lock:
            cls._instance = client

    @classmethod
    def get_global_client(cls) -> Client | None:
        """Get the current global Client instance.

        Returns:
            The global client instance if set, None otherwise
        """
        return cls._instance

    @classmethod
    def clear_global_client(cls) -> None:
        """Clear the global Client instance."""
        with cls._lock:
            cls._instance = None

```
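
A hedged sketch of how this singleton might be used to share one client across agents; the `OpenAIClient` configuration is a placeholder, and the `invoke` call assumes the base `Client` exposes that public method.

```py
# Hedged sketch: register one client globally, retrieve it elsewhere, then clear it.
from datapizza.agents import ClientManager
from datapizza.clients.openai import OpenAIClient

ClientManager.set_global_client(OpenAIClient(model="gpt-4o-mini", api_key="sk-..."))

client = ClientManager.get_global_client()
if client is not None:
    print(client.invoke("ping").text)  # assumes the base Client exposes invoke()

ClientManager.clear_global_client()
assert ClientManager.get_global_client() is None
```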

## /datapizza-ai-core/datapizza/cache/__init__.py

```py path="/datapizza-ai-core/datapizza/cache/__init__.py" 
# Import MemoryCache from core implementation
from pkgutil import extend_path

__path__ = extend_path(__path__, __name__)

from datapizza.core.cache import MemoryCache

__all__ = ["MemoryCache"]

```

## /datapizza-ai-core/datapizza/clients/__init__.py

```py path="/datapizza-ai-core/datapizza/clients/__init__.py" 
from pkgutil import extend_path

__path__ = extend_path(__path__, __name__)

from .factory import ClientFactory
from .mock_client import MockClient

__all__ = ["ClientFactory", "MockClient"]

```

## /datapizza-ai-core/datapizza/clients/tests/test_client_factory.py

```py path="/datapizza-ai-core/datapizza/clients/tests/test_client_factory.py" 
from datapizza.clients.openai import OpenAIClient

from datapizza.clients import ClientFactory


def test_client_factory_openai():
    client = ClientFactory.create(
        provider="openai",
        api_key="test_api_key",
        model="gpt-3.5-turbo",
        system_prompt="You are a helpful assistant that can answer questions about piadina only in italian.",
    )

    assert client is not None

    assert isinstance(client, OpenAIClient)

```

## /datapizza-ai-core/datapizza/core/__init__.py

```py path="/datapizza-ai-core/datapizza/core/__init__.py" 
import logging

from datapizza.core.utils import _basic_config

# Setup base logging

# Create and configure the main package logger
log = logging.getLogger("datapizza")
_basic_config(log)

log.setLevel(logging.DEBUG)

```
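
Because importing the package sets the `datapizza` logger to DEBUG, downstream applications that want quieter output can raise the level themselves; a minimal sketch using only the standard library:

```py
# Minimal sketch: quiet the framework's logger after importing the package.
import logging

import datapizza.core  # importing configures the "datapizza" logger at DEBUG

logging.getLogger("datapizza").setLevel(logging.WARNING)
```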

## /datapizza-ai-embedders/cohere/README.md



