datapizza-labs/datapizza-ai/main 638k tokens
```
├── .github/
   ├── ISSUE_TEMPLATE/
      ├── bug_report.md (100 tokens)
      ├── feature_request.md (100 tokens)
   ├── workflows/
      ├── test.yml (100 tokens)
├── .gitignore (900 tokens)
├── .pre-commit-config.yaml (100 tokens)
├── CODE_OF_CONDUCT.md (1000 tokens)
├── CONTRIBUTING.md (1500 tokens)
├── LICENSE (omitted)
├── Makefile (100 tokens)
├── README.md (2.5k tokens)
├── datapizza-ai-cache/
   ├── redis/
      ├── README.md
      ├── datapizza/
         ├── cache/
            ├── redis/
               ├── __init__.py
               ├── cache.py (200 tokens)
               ├── json_codec.py (300 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_redis_cache.py (500 tokens)
├── datapizza-ai-clients/
   ├── datapizza-ai-clients-anthropic/
      ├── README.md
      ├── datapizza/
         ├── clients/
            ├── anthropic/
               ├── __init__.py
               ├── anthropic_client.py (4.5k tokens)
               ├── memory_adapter.py (1100 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_anthropic_memory_adapter.py (500 tokens)
         ├── test_anthropic_streaming.py (1000 tokens)
         ├── test_anthropic_structured_response.py (700 tokens)
   ├── datapizza-ai-clients-bedrock/
      ├── README.md (1800 tokens)
      ├── datapizza/
         ├── clients/
            ├── bedrock/
               ├── __init__.py
               ├── bedrock_client.py (3.8k tokens)
               ├── memory_adapter.py (1100 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_bedrock_async.py (300 tokens)
         ├── test_bedrock_memory_adapter.py (400 tokens)
   ├── datapizza-ai-clients-google/
      ├── README.md
      ├── datapizza/
         ├── clients/
            ├── google/
               ├── __init__.py
               ├── google_client.py (5.8k tokens)
               ├── memory_adapter.py (1100 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_google_streaming.py (900 tokens)
         ├── test_google_token_usage.py (200 tokens)
         ├── test_memory_adapter.py (400 tokens)
   ├── datapizza-ai-clients-mistral/
      ├── README.md
      ├── datapizza/
         ├── clients/
            ├── mistral/
               ├── __init__.py
               ├── memory_adapter.py (700 tokens)
               ├── mistral_client.py (3.4k tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_mistral_client.py
   ├── datapizza-ai-clients-openai-like/
      ├── README.md (1100 tokens)
      ├── datapizza/
         ├── clients/
            ├── openai_like/
               ├── __init__.py
               ├── memory_adapter.py (1300 tokens)
               ├── openai_completion_client.py (3.7k tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_openai_completion.py (1100 tokens)
   ├── datapizza-ai-clients-openai/
      ├── README.md
      ├── datapizza/
         ├── clients/
            ├── openai/
               ├── __init__.py
               ├── memory_adapter.py (1300 tokens)
               ├── openai_client.py (3.7k tokens)
      ├── pyproject.toml (200 tokens)
      ├── tests/
         ├── __init__.py
         ├── test_base_client.py (800 tokens)
         ├── test_memory_adapter.py (1200 tokens)
   ├── datapizza-ai-clients-watsonx/
      ├── README.md
      ├── datapizza/
         ├── clients/
            ├── watsonx/
               ├── __init__.py
               ├── memory_adapter.py (1200 tokens)
               ├── watsonx_client.py (2000 tokens)
      ├── pyproject.toml (200 tokens)
      ├── tests/
         ├── test_watsonx.py
├── datapizza-ai-core/
   ├── README.md
   ├── datapizza/
      ├── agents/
         ├── __init__.py (100 tokens)
         ├── __version__.py
         ├── agent.py (2.5k tokens)
         ├── client_manager.py (200 tokens)
         ├── logger.py (500 tokens)
         ├── runner.py (8k tokens)
         ├── tests/
            ├── test_base_agents.py (6.1k tokens)
      ├── cache/
         ├── __init__.py
      ├── clients/
         ├── __init__.py
         ├── factory.py (1200 tokens)
         ├── mock_client.py (1600 tokens)
      ├── core/
         ├── __init__.py
         ├── __version__.py
         ├── cache/
            ├── __init__.py
            ├── cache.py (700 tokens)
            ├── tests/
               ├── test_cacheable.py (200 tokens)
         ├── clients/
            ├── __init__.py
            ├── client.py (5.9k tokens)
            ├── models.py (1200 tokens)
            ├── tests/
               ├── test_mock_client.py (700 tokens)
               ├── test_token_usage.py (200 tokens)
         ├── constants.py (100 tokens)
         ├── embedder/
            ├── __init__.py
            ├── base.py (200 tokens)
         ├── executors/
            ├── async_executor.py (500 tokens)
         ├── models.py (700 tokens)
         ├── modules/
            ├── captioner.py (200 tokens)
            ├── metatagger.py (100 tokens)
            ├── parser.py (200 tokens)
            ├── prompt.py (100 tokens)
            ├── reranker.py (100 tokens)
            ├── rewriter.py (200 tokens)
            ├── splitter.py (100 tokens)
         ├── utils.py (1200 tokens)
         ├── vectorstore/
            ├── __init__.py
            ├── tests/
               ├── test_vectorstore_models.py (100 tokens)
            ├── vectorstore.py (600 tokens)
      ├── embedders/
         ├── __init__.py
         ├── embedders.py (900 tokens)
      ├── memory/
         ├── __init__.py
         ├── __version__.py
         ├── memory.py (1200 tokens)
         ├── memory_adapter.py (400 tokens)
         ├── tests/
            ├── __init__.py
            ├── test_memory.py (1700 tokens)
      ├── modules/
         ├── captioners/
            ├── __init__.py
            ├── llm_captioner.py (1500 tokens)
         ├── metatagger/
            ├── __init__.py
            ├── keyword_metatagger.py (600 tokens)
            ├── tests/
               ├── test_keyword_metagger.py (700 tokens)
         ├── parsers/
            ├── __init__.py
            ├── md_parser.py (900 tokens)
            ├── tests/
               ├── test_base_parser.py (200 tokens)
               ├── test_md_parser.py (900 tokens)
            ├── text_parser.py (600 tokens)
         ├── prompt/
            ├── __init__.py
            ├── image_rag.py (700 tokens)
            ├── prompt.py (600 tokens)
            ├── tests/
               ├── test_chat_prompt_template.py (500 tokens)
         ├── rewriters/
            ├── __init__.py
            ├── tests/
               ├── test_tool_rewriter.py (100 tokens)
            ├── tool_rewriter.py (900 tokens)
         ├── splitters/
            ├── __init__.py (100 tokens)
            ├── bbox_merger.py (700 tokens)
            ├── node_splitter.py (300 tokens)
            ├── pdf_image_splitter.py (1000 tokens)
            ├── recursive_splitter.py (800 tokens)
            ├── tests/
               ├── test_node_splitter.py (300 tokens)
               ├── test_recursive_splitter.py (100 tokens)
               ├── test_text_splitter.py (200 tokens)
            ├── text_splitter.py (400 tokens)
         ├── treebuilder/
            ├── __init__.py
            ├── llm_treebuilder.py (2k tokens)
            ├── test/
               ├── test_llm_treebuilder.py (4.5k tokens)
      ├── pipeline/
         ├── __init__.py
         ├── dag_pipeline.py (2.2k tokens)
         ├── functional_pipeline.py (5.6k tokens)
         ├── pipeline.py (2.7k tokens)
         ├── tests/
            ├── config.yaml (200 tokens)
            ├── config_with_elements.yaml (200 tokens)
            ├── dag_config.yaml (100 tokens)
            ├── functional_pipeline_config.yaml (200 tokens)
            ├── test_functional_pipeline.py (1100 tokens)
            ├── test_graph_pipeline.py (800 tokens)
            ├── test_pipeline.py (1500 tokens)
      ├── tools/
         ├── __init__.py
         ├── google.py (100 tokens)
         ├── mcp_client.py (2.1k tokens)
         ├── tests/
            ├── __init__.py
            ├── test_mcp_client.py (1600 tokens)
            ├── test_tools.py (500 tokens)
         ├── tools.py (900 tokens)
         ├── utils.py (700 tokens)
      ├── tracing/
         ├── __init__.py (100 tokens)
         ├── instrumentor.py (800 tokens)
         ├── memory_exporter.py (600 tokens)
         ├── tests/
            ├── test_instrumentor.py (600 tokens)
            ├── test_tracing.py (3.2k tokens)
         ├── tracing.py (1100 tokens)
      ├── type/
         ├── __init__.py (100 tokens)
         ├── tests/
            ├── test_type.py (600 tokens)
         ├── type.py (3k tokens)
   ├── pyproject.toml (300 tokens)
├── datapizza-ai-embedders/
   ├── cohere/
      ├── README.md
      ├── datapizza/
         ├── embedders/
            ├── cohere/
               ├── __init__.py
               ├── cohere.py (400 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_base.py
   ├── fastembedder/
      ├── README.md
      ├── datapizza/
         ├── embedders/
            ├── fastembedder/
               ├── __init__.py
               ├── fastembedder.py (300 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_fastembedder.py
   ├── google/
      ├── README.md (100 tokens)
      ├── datapizza/
         ├── embedders/
            ├── google/
               ├── __init__.py
               ├── google.py (500 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_google_embedder.py (100 tokens)
   ├── image_embedder.py (3.7k tokens)
   ├── mistral/
      ├── README.md (100 tokens)
      ├── datapizza/
         ├── embedders/
            ├── mistral/
               ├── __init__.py
               ├── mistral.py (600 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_mistral_embedder.py (100 tokens)
   ├── openai/
      ├── README.md (100 tokens)
      ├── datapizza/
         ├── embedders/
            ├── openai/
               ├── __init__.py
               ├── openai.py (500 tokens)
      ├── pyproject.toml (200 tokens)
      ├── tests/
         ├── test_openai_embedder.py (100 tokens)
├── datapizza-ai-eval/
   ├── metrics.py (5.6k tokens)
├── datapizza-ai-modules/
   ├── parsers/
      ├── azure/
         ├── README.md
         ├── datapizza/
            ├── modules/
               ├── parsers/
                  ├── azure/
                     ├── __init__.py
                     ├── azure_parser.py (3.3k tokens)
         ├── pyproject.toml (300 tokens)
         ├── tests/
            ├── attention_wikipedia_test.json (413.5k tokens)
            ├── attention_wikipedia_test.pdf
            ├── test_azure_parser.py (900 tokens)
      ├── docling/
         ├── README.md
         ├── datapizza/
            ├── modules/
               ├── parsers/
                  ├── docling/
                     ├── __init__.py
                     ├── docling_parser.py (7.4k tokens)
                     ├── ocr_options.py (700 tokens)
                     ├── tests/
                        ├── conftest.py (200 tokens)
                        ├── test_docling_parser.py (1500 tokens)
                     ├── utils.py (600 tokens)
         ├── mypy.ini
         ├── pyproject.toml (300 tokens)
   ├── rerankers/
      ├── cohere/
         ├── README.md
         ├── datapizza/
            ├── modules/
               ├── rerankers/
                  ├── cohere/
                     ├── __init__.py
                     ├── cohere_reranker.py (900 tokens)
         ├── pyproject.toml (300 tokens)
      ├── together/
         ├── README.md
         ├── datapizza/
            ├── modules/
               ├── rerankers/
                  ├── together/
                     ├── __init__.py
                     ├── together_reranker.py (500 tokens)
         ├── pyproject.toml (300 tokens)
├── datapizza-ai-tools/
   ├── SQLDatabase/
      ├── README.md (1400 tokens)
      ├── datapizza/
         ├── tools/
            ├── SQLDatabase/
               ├── __init__.py
               ├── base.py (500 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_sql_database_tool.py (500 tokens)
   ├── duckduckgo/
      ├── README.md
      ├── datapizza/
         ├── tools/
            ├── duckduckgo/
               ├── __init__.py
               ├── base.py (200 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_ddgs_tools.py
   ├── filesystem/
      ├── README.md (1700 tokens)
      ├── datapizza/
         ├── tools/
            ├── filesystem/
               ├── __init__.py
               ├── filesystem.py (2.1k tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_file_path_matches_pattern.py (500 tokens)
         ├── test_filesystem.py (1700 tokens)
   ├── web_fetch/
      ├── README.md (600 tokens)
      ├── datapizza/
         ├── tools/
            ├── web_fetch/
               ├── __init__.py
               ├── base.py (500 tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_web_fetch.py (500 tokens)
├── datapizza-ai-vectorstores/
   ├── datapizza-ai-vectorstores-milvus/
      ├── README.md
      ├── datapizza/
         ├── vectorstores/
            ├── milvus/
               ├── __init__.py
               ├── milvus_vectorstore.py (4.1k tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_milvus_vectorstore.py (900 tokens)
   ├── datapizza-ai-vectorstores-qdrant/
      ├── README.md
      ├── datapizza/
         ├── vectorstores/
            ├── qdrant/
               ├── __init__.py
               ├── qdrant_vectorstore.py (3.4k tokens)
      ├── pyproject.toml (300 tokens)
      ├── tests/
         ├── test_qdrant_vectorstore.py (2.1k tokens)
├── docs/
   ├── .pages
   ├── API Reference/
      ├── .pages
      ├── Agents/
         ├── agent.md
      ├── Clients/
         ├── .pages
         ├── Avaiable_Clients/
            ├── .pages
            ├── AzureOpenai.md (100 tokens)
            ├── anthropic.md (200 tokens)
            ├── google.md (700 tokens)
            ├── mistral.md (100 tokens)
            ├── openai-like.md (200 tokens)
            ├── openai.md (100 tokens)
            ├── watsonx.md (100 tokens)
         ├── cache.md
         ├── client_factory.md (300 tokens)
         ├── clients.md
         ├── models.md
      ├── Embedders/
         ├── chunk_embedder.md (400 tokens)
         ├── cohere_embedder.md (600 tokens)
         ├── fast_embedder.md (200 tokens)
         ├── google_embedder.md (300 tokens)
         ├── mistral_embedder.md (300 tokens)
         ├── ollama_embedder.md (100 tokens)
         ├── openai_embedder.md (300 tokens)
      ├── Modules/
         ├── .pages
         ├── Parsers/
            ├── .pages
            ├── azure_parser.md (400 tokens)
            ├── docling_parser.md (1100 tokens)
            ├── index.md (300 tokens)
            ├── text_parser.md (600 tokens)
         ├── Prompt/
            ├── .pages
            ├── ChatPromptTemplate.md (300 tokens)
            ├── ImageRAGPrompt.md (200 tokens)
         ├── Rerankers/
            ├── .pages
            ├── cohere_reranker.md (400 tokens)
            ├── index.md (500 tokens)
            ├── together_reranker.md (500 tokens)
         ├── Splitters/
            ├── .pages
            ├── index.md (400 tokens)
            ├── node_splitter.md (300 tokens)
            ├── pdf_image_splitter.md (300 tokens)
            ├── recursive_splitter.md (200 tokens)
            ├── text_splitter.md (200 tokens)
         ├── captioners.md (400 tokens)
         ├── index.md (200 tokens)
         ├── metatagger.md (500 tokens)
         ├── rewriters.md (300 tokens)
         ├── treebuilder.md (700 tokens)
      ├── Pipelines/
         ├── dag.md
         ├── functional.md
         ├── ingestion.md
      ├── Tools/
         ├── .pages
         ├── SQLDatabase.md (300 tokens)
         ├── duckduckgo.md (300 tokens)
         ├── filesystem.md (700 tokens)
         ├── mcp.md
         ├── web_fetch.md (300 tokens)
      ├── Type/
         ├── block.md (100 tokens)
         ├── chunk.md (100 tokens)
         ├── media.md
         ├── node.md
         ├── tool.md
      ├── Vectorstore/
         ├── milvus_vectorstore.md (1300 tokens)
         ├── qdrant_vectorstore.md (500 tokens)
      ├── index.md
      ├── memory.md
   ├── Guides/
      ├── .pages
      ├── Agents/
         ├── agent.md (2k tokens)
         ├── mcp.md (400 tokens)
      ├── Clients/
         ├── .pages
         ├── chatbot.md (300 tokens)
         ├── local_model.md (300 tokens)
         ├── multimodality.md (700 tokens)
         ├── quick_start.md (700 tokens)
         ├── streaming.md (200 tokens)
         ├── structured_responses.md (500 tokens)
         ├── tools.md (500 tokens)
      ├── Monitoring/
         ├── .pages
         ├── log.md (100 tokens)
         ├── tracing.md (1000 tokens)
      ├── Pipeline/
         ├── .pages
         ├── functional_pipeline.md (1400 tokens)
         ├── ingestion_pipeline.md (1400 tokens)
         ├── retrieval_pipeline.md (1700 tokens)
      ├── RAG/
         ├── .pages
         ├── rag.md (1200 tokens)
   ├── assets/
      ├── logo.png
      ├── logo_bg_dark.png
      ├── pool.png
      ├── workflow.png
   ├── index.md (300 tokens)
├── pyproject.toml (800 tokens)
├── pyrightconfig.json (100 tokens)
```


## /.github/ISSUE_TEMPLATE/bug_report.md

---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''

---

**Describe the bug**
A clear and concise description of what the bug is.

**Environment**
OS, Python version, datapizza-ai version

**To Reproduce**
Steps to reproduce the behavior

**Expected behavior**
A clear and concise description of what you expected to happen.

**Logs**
If applicable, attach logs to help explain your problem.

**Additional context**
Add any other context about the problem here.


## /.github/ISSUE_TEMPLATE/feature_request.md

---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''

---

**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

**Describe the solution you'd like**
A clear and concise description of what you want to happen.

**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.

**Additional context**
Add any other context or screenshots about the feature request here.


## /.github/workflows/test.yml

```yml path="/.github/workflows/test.yml" 
name: Test

on:
  push:
    branches: ["main"]
  pull_request:
    branches: ["main"]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v4

      - name: Install uvtest-monorepo
        run: uv tool install uvtest-monorepo

      - name: Run ruff check
        run: uvx ruff check .

      - name: Run tests
        run: uvtest run --fail-fast -vv

```

## /.gitignore

```gitignore path="/.gitignore" 
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/

# Milvus
milvus.db

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
#poetry.toml

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#   pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
#   https://pdm-project.org/en/latest/usage/project/#working-with-version-control
#pdm.lock
#pdm.toml
.pdm-python
.pdm-build/

# pixi
#   Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
#pixi.lock
#   Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
#   in the .venv directory. It is recommended not to include this directory in version control.
.pixi

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/

# Visual Studio Code
#  Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
#  that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
#  and can be added to the global gitignore or merged into this file. However, if you prefer,
#  you could uncomment the following to ignore the entire vscode folder
# .vscode/

# Ruff stuff:
.ruff_cache/

# PyPI configuration file
.pypirc

# Cursor
#  Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to
#  exclude from AI features like autocomplete and code analysis. Recommended for sensitive data
#  refer to https://docs.cursor.com/context/ignore-files
.cursorignore
.cursorindexingignore

# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/

```

## /.pre-commit-config.yaml

```yaml path="/.pre-commit-config.yaml" 
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.14.1
    hooks:
      - id: ruff-check
        args: [ --fix ]
      - id: ruff-format

```

## /CODE_OF_CONDUCT.md

# Contributor Covenant Code of Conduct

## Our Pledge

We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, religion, or sexual identity
and orientation.

We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.

## Our Standards

Examples of behavior that contributes to a positive environment for our
community include:

* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
  and learning from the experience
* Focusing on what is best not just for us as individuals, but for the
  overall community

Examples of unacceptable behavior include:

* The use of sexualized language or imagery, and sexual attention or
  advances of any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email
  address, without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
  professional setting

## Enforcement Responsibilities

Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.

Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.

## Scope

This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official e-mail address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.

## Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at
ai.support@datapizza.tech.
All complaints will be reviewed and investigated promptly and fairly.

All community leaders are obligated to respect the privacy and security of the
reporter of any incident.

## Enforcement Guidelines

Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:

### 1. Correction

**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.

**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.

### 2. Warning

**Community Impact**: A violation through a single incident or series
of actions.

**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or
permanent ban.

### 3. Temporary Ban

**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.

**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.

### 4. Permanent Ban

**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.

**Consequence**: A permanent ban from any sort of public interaction within
the community.

## Attribution

This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.0, available at
https://www.contributor-covenant.org/version/2/0/code_of_conduct.html.

Community Impact Guidelines were inspired by [Mozilla's code of conduct
enforcement ladder](https://github.com/mozilla/diversity).

[homepage]: https://www.contributor-covenant.org

For answers to common questions about this code of conduct, see the FAQ at
https://www.contributor-covenant.org/faq. Translations are available at
https://www.contributor-covenant.org/translations.


## /CONTRIBUTING.md


# Contributing to datapizza-ai

First off, thanks for taking the time to contribute! ❤️

All types of contributions are encouraged and valued. See the [Table of Contents](#table-of-contents) for different ways to help and details about how this project handles them. Please make sure to read the relevant section before making your contribution. It will make it a lot easier for us maintainers and smooth out the experience for all involved. The community looks forward to your contributions. 🎉

> And if you like the project, but just don't have time to contribute, that's fine. There are other easy ways to support the project and show your appreciation, which we would also be very happy about:
> - Star the project
> - Tweet about it
> - Reference this project in your project's README
> - Mention the project at local meetups and tell your friends/colleagues

## Table of Contents

- [Code of Conduct](#code-of-conduct)
- [I Have a Question](#i-have-a-question)
- [I Want To Contribute](#i-want-to-contribute)
    - [Reporting Bugs](#reporting-bugs)
    - [Suggesting Enhancements](#suggesting-enhancements)

## Code of Conduct

This project and everyone participating in it is governed by the
[Datapizza Code of Conduct](https://github.com/datapizza-labs/datapizza-ai/blob/main/CODE_OF_CONDUCT.md).
By participating, you are expected to uphold this code. Please report unacceptable behavior
to <ai.support@datapizza.tech>.


## I Have a Question

> If you want to ask a question, we assume that you have read the available [Documentation](https://docs.datapizza.ai).

Before you ask a question, it is best to search for existing [Issues](https://github.com/datapizza-labs/datapizza-ai/issues) that might help you. In case you have found a suitable issue and still need clarification, you can write your question in this issue. It is also advisable to search the internet for answers first.

If you then still feel the need to ask a question and need clarification, we recommend the following:

- Open an [Issue](https://github.com/datapizza-labs/datapizza-ai/issues/new).
- Provide as much context as you can about what you're running into.
- Provide project and platform versions (Docker, datapizza-ai, etc.), depending on what seems relevant.

We will then take care of the issue as soon as possible.

## I Want To Contribute

> ### Legal Notice <!-- omit in toc -->
> When contributing to this project, you must agree that you have authored 100% of the content, that you have the necessary rights to the content and that the content you contribute may be provided under the project license.

### Reporting Bugs

#### Before Submitting a Bug Report

A good bug report shouldn't leave others needing to chase you up for more information. Therefore, we ask you to investigate carefully, collect information and describe the issue in detail in your report. Please complete the following steps in advance to help us fix any potential bug as fast as possible.

- Make sure that you are using the latest version.
- Determine whether your bug is really a bug and not an error on your side, e.g. using incompatible environment components or versions. (Make sure that you have read the [documentation](https://docs.datapizza.ai). If you are looking for support, you might want to check [this section](#i-have-a-question).)
- To see if other users have experienced (and potentially already solved) the same issue, check whether a bug report for your bug or error already exists in the [bug tracker](https://github.com/datapizza-labs/datapizza-ai/issues?q=is%3Aissue%20state%3Aopen%20label%3Abug).
- Also make sure to search the internet (including Stack Overflow) to see if users outside of the GitHub community have discussed the issue.
- Collect information about the bug:
    - Stack trace (Traceback)
    - OS, Platform and Version (Windows, Linux, macOS, x86, ARM)
    - Version of the interpreter, compiler, SDK, runtime environment, package manager, depending on what seems relevant.
    - Possibly your input and the output
    - Can you reliably reproduce the issue? And can you also reproduce it with older versions?
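
The environment details in the checklist above can be gathered with a short script. This is a minimal sketch using only the Python standard library; the `collect_environment` helper is a hypothetical name, not part of datapizza-ai, and the package list is an assumption you should adjust to the packages you actually installed.

```python
import platform
from importlib import metadata


def collect_environment(packages=("datapizza-ai",)):
    """Return a dict of details worth pasting into a bug report."""
    info = {
        "os": platform.system(),
        "os_release": platform.release(),
        "machine": platform.machine(),
        "python": platform.python_version(),
    }
    for name in packages:
        try:
            # Version of the installed distribution, as seen by pip.
            info[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            info[name] = "not installed"
    return info


# Print one "key: value" line per entry, ready to paste into an issue.
for key, value in collect_environment().items():
    print(f"{key}: {value}")
```

Paste the output into the issue together with the stack trace and your reproduction steps.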

#### How Do I Submit a Good Bug Report?

> You must never report security-related issues, vulnerabilities or bugs including sensitive information to the issue tracker, or elsewhere in public. Instead, sensitive bugs must be sent by email to <ai.support@datapizza.tech>.

We use GitHub issues to track bugs and errors. If you run into an issue with the project:

- Open an [Issue](https://github.com/datapizza-labs/datapizza-ai/issues/new?template=bug_report.md). (Since we can't be sure at this point whether it is a bug or not, we ask you not to talk about a bug yet and not to label the issue.)
- Explain the behavior you would expect and the actual behavior.
- Please provide as much context as possible and describe the *reproduction steps* that someone else can follow to recreate the issue on their own. This usually includes your code. For good bug reports you should isolate the problem and create a reduced test case.
- Provide the information you collected in the previous section.

Once it's filed:

- The project team will label the issue accordingly.
- A team member will try to reproduce the issue with your provided steps. If there are no reproduction steps or no obvious way to reproduce the issue, the team will ask you for those steps and mark the issue as `needs-repro`. Bugs with the `needs-repro` tag will not be addressed until they are reproduced.



### Suggesting Enhancements

This section guides you through submitting an enhancement suggestion for datapizza-ai, **including completely new features and minor improvements to existing functionality**. Following these guidelines will help maintainers and the community to understand your suggestion and find related suggestions.

<!-- omit in toc -->
#### Before Submitting an Enhancement

- Make sure that you are using the latest version.
- Read the [documentation](https://docs.datapizza.ai) carefully and find out if the functionality is already covered, maybe by an individual configuration.
- Perform a [search](https://github.com/datapizza-labs/datapizza-ai/issues) to see if the enhancement has already been suggested. If it has, add a comment to the existing issue instead of opening a new one.
- Find out whether your idea fits with the scope and aims of the project. It's up to you to make a strong case to convince the project's developers of the merits of this feature. Keep in mind that we want features that will be useful to the majority of our users and not just a small subset. If you're just targeting a minority of users, consider writing an add-on/plugin library.

#### How Do I Submit a Good Enhancement Suggestion?

Enhancement suggestions are tracked as [GitHub issues](https://github.com/datapizza-labs/datapizza-ai/issues).

- Use a **clear and descriptive title** for the issue to identify the suggestion.
- Provide a **step-by-step description of the suggested enhancement** in as much detail as possible.
- **Describe the current behavior** and **explain which behavior you expected to see instead** and why. At this point you can also explain which alternatives do not work for you.
- **Explain why this enhancement would be useful** to most datapizza-ai users. You may also want to point out other projects that solved it better and which could serve as inspiration.


## /Makefile

``` path="/Makefile" 

test:
	uv run pytest --tb=short -v --ignore=datapizza-ai-vectorstores/datapizza-ai-vectorstores-milvus --ignore=datapizza-ai-modules/parsers/docling


watch-tests:
	find . -name "*.py" -not -path "*/site-packages/*" | entr uv run pytest --tb=short -v

format:
	uvx ruff format .

linter-check:
	uvx ruff check .

linter-fix:
	uvx ruff check --fix

linter-force-fix:
	uvx ruff check --fix --unsafe-fixes

dependency-check:
	uv run deptry .

run_docs:
	uv pip install mkdocs-material  pymdown-extensions mkdocs-awesome-pages-plugin mkdocstrings-python
	uv run mkdocs serve --livereload

```

## /README.md

<div align="center">

<img src="docs/assets/logo_bg_dark.png" alt="Datapizza AI Logo" width="200" height="200">

**Build reliable Gen AI solutions without overhead**

*Written in Python. Designed for speed. A no-fluff GenAI framework that gets your agents from dev to prod, fast*

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![PyPI version](https://img.shields.io/pypi/v/datapizza-ai.svg)](https://pypi.org/project/datapizza-ai/)
[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![Downloads](https://img.shields.io/pypi/dm/datapizza-ai.svg)](https://pypi.org/project/datapizza-ai/)
[![GitHub stars](https://img.shields.io/github/stars/datapizza-labs/datapizza-ai.svg?style=social&label=Star)](https://github.com/datapizza-labs/datapizza-ai)

[🏠Homepage](https://datapizza.tech/en/ai-framework/) • [🚀 Quick Start](#-quick-start) • [📖 Documentation](https://docs.datapizza.ai) • [🎯 Examples](#-examples) • [🤝 Community](#-community)

</div>

---

## 🌟 Why Datapizza AI?

A framework that keeps your agents predictable, your debugging fast, and your code trusted in production. Built by Engineers, trusted by Engineers.

<div align="center">

### ⚡ **Less abstraction, more control** | 🚀 **API-first design** | 🔧 **Observable by design**

</div>

## How to install
```sh
pip install datapizza-ai
```

## Client invoke

```python
from datapizza.clients.openai import OpenAIClient

client = OpenAIClient(api_key="YOUR_API_KEY")
result = client.invoke("Hi, how are you?")
print(result.text)
```

## ✨ Key Features

<table>
<tr>
<td width="50%" valign="top">

### 🎯 **API-first**
- **Multi-Provider Support**: OpenAI, Google Gemini, Anthropic, Mistral, Azure
- **Tool Integration**: Built-in web search, document processing, custom tools
- **Memory Management**: Persistent conversations and context awareness

</td>
<td width="50%" valign="top">

### 🔍 **Composable**
- **Reusable blocks**: Declarative configuration, easy overrides
- **Document Processing**: PDF, DOCX, images with Azure AI & Docling
- **Smart Chunking**: Context-aware text splitting and embedding
- **Built-in reranking**: Add a reranker (e.g., Cohere) to boost relevance

</td>
</tr>
<tr>
<td width="50%" valign="top">

### 🔧 **Observable**
- **OpenTelemetry tracing**: Standards-based instrumentation
- **Client I/O tracing**: Optional toggle to log inputs, outputs, and in-memory context
- **Custom spans**: Trace fine-grained phases and sub-steps to pinpoint bottlenecks

</td>
<td width="50%" valign="top">

### 🚀 **Vendor-Agnostic**
- **Swap models**: Change providers without rewiring business logic
- **Clear Interfaces**: Predictable APIs across all components
- **Rich Ecosystem**: Modular design with optional components
- **Migration-friendly**: Quick migration from other frameworks

</td>
</tr>
</table>

## 🚀 Quick Start

### Installation

```bash
# Core framework
pip install datapizza-ai

# With specific providers (optional)
pip install datapizza-ai-clients-openai
pip install datapizza-ai-clients-google
pip install datapizza-ai-clients-anthropic
```

### Start with Agent

```python
from datapizza.agents import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools import tool

@tool
def get_weather(city: str) -> str:
    return f"The weather in {city} is sunny"

client = OpenAIClient(api_key="YOUR_API_KEY")
agent = Agent(name="assistant", client=client, tools = [get_weather])

response = agent.run("What is the weather in Rome?")
# output: The weather in Rome is sunny
```


## 📊 Detailed Tracing


A key requirement for principled development of LLM applications over your data (RAG systems, agents) is being able to observe and debug.

Datapizza-ai provides built-in observability with OpenTelemetry tracing to help you monitor performance and understand execution flow.

**🔍 Trace Your AI Operations**

```sh
pip install datapizza-ai-tools-duckduckgo
```

```python
from datapizza.agents import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools.duckduckgo import DuckDuckGoSearchTool
from datapizza.tracing import ContextTracing

client = OpenAIClient(api_key="OPENAI_API_KEY")
agent = Agent(name="assistant", client=client, tools = [DuckDuckGoSearchTool()])

with ContextTracing().trace("my_ai_operation"):
    response = agent.run("Tell me some news about Bitcoin")

# Output shows:
# ╭─ Trace Summary of my_ai_operation ──────────────────────────────────╮
# │ Total Spans: 3                                                      │
# │ Duration: 2.45s                                                     │
# │ ┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓ │
# │ ┃ Model       ┃ Prompt Tokens ┃ Completion Tokens ┃ Cached Tokens ┃ │
# │ ┡━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩ │
# │ │ gpt-4o-mini │ 31            │ 27                │ 0             │ │
# │ └─────────────┴───────────────┴───────────────────┴───────────────┘ │
# ╰─────────────────────────────────────────────────────────────────────╯
```


![Demo](https://github.com/user-attachments/assets/02742e87-aa48-4308-94c8-6f362e3218b4)


## 🎯 Examples

### 🌐 Multi-Agent System

Build sophisticated AI systems where multiple specialized agents collaborate to solve complex tasks. This example shows how to create a trip planning system with dedicated agents for weather information, web search, and planning coordination.

```sh
# Install DuckDuckGo tool
pip install datapizza-ai-tools-duckduckgo
```


```python
from datapizza.agents.agent import Agent
from datapizza.clients.openai import OpenAIClient
from datapizza.tools import tool
from datapizza.tools.duckduckgo import DuckDuckGoSearchTool

client = OpenAIClient(api_key="YOUR_API_KEY", model="gpt-4.1")

@tool
def get_weather(city: str) -> str:
    return f"It's sunny all week in {city}"

weather_agent = Agent(
    name="weather_expert",
    client=client,
    system_prompt="You are a weather expert. Provide detailed weather information and forecasts.",
    tools=[get_weather]
)

web_search_agent = Agent(
    name="web_search_expert",
    client=client,
    system_prompt="You are a web search expert. You can search the web for information.",
    tools=[DuckDuckGoSearchTool()]
)

planner_agent = Agent(
    name="planner",
    client=client,
    system_prompt="You are a trip planner. You should provide a plan for the user. Make sure to provide a detailed plan with the best places to visit and the best time to visit them."
)

planner_agent.can_call([weather_agent, web_search_agent])

response = planner_agent.run(
    "I need to plan a hiking trip in Seattle next week. I want to see some waterfalls and a forest."
)
print(response.text)

```


### 📊 Document Ingestion

Process and index documents for retrieval-augmented generation (RAG). This pipeline automatically parses PDFs, splits them into chunks, generates embeddings, and stores them in a vector database for efficient similarity search.

```sh
pip install datapizza-ai-parsers-docling
```

```python
from datapizza.core.vectorstore import VectorConfig
from datapizza.embedders import ChunkEmbedder
from datapizza.embedders.openai import OpenAIEmbedder
from datapizza.modules.parsers.docling import DoclingParser
from datapizza.modules.splitters import NodeSplitter
from datapizza.pipeline import IngestionPipeline
from datapizza.vectorstores.qdrant import QdrantVectorstore

vectorstore = QdrantVectorstore(location=":memory:")
embedder = ChunkEmbedder(client=OpenAIEmbedder(api_key="YOUR_API_KEY", model_name="text-embedding-3-small"))
vectorstore.create_collection("my_documents",vector_config=[VectorConfig(name="embedding", dimensions=1536)])

pipeline = IngestionPipeline(
    modules=[
        DoclingParser(),
        NodeSplitter(max_char=1024),
        embedder,
    ],
    vector_store=vectorstore,
    collection_name="my_documents"
)

pipeline.run("sample.pdf")

results = vectorstore.search(query_vector = [0.0] * 1536, collection_name="my_documents", k=5)
print(results)
```
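
To make the split, embed and search steps concrete, here is a dependency-free toy version of the same flow. It is only a sketch: `split_text`, `embed`, `cosine` and `search` are hypothetical helpers, the letter-frequency "embedding" stands in for a real model, and none of this reflects how `NodeSplitter`, `OpenAIEmbedder` or `QdrantVectorstore` are implemented.

```python
import math


def split_text(text, max_char=40):
    """Greedy splitter: pack whole words into chunks of at most max_char
    characters (a single longer word would get a chunk of its own)."""
    chunks, current = [], ""
    for word in text.split():
        candidate = f"{current} {word}".strip()
        if len(candidate) > max_char and current:
            chunks.append(current)
            current = word
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks


def embed(text, dimensions=26):
    """Toy embedding: letter-frequency vector. A real embedder calls a model."""
    vector = [0.0] * dimensions
    for ch in text.lower():
        if "a" <= ch <= "z":
            vector[ord(ch) - ord("a")] += 1.0
    return vector


def cosine(a, b):
    """Cosine similarity; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search(index, query, k=2):
    """Return the k chunks whose vectors are most similar to the query."""
    query_vector = embed(query)
    ranked = sorted(
        index, key=lambda item: cosine(item[1], query_vector), reverse=True
    )
    return [chunk for chunk, _ in ranked[:k]]


document = "Pizza dough needs flour water yeast and salt. Redis stores keys in memory."
# The "vector store" here is just a list of (chunk, vector) pairs.
index = [(chunk, embed(chunk)) for chunk in split_text(document)]
top = search(index, "flour and yeast", k=1)
print(top)
```

In the real pipeline each stage is a module passed to `IngestionPipeline`, and the vector store replaces the in-memory list.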


### 📊 RAG (Retrieval-Augmented Generation)

Create a complete RAG pipeline that enhances AI responses with relevant document context. This example demonstrates query rewriting, embedding generation, document retrieval, and response generation in a connected workflow.

```python
from datapizza.clients.openai import OpenAIClient
from datapizza.embedders.openai import OpenAIEmbedder
from datapizza.modules.prompt import ChatPromptTemplate
from datapizza.modules.rewriters import ToolRewriter
from datapizza.pipeline import DagPipeline
from datapizza.vectorstores.qdrant import QdrantVectorstore

openai_client = OpenAIClient(
    model="gpt-4o-mini",
    api_key="YOUR_API_KEY"
)

dag_pipeline = DagPipeline()
dag_pipeline.add_module("rewriter", ToolRewriter(client=openai_client, system_prompt="Rewrite user queries to improve retrieval accuracy."))
dag_pipeline.add_module("embedder", OpenAIEmbedder(api_key= "YOUR_API_KEY", model_name="text-embedding-3-small"))
dag_pipeline.add_module("retriever", QdrantVectorstore(host="localhost", port=6333).as_retriever(collection_name="my_documents", k=5))
dag_pipeline.add_module("prompt", ChatPromptTemplate(user_prompt_template="User question: {{user_prompt}}\n:", retrieval_prompt_template="Retrieved content:\n{% for chunk in chunks %}{{ chunk.text }}\n{% endfor %}"))
dag_pipeline.add_module("generator", openai_client)

dag_pipeline.connect("rewriter", "embedder", target_key="text")
dag_pipeline.connect("embedder", "retriever", target_key="query_vector")
dag_pipeline.connect("retriever", "prompt", target_key="chunks")
dag_pipeline.connect("prompt", "generator", target_key="memory")

query = "tell me something about this document"
result = dag_pipeline.run({
    "rewriter": {"user_prompt": query},
    "prompt": {"user_prompt": query},
    "retriever": {"collection_name": "my_documents", "k": 3},
    "generator":{"input": query}
})

print(f"Generated response: {result['generator']}")
```
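
The wiring idea behind `connect(..., target_key=...)` can be illustrated with a small stand-alone runner. This is a conceptual sketch, not datapizza's `DagPipeline`: the `TinyDag` class is hypothetical, and it assumes that `target_key` names the keyword argument through which a module receives the upstream module's output.

```python
from collections import defaultdict
from graphlib import TopologicalSorter


class TinyDag:
    """Toy DAG runner that only illustrates the wiring, nothing more."""

    def __init__(self):
        self.modules = {}               # name -> callable
        self.edges = defaultdict(list)  # target -> [(source, target_key)]

    def add_module(self, name, fn):
        self.modules[name] = fn

    def connect(self, source, target, target_key):
        self.edges[target].append((source, target_key))

    def run(self, inputs):
        # Map each node to the set of nodes it depends on.
        graph = {
            name: {source for source, _ in self.edges[name]}
            for name in self.modules
        }
        results = {}
        # static_order() yields every node after all of its dependencies.
        for name in TopologicalSorter(graph).static_order():
            kwargs = dict(inputs.get(name, {}))
            for source, key in self.edges[name]:
                kwargs[key] = results[source]
            results[name] = self.modules[name](**kwargs)
        return results


dag = TinyDag()
dag.add_module("rewriter", lambda user_prompt: user_prompt.strip().lower())
dag.add_module("embedder", lambda text: [float(len(text))])
dag.add_module("retriever", lambda query_vector, k: [f"chunk-{i}" for i in range(k)])
dag.connect("rewriter", "embedder", target_key="text")
dag.connect("embedder", "retriever", target_key="query_vector")

result = dag.run({"rewriter": {"user_prompt": "  Hello  "}, "retriever": {"k": 2}})
print(result["retriever"])
```

As in the example above, the dictionary passed to `run` supplies per-module inputs that no upstream module provides.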


## 🌐 Ecosystem

### 🤖 Supported AI Providers

<table>
<tr>
<td align="center"><img src="https://logosandtypes.com/wp-content/uploads/2022/07/OpenAI.png" width="32" style="border-radius: 50%"><br><b>OpenAI</b></td>
<td align="center"><img src="https://www.google.com/favicon.ico" width="32"><br><b>Google Gemini</b></td>
<td align="center"><img src="https://anthropic.com/favicon.ico" width="32"><br><b>Anthropic</b></td>
<td align="center"><img src="https://mistral.ai/favicon.ico" width="32"><br><b>Mistral</b></td>
<td align="center"><img src="https://azure.microsoft.com/favicon.ico" width="32"><br><b>Azure OpenAI</b></td>
</tr>
</table>

### 🔧 Tools & Integrations

| Category | Components |
|----------|------------|
| **📄 Document Parsers** | Azure AI Document Intelligence, Docling |
| **🔍 Vector Stores** | Qdrant |
| **🎯 Rerankers** | Cohere, Together AI |
| **🌐 Tools** | DuckDuckGo Search, Custom Tools |
| **💾 Caching** | Redis integration for performance optimization |
| **📊 Embedders** | OpenAI, Google, Cohere, FastEmbed |

## 🎓 Learning Resources

- 📖 **[Complete Documentation](https://docs.datapizza.ai)** - Comprehensive guides and API reference
- 🎯 **[RAG Tutorial](https://docs.datapizza.ai/latest/Guides/RAG/rag/)** - Build production RAG systems
- 🤖 **[Agent Examples](https://docs.datapizza.ai/latest/Guides/Agents/agent/)** - Real-world agent implementations

## 🤝 Community


- 💬 **[Discord Community](https://discord.gg/s5sJNHz2C8)**
- 📚 **[Documentation](https://docs.datapizza.ai)**
- 📧 **[GitHub Issues](https://github.com/datapizza-labs/datapizza-ai/issues)**
- 🐦 **[Twitter](https://x.com/datapizza_ai)**

### 🌟 Contributing

We love contributions! Whether it's:

- 🐛 **Bug Reports** - Help us improve
- 💡 **Feature Requests** - Share your ideas
- 📝 **Documentation** - Make it better for everyone
- 🔧 **Code Contributions** - Build the future together

Check out our [Contributing Guide](CONTRIBUTING.md) to get started.

## 📄 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

---

<div align="center">

**Built by Datapizza, the AI native company**

*A framework made to be easy to learn, easy to maintain and ready for production* 🍕

[⭐ Star us on GitHub](https://github.com/datapizza-labs/datapizza-ai) • [🚀 Get Started](https://docs.datapizza.ai) • [💬 Join Discord](https://discord.gg/s5sJNHz2C8)

## Star History

[![Star History Chart](https://api.star-history.com/svg?repos=datapizza-labs/datapizza-ai&type=Date)](https://www.star-history.com/#datapizza-labs/datapizza-ai&Date)

</div>


## /datapizza-ai-cache/redis/README.md



## /datapizza-ai-cache/redis/datapizza/cache/redis/__init__.py

```py path="/datapizza-ai-cache/redis/datapizza/cache/redis/__init__.py" 
# Import Redis cache implementation
from .cache import RedisCache

__all__ = ["RedisCache"]

```

## /datapizza-ai-cache/redis/datapizza/cache/redis/cache.py

```py path="/datapizza-ai-cache/redis/datapizza/cache/redis/cache.py" 
import logging

from datapizza.core.cache import Cache

import redis

from .json_codec import decode_cache_value, encode_cache_value

log = logging.getLogger(__name__)


class RedisCache(Cache):
    """
    A Redis-based cache implementation.
    """

    def __init__(
        self, host="localhost", port=6379, db=0, expiration_time=3600
    ):  # 1 hour default
        self.redis = redis.Redis(host=host, port=port, db=db)
        self.expiration_time = expiration_time

    def get(self, key: str) -> object | None:
        """Retrieve and deserialize object"""
        raw_value = self.redis.get(key)
        if raw_value is None:
            return None

        try:
            return decode_cache_value(raw_value)
        except Exception as exc:
            log.warning("Invalid cache payload for key %s: %s", key, exc)
            return None

    def set(self, key: str, obj: object):
        """Serialize and store object"""
        serialized = encode_cache_value(obj)
        self.redis.set(key, serialized, ex=self.expiration_time)

```

## /datapizza-ai-cache/redis/datapizza/cache/redis/json_codec.py

```py path="/datapizza-ai-cache/redis/datapizza/cache/redis/json_codec.py" 
import json
from enum import Enum
from typing import cast

from datapizza.core.clients.models import ClientResponse

CACHE_SCHEMA_VERSION = 1


class CachePayloadType(str, Enum):
    CLIENT_RESPONSE = "client_response"
    JSON = "json"


def encode_cache_value(obj: object) -> bytes:
    if isinstance(obj, ClientResponse):
        response = cast(ClientResponse, obj)
        envelope = {
            "v": CACHE_SCHEMA_VERSION,
            "t": CachePayloadType.CLIENT_RESPONSE,
            "p": response.to_dict(),
        }
    else:
        envelope = {
            "v": CACHE_SCHEMA_VERSION,
            "t": CachePayloadType.JSON,
            "p": obj,
        }

    return json.dumps(envelope, separators=(",", ":")).encode("utf-8")


def decode_cache_value(raw_value: bytes | str) -> object | None:
    if isinstance(raw_value, str):
        raw_bytes = raw_value.encode("utf-8")
    elif isinstance(raw_value, bytes):
        raw_bytes = raw_value
    else:
        raise ValueError(f"unsupported redis payload type: {type(raw_value)!r}")

    envelope = json.loads(raw_bytes.decode("utf-8"))
    if not isinstance(envelope, dict):
        raise ValueError("cache envelope must be a dict")

    if envelope.get("v") != CACHE_SCHEMA_VERSION:
        return None

    payload_type = envelope.get("t")
    payload = envelope.get("p")

    if payload_type == CachePayloadType.CLIENT_RESPONSE:
        if not isinstance(payload, dict):
            return None
        return ClientResponse.from_dict(payload)

    if payload_type == CachePayloadType.JSON:
        return payload

    return None

```
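
For plain JSON values, the envelope written by the codec above looks like the following. The sketch re-implements only the `"json"` branch with the standard library so the wire format is visible without installing datapizza-ai; `encode` and `decode` are hypothetical stand-ins for `encode_cache_value` and `decode_cache_value`.

```python
import json

SCHEMA_VERSION = 1  # mirrors CACHE_SCHEMA_VERSION in json_codec.py


def encode(obj):
    """Wrap a JSON-serializable value in the versioned envelope."""
    envelope = {"v": SCHEMA_VERSION, "t": "json", "p": obj}
    return json.dumps(envelope, separators=(",", ":")).encode("utf-8")


def decode(raw):
    """Return the payload, or None for an unknown version or payload type."""
    envelope = json.loads(raw.decode("utf-8"))
    if not isinstance(envelope, dict) or envelope.get("v") != SCHEMA_VERSION:
        return None
    return envelope.get("p") if envelope.get("t") == "json" else None


wire = encode([[0.1, 0.2]])
print(wire)  # b'{"v":1,"t":"json","p":[[0.1,0.2]]}'
print(decode(wire))  # [[0.1, 0.2]]
print(decode(b'{"v":999,"t":"json","p":1}'))  # None
```

Bytes that are not valid JSON make `json.loads` raise; in the source, `RedisCache.get` catches that exception, logs a warning, and treats the entry as a cache miss.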

## /datapizza-ai-cache/redis/pyproject.toml

```toml path="/datapizza-ai-cache/redis/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-cache-redis"
version = "0.0.5"
description = "An implementation using Redis for datapizza-ai cache"
readme = "README.md"
license = {text = "MIT"}
authors = [
    {name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.1.0,<0.2.0",
    "redis>=5.2.1,<6.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    # "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-cache/redis/tests/test_redis_cache.py

```py path="/datapizza-ai-cache/redis/tests/test_redis_cache.py" 
import json
from typing import cast

from datapizza.core.clients.models import ClientResponse, TokenUsage
from datapizza.tools import Tool
from datapizza.type import FunctionCallBlock, TextBlock

from datapizza.cache.redis import RedisCache


class _FakeRedis:
    def __init__(self):
        self.store = {}

    def get(self, key):
        return self.store.get(key)

    def set(self, key, value, ex=None):
        self.store[key] = value


def _build_cache(monkeypatch):
    fake = _FakeRedis()
    monkeypatch.setattr(
        "datapizza.cache.redis.cache.redis.Redis", lambda **kwargs: fake
    )
    cache = RedisCache(host="localhost", port=6379, db=0)
    return cache, fake


def test_redis_cache_initializes(monkeypatch):
    cache, _ = _build_cache(monkeypatch)
    assert cache is not None


def test_roundtrip_client_response(monkeypatch):
    cache, _ = _build_cache(monkeypatch)
    tool = Tool(name="calc", description="calculator")
    response = ClientResponse(
        content=[
            TextBlock(content="hello"),
            FunctionCallBlock(id="f1", arguments={"x": 1}, name="calc", tool=tool),
        ],
        delta="h",
        stop_reason="end",
        usage=TokenUsage(
            prompt_tokens=1, completion_tokens=2, cached_tokens=3, thinking_tokens=4
        ),
    )

    cache.set("k1", response)
    cached = cache.get("k1")

    assert isinstance(cached, ClientResponse)
    cached_response = cast(ClientResponse, cached)
    assert cached_response.text == "hello"
    assert len(cached_response.function_calls) == 1
    assert cached_response.usage == response.usage
    assert cached_response.stop_reason == "end"


def test_roundtrip_json_value(monkeypatch):
    cache, _ = _build_cache(monkeypatch)
    value = [[0.1, 0.2], [0.3, 0.4]]

    cache.set("embed", value)
    cached = cache.get("embed")

    assert cached == value


def test_invalid_payload_is_cache_miss(monkeypatch):
    cache, fake = _build_cache(monkeypatch)
    fake.store["broken"] = b"not-json"

    assert cache.get("broken") is None


def test_unknown_payload_version_is_cache_miss(monkeypatch):
    cache, fake = _build_cache(monkeypatch)
    fake.store["old"] = json.dumps({"v": 999, "t": "json", "p": 1}).encode("utf-8")

    assert cache.get("old") is None


def test_legacy_pickle_like_bytes_are_ignored(monkeypatch):
    cache, fake = _build_cache(monkeypatch)
    fake.store["legacy"] = b"\x80\x04legacy-pickle-bytes"

    assert cache.get("legacy") is None

```

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/README.md



## /datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/__init__.py" 
from .anthropic_client import AnthropicClient

__all__ = ["AnthropicClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/anthropic_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/anthropic_client.py" 
import json
from collections.abc import AsyncIterator, Iterator
from typing import Any, Literal

from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import (
    Block,
    FunctionCallBlock,
    Model,
    StructuredBlock,
    TextBlock,
    ThoughtBlock,
)

from anthropic import Anthropic, AsyncAnthropic

from .memory_adapter import AnthropicMemoryAdapter


class AnthropicClient(Client):
    """A client for interacting with the Anthropic API (Claude).

    This class provides methods for invoking the Anthropic API to generate responses
    based on given input data. It extends the Client class.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-sonnet-latest",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
    ):
        """
        Args:
            api_key: The API key for the Anthropic API.
            model: The model to use for the Anthropic API.
            system_prompt: The system prompt to use for the Anthropic API.
            temperature: The temperature to use for the Anthropic API.
            cache: The cache to use for the Anthropic API.
        """
        if temperature and not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )
        self.api_key = api_key
        self.memory_adapter = AnthropicMemoryAdapter()
        self._set_client()

    def _set_client(self):
        if not self.client:
            self.client = Anthropic(api_key=self.api_key)

    def _set_a_client(self):
        if not self.a_client:
            self.a_client = AsyncAnthropic(api_key=self.api_key)

    def _convert_tools(self, tools: list[Tool]) -> list[dict[str, Any]]:
        """Convert tools to Anthropic tool format"""
        anthropic_tools = []
        for tool in tools:
            anthropic_tool = {
                "name": tool.name,
                "description": tool.description or "",
                "input_schema": {
                    "type": "object",
                    "properties": tool.properties,
                    "required": tool.required,
                },
            }
            anthropic_tools.append(anthropic_tool)
        return anthropic_tools

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> dict | Literal["auto", "required", "none"]:
        if isinstance(tool_choice, list) and len(tool_choice) > 1:
            raise NotImplementedError(
                "multiple function names are not supported by Anthropic"
            )
        elif isinstance(tool_choice, list):
            return {
                "type": "tool",
                "name": tool_choice[0],
            }
        elif tool_choice == "required":
            return {"type": "any"}
        elif tool_choice == "auto":
            return {"type": "auto"}
        else:
            return tool_choice

    def _response_to_client_response(
        self, response, tool_map: dict[str, Tool] | None = None
    ) -> ClientResponse:
        """Convert Anthropic response to ClientResponse"""
        blocks = []

        if hasattr(response, "content") and response.content:
            if isinstance(
                response.content, list
            ):  # Claude 3 returns a list of content blocks
                for content_block in response.content:
                    if content_block.type == "text":
                        blocks.append(TextBlock(content=content_block.text))
                    elif content_block.type == "thinking":
                        # Summarized thinking content
                        blocks.append(ThoughtBlock(content=content_block.thinking))
                    elif content_block.type == "tool_use":
                        tool = tool_map.get(content_block.name) if tool_map else None
                        if not tool:
                            raise ValueError(f"Tool {content_block.name} not found")

                        blocks.append(
                            FunctionCallBlock(
                                id=content_block.id,
                                name=content_block.name,
                                arguments=content_block.input,
                                tool=tool,
                            )
                        )
            else:  # Handle as string for compatibility
                blocks.append(TextBlock(content=str(response.content)))

        stop_reason = response.stop_reason if hasattr(response, "stop_reason") else None

        return ClientResponse(
            content=blocks,
            stop_reason=stop_reason,
            # Reuse the helper so a missing or None cache_read_input_tokens becomes 0.
            usage=self._usage_from_anthropic_response(response),
        )

    def _usage_from_anthropic_response(self, response: Any) -> TokenUsage:
        usage = response.usage
        return TokenUsage(
            prompt_tokens=usage.input_tokens,
            completion_tokens=usage.output_tokens,
            cached_tokens=getattr(usage, "cache_read_input_tokens", None) or 0,
        )

    def _new_stream_state(self) -> dict[str, Any]:
        return {
            "message_text": "",
            "thought_text": "",
            "tool_calls": {},
            "input_tokens": 0,
            "output_tokens": 0,
            "stop_reason": None,
        }

    def _consume_stream_event(self, state: dict[str, Any], chunk: Any) -> str:
        text_delta = ""

        if chunk.type == "message_start":
            if getattr(chunk, "message", None) and getattr(
                chunk.message, "usage", None
            ):
                state["input_tokens"] = chunk.message.usage.input_tokens or 0
            return text_delta

        if chunk.type == "content_block_start":
            content_block = getattr(chunk, "content_block", None)
            if content_block and getattr(content_block, "type", None) == "tool_use":
                state["tool_calls"][chunk.index] = {
                    "id": getattr(content_block, "id", None),
                    "name": getattr(content_block, "name", None),
                    "input_json_chunks": [],
                    "parsed_input": (
                        dict(content_block.input)
                        if getattr(content_block, "input", None)
                        else None
                    ),
                }
            return text_delta

        if chunk.type == "content_block_delta":
            delta = getattr(chunk, "delta", None)
            if not delta:
                return text_delta

            if getattr(delta, "text", None):
                text_delta = delta.text
                state["message_text"] += delta.text
                return text_delta

            if getattr(delta, "thinking", None):
                state["thought_text"] += delta.thinking
                return text_delta

            if getattr(delta, "partial_json", None):
                tool_state = state["tool_calls"].setdefault(
                    chunk.index,
                    {
                        "id": None,
                        "name": None,
                        "input_json_chunks": [],
                        "parsed_input": None,
                    },
                )
                tool_state["input_json_chunks"].append(delta.partial_json)
                return text_delta

            return text_delta

        if chunk.type == "message_delta":
            if getattr(chunk, "usage", None):
                state["output_tokens"] = max(
                    state["output_tokens"],
                    chunk.usage.output_tokens or 0,
                )

            delta = getattr(chunk, "delta", None)
            if delta and hasattr(delta, "stop_reason"):
                state["stop_reason"] = delta.stop_reason

        return text_delta

    def _build_streamed_tool_call_blocks(
        self,
        tool_calls: dict[int, dict[str, Any]],
        tool_map: dict[str, Tool],
    ) -> list[FunctionCallBlock]:
        blocks: list[FunctionCallBlock] = []

        for _, tool_state in sorted(tool_calls.items()):
            tool_name = tool_state.get("name")
            if not tool_name:
                continue

            tool = tool_map.get(tool_name)
            if not tool:
                raise ValueError(f"Tool {tool_name} not found")

            parsed_input = tool_state.get("parsed_input")
            if parsed_input is None:
                input_json = "".join(tool_state.get("input_json_chunks", []))
                parsed_input = json.loads(input_json) if input_json else {}

            blocks.append(
                FunctionCallBlock(
                    id=tool_state.get("id") or tool_name,
                    name=tool_name,
                    arguments=parsed_input,
                    tool=tool,
                )
            )

        return blocks

    def _build_stream_final_content(
        self,
        state: dict[str, Any],
        tool_map: dict[str, Tool],
    ) -> list[ThoughtBlock | TextBlock | FunctionCallBlock]:
        content: list[ThoughtBlock | TextBlock | FunctionCallBlock] = []

        if state["thought_text"]:
            content.append(ThoughtBlock(content=state["thought_text"]))
        if state["message_text"]:
            content.append(TextBlock(content=state["message_text"]))
        content.extend(
            self._build_streamed_tool_call_blocks(state["tool_calls"], tool_map)
        )
        return content

    def _structured_messages(
        self, input: list[Block], memory: Memory | None
    ) -> list[dict]:
        messages = self._memory_to_contents(None, input, memory)
        return [m for m in messages if m.get("role") != "model"]

    def _structured_request_params(
        self,
        *,
        messages: list[dict],
        max_tokens: int | None,
        temperature: float | None,
        system_prompt: str | None,
        tools: list[Tool],
        tool_choice: Literal["auto", "required", "none"] | list[str],
        **kwargs: Any,
    ) -> dict[str, Any]:
        params: dict[str, Any] = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_tokens or 2048,
            **kwargs,
        }
        if temperature is not None:  # keep an explicit temperature of 0.0
            params["temperature"] = temperature
        if system_prompt:
            params["system"] = system_prompt
        if tools:
            params["tools"] = self._convert_tools(tools)
            params["tool_choice"] = self._convert_tool_choice(tool_choice)
        return params

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int | None,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _invoke method for Anthropic"""
        if tools is None:
            tools = []
        client = self._get_client()
        messages = self._memory_to_contents(None, input, memory)
        # Drop "model"-role messages; the Messages API accepts only "user" and "assistant"
        messages = [message for message in messages if message.get("role") != "model"]

        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_tokens or 2048,
            **kwargs,
        }

        if temperature is not None:  # keep an explicit temperature of 0.0
            request_params["temperature"] = temperature

        if system_prompt:
            request_params["system"] = system_prompt

        if tools:
            request_params["tools"] = self._convert_tools(tools)
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = client.messages.create(**request_params)
        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int | None,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        client = self._get_a_client()
        messages = self._memory_to_contents(None, input, memory)
        # Drop "model"-role messages; the Messages API accepts only "user" and "assistant"
        messages = [message for message in messages if message.get("role") != "model"]

        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_tokens or 2048,
            **kwargs,
        }

        if temperature is not None:  # keep an explicit temperature of 0.0
            request_params["temperature"] = temperature

        if system_prompt:
            request_params["system"] = system_prompt

        if tools:
            request_params["tools"] = self._convert_tools(tools)
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = await client.messages.create(**request_params)
        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int | None,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        """Implementation of the abstract _stream_invoke method for Anthropic"""
        if tools is None:
            tools = []
        messages = self._memory_to_contents(None, input, memory)
        client = self._get_client()
        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "stream": True,
            "max_tokens": max_tokens or 2048,
            **kwargs,
        }

        if temperature is not None:  # keep an explicit temperature of 0.0
            request_params["temperature"] = temperature

        if system_prompt:
            request_params["system"] = system_prompt

        if tools:
            request_params["tools"] = self._convert_tools(tools)
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        stream = client.messages.create(**request_params)
        state = self._new_stream_state()

        for chunk in stream:
            text_delta = self._consume_stream_event(state, chunk)
            if text_delta:
                yield ClientResponse(
                    content=[
                        ThoughtBlock(content=state["thought_text"]),
                        TextBlock(content=state["message_text"]),
                    ],
                    delta=text_delta,
                    stop_reason=state["stop_reason"],
                )

        yield ClientResponse(
            content=self._build_stream_final_content(state, tool_map),
            delta="",
            stop_reason=state["stop_reason"],
            usage=TokenUsage(
                prompt_tokens=state["input_tokens"],
                completion_tokens=state["output_tokens"],
                cached_tokens=0,
            ),
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None = None,
        memory: Memory | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        """Implementation of the abstract _a_stream_invoke method for Anthropic"""
        if tools is None:
            tools = []
        messages = self._memory_to_contents(None, input, memory)
        client = self._get_a_client()
        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "stream": True,
            "max_tokens": max_tokens or 2048,
            **kwargs,
        }
        if temperature is not None:  # keep an explicit temperature of 0.0
            request_params["temperature"] = temperature
        if system_prompt:
            request_params["system"] = system_prompt

        if tools:
            request_params["tools"] = self._convert_tools(tools)
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        stream = await client.messages.create(**request_params)
        state = self._new_stream_state()

        async for chunk in stream:
            text_delta = self._consume_stream_event(state, chunk)
            if text_delta:
                yield ClientResponse(
                    content=[
                        ThoughtBlock(content=state["thought_text"]),
                        TextBlock(content=state["message_text"]),
                    ],
                    delta=text_delta,
                    stop_reason=state["stop_reason"],
                )

        yield ClientResponse(
            content=self._build_stream_final_content(state, tool_map),
            delta="",
            stop_reason=state["stop_reason"],
            usage=TokenUsage(
                prompt_tokens=state["input_tokens"],
                completion_tokens=state["output_tokens"],
                cached_tokens=0,
            ),
        )

    def _structured_response(
        self,
        input: list[Block],
        output_cls: type[Model],
        memory: Memory | None = None,
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        tools: list[Tool] | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs: Any,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        client = self._get_client()
        messages = self._structured_messages(input, memory)
        params = self._structured_request_params(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            system_prompt=system_prompt,
            tools=tools,
            tool_choice=tool_choice,
            **kwargs,
        )

        if output_cls == {"type": "json_object"}:
            params["output_config"] = {
                "format": {
                    "type": "json_schema",
                    "schema": {"type": "object", "additionalProperties": True},
                }
            }
            response = client.messages.create(**params)
            text: str | None = None
            for block in response.content:
                if block.type == "text":
                    text = block.text
                    break
            if text is None:
                raise ValueError("No text block in Anthropic structured response")
            data = json.loads(text)
            return ClientResponse(
                content=[StructuredBlock(content=data)],
                stop_reason=response.stop_reason,
                usage=self._usage_from_anthropic_response(response),
            )

        response = client.messages.parse(**params, output_format=output_cls)
        parsed = response.parsed_output
        if parsed is None:
            raise ValueError("No parsed_output in Anthropic structured response")
        return ClientResponse(
            content=[StructuredBlock(content=parsed)],
            stop_reason=response.stop_reason,
            usage=self._usage_from_anthropic_response(response),
        )

    async def _a_structured_response(
        self,
        input: list[Block],
        output_cls: type[Model],
        memory: Memory | None = None,
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        tools: list[Tool] | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs: Any,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        client = self._get_a_client()
        messages = self._structured_messages(input, memory)
        params = self._structured_request_params(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            system_prompt=system_prompt,
            tools=tools,
            tool_choice=tool_choice,
            **kwargs,
        )

        if output_cls == {"type": "json_object"}:
            params["output_config"] = {
                "format": {
                    "type": "json_schema",
                    "schema": {"type": "object", "additionalProperties": True},
                }
            }
            response = await client.messages.create(**params)
            text: str | None = None
            for block in response.content:
                if block.type == "text":
                    text = block.text
                    break
            if text is None:
                raise ValueError("No text block in Anthropic structured response")
            data = json.loads(text)
            return ClientResponse(
                content=[StructuredBlock(content=data)],
                stop_reason=response.stop_reason,
                usage=self._usage_from_anthropic_response(response),
            )

        response = await client.messages.parse(**params, output_format=output_cls)
        parsed = response.parsed_output
        if parsed is None:
            raise ValueError("No parsed_output in Anthropic structured response")
        return ClientResponse(
            content=[StructuredBlock(content=parsed)],
            stop_reason=response.stop_reason,
            usage=self._usage_from_anthropic_response(response),
        )

```
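
The streaming helpers above collect `partial_json` fragments per content-block index and parse them only after the stream has ended. The sketch below isolates that idea. It is a minimal illustration, not the package's implementation: `accumulate_tool_inputs` is a hypothetical name, and the events are `SimpleNamespace` stand-ins rather than real SDK objects.

```python
import json
from types import SimpleNamespace
from typing import Any


def accumulate_tool_inputs(events: list[Any]) -> dict[int, dict[str, Any]]:
    """Join partial_json fragments per block index, then parse each once."""
    chunks: dict[int, list[str]] = {}
    for event in events:
        if event.type != "content_block_delta":
            continue
        fragment = getattr(event.delta, "partial_json", None)
        if fragment:
            chunks.setdefault(event.index, []).append(fragment)
    # Parsing is deferred: a single fragment is rarely valid JSON by itself.
    return {index: json.loads("".join(parts)) for index, parts in chunks.items()}


# Hypothetical events: one tool call whose input arrives in two fragments.
events = [
    SimpleNamespace(
        type="content_block_delta",
        index=0,
        delta=SimpleNamespace(partial_json='{"location": "Mi'),
    ),
    SimpleNamespace(
        type="content_block_delta",
        index=0,
        delta=SimpleNamespace(partial_json='lan"}'),
    ),
]
tool_inputs = accumulate_tool_inputs(events)
print(tool_inputs)
```

Keying the fragments by index keeps parallel tool calls separate, which is why the client stores its tool state in a dictionary rather than a single buffer.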

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/datapizza/clients/anthropic/memory_adapter.py" 
import base64
import json
from typing import Any

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)


class AnthropicMemoryAdapter(MemoryAdapter):
    """Adapter for converting Memory objects to Anthropic API message format"""

    @staticmethod
    def _normalize_tool_result_content(result: Any) -> str | list[dict[str, Any]]:
        if result is None:
            return ""
        if isinstance(result, str):
            return result
        if isinstance(result, list):
            return result
        if isinstance(result, dict):
            return json.dumps(result)
        return str(result)

    def _turn_to_message(self, turn: Turn) -> dict:
        content = []
        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {"type": "text", "text": block.content}
                case FunctionCallBlock():
                    block_dict = {
                        "type": "tool_use",
                        "id": block.id,
                        "name": block.name,
                        "input": block.arguments,
                    }

                case FunctionCallResultBlock():
                    block_dict = {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": self._normalize_tool_result_content(block.result),
                    }
                case StructuredBlock():
                    block_dict = {
                        "type": "text",
                        "text": str(block.content),
                    }
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_pdf_block(block)

                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

            content.append(block_dict)

        if all(isinstance(block, dict) for block in content) and all(
            list(block.keys()) == ["type", "text"] for block in content
        ):
            content = "".join([block["text"] for block in content])

        return {
            "role": turn.role.anthropic_role,
            "content": (content),
        }

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        """Convert text and role to Anthropic message format"""
        # Anthropic messages use the 'user' and 'assistant' roles; the system
        # prompt is passed separately through the top-level `system` parameter

        return {"role": role.anthropic_role, "content": text}

    def _process_pdf_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "document",
                    "source": {
                        "type": "url",
                        "url": block.media.source,
                    },
                }

            case "base64":
                return {
                    "type": "document",
                    "source": {
                        "type": "base64",
                        "media_type": "application/pdf",
                        "data": block.media.source,
                    },
                }

            case "path":
                with open(block.media.source, "rb") as f:
                    base64_pdf = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "document",
                    "source": {
                        "type": "base64",
                        "media_type": "application/pdf",
                        "data": base64_pdf,
                    },
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported source type: {block.media.source_type}"
                )

    def _process_image_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "image",
                    "source": {
                        "type": "url",
                        "url": block.media.source,
                    },
                }

            case "base64":
                return {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": f"image/{block.media.extension}",
                        "data": block.media.source,
                    },
                }

            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                return {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": f"image/{block.media.extension}",
                        "data": base64_image,
                    },
                }
            case _:
                raise NotImplementedError(
                    f"Unsupported source type: {block.media.source_type}"
                )

```
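
`_turn_to_message` collapses a turn made only of text blocks into a single string and keeps the list form otherwise. A minimal standalone sketch of that rule follows. `collapse_text_blocks` is a hypothetical helper, and it deviates from the adapter in two labeled ways: it compares key sets instead of key order, and it leaves an empty list untouched.

```python
from __future__ import annotations

from typing import Any


def collapse_text_blocks(content: list[dict[str, Any]]) -> str | list[dict[str, Any]]:
    """Return a plain string when every block is a bare text block."""
    # Assumption: a "bare" text block has exactly the keys "type" and "text".
    if content and all(set(block) == {"type", "text"} for block in content):
        return "".join(block["text"] for block in content)
    # Mixed content (tool calls, tool results, media) stays a list of blocks.
    return content


text_only = [{"type": "text", "text": "Hel"}, {"type": "text", "text": "lo"}]
mixed = [
    {"type": "text", "text": "hi"},
    {"type": "tool_result", "tool_use_id": "toolu_1", "content": "Rome"},
]
print(collapse_text_blocks(text_only))
print(collapse_text_blocks(mixed))
```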

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-anthropic"
version = "0.0.8"
description = "Anthropic (Claude) client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}

requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
    "Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = [
    "datapizza-ai-core>=0.1.0,<0.2.0",
    "anthropic>=0.77.0,<1.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_memory_adapter.py" 
from datapizza.memory.memory import Memory
from datapizza.tools.tools import tool
from datapizza.type import ROLE, FunctionCallBlock, FunctionCallResultBlock, TextBlock

from datapizza.clients.anthropic import AnthropicClient
from datapizza.clients.anthropic.memory_adapter import AnthropicMemoryAdapter


def test_init_anthropic_client():
    client = AnthropicClient(api_key="test")
    assert client is not None


def test_anthropic_memory_adapter():
    memory_adapter = AnthropicMemoryAdapter()
    memory = Memory()
    memory.add_turn(blocks=[TextBlock(content="Hello, world!")], role=ROLE.USER)
    memory.add_turn(blocks=[TextBlock(content="Hello, world!")], role=ROLE.ASSISTANT)

    messages = memory_adapter.memory_to_messages(memory)

    assert messages == [
        {
            "role": "user",
            "content": "Hello, world!",
        },
        {
            "role": "assistant",
            "content": "Hello, world!",
        },
    ]


def test_anthropic_memory_adapter_tool_result_turn_maps_to_user_role():
    @tool
    def get_city() -> str:
        return "Rome"

    memory_adapter = AnthropicMemoryAdapter()
    memory = Memory()
    memory.add_turn(
        blocks=[FunctionCallResultBlock(id="toolu_1", tool=get_city, result="Rome")],
        role=ROLE.TOOL,
    )

    messages = memory_adapter.memory_to_messages(memory)

    assert messages[0]["role"] == "user"
    assert messages[0]["content"] == [
        {
            "type": "tool_result",
            "tool_use_id": "toolu_1",
            "content": "Rome",
        }
    ]


def test_anthropic_memory_adapter_function_call_turn_maps_to_assistant_tool_use():
    @tool
    def get_city() -> str:
        return "Rome"

    memory_adapter = AnthropicMemoryAdapter()
    memory = Memory()
    memory.add_turn(
        blocks=[
            FunctionCallBlock(
                id="toolu_1",
                name="get_city",
                arguments={"country": "Italy"},
                tool=get_city,
            )
        ],
        role=ROLE.ASSISTANT,
    )

    messages = memory_adapter.memory_to_messages(memory)

    assert messages[0]["role"] == "assistant"
    assert messages[0]["content"] == [
        {
            "type": "tool_use",
            "id": "toolu_1",
            "name": "get_city",
            "input": {"country": "Italy"},
        }
    ]

```

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_streaming.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_streaming.py" 
import asyncio
from types import SimpleNamespace

from datapizza.tools import tool
from datapizza.type import FunctionCallBlock

from datapizza.clients.anthropic.anthropic_client import AnthropicClient
from datapizza.clients.anthropic.memory_adapter import AnthropicMemoryAdapter


def _message_start(input_tokens=5):
    return SimpleNamespace(
        type="message_start",
        message=SimpleNamespace(
            usage=SimpleNamespace(input_tokens=input_tokens),
        ),
    )


def _message_delta(output_tokens=7, stop_reason="tool_use"):
    return SimpleNamespace(
        type="message_delta",
        usage=SimpleNamespace(output_tokens=output_tokens),
        delta=SimpleNamespace(stop_reason=stop_reason),
    )


def _tool_start(index, name, tool_id, input=None):
    return SimpleNamespace(
        type="content_block_start",
        index=index,
        content_block=SimpleNamespace(
            type="tool_use",
            id=tool_id,
            name=name,
            input=input,
        ),
    )


def _text_delta(text):
    return SimpleNamespace(
        type="content_block_delta",
        delta=SimpleNamespace(text=text),
    )


def _thinking_delta(thinking):
    return SimpleNamespace(
        type="content_block_delta",
        delta=SimpleNamespace(thinking=thinking),
    )


class FakeSyncMessages:
    def __init__(self, chunks):
        self._chunks = chunks

    def create(self, **kwargs):
        return iter(self._chunks)


class FakeAsyncMessages:
    def __init__(self, chunks):
        self._chunks = chunks

    async def create(self, **kwargs):
        async def iterator():
            for chunk in self._chunks:
                yield chunk

        return iterator()


class FakeSyncClient:
    def __init__(self, chunks):
        self.messages = FakeSyncMessages(chunks)


class FakeAsyncClient:
    def __init__(self, chunks):
        self.messages = FakeAsyncMessages(chunks)


def _make_client():
    client = object.__new__(AnthropicClient)
    client.model_name = "claude-test"
    client.system_prompt = ""
    client.temperature = None
    client.memory_adapter = AnthropicMemoryAdapter()
    return client


@tool
def get_weather(location: str, when: str) -> str:
    return "25 C"


def test_stream_invoke_preserves_tool_use_blocks():
    client = _make_client()
    client.client = FakeSyncClient(
        [
            _message_start(),
            _tool_start(
                index=0,
                name="get_weather",
                tool_id="tool_1",
                input={"location": "Milan", "when": "tomorrow"},
            ),
            _message_delta(stop_reason="tool_use"),
        ]
    )

    responses = list(
        client._stream_invoke(
            input="weather?",
            tools=[get_weather],
            memory=None,
            tool_choice="auto",
            temperature=None,
            max_tokens=256,
            system_prompt=None,
        )
    )

    final_response = responses[-1]
    assert len(final_response.function_calls) == 1
    assert isinstance(final_response.function_calls[0], FunctionCallBlock)
    assert final_response.function_calls[0].name == "get_weather"
    assert final_response.function_calls[0].arguments == {
        "location": "Milan",
        "when": "tomorrow",
    }
    assert final_response.stop_reason == "tool_use"


def test_stream_invoke_preserves_text_and_thinking():
    client = _make_client()
    client.client = FakeSyncClient(
        [
            _message_start(),
            _thinking_delta("Thinking..."),
            _text_delta("Hel"),
            _text_delta("lo"),
            _message_delta(stop_reason="end_turn"),
        ]
    )

    responses = list(
        client._stream_invoke(
            input="hello",
            tools=[],
            memory=None,
            tool_choice="auto",
            temperature=None,
            max_tokens=256,
            system_prompt=None,
        )
    )

    assert [response.delta for response in responses[:-1]] == ["Hel", "lo"]
    assert responses[-1].text == "Hello"
    assert responses[-1].thoughts == "Thinking..."


def test_a_stream_invoke_preserves_tool_use_blocks():
    client = _make_client()
    client.a_client = FakeAsyncClient(
        [
            _message_start(),
            _tool_start(
                index=0,
                name="get_weather",
                tool_id="tool_1",
                input={"location": "Milan", "when": "tomorrow"},
            ),
            _message_delta(stop_reason="tool_use"),
        ]
    )

    async def collect():
        items = []
        async for response in client._a_stream_invoke(
            input="weather?",
            tools=[get_weather],
            memory=None,
            tool_choice="auto",
            temperature=None,
            max_tokens=256,
            system_prompt=None,
        ):
            items.append(response)
        return items

    responses = asyncio.run(collect())
    final_response = responses[-1]
    assert len(final_response.function_calls) == 1
    assert final_response.function_calls[0].name == "get_weather"

```
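
The async streaming test above drains an async generator inside a coroutine and runs it with `asyncio.run`. The same pattern in isolation, as a small sketch with hypothetical names (`fake_stream`, `collect`) and plain strings in place of SDK chunks:

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_stream(chunks: list[str]) -> AsyncIterator[str]:
    """Yield prepared chunks one by one, standing in for a streamed API response."""
    for chunk in chunks:
        yield chunk


async def collect(stream: AsyncIterator[str]) -> list[str]:
    """Drain an async iterator into a list so a sync test can assert on it."""
    return [item async for item in stream]


collected = asyncio.run(collect(fake_stream(["Hel", "lo"])))
print(collected)
```

Collecting into a list first keeps the assertions synchronous, so the tests need no async test plugin.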

## /datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_structured_response.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-anthropic/tests/test_anthropic_structured_response.py" 
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from datapizza.type import StructuredBlock, TextBlock
from pydantic import BaseModel

from datapizza.clients.anthropic import AnthropicClient


class _Person(BaseModel):
    name: str
    age: int


@pytest.fixture
def mock_usage():
    u = MagicMock()
    u.input_tokens = 12
    u.output_tokens = 34
    u.cache_read_input_tokens = 0
    return u


@patch("datapizza.clients.anthropic.anthropic_client.Anthropic")
def test_structured_response_parse_sync(mock_anthropic_class, mock_usage):
    mock_instance = mock_anthropic_class.return_value
    parsed_msg = MagicMock()
    parsed_msg.parsed_output = _Person(name="Ada", age=36)
    parsed_msg.stop_reason = "end_turn"
    parsed_msg.usage = mock_usage
    mock_instance.messages.parse.return_value = parsed_msg

    client = AnthropicClient(api_key="key", model="claude-sonnet-4-20250514")
    result = client._structured_response(
        input=[TextBlock(content="Who?")],
        output_cls=_Person,
        max_tokens=512,
    )

    mock_instance.messages.parse.assert_called_once()
    call_kw = mock_instance.messages.parse.call_args.kwargs
    assert call_kw["model"] == "claude-sonnet-4-20250514"
    assert call_kw["output_format"] is _Person
    assert call_kw["max_tokens"] == 512

    assert len(result.content) == 1
    assert isinstance(result.content[0], StructuredBlock)
    assert result.content[0].content == _Person(name="Ada", age=36)
    assert result.prompt_tokens_used == 12
    assert result.completion_tokens_used == 34


@patch("datapizza.clients.anthropic.anthropic_client.AsyncAnthropic")
def test_structured_response_parse_async(mock_async_class, mock_usage):
    mock_instance = mock_async_class.return_value
    parsed_msg = MagicMock()
    parsed_msg.parsed_output = _Person(name="Bob", age=40)
    parsed_msg.stop_reason = "end_turn"
    parsed_msg.usage = mock_usage
    mock_instance.messages.parse = AsyncMock(return_value=parsed_msg)

    client = AnthropicClient(api_key="key", model="claude-sonnet-4-20250514")

    async def run():
        return await client._a_structured_response(
            input=[TextBlock(content="Hi")],
            output_cls=_Person,
        )

    result = asyncio.run(run())

    mock_instance.messages.parse.assert_awaited_once()
    assert result.content[0].content.name == "Bob"
    assert result.stop_reason == "end_turn"


@patch("datapizza.clients.anthropic.anthropic_client.Anthropic")
def test_structured_response_json_object_sentinel(mock_anthropic_class, mock_usage):
    mock_instance = mock_anthropic_class.return_value
    text_block = MagicMock()
    text_block.type = "text"
    text_block.text = '{"foo": 42}'
    raw_msg = MagicMock()
    raw_msg.content = [text_block]
    raw_msg.stop_reason = "end_turn"
    raw_msg.usage = mock_usage
    mock_instance.messages.create.return_value = raw_msg

    client = AnthropicClient(api_key="key", model="claude-haiku")
    result = client._structured_response(
        input=[TextBlock(content="return json")],
        output_cls={"type": "json_object"},
    )

    mock_instance.messages.create.assert_called_once()
    assert mock_instance.messages.parse.call_count == 0
    kw = mock_instance.messages.create.call_args.kwargs
    assert kw["output_config"]["format"]["type"] == "json_schema"
    assert result.content[0].content == {"foo": 42}

```

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/README.md

# DataPizza AI - AWS Bedrock Client

AWS Bedrock client implementation for the datapizza-ai framework. It wraps the AWS Bedrock Converse API and works with the foundation models that API supports, including Anthropic's Claude models.

## Features

- Full support for AWS Bedrock Converse API
- Multiple authentication methods (AWS Profile, Access Keys, Environment Variables)
- Streaming and non-streaming responses
- Tool/function calling support
- Memory/conversation history management
- Image and document (PDF) support
- Async support

## Installation

```bash
pip install datapizza-ai-clients-bedrock
```

Or install from source in editable mode:

```bash
cd datapizza-ai/datapizza-ai-clients/datapizza-ai-clients-bedrock
pip install -e .
```

## Quick Start

### Basic Usage

```python
from datapizza.clients.bedrock import BedrockClient

# Using AWS Profile
client = BedrockClient(
    profile_name="my-aws-profile",
    region_name="us-east-1"
)

# Or using access keys
client = BedrockClient(
    aws_access_key_id="YOUR_ACCESS_KEY",
    aws_secret_access_key="YOUR_SECRET_KEY",
    region_name="us-east-1"
)

# Simple invocation
result = client.invoke("What is AWS Bedrock?")

# Extract text from response
for block in result.content:
    if hasattr(block, 'content'):
        print(block.content)
```

## Authentication Methods

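The client resolves credentials in the following priority order. Access keys read from environment variables are treated like explicitly passed keys, so they take precedence over `profile_name`: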

### 1. Explicit Credentials

```python
client = BedrockClient(
    aws_access_key_id="YOUR_ACCESS_KEY",
    aws_secret_access_key="YOUR_SECRET_KEY",
    aws_session_token="YOUR_SESSION_TOKEN",  # Optional, for temporary credentials
    region_name="us-east-1"
)
```

### 2. AWS Profile

```python
client = BedrockClient(
    profile_name="my-aws-profile",
    region_name="us-east-1"
)
```

### 3. Environment Variables

Set these environment variables:
```bash
export AWS_ACCESS_KEY_ID="your-access-key"
export AWS_SECRET_ACCESS_KEY="your-secret-key"
export AWS_SESSION_TOKEN="your-session-token"  # Optional
export AWS_PROFILE="my-aws-profile"  # Or use profile
```

Then initialize without passing credentials:
```python
client = BedrockClient(region_name="us-east-1")
```

### 4. Default AWS Credentials Chain

If no credentials are provided, boto3 uses its default credentials chain (IAM roles, `~/.aws/credentials`, etc.).

```python
client = BedrockClient(region_name="us-east-1")
```

## Available Models

The client works with any Bedrock model that supports the Converse API. Popular models include:

- `anthropic.claude-3-5-sonnet-20241022-v2:0` (default)
- `anthropic.claude-3-5-sonnet-20240620-v1:0`
- `anthropic.claude-3-opus-20240229-v1:0`
- `anthropic.claude-3-sonnet-20240229-v1:0`
- `anthropic.claude-3-haiku-20240307-v1:0`
- `meta.llama3-70b-instruct-v1:0`
- `mistral.mistral-large-2402-v1:0`
- And many more...

```python
client = BedrockClient(
    model="anthropic.claude-3-opus-20240229-v1:0",
    region_name="us-east-1"
)
```

## Usage Examples

### With System Prompt

```python
client = BedrockClient(
    system_prompt="You are a helpful coding assistant specialized in Python.",
    region_name="us-east-1"
)

result = client.invoke("How do I read a CSV file?")
```

### Streaming Responses

```python
for chunk in client.stream_invoke("Tell me a long story"):
    if chunk.delta:
        print(chunk.delta, end="", flush=True)
print()
```
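
As a rough illustration of what a streaming loop has to handle, the sketch below replays Converse-stream-style events and accumulates the text deltas. The event dictionaries are hand-written stand-ins for what the service returns, and `accumulate_stream` is a hypothetical helper that yields plain dicts rather than the client's response objects.

```python
from collections.abc import Iterable, Iterator


def accumulate_stream(events: Iterable[dict]) -> Iterator[dict]:
    """Yield one chunk per text delta, then a final summary chunk."""
    text = ""
    usage = {"inputTokens": 0, "outputTokens": 0}
    stop_reason = None

    for event in events:
        if "contentBlockDelta" in event:
            delta = event["contentBlockDelta"].get("delta", {})
            if "text" in delta:
                text += delta["text"]
                # Intermediate chunk: full text so far plus the new piece.
                yield {"text": text, "delta": delta["text"], "stop_reason": None}
        elif "messageStop" in event:
            stop_reason = event["messageStop"].get("stopReason")
        elif "metadata" in event:
            usage = event["metadata"].get("usage", usage)

    # Final chunk: empty delta, complete text, stop reason and token usage.
    yield {"text": text, "delta": "", "stop_reason": stop_reason, "usage": usage}


# Hand-written events used only for this illustration.
fake_events = [
    {"contentBlockDelta": {"delta": {"text": "Hel"}}},
    {"contentBlockDelta": {"delta": {"text": "lo"}}},
    {"messageStop": {"stopReason": "end_turn"}},
    {"metadata": {"usage": {"inputTokens": 3, "outputTokens": 2}}},
]
chunks = list(accumulate_stream(fake_events))
```

The last chunk carries an empty delta, which is why the streaming examples check `chunk.delta` before printing.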

### With Memory (Conversation History)

```python
from datapizza.memory import Memory

memory = Memory()
client = BedrockClient(region_name="us-east-1")

# First message
result1 = client.invoke("My favorite color is blue", memory=memory)

# The conversation is tracked in memory
result2 = client.invoke("What's my favorite color?", memory=memory)
# Response: "Your favorite color is blue."
```
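
For orientation, the Converse API receives the conversation as a list of messages, each with a role and a list of content blocks. The sketch below builds that shape from plain `(role, text)` pairs; `to_converse_messages` is a hypothetical helper and only covers text content, while the package's memory adapter also handles tool calls and media.

```python
VALID_ROLES = {"user", "assistant"}


def to_converse_messages(turns: list[tuple[str, str]]) -> list[dict]:
    """Build Converse-style messages from (role, text) pairs (hypothetical helper)."""
    messages = []
    for role, text in turns:
        if role not in VALID_ROLES:
            raise ValueError(f"unsupported role: {role!r}")
        # Each message wraps its text in a single content block.
        messages.append({"role": role, "content": [{"text": text}]})
    return messages


history = to_converse_messages(
    [
        ("user", "My favorite color is blue"),
        ("assistant", "Got it, blue it is."),
        ("user", "What's my favorite color?"),
    ]
)
```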

### With Temperature and Max Tokens

```python
result = client.invoke(
    "Write a creative story",
    temperature=0.9,  # Higher = more creative (0-1)
    max_tokens=1000
)
```

### With Tools/Function Calling

```python
from datapizza.tools import Tool
from datapizza.type import FunctionCallBlock

def get_weather(location: str, unit: str = "celsius") -> str:
    """Get the weather for a location"""
    return f"The weather in {location} is 22°{unit[0].upper()}"

weather_tool = Tool(
    name="get_weather",
    description="Get the current weather for a location",
    function=get_weather,
    properties={
        "location": {
            "type": "string",
            "description": "The city name"
        },
        "unit": {
            "type": "string",
            "enum": ["celsius", "fahrenheit"],
            "description": "Temperature unit"
        }
    },
    required=["location"]
)

result = client.invoke(
    "What's the weather in Paris?",
    tools=[weather_tool]
)

# Check for function calls
for block in result.content:
    if isinstance(block, FunctionCallBlock):
        print(f"Function: {block.name}")
        print(f"Arguments: {block.arguments}")
```

### Async Support

```python
import asyncio

async def main():
    client = BedrockClient(region_name="us-east-1")
    result = await client.a_invoke("Hello!")
    print(result.content[0].content)

asyncio.run(main())
```

### Async Streaming

```python
import asyncio

async def stream_example():
    client = BedrockClient(region_name="us-east-1")
    async for chunk in client.a_stream_invoke("Count to 10"):
        if chunk.delta:
            print(chunk.delta, end="", flush=True)

asyncio.run(stream_example())
```

## Configuration

### Constructor Parameters

```python
BedrockClient(
    model: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
    system_prompt: str = "",
    temperature: float | None = None,  # 0-1 for most models
    cache: Cache | None = None,
    region_name: str = "us-east-1",
    aws_access_key_id: str | None = None,
    aws_secret_access_key: str | None = None,
    aws_session_token: str | None = None,
    profile_name: str | None = None,
)
```

### Invoke Parameters

```python
client.invoke(
    input: str,                    # The user message
    tools: list[Tool] | None = None,
    memory: Memory | None = None,
    tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
    temperature: float | None = None,
    max_tokens: int = 2048,
    system_prompt: str | None = None,  # Override instance system_prompt
)
```

## Response Format

All methods return a `ClientResponse` object:

```python
from datapizza.type import FunctionCallBlock, TextBlock

response = client.invoke("Hello")

# Access content blocks
for block in response.content:
    if isinstance(block, TextBlock):
        print(block.content)  # The text
    elif isinstance(block, FunctionCallBlock):
        print(block.name)      # Function name
        print(block.arguments) # Function arguments

# Token usage
print(f"Prompt tokens: {response.prompt_tokens_used}")
print(f"Completion tokens: {response.completion_tokens_used}")
print(f"Stop reason: {response.stop_reason}")
```

## IAM Permissions

Your AWS credentials need the following permissions:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel",
                "bedrock:InvokeModelWithResponseStream"
            ],
            "Resource": [
                "arn:aws:bedrock:*::foundation-model/*"
            ]
        }
    ]
}
```

## Model Access

Before using a model, you need to request access in the AWS Bedrock console:

1. Go to AWS Bedrock console
2. Navigate to "Model access"
3. Request access to the models you want to use
4. Wait for approval (usually instant for most models)

## Limitations

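- Structured responses are not supported: the structured-response methods raise `NotImplementedError` (unlike OpenAI's structured output)
- Feature support (tools, images, documents) varies by model
- Token usage does not include caching information; cached tokens are always reported as 0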
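
One possible workaround for the missing structured output, sketched under the assumption that the model has been prompted to reply with a single JSON object, is to parse and validate the reply text yourself. `Person` and `parse_json_reply` are hypothetical names for this illustration, and the type check only handles simple built-in field types.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class Person:
    name: str
    age: int


def parse_json_reply(text: str, cls):
    """Extract a JSON object from a model reply and build `cls` from it."""
    # Tolerate prose around the object by slicing from the first "{" to the last "}".
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in reply")
    data = json.loads(text[start : end + 1])

    expected = {f.name: f.type for f in fields(cls)}
    missing = expected.keys() - data.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    for name, typ in expected.items():
        # Works for plain types such as str and int, not for generics.
        if not isinstance(data[name], typ):
            raise TypeError(f"field {name!r} should be {typ.__name__}")
    return cls(**{name: data[name] for name in expected})


person = parse_json_reply('Sure, here it is: {"name": "Ada", "age": 36}', Person)
```

In practice the `text` argument would be the content of the text block returned by `client.invoke(...)`.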

## Error Handling

```python
from botocore.exceptions import BotoCoreError, ClientError

try:
    result = client.invoke("Hello")
except ClientError as e:
    if e.response['Error']['Code'] == 'AccessDeniedException':
        print("Model access not granted. Check Bedrock console.")
    elif e.response['Error']['Code'] == 'ResourceNotFoundException':
        print("Model not found in this region.")
    else:
        print(f"AWS Error: {e}")
except BotoCoreError as e:
    print(f"Boto3 Error: {e}")
```
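
If the same checks are needed in several places, the error-code lookup can be pulled into a small function. This is a sketch only: `describe_bedrock_error` is a hypothetical helper, the hint texts are illustrative, and the table lists just a few of the error codes the service can return.

```python
# Illustrative hints for a few error codes; extend as needed.
ERROR_HINTS = {
    "AccessDeniedException": "Model access not granted. Check Bedrock console.",
    "ResourceNotFoundException": "Model not found in this region.",
    "ThrottlingException": "Request was throttled. Retry with backoff.",
    "ValidationException": "Request was rejected. Check the model ID and parameters.",
}


def describe_bedrock_error(error_response: dict) -> str:
    """Map the `response` dict of a botocore ClientError to a short hint."""
    code = error_response.get("Error", {}).get("Code", "Unknown")
    return ERROR_HINTS.get(code, f"Unhandled AWS error code: {code}")


message = describe_bedrock_error({"Error": {"Code": "ThrottlingException"}})
```

Inside an `except ClientError as e:` block this would be called as `describe_bedrock_error(e.response)`.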

## Development

### Running Tests

```bash
pip install -e ".[dev]"
pytest tests/
```

### Code Formatting

```bash
ruff check .
ruff format .
```

## License

MIT License - see LICENSE file for details

## Contributing

Contributions are welcome! Please see the main datapizza-ai repository for contribution guidelines.

## Support

For issues and questions:
- GitHub Issues: [datapizza-ai repository](https://github.com/datapizza/datapizza-ai)
- Documentation: [DataPizza AI Docs](https://docs.datapizza.ai)


## /datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/__init__.py" 
from .bedrock_client import BedrockClient

__all__ = ["BedrockClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/bedrock_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/bedrock_client.py" 
import os
from collections.abc import AsyncIterator, Iterator
from typing import Any, Literal

import aioboto3
import boto3
from botocore.config import Config
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import FunctionCallBlock, TextBlock

from .memory_adapter import BedrockMemoryAdapter


class BedrockClient(Client):
    """A client for interacting with AWS Bedrock API.

    This class provides methods for invoking AWS Bedrock models (Claude, etc.)
    to generate responses. It extends the Client class and supports authentication
    via AWS profile or access keys.
    """

    def __init__(
        self,
        model: str = "anthropic.claude-3-5-sonnet-20241022-v2:0",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
        region_name: str = "us-east-1",
        aws_access_key_id: str | None = None,
        aws_secret_access_key: str | None = None,
        aws_session_token: str | None = None,
        profile_name: str | None = None,
    ):
        """
        Args:
            model: The Bedrock model to use (e.g., 'anthropic.claude-3-5-sonnet-20241022-v2:0').
            system_prompt: The system prompt to use for the model.
            temperature: The temperature to use for generation (0-1 for most models).
            cache: The cache to use for responses.
            region_name: AWS region name (default: us-east-1).
            aws_access_key_id: AWS access key ID (optional, can use AWS_ACCESS_KEY_ID env var).
            aws_secret_access_key: AWS secret access key (optional, can use AWS_SECRET_ACCESS_KEY env var).
            aws_session_token: AWS session token (optional, for temporary credentials).
            profile_name: AWS profile name (optional, can use AWS_PROFILE env var).
        """
        if temperature is not None and not 0 <= temperature <= 1:
            raise ValueError("Temperature must be between 0 and 1 for Bedrock models")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )

        self.region_name = region_name
        self.aws_access_key_id = aws_access_key_id or os.getenv("AWS_ACCESS_KEY_ID")
        self.aws_secret_access_key = aws_secret_access_key or os.getenv(
            "AWS_SECRET_ACCESS_KEY"
        )
        self.aws_session_token = aws_session_token or os.getenv("AWS_SESSION_TOKEN")
        self.profile_name = profile_name or os.getenv("AWS_PROFILE")

        self.memory_adapter = BedrockMemoryAdapter()
        self._set_client()

    def _set_client(self):
        if not self.client:
            session_kwargs = {}

            # Priority: explicit credentials > profile > default credentials
            if self.aws_access_key_id and self.aws_secret_access_key:
                session_kwargs["aws_access_key_id"] = self.aws_access_key_id
                session_kwargs["aws_secret_access_key"] = self.aws_secret_access_key
                if self.aws_session_token:
                    session_kwargs["aws_session_token"] = self.aws_session_token
            elif self.profile_name:
                session_kwargs["profile_name"] = self.profile_name

            session = boto3.Session(**session_kwargs)

            # Create bedrock-runtime client with retry configuration
            config = Config(
                retries={"max_attempts": 3, "mode": "adaptive"},
                read_timeout=300,
            )

            self.client = session.client(
                service_name="bedrock-runtime",
                region_name=self.region_name,
                config=config,
            )

    def _set_a_client(self):
        """Initialize async bedrock-runtime client using aioboto3"""
        if not self.a_client:
            session_kwargs = {}

            # Priority: explicit credentials > profile > default credentials
            if self.aws_access_key_id and self.aws_secret_access_key:
                session_kwargs["aws_access_key_id"] = self.aws_access_key_id
                session_kwargs["aws_secret_access_key"] = self.aws_secret_access_key
                if self.aws_session_token:
                    session_kwargs["aws_session_token"] = self.aws_session_token
            elif self.profile_name:
                session_kwargs["profile_name"] = self.profile_name

            # Create async session
            session = aioboto3.Session(**session_kwargs)

            # Create bedrock-runtime client with retry configuration
            config = Config(
                retries={"max_attempts": 3, "mode": "adaptive"},
                read_timeout=300,
            )

            # Store the session and config for async client creation
            self.a_session = session
            self.a_config = config
            self.a_region_name = self.region_name

    def _convert_tools(self, tools: list[Tool]) -> list[dict[str, Any]]:
        """Convert tools to Bedrock tool format (similar to Anthropic)"""
        bedrock_tools = []
        for tool in tools:
            bedrock_tool = {
                "toolSpec": {
                    "name": tool.name,
                    "description": tool.description or "",
                    "inputSchema": {
                        "json": {
                            "type": "object",
                            "properties": tool.properties,
                            "required": tool.required,
                        }
                    },
                }
            }
            bedrock_tools.append(bedrock_tool)
        return bedrock_tools

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> dict:
        """Convert tool choice to Bedrock format"""
        if isinstance(tool_choice, list):
            if len(tool_choice) > 1:
                raise NotImplementedError(
                    "Multiple function names not supported by Bedrock"
                )
            return {"tool": {"name": tool_choice[0]}}
        elif tool_choice == "required":
            return {"any": {}}
        elif tool_choice == "auto":
            return {"auto": {}}
        else:  # none
            return {}

    def _response_to_client_response(
        self, response: dict, tool_map: dict[str, Tool] | None = None
    ) -> ClientResponse:
        """Convert Bedrock response to ClientResponse"""
        blocks = []

        # Parse the response body
        response_body = response.get("output", {})

        # Handle message content
        message = response_body.get("message", {})
        content_items = message.get("content", [])

        for content_item in content_items:
            if "text" in content_item:
                blocks.append(TextBlock(content=content_item["text"]))
            elif "toolUse" in content_item:
                tool_use = content_item["toolUse"]
                tool = tool_map.get(tool_use["name"]) if tool_map else None
                if not tool:
                    raise ValueError(f"Tool {tool_use['name']} not found")

                blocks.append(
                    FunctionCallBlock(
                        id=tool_use["toolUseId"],
                        name=tool_use["name"],
                        arguments=tool_use["input"],
                        tool=tool,
                    )
                )

        # Extract usage information
        usage = response.get("usage", {})
        prompt_tokens = usage.get("inputTokens", 0)
        completion_tokens = usage.get("outputTokens", 0)

        # Extract stop reason
        stop_reason = response.get("stopReason")

        return ClientResponse(
            content=blocks,
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                cached_tokens=0,  # Bedrock doesn't expose cache metrics in the same way
            ),
        )

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _invoke method for Bedrock"""
        if tools is None:
            tools = []

        client = self._get_client()
        messages = self._memory_to_contents(None, input, memory)

        # Remove model role messages (Bedrock doesn't support this)
        messages = [message for message in messages if message.get("role") != "model"]

        tool_map = {tool.name: tool for tool in tools}

        # Build the request body according to Bedrock Converse API
        request_body = {
            "modelId": self.model_name,
            "messages": messages,
            "inferenceConfig": {
                "maxTokens": max_tokens or 2048,
            },
        }

        if temperature is not None:
            request_body["inferenceConfig"]["temperature"] = temperature

        # Add system prompt if provided
        if system_prompt:
            request_body["system"] = [{"text": system_prompt}]

        # Add tools if provided
        if tools:
            request_body["toolConfig"] = {
                "tools": self._convert_tools(tools),
                "toolChoice": self._convert_tool_choice(tool_choice),
            }

        # Add any additional kwargs
        request_body.update(kwargs)

        # Call Bedrock Converse API
        response = client.converse(**request_body)

        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Async implementation using aioboto3"""
        if tools is None:
            tools = []

        # Ensure async client is initialized
        if not hasattr(self, "a_session"):
            self._set_a_client()

        messages = self._memory_to_contents(None, input, memory)

        # Remove model role messages (Bedrock doesn't support this)
        messages = [message for message in messages if message.get("role") != "model"]

        tool_map = {tool.name: tool for tool in tools}

        # Build the request body according to Bedrock Converse API
        request_body = {
            "modelId": self.model_name,
            "messages": messages,
            "inferenceConfig": {
                "maxTokens": max_tokens or 2048,
            },
        }

        if temperature is not None:
            request_body["inferenceConfig"]["temperature"] = temperature

        # Add system prompt if provided
        if system_prompt:
            request_body["system"] = [{"text": system_prompt}]

        # Add tools if provided
        if tools:
            request_body["toolConfig"] = {
                "tools": self._convert_tools(tools),
                "toolChoice": self._convert_tool_choice(tool_choice),
            }

        # Add any additional kwargs
        request_body.update(kwargs)

        # Call Bedrock Converse API asynchronously
        async with self.a_session.client(
            service_name="bedrock-runtime",
            region_name=self.a_region_name,
            config=self.a_config,
        ) as client:
            response = await client.converse(**request_body)

        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        """Implementation of streaming for Bedrock"""
        if tools is None:
            tools = []

        client = self._get_client()
        messages = self._memory_to_contents(None, input, memory)

        # Remove model role messages
        messages = [message for message in messages if message.get("role") != "model"]

        # Build the request body
        request_body = {
            "modelId": self.model_name,
            "messages": messages,
            "inferenceConfig": {
                "maxTokens": max_tokens or 2048,
            },
        }

        if temperature is not None:
            request_body["inferenceConfig"]["temperature"] = temperature

        if system_prompt:
            request_body["system"] = [{"text": system_prompt}]

        if tools:
            request_body["toolConfig"] = {
                "tools": self._convert_tools(tools),
                "toolChoice": self._convert_tool_choice(tool_choice),
            }

        request_body.update(kwargs)

        # Call Bedrock ConverseStream API
        response = client.converse_stream(**request_body)

        # Process streaming response
        message_text = ""
        input_tokens = 0
        output_tokens = 0
        stop_reason = None

        stream = response.get("stream")
        if stream:
            for event in stream:
                if "contentBlockDelta" in event:
                    delta = event["contentBlockDelta"].get("delta", {})
                    if "text" in delta:
                        text_delta = delta["text"]
                        message_text += text_delta
                        yield ClientResponse(
                            content=[TextBlock(content=message_text)],
                            delta=text_delta,
                            stop_reason=None,
                        )

                elif "metadata" in event:
                    metadata = event["metadata"]
                    usage = metadata.get("usage", {})
                    input_tokens = usage.get("inputTokens", 0)
                    output_tokens = usage.get("outputTokens", 0)

                elif "messageStop" in event:
                    stop_reason = event["messageStop"].get("stopReason")

        # Final response with complete information
        yield ClientResponse(
            content=[TextBlock(content=message_text)],
            delta="",
            stop_reason=stop_reason,
            usage=TokenUsage(
                prompt_tokens=input_tokens,
                completion_tokens=output_tokens,
                cached_tokens=0,
            ),
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        """Async streaming implementation using aioboto3"""
        if tools is None:
            tools = []

        # Ensure async client is initialized
        if not hasattr(self, "a_session"):
            self._set_a_client()

        messages = self._memory_to_contents(None, input, memory)

        # Remove model role messages
        messages = [message for message in messages if message.get("role") != "model"]

        # Build the request body
        request_body = {
            "modelId": self.model_name,
            "messages": messages,
            "inferenceConfig": {
                "maxTokens": max_tokens or 2048,
            },
        }

        if temperature is not None:
            request_body["inferenceConfig"]["temperature"] = temperature

        if system_prompt:
            request_body["system"] = [{"text": system_prompt}]

        if tools:
            request_body["toolConfig"] = {
                "tools": self._convert_tools(tools),
                "toolChoice": self._convert_tool_choice(tool_choice),
            }

        request_body.update(kwargs)

        # Call Bedrock ConverseStream API asynchronously
        async with self.a_session.client(
            service_name="bedrock-runtime",
            region_name=self.a_region_name,
            config=self.a_config,
        ) as client:
            response = await client.converse_stream(**request_body)

            # Process streaming response
            message_text = ""
            input_tokens = 0
            output_tokens = 0
            stop_reason = None

            stream = response.get("stream")
            if stream:
                async for event in stream:
                    if "contentBlockDelta" in event:
                        delta = event["contentBlockDelta"].get("delta", {})
                        if "text" in delta:
                            text_delta = delta["text"]
                            message_text += text_delta
                            yield ClientResponse(
                                content=[TextBlock(content=message_text)],
                                delta=text_delta,
                                stop_reason=None,
                            )

                    elif "metadata" in event:
                        metadata = event["metadata"]
                        usage = metadata.get("usage", {})
                        input_tokens = usage.get("inputTokens", 0)
                        output_tokens = usage.get("outputTokens", 0)

                    elif "messageStop" in event:
                        stop_reason = event["messageStop"].get("stopReason")

            # Final response with complete information
            yield ClientResponse(
                content=[TextBlock(content=message_text)],
                delta="",
                stop_reason=stop_reason,
                usage=TokenUsage(
                    prompt_tokens=input_tokens,
                    completion_tokens=output_tokens,
                    cached_tokens=0,
                ),
            )

    def _structured_response(
        self,
        input: str,
        output_cls: type,
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        """Bedrock doesn't natively support structured output like OpenAI

        This would require prompting techniques or a forced tool call whose
        input schema describes the desired output
        """
        raise NotImplementedError(
            "Bedrock doesn't natively support structured responses. "
            "Consider using prompt engineering or JSON mode with validation."
        )

    async def _a_structured_response(self, *args, **kwargs):
        raise NotImplementedError(
            "Bedrock doesn't natively support structured responses"
        )

```
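
The `NotImplementedError` above suggests prompt engineering plus validation as a workaround. A minimal sketch of the validation half follows, using only the standard library. `Pizza` and `parse_structured` are hypothetical names for illustration and are not part of this package; the sketch assumes the model was prompted to reply with a single JSON object.

```python
import json
from dataclasses import dataclass, fields


@dataclass
class Pizza:
    # Hypothetical target schema, used only for illustration
    name: str
    price: float


def parse_structured(text: str, output_cls: type):
    """Extract the first JSON object from model text and build output_cls from it."""
    start = text.find("{")
    if start == -1:
        raise ValueError("No JSON object found in model output")
    # raw_decode parses one JSON value and ignores any trailing prose
    data, _ = json.JSONDecoder().raw_decode(text[start:])
    expected = {f.name for f in fields(output_cls)}
    missing = expected - data.keys()
    if missing:
        raise ValueError(f"Missing fields: {sorted(missing)}")
    # Drop unexpected keys so the dataclass constructor does not fail on them
    return output_cls(**{key: data[key] for key in expected})


reply = 'Sure! Here is the JSON: {"name": "Margherita", "price": 7.5} Enjoy.'
pizza = parse_structured(reply, Pizza)
```

This only checks that the expected keys are present; it does not validate value types, which a schema library would handle more thoroughly.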

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/datapizza/clients/bedrock/memory_adapter.py" 
import base64
import json

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)


class BedrockMemoryAdapter(MemoryAdapter):
    """Adapter for converting Memory objects to AWS Bedrock Converse API message format

    The Bedrock Converse API uses a message format similar to Anthropic's Claude API.
    """

    def _turn_to_message(self, turn: Turn) -> dict:
        """Convert a Turn to Bedrock message format"""
        content = []

        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {"text": block.content}

                case FunctionCallBlock():
                    # Bedrock uses toolUse format
                    block_dict = {
                        "toolUse": {
                            "toolUseId": block.id,
                            "name": block.name,
                            "input": block.arguments,
                        }
                    }

                case FunctionCallResultBlock():
                    # Bedrock uses toolResult format
                    block_dict = {
                        "toolResult": {
                            "toolUseId": block.id,
                            "content": [{"text": str(block.result)}],
                        }
                    }

                case StructuredBlock():
                    # Convert structured content to text
                    block_dict = {
                        "text": json.dumps(block.content)
                        if not isinstance(block.content, str)
                        else block.content,
                    }

                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_document_block(block)
                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

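                case _:
                    # Fail loudly instead of appending an empty dict for unknown blocks
                    raise NotImplementedError(
                        f"Unsupported block type: {type(block).__name__}"
                    )

            content.append(block_dict)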

        # Bedrock expects content as a list
        return {
            "role": self._convert_role(turn.role),
            "content": content,
        }

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        """Convert text and role to Bedrock message format"""
        return {
            "role": self._convert_role(role),
            "content": [{"text": text}],
        }

    def _convert_role(self, role: ROLE) -> str:
        """Convert ROLE to Bedrock role string

        Bedrock Converse API supports 'user' and 'assistant' roles
        """
        if role.value == "user":
            return "user"
        elif role.value == "assistant":
            return "assistant"
        else:
            # Default to user for system or other roles
            return "user"

    def _process_document_block(self, block: MediaBlock) -> dict:
        """Process document (PDF) blocks for Bedrock"""
        match block.media.source_type:
            case "base64":
                return {
                    "document": {
                        "format": "pdf",
                        "name": "document.pdf",
                        "source": {"bytes": base64.b64decode(block.media.source)},
                    }
                }

            case "path":
                with open(block.media.source, "rb") as f:
                    pdf_bytes = f.read()
                return {
                    "document": {
                        "format": "pdf",
                        "name": block.media.source.split("/")[-1],
                        "source": {"bytes": pdf_bytes},
                    }
                }

            case "url":
                raise NotImplementedError(
                    "Bedrock Converse API does not support document URLs directly. "
                    "Please download and provide as bytes."
                )

            case _:
                raise NotImplementedError(
                    f"Unsupported source type: {block.media.source_type}"
                )

    def _process_image_block(self, block: MediaBlock) -> dict:
        """Process image blocks for Bedrock"""
        match block.media.source_type:
            case "base64":
                return {
                    "image": {
                        "format": block.media.extension or "png",
                        "source": {"bytes": base64.b64decode(block.media.source)},
                    }
                }

            case "path":
                with open(block.media.source, "rb") as image_file:
                    image_bytes = image_file.read()
                return {
                    "image": {
                        "format": block.media.extension or "png",
                        "source": {"bytes": image_bytes},
                    }
                }

            case "url":
                raise NotImplementedError(
                    "Bedrock Converse API does not support image URLs directly. "
                    "Please download and provide as bytes."
                )

            case _:
                raise NotImplementedError(
                    f"Unsupported source type: {block.media.source_type}"
                )

```
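
The adapter passes `block.media.extension` straight through as the image `format`. The Converse API accepts `png`, `jpeg`, `gif`, and `webp` for image blocks, so an extension such as `jpg` would likely be rejected. A minimal sketch of a normalizer is shown below; `normalize_image_format` and `BEDROCK_IMAGE_FORMATS` are hypothetical names and not part of this package.

```python
from __future__ import annotations

# Image formats accepted by the Converse API image block
BEDROCK_IMAGE_FORMATS = {"png", "jpeg", "gif", "webp"}


def normalize_image_format(extension: str | None, default: str = "png") -> str:
    """Map a file extension to a Bedrock image format string."""
    if not extension:
        # Mirror the adapter's existing fallback
        return default
    fmt = extension.lower().lstrip(".")
    if fmt == "jpg":
        fmt = "jpeg"
    if fmt not in BEDROCK_IMAGE_FORMATS:
        raise ValueError(f"Unsupported image format: {extension}")
    return fmt


formats = [normalize_image_format(ext) for ext in ("jpg", ".PNG", None, "webp")]
```

With such a helper, the `"format"` entry in `_process_image_block` could call `normalize_image_format(block.media.extension)` instead of using the raw extension.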

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-bedrock"
version = "0.0.5"
description = "AWS Bedrock client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}

requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
    "Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = [
    "datapizza-ai-core>=0.1.0,<0.2.0",
    "boto3>=1.35.0",
    "botocore>=1.35.0",
    "aioboto3>=13.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
    "moto>=5.0.0",  # For mocking AWS services in tests
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_async.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_async.py" 
from datapizza.clients.bedrock import BedrockClient


def test_async_client_initialization():
    """Test that async client can be initialized"""
    client = BedrockClient(
        aws_access_key_id="test_key",
        aws_secret_access_key="test_secret",
        region_name="us-east-1",
    )

    # Initialize async client
    client._set_a_client()

    # Verify async session is created
    assert hasattr(client, "a_session")
    assert hasattr(client, "a_config")
    assert hasattr(client, "a_region_name")
    assert client.a_region_name == "us-east-1"


def test_a_invoke_method_exists():
    """Test that the _a_invoke method exists and is callable"""
    client = BedrockClient(
        aws_access_key_id="test_key",
        aws_secret_access_key="test_secret",
        region_name="us-east-1",
    )

    # Verify the method exists and is callable (it is not invoked here)
    assert hasattr(client, "_a_invoke")
    assert callable(client._a_invoke)


def test_a_stream_invoke_method_exists():
    """Test that the _a_stream_invoke method exists and is callable"""
    client = BedrockClient(
        aws_access_key_id="test_key",
        aws_secret_access_key="test_secret",
        region_name="us-east-1",
    )

    # Verify the method exists and is callable (it is not invoked here)
    assert hasattr(client, "_a_stream_invoke")
    assert callable(client._a_stream_invoke)

```
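
The tests above only confirm that the methods exist and are callable. A stricter check could use the standard `inspect` module to confirm they are actually asynchronous. The sketch below uses a stand-in class, `FakeClient`, and assumes `_a_invoke` is an `async def` coroutine while `_a_stream_invoke` is an async generator; neither assumption is verified against the real `BedrockClient` here.

```python
import inspect


class FakeClient:
    """Stand-in for BedrockClient, used only to illustrate the checks."""

    async def _a_invoke(self, **kwargs):
        return "ok"

    async def _a_stream_invoke(self, **kwargs):
        yield "chunk"


client = FakeClient()

# A plain `async def` is a coroutine function
is_coroutine = inspect.iscoroutinefunction(client._a_invoke)

# An `async def` containing `yield` is an async generator function instead
is_async_gen = inspect.isasyncgenfunction(client._a_stream_invoke)
```

Note that the two predicates are mutually exclusive, so a streaming method has to be checked with `isasyncgenfunction` and not `iscoroutinefunction`.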

## /datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-bedrock/tests/test_bedrock_memory_adapter.py" 
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, TextBlock

from datapizza.clients.bedrock import BedrockClient
from datapizza.clients.bedrock.memory_adapter import BedrockMemoryAdapter


def test_init_bedrock_client():
    """Test that BedrockClient can be initialized"""
    client = BedrockClient()
    assert client is not None
    assert client.model_name == "anthropic.claude-3-5-sonnet-20241022-v2:0"


def test_init_bedrock_client_with_credentials():
    """Test BedrockClient initialization with explicit credentials"""
    client = BedrockClient(
        aws_access_key_id="test_key",
        aws_secret_access_key="test_secret",
        region_name="us-west-2",
    )
    assert client is not None
    assert client.aws_access_key_id == "test_key"
    assert client.region_name == "us-west-2"


def test_bedrock_memory_adapter():
    """Test that the memory adapter converts memory to Bedrock message format"""
    memory_adapter = BedrockMemoryAdapter()
    memory = Memory()
    memory.add_turn(blocks=[TextBlock(content="Hello, world!")], role=ROLE.USER)
    memory.add_turn(
        blocks=[TextBlock(content="Hello! How can I help you?")], role=ROLE.ASSISTANT
    )

    messages = memory_adapter.memory_to_messages(memory)

    assert messages == [
        {
            "role": "user",
            "content": [{"text": "Hello, world!"}],
        },
        {
            "role": "assistant",
            "content": [{"text": "Hello! How can I help you?"}],
        },
    ]


def test_bedrock_memory_adapter_multiple_blocks():
    """Test memory adapter with multiple text blocks in a single turn"""
    memory_adapter = BedrockMemoryAdapter()
    memory = Memory()
    memory.add_turn(
        blocks=[
            TextBlock(content="First message."),
            TextBlock(content="Second message."),
        ],
        role=ROLE.USER,
    )

    messages = memory_adapter.memory_to_messages(memory)

    assert messages == [
        {
            "role": "user",
            "content": [{"text": "First message."}, {"text": "Second message."}],
        },
    ]

```

## /datapizza-ai-clients/datapizza-ai-clients-google/README.md



## /datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/__init__.py" 
from .google_client import GoogleClient

__all__ = ["GoogleClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/google_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/google_client.py" 
import base64
from collections.abc import AsyncIterator, Iterator
from typing import Any, Literal

from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import (
    FunctionCallBlock,
    Media,
    MediaBlock,
    Model,
    StructuredBlock,
    TextBlock,
    ThoughtBlock,
)

from google import genai
from google.genai import types
from google.oauth2 import service_account

from .memory_adapter import GoogleMemoryAdapter


class GoogleClient(Client):
    """A client for interacting with Google's Generative AI APIs.

    This class provides methods for invoking the Google GenAI API to generate responses
    based on given input data. It extends the Client class.
    """

    def __init__(
        self,
        api_key: str | None = None,
        model: str = "gemini-2.0-flash",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
        project_id: str | None = None,
        location: str | None = None,
        credentials_path: str | None = None,
        use_vertexai: bool = False,
        **kwargs,
    ):
        """
        Args:
            api_key: The API key for the Google API.
            model: The model to use for the Google API.
            system_prompt: The system prompt to use for the Google API.
            temperature: The temperature to use for the Google API.
            cache: The cache to use for the Google API.
            project_id: The project ID for the Google API.
            location: The location for the Google API.
            credentials_path: The path to the credentials for the Google API.
            use_vertexai: Whether to use Vertex AI for the Google API.
            **kwargs: Additional keyword arguments forwarded to the underlying
                ``genai.Client`` constructor.
        """
        if temperature is not None and not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )
        self.memory_adapter = GoogleMemoryAdapter()

        try:
            if use_vertexai:
                if not credentials_path:
                    raise ValueError("credentials_path must be provided")
                if not project_id:
                    raise ValueError("project_id must be provided")
                if not location:
                    raise ValueError("location must be provided")

                credentials = service_account.Credentials.from_service_account_file(
                    credentials_path,
                    scopes=["https://www.googleapis.com/auth/cloud-platform"],
                )
                self.client = genai.Client(
                    vertexai=True,
                    project=project_id,
                    location=location,
                    credentials=credentials,
                    **kwargs,
                )
            else:
                if not api_key:
                    raise ValueError("api_key must be provided")

                self.client = genai.Client(api_key=api_key, **kwargs)

        except Exception as e:
            # Chain the original exception so the root cause stays in the traceback
            raise RuntimeError(
                f"Failed to initialize Google GenAI client: {e!s}"
            ) from e

    def _sanitize_schema(self, schema: Any) -> Any:
        """Remove JSON Schema keys unsupported by Gemini's function schema."""
        if isinstance(schema, dict):
            sanitized = {}
            for key, value in schema.items():
                if key in [
                    "additionalProperties",
                    "additional_properties",
                    "minLength",
                    "maxLength",
                    "pattern",
                    "format",
                    "minItems",
                    "maxItems",
                    "uniqueItems",
                ]:
                    continue
                if key == "exclusiveMinimum":
                    # For integers: exclusiveMinimum: 0 → minimum: 1 (next valid int)
                    # For floats: just use the value (slight relaxation, allows boundary)
                    if isinstance(value, int) or (
                        isinstance(value, float) and value.is_integer()
                    ):
                        sanitized["minimum"] = int(value) + 1
                    else:
                        sanitized["minimum"] = value
                    continue
                if key == "exclusiveMaximum":
                    # Same logic for maximum
                    if isinstance(value, int) or (
                        isinstance(value, float) and value.is_integer()
                    ):
                        sanitized["maximum"] = int(value) - 1
                    else:
                        sanitized["maximum"] = value
                    continue

                sanitized[key] = self._sanitize_schema(value)
            return sanitized
        if isinstance(schema, list):
            return [self._sanitize_schema(item) for item in schema]
        return schema

    def _convert_tool(self, tool: Tool) -> dict:
        """Convert tools to Google function format"""
        parameters_schema = self._sanitize_schema(tool.schema["parameters"])
        parameters = {
            "type": parameters_schema.get("type"),
            "properties": parameters_schema.get("properties", {}),
            "required": parameters_schema.get("required", []),
        }

        return {
            "name": tool.schema["name"],
            "description": tool.schema["description"],
            "parameters": parameters,
        }

    def _prepare_tools(self, tools: list[Tool] | None) -> list[types.Tool] | None:
        if not tools:
            return None

        google_tools = []
        function_declarations = []
        has_google_search = False

        for tool in tools:
            # Check if tool has google search capability
            if hasattr(tool, "name") and "google_search" in tool.name.lower():
                has_google_search = True
            elif isinstance(tool, Tool):
                function_declarations.append(self._convert_tool(tool))
            elif isinstance(tool, dict):
                google_tools.append(tool)
            else:
                raise ValueError(f"Unknown tool type: {type(tool)}")

        if function_declarations:
            google_tools.append(types.Tool(function_declarations=function_declarations))

        if has_google_search:
            google_tools.append(types.Tool(google_search=types.GoogleSearch()))

        return google_tools if google_tools else None

    def _token_usage_from_metadata(self, usage_metadata: Any | None) -> TokenUsage:
        if not usage_metadata:
            return TokenUsage()

        return TokenUsage(
            prompt_tokens=getattr(usage_metadata, "prompt_token_count", 0) or 0,
            completion_tokens=(
                getattr(usage_metadata, "candidates_token_count", None)
                or getattr(usage_metadata, "response_token_count", 0)
                or 0
            ),
            cached_tokens=getattr(usage_metadata, "cached_content_token_count", 0) or 0,
            thinking_tokens=getattr(usage_metadata, "thoughts_token_count", 0) or 0,
        )

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> types.ToolConfig:
        adjusted_tool_choice: types.ToolConfig
        if isinstance(tool_choice, list):
            adjusted_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="ANY",  # type: ignore
                    allowed_function_names=tool_choice,
                )
            )
        elif tool_choice == "required":
            adjusted_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(mode="ANY")  # type: ignore
            )
        elif tool_choice == "none":
            adjusted_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(mode="NONE")  # type: ignore
            )
        elif tool_choice == "auto":
            adjusted_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(mode="AUTO")  # type: ignore
            )
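        else:
            # Without this branch an unknown value would hit an UnboundLocalError
            raise ValueError(f"Unsupported tool_choice: {tool_choice!r}")
        return adjusted_tool_choice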

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _invoke method"""
        if tools is None:
            tools = []
        contents = self._memory_to_contents(None, input, memory)

        tool_map = {tool.name: tool for tool in tools if isinstance(tool, Tool)}

        prepared_tools = self._prepare_tools(tools)
        config = types.GenerateContentConfig(
            # Use an explicit None check so temperature=0.0 is not overridden
            temperature=temperature if temperature is not None else self.temperature,
            system_instruction=system_prompt or self.system_prompt,
            max_output_tokens=max_tokens or None,
            tools=prepared_tools,  # type: ignore
            tool_config=self._convert_tool_choice(tool_choice)
            if tools and any(isinstance(tool, Tool) for tool in tools)
            else None,
            **kwargs,
        )

        response = self.client.models.generate_content(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=config,  # type: ignore
        )
        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _a_invoke method"""
        if tools is None:
            tools = []
        contents = self._memory_to_contents(None, input, memory)

        tool_map = {tool.name: tool for tool in tools if isinstance(tool, Tool)}

        prepared_tools = self._prepare_tools(tools)
        config = types.GenerateContentConfig(
            # Use an explicit None check so temperature=0.0 is not overridden
            temperature=temperature if temperature is not None else self.temperature,
            system_instruction=system_prompt or self.system_prompt,
            max_output_tokens=max_tokens or None,
            tools=prepared_tools,  # type: ignore
            tool_config=self._convert_tool_choice(tool_choice)
            if tools and any(isinstance(tool, Tool) for tool in tools)
            else None,
            **kwargs,
        )

        response = await self.client.aio.models.generate_content(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=config,  # type: ignore
        )
        return self._response_to_client_response(response, tool_map)

    def _build_stream_function_call_block(
        self,
        part,
        tool_map: dict[str, Tool],
        index: int,
    ) -> FunctionCallBlock:
        fc = part.function_call
        tool = tool_map.get(fc.name)
        if not tool:
            raise ValueError(f"Tool {fc.name} not found in tool map")

        thought_sig = getattr(part, "thought_signature", None)
        return FunctionCallBlock(
            name=fc.name,
            arguments=dict(fc.args) if fc.args else {},
            id=f"fc_{index}",
            tool=tool,
            thought_signature=thought_sig,
        )

    def _consume_stream_parts(
        self,
        parts,
        *,
        tool_map: dict[str, Tool],
        thought_text: str,
        function_call_blocks: list[FunctionCallBlock],
    ) -> tuple[str, str]:
        text_delta_parts: list[str] = []

        for part in parts:
            if hasattr(part, "function_call") and part.function_call:
                function_call_blocks.append(
                    self._build_stream_function_call_block(
                        part,
                        tool_map,
                        len(function_call_blocks),
                    )
                )
                continue

            part_text = getattr(part, "text", None)
            if not part_text:
                continue

            if hasattr(part, "thought") and part.thought:
                thought_text += part_text
                continue

            text_delta_parts.append(part_text)

        return "".join(text_delta_parts), thought_text

    def _build_stream_final_content(
        self,
        *,
        thought_text: str,
        message_text: str,
        function_call_blocks: list[FunctionCallBlock],
    ) -> list[ThoughtBlock | TextBlock | FunctionCallBlock]:
        content: list[ThoughtBlock | TextBlock | FunctionCallBlock] = []

        if thought_text:
            content.append(ThoughtBlock(content=thought_text))
        if message_text:
            content.append(TextBlock(content=message_text))
        content.extend(function_call_blocks)
        return content

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        """Implementation of the abstract _stream_invoke method"""
        if tools is None:
            tools = []
        contents = self._memory_to_contents(None, input, memory)
        tool_map = {tool.name: tool for tool in tools if isinstance(tool, Tool)}

        prepared_tools = self._prepare_tools(tools)
        config = types.GenerateContentConfig(
            # Use an explicit None check so temperature=0.0 is not overridden
            temperature=temperature if temperature is not None else self.temperature,
            system_instruction=system_prompt or self.system_prompt,
            max_output_tokens=max_tokens or None,
            tools=prepared_tools,  # type: ignore
            tool_config=self._convert_tool_choice(tool_choice)
            if tools and any(isinstance(tool, Tool) for tool in tools)
            else None,
            **kwargs,
        )

        message_text = ""
        stop_reason = ""
        thought_text = ""
        function_call_blocks: list[FunctionCallBlock] = []
        usage = TokenUsage()

        for chunk in self.client.models.generate_content_stream(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=config,
        ):
            usage_metadata = getattr(chunk, "usage_metadata", None)
            usage += self._token_usage_from_metadata(usage_metadata)
            if not chunk.candidates:
                raise ValueError("No candidates in response")

            finish_reason = chunk.candidates[0].finish_reason
            stop_reason = (
                finish_reason.value.lower()
                if finish_reason is not None
                else finish_reason
            )

            if not chunk.candidates[0].content:
                raise ValueError("No content in response")

            if not chunk.candidates[0].content.parts:
                if chunk.text:
                    yield ClientResponse(
                        content=[],
                        delta=chunk.text,
                        stop_reason=stop_reason,
                        usage=usage,
                    )
                continue

            text_delta, thought_text = self._consume_stream_parts(
                chunk.candidates[0].content.parts,
                tool_map=tool_map,
                thought_text=thought_text,
                function_call_blocks=function_call_blocks,
            )
            if text_delta:
                message_text += text_delta
                yield ClientResponse(
                    content=[],
                    delta=text_delta,
                    stop_reason=stop_reason,
                )
        yield ClientResponse(
            content=self._build_stream_final_content(
                thought_text=thought_text,
                message_text=message_text,
                function_call_blocks=function_call_blocks,
            ),
            stop_reason=stop_reason,
            usage=usage,
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None = None,
        memory: Memory | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        """Implementation of the abstract _a_stream_invoke method for Google"""
        if tools is None:
            tools = []
        contents = self._memory_to_contents(None, input, memory)
        tool_map = {tool.name: tool for tool in tools if isinstance(tool, Tool)}

        prepared_tools = self._prepare_tools(tools)
        config = types.GenerateContentConfig(
            # Use an explicit None check so temperature=0.0 is not overridden
            temperature=temperature if temperature is not None else self.temperature,
            system_instruction=system_prompt or self.system_prompt,
            max_output_tokens=max_tokens or None,
            tools=prepared_tools,  # type: ignore
            tool_config=self._convert_tool_choice(tool_choice)
            if tools and any(isinstance(tool, Tool) for tool in tools)
            else None,
            **kwargs,
        )

        usage = TokenUsage()
        message_text = ""
        stop_reason = ""
        thought_text = ""
        function_call_blocks: list[FunctionCallBlock] = []
        async for chunk in await self.client.aio.models.generate_content_stream(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=config,
        ):  # type: ignore
            usage_metadata = getattr(chunk, "usage_metadata", None)
            usage += self._token_usage_from_metadata(usage_metadata)

            if not chunk.candidates:
                raise ValueError("No candidates in response")

            finish_reason = chunk.candidates[0].finish_reason
            stop_reason = (
                finish_reason.value.lower()
                if finish_reason is not None
                else finish_reason
            )

            # Handle the case where the response has no parts
            candidate_content = chunk.candidates[0].content
            if not candidate_content or not candidate_content.parts:
                if chunk.text:
                    yield ClientResponse(
                        content=[],
                        delta=chunk.text,
                        stop_reason=stop_reason,
                    )
                continue

            text_delta, thought_text = self._consume_stream_parts(
                candidate_content.parts,
                tool_map=tool_map,
                thought_text=thought_text,
                function_call_blocks=function_call_blocks,
            )
            if text_delta:
                message_text += text_delta
                yield ClientResponse(
                    content=[],
                    delta=text_delta,
                    stop_reason=stop_reason,
                )
        yield ClientResponse(
            content=self._build_stream_final_content(
                thought_text=thought_text,
                message_text=message_text,
                function_call_blocks=function_call_blocks,
            ),
            stop_reason=stop_reason,
            usage=usage,
        )

    def _structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _structured_response method"""
        contents = self._memory_to_contents(self.system_prompt, input, memory)

        prepared_tools = self._prepare_tools(tools)
        response = self.client.models.generate_content(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=types.GenerateContentConfig(
                system_instruction=system_prompt,
                temperature=temperature,
                max_output_tokens=max_tokens,
                response_mime_type="application/json",
                tools=prepared_tools,  # type: ignore
                tool_config=self._convert_tool_choice(tool_choice)
                if tools and any(isinstance(tool, Tool) for tool in tools)
                else None,
                response_schema=(
                    output_cls.model_json_schema()
                    if hasattr(output_cls, "model_json_schema")
                    else output_cls
                ),
            ),
        )
        if not response or not response.candidates:
            raise ValueError("No response from Google GenAI")

        structured_data = output_cls.model_validate_json(str(response.text))
        usage_metadata = getattr(response, "usage_metadata", None)
        token_usage = self._token_usage_from_metadata(usage_metadata)
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=response.candidates[0].finish_reason.value.lower()
            if response.candidates[0].finish_reason
            else None,
            usage=token_usage,
        )

    async def _a_structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        """Implementation of the abstract _a_structured_response method"""
        contents = self._memory_to_contents(self.system_prompt, input, memory)
        prepared_tools = self._prepare_tools(tools)
        response = await self.client.aio.models.generate_content(
            model=self.model_name,
            contents=contents,  # type: ignore
            config=types.GenerateContentConfig(
                system_instruction=system_prompt,
                temperature=temperature,
                max_output_tokens=max_tokens,
                response_mime_type="application/json",
                tools=prepared_tools,  # type: ignore
                tool_config=self._convert_tool_choice(tool_choice)
                if tools and any(isinstance(tool, Tool) for tool in tools)
                else None,
                response_schema=(
                    output_cls.model_json_schema()
                    if hasattr(output_cls, "model_json_schema")
                    else output_cls
                ),
            ),
        )

        if not response or not response.candidates:
            raise ValueError("No response from Google GenAI")

        structured_data = output_cls.model_validate_json(str(response.text))
        usage_metadata = getattr(response, "usage_metadata", None)
        token_usage = self._token_usage_from_metadata(usage_metadata)
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=response.candidates[0].finish_reason.value.lower()
            if response.candidates[0].finish_reason
            else None,
            usage=token_usage,
        )

    def _embed(
        self,
        text: str | list[str],
        model_name: str | None,
        task_type: str = "RETRIEVAL_DOCUMENT",
        output_dimensionality: int = 768,
        title: str | None = None,
        **kwargs,
    ) -> list[float] | list[list[float] | None]:
        """Embed a text using the model"""
        response = self.client.models.embed_content(
            model=model_name or self.model_name,
            contents=text,  # type: ignore
            config=types.EmbedContentConfig(
                task_type=task_type,
                output_dimensionality=output_dimensionality,
                title=title,
                **kwargs,
            ),
        )
        # Extract the embedding values from the response
        if not response.embeddings:
            return []

        embeddings = [embedding.values for embedding in response.embeddings]

        if isinstance(text, str) and embeddings[0]:
            return embeddings[0]

        return embeddings

    async def _a_embed(
        self,
        text: str | list[str],
        model_name: str | None,
        task_type: str = "RETRIEVAL_DOCUMENT",
        output_dimensionality: int = 768,
        title: str | None = None,
        **kwargs,
    ) -> list[float] | list[list[float] | None]:
        """Embed a text using the model"""
        response = await self.client.aio.models.embed_content(
            model=model_name or self.model_name,
            contents=text,  # type: ignore
            config=types.EmbedContentConfig(
                task_type=task_type,
                output_dimensionality=output_dimensionality,
                title=title,
                **kwargs,
            ),
        )
        # Extract the embedding values from the response
        if not response.embeddings:
            return []
        embeddings = [embedding.values for embedding in response.embeddings]

        if isinstance(text, str) and embeddings[0]:
            return embeddings[0]

        return embeddings

    def _response_to_client_response(
        self, response, tool_map: dict[str, Tool] | None = None
    ) -> ClientResponse:
        blocks = []

        # Check if response has candidates with parts
        if (
            hasattr(response, "candidates")
            and response.candidates
            and response.candidates[0].content
            and response.candidates[0].content.parts
        ):
            for part in response.candidates[0].content.parts:
                # Handle function calls - extract thought_signature from part
                if hasattr(part, "function_call") and part.function_call:
                    fc = part.function_call
                    if not tool_map:
                        raise ValueError("Tool map is required")

                    tool = tool_map.get(fc.name, None)
                    if not tool:
                        raise ValueError(f"Tool {fc.name} not found in tool map")

                    # Extract thought_signature if present (required for Gemini 2.0+)
                    thought_sig = None
                    if hasattr(part, "thought_signature"):
                        thought_sig = part.thought_signature

                    blocks.append(
                        FunctionCallBlock(
                            name=fc.name,
                            arguments=dict(fc.args) if fc.args else {},
                            id=f"fc_{id(fc)}",
                            tool=tool,
                            thought_signature=thought_sig,
                        )
                    )
                # Handle inline_data (images from generation or code execution)
                elif hasattr(part, "inline_data") and part.inline_data is not None:
                    media = Media(
                        media_type="image",
                        source_type="base64",
                        source=base64.b64encode(part.inline_data.data).decode("utf-8"),
                        extension=(part.inline_data.mime_type.split("/")[-1])
                        if part.inline_data.mime_type
                        else "png",
                    )
                    blocks.append(MediaBlock(media=media))
                elif hasattr(part, "thought") and part.thought and part.text:
                    blocks.append(ThoughtBlock(content=part.text))
                elif hasattr(part, "text") and part.text:
                    blocks.append(TextBlock(content=part.text))

        usage_metadata = getattr(response, "usage_metadata", None)
        token_usage = self._token_usage_from_metadata(usage_metadata)
        return ClientResponse(
            content=blocks,
            stop_reason=(response.candidates[0].finish_reason.value.lower())
            if hasattr(response, "candidates")
            and response.candidates
            and response.candidates[0].finish_reason
            else None,
            usage=token_usage,
        )

```
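
The streaming methods above yield one `ClientResponse` per text delta and then a final response that carries the accumulated content. The following is a minimal, self-contained sketch of that pattern only. `StreamEvent` and `stream_text` are hypothetical stand-ins and not part of datapizza-ai; the real client also tracks thoughts, function calls, and token usage.

```python
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamEvent:
    """Hypothetical stand-in for ClientResponse, reduced to three fields."""

    delta: str = ""
    text: str = ""
    stop_reason: Optional[str] = None


def stream_text(chunks: list[tuple[str, Optional[str]]]) -> Iterator[StreamEvent]:
    """Yield one event per non-empty text delta, then one aggregated event."""
    message_text = ""
    stop_reason = None
    for text_delta, finish_reason in chunks:
        # Normalize the provider's finish reason (e.g. "STOP") to lowercase.
        stop_reason = finish_reason.lower() if finish_reason is not None else None
        if text_delta:
            message_text += text_delta
            yield StreamEvent(delta=text_delta, stop_reason=stop_reason)
    # The final event carries the full text instead of a delta.
    yield StreamEvent(text=message_text, stop_reason=stop_reason)


events = list(stream_text([("Hel", None), ("lo", "STOP")]))
```

Consumers can render `delta` as it arrives and read the complete message from the last event.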

## /datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/datapizza/clients/google/memory_adapter.py" 
import base64

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)

from google.genai import types


class GoogleMemoryAdapter(MemoryAdapter):
    def _turn_to_message(self, turn: Turn) -> dict:
        content = []
        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {"text": block.content}
                case FunctionCallBlock():
                    block_dict = {
                        "function_call": {"name": block.name, "args": block.arguments}
                    }
                    # Include thought_signature if present (required for Gemini 2.0+)
                    if block.thought_signature is not None:
                        block_dict["thought_signature"] = block.thought_signature
                case FunctionCallResultBlock():
                    block_dict = types.Part.from_function_response(
                        name=block.tool.name,
                        response={"result": block.result},
                    )
                case StructuredBlock():
                    block_dict = {"text": str(block.content)}
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_pdf_block(block)

                        case "audio":
                            block_dict = self._process_audio_block(block)

                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

            content.append(block_dict)

        return {
            "role": turn.role.google_role,
            "parts": (content),
        }

    def _process_audio_block(self, block: MediaBlock) -> types.Part:
        match block.media.source_type:
            case "raw":
                return types.Part.from_bytes(
                    data=block.media.source,
                    mime_type="audio/mp3",
                )

            case "path":
                with open(block.media.source, "rb") as f:
                    audio_bytes = f.read()

                return types.Part.from_bytes(
                    data=audio_bytes,
                    mime_type="audio/mp3",
                )

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} for audio, only raw, path are supported"
                )

    def _process_pdf_block(self, block: MediaBlock) -> types.Part | dict:
        match block.media.source_type:
            case "raw":
                return types.Part.from_bytes(
                    data=block.media.source,
                    mime_type="application/pdf",
                )
            case "base64":
                return {
                    "inline_data": {
                        "mime_type": "application/pdf",
                        "data": block.media.source,
                    }
                }
            case "path":
                with open(block.media.source, "rb") as f:
                    pdf_bytes = f.read()

                return {
                    "inline_data": {
                        "mime_type": "application/pdf",
                        "data": pdf_bytes,
                    }
                }
            case "url":
                return types.Part.from_uri(
                    file_uri=block.media.source,
                    mime_type="application/pdf",
                )

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} for pdf, only raw, base64, path, url are supported"
                )

    def _process_image_block(self, block: MediaBlock) -> types.Part | dict:
        match block.media.source_type:
            case "url":
                return types.Part.from_uri(
                    file_uri=block.media.source,
                    mime_type=f"image/{block.media.extension}",
                )  # type: ignore
            case "base64":
                return {
                    "inline_data": {
                        "mime_type": f"image/{block.media.extension}",
                        "data": block.media.source,
                    }
                }
            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                return {
                    "inline_data": {
                        "mime_type": f"image/{block.media.extension}",
                        "data": base64_image,
                    }
                }
            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} for image, only url, base64, path are supported"
                )

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        return {"role": role.google_role, "parts": [{"text": text}]}

```
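
To illustrate the message shape that `_turn_to_message` and `_process_image_block` produce for a user turn with text and a local image, here is a small sketch that uses plain dictionaries only. The helper names `image_part_from_bytes` and `user_message` are hypothetical, and the literal `"user"` role is an assumption about what `ROLE.USER.google_role` resolves to.

```python
import base64


def image_part_from_bytes(data: bytes, extension: str = "png") -> dict:
    """Build an inline_data part from raw image bytes (hypothetical helper)."""
    return {
        "inline_data": {
            "mime_type": f"image/{extension}",
            # Mirrors the "path" branch above: bytes are base64-encoded to text.
            "data": base64.b64encode(data).decode("utf-8"),
        }
    }


def user_message(text: str, image: bytes) -> dict:
    """Combine a text part and an image part into one role/parts message."""
    return {
        "role": "user",  # assumed value of ROLE.USER.google_role
        "parts": [{"text": text}, image_part_from_bytes(image)],
    }


message = user_message("Describe this image.", b"\x89PNG")
```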

## /datapizza-ai-clients/datapizza-ai-clients-google/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-google/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-google"
version = "0.0.11"
description = "Google (Gemini) client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}

requires-python = ">=3.10.0,<4"

classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Topic :: Scientific/Engineering :: Artificial Intelligence",
    "Topic :: Software Development :: Libraries :: Application Frameworks",
]
dependencies = [
    "datapizza-ai-core>=0.1.0,<0.2.0",
    "google-genai>=1.3.0,<2.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-google/tests/test_google_streaming.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/tests/test_google_streaming.py" 
import asyncio
from types import SimpleNamespace

from datapizza.tools import tool
from datapizza.type import FunctionCallBlock

from datapizza.clients.google.google_client import GoogleClient
from datapizza.clients.google.memory_adapter import GoogleMemoryAdapter


def _part(*, text=None, thought=False, function_call=None, thought_signature=None):
    return SimpleNamespace(
        text=text,
        thought=thought,
        function_call=function_call,
        thought_signature=thought_signature,
    )


def _chunk(parts, *, finish_reason="STOP", text=""):
    return SimpleNamespace(
        text=text,
        usage_metadata=None,
        candidates=[
            SimpleNamespace(
                finish_reason=SimpleNamespace(value=finish_reason),
                content=SimpleNamespace(parts=parts),
            )
        ],
    )


class FakeModels:
    def __init__(self, chunks):
        self._chunks = chunks

    def generate_content_stream(self, **kwargs):
        return iter(self._chunks)


class FakeAsyncModels:
    def __init__(self, chunks):
        self._chunks = chunks

    async def generate_content_stream(self, **kwargs):
        async def iterator():
            for chunk in self._chunks:
                yield chunk

        return iterator()


class FakeGoogleGenAIClient:
    def __init__(self, chunks):
        self.models = FakeModels(chunks)
        self.aio = SimpleNamespace(models=FakeAsyncModels(chunks))


def _make_client(chunks):
    client = object.__new__(GoogleClient)
    client.model_name = "gemini-test"
    client.system_prompt = ""
    client.temperature = None
    client.memory_adapter = GoogleMemoryAdapter()
    client.client = FakeGoogleGenAIClient(chunks)
    return client


@tool
def get_weather(location: str, when: str) -> str:
    return "25 C"


def test_stream_invoke_preserves_function_calls():
    client = _make_client(
        [
            _chunk(
                [
                    _part(
                        function_call=SimpleNamespace(
                            name="get_weather",
                            args={"location": "Milan", "when": "tomorrow"},
                        )
                    )
                ]
            )
        ]
    )

    responses = list(
        client._stream_invoke(
            input="weather?",
            tools=[get_weather],
            memory=None,
            tool_choice="auto",
            temperature=None,
            max_tokens=256,
            system_prompt=None,
        )
    )

    final_response = responses[-1]
    assert len(final_response.function_calls) == 1
    assert isinstance(final_response.function_calls[0], FunctionCallBlock)
    assert final_response.function_calls[0].name == "get_weather"
    assert final_response.function_calls[0].arguments == {
        "location": "Milan",
        "when": "tomorrow",
    }


def test_stream_invoke_uses_part_text_without_duplication():
    client = _make_client(
        [
            _chunk([_part(text="Hel")], text="Hel"),
            _chunk([_part(text="lo")], text="lo"),
        ]
    )

    responses = list(
        client._stream_invoke(
            input="hello",
            tools=[],
            memory=None,
            tool_choice="auto",
            temperature=None,
            max_tokens=256,
            system_prompt=None,
        )
    )

    assert [response.delta for response in responses[:-1]] == ["Hel", "lo"]
    assert responses[-1].text == "Hello"


def test_a_stream_invoke_preserves_function_calls():
    client = _make_client(
        [
            _chunk(
                [
                    _part(
                        function_call=SimpleNamespace(
                            name="get_weather",
                            args={"location": "Milan", "when": "tomorrow"},
                        )
                    )
                ]
            )
        ]
    )

    async def collect():
        items = []
        async for response in client._a_stream_invoke(
            input="weather?",
            tools=[get_weather],
            memory=None,
            tool_choice="auto",
            temperature=None,
            max_tokens=256,
            system_prompt=None,
        ):
            items.append(response)
        return items

    responses = asyncio.run(collect())
    final_response = responses[-1]
    assert len(final_response.function_calls) == 1
    assert final_response.function_calls[0].name == "get_weather"

```
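
`FakeAsyncModels.generate_content_stream` above is a coroutine that returns an async iterator, which is why the client consumes it with `async for ... in await ...`. A stripped-down sketch of that calling convention, with the hypothetical names `FakeStream` and `collect`:

```python
import asyncio


class FakeStream:
    """Hypothetical fake: a coroutine method that returns an async iterator."""

    async def generate(self):
        async def iterator():
            for item in ("a", "b"):
                yield item

        # Return the async generator object without iterating it here.
        return iterator()


async def collect() -> list[str]:
    # First await the coroutine to get the iterator, then iterate it.
    return [item async for item in await FakeStream().generate()]


result = asyncio.run(collect())
```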

## /datapizza-ai-clients/datapizza-ai-clients-google/tests/test_google_token_usage.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/tests/test_google_token_usage.py" 
from types import SimpleNamespace

from datapizza.clients.google.google_client import GoogleClient


def test_google_token_usage_maps_thinking_tokens():
    client = object.__new__(GoogleClient)
    usage_metadata = SimpleNamespace(
        prompt_token_count=10,
        candidates_token_count=20,
        cached_content_token_count=30,
        thoughts_token_count=40,
    )

    usage = client._token_usage_from_metadata(usage_metadata)

    assert usage.prompt_tokens == 10
    assert usage.completion_tokens == 20
    assert usage.cached_tokens == 30
    assert usage.thinking_tokens == 40


def test_google_token_usage_uses_response_token_count_fallback():
    client = object.__new__(GoogleClient)
    usage_metadata = SimpleNamespace(
        prompt_token_count=10,
        response_token_count=25,
        cached_content_token_count=30,
    )

    usage = client._token_usage_from_metadata(usage_metadata)

    assert usage.completion_tokens == 25
    assert usage.thinking_tokens == 0

```
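
The two tests above pin down a fallback: completion tokens come from `candidates_token_count` when present and from `response_token_count` otherwise. The real `_token_usage_from_metadata` is defined outside this section, so the following is only one possible way to get that behavior; `completion_tokens` is a hypothetical helper.

```python
from types import SimpleNamespace


def completion_tokens(usage_metadata) -> int:
    """Return the first non-empty completion count, or 0 if none is set."""
    if usage_metadata is None:
        return 0
    for field in ("candidates_token_count", "response_token_count"):
        # getattr with a default tolerates metadata objects missing the field.
        value = getattr(usage_metadata, field, None)
        if value:
            return value
    return 0


primary = completion_tokens(SimpleNamespace(candidates_token_count=20))
fallback = completion_tokens(SimpleNamespace(response_token_count=25))
missing = completion_tokens(None)
```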

## /datapizza-ai-clients/datapizza-ai-clients-google/tests/test_memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-google/tests/test_memory_adapter.py" 
import pytest
from datapizza.memory.memory import Memory
from datapizza.type import ROLE, StructuredBlock, TextBlock

from datapizza.clients.google.memory_adapter import GoogleMemoryAdapter


def test_google_memory_to_messages_structured_block():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(StructuredBlock(content={"key": "value"}))
    messages = GoogleMemoryAdapter().memory_to_messages(memory)
    # Google adapter may serialize as string or dict in "parts"
    assert "key" in str(messages[0]["parts"][0])


def test_google_memory_to_messages_with_system_prompt():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Hello!"))
    system_prompt = "You are a helpful assistant."
    messages = GoogleMemoryAdapter().memory_to_messages(
        memory, system_prompt=system_prompt
    )
    assert messages[0]["role"] == "model"
    assert system_prompt in str(messages[0]["parts"])
    assert messages[1]["role"] == "user"


def test_google_memory_to_messages_with_input_str():
    memory = Memory()
    input_str = "What is the weather?"
    messages = GoogleMemoryAdapter().memory_to_messages(memory, input=input_str)
    assert messages[-1]["role"] == "user"
    assert input_str in str(messages[-1]["parts"])


def test_google_memory_to_messages_with_input_block():
    memory = Memory()
    input_block = TextBlock(content="This is a block input.")
    messages = GoogleMemoryAdapter().memory_to_messages(memory, input=input_block)
    assert messages[-1]["role"] == "user"
    assert "block input" in str(messages[-1]["parts"])


def test_google_memory_to_messages_unsupported_input():
    memory = Memory()

    class Dummy:
        pass

    with pytest.raises(ValueError):
        GoogleMemoryAdapter().memory_to_messages(memory, input=Dummy())

```

## /datapizza-ai-clients/datapizza-ai-clients-mistral/README.md



## /datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/__init__.py" 
from datapizza.clients.mistral.mistral_client import MistralClient

__all__ = ["MistralClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/memory_adapter.py" 
import base64
import json

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)


class MistralMemoryAdapter(MemoryAdapter):
    def _turn_to_message(self, turn: Turn) -> dict:
        content = []
        tool_calls = []
        tool_call_id = None

        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {"type": "text", "text": block.content}
                case FunctionCallBlock():
                    tool_calls.append(
                        {
                            "id": block.id,
                            "function": {
                                "name": block.name,
                                "arguments": json.dumps(block.arguments),
                            },
                            "type": "function",
                        }
                    )
                case FunctionCallResultBlock():
                    tool_call_id = block.id
                    block_dict = {"type": "text", "text": block.result}
                case StructuredBlock():
                    block_dict = {"type": "text", "text": str(block.content)}
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        # case "pdf":
                        #    block_dict = self._process_pdf_block(block)

                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}, only images are supported"
                            )

            if block_dict:
                content.append(block_dict)

        messages: dict = {
            "role": turn.role.value,
        }

        if content:
            messages["content"] = content

        if tool_calls:
            messages["tool_calls"] = tool_calls

        if tool_call_id:
            messages["tool_call_id"] = tool_call_id

        return messages

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        return {"role": role.value, "content": text}

    def _process_image_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "image_url",
                    "image_url": {"url": block.media.source},
                }
            case "base64":
                return {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/{block.media.extension};base64,{block.media.source}"
                    },
                }
            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                return {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/{block.media.extension};base64,{base64_image}"
                    },
                }
            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type}, only url, base64, path are supported"
                )

```
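
As a rough illustration of what `MistralMemoryAdapter._turn_to_message` emits for a function call and its result, the sketch below builds the two dictionaries by hand. The function names are hypothetical, and the `"assistant"` and `"tool"` role strings are assumptions about the corresponding `ROLE` values.

```python
import json


def assistant_tool_call_message(call_id: str, name: str, arguments: dict) -> dict:
    """Message for a turn that holds a single function call (hypothetical helper)."""
    return {
        "role": "assistant",  # assumed ROLE value
        "tool_calls": [
            {
                "id": call_id,
                # Arguments are serialized to a JSON string, as in the adapter.
                "function": {"name": name, "arguments": json.dumps(arguments)},
                "type": "function",
            }
        ],
    }


def tool_result_message(call_id: str, result: str) -> dict:
    """Message for a turn that holds the matching function result."""
    return {
        "role": "tool",  # assumed ROLE value
        "content": [{"type": "text", "text": result}],
        "tool_call_id": call_id,
    }


call = assistant_tool_call_message("call_1", "get_weather", {"location": "Milan"})
result = tool_result_message("call_1", "25 C")
```

The shared `id` / `tool_call_id` is what lets the API pair a result with the call that produced it.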

## /datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/mistral_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/datapizza/clients/mistral/mistral_client.py" 
import json
import logging
import os
from collections.abc import AsyncIterator, Iterator
from typing import Any, Literal

import requests
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools import Tool
from datapizza.type import (
    FunctionCallBlock,
    Media,
    MediaBlock,
    Model,
    StructuredBlock,
    TextBlock,
)
from mistralai import Mistral
from mistralai.models.ocrresponse import OCRResponse

from .memory_adapter import MistralMemoryAdapter

log = logging.getLogger(__name__)


class MistralClient(Client):
    """A client for interacting with the Mistral API.

    This class provides methods for invoking the Mistral API to generate responses
    based on given input data. It extends the Client class.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "mistral-large-latest",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
    ):
        """
        Args:
            api_key: The API key for the Mistral API.
            model: The model to use for the Mistral API.
            system_prompt: The system prompt to use for the Mistral API.
            temperature: The temperature to use for the Mistral API.
            cache: The cache to use for the Mistral API.
        """
        if temperature and not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )

        self.api_key = api_key
        self.memory_adapter = MistralMemoryAdapter()
        self._set_client()

    def _set_client(self):
        self.client = Mistral(api_key=self.api_key)

    def _token_usage_from_metadata(self, usage_metadata: Any | None) -> TokenUsage:
        if not usage_metadata:
            return TokenUsage()

        return TokenUsage(
            prompt_tokens=getattr(usage_metadata, "prompt_tokens", 0) or 0,
            completion_tokens=getattr(usage_metadata, "completion_tokens", 0) or 0,
            cached_tokens=getattr(usage_metadata, "cached_tokens", 0) or 0,
        )

    def _response_to_client_response(
        self, response, tool_map: dict[str, Tool] | None = None
    ) -> ClientResponse:
        blocks = []
        for choice in response.choices:
            if choice.message.content:
                blocks.append(TextBlock(content=choice.message.content))

            if choice.message.tool_calls:
                for tool_call in choice.message.tool_calls:
                    tool = tool_map.get(tool_call.function.name) if tool_map else None

                    if tool is None:
                        raise ValueError(f"Tool {tool_call.function.name} not found")

                    blocks.append(
                        FunctionCallBlock(
                            id=tool_call.id,
                            name=tool_call.function.name,
                            arguments=json.loads(tool_call.function.arguments),
                            tool=tool,
                        )
                    )

            # Handle media content if present
            if hasattr(choice.message, "media") and choice.message.media:
                for media_item in choice.message.media:
                    media = Media(
                        media_type=media_item.type,
                        source_type="url" if media_item.source_url else "base64",
                        source=media_item.source_url or media_item.data,
                        detail=getattr(media_item, "detail", "high"),
                    )
                    blocks.append(MediaBlock(media=media))

        log.debug(f"{self.__class__.__name__} response = {response}")
        usage_metadata = getattr(response, "usage", None)
        token_usage = self._token_usage_from_metadata(usage_metadata)
        return ClientResponse(
            content=blocks,
            stop_reason=response.choices[0].finish_reason,
            usage=token_usage,
        )

    def _convert_tools(self, tools: Tool) -> dict:
        """Convert tools to Mistral function format"""
        return {"type": "function", "function": tools.schema}

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> dict | Literal["auto", "required", "none"]:
        if isinstance(tool_choice, list) and len(tool_choice) > 1:
            raise NotImplementedError(
                "Forcing more than one function name is not supported by Mistral"
            )
        elif isinstance(tool_choice, list):
            return {
                "type": "function",
                "function": {"name": tool_choice[0]},
            }
        else:
            return tool_choice

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        log.debug(f"{self.__class__.__name__} input = {input}")
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "stream": False,
            "max_tokens": max_tokens,
            **kwargs,
        }

        # Compare against None so an explicit temperature of 0.0 is still sent
        if temperature is not None:
            request_params["temperature"] = temperature

        if tools:
            request_params["tools"] = [self._convert_tools(tool) for tool in tools]
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = self.client.chat.complete(**request_params)
        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        log.debug(f"{self.__class__.__name__} input = {input}")
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        request_params = {
            "model": self.model_name,
            "messages": messages,
            "stream": False,
            "max_tokens": max_tokens,
            **kwargs,
        }

        # Compare against None so an explicit temperature of 0.0 is still sent
        if temperature is not None:
            request_params["temperature"] = temperature

        if tools:
            request_params["tools"] = [self._convert_tools(tool) for tool in tools]
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = await self.client.chat.complete_async(**request_params)
        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)
        request_params = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_tokens,
            **kwargs,
        }

        # Compare against None so an explicit temperature of 0.0 is still sent
        if temperature is not None:
            request_params["temperature"] = temperature

        if tools:
            request_params["tools"] = [self._convert_tools(tool) for tool in tools]
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = self.client.chat.stream(**request_params)
        text = ""
        usage = TokenUsage()
        stop_reason = None
        for chunk in response:
            token_usage = self._token_usage_from_metadata(
                getattr(chunk.data, "usage", None)
            )
            usage += token_usage
            stop_reason = chunk.data.choices[0].finish_reason
            delta_content = chunk.data.choices[0].delta.content
            delta = str(delta_content or "")
            text += delta
            yield ClientResponse(
                content=[],
                delta=delta,
                stop_reason=stop_reason,
            )

        yield ClientResponse(
            content=[TextBlock(content=text)],
            stop_reason=stop_reason,
            usage=usage,
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None = None,
        memory: Memory | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)
        request_params = {
            "model": self.model_name,
            "messages": messages,
            "max_tokens": max_tokens or 1024,
            **kwargs,
        }

        # Compare against None so an explicit temperature of 0.0 is still sent
        if temperature is not None:
            request_params["temperature"] = temperature

        if tools:
            request_params["tools"] = [self._convert_tools(tool) for tool in tools]
            request_params["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = await self.client.chat.stream_async(**request_params)
        text = ""
        usage = TokenUsage()
        stop_reason = None
        async for chunk in response:
            token_usage = self._token_usage_from_metadata(
                getattr(chunk.data, "usage", None)
            )
            usage += token_usage
            stop_reason = chunk.data.choices[0].finish_reason
            delta_content = chunk.data.choices[0].delta.content
            delta = str(delta_content or "")
            text += delta
            yield ClientResponse(
                content=[],
                delta=delta,
                stop_reason=stop_reason,
            )

        yield ClientResponse(
            content=[TextBlock(content=text)],
            stop_reason=stop_reason,
            usage=usage,
        )

    def _structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        # Build the message list; the output schema is enforced through
        # response_format in the parse call below
        messages = self._memory_to_contents(system_prompt, input, memory)

        if not tools:
            tools = []

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = self.client.chat.parse(
            model=self.model_name,
            messages=messages,
            response_format=output_cls,
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs,
        )

        if not response.choices:
            raise ValueError("No response from Mistral")

        log.debug(f"{self.__class__.__name__} structured response: {response}")
        stop_reason = response.choices[0].finish_reason if response.choices else None
        if hasattr(output_cls, "model_validate_json"):
            structured_data = output_cls.model_validate_json(
                str(response.choices[0].message.content)  # type: ignore
            )
        else:
            structured_data = json.loads(str(response.choices[0].message.content))  # type: ignore
        usage_metadata = getattr(response, "usage", None)
        token_usage = self._token_usage_from_metadata(usage_metadata)
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=stop_reason,
            usage=token_usage,
        )

    async def _a_structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        # Build the message list; the output schema is enforced through
        # response_format in the parse call below
        messages = self._memory_to_contents(system_prompt, input, memory)

        if not tools:
            tools = []

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = await self.client.chat.parse_async(
            model=self.model_name,
            messages=messages,
            response_format=output_cls,
            temperature=temperature,
            max_tokens=max_tokens,
            **kwargs,
        )

        if not response.choices:
            raise ValueError("No response from Mistral")

        log.debug(f"{self.__class__.__name__} structured response: {response}")
        stop_reason = response.choices[0].finish_reason if response.choices else None
        if hasattr(output_cls, "model_validate_json"):
            structured_data = output_cls.model_validate_json(
                str(response.choices[0].message.content)  # type: ignore
            )
        else:
            structured_data = json.loads(str(response.choices[0].message.content))  # type: ignore
        usage_metadata = getattr(response, "usage", None)
        token_usage = self._token_usage_from_metadata(usage_metadata)
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=stop_reason,
            usage=token_usage,
        )

    def _embed(
        self, text: str | list[str], model_name: str | None, **kwargs
    ) -> list[float] | list[list[float]]:
        """Embed a text using the model"""
        response = self.client.embeddings.create(
            inputs=text, model=model_name or self.model_name, **kwargs
        )

        embeddings = [
            item.embedding for item in response.data if item.embedding is not None
        ]

        if not embeddings:
            return []

        if isinstance(text, str) and embeddings[0]:
            return embeddings[0]

        return embeddings

    async def _a_embed(
        self, text: str | list[str], model_name: str | None, **kwargs
    ) -> list[float] | list[list[float]]:
        """Embed a text using the model"""
        response = await self.client.embeddings.create_async(
            inputs=text, model=model_name or self.model_name, **kwargs
        )

        embeddings = [
            item.embedding for item in response.data if item.embedding is not None
        ]

        if not embeddings:
            return []

        if isinstance(text, str) and embeddings[0]:
            return embeddings[0]

        return embeddings

    def parse_document(
        self,
        document_path: str,
        autodelete: bool = True,
        include_image_base64: bool = True,
    ) -> OCRResponse:
        filename = os.path.basename(document_path)
        with open(document_path, "rb") as f:
            uploaded_pdf = self.client.files.upload(
                file={"file_name": filename, "content": f}, purpose="ocr"
            )

        signed_url = self.client.files.get_signed_url(file_id=uploaded_pdf.id)

        response = self.client.ocr.process(
            model="mistral-ocr-latest",
            document={
                "type": "document_url",
                "document_url": signed_url.url,
            },
            include_image_base64=include_image_base64,
        )

        if autodelete:
            url = f"https://api.mistral.ai/v1/files/{uploaded_pdf.id}"
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            }

            requests.delete(url, headers=headers, timeout=30)

        return response

```

## /datapizza-ai-clients/datapizza-ai-clients-mistral/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-mistral/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-mistral"
version = "0.0.6"
description = "Mistral AI client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
authors = [
    {name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.1.0,<0.2.0",
    "mistralai>=1.2.0,<2.0.0",
    "requests>=2.25.0,<3.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-mistral/tests/test_mistral_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-mistral/tests/test_mistral_client.py" 
from datapizza.clients.mistral import MistralClient


def test_init_mistral_client():
    client = MistralClient(api_key="test")
    assert client is not None

```

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/README.md

# DataPizza AI - OpenAI-Like Client

A versatile client for DataPizza AI that supports OpenAI-compatible APIs, including local models through Ollama, Together AI, and other OpenAI-compatible services.

## Installation

```bash
pip install datapizza-ai-clients-openai-like
```

## Quick Start

### With Ollama (Local Models)

```python
from datapizza.clients.openai_like import OpenAILikeClient

# Create client for Ollama
client = OpenAILikeClient(
    api_key="",  # Ollama doesn't require an API key
    model="gemma2:2b",
    system_prompt="You are a helpful assistant.",
    base_url="http://localhost:11434/v1",
)

response = client.invoke("What is the capital of France?")
print(response.content)
```

### With Together AI

```python
import os
from datapizza.clients.openai_like import OpenAILikeClient

client = OpenAILikeClient(
    api_key=os.getenv("TOGETHER_API_KEY"),
    model="meta-llama/Llama-2-7b-chat-hf",
    system_prompt="You are a helpful assistant.",
    base_url="https://api.together.xyz/v1",
)

response = client.invoke("Explain quantum computing")
print(response.content)
```

### With OpenRouter

```python
import os
from datapizza.clients.openai_like import OpenAILikeClient

client = OpenAILikeClient(
    api_key=os.getenv("OPENROUTER_API_KEY"),
    model="google/gemma-7b-it",
    system_prompt="You are a helpful assistant.",
    base_url="https://openrouter.ai/api/v1",
)

response = client.invoke("What is OpenRouter?")
print(response.content)
```

### With Other OpenAI-Compatible Services

```python
import os
from datapizza.clients.openai_like import OpenAILikeClient

client = OpenAILikeClient(
    api_key=os.getenv("YOUR_API_KEY"),
    model="your-model-name",
    system_prompt="You are a helpful assistant.",
    base_url="https://your-service-url/v1",
)

response = client.invoke("Your question here")
print(response.content)
```

## Features

- **OpenAI-Compatible**: Works with any service that implements the OpenAI API standard
- **Local Models**: Perfect for running with Ollama for privacy and cost control
- **Memory Support**: Built-in memory adapter for conversation history
- **Streaming**: Support for real-time streaming responses
- **Structured Outputs**: Generate structured data with Pydantic models
- **Tool Calling**: Function calling capabilities where supported

## Supported Services

- **Ollama** - Local model inference
- **Together AI** - Cloud-based model hosting
- **OpenRouter** - Access a variety of models through a single API
- **Perplexity AI** - Search-augmented models
- **Groq** - Fast inference API
- **Any OpenAI-compatible API**
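
Switching between the services above mostly comes down to changing `base_url`. The sketch below keeps the URLs in one lookup table. The Ollama, Together AI, and OpenRouter URLs are taken from the Quick Start examples; the Groq and Perplexity URLs are assumptions that should be checked against each provider's documentation, and `resolve_base_url` is a hypothetical helper, not part of this package.

```python
# Base URLs for some OpenAI-compatible services.
BASE_URLS = {
    "ollama": "http://localhost:11434/v1",
    "together": "https://api.together.xyz/v1",
    "openrouter": "https://openrouter.ai/api/v1",
    "groq": "https://api.groq.com/openai/v1",  # assumption, verify before use
    "perplexity": "https://api.perplexity.ai",  # assumption, verify before use
}


def resolve_base_url(service: str) -> str:
    """Return the base URL for a known service name (case-insensitive)."""
    key = service.strip().lower()
    if key not in BASE_URLS:
        known = ", ".join(sorted(BASE_URLS))
        raise ValueError(f"Unknown service {service!r}; known services: {known}")
    return BASE_URLS[key]
```

The returned string can then be passed as `base_url` when constructing `OpenAILikeClient`.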

## Advanced Usage

### With Memory

```python
from datapizza.clients.openai_like import OpenAILikeClient
from datapizza.memory import Memory

client = OpenAILikeClient(
    api_key="",
    model="llama3.1:8b",
    base_url="http://localhost:11434/v1",
)

memory = Memory(client=client)
memory.add("I'm working on a Python project about machine learning.")
response = memory.query("What libraries should I use?")
```

### Streaming Responses

```python
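from datapizza.clients.openai_like import OpenAILikeClient

client = OpenAILikeClient(
    api_key="",
    model="gemma2:9b",
    base_url="http://localhost:11434/v1",
)

# Each streamed chunk carries the newly generated text in `delta`
for chunk in client.stream_invoke("Tell me a story about AI"):
    print(chunk.delta, end="", flush=True)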
```
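
When a streamed response includes tool calls, the function name and the JSON arguments typically arrive as fragments spread over several chunks. The following is a minimal sketch of how such fragments can be merged, loosely modeled on the `_accumulate_stream_delta` helper in this package. It uses `SimpleNamespace` objects as stand-ins for the SDK's delta objects, and `merge_tool_call_deltas` is a hypothetical name used only for illustration.

```python
import json
from types import SimpleNamespace


def merge_tool_call_deltas(deltas):
    """Merge streamed tool-call fragments into one entry per call index."""
    calls = {}
    for delta in deltas:
        entry = calls.setdefault(
            delta.index, {"id": "", "name": "", "arguments": ""}
        )
        # The id and name usually appear once; arguments arrive piece by piece
        if getattr(delta, "id", None):
            entry["id"] = delta.id
        function = getattr(delta, "function", None)
        if function is None:
            continue
        if getattr(function, "name", None):
            entry["name"] = function.name
        if getattr(function, "arguments", None):
            entry["arguments"] += function.arguments
    # Return the calls in index order
    return [calls[index] for index in sorted(calls)]


# Two fragments of the same call (index 0), as a stream might deliver them
fragments = [
    SimpleNamespace(
        index=0,
        id="call_1",
        function=SimpleNamespace(name="get_weather", arguments='{"city": '),
    ),
    SimpleNamespace(
        index=0,
        id=None,
        function=SimpleNamespace(name=None, arguments='"Rome"}'),
    ),
]

merged = merge_tool_call_deltas(fragments)
arguments = json.loads(merged[0]["arguments"])
```

The arguments string is only valid JSON once the stream has finished, which is why parsing is deferred until after the merge.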

### Structured Outputs

```python
from pydantic import BaseModel
from datapizza.clients.openai_like import OpenAILikeClient

class Person(BaseModel):
    name: str
    age: int
    occupation: str

client = OpenAILikeClient(
    api_key="",
    model="llama3.1:8b",
    base_url="http://localhost:11434/v1",
)

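response = client.structured_response(
    input="Generate a person profile",
    output_cls=Person,
)
# The parsed object is wrapped in a StructuredBlock inside response.content
print(response.content[0].content)  # Person object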
```
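
As an illustration of what happens to the raw model output, the Mistral client in this repository validates the returned JSON text with `model_validate_json` when the output class provides it and falls back to `json.loads` otherwise. The sketch below reproduces only that dispatch, under the assumption that the OpenAI-like client behaves similarly. The `Person` dataclass here is a simplified stand-in for a Pydantic model, and `parse_structured` is a hypothetical name.

```python
import json
from dataclasses import dataclass


@dataclass
class Person:
    name: str
    age: int

    @classmethod
    def model_validate_json(cls, raw: str) -> "Person":
        # Simplified stand-in for Pydantic's method of the same name
        return cls(**json.loads(raw))


def parse_structured(raw: str, output_cls):
    """Validate raw JSON text with the output class when it supports it."""
    if hasattr(output_cls, "model_validate_json"):
        return output_cls.model_validate_json(raw)
    # Classes without a validator get the plain decoded JSON
    return json.loads(raw)


person = parse_structured('{"name": "Ada", "age": 36}', Person)
plain = parse_structured('{"name": "Ada", "age": 36}', dict)
```

Here `person` is a `Person` instance, while `plain` stays an ordinary dictionary because `dict` has no `model_validate_json` method.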

## Configuration Options

| Parameter | Description | Default |
|-----------|-------------|---------|
| `api_key` | API key for the service | Required (empty string for Ollama) |
| `model` | Model name to use | `"gpt-4o-mini"` |
| `base_url` | Base URL of the OpenAI-compatible API | `None` (the OpenAI SDK's default endpoint) |
| `system_prompt` | System message for the model | `""` |
| `temperature` | Sampling temperature (0-2) | `None` |
| `cache` | Optional `Cache` instance | `None` |

## Ollama Setup

1. Install Ollama from [ollama.ai](https://ollama.ai)
2. Pull a model: `ollama pull gemma2:2b`
3. Start Ollama: `ollama serve`
4. Use with DataPizza AI as shown in the examples above
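
Before pointing the client at Ollama, it can help to confirm that the model from step 2 is actually installed. The sketch below assumes Ollama's native `GET /api/tags` endpoint, which returns a JSON object with a `models` list whose entries have a `name` field. Both helper names are hypothetical, and `fetch_tags` is defined but not called here because it needs a running server.

```python
import json
from urllib.request import urlopen


def fetch_tags(host: str = "http://localhost:11434") -> str:
    """Fetch the raw JSON listing of local models (requires a running server)."""
    with urlopen(f"{host}/api/tags", timeout=5) as response:
        return response.read().decode("utf-8")


def installed_model_names(tags_payload: str) -> list[str]:
    """Extract model names from an /api/tags response body."""
    data = json.loads(tags_payload)
    return [model["name"] for model in data.get("models", [])]


# Example payload in the assumed response shape
sample = '{"models": [{"name": "gemma2:2b"}, {"name": "llama3.1:8b"}]}'
names = installed_model_names(sample)
```

With a server running, `installed_model_names(fetch_tags())` would give the list to check the `model` argument against.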

## Popular Ollama Models

- `gemma2:2b` - Lightweight, fast responses
- `gemma2:9b` - Balanced performance
- `llama3.1:8b` - High quality, more resource intensive
- `codellama:7b` - Specialized for coding tasks
- `mistral:7b` - Good general purpose model

## Error Handling

```python
from datapizza.clients.openai_like import OpenAILikeClient
from datapizza.core.clients.exceptions import ClientError

try:
    client = OpenAILikeClient(
        api_key="",
        model="nonexistent-model",
        base_url="http://localhost:11434/v1",
    )
    response = client.invoke("Hello")
except ClientError as e:
    print(f"Client error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
```

## Contributing

Contributions are welcome! Please see our [Contributing Guide](../../CONTRIBUTING.md) for details.

## License

This project is licensed under the MIT License - see the [LICENSE](../../LICENSE) file for details.


## /datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/__init__.py" 
from .openai_completion_client import OpenAILikeClient

__all__ = ["OpenAILikeClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/memory_adapter.py" 
import base64
import json

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)


class OpenAILikeMemoryAdapter(MemoryAdapter):
    def _turn_to_message(self, turn: Turn) -> dict:
        content_blocks: list[dict] = []
        tool_calls = []
        tool_call_id = None
        text_parts: list[str] = []

        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    text_parts.append(block.content)
                case StructuredBlock():
                    text_parts.append(str(block.content))
                case FunctionCallBlock():
                    tool_calls.append(
                        {
                            "id": block.id,
                            "function": {
                                "name": block.name,
                                "arguments": json.dumps(block.arguments),
                            },
                            "type": "function",
                        }
                    )
                case FunctionCallResultBlock():
                    tool_call_id = block.id
                    # Coerce to str so non-string tool results can be joined
                    text_parts.append(str(block.result))
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_pdf_block(block)
                        case "audio":
                            block_dict = self._process_audio_block(block)
                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

            if block_dict:
                content_blocks.append(block_dict)

        text_content = "".join(text_parts) if text_parts else None

        if content_blocks:
            if text_content:
                content_blocks.insert(0, {"type": "text", "text": text_content})
            content_value = content_blocks
        else:
            content_value = text_content if text_content is not None else ""

        message = {"role": turn.role.value, "content": content_value}

        if tool_calls:
            message["tool_calls"] = tool_calls
        if tool_call_id:
            message["tool_call_id"] = tool_call_id

        return message

    def _process_audio_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "path":
                with open(block.media.source, "rb") as f:
                    base64_audio = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64_audio,
                        "format": block.media.extension,
                    },
                }

            # Use the same "base64" spelling as the image and PDF handlers
            case "base64":
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": block.media.source,
                        "format": block.media.extension,
                    },
                }
            case "raw":
                base64_audio = base64.b64encode(block.media.source).decode("utf-8")
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64_audio,
                        "format": block.media.extension,
                    },
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} for audio, supported source types: path, base64, raw"
                )

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        return {"role": role.value, "content": text}

    def _process_pdf_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "base64":
                return {
                    "type": "file",
                    "file": {
                        "filename": "file.pdf",
                        "file_data": f"data:application/{block.media.extension};base64,{block.media.source}",
                    },
                }
            case "path":
                with open(block.media.source, "rb") as f:
                    base64_pdf = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "file",
                    "file": {
                        "filename": "file.pdf",
                        "file_data": f"data:application/{block.media.extension};base64,{base64_pdf}",
                    },
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type}"
                )

    def _process_image_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "image_url",
                    "image_url": {"url": block.media.source},
                }

            case "base64":
                return {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/{block.media.extension};base64,{block.media.source}"
                    },
                }

            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                    return {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/{block.media.extension};base64,{base64_image}"
                        },
                    }

            case _:
                raise ValueError(
                    f"Unsupported media source type: {block.media.source_type}"
                )

```
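
The image handling in the adapter above boils down to building a `data:` URL from the file bytes and wrapping it in an `image_url` content part. A minimal standalone sketch of that step follows. It derives the subtype from the file suffix, which is an assumption (the adapter reads `block.media.extension` instead), and both function names are hypothetical.

```python
import base64
from pathlib import Path


def image_data_url(path: str) -> str:
    """Encode an image file as a base64 data URL."""
    file = Path(path)
    extension = file.suffix.lstrip(".").lower()
    # "jpg" is a common suffix, but the registered MIME subtype is "jpeg"
    subtype = {"jpg": "jpeg"}.get(extension, extension)
    encoded = base64.b64encode(file.read_bytes()).decode("utf-8")
    return f"data:image/{subtype};base64,{encoded}"


def image_content_part(path: str) -> dict:
    """Wrap the data URL in the OpenAI-style image content part."""
    return {"type": "image_url", "image_url": {"url": image_data_url(path)}}
```

The resulting dictionary has the same shape as the one returned by `_process_image_block` for the `path` source type.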

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/openai_completion_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/datapizza/clients/openai_like/openai_completion_client.py" 
import json
from collections.abc import AsyncIterator, Iterator
from typing import Literal

import httpx
from datapizza.core.cache import Cache
from datapizza.core.clients import Client, ClientResponse
from datapizza.core.clients.models import TokenUsage
from datapizza.memory import Memory
from datapizza.tools.tools import Tool
from datapizza.type import (
    FunctionCallBlock,
    Media,
    MediaBlock,
    Model,
    StructuredBlock,
    TextBlock,
)
from openai import AsyncOpenAI, OpenAI

from .memory_adapter import OpenAILikeMemoryAdapter


class OpenAILikeClient(Client):
    """A client for OpenAI-compatible chat completion APIs.

    This class uses the OpenAI SDK to generate responses from any service that
    exposes an OpenAI-compatible endpoint, selected through ``base_url``.
    It extends the Client class.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "gpt-4o-mini",
        system_prompt: str = "",
        temperature: float | None = None,
        cache: Cache | None = None,
        base_url: str | httpx.URL | None = None,
    ):
        if temperature is not None and not 0 <= temperature <= 2:
            raise ValueError("Temperature must be between 0 and 2")

        super().__init__(
            model_name=model,
            system_prompt=system_prompt,
            temperature=temperature,
            cache=cache,
        )

        self.base_url = base_url
        self.api_key = api_key
        self.memory_adapter = OpenAILikeMemoryAdapter()
        self._set_client()

    def _set_client(self):
        if not self.client:
            self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)

    def _set_a_client(self):
        if not self.a_client:
            self.a_client = AsyncOpenAI(api_key=self.api_key, base_url=self.base_url)

    def _token_usage_from_metadata(self, usage_metadata) -> TokenUsage:
        if not usage_metadata:
            return TokenUsage()

        return TokenUsage(
            prompt_tokens=getattr(usage_metadata, "prompt_tokens", 0) or 0,
            completion_tokens=getattr(usage_metadata, "completion_tokens", 0) or 0,
            cached_tokens=getattr(
                getattr(usage_metadata, "prompt_tokens_details", None),
                "cached_tokens",
                0,
            )
            or 0,
        )

    def _response_to_client_response(
        self, response, tool_map: dict[str, Tool] | None
    ) -> ClientResponse:
        blocks = []
        for choice in response.choices:
            if choice.message.content:
                blocks.append(TextBlock(content=choice.message.content))

            if choice.message.tool_calls and tool_map:
                for tool_call in choice.message.tool_calls:
                    tool = tool_map.get(tool_call.function.name)

                    if not tool:
                        raise ValueError(f"Tool {tool_call.function.name} not found")

                    blocks.append(
                        FunctionCallBlock(
                            id=tool_call.id,
                            name=tool_call.function.name,
                            arguments=json.loads(tool_call.function.arguments),
                            tool=tool,
                        )
                    )

            # Handle media content if present
            if hasattr(choice.message, "media") and choice.message.media:
                for media_item in choice.message.media:
                    media = Media(
                        media_type=media_item.type,
                        source_type="url" if media_item.source_url else "base64",
                        source=media_item.source_url or media_item.data,
                        detail=getattr(media_item, "detail", "high"),
                    )
                    blocks.append(MediaBlock(media=media))

        usage_metadata = getattr(response, "usage", None)
        token_usage = self._token_usage_from_metadata(usage_metadata)
        return ClientResponse(
            content=blocks,
            stop_reason=response.choices[0].finish_reason,
            usage=token_usage,
        )

    def _convert_tools(self, tools: Tool) -> dict:
        """Convert tools to OpenAI function format"""
        return {"type": "function", "function": tools.schema}

    def _convert_tool_choice(
        self, tool_choice: Literal["auto", "required", "none"] | list[str]
    ) -> dict | Literal["auto", "required", "none"]:
        if isinstance(tool_choice, list) and len(tool_choice) > 1:
            raise NotImplementedError(
                "Forcing more than one function name is not supported by OpenAI"
            )
        elif isinstance(tool_choice, list):
            return {
                "type": "function",
                "function": {"name": tool_choice[0]},
            }
        else:
            return tool_choice

    def _accumulate_stream_delta(
        self,
        delta,
        streamed_tool_calls: dict[int | str, dict[str, str]],
    ) -> str:
        if delta is None:
            return ""

        if getattr(delta, "tool_calls", None):
            for tool_call_delta in delta.tool_calls:
                key = getattr(tool_call_delta, "index", None)
                if key is None:
                    key = getattr(tool_call_delta, "id", None) or 0

                entry = streamed_tool_calls.setdefault(
                    key,
                    {"id": "", "name": "", "arguments": ""},
                )

                tool_call_id = getattr(tool_call_delta, "id", None)
                if tool_call_id:
                    entry["id"] = tool_call_id

                function = getattr(tool_call_delta, "function", None)
                if function is None:
                    continue

                function_name = getattr(function, "name", None)
                if function_name:
                    entry["name"] = function_name

                function_arguments = getattr(function, "arguments", None)
                if function_arguments:
                    entry["arguments"] += function_arguments

        return delta.content if getattr(delta, "content", None) else ""

    def _build_streamed_tool_call_blocks(
        self,
        streamed_tool_calls: dict[int | str, dict[str, str]],
        tool_map: dict[str, Tool],
    ) -> list[FunctionCallBlock]:
        blocks: list[FunctionCallBlock] = []

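        # Sort integer indices numerically (so 10 follows 9) and place string
        # keys after them; int and str keys are never compared directly.
        for _, streamed_tool_call in sorted(
            streamed_tool_calls.items(),
            key=lambda item: (isinstance(item[0], str), item[0]),
        ):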
            function_name = streamed_tool_call["name"]
            if not function_name:
                continue

            tool = tool_map.get(function_name)
            if not tool:
                raise ValueError(f"Tool {function_name} not found")

            raw_arguments = streamed_tool_call["arguments"] or "{}"
            blocks.append(
                FunctionCallBlock(
                    id=streamed_tool_call["id"] or function_name,
                    name=function_name,
                    arguments=json.loads(raw_arguments),
                    tool=tool,
                )
            )

        return blocks

    def _build_streamed_content(
        self,
        message_content: str,
        streamed_tool_calls: dict[int | str, dict[str, str]],
        tool_map: dict[str, Tool],
    ) -> list[TextBlock | FunctionCallBlock]:
        content: list[TextBlock | FunctionCallBlock] = []

        if message_content:
            content.append(TextBlock(content=message_content))

        content.extend(
            self._build_streamed_tool_call_blocks(streamed_tool_calls, tool_map)
        )
        return content

    def _invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "stream": False,
            "max_completion_tokens": max_tokens,
            **kwargs,
        }
        if temperature is not None:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        client: OpenAI = self._get_client()
        response = client.chat.completions.create(**kwargs)
        return self._response_to_client_response(response, tool_map)

    async def _a_invoke(
        self,
        *,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> ClientResponse:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)

        tool_map = {tool.name: tool for tool in tools}

        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "stream": False,
            "max_completion_tokens": max_tokens,
            **kwargs,
        }
        if temperature is not None:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        a_client = self._get_a_client()
        response = await a_client.chat.completions.create(**kwargs)
        return self._response_to_client_response(response, tool_map)

    def _stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None,
        memory: Memory | None,
        tool_choice: Literal["auto", "required", "none"] | list[str],
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        **kwargs,
    ) -> Iterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)
        tool_map = {tool.name: tool for tool in tools}
        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "stream": True,
            "max_completion_tokens": max_tokens,
            "stream_options": {"include_usage": True},
            **kwargs,
        }
        if temperature is not None:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        response = self.client.chat.completions.create(**kwargs)
        message_content = ""
        usage = TokenUsage()
        finish_reason = None
        streamed_tool_calls: dict[int | str, dict[str, str]] = {}

        for chunk in response:
            usage_metadata = getattr(chunk, "usage", None)
            token_usage = self._token_usage_from_metadata(usage_metadata)
            usage += token_usage

            delta = None
            if len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                finish_reason = chunk.choices[0].finish_reason

            delta_content = self._accumulate_stream_delta(delta, streamed_tool_calls)
            message_content = message_content + delta_content

            if delta_content:
                yield ClientResponse(
                    content=[TextBlock(content=message_content)],
                    delta=delta_content,
                    stop_reason=finish_reason or None,
                )
        yield ClientResponse(
            content=self._build_streamed_content(
                message_content, streamed_tool_calls, tool_map
            ),
            stop_reason=finish_reason or None,
            usage=usage,
        )

    async def _a_stream_invoke(
        self,
        input: str,
        tools: list[Tool] | None = None,
        memory: Memory | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        temperature: float | None = None,
        max_tokens: int | None = None,
        system_prompt: str | None = None,
        **kwargs,
    ) -> AsyncIterator[ClientResponse]:
        if tools is None:
            tools = []
        messages = self._memory_to_contents(system_prompt, input, memory)
        tool_map = {tool.name: tool for tool in tools}
        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "stream": True,
            "max_completion_tokens": max_tokens,
            "stream_options": {"include_usage": True},
            **kwargs,
        }
        if temperature is not None:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)

        a_client = self._get_a_client()
        message_content = ""
        usage = TokenUsage()
        finish_reason = None
        streamed_tool_calls: dict[int | str, dict[str, str]] = {}

        async for chunk in await a_client.chat.completions.create(**kwargs):
            usage_metadata = getattr(chunk, "usage", None)
            token_usage = self._token_usage_from_metadata(usage_metadata)
            usage += token_usage

            delta = None
            if len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                finish_reason = chunk.choices[0].finish_reason

            delta_content = self._accumulate_stream_delta(delta, streamed_tool_calls)
            message_content = message_content + delta_content

            if delta_content:
                yield ClientResponse(
                    content=[TextBlock(content=message_content)],
                    delta=delta_content,
                    stop_reason=finish_reason or None,
                )
        yield ClientResponse(
            content=self._build_streamed_content(
                message_content, streamed_tool_calls, tool_map
            ),
            stop_reason=finish_reason or None,
            usage=usage,
        )

    def _structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None,
        tools: list[Tool] | None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        # Build the message list; the output schema is enforced via response_format
        messages = self._memory_to_contents(system_prompt, input, memory)

        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "response_format": output_cls,
            "max_completion_tokens": max_tokens,
            **kwargs,
        }
        if temperature is not None:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
            # Structured response needs strict mode and no additional properties
            for tool in kwargs["tools"]:
                tool["function"]["strict"] = True
                tool["function"]["parameters"]["additionalProperties"] = False

        response = self.client.beta.chat.completions.parse(**kwargs)

        stop_reason = response.choices[0].finish_reason

        if not response.choices[0].message.content:
            raise ValueError("No content in response")

        if hasattr(output_cls, "model_validate_json"):
            structured_data = output_cls.model_validate_json(
                response.choices[0].message.content
            )
        else:
            structured_data = json.loads(response.choices[0].message.content)
        usage_metadata = getattr(response, "usage", None)
        token_usage = self._token_usage_from_metadata(usage_metadata)
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=stop_reason,
            usage=token_usage,
        )

    async def _a_structured_response(
        self,
        input: str,
        output_cls: type[Model],
        memory: Memory | None,
        temperature: float | None,
        max_tokens: int,
        system_prompt: str | None = None,
        tools: list[Tool] | None = None,
        tool_choice: Literal["auto", "required", "none"] | list[str] = "auto",
        **kwargs,
    ) -> ClientResponse:
        messages = self._memory_to_contents(system_prompt, input, memory)

        kwargs = {
            "model": self.model_name,
            "messages": messages,
            "response_format": output_cls,
            "max_completion_tokens": max_tokens,
            **kwargs,
        }
        if temperature is not None:
            kwargs["temperature"] = temperature

        if tools:
            kwargs["tools"] = [self._convert_tools(tool) for tool in tools]
            kwargs["tool_choice"] = self._convert_tool_choice(tool_choice)
            # Structured response needs strict mode and no additional properties
            for tool in kwargs["tools"]:
                tool["function"]["strict"] = True
                tool["function"]["parameters"]["additionalProperties"] = False

        a_client = self._get_a_client()
        response = await a_client.beta.chat.completions.parse(**kwargs)

        stop_reason = response.choices[0].finish_reason

        if not response.choices[0].message.content:
            raise ValueError("No content in response")

        if hasattr(output_cls, "model_validate_json"):
            structured_data = output_cls.model_validate_json(
                response.choices[0].message.content
            )
        else:
            structured_data = json.loads(response.choices[0].message.content)
        usage_metadata = getattr(response, "usage", None)
        token_usage = self._token_usage_from_metadata(usage_metadata)
        return ClientResponse(
            content=[StructuredBlock(content=structured_data)],
            stop_reason=stop_reason,
            usage=token_usage,
        )

```
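
The tool-call reconstruction performed by `_accumulate_stream_delta` can be illustrated in isolation. The following is a minimal sketch, not the client's implementation: `accumulate` and `fragment` are hypothetical helper names, and the deltas are stand-ins built with `SimpleNamespace` rather than real OpenAI SDK objects.

```python
import json
from types import SimpleNamespace


def accumulate(delta, calls):
    """Merge one streamed delta into `calls` and return its text content."""
    for tool_call in getattr(delta, "tool_calls", None) or []:
        # Fragments of the same call share an index; create the entry once.
        entry = calls.setdefault(
            tool_call.index, {"id": "", "name": "", "arguments": ""}
        )
        if tool_call.id:
            entry["id"] = tool_call.id
        if tool_call.function.name:
            entry["name"] = tool_call.function.name
        if tool_call.function.arguments:
            # Arguments arrive as partial JSON text, so concatenate them.
            entry["arguments"] += tool_call.function.arguments
    return getattr(delta, "content", None) or ""


def fragment(index, call_id, name, arguments):
    """Hypothetical stand-in for one tool-call delta from the SDK."""
    return SimpleNamespace(
        index=index,
        id=call_id,
        function=SimpleNamespace(name=name, arguments=arguments),
    )


calls = {}
deltas = [
    SimpleNamespace(
        content=None,
        tool_calls=[fragment(0, "call_1", "get_weather", '{"location":"Mi')],
    ),
    SimpleNamespace(
        content=None,
        tool_calls=[fragment(0, None, None, 'lan","when":"tomorrow"}')],
    ),
]
for delta in deltas:
    accumulate(delta, calls)

# The JSON is only parseable once every fragment has been appended.
parsed = json.loads(calls[0]["arguments"])
print(calls[0]["name"], parsed)
```

Keying the entries by `index` is what lets later fragments, which carry no `id` or `name`, attach to the right call.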

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-openai-like"
version = "0.0.11"
description = "OpenAI-compatible client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}
authors = [
    {name = "Datapizza", email = "datapizza@datapizza.tech"}
]
requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.1.0,<0.2.0",
    "httpx>=0.28.1",
    "openai>=2.0.0,<3.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    # "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-openai-like/tests/test_openai_completion.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai-like/tests/test_openai_completion.py" 
import asyncio
from types import SimpleNamespace

from datapizza.tools import tool
from datapizza.type import FunctionCallBlock

from datapizza.clients.openai_like import OpenAILikeClient


def _chunk(*, content=None, finish_reason=None, tool_calls=None, usage=None):
    return SimpleNamespace(
        choices=[
            SimpleNamespace(
                delta=SimpleNamespace(content=content, tool_calls=tool_calls),
                finish_reason=finish_reason,
            )
        ],
        usage=usage,
    )


def _usage_chunk():
    return SimpleNamespace(choices=[], usage=None)


class FakeStreamClient:
    def __init__(self, chunks):
        self._chunks = chunks
        self.chat = SimpleNamespace(
            completions=SimpleNamespace(create=lambda **kwargs: iter(self._chunks))
        )


class FakeAsyncStreamClient:
    def __init__(self, chunks):
        self._chunks = chunks
        self.chat = SimpleNamespace(completions=SimpleNamespace(create=self.create))

    async def create(self, **kwargs):
        async def iterator():
            for chunk in self._chunks:
                yield chunk

        return iterator()


@tool
def get_weather(location: str, when: str) -> str:
    return "25 C"


def test_init():
    client = OpenAILikeClient(
        api_key="test_api_key",
        model="gpt-4o-mini",
        system_prompt="You are a helpful assistant that can answer questions about piadina only in italian.",
    )
    assert client is not None


def test_stream_invoke_reconstructs_tool_calls():
    client = OpenAILikeClient(api_key="test", model="test-model")
    client.client = FakeStreamClient(
        [
            _chunk(
                tool_calls=[
                    SimpleNamespace(
                        index=0,
                        id="call_1",
                        function=SimpleNamespace(
                            name="get_weather",
                            arguments='{"location":"Mi',
                        ),
                    )
                ]
            ),
            _chunk(
                tool_calls=[
                    SimpleNamespace(
                        index=0,
                        id=None,
                        function=SimpleNamespace(
                            name=None,
                            arguments='lan","when":"tomorrow"}',
                        ),
                    )
                ]
            ),
            _chunk(finish_reason="tool_calls"),
        ]
    )

    responses = list(
        client.stream_invoke(
            input="weather?",
            tools=[get_weather],
            memory=None,
            tool_choice="auto",
            temperature=None,
            max_tokens=256,
            system_prompt=None,
        )
    )

    final_response = responses[-1]
    function_calls = final_response.function_calls
    assert len(function_calls) == 1
    assert isinstance(function_calls[0], FunctionCallBlock)
    assert function_calls[0].name == "get_weather"
    assert function_calls[0].arguments == {
        "location": "Milan",
        "when": "tomorrow",
    }
    assert final_response.stop_reason == "tool_calls"


def test_stream_invoke_ignores_usage_only_chunks_without_reusing_stale_delta():
    client = OpenAILikeClient(api_key="test", model="test-model")
    client.client = FakeStreamClient(
        [
            _chunk(content="Hel"),
            _usage_chunk(),
            _chunk(content="lo", finish_reason="stop"),
        ]
    )

    responses = list(
        client.stream_invoke(
            input="hello",
            tools=[],
            memory=None,
            tool_choice="auto",
            temperature=None,
            max_tokens=256,
            system_prompt=None,
        )
    )

    assert [response.delta for response in responses[:-1]] == ["Hel", "lo"]
    assert responses[-1].text == "Hello"


def test_a_stream_invoke_reconstructs_tool_calls():
    client = OpenAILikeClient(api_key="test", model="test-model")
    client.a_client = FakeAsyncStreamClient(
        [
            _chunk(
                tool_calls=[
                    SimpleNamespace(
                        index=0,
                        id="call_1",
                        function=SimpleNamespace(
                            name="get_weather",
                            arguments='{"location":"Mi',
                        ),
                    )
                ]
            ),
            _chunk(
                tool_calls=[
                    SimpleNamespace(
                        index=0,
                        id=None,
                        function=SimpleNamespace(
                            name=None,
                            arguments='lan","when":"tomorrow"}',
                        ),
                    )
                ]
            ),
            _chunk(finish_reason="tool_calls"),
        ]
    )

    async def collect():
        items = []
        async for response in client.a_stream_invoke(
            input="weather?",
            tools=[get_weather],
            memory=None,
            tool_choice="auto",
            temperature=None,
            max_tokens=256,
            system_prompt=None,
        ):
            items.append(response)
        return items

    responses = asyncio.run(collect())
    final_response = responses[-1]
    function_calls = final_response.function_calls
    assert len(function_calls) == 1
    assert isinstance(function_calls[0], FunctionCallBlock)
    assert function_calls[0].name == "get_weather"
    assert function_calls[0].arguments == {
        "location": "Milan",
        "when": "tomorrow",
    }
    assert final_response.stop_reason == "tool_calls"

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/README.md



## /datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/__init__.py" 
from .openai_client import OpenAIClient

__all__ = ["OpenAIClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/datapizza/clients/openai/memory_adapter.py" 
import base64
import json

from datapizza.memory.memory import Turn
from datapizza.memory.memory_adapter import MemoryAdapter
from datapizza.type import (
    ROLE,
    FunctionCallBlock,
    FunctionCallResultBlock,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)

from openai.types.responses import ResponseFunctionToolCall


class OpenAIMemoryAdapter(MemoryAdapter):
    def _turn_to_message(self, turn: Turn) -> dict:
        content = []
        tool_calls = []
        tool_call_id = None
        response_function_tool_call = None

        for block in turn:
            block_dict = {}

            match block:
                case TextBlock():
                    block_dict = {
                        "type": "input_text"
                        if turn.role == ROLE.USER
                        else "output_text",
                        "text": block.content,
                    }
                case FunctionCallBlock():
                    return ResponseFunctionToolCall(
                        call_id=block.id,
                        name=block.name,
                        arguments=json.dumps(block.arguments),
                        type="function_call",
                        # id="fc_" + block.id,
                        status="completed",
                    )
                    # block_dict = {
                    #    "id": block.id,
                    #    "name": block.name,
                    #    "arguments": json.dumps(block.arguments),
                    #    "type": "function_call",
                    # }
                case FunctionCallResultBlock():
                    tool_call_id = block.id
                    return {
                        "type": "function_call_output",
                        "call_id": block.id,
                        "output": block.result,
                    }

                case StructuredBlock():
                    block_dict = {
                        "type": "input_text"
                        if turn.role == ROLE.USER
                        else "output_text",
                        "text": str(block.content),
                    }
                case MediaBlock():
                    match block.media.media_type:
                        case "image":
                            block_dict = self._process_image_block(block)
                        case "pdf":
                            block_dict = self._process_pdf_block(block)
                        case "audio":
                            block_dict = self._process_audio_block(block)

                        case _:
                            raise NotImplementedError(
                                f"Unsupported media type: {block.media.media_type}"
                            )

            if block_dict:
                content.append(block_dict)

        messages = {
            "role": turn.role.value,
            "content": (content),
        }

        if tool_calls:
            messages["tool_calls"] = tool_calls

        if tool_call_id:
            messages["tool_call_id"] = tool_call_id

        if response_function_tool_call:
            return response_function_tool_call

        return messages

    def _process_audio_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "path":
                with open(block.media.source, "rb") as f:
                    base64_audio = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64_audio,
                        "format": block.media.extension,
                    },
                }

            # The other media handlers use "base64"; "base_64" is kept as an
            # alias for backward compatibility.
            case "base64" | "base_64":
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": block.media.source,
                        "format": block.media.extension,
                    },
                }
            case "raw":
                base64_audio = base64.b64encode(block.media.source).decode("utf-8")
                return {
                    "type": "input_audio",
                    "input_audio": {
                        "data": base64_audio,
                        "format": block.media.extension,
                    },
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type} for audio; supported source types: path, base64, raw"
                )

    def _text_to_message(self, text: str, role: ROLE) -> dict:
        return {"role": role.value, "content": text}

    def _process_pdf_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "base64":
                return {
                    "type": "input_file",
                    "filename": "file.pdf",
                    "file_data": f"data:application/{block.media.extension};base64,{block.media.source}",
                }
            case "path":
                with open(block.media.source, "rb") as f:
                    base64_pdf = base64.b64encode(f.read()).decode("utf-8")
                return {
                    "type": "input_file",
                    "filename": "file.pdf",
                    "file_data": f"data:application/{block.media.extension};base64,{base64_pdf}",
                }

            case _:
                raise NotImplementedError(
                    f"Unsupported media source type: {block.media.source_type}"
                )

    def _process_image_block(self, block: MediaBlock) -> dict:
        match block.media.source_type:
            case "url":
                return {
                    "type": "input_image",
                    "image_url": block.media.source,
                }

            case "base64":
                return {
                    "type": "input_image",
                    "image_url": f"data:image/{block.media.extension};base64,{block.media.source}",
                }

            case "path":
                with open(block.media.source, "rb") as image_file:
                    base64_image = base64.b64encode(image_file.read()).decode("utf-8")
                    return {
                        "type": "input_image",
                        "image_url": f"data:image/{block.media.extension};base64,{base64_image}",
                    }

            case _:
                raise ValueError(
                    f"Unsupported media source type: {block.media.source_type}"
                )

```
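
The `base64` and `path` branches of `_process_image_block` both come down to wrapping base64 text in a `data:` URL. A minimal sketch, assuming the raw bytes are already in memory; `to_data_url` is a hypothetical helper and not part of the adapter.

```python
import base64


def to_data_url(raw: bytes, extension: str) -> str:
    """Encode raw image bytes as a data URL (hypothetical helper)."""
    encoded = base64.b64encode(raw).decode("utf-8")
    return f"data:image/{extension};base64,{encoded}"


# Same shape as the dict the adapter returns for an image block.
part = {"type": "input_image", "image_url": to_data_url(b"\x89PNG", "png")}
print(part["image_url"])
```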

## /datapizza-ai-clients/datapizza-ai-clients-openai/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-openai/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-openai"
version = "0.0.13"
description = "OpenAI client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}

requires-python = ">=3.10.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.1.0,<0.2.0",
    "openai>=2,<3.0.0",
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

# Ruff configuration
[tool.ruff]
line-length = 88

[tool.ruff.lint]
select = [
    # "E",   # pycodestyle errors
    "W",   # pycodestyle warnings
    "F",   # pyflakes
    "B",   # flake8-bugbear
    "I",   # isort
    "UP",  # pyupgrade
    "SIM", # flake8-simplify
    "RUF", # Ruff-specific rules
    "C4",  # flake8-comprehensions
]

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/tests/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/tests/__init__.py" 

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_base_client.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_base_client.py" 
from types import SimpleNamespace
from unittest.mock import MagicMock, patch

import httpx
from datapizza.core.clients import ClientResponse

from datapizza.clients.openai import (
    OpenAIClient,
)


def test_client_init():
    client = OpenAIClient(
        model="gpt-4o-mini",
        api_key="test_api_key",
    )
    assert client is not None


def test_token_usage_maps_reasoning_tokens():
    client = object.__new__(OpenAIClient)
    usage_metadata = SimpleNamespace(
        input_tokens=41,
        output_tokens=456,
        input_tokens_details=SimpleNamespace(cached_tokens=7),
        output_tokens_details=SimpleNamespace(reasoning_tokens=192),
    )

    usage = client._token_usage_from_metadata(usage_metadata)

    assert usage.prompt_tokens == 41
    assert usage.completion_tokens == 456
    assert usage.cached_tokens == 7
    assert usage.thinking_tokens == 192


def test_token_usage_defaults_missing_details_to_zero():
    client = object.__new__(OpenAIClient)
    usage_metadata = SimpleNamespace(input_tokens=41, output_tokens=456)

    usage = client._token_usage_from_metadata(usage_metadata)

    assert usage.prompt_tokens == 41
    assert usage.completion_tokens == 456
    assert usage.cached_tokens == 0
    assert usage.thinking_tokens == 0


@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_client_init_with_extra_args(mock_openai):
    """Tests that extra arguments are passed to the OpenAI client."""
    OpenAIClient(
        api_key="test_api_key",
        organization="test-org",
        project="test-project",
        timeout=30.0,
        max_retries=3,
    )
    mock_openai.assert_called_once_with(
        api_key="test_api_key",
        base_url=None,
        organization="test-org",
        project="test-project",
        webhook_secret=None,
        websocket_base_url=None,
        timeout=30.0,
        max_retries=3,
        default_headers=None,
        default_query=None,
        http_client=None,
    )


@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_client_init_with_http_client(mock_openai):
    """Tests that a custom http_client is passed to the OpenAI client."""
    custom_http_client = httpx.Client()
    OpenAIClient(
        api_key="test_api_key",
        http_client=custom_http_client,
    )
    mock_openai.assert_called_once_with(
        api_key="test_api_key",
        base_url=None,
        organization=None,
        project=None,
        webhook_secret=None,
        websocket_base_url=None,
        timeout=None,
        max_retries=2,
        default_headers=None,
        default_query=None,
        http_client=custom_http_client,
    )


@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_invoke_kwargs_override(mock_openai_class):
    """
    Tests that user-supplied kwargs cannot override 'stream' in
    non-streaming methods, while other kwargs are passed through.
    """
    mock_openai_instance = mock_openai_class.return_value
    mock_openai_instance.responses.create.return_value = MagicMock()

    client = OpenAIClient(api_key="test")
    client._response_to_client_response = MagicMock(
        return_value=ClientResponse(content=[])
    )

    client.invoke("hello", stream=True, top_p=0.5)

    mock_openai_instance.responses.create.assert_called_once()
    called_kwargs = mock_openai_instance.responses.create.call_args.kwargs

    assert called_kwargs.get("top_p") == 0.5
    assert called_kwargs.get("stream") is False


@patch("datapizza.clients.openai.openai_client.OpenAI")
def test_stream_invoke_kwargs_override(mock_openai_class):
    """
    Tests that user-supplied kwargs cannot override 'stream' in
    streaming methods, while other kwargs are passed through.
    """
    mock_openai_instance = mock_openai_class.return_value
    mock_openai_instance.responses.create.return_value = []

    client = OpenAIClient(api_key="test")

    list(client.stream_invoke("hello", stream=False, top_p=0.5))

    mock_openai_instance.responses.create.assert_called_once()
    called_kwargs = mock_openai_instance.responses.create.call_args.kwargs

    assert called_kwargs.get("top_p") == 0.5
    assert called_kwargs.get("stream") is True

```

## /datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_memory_adapter.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-openai/tests/test_memory_adapter.py" 
import json

import pytest
from datapizza.memory.memory import Memory
from datapizza.tools.tools import tool
from datapizza.type.type import (
    ROLE,
    FunctionCallBlock,
    Media,
    MediaBlock,
    StructuredBlock,
    TextBlock,
)
from openai.types.responses import ResponseFunctionToolCall

from datapizza.clients.openai.memory_adapter import OpenAIMemoryAdapter


@pytest.fixture(
    params=[
        OpenAIMemoryAdapter(),
    ]
)
def adapter(request):
    """Parameterized fixture that provides different memory adapter implementations.

    Each test using this fixture will run once for each adapter in the params list.
    """
    return request.param


@pytest.fixture
def memory():
    return Memory()


def test_empty_memory_to_messages(adapter, memory):
    """Test that an empty memory converts to an empty list of messages."""
    messages = adapter.memory_to_messages(memory)
    assert messages == []


def test_turn_with_some_text():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Hello!"))
    memory.add_to_last_turn(TextBlock(content="Hi, how are u?"))
    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages == [
        {
            "role": "user",
            "content": [
                {"type": "input_text", "text": "Hello!"},
                {"type": "input_text", "text": "Hi, how are u?"},
            ],
        }
    ]


def test_memory_to_messages_multiple_turns():
    """Test conversion of a memory with multiple turns to messages."""
    # First turn: user asks a question
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="What's 2+2?"))

    # Second turn: assistant responds
    memory.new_turn(role=ROLE.ASSISTANT)
    memory.add_to_last_turn(TextBlock(content="The answer is 4."))

    # Third turn: user follows up
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Thanks!"))

    messages = OpenAIMemoryAdapter().memory_to_messages(memory)

    expected = [
        {"role": "user", "content": [{"type": "input_text", "text": "What's 2+2?"}]},
        {
            "role": "assistant",
            "content": [{"type": "output_text", "text": "The answer is 4."}],
        },
        {"role": "user", "content": [{"type": "input_text", "text": "Thanks!"}]},
    ]
    assert messages == expected


def test_memory_to_messages_function_call():
    @tool
    def add(a: int, b: int) -> int:
        return a + b

    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Call the add function."))
    memory.new_turn(role=ROLE.ASSISTANT)
    memory.add_to_last_turn(
        FunctionCallBlock(id="call_1", name="add", arguments={"a": 2, "b": 2}, tool=add)
    )

    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages[0]["role"] == "user"
    assert isinstance(messages[1], ResponseFunctionToolCall)
    assert json.loads(messages[1].arguments) == {
        "a": 2,
        "b": 2,
    }


def test_memory_to_messages_media_blocks():
    image = Media(
        media_type="image",
        source_type="url",
        source="http://example.com/image.png",
        extension="png",
    )
    pdf = Media(
        media_type="pdf",
        source_type="base64",
        source="THIS_IS_A_PDF_BASE64",
        extension="pdf",
    )
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(MediaBlock(media=image))
    memory.add_to_last_turn(MediaBlock(media=pdf))
    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages[0]["role"] == "user"
    # Should contain both image and pdf blocks

    # TODO: Check if the image and pdf blocks are correct
    assert messages[0]["content"][1] == {
        "type": "input_file",
        "filename": "file.pdf",
        "file_data": "data:application/pdf;base64,THIS_IS_A_PDF_BASE64",
    }


def test_memory_to_messages_structured_block():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(StructuredBlock(content={"key": "value"}))
    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages[0]["content"] == "{'key': 'value'}" or messages[0]["content"] == [
        {
            "type": "input_text",
            "text": "{'key': 'value'}",
        }
    ]


def test_memory_to_messages_with_system_prompt():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Hello!"))
    system_prompt = "You are a helpful assistant."
    messages = OpenAIMemoryAdapter().memory_to_messages(
        memory, system_prompt=system_prompt
    )
    assert messages[0]["role"] == "system"
    assert messages[0]["content"] == system_prompt
    assert messages[1]["role"] == "user"


def test_memory_to_messages_with_input_str():
    memory = Memory()
    input_str = "What is the weather?"
    messages = OpenAIMemoryAdapter().memory_to_messages(memory, input=input_str)
    assert messages[-1]["role"] == "user"
    assert messages[-1]["content"] == input_str


def test_memory_to_messages_with_input_block():
    memory = Memory()
    input_block = TextBlock(content="This is a block input.")
    messages = OpenAIMemoryAdapter().memory_to_messages(memory, input=input_block)
    assert messages[-1]["role"] == "user"
    assert "block input" in str(messages[-1]["content"])


def test_openai_empty_memory_to_messages():
    messages = OpenAIMemoryAdapter().memory_to_messages(Memory())
    assert messages == []


def test_openai_memory_to_messages_multiple_turns():
    memory = Memory()
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="What's 2+2?"))
    memory.new_turn(role=ROLE.ASSISTANT)
    memory.add_to_last_turn(TextBlock(content="The answer is 4."))
    memory.new_turn(role=ROLE.USER)
    memory.add_to_last_turn(TextBlock(content="Thanks!"))
    messages = OpenAIMemoryAdapter().memory_to_messages(memory)
    assert messages[0]["role"] == "user"
    assert messages[1]["role"] == "assistant"
    assert messages[2]["role"] == "user"

```

## /datapizza-ai-clients/datapizza-ai-clients-watsonx/README.md



## /datapizza-ai-clients/datapizza-ai-clients-watsonx/datapizza/clients/watsonx/__init__.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-watsonx/datapizza/clients/watsonx/__init__.py" 
from .watsonx_client import WatsonXClient

__all__ = ["WatsonXClient"]

```

## /datapizza-ai-clients/datapizza-ai-clients-watsonx/pyproject.toml

```toml path="/datapizza-ai-clients/datapizza-ai-clients-watsonx/pyproject.toml" 
# Build system configuration
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

# Project metadata
[project]
name = "datapizza-ai-clients-watsonx"
version = "0.0.2"
description = "IBM WatsonX client for the datapizza-ai framework"
readme = "README.md"
license = {text = "MIT"}

requires-python = ">=3.11.0,<4"
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
dependencies = [
    "datapizza-ai-core>=0.1.0,<0.2.0",
    "ibm-watsonx-ai>=1.4.0,<2.0.0"
]

# Development dependencies
[dependency-groups]
dev = [
    "deptry>=0.23.0",
    "pytest",
    "ruff>=0.11.5",
]

# Hatch build configuration
[tool.hatch.build.targets.sdist]
include = ["datapizza"]
exclude = ["**/BUILD"]

[tool.hatch.build.targets.wheel]
include = ["datapizza"]
exclude = ["**/BUILD"]

```

## /datapizza-ai-clients/datapizza-ai-clients-watsonx/tests/test_watsonx.py

```py path="/datapizza-ai-clients/datapizza-ai-clients-watsonx/tests/test_watsonx.py" 
def test_init():
    # Placeholder so pytest collects at least one test for this package.
    assert 1 == 1

```

## /datapizza-ai-core/README.md


This is the core of the datapizza-ai framework.


## /datapizza-ai-core/datapizza/agents/__init__.py

```py path="/datapizza-ai-core/datapizza/agents/__init__.py" 
from .agent import Agent, AgentHooks, StepContext, StepResult
from .client_manager import ClientManager
from .runner import AgentRunner, AgentRunnerResult, HandoffRequest

__all__ = [
    "Agent",
    "AgentHooks",
    "AgentRunner",
    "AgentRunnerResult",
    "ClientManager",
    "HandoffRequest",
    "StepContext",
    "StepResult",
]

```

## /datapizza-ai-core/datapizza/agents/__version__.py

```py path="/datapizza-ai-core/datapizza/agents/__version__.py" 
VERSION = (3, 0, 8)

__version__ = ".".join(map(str, VERSION))

```

## /datapizza-ai-core/datapizza/cache/__init__.py

```py path="/datapizza-ai-core/datapizza/cache/__init__.py" 
# Import MemoryCache from core implementation
from pkgutil import extend_path

__path__ = extend_path(__path__, __name__)

from datapizza.core.cache import MemoryCache

__all__ = ["MemoryCache"]

```

## /datapizza-ai-embedders/cohere/README.md



## /datapizza-ai-embedders/fastembedder/README.md



