browser-use/browser-use/main 356k tokens More Tools
```
├── .dockerignore (100 tokens)
├── .env.example (omitted)
├── .gitattributes (omitted)
├── .github/
   ├── CONTRIBUTING.md (100 tokens)
   ├── ISSUE_TEMPLATE/
      ├── 1_element_detection_bug.yml (900 tokens)
      ├── 2_bug_report.yml (800 tokens)
      ├── 3_feature_request.yml (300 tokens)
      ├── 4_docs_issue.yml (300 tokens)
      ├── config.yml (100 tokens)
   ├── workflows/
      ├── cloud_evals.yml (200 tokens)
      ├── docker.yml (400 tokens)
      ├── lint.yml (200 tokens)
      ├── package.yaml (300 tokens)
      ├── publish.yml (500 tokens)
      ├── test.yaml (300 tokens)
├── .gitignore (100 tokens)
├── .pre-commit-config.yaml (300 tokens)
├── .python-version
├── .vscode/
   ├── launch.json (500 tokens)
   ├── settings.json (100 tokens)
├── Dockerfile (2.1k tokens)
├── LICENSE (omitted)
├── README.md (1500 tokens)
├── SECURITY.md (200 tokens)
├── browser_use/
   ├── README.md (200 tokens)
   ├── __init__.py (300 tokens)
   ├── agent/
      ├── gif.py (2.1k tokens)
      ├── memory/
         ├── __init__.py
         ├── service.py (1100 tokens)
         ├── views.py (500 tokens)
      ├── message_manager/
         ├── service.py (2.5k tokens)
         ├── tests.py (1600 tokens)
         ├── utils.py (1000 tokens)
         ├── views.py (800 tokens)
      ├── playwright_script_generator.py (6.5k tokens)
      ├── playwright_script_helpers.py (800 tokens)
      ├── prompts.py (1200 tokens)
      ├── service.py (11.1k tokens)
      ├── system_prompt.md (1000 tokens)
      ├── tests.py (1100 tokens)
      ├── views.py (2.9k tokens)
   ├── browser/
      ├── browser.py (3.1k tokens)
      ├── chrome.py (2.1k tokens)
      ├── context.py (14.2k tokens)
      ├── dolphin_service.py (2.1k tokens)
      ├── tests/
         ├── httpx_client_test.py (300 tokens)
         ├── screenshot_test.py (200 tokens)
         ├── test_clicks.py (600 tokens)
      ├── utils/
         ├── screen_resolution.py (300 tokens)
      ├── views.py (300 tokens)
   ├── cli.py (9.4k tokens)
   ├── controller/
      ├── registry/
         ├── service.py (1800 tokens)
         ├── views.py (900 tokens)
      ├── service.py (6.6k tokens)
      ├── views.py (500 tokens)
   ├── dom/
      ├── __init__.py
      ├── buildDomTree.js (10.2k tokens)
      ├── clickable_element_processor/
         ├── service.py (500 tokens)
      ├── history_tree_processor/
         ├── service.py (800 tokens)
         ├── view.py (300 tokens)
      ├── service.py (1200 tokens)
      ├── tests/
         ├── debug_page_structure.py (800 tokens)
         ├── extraction_test.py (1300 tokens)
         ├── process_dom_test.py (200 tokens)
      ├── views.py (1600 tokens)
   ├── exceptions.py
   ├── logging_config.py (800 tokens)
   ├── telemetry/
      ├── service.py (700 tokens)
      ├── views.py (200 tokens)
   ├── utils.py (2.4k tokens)
├── codebeaver.yml
├── conftest.py
├── docs/
   ├── README.md (100 tokens)
   ├── cloud/
      ├── implementation.mdx (1100 tokens)
      ├── quickstart.mdx (800 tokens)
   ├── customize/
      ├── agent-settings.mdx (2.3k tokens)
      ├── browser-settings.mdx (1600 tokens)
      ├── custom-functions.mdx (700 tokens)
      ├── hooks.mdx (2.5k tokens)
      ├── output-format.mdx (300 tokens)
      ├── real-browser.mdx (400 tokens)
      ├── sensitive-data.mdx (600 tokens)
      ├── supported-models.mdx (1400 tokens)
      ├── system-prompt.mdx (400 tokens)
   ├── development.mdx (800 tokens)
   ├── development/
      ├── contribution-guide.mdx (200 tokens)
      ├── evaluations.mdx (300 tokens)
      ├── local-setup.mdx (600 tokens)
      ├── n8n-integration.mdx (800 tokens)
      ├── observability.mdx (500 tokens)
      ├── roadmap.mdx
      ├── telemetry.mdx (200 tokens)
   ├── favicon.svg (300 tokens)
   ├── images/
      ├── browser-use.png
      ├── checks-passed.png
      ├── laminar.png
   ├── introduction.mdx (500 tokens)
   ├── logo/
      ├── dark.svg (1900 tokens)
      ├── light.svg (1900 tokens)
   ├── mint.json (400 tokens)
   ├── quickstart.mdx (300 tokens)
├── eval/
   ├── claude-3.5.py (100 tokens)
   ├── claude-3.6.py (100 tokens)
   ├── claude-3.7.py (100 tokens)
   ├── deepseek-r1.py (100 tokens)
   ├── deepseek.py (100 tokens)
   ├── gemini-1.5-flash.py (100 tokens)
   ├── gemini-2.0-flash.py (100 tokens)
   ├── gemini-2.5-preview.py (100 tokens)
   ├── gpt-4.1.py (100 tokens)
   ├── gpt-4o-no-boundingbox.py (100 tokens)
   ├── gpt-4o-no-vision.py (100 tokens)
   ├── gpt-4o-viewport-0.py (100 tokens)
   ├── gpt-4o.py (100 tokens)
   ├── gpt-o4-mini.py (100 tokens)
   ├── grok.py (100 tokens)
   ├── service.py (10.7k tokens)
├── examples/
   ├── browser/
      ├── real_browser.py (200 tokens)
      ├── stealth.py (300 tokens)
      ├── using_cdp.py (300 tokens)
   ├── custom-functions/
      ├── action_filters.py (700 tokens)
      ├── advanced_search.py (500 tokens)
      ├── clipboard.py (300 tokens)
      ├── custom_hooks_before_after_step.py (1300 tokens)
      ├── file_upload.py (600 tokens)
      ├── hover_element.py (600 tokens)
      ├── notification.py (200 tokens)
      ├── onepassword_2fa.py (300 tokens)
      ├── save_to_file_hugging_face.py (200 tokens)
   ├── features/
      ├── click_fallback_options.py (1300 tokens)
      ├── cross_origin_iframes.py (200 tokens)
      ├── custom_output.py (300 tokens)
      ├── custom_system_prompt.py (200 tokens)
      ├── custom_user_agent.py (400 tokens)
      ├── download_file.py (200 tokens)
      ├── drag_drop.py (200 tokens)
      ├── follow_up_tasks.py (200 tokens)
      ├── initial_actions.py (100 tokens)
      ├── multi-tab_handling.py (100 tokens)
      ├── multiple_agents_same_browser.py (300 tokens)
      ├── outsource_state.py (300 tokens)
      ├── parallel_agents.py (300 tokens)
      ├── pause_agent.py (400 tokens)
      ├── planner.py (100 tokens)
      ├── playwright_script_generation.py (900 tokens)
      ├── restrict_urls.py (200 tokens)
      ├── result_processing.py (300 tokens)
      ├── save_trace.py (200 tokens)
      ├── sensitive_data.py (100 tokens)
      ├── small_model_for_extraction.py (100 tokens)
      ├── task_with_memory.py (700 tokens)
      ├── validate_output.py (200 tokens)
   ├── integrations/
      ├── discord/
         ├── discord_api.py (800 tokens)
         ├── discord_example.py (600 tokens)
      ├── slack/
         ├── README.md (700 tokens)
         ├── slack_api.py (800 tokens)
         ├── slack_example.py (300 tokens)
   ├── models/
      ├── README.md
      ├── _ollama.py (200 tokens)
      ├── azure_openai.py (300 tokens)
      ├── bedrock_claude.py (400 tokens)
      ├── claude-3.7-sonnet.py (100 tokens)
      ├── deepseek-r1.py (200 tokens)
      ├── deepseek.py (200 tokens)
      ├── gemini.py (200 tokens)
      ├── gpt-4o.py (100 tokens)
      ├── grok.py (200 tokens)
      ├── novita.py (200 tokens)
      ├── qwen.py (100 tokens)
   ├── notebook/
      ├── agent_browsing.ipynb (10.4k tokens)
   ├── simple.py (100 tokens)
   ├── ui/
      ├── README.md (100 tokens)
      ├── command_line.py (500 tokens)
      ├── gradio_demo.py (500 tokens)
      ├── streamlit_demo.py (400 tokens)
   ├── use-cases/
      ├── README.md (200 tokens)
      ├── captcha.py (200 tokens)
      ├── check_appointment.py (300 tokens)
      ├── find_and_apply_to_jobs.py (1000 tokens)
      ├── find_influencer_profiles.py (500 tokens)
      ├── google_sheets.py (1500 tokens)
      ├── online_coding_agent.py (300 tokens)
      ├── post-twitter.py (700 tokens)
      ├── scrolling_page.py (200 tokens)
      ├── shopping.py (800 tokens)
      ├── test_cv.txt
      ├── twitter_cookies.txt
      ├── twitter_post_using_cookies.py (300 tokens)
      ├── web_voyager_agent.py (500 tokens)
      ├── wikipedia_banana_to_quantum.py (200 tokens)
├── pyproject.toml (700 tokens)
├── pytest.ini (100 tokens)
├── static/
   ├── browser-use-dark.png
   ├── browser-use.png
   ├── kayak.gif
   ├── photos.gif
├── tests/
   ├── conftest.py (300 tokens)
   ├── mind2web_data/
      ├── processed.json (138.5k tokens)
   ├── test_action_filters.py (2.1k tokens)
   ├── test_agent_actions.py (1400 tokens)
   ├── test_attach_chrome.py (400 tokens)
   ├── test_browser.py (3.8k tokens)
   ├── test_browser_config_models.py (1500 tokens)
   ├── test_browser_window_size_height.py (600 tokens)
   ├── test_browser_window_size_height_no_viewport.py (200 tokens)
   ├── test_context.py (3k tokens)
   ├── test_controller.py (8.8k tokens)
   ├── test_core_functionality.py (1100 tokens)
   ├── test_dropdown.py (200 tokens)
   ├── test_dropdown_complex.py (300 tokens)
   ├── test_dropdown_error.py (200 tokens)
   ├── test_excluded_actions.py (500 tokens)
   ├── test_full_screen.py (100 tokens)
   ├── test_gif_path.py (200 tokens)
   ├── test_mind2web.py (700 tokens)
   ├── test_models.py (900 tokens)
   ├── test_qwen.py (300 tokens)
   ├── test_react_dropdown.py (200 tokens)
   ├── test_save_conversation.py (500 tokens)
   ├── test_self_registered_actions.py (1100 tokens)
   ├── test_sensitive_data.py (700 tokens)
   ├── test_service.py (2.4k tokens)
   ├── test_stress.py (600 tokens)
   ├── test_tab_management.py (4.7k tokens)
   ├── test_url_allowlist_security.py (1000 tokens)
   ├── test_vision.py (400 tokens)
   ├── test_wait_for_element.py (400 tokens)
```


## /.dockerignore

```dockerignore path="/.dockerignore" 
docs/
static/
.claude/
.github/

# Cache files
.DS_Store
__pycache__/
*.py[cod]
*$py.class
.mypy_cache/
.ruff_cache/
.pytest_cache/
.ipynb_checkpoints

# Virtual Environments
.venv
venv/

# Editor cruft
.vscode/
.idea/

# Build Files
dist/

# Data files
*.gif
*.txt
*.pdf
*.csv
*.json
*.jsonl

# Secrets and sensitive files
secrets.env
.env
browser_cookies.json
cookies.json
gcp-login.json
saved_trajectories/
AgentHistory.json
AgentHistoryList.json
private_example.py
private_example

```

## /.github/CONTRIBUTING.md

# Contributing to browser-use

We love contributions! Please read through these links to get started:

 - 🔢 [Contribution Guidelines](https://docs.browser-use.com/development/contribution-guide)
 - 👾 [Local Development Setup Guide](https://docs.browser-use.com/development/local-setup)
 - 🏷️ [Issues Tagged: `#help-wanted`](https://github.com/browser-use/browser-use/issues?q=is%3Aissue%20state%3Aopen%20label%3A%22help%20wanted%22)


## /.github/ISSUE_TEMPLATE/1_element_detection_bug.yml

```yml path="/.github/ISSUE_TEMPLATE/1_element_detection_bug.yml" 
name: 🎯 Agent Page Interaction Issue
description: Agent fails to detect, click, scroll, input, or otherwise interact with some type of element on some page(s)
labels: ["bug", "element-detection"]
body:
  - type: markdown
    attributes:
      value: |
        Thanks for taking the time to fill out this bug report! Please fill out the form below to help us reproduce and fix the issue.

  - type: input
    id: version
    attributes:
      label: Browser Use Version
      description: What version of the `browser-use` library are you using? (Run `uv pip show browser-use` or `git log -n 1` to find out) **DO NOT JUST WRITE `latest version` or `main`**
      placeholder: "e.g. 0.4.45 or 62760baaefd"
    validations:
      required: true

  - type: dropdown
    id: model
    attributes:
      label: LLM Model
      description: Which LLM model(s) are you using?
      multiple: true
      options:
        - gpt-4o
        - gpt-4o-mini
        - gpt-4
        - gpt-4.1
        - gpt-4.1-mini
        - gpt-4.1-nano
        - claude-3.7-sonnet
        - claude-3.5-sonnet
        - gemini-2.6-flash-preview
        - gemini-2.5-pro
        - gemini-2.0-flash
        - gemini-2.0-flash-lite
        - gemini-1.5-flash
        - deepseek-chat
        - Local Model (Specify model in description)
        - Other (specify in description)
    validations:
      required: true

  - type: textarea
    id: prompt
    attributes:
      label: Screenshots, Description, and Task Prompt Given to Agent
      description: The full task prompt you're giving the agent (redact any sensitive data) + a description of the issue and screenshots.
      placeholder: |
        1. go to https://example.com and click the xyz button...
        2. type "abc" in the dropdown search to find the "abc" option  <- agent fails to click dropdown here
        3. Click the "Submit" button, then extract the result as JSON
        ...
        include relevant URLs and/or redacted screenshots of the relevant page(s) if possible
    validations:
      required: true

  - type: textarea
    id: html
    attributes:
      label: HTML around where it's failing
      description: A snippet of the HTML from the failing page around where the Agent is failing to interact.
      render: html
      placeholder: |
        <form na-someform="abc">
          <div class="element-to-click">
            <div data-isbutton="true">Click me</div>
          </div>
          <input id="someinput" name="someinput" type="text" />
          ...
        </form>
    validations:
      required: true

  - type: input
    id: os
    attributes:
      label: Operating System
      description: What operating system are you using?
      placeholder: "e.g., macOS 13.1, Windows 11, Ubuntu 22.04"
    validations:
      required: true

  - type: textarea
    id: code
    attributes:
      label: Python Code Sample
      description: Include some python code that reproduces the issue
      render: python
      placeholder: |
        from dotenv import load_dotenv
        load_dotenv()
        from browser_use import Agent, Browser, Controller
        from langchain_openai import ChatOpenAI

        llm = ChatOpenAI(model="gpt-4o")
        browser = Browser(chrome_binary_path='/usr/bin/google-chrome')
        agent = Agent(llm=llm, browser=browser))
        ...

  - type: textarea
    id: logs
    attributes:
      label: Full DEBUG Log Output
      description: Please copy and paste the *full* log output *from the start of the run*. Make sure to set `BROWSER_USE_LOG_LEVEL=DEBUG` in your `.env` or shell environment.
      render: shell
      placeholder: |
        $ python /app/browser-use/examples/browser/real_browser.py
        DEBUG    [browser] 🌎  Initializing new browser
        DEBUG    [agent] Version: 0.1.46-9-g62760ba, Source: git
        INFO     [agent] 🧠 Starting an agent with main_model=gpt-4o +tools +vision +memory, planner_model=None, extraction_model=gpt-4o
        DEBUG    [agent] Verifying the ChatOpenAI LLM knows the capital of France...
        DEBUG    [langsmith.client] Sending multipart request with context: trace=91282a01-6667-48a1-8cd7-21aa9337a580,id=91282a01-6667-48a1-8cd7-21aa9337a580
        DEBUG    [agent] 🪪 LLM API keys OPENAI_API_KEY work, ChatOpenAI model is connected & responding correctly.
        ...

```

## /.github/ISSUE_TEMPLATE/2_bug_report.yml

```yml path="/.github/ISSUE_TEMPLATE/2_bug_report.yml" 
name: 🐛 Library Bug Report
description: Report a bug in the browser-use Python library
labels: ["bug", "triage"]
body:
  # - type: markdown
  #   attributes:
  #     value: |
  #       Thanks for taking the time to fill out this bug report! Please fill out the form below to help us reproduce and fix the issue.

  - type: input
    id: version
    attributes:
      label: Browser Use Version
      description: What version of the `browser-use` library are you using? (Run `uv pip show browser-use` or `git log -n 1` to find out) **DO NOT JUST WRITE `latest version` or `main`**
      placeholder: "e.g. 0.4.45 or 62760baaefd"
    validations:
      required: true

  - type: textarea
    id: description
    attributes:
      label: Bug Description, Steps to Reproduce, Screenshots
      description: A clear and concise description of what the bug is + steps taken, drag screenshots in showing any error messages and relevant pages.
      placeholder: |
        1. Installed browser-use library by running: `uv pip install browser-use`
        2. Installed the browser by running: `playwright install chromium --with-deps`
        3. Ran the code below with the following prompt: `go to example.com and do xyz...`
        4. Agent crashed and showed the following error: ...
    validations:
      required: true

  - type: textarea
    id: code
    attributes:
      label: Failing Python Code
      description: Include the exact python code you ran that encountered the issue, redact any sensitive URLs and API keys.
      render: python
      placeholder: |
        from dotenv import load_dotenv
        load_dotenv()
        from browser_use import Agent, Browser, Controller
        from langchain_openai import ChatOpenAI

        llm = ChatOpenAI(model="gpt-4o")
        browser = Browser(chrome_binary_path='/usr/bin/google-chrome')
        agent = Agent(llm=llm, browser=browser))
        ...

  - type: dropdown
    id: model
    attributes:
      label: LLM Model
      description: Which LLM model(s) are you using?
      multiple: true
      options:
        - gpt-4o
        - gpt-4o-mini
        - gpt-4
        - gpt-4.1
        - gpt-4.1-mini
        - gpt-4.1-nano
        - claude-3.7-sonnet
        - claude-3.5-sonnet
        - gemini-2.6-flash-preview
        - gemini-2.5-pro
        - gemini-2.0-flash
        - gemini-2.0-flash-lite
        - gemini-1.5-flash
        - deepseek-chat
        - Local Model (Specify model in description)
        - Other (specify in description)
    validations:
      required: true

  - type: input
    id: os
    attributes:
      label: Operating System
      description: What operating system are you using?
      placeholder: "e.g., macOS 13.1, Windows 11, Ubuntu 22.04"
    validations:
      required: true

  - type: textarea
    id: logs
    attributes:
      label: Full DEBUG Log Output
      description: Please copy and paste the *full* log output *from the start of the run*. Make sure to set `BROWSER_USE_LOG_LEVEL=DEBUG` in your `.env` or shell environment.
      render: shell
      placeholder: |
        $ python /app/browser-use/examples/browser/real_browser.py
        DEBUG    [browser] 🌎  Initializing new browser
        DEBUG    [agent] Version: 0.1.46-9-g62760ba, Source: git
        INFO     [agent] 🧠 Starting an agent with main_model=gpt-4o +tools +vision +memory, planner_model=None, extraction_model=gpt-4o
        DEBUG    [agent] Verifying the ChatOpenAI LLM knows the capital of France...
        DEBUG    [langsmith.client] Sending multipart request with context: trace=91282a01-6667-48a1-8cd7-21aa9337a580,id=91282a01-6667-48a1-8cd7-21aa9337a580
        DEBUG    [agent] 🪪 LLM API keys OPENAI_API_KEY work, ChatOpenAI model is connected & responding correctly.
        ...

```

## /.github/ISSUE_TEMPLATE/3_feature_request.yml

```yml path="/.github/ISSUE_TEMPLATE/3_feature_request.yml" 
name: 💡 Feature Request
description: Suggest a new feature for browser-use
labels: ["enhancement"]
body:
  - type: markdown
    attributes:
      value: |
        Thanks for taking the time to suggest a new feature! Please fill out the form below to help us understand your suggestion.

  - type: textarea
    id: problem
    attributes:
      label: Problem Description
      description: Is your feature request related to a problem? Please describe.
      placeholder: I'm always frustrated when...
    validations:
      required: true

  - type: textarea
    id: solution
    attributes:
      label: Proposed Solution
      description: Describe the solution you'd like to see
      placeholder: It would be great if...
    validations:
      required: true

  - type: textarea
    id: alternatives
    attributes:
      label: Alternative Solutions
      description: Describe any alternative solutions or features you've considered
      placeholder: I've also thought about...

  - type: textarea
    id: context
    attributes:
      label: Additional Context
      description: Add any other context or examples about the feature request here
      placeholder: |
        - Example use cases
        - Screenshots or mockups
        - Related issues or discussions

```

## /.github/ISSUE_TEMPLATE/4_docs_issue.yml

```yml path="/.github/ISSUE_TEMPLATE/4_docs_issue.yml" 
name: 📚 Documentation Issue
description: Report an issue in the browser-use documentation
labels: ["documentation"]
body:
  - type: markdown
    attributes:
      value: |
        Thanks for taking the time to improve our documentation! Please fill out the form below to help us understand the issue.

  - type: dropdown
    id: type
    attributes:
      label: Type of Documentation Issue
      description: What type of documentation issue is this?
      options:
        - Missing documentation
        - Incorrect documentation
        - Unclear documentation
        - Broken link
        - Other (specify in description)
    validations:
      required: true

  - type: input
    id: page
    attributes:
      label: Documentation Page
      description: Which page or section of the documentation is this about?
      placeholder: "e.g., https://docs.browser-use.com/getting-started or Installation Guide"
    validations:
      required: true

  - type: textarea
    id: description
    attributes:
      label: Issue Description
      description: Describe what's wrong or missing in the documentation
      placeholder: The documentation should...
    validations:
      required: true

  - type: textarea
    id: suggestion
    attributes:
      label: Suggested Changes
      description: If you have specific suggestions for how to improve the documentation, please share them
      placeholder: |
        The documentation could be improved by...

        Example:
        \`\`\`python
        # Your suggested code example or text here
        \`\`\`
    validations:
      required: true

```

## /.github/ISSUE_TEMPLATE/config.yml

```yml path="/.github/ISSUE_TEMPLATE/config.yml" 
blank_issues_enabled: false  # Set to true if you want to allow blank issues
contact_links:
  - name: 🤔 Quickstart Guide
    url: https://docs.browser-use.com/quickstart
    about: Most common issues can be resolved by following our quickstart guide
  - name: 🤔 Questions and Help
    url: https://link.browser-use.com/discord
    about: Please ask questions in our Discord community
  - name: 📖 Documentation
    url: https://docs.browser-use.com
    about: Check our documentation for answers first

```

## /.github/workflows/cloud_evals.yml

```yml path="/.github/workflows/cloud_evals.yml" 
name: cloud_evals

on:
  push:
    branches:
      - main
      - 'releases/*'
  workflow_dispatch:
    inputs:
      commit_hash:
        description: Commit hash of the library to build the Cloud eval image for
        required: false

jobs:
  trigger_cloud_eval_image_build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/github-script@v7
        with:
          github-token: ${{ secrets.TRIGGER_CLOUD_BUILD_GH_KEY }}
          script: |
            const result = await github.rest.repos.createDispatchEvent({
              owner: 'browser-use',
              repo: 'cloud',
              event_type: 'trigger-workflow',
              client_payload: {"commit_hash": "${{ github.event.inputs.commit_hash || github.sha }}"}
            })
            console.log(result)

```

## /.github/workflows/docker.yml

```yml path="/.github/workflows/docker.yml" 
name: docker

on:
  push:
  release:
    types: [published]

jobs:
  build_publish_image:
    runs-on: ubuntu-latest
    permissions:
      packages: write
      contents: read
      attestations: write
      id-token: write
    steps:
      - name: Check out the repo
        uses: actions/checkout@v4

      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to Docker Hub
        uses: docker/login-action@v3
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_PASSWORD }}

      - name: Login to GitHub Container Registry
        uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.repository_owner }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Compute Docker tags based on tag/branch
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: |
            browseruse/browseruse
            ghcr.io/browser-use/browser-use
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=pep440,pattern={{version}}
            type=pep440,pattern={{major}}.{{minor}}
            type=sha

      - name: Build and push Docker image
        id: push
        uses: docker/build-push-action@v6
        with:
          platforms: linux/amd64,linux/arm64
          context: .
          file: ./Dockerfile
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=registry,ref=browseruse/browseruse:buildcache
          cache-to: type=registry,ref=browseruse/browseruse:buildcache,mode=max

```

## /.github/workflows/lint.yml

```yml path="/.github/workflows/lint.yml" 
name: lint
on:
  push:
    branches:
      - main
      - stable
      - 'releases/**'
    tags:
      - '*'
  pull_request:
  workflow_dispatch:

jobs:
  lint-syntax:
    name: syntax-errors
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true
      - run: uv run ruff check --no-fix --select PLE

  lint-style:
    name: code-style
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v5
        with:
          enable-cache: true
      - run: uv run pre-commit run --all-files --show-diff-on-failure

  lint-typecheck:
    name: type-checker
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v6
        with:
          enable-cache: true
      - run: uv run pyright

```

## /.github/workflows/package.yaml

```yaml path="/.github/workflows/package.yaml" 
name: package
on:
  push:
    branches:
      - main
      - stable
      - 'releases/**'
    tags:
      - '*'
  pull_request:
  workflow_dispatch:

jobs:
  build:
    name: pip-build
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v5
      - run: uv build --python 3.12
      - uses: actions/upload-artifact@v4
        with:
          name: dist-artifact
          path: |
            dist/*.whl
            dist/*.tar.gz

  build_test:
    name: pip-install-on-${{ matrix.os }}-py-${{ matrix.python-version }}
    needs: build
    runs-on: ${{ matrix.os }}
    strategy:
      matrix:
        os: [ubuntu-latest, macos-latest, windows-latest]
        python-version: ["3.11", "3.12", "3.13"]

    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v5
      - uses: actions/download-artifact@v4
        with:
          name: dist-artifact

      - name: Set up venv and test for OS/Python versions
        shell: bash
        run: |
          uv venv /tmp/testenv --python ${{ matrix.python-version }}
          if [[ "$RUNNER_OS" == "Windows" ]]; then
            . /tmp/testenv/Scripts/activate
          else
            source /tmp/testenv/bin/activate
          fi
          uv pip install *.whl
          python -c 'from browser_use import Agent, Browser, Controller, ActionModel, ActionResult'

```

## /.github/workflows/publish.yml

```yml path="/.github/workflows/publish.yml" 
# This workflow will upload a Python Package using Twine when a release is created
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: publish

on:
  release:
    types: [published]     # publish full release to PyPI when a release is created on Github
  schedule:
    - cron: "0 17 * * FRI" # tag a pre-release on Github every Friday at 5 PM UTC

permissions:
  contents: write
  id-token: write

jobs:
  tag_pre_release:
    if: github.event_name == 'schedule'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Create pre-release tag
        run: |
          git fetch --tags
          latest_tag=$(git tag --list --sort=-v:refname | grep -E '^v[0-9]+\.[0-9]+\.[0-9]+rc[0-9]+{{contextString}}#39; | head -n 1)
          if [ -z "$latest_tag" ]; then
            new_tag="v0.1.0rc1"
          else
            new_tag=$(echo $latest_tag | awk -F'rc' '{print $1 "rc" $2+1}')
          fi
          git tag $new_tag
          git push origin $new_tag

  publish_to_pypi:
    if: github.event_name == 'release'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.x"
      - uses: astral-sh/setup-uv@v5
      - run: uv run ruff check --no-fix --select PLE # check only for syntax errors
      - run: uv build
      - run: uv run --isolated --no-project --with pytest --with dist/*.whl tests/conftest.py
      - run: uv run --isolated --no-project --with pytest --with dist/*.tar.gz tests/conftest.py
      - run: uv run --with=dotenv pytest \
          --ignore=tests/test_dropdown_error.py \
          --ignore=tests/test_gif_path.py \
          --ignore=tests/test_models.py \
          --ignore=tests/test_react_dropdown.py \
          --ignore=tests/test_save_conversation.py \
          --ignore=tests/test_vision.py \
          --ignore=tests/test_wait_for_element.py || true
      - run: uv publish --trusted-publishing always
      - name: Push to stable branch (if stable release)
        if: startsWith(github.ref_name, 'v') && !contains(github.ref_name, 'rc')
        run: |
          git checkout -b stable
          git push origin stable

```

## /.github/workflows/test.yaml

```yaml path="/.github/workflows/test.yaml" 
name: test

on:
  push:
    branches:
      - main
      - stable
      - 'releases/**'
    tags:
      - '*'
  pull_request:
  workflow_dispatch:
    
jobs:
  tests:
    name: ${{matrix.test}} 
    runs-on: ubuntu-latest
    strategy:
      matrix:
        test:
        # TODO:
        # - browser/patchright
        # - browser/playwright
        # - browser/user_binary
        # - browser/remote_cdp
        # - models/openai
        # - models/google
        # - models/anthropic
        # - models/azure
        # - models/deepseek
        # - models/grok
        # - functionality/click
        # - functionality/tabs
        # - functionality/input
        # - functionality/scroll
        # - functionality/upload
        # - functionality/download
        # - functionality/save
        # - functionality/vision
        # - functionality/memory
        # - functionality/planner
        # - functionality/hooks
        - test_controller
        - test_tab_management
        - test_sensitive_data
        - test_url_allowlist_security
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v6
        with:
          enable-cache: true
          activate-environment: true

      - run: uv sync

      - name: Detect installed Playwright or Patchright version
        run: echo "PLAYWRIGHT_VERSION=$(uv pip list --format json | jq -r '.[] | select(.name == "playwright") | .version')" >> $GITHUB_ENV

      - name: Cache playwright binaries
        uses: actions/cache@v3
        with:
          path: |
            ~/.cache/ms-playwright
          key: ${{ runner.os }}-playwright-${{ env.PLAYWRIGHT_VERSION }}

      - run: playwright install --no-shell chromium

      - run: pytest tests/${{ matrix.test }}.py

```

## /.gitignore

```gitignore path="/.gitignore" 
# Cache files
.DS_Store
__pycache__/
*.py[cod]
*$py.class
.mypy_cache/
.ruff_cache/
.pytest_cache/
.ipynb_checkpoints

# Virtual Environments
.venv
venv/

# IDEs
.vscode/
.idea/

# Build files
dist/

# Data files
*.gif
*.txt
*.pdf
*.csv
*.json
*.jsonl

# Secrets and sensitive files
secrets.env
.env
browser_cookies.json
cookies.json
gcp-login.json
saved_trajectories/
AgentHistory.json
AgentHistoryList.json
private_example.py
private_example

uv.lock

```

## /.pre-commit-config.yaml

```yaml path="/.pre-commit-config.yaml" 
repos:
  - repo: https://github.com/asottile/yesqa
    rev: v1.5.0
    hooks:
      - id: yesqa

  - repo: https://github.com/codespell-project/codespell
    rev: v2.4.1
    hooks:
      - id: codespell # See pyproject.toml for args
        additional_dependencies:
          - tomli

  - repo: https://github.com/asottile/pyupgrade
    rev: v3.19.1
    hooks:
      - id: pyupgrade
        args: [--py311-plus]

  # - repo: https://github.com/asottile/add-trailing-comma
  #   rev: v3.1.0
  #   hooks:
  #     - id: add-trailing-comma

  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.11.2
    hooks:
      - id: ruff
      - id: ruff-format
      # see pyproject.toml for more details on ruff config

  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v5.0.0
    hooks:
      # check for basic syntax errors in python and data files
      - id: check-ast
      - id: check-toml
      - id: check-yaml
      - id: check-json
      - id: check-merge-conflict
      # check for bad files and folders
      - id: check-symlinks
      - id: destroyed-symlinks
      - id: check-case-conflict
      - id: check-illegal-windows-names
      - id: check-shebang-scripts-are-executable
      - id: mixed-line-ending
      - id: fix-byte-order-marker
      - id: end-of-file-fixer
      # best practices enforcement
      - id: detect-private-key
      # - id: check-docstring-first
      - id: debug-statements
      - id: forbid-submodules
      - id: check-added-large-files
        args: ["--maxkb=600"]
      # - id: name-tests-test
      #   args: ["--pytest-test-first"]

```

## /.python-version

```python-version path="/.python-version" 
3.11

```

## /.vscode/launch.json

```json path="/.vscode/launch.json" 
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python Debugger: Current File",
            "type": "debugpy",
            "request": "launch",
            "program": "${file}",
            "justMyCode": false,
            "env": {
                "PYTHONPATH": "${workspaceFolder}"
            },
            "console": "integratedTerminal"
        },
        {
            "name": "Python Debugger: Module",
            "type": "debugpy",
            "request": "launch",
            "module": "examples.extend_actions"
        },
        {
            "name": "Python: Debug extend_actions",
            "type": "module",
            "request": "launch",
            "module": "examples.extend_actions",
            "console": "integratedTerminal",
            "justMyCode": false,
            "env": {
                "PYTHONPATH": "${workspaceFolder}"
            }
        },
        {
            "name": "Python: Debug Captcha Tests",
            "type": "python",
            "request": "launch",
            "module": "pytest",
            "args": [
                "tests/test_agent_actions.py",
                "-v",
                "-k",
                "test_captcha_solver",
                "--capture=no"
            ],
            "console": "integratedTerminal",
            "justMyCode": false
        },
        {
            "name": "Python: Debug Ecommerce Interaction",
            "type": "python",
            "request": "launch",
            "module": "pytest",
            "args": [
                "tests/test_agent_actions.py",
                "-v",
                "-k",
                "test_ecommerce_interaction",
                "--capture=no"
            ],
            "console": "integratedTerminal",
            "justMyCode": false
        },
        {
            "name": "Python: Debug Core Functionality",
            "type": "python",
            "request": "launch",
            "program": "${workspaceFolder}/.venv/bin/pytest",
            "args": [
                "tests/test_core_functionality.py",
                "-v"
            ],
            "console": "integratedTerminal",
            "justMyCode": false
        },
        {
            "name": "pytest: Debug Current File",
            "type": "python",
            "request": "launch",
            "module": "pytest",
            "args": [
                "${file}",
                "-v",
                "--capture=no"
            ],
            "console": "integratedTerminal",
            "justMyCode": false
        }
    ]
}

```

## /.vscode/settings.json

```json path="/.vscode/settings.json" 
{
  "python.analysis.typeCheckingMode": "basic",
  "[python]": {
    "editor.defaultFormatter": "charliermarsh.ruff",
    "editor.formatOnSave": true,
    "editor.codeActionsOnSave": {
      "source.fixAll.ruff": "explicit",
      "source.organizeImports.ruff": "explicit"
    }
  }
}

```

## /Dockerfile

``` path="/Dockerfile" 
# syntax=docker/dockerfile:1
# check=skip=SecretsUsedInArgOrEnv

# This is the Dockerfile for browser-use, it bundles the following dependencies:
#     python3, pip, playwright, chromium, browser-use and its dependencies.
# Usage:
#     git clone https://github.com/browser-use/browser-use.git && cd browser-use
#     docker build . -t browseruse --no-cache
#     docker run -v "$PWD/data":/data browseruse
#     docker run -v "$PWD/data":/data browseruse --version
# Multi-arch build:
#     docker buildx create --use
#     docker buildx build . --platform=linux/amd64,linux/arm64--push -t browseruse/browseruse:some-tag
#
# Read more: https://docs.browser-use.com

#########################################################################################


FROM python:3.11-slim

LABEL name="browseruse" \
    maintainer="Nick Sweeting <dockerfile@browser-use.com>" \
    description="Make websites accessible for AI agents. Automate tasks online with ease." \
    homepage="https://github.com/browser-use/browser-use" \
    documentation="https://docs.browser-use.com" \
    org.opencontainers.image.title="browseruse" \
    org.opencontainers.image.vendor="browseruse" \
    org.opencontainers.image.description="Make websites accessible for AI agents. Automate tasks online with ease." \
    org.opencontainers.image.source="https://github.com/browser-use/browser-use" \
    com.docker.image.source.entrypoint="Dockerfile" \
    com.docker.desktop.extension.api.version=">= 1.4.7" \
    com.docker.desktop.extension.icon="https://avatars.githubusercontent.com/u/192012301?s=200&v=4" \
    com.docker.extension.publisher-url="https://browser-use.com" \
    com.docker.extension.screenshots='[{"alt": "Screenshot of CLI splashscreen", "url": "https://github.com/user-attachments/assets/3606d851-deb1-439e-ad90-774e7960ded8"}, {"alt": "Screenshot of CLI running", "url": "https://github.com/user-attachments/assets/d018b115-95a4-4ac5-8259-b750bc5f56ad"}]' \
    com.docker.extension.detailed-description='See here for detailed documentation: https://docs.browser-use.com' \
    com.docker.extension.changelog='See here for release notes: https://github.com/browser-use/browser-use/releases' \
    com.docker.extension.categories='web,utility-tools,ai'

ARG TARGETPLATFORM
ARG TARGETOS
ARG TARGETARCH
ARG TARGETVARIANT

######### Environment Variables #################################

# Global system-level config
ENV TZ=UTC \
    LANGUAGE=en_US:en \
    LC_ALL=C.UTF-8 \
    LANG=C.UTF-8 \
    DEBIAN_FRONTEND=noninteractive \
    APT_KEY_DONT_WARN_ON_DANGEROUS_USAGE=1 \
    PYTHONIOENCODING=UTF-8 \
    PYTHONUNBUFFERED=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1 \
    UV_CACHE_DIR=/root/.cache/uv \
    UV_LINK_MODE=copy \
    UV_COMPILE_BYTECODE=1 \
    UV_PYTHON_PREFERENCE=only-system \
    npm_config_loglevel=error \
    IN_DOCKER=True

# User config
ENV BROWSERUSE_USER="browseruse" \
    DEFAULT_PUID=911 \
    DEFAULT_PGID=911

# Paths
ENV CODE_DIR=/app \
    DATA_DIR=/data \
    VENV_DIR=/app/.venv \
    PATH="/app/.venv/bin:$PATH"

# Build shell config
SHELL ["/bin/bash", "-o", "pipefail", "-o", "errexit", "-o", "errtrace", "-o", "nounset", "-c"] 

# Force apt to leave downloaded binaries in /var/cache/apt (massively speeds up Docker builds)
RUN echo 'Binary::apt::APT::Keep-Downloaded-Packages "1";' > /etc/apt/apt.conf.d/99keep-cache \
    && echo 'APT::Install-Recommends "0";' > /etc/apt/apt.conf.d/99no-intall-recommends \
    && echo 'APT::Install-Suggests "0";' > /etc/apt/apt.conf.d/99no-intall-suggests \
    && rm -f /etc/apt/apt.conf.d/docker-clean

# Print debug info about build and save it to disk, for human eyes only, not used by anything else
RUN (echo "[i] Docker build for Browser Use $(cat /VERSION.txt) starting..." \
    && echo "PLATFORM=${TARGETPLATFORM} ARCH=$(uname -m) ($(uname -s) ${TARGETARCH} ${TARGETVARIANT})" \
    && echo "BUILD_START_TIME=$(date +"%Y-%m-%d %H:%M:%S %s") TZ=${TZ} LANG=${LANG}" \
    && echo \
    && echo "CODE_DIR=${CODE_DIR} DATA_DIR=${DATA_DIR} PATH=${PATH}" \
    && echo \
    && uname -a \
    && cat /etc/os-release | head -n7 \
    && which bash && bash --version | head -n1 \
    && which dpkg && dpkg --version | head -n1 \
    && echo -e '\n\n' && env && echo -e '\n\n' \
    && which python && python --version \
    && which pip && pip --version \
    && echo -e '\n\n' \
    ) | tee -a /VERSION.txt

# Create non-privileged user for browseruse and chrome
RUN echo "[*] Setting up $BROWSERUSE_USER user uid=${DEFAULT_PUID}..." \
    && groupadd --system $BROWSERUSE_USER \
    && useradd --system --create-home --gid $BROWSERUSE_USER --groups audio,video $BROWSERUSE_USER \
    && usermod -u "$DEFAULT_PUID" "$BROWSERUSE_USER" \
    && groupmod -g "$DEFAULT_PGID" "$BROWSERUSE_USER" \
    && mkdir -p /data \
    && mkdir -p /home/$BROWSERUSE_USER/.config \
    && chown -R $BROWSERUSE_USER:$BROWSERUSE_USER /home/$BROWSERUSE_USER \
    && ln -s $DATA_DIR /home/$BROWSERUSE_USER/.config/browseruse \
    && echo -e "\nBROWSERUSE_USER=$BROWSERUSE_USER PUID=$(id -u $BROWSERUSE_USER) PGID=$(id -g $BROWSERUSE_USER)\n\n" \
    | tee -a /VERSION.txt
    # DEFAULT_PUID and DEFAULT_PID are overridden by PUID and PGID in /bin/docker_entrypoint.sh at runtime
    # https://docs.linuxserver.io/general/understanding-puid-and-pgid

# Install base apt dependencies (adding backports to access more recent apt updates)
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked,id=apt-$TARGETARCH$TARGETVARIANT \
    echo "[+] Installing APT base system dependencies for $TARGETPLATFORM..." \
#     && echo 'deb https://deb.debian.org/debian bookworm-backports main contrib non-free' > /etc/apt/sources.list.d/backports.list \
    && mkdir -p /etc/apt/keyrings \
    && apt-get update -qq \
    && apt-get install -qq -y --no-install-recommends \
        # 1. packaging dependencies
        apt-transport-https ca-certificates apt-utils gnupg2 unzip curl wget grep \
        # 2. docker and init system dependencies:
        # dumb-init gosu cron zlib1g-dev \
        # 3. frivolous CLI helpers to make debugging failed archiving easierL
        nano iputils-ping dnsutils jq \
        # tree yq procps \
        # 4. browser dependencies: (auto-installed by playwright install --with-deps chromium)
     #    libnss3 libxss1 libasound2 libx11-xcb1 \
     #    fontconfig fonts-ipafont-gothic fonts-wqy-zenhei fonts-thai-tlwg fonts-khmeros fonts-kacst fonts-symbola fonts-noto fonts-freefont-ttf \
     #    at-spi2-common fonts-liberation fonts-noto-color-emoji fonts-tlwg-loma-otf fonts-unifont libatk-bridge2.0-0 libatk1.0-0 libatspi2.0-0 libavahi-client3 \
     #    libavahi-common-data libavahi-common3 libcups2 libfontenc1 libice6 libnspr4 libnss3 libsm6 libunwind8 \
     #    libxaw7 libxcomposite1 libxdamage1 libxfont2 \
     #    # 5. x11/xvfb dependencies:
     #    libxkbfile1 libxmu6 libxpm4 libxt6 x11-xkb-utils x11-utils xfonts-encodings \
     #    xfonts-scalable xfonts-utils xserver-common xvfb \
     && rm -rf /var/lib/apt/lists/*

COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# Copy only dependency manifest
WORKDIR /app
COPY pyproject.toml uv.lock* /app/

RUN --mount=type=cache,target=/root/.cache,sharing=locked,id=cache-$TARGETARCH$TARGETVARIANT \
    echo "[+] Setting up venv using uv in $VENV_DIR..." \
    && ( \
     which uv && uv --version \
     && uv venv \
     && which python | grep "$VENV_DIR" \
     && python --version \
    ) | tee -a /VERSION.txt

# Install playwright using pip (with version from pyproject.toml)
RUN --mount=type=cache,target=/root/.cache,sharing=locked,id=cache-$TARGETARCH$TARGETVARIANT \
     echo "[+] Installing playwright via pip using version from pyproject.toml..." \
     && ( \
        uv pip install "$(grep -oP 'p....right>=([0-9.])+' pyproject.toml)" \
        && which playwright \
        && playwright --version \
        && echo -e '\n\n' \
     ) | tee -a /VERSION.txt

# Install Chromium using playwright
RUN --mount=type=cache,target=/var/cache/apt,sharing=locked,id=apt-$TARGETARCH$TARGETVARIANT \
    --mount=type=cache,target=/root/.cache,sharing=locked,id=cache-$TARGETARCH$TARGETVARIANT \
    echo "[+] Installing chromium apt pkgs and binary to /root/.cache/ms-playwright..." \
    && apt-get update -qq \
    && playwright install --with-deps --no-shell chromium \
    && rm -rf /var/lib/apt/lists/* \
    && export CHROME_BINARY="$(python -c 'from playwright.sync_api import sync_playwright; print(sync_playwright().start().chromium.executable_path)')" \
    && ln -s "$CHROME_BINARY" /usr/bin/chromium-browser \
    && ln -s "$CHROME_BINARY" /app/chromium-browser \
    && mkdir -p "/home/${BROWSERUSE_USER}/.config/chromium/Crash Reports/pending/" \
    && chown -R "$BROWSERUSE_USER:$BROWSERUSE_USER" "/home/${BROWSERUSE_USER}/.config" \
    && ( \
        which chromium-browser && /usr/bin/chromium-browser --version \
        && echo -e '\n\n' \
    ) | tee -a /VERSION.txt

RUN --mount=type=cache,target=/root/.cache,sharing=locked,id=cache-$TARGETARCH$TARGETVARIANT \
     echo "[+] Installing browser-use pip sub-dependencies..." \
     && ( \
        uv sync --all-extras --no-dev --no-install-project \
        && echo -e '\n\n' \
     ) | tee -a /VERSION.txt

# Copy the rest of the browser-use codebase
COPY . /app

# Install the browser-use package and all of its optional dependencies
RUN --mount=type=cache,target=/root/.cache,sharing=locked,id=cache-$TARGETARCH$TARGETVARIANT \
     echo "[+] Installing browser-use pip library from source..." \
     && ( \
        uv sync --all-extras --locked --no-dev \
        && which browser-use \
        && browser-use --version 2>&1 \
        && echo -e '\n\n' \
     ) | tee -a /VERSION.txt

RUN mkdir -p "$DATA_DIR/profiles/default" \
    && chown -R $BROWSERUSE_USER:$BROWSERUSE_USER "$DATA_DIR" "$DATA_DIR"/* \
    && ( \
        echo -e "\n\n[√] Finished Docker build successfully. Saving build summary in: /VERSION.txt" \
        && echo -e "PLATFORM=${TARGETPLATFORM} ARCH=$(uname -m) ($(uname -s) ${TARGETARCH} ${TARGETVARIANT})\n" \
        && echo -e "BUILD_END_TIME=$(date +"%Y-%m-%d %H:%M:%S %s")\n\n" \
    ) | tee -a /VERSION.txt


USER "$BROWSERUSE_USER"
VOLUME "$DATA_DIR"
EXPOSE 9242
EXPOSE 9222

# HEALTHCHECK --interval=30s --timeout=20s --retries=15 \
#     CMD curl --silent 'http://localhost:8000/health/' | grep -q 'OK'

ENTRYPOINT ["browser-use"]

```

## /README.md

<picture>
  <source media="(prefers-color-scheme: dark)" srcset="./static/browser-use-dark.png">
  <source media="(prefers-color-scheme: light)" srcset="./static/browser-use.png">
  <img alt="Shows a black Browser Use Logo in light color mode and a white one in dark color mode." src="./static/browser-use.png"  width="full">
</picture>

<h1 align="center">Enable AI to control your browser 🤖</h1>

[![GitHub stars](https://img.shields.io/github/stars/gregpr07/browser-use?style=social)](https://github.com/gregpr07/browser-use/stargazers)
[![Discord](https://img.shields.io/discord/1303749220842340412?color=7289DA&label=Discord&logo=discord&logoColor=white)](https://link.browser-use.com/discord)
[![Cloud](https://img.shields.io/badge/Cloud-☁️-blue)](https://cloud.browser-use.com)
[![Documentation](https://img.shields.io/badge/Documentation-📕-blue)](https://docs.browser-use.com)
[![Twitter Follow](https://img.shields.io/twitter/follow/Gregor?style=social)](https://x.com/gregpr07)
[![Twitter Follow](https://img.shields.io/twitter/follow/Magnus?style=social)](https://x.com/mamagnus00)
[![Weave Badge](https://img.shields.io/endpoint?url=https%3A%2F%2Fapp.workweave.ai%2Fapi%2Frepository%2Fbadge%2Forg_T5Pvn3UBswTHIsN1dWS3voPg%2F881458615&labelColor=#EC6341)](https://app.workweave.ai/reports/repository/org_T5Pvn3UBswTHIsN1dWS3voPg/881458615)

🌐 Browser-use is the easiest way to connect your AI agents with the browser.

💡 See what others are building and share your projects in our [Discord](https://link.browser-use.com/discord)! Want Swag? Check out our [Merch store](https://browsermerch.com).

🌤️ Skip the setup - try our <b>hosted version</b> for instant browser automation! <b>[Try the cloud ☁︎](https://cloud.browser-use.com)</b>.

# Quick start

With pip (Python>=3.11):

```bash
pip install browser-use
```

For memory functionality (requires Python<3.13 due to PyTorch compatibility):  

```bash
pip install "browser-use[memory]"
```

Install the browser:
```bash
playwright install chromium --with-deps --no-shell
```

Spin up your agent:

```python
import asyncio
from dotenv import load_dotenv
load_dotenv()
from browser_use import Agent
from langchain_openai import ChatOpenAI

async def main():
    agent = Agent(
        task="Compare the price of gpt-4o and DeepSeek-V3",
        llm=ChatOpenAI(model="gpt-4o"),
    )
    await agent.run()

asyncio.run(main())
```

Add your API keys for the provider you want to use to your `.env` file.

```bash
OPENAI_API_KEY=
ANTHROPIC_API_KEY=
AZURE_OPENAI_ENDPOINT=
AZURE_OPENAI_KEY=
GOOGLE_API_KEY=
DEEPSEEK_API_KEY=
GROK_API_KEY=
NOVITA_API_KEY=
```

For other settings, models, and more, check out the [documentation 📕](https://docs.browser-use.com).

### Test with UI

You can test browser-use using its [Web UI](https://github.com/browser-use/web-ui) or [Desktop App](https://github.com/browser-use/desktop).

### Test with an interactive CLI

You can also use our `browser-use` interactive CLI (similar to `claude` code):

```bash
pip install browser-use[cli]
browser-use
```

# Demos

<br/><br/>

[Task](https://github.com/browser-use/browser-use/blob/main/examples/use-cases/shopping.py): Add grocery items to cart, and checkout.

[![AI Did My Groceries](https://github.com/user-attachments/assets/a0ffd23d-9a11-4368-8893-b092703abc14)](https://www.youtube.com/watch?v=L2Ya9PYNns8)

<br/><br/>

Prompt: Add my latest LinkedIn follower to my leads in Salesforce.

![LinkedIn to Salesforce](https://github.com/user-attachments/assets/50d6e691-b66b-4077-a46c-49e9d4707e07)

<br/><br/>

[Prompt](https://github.com/browser-use/browser-use/blob/main/examples/use-cases/find_and_apply_to_jobs.py): Read my CV & find ML jobs, save them to a file, and then start applying for them in new tabs, if you need help, ask me.'

https://github.com/user-attachments/assets/171fb4d6-0355-46f2-863e-edb04a828d04

<br/><br/>

[Prompt](https://github.com/browser-use/browser-use/blob/main/examples/browser/real_browser.py): Write a letter in Google Docs to my Papa, thanking him for everything, and save the document as a PDF.

![Letter to Papa](https://github.com/user-attachments/assets/242ade3e-15bc-41c2-988f-cbc5415a66aa)

<br/><br/>

[Prompt](https://github.com/browser-use/browser-use/blob/main/examples/custom-functions/save_to_file_hugging_face.py): Look up models with a license of cc-by-sa-4.0 and sort by most likes on Hugging face, save top 5 to file.

https://github.com/user-attachments/assets/de73ee39-432c-4b97-b4e8-939fd7f323b3

<br/><br/>

## More examples

For more examples see the [examples](examples) folder or join the [Discord](https://link.browser-use.com/discord) and show off your project. You can also see our [`awesome-prompts`](https://github.com/browser-use/awesome-prompts) repo for prompting inspiration.

# Vision

Tell your computer what to do, and it gets it done.

## Roadmap

### Agent

- [ ] Improve agent memory to handle +100 steps
- [ ] Enhance planning capabilities (load website specific context)
- [ ] Reduce token consumption (system prompt, DOM state)

### DOM Extraction

- [ ] Enable detection for all possible UI elements
- [ ] Improve state representation for UI elements so that all LLMs can understand what's on the page

### Workflows

- [ ] Let user record a workflow - which we can rerun with browser-use as a fallback
- [ ] Make rerunning of workflows work, even if pages change

### User Experience

- [ ] Create various templates for tutorial execution, job application, QA testing, social media, etc. which users can just copy & paste.
- [ ] Improve docs
- [ ] Make it faster

### Parallelization

- [ ] Human work is sequential. The real power of a browser agent comes into reality if we can parallelize similar tasks. For example, if you want to find contact information for 100 companies, this can all be done in parallel and reported back to a main agent, which processes the results and kicks off parallel subtasks again.


## Contributing

We love contributions! Feel free to open issues for bugs or feature requests. To contribute to the docs, check out the `/docs` folder.

## Local Setup

To learn more about the library, check out the [local setup 📕](https://docs.browser-use.com/development/local-setup).


`main` is the primary development branch with frequent changes. For production use, install a stable [versioned release](https://github.com/browser-use/browser-use/releases) instead.

---

## Swag

Want to show off your Browser-use swag? Check out our [Merch store](https://browsermerch.com). Good contributors will receive swag for free 👀.

## Citation

If you use Browser Use in your research or project, please cite:

```bibtex
@software{browser_use2024,
  author = {Müller, Magnus and Žunič, Gregor},
  title = {Browser Use: Enable AI to control your browser},
  year = {2024},
  publisher = {GitHub},
  url = {https://github.com/browser-use/browser-use}
}
```

 <div align="center"> <img src="https://github.com/user-attachments/assets/06fa3078-8461-4560-b434-445510c1766f" width="400"/> 
 
[![Twitter Follow](https://img.shields.io/twitter/follow/Gregor?style=social)](https://x.com/gregpr07)
[![Twitter Follow](https://img.shields.io/twitter/follow/Magnus?style=social)](https://x.com/mamagnus00)
 
 </div>

<div align="center">
Made with ❤️ in Zurich and San Francisco
 </div>


## /SECURITY.md

## Reporting Security Issues

If you believe you have found a security vulnerability in browser-use, please report it through coordinated disclosure.

**Please do not report security vulnerabilities through the repository issues, discussions, or pull requests.**

Instead, please open a new [Github security advisory](https://github.com/browser-use/browser-use/security/advisories/new).

Please include as much of the information listed below as you can to help me better understand and resolve the issue:

* The type of issue (e.g., buffer overflow, SQL injection, or cross-site scripting)
* Full paths of source file(s) related to the manifestation of the issue
* The location of the affected source code (tag/branch/commit or direct URL)
* Any special configuration required to reproduce the issue
* Step-by-step instructions to reproduce the issue
* Proof-of-concept or exploit code (if possible)
* Impact of the issue, including how an attacker might exploit the issue

This information will help me triage your report more quickly.


## /browser_use/README.md

# Codebase Structure

> The code structure inspired by https://github.com/Netflix/dispatch.

Very good structure on how to make a scalable codebase is also in [this repo](https://github.com/zhanymkanov/fastapi-best-practices).

Just a brief document about how we should structure our backend codebase.

## Code Structure

```markdown
src/
/<service name>/
models.py
services.py
prompts.py
views.py
utils.py
routers.py

    	/_<subservice name>/
```

### Service.py

Always a single file, except if it becomes too long - more than ~500 lines, split it into \_subservices

### Views.py

Always split the views into two parts

```python
# All
...

# Requests
...

# Responses
...
```

If too long → split into multiple files

### Prompts.py

Single file; if too long → split into multiple files (one prompt per file or so)

### Routers.py

Never split into more than one file


## /browser_use/__init__.py

```py path="/browser_use/__init__.py" 
import warnings

# Suppress specific deprecation warnings from FAISS
warnings.filterwarnings('ignore', category=DeprecationWarning, module='faiss.loader')
warnings.filterwarnings('ignore', message='builtin type SwigPyPacked has no __module__ attribute')
warnings.filterwarnings('ignore', message='builtin type SwigPyObject has no __module__ attribute')
warnings.filterwarnings('ignore', message='builtin type swigvarlink has no __module__ attribute')

from browser_use.logging_config import setup_logging

setup_logging()

from browser_use.agent.prompts import SystemPrompt as SystemPrompt
from browser_use.agent.service import Agent as Agent
from browser_use.agent.views import ActionModel as ActionModel
from browser_use.agent.views import ActionResult as ActionResult
from browser_use.agent.views import AgentHistoryList as AgentHistoryList
from browser_use.browser.browser import Browser as Browser
from browser_use.browser.browser import BrowserConfig as BrowserConfig
from browser_use.browser.context import BrowserContextConfig
from browser_use.controller.service import Controller as Controller
from browser_use.dom.service import DomService as DomService

__all__ = [
	'Agent',
	'Browser',
	'BrowserConfig',
	'Controller',
	'DomService',
	'SystemPrompt',
	'ActionResult',
	'ActionModel',
	'AgentHistoryList',
	'BrowserContextConfig',
]

```

## /browser_use/agent/gif.py

```py path="/browser_use/agent/gif.py" 
from __future__ import annotations

import base64
import io
import logging
import os
import platform
from typing import TYPE_CHECKING

from browser_use.agent.views import AgentHistoryList

if TYPE_CHECKING:
	from PIL import Image, ImageFont

logger = logging.getLogger(__name__)


def decode_unicode_escapes_to_utf8(text: str) -> str:
	"""Handle decoding any unicode escape sequences embedded in a string (needed to render non-ASCII languages like chinese or arabic in the GIF overlay text)"""

	if r'\u' not in text:
		# doesn't have any escape sequences that need to be decoded
		return text

	try:
		# Try to decode Unicode escape sequences
		return text.encode('latin1').decode('unicode_escape')
	except (UnicodeEncodeError, UnicodeDecodeError):
		# logger.debug(f"Failed to decode unicode escape sequences while generating gif text: {text}")
		return text


def create_history_gif(
	task: str,
	history: AgentHistoryList,
	#
	output_path: str = 'agent_history.gif',
	duration: int = 3000,
	show_goals: bool = True,
	show_task: bool = True,
	show_logo: bool = False,
	font_size: int = 40,
	title_font_size: int = 56,
	goal_font_size: int = 44,
	margin: int = 40,
	line_spacing: float = 1.5,
) -> None:
	"""Create a GIF from the agent's history with overlaid task and goal text."""
	if not history.history:
		logger.warning('No history to create GIF from')
		return

	from PIL import Image, ImageFont

	images = []

	# if history is empty or first screenshot is None, we can't create a gif
	if not history.history or not history.history[0].state.screenshot:
		logger.warning('No history or first screenshot to create GIF from')
		return

	# Try to load nicer fonts
	try:
		# Try different font options in order of preference
		# ArialUni is a font that comes with Office and can render most non-alphabet characters
		font_options = [
			'Microsoft YaHei',  # 微软雅黑
			'SimHei',  # 黑体
			'SimSun',  # 宋体
			'Noto Sans CJK SC',  # 思源黑体
			'WenQuanYi Micro Hei',  # 文泉驿微米黑
			'Helvetica',
			'Arial',
			'DejaVuSans',
			'Verdana',
		]
		font_loaded = False

		for font_name in font_options:
			try:
				if platform.system() == 'Windows':
					# Need to specify the abs font path on Windows
					font_name = os.path.join(os.getenv('WIN_FONT_DIR', 'C:\\Windows\\Fonts'), font_name + '.ttf')
				regular_font = ImageFont.truetype(font_name, font_size)
				title_font = ImageFont.truetype(font_name, title_font_size)
				goal_font = ImageFont.truetype(font_name, goal_font_size)
				font_loaded = True
				break
			except OSError:
				continue

		if not font_loaded:
			raise OSError('No preferred fonts found')

	except OSError:
		regular_font = ImageFont.load_default()
		title_font = ImageFont.load_default()

		goal_font = regular_font

	# Load logo if requested
	logo = None
	if show_logo:
		try:
			logo = Image.open('./static/browser-use.png')
			# Resize logo to be small (e.g., 40px height)
			logo_height = 150
			aspect_ratio = logo.width / logo.height
			logo_width = int(logo_height * aspect_ratio)
			logo = logo.resize((logo_width, logo_height), Image.Resampling.LANCZOS)
		except Exception as e:
			logger.warning(f'Could not load logo: {e}')

	# Create task frame if requested
	if show_task and task:
		task_frame = _create_task_frame(
			task,
			history.history[0].state.screenshot,
			title_font,  # type: ignore
			regular_font,  # type: ignore
			logo,
			line_spacing,
		)
		images.append(task_frame)

	# Process each history item
	for i, item in enumerate(history.history, 1):
		if not item.state.screenshot:
			continue

		# Convert base64 screenshot to PIL Image
		img_data = base64.b64decode(item.state.screenshot)
		image = Image.open(io.BytesIO(img_data))

		if show_goals and item.model_output:
			image = _add_overlay_to_image(
				image=image,
				step_number=i,
				goal_text=item.model_output.current_state.next_goal,
				regular_font=regular_font,  # type: ignore
				title_font=title_font,  # type: ignore
				margin=margin,
				logo=logo,
			)

		images.append(image)

	if images:
		# Save the GIF
		images[0].save(
			output_path,
			save_all=True,
			append_images=images[1:],
			duration=duration,
			loop=0,
			optimize=False,
		)
		logger.info(f'Created GIF at {output_path}')
	else:
		logger.warning('No images found in history to create GIF')


def _create_task_frame(
	task: str,
	first_screenshot: str,
	title_font: ImageFont.FreeTypeFont,
	regular_font: ImageFont.FreeTypeFont,
	logo: Image.Image | None = None,
	line_spacing: float = 1.5,
) -> Image.Image:
	"""Create initial frame showing the task."""
	from PIL import Image, ImageDraw, ImageFont

	img_data = base64.b64decode(first_screenshot)
	template = Image.open(io.BytesIO(img_data))
	image = Image.new('RGB', template.size, (0, 0, 0))
	draw = ImageDraw.Draw(image)

	# Calculate vertical center of image
	center_y = image.height // 2

	# Draw task text with dynamic font size based on task length
	margin = 140  # Increased margin
	max_width = image.width - (2 * margin)

	# Dynamic font size calculation based on task length
	# Start with base font size (regular + 16)
	base_font_size = regular_font.size + 16
	min_font_size = max(regular_font.size - 10, 16)  # Don't go below 16pt
	max_font_size = base_font_size  # Cap at the base font size

	# Calculate dynamic font size based on text length and complexity
	# Longer texts get progressively smaller fonts
	text_length = len(task)
	if text_length > 200:
		# For very long text, reduce font size logarithmically
		font_size = max(base_font_size - int(10 * (text_length / 200)), min_font_size)
	else:
		font_size = base_font_size

	larger_font = ImageFont.truetype(regular_font.path, font_size)

	# Generate wrapped text with the calculated font size
	wrapped_text = _wrap_text(task, larger_font, max_width)

	# Calculate line height with spacing
	line_height = larger_font.size * line_spacing

	# Split text into lines and draw with custom spacing
	lines = wrapped_text.split('\n')
	total_height = line_height * len(lines)

	# Start position for first line
	text_y = center_y - (total_height / 2) + 50  # Shifted down slightly

	for line in lines:
		# Get line width for centering
		line_bbox = draw.textbbox((0, 0), line, font=larger_font)
		text_x = (image.width - (line_bbox[2] - line_bbox[0])) // 2

		draw.text(
			(text_x, text_y),
			line,
			font=larger_font,
			fill=(255, 255, 255),
		)
		text_y += line_height

	# Add logo if provided (top right corner)
	if logo:
		logo_margin = 20
		logo_x = image.width - logo.width - logo_margin
		image.paste(logo, (logo_x, logo_margin), logo if logo.mode == 'RGBA' else None)

	return image


def _add_overlay_to_image(
	image: Image.Image,
	step_number: int,
	goal_text: str,
	regular_font: ImageFont.FreeTypeFont,
	title_font: ImageFont.FreeTypeFont,
	margin: int,
	logo: Image.Image | None = None,
	display_step: bool = True,
	text_color: tuple[int, int, int, int] = (255, 255, 255, 255),
	text_box_color: tuple[int, int, int, int] = (0, 0, 0, 255),
) -> Image.Image:
	"""Add step number and goal overlay to an image."""

	from PIL import Image, ImageDraw

	goal_text = decode_unicode_escapes_to_utf8(goal_text)
	image = image.convert('RGBA')
	txt_layer = Image.new('RGBA', image.size, (0, 0, 0, 0))
	draw = ImageDraw.Draw(txt_layer)
	if display_step:
		# Add step number (bottom left)
		step_text = str(step_number)
		step_bbox = draw.textbbox((0, 0), step_text, font=title_font)
		step_width = step_bbox[2] - step_bbox[0]
		step_height = step_bbox[3] - step_bbox[1]

		# Position step number in bottom left
		x_step = margin + 10  # Slight additional offset from edge
		y_step = image.height - margin - step_height - 10  # Slight offset from bottom

		# Draw rounded rectangle background for step number
		padding = 20  # Increased padding
		step_bg_bbox = (
			x_step - padding,
			y_step - padding,
			x_step + step_width + padding,
			y_step + step_height + padding,
		)
		draw.rounded_rectangle(
			step_bg_bbox,
			radius=15,  # Add rounded corners
			fill=text_box_color,
		)

		# Draw step number
		draw.text(
			(x_step, y_step),
			step_text,
			font=title_font,
			fill=text_color,
		)

	# Draw goal text (centered, bottom)
	max_width = image.width - (4 * margin)
	wrapped_goal = _wrap_text(goal_text, title_font, max_width)
	goal_bbox = draw.multiline_textbbox((0, 0), wrapped_goal, font=title_font)
	goal_width = goal_bbox[2] - goal_bbox[0]
	goal_height = goal_bbox[3] - goal_bbox[1]

	# Center goal text horizontally, place above step number
	x_goal = (image.width - goal_width) // 2
	y_goal = y_step - goal_height - padding * 4  # More space between step and goal

	# Draw rounded rectangle background for goal
	padding_goal = 25  # Increased padding for goal
	goal_bg_bbox = (
		x_goal - padding_goal,  # Remove extra space for logo
		y_goal - padding_goal,
		x_goal + goal_width + padding_goal,
		y_goal + goal_height + padding_goal,
	)
	draw.rounded_rectangle(
		goal_bg_bbox,
		radius=15,  # Add rounded corners
		fill=text_box_color,
	)

	# Draw goal text
	draw.multiline_text(
		(x_goal, y_goal),
		wrapped_goal,
		font=title_font,
		fill=text_color,
		align='center',
	)

	# Add logo if provided (top right corner)
	if logo:
		logo_layer = Image.new('RGBA', image.size, (0, 0, 0, 0))
		logo_margin = 20
		logo_x = image.width - logo.width - logo_margin
		logo_layer.paste(logo, (logo_x, logo_margin), logo if logo.mode == 'RGBA' else None)
		txt_layer = Image.alpha_composite(logo_layer, txt_layer)

	# Composite and convert
	result = Image.alpha_composite(image, txt_layer)
	return result.convert('RGB')


def _wrap_text(text: str, font: ImageFont.FreeTypeFont, max_width: int) -> str:
	"""
	Wrap text to fit within a given width.

	Args:
	    text: Text to wrap
	    font: Font to use for text
	    max_width: Maximum width in pixels

	Returns:
	    Wrapped text with newlines
	"""
	text = decode_unicode_escapes_to_utf8(text)
	words = text.split()
	lines = []
	current_line = []

	for word in words:
		current_line.append(word)
		line = ' '.join(current_line)
		bbox = font.getbbox(line)
		if bbox[2] > max_width:
			if len(current_line) == 1:
				lines.append(current_line.pop())
			else:
				current_line.pop()
				lines.append(' '.join(current_line))
				current_line = [word]

	if current_line:
		lines.append(' '.join(current_line))

	return '\n'.join(lines)

```

## /browser_use/agent/memory/__init__.py

```py path="/browser_use/agent/memory/__init__.py" 
from browser_use.agent.memory.service import Memory
from browser_use.agent.memory.views import MemoryConfig

__all__ = ['Memory', 'MemoryConfig']

```

## /browser_use/agent/memory/service.py

```py path="/browser_use/agent/memory/service.py" 
from __future__ import annotations

import logging
import os

from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
	BaseMessage,
	HumanMessage,
)
from langchain_core.messages.utils import convert_to_openai_messages

from browser_use.agent.memory.views import MemoryConfig
from browser_use.agent.message_manager.service import MessageManager
from browser_use.agent.message_manager.views import ManagedMessage, MessageMetadata
from browser_use.utils import time_execution_sync

logger = logging.getLogger(__name__)


class Memory:
	"""
	Manages procedural memory for agents.

	This class implements a procedural memory management system using Mem0 that transforms agent interaction history
	into concise, structured representations at specified intervals. It serves to optimize context window
	utilization during extended task execution by converting verbose historical information into compact,
	yet comprehensive memory constructs that preserve essential operational knowledge.
	"""

	def __init__(
		self,
		message_manager: MessageManager,
		llm: BaseChatModel,
		config: MemoryConfig | None = None,
	):
		self.message_manager = message_manager
		self.llm = llm

		# Initialize configuration with defaults based on the LLM if not provided
		if config is None:
			self.config = MemoryConfig(llm_instance=llm, agent_id=f'agent_{id(self)}')

			# Set appropriate embedder based on LLM type
			llm_class = llm.__class__.__name__
			if llm_class == 'ChatOpenAI':
				self.config.embedder_provider = 'openai'
				self.config.embedder_model = 'text-embedding-3-small'
				self.config.embedder_dims = 1536
			elif llm_class == 'ChatGoogleGenerativeAI':
				self.config.embedder_provider = 'gemini'
				self.config.embedder_model = 'models/text-embedding-004'
				self.config.embedder_dims = 768
			elif llm_class == 'ChatOllama':
				self.config.embedder_provider = 'ollama'
				self.config.embedder_model = 'nomic-embed-text'
				self.config.embedder_dims = 512
		else:
			# Ensure LLM instance is set in the config
			self.config = MemoryConfig(**dict(config))  # re-validate untrusted user-provided config
			self.config.llm_instance = llm

		# Check for required packages
		try:
			# also disable mem0's telemetry when ANONYMIZED_TELEMETRY=False
			if os.getenv('ANONYMIZED_TELEMETRY', 'true').lower()[0] in 'fn0':
				os.environ['MEM0_TELEMETRY'] = 'False'
			from mem0 import Memory as Mem0Memory
		except ImportError:
			raise ImportError('mem0 is required when enable_memory=True. Please install it with `pip install mem0`.')

		if self.config.embedder_provider == 'huggingface':
			try:
				# check that required package is installed if huggingface is used
				from sentence_transformers import SentenceTransformer  # noqa: F401
			except ImportError:
				raise ImportError(
					'sentence_transformers is required when enable_memory=True and embedder_provider="huggingface". Please install it with `pip install sentence-transformers`.'
				)

		# Initialize Mem0 with the configuration
		self.mem0 = Mem0Memory.from_config(config_dict=self.config.full_config_dict)

	@time_execution_sync('--create_procedural_memory')
	def create_procedural_memory(self, current_step: int) -> None:
		"""
		Create a procedural memory if needed based on the current step.

		Args:
		    current_step: The current step number of the agent
		"""
		logger.info(f'Creating procedural memory at step {current_step}')

		# Get all messages
		all_messages = self.message_manager.state.history.messages

		# Separate messages into those to keep as-is and those to process for memory
		new_messages = []
		messages_to_process = []

		for msg in all_messages:
			if isinstance(msg, ManagedMessage) and msg.metadata.message_type in {'init', 'memory'}:
				# Keep system and memory messages as they are
				new_messages.append(msg)
			else:
				if len(msg.message.content) > 0:
					messages_to_process.append(msg)

		# Need at least 2 messages to create a meaningful summary
		if len(messages_to_process) <= 1:
			logger.info('Not enough non-memory messages to summarize')
			return
		# Create a procedural memory
		memory_content = self._create([m.message for m in messages_to_process], current_step)

		if not memory_content:
			logger.warning('Failed to create procedural memory')
			return

		# Replace the processed messages with the consolidated memory
		memory_message = HumanMessage(content=memory_content)
		memory_tokens = self.message_manager._count_tokens(memory_message)
		memory_metadata = MessageMetadata(tokens=memory_tokens, message_type='memory')

		# Calculate the total tokens being removed
		removed_tokens = sum(m.metadata.tokens for m in messages_to_process)

		# Add the memory message
		new_messages.append(ManagedMessage(message=memory_message, metadata=memory_metadata))

		# Update the history
		self.message_manager.state.history.messages = new_messages
		self.message_manager.state.history.current_tokens -= removed_tokens
		self.message_manager.state.history.current_tokens += memory_tokens
		logger.info(f'Messages consolidated: {len(messages_to_process)} messages converted to procedural memory')

	def _create(self, messages: list[BaseMessage], current_step: int) -> str | None:
		parsed_messages = convert_to_openai_messages(messages)
		try:
			results = self.mem0.add(
				messages=parsed_messages,
				agent_id=self.config.agent_id,
				memory_type='procedural_memory',
				metadata={'step': current_step},
			)
			if len(results.get('results', [])):
				return results.get('results', [])[0].get('memory')
			return None
		except Exception as e:
			logger.error(f'Error creating procedural memory: {e}')
			return None

```

## /browser_use/agent/memory/views.py

```py path="/browser_use/agent/memory/views.py" 
from typing import Any, Literal

from langchain_core.language_models.chat_models import BaseChatModel
from pydantic import BaseModel, ConfigDict, Field


class MemoryConfig(BaseModel):
	"""Configuration for procedural memory."""

	model_config = ConfigDict(
		from_attributes=True, validate_default=True, revalidate_instances='always', validate_assignment=True
	)

	# Memory settings
	agent_id: str = Field(default='browser_use_agent', min_length=1)
	memory_interval: int = Field(default=10, gt=1, lt=100)

	# Embedder settings
	embedder_provider: Literal['openai', 'gemini', 'ollama', 'huggingface'] = 'huggingface'
	embedder_model: str = Field(min_length=2, default='all-MiniLM-L6-v2')
	embedder_dims: int = Field(default=384, gt=10, lt=10000)

	# LLM settings - the LLM instance can be passed separately
	llm_provider: Literal['langchain'] = 'langchain'
	llm_instance: BaseChatModel | None = None

	# Vector store settings
	vector_store_provider: Literal['faiss'] = 'faiss'
	vector_store_base_path: str = Field(default='/tmp/mem0')

	@property
	def vector_store_path(self) -> str:
		"""Returns the full vector store path for the current configuration. e.g. /tmp/mem0_384_faiss"""
		return f'{self.vector_store_base_path}_{self.embedder_dims}_{self.vector_store_provider}'

	@property
	def embedder_config_dict(self) -> dict[str, Any]:
		"""Returns the embedder configuration dictionary."""
		return {
			'provider': self.embedder_provider,
			'config': {'model': self.embedder_model, 'embedding_dims': self.embedder_dims},
		}

	@property
	def llm_config_dict(self) -> dict[str, Any]:
		"""Returns the LLM configuration dictionary."""
		return {'provider': self.llm_provider, 'config': {'model': self.llm_instance}}

	@property
	def vector_store_config_dict(self) -> dict[str, Any]:
		"""Returns the vector store configuration dictionary."""
		return {
			'provider': self.vector_store_provider,
			'config': {
				'embedding_model_dims': self.embedder_dims,
				'path': self.vector_store_path,
			},
		}

	@property
	def full_config_dict(self) -> dict[str, dict[str, Any]]:
		"""Returns the complete configuration dictionary for Mem0."""
		return {
			'embedder': self.embedder_config_dict,
			'llm': self.llm_config_dict,
			'vector_store': self.vector_store_config_dict,
		}

```

## /browser_use/agent/message_manager/service.py

```py path="/browser_use/agent/message_manager/service.py" 
from __future__ import annotations

import logging

from langchain_core.messages import (
	AIMessage,
	BaseMessage,
	HumanMessage,
	SystemMessage,
	ToolMessage,
)
from pydantic import BaseModel

from browser_use.agent.message_manager.views import MessageMetadata
from browser_use.agent.prompts import AgentMessagePrompt
from browser_use.agent.views import ActionResult, AgentOutput, AgentStepInfo, MessageManagerState
from browser_use.browser.views import BrowserState
from browser_use.utils import time_execution_sync

logger = logging.getLogger(__name__)


class MessageManagerSettings(BaseModel):
	max_input_tokens: int = 128000
	estimated_characters_per_token: int = 3
	image_tokens: int = 800
	include_attributes: list[str] = []
	message_context: str | None = None
	sensitive_data: dict[str, str] | None = None
	available_file_paths: list[str] | None = None


class MessageManager:
	def __init__(
		self,
		task: str,
		system_message: SystemMessage,
		settings: MessageManagerSettings = MessageManagerSettings(),
		state: MessageManagerState = MessageManagerState(),
	):
		self.task = task
		self.settings = settings
		self.state = state
		self.system_prompt = system_message

		# Only initialize messages if state is empty
		if len(self.state.history.messages) == 0:
			self._init_messages()

	def _init_messages(self) -> None:
		"""Initialize the message history with system message, context, task, and other initial messages"""
		self._add_message_with_tokens(self.system_prompt, message_type='init')

		if self.settings.message_context:
			context_message = HumanMessage(content='Context for the task' + self.settings.message_context)
			self._add_message_with_tokens(context_message, message_type='init')

		task_message = HumanMessage(
			content=f'Your ultimate task is: """{self.task}""". If you achieved your ultimate task, stop everything and use the done action in the next step to complete the task. If not, continue as usual.'
		)
		self._add_message_with_tokens(task_message, message_type='init')

		if self.settings.sensitive_data:
			info = f'Here are placeholders for sensitive data: {list(self.settings.sensitive_data.keys())}'
			info += '\nTo use them, write <secret>the placeholder name</secret>'
			info_message = HumanMessage(content=info)
			self._add_message_with_tokens(info_message, message_type='init')

		placeholder_message = HumanMessage(content='Example output:')
		self._add_message_with_tokens(placeholder_message, message_type='init')

		example_tool_call = AIMessage(
			content='',
			tool_calls=[
				{
					'name': 'AgentOutput',
					'args': {
						'current_state': {
							'evaluation_previous_goal': """
							Success - I successfully clicked on the 'Apple' link from the Google Search results page, 
							which directed me to the 'Apple' company homepage. This is a good start toward finding 
							the best place to buy a new iPhone as the Apple website often list iPhones for sale.
						""".strip(),
							'memory': """
							I searched for 'iPhone retailers' on Google. From the Google Search results page, 
							I used the 'click_element_by_index' tool to click on element at index [45] labeled 'Best Buy' but calling 
							the tool did not direct me to a new page. I then used the 'click_element_by_index' tool to click 
							on element at index [82] labeled 'Apple' which redirected me to the 'Apple' company homepage. 
							Currently at step 3/15.
						""".strip(),
							'next_goal': """
							Looking at reported structure of the current page, I can see the item '[127]<h3 iPhone/>' 
							in the content. I think this button will lead to more information and potentially prices 
							for iPhones. I'll click on the link at index [127] using the 'click_element_by_index' 
							tool and hope to see prices on the next page.
						""".strip(),
						},
						'action': [{'click_element_by_index': {'index': 127}}],
					},
					'id': str(self.state.tool_id),
					'type': 'tool_call',
				},
			],
		)
		self._add_message_with_tokens(example_tool_call, message_type='init')
		self.add_tool_message(content='Browser started', message_type='init')

		placeholder_message = HumanMessage(content='[Your task history memory starts here]')
		self._add_message_with_tokens(placeholder_message)

		if self.settings.available_file_paths:
			filepaths_msg = HumanMessage(content=f'Here are file paths you can use: {self.settings.available_file_paths}')
			self._add_message_with_tokens(filepaths_msg, message_type='init')

	def add_new_task(self, new_task: str) -> None:
		content = f'Your new ultimate task is: """{new_task}""". Take the previous context into account and finish your new ultimate task. '
		msg = HumanMessage(content=content)
		self._add_message_with_tokens(msg)
		self.task = new_task

	@time_execution_sync('--add_state_message')
	def add_state_message(
		self,
		state: BrowserState,
		result: list[ActionResult] | None = None,
		step_info: AgentStepInfo | None = None,
		use_vision=True,
	) -> None:
		"""Add browser state as human message"""

		# if keep in memory, add to directly to history and add state without result
		if result:
			for r in result:
				if r.include_in_memory:
					if r.extracted_content:
						msg = HumanMessage(content='Action result: ' + str(r.extracted_content))
						self._add_message_with_tokens(msg)
					if r.error:
						# if endswith \n, remove it
						if r.error.endswith('\n'):
							r.error = r.error[:-1]
						# get only last line of error
						last_line = r.error.split('\n')[-1]
						msg = HumanMessage(content='Action error: ' + last_line)
						self._add_message_with_tokens(msg)
					result = None  # if result in history, we dont want to add it again

		# otherwise add state message and result to next message (which will not stay in memory)
		state_message = AgentMessagePrompt(
			state,
			result,
			include_attributes=self.settings.include_attributes,
			step_info=step_info,
		).get_user_message(use_vision)
		self._add_message_with_tokens(state_message)

	def add_model_output(self, model_output: AgentOutput) -> None:
		"""Add model output as AI message"""
		tool_calls = [
			{
				'name': 'AgentOutput',
				'args': model_output.model_dump(mode='json', exclude_unset=True),
				'id': str(self.state.tool_id),
				'type': 'tool_call',
			}
		]

		msg = AIMessage(
			content='',
			tool_calls=tool_calls,
		)

		self._add_message_with_tokens(msg)
		# empty tool response
		self.add_tool_message(content='')

	def add_plan(self, plan: str | None, position: int | None = None) -> None:
		if plan:
			msg = AIMessage(content=plan)
			self._add_message_with_tokens(msg, position)

	@time_execution_sync('--get_messages')
	def get_messages(self) -> list[BaseMessage]:
		"""Get current message list, potentially trimmed to max tokens"""

		msg = [m.message for m in self.state.history.messages]
		# debug which messages are in history with token count # log
		total_input_tokens = 0
		logger.debug(f'Messages in history: {len(self.state.history.messages)}:')
		for m in self.state.history.messages:
			total_input_tokens += m.metadata.tokens
			logger.debug(f'{m.message.__class__.__name__} - Token count: {m.metadata.tokens}')
		logger.debug(f'Total input tokens: {total_input_tokens}')

		return msg

	def _add_message_with_tokens(
		self, message: BaseMessage, position: int | None = None, message_type: str | None = None
	) -> None:
		"""Add message with token count metadata
		position: None for last, -1 for second last, etc.
		"""

		# filter out sensitive data from the message
		if self.settings.sensitive_data:
			message = self._filter_sensitive_data(message)

		token_count = self._count_tokens(message)
		metadata = MessageMetadata(tokens=token_count, message_type=message_type)
		self.state.history.add_message(message, metadata, position)

	@time_execution_sync('--filter_sensitive_data')
	def _filter_sensitive_data(self, message: BaseMessage) -> BaseMessage:
		"""Filter out sensitive data from the message"""

		def replace_sensitive(value: str) -> str:
			if not self.settings.sensitive_data:
				return value

			# Create a dictionary with all key-value pairs from sensitive_data where value is not None or empty
			valid_sensitive_data = {k: v for k, v in self.settings.sensitive_data.items() if v}

			# If there are no valid sensitive data entries, just return the original value
			if not valid_sensitive_data:
				logger.warning('No valid entries found in sensitive_data dictionary')
				return value

			# Replace all valid sensitive data values with their placeholder tags
			for key, val in valid_sensitive_data.items():
				value = value.replace(val, f'<secret>{key}</secret>')

			return value

		if isinstance(message.content, str):
			message.content = replace_sensitive(message.content)
		elif isinstance(message.content, list):
			for i, item in enumerate(message.content):
				if isinstance(item, dict) and 'text' in item:
					item['text'] = replace_sensitive(item['text'])
					message.content[i] = item
		return message

	def _count_tokens(self, message: BaseMessage) -> int:
		"""Count tokens in a message using the model's tokenizer"""
		tokens = 0
		if isinstance(message.content, list):
			for item in message.content:
				if 'image_url' in item:
					tokens += self.settings.image_tokens
				elif isinstance(item, dict) and 'text' in item:
					tokens += self._count_text_tokens(item['text'])
		else:
			msg = message.content
			if hasattr(message, 'tool_calls'):
				msg += str(message.tool_calls)  # type: ignore
			tokens += self._count_text_tokens(msg)
		return tokens

	def _count_text_tokens(self, text: str) -> int:
		"""Count tokens in a text string"""
		tokens = len(text) // self.settings.estimated_characters_per_token  # Rough estimate if no tokenizer available
		return tokens

	def cut_messages(self):
		"""Get current message list, potentially trimmed to max tokens"""
		diff = self.state.history.current_tokens - self.settings.max_input_tokens
		if diff <= 0:
			return None

		msg = self.state.history.messages[-1]

		# if list with image remove image
		if isinstance(msg.message.content, list):
			text = ''
			for item in msg.message.content:
				if 'image_url' in item:
					msg.message.content.remove(item)
					diff -= self.settings.image_tokens
					msg.metadata.tokens -= self.settings.image_tokens
					self.state.history.current_tokens -= self.settings.image_tokens
					logger.debug(
						f'Removed image with {self.settings.image_tokens} tokens - total tokens now: {self.state.history.current_tokens}/{self.settings.max_input_tokens}'
					)
				elif 'text' in item and isinstance(item, dict):
					text += item['text']
			msg.message.content = text
			self.state.history.messages[-1] = msg

		if diff <= 0:
			return None

		# if still over, remove text from state message proportionally to the number of tokens needed with buffer
		# Calculate the proportion of content to remove
		proportion_to_remove = diff / msg.metadata.tokens
		if proportion_to_remove > 0.99:
			raise ValueError(
				f'Max token limit reached - history is too long - reduce the system prompt or task. '
				f'proportion_to_remove: {proportion_to_remove}'
			)
		logger.debug(
			f'Removing {proportion_to_remove * 100:.2f}% of the last message  {proportion_to_remove * msg.metadata.tokens:.2f} / {msg.metadata.tokens:.2f} tokens)'
		)

		content = msg.message.content
		characters_to_remove = int(len(content) * proportion_to_remove)
		content = content[:-characters_to_remove]

		# remove tokens and old long message
		self.state.history.remove_last_state_message()

		# new message with updated content
		msg = HumanMessage(content=content)
		self._add_message_with_tokens(msg)

		last_msg = self.state.history.messages[-1]

		logger.debug(
			f'Added message with {last_msg.metadata.tokens} tokens - total tokens now: {self.state.history.current_tokens}/{self.settings.max_input_tokens} - total messages: {len(self.state.history.messages)}'
		)

	def _remove_last_state_message(self) -> None:
		"""Remove last state message from history"""
		self.state.history.remove_last_state_message()

	def add_tool_message(self, content: str, message_type: str | None = None) -> None:
		"""Add tool message to history"""
		msg = ToolMessage(content=content, tool_call_id=str(self.state.tool_id))
		self.state.tool_id += 1
		self._add_message_with_tokens(msg, message_type=message_type)

```

## /browser_use/agent/message_manager/tests.py

```py path="/browser_use/agent/message_manager/tests.py" 
import pytest
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_openai import AzureChatOpenAI, ChatOpenAI

from browser_use.agent.message_manager.service import MessageManager, MessageManagerSettings
from browser_use.agent.views import ActionResult
from browser_use.browser.views import BrowserState, TabInfo
from browser_use.dom.views import DOMElementNode, DOMTextNode


@pytest.fixture(
	params=[
		ChatOpenAI(model='gpt-4o-mini'),
		AzureChatOpenAI(model='gpt-4o', api_version='2024-02-15-preview'),
		ChatAnthropic(model_name='claude-3-5-sonnet-20240620', timeout=100, temperature=0.0, stop=None),
	],
	ids=['gpt-4o-mini', 'gpt-4o', 'claude-3-5-sonnet'],
)
def message_manager(request: pytest.FixtureRequest):
	task = 'Test task'
	action_descriptions = 'Test actions'
	return MessageManager(
		task=task,
		system_message=SystemMessage(content=action_descriptions),
		settings=MessageManagerSettings(
			max_input_tokens=1000,
			estimated_characters_per_token=3,
			image_tokens=800,
		),
	)


def test_initial_messages(message_manager: MessageManager):
	"""Test that message manager initializes with system and task messages"""
	messages = message_manager.get_messages()
	assert len(messages) == 2
	assert isinstance(messages[0], SystemMessage)
	assert isinstance(messages[1], HumanMessage)
	assert 'Test task' in messages[1].content


def test_add_state_message(message_manager: MessageManager):
	"""Test adding browser state message"""
	state = BrowserState(
		url='https://test.com',
		title='Test Page',
		element_tree=DOMElementNode(
			tag_name='div',
			attributes={},
			children=[],
			is_visible=True,
			parent=None,
			xpath='//div',
		),
		selector_map={},
		tabs=[TabInfo(page_id=1, url='https://test.com', title='Test Page')],
	)
	message_manager.add_state_message(state)

	messages = message_manager.get_messages()
	assert len(messages) == 3
	assert isinstance(messages[2], HumanMessage)
	assert 'https://test.com' in messages[2].content


def test_add_state_with_memory_result(message_manager: MessageManager):
	"""Test adding state with result that should be included in memory"""
	state = BrowserState(
		url='https://test.com',
		title='Test Page',
		element_tree=DOMElementNode(
			tag_name='div',
			attributes={},
			children=[],
			is_visible=True,
			parent=None,
			xpath='//div',
		),
		selector_map={},
		tabs=[TabInfo(page_id=1, url='https://test.com', title='Test Page')],
	)
	result = ActionResult(extracted_content='Important content', include_in_memory=True)

	message_manager.add_state_message(state, [result])
	messages = message_manager.get_messages()

	# Should have system, task, extracted content, and state messages
	assert len(messages) == 4
	assert 'Important content' in messages[2].content
	assert isinstance(messages[2], HumanMessage)
	assert isinstance(messages[3], HumanMessage)
	assert 'Important content' not in messages[3].content


def test_add_state_with_non_memory_result(message_manager: MessageManager):
	"""Test adding state with result that should not be included in memory"""
	state = BrowserState(
		url='https://test.com',
		title='Test Page',
		element_tree=DOMElementNode(
			tag_name='div',
			attributes={},
			children=[],
			is_visible=True,
			parent=None,
			xpath='//div',
		),
		selector_map={},
		tabs=[TabInfo(page_id=1, url='https://test.com', title='Test Page')],
	)
	result = ActionResult(extracted_content='Temporary content', include_in_memory=False)

	message_manager.add_state_message(state, [result])
	messages = message_manager.get_messages()

	# Should have system, task, and combined state+result message
	assert len(messages) == 3
	assert 'Temporary content' in messages[2].content
	assert isinstance(messages[2], HumanMessage)


@pytest.mark.skip('not sure how to fix this')
@pytest.mark.parametrize('max_tokens', [100000, 10000, 5000])
def test_token_overflow_handling_with_real_flow(message_manager: MessageManager, max_tokens):
	"""Test handling of token overflow in a realistic message flow"""
	# Set more realistic token limit
	message_manager.settings.max_input_tokens = max_tokens

	# Create a long sequence of interactions
	for i in range(200):  # Simulate 40 steps of interaction
		# Create state with varying content length
		state = BrowserState(
			url=f'https://test{i}.com',
			title=f'Test Page {i}',
			element_tree=DOMElementNode(
				tag_name='div',
				attributes={},
				children=[
					DOMTextNode(
						text=f'Content {j} ' * (10 + i),  # Increasing content length
						is_visible=True,
						parent=None,
					)
					for j in range(5)  # Multiple DOM items
				],
				is_visible=True,
				parent=None,
				xpath='//div',
			),
			selector_map={j: f'//div[{j}]' for j in range(5)},
			tabs=[TabInfo(page_id=1, url=f'https://test{i}.com', title=f'Test Page {i}')],
		)

		# Alternate between different types of results
		result = None
		if i % 2 == 0:  # Every other iteration
			result = ActionResult(
				extracted_content=f'Important content from step {i}' * 5,
				include_in_memory=i % 4 == 0,  # Include in memory every 4th message
			)

		# Add state message
		if result:
			message_manager.add_state_message(state, [result])
		else:
			message_manager.add_state_message(state)

		try:
			messages = message_manager.get_messages()
		except ValueError as e:
			if 'Max token limit reached - history is too long' in str(e):
				return  # If error occurs, end the test
			else:
				raise e

		assert message_manager.state.history.current_tokens <= message_manager.settings.max_input_tokens + 100

		last_msg = messages[-1]
		assert isinstance(last_msg, HumanMessage)

		if i % 4 == 0:
			assert isinstance(message_manager.state.history.messages[-2].message, HumanMessage)
		if i % 2 == 0 and not i % 4 == 0:
			if isinstance(last_msg.content, list):
				assert 'Current url: https://test' in last_msg.content[0]['text']
			else:
				assert 'Current url: https://test' in last_msg.content

		# Add model output every time
		from browser_use.agent.views import AgentBrain, AgentOutput
		from browser_use.controller.registry.views import ActionModel

		output = AgentOutput(
			current_state=AgentBrain(
				evaluation_previous_goal=f'Success in step {i}',
				memory=f'Memory from step {i}',
				next_goal=f'Goal for step {i + 1}',
			),
			action=[ActionModel()],
		)
		message_manager._remove_last_state_message()
		message_manager.add_model_output(output)

		# Get messages and verify after each addition
		messages = [m.message for m in message_manager.state.history.messages]

		# Verify token limit is respected

		# Verify essential messages are preserved
		assert isinstance(messages[0], SystemMessage)  # System prompt always first
		assert isinstance(messages[1], HumanMessage)  # Task always second
		assert 'Test task' in messages[1].content

		# Verify structure of latest messages
		assert isinstance(messages[-1], AIMessage)  # Last message should be model output
		assert f'step {i}' in messages[-1].content  # Should contain current step info

		# Log token usage for debugging
		token_usage = message_manager.state.history.current_tokens
		token_limit = message_manager.settings.max_input_tokens
		# print(f'Step {i}: Using {token_usage}/{token_limit} tokens')

		# go through all messages and verify that the token count and total tokens is correct
		total_tokens = 0
		real_tokens = []
		stored_tokens = []
		for msg in message_manager.state.history.messages:
			total_tokens += msg.metadata.tokens
			stored_tokens.append(msg.metadata.tokens)
			real_tokens.append(message_manager._count_tokens(msg.message))
		assert total_tokens == sum(real_tokens)
		assert stored_tokens == real_tokens
		assert message_manager.state.history.current_tokens == total_tokens


# pytest -s browser_use/agent/message_manager/tests.py

```

## /browser_use/agent/message_manager/utils.py

```py path="/browser_use/agent/message_manager/utils.py" 
from __future__ import annotations

import json
import logging
import os
import re
from typing import Any

from langchain_core.messages import (
	AIMessage,
	BaseMessage,
	HumanMessage,
	SystemMessage,
	ToolMessage,
)

logger = logging.getLogger(__name__)

MODELS_WITHOUT_TOOL_SUPPORT_PATTERNS = [
	'deepseek-reasoner',
	'deepseek-r1',
	'.*gemma.*-it',
]


def is_model_without_tool_support(model_name: str) -> bool:
	return any(re.match(pattern, model_name) for pattern in MODELS_WITHOUT_TOOL_SUPPORT_PATTERNS)


def extract_json_from_model_output(content: str) -> dict:
	"""Extract JSON from model output, handling both plain JSON and code-block-wrapped JSON."""
	try:
		# If content is wrapped in code blocks, extract just the JSON part
		if '\`\`\`' in content:
			# Find the JSON content between code blocks
			content = content.split('\`\`\`')[1]
			# Remove language identifier if present (e.g., 'json\n')
			if '\n' in content:
				content = content.split('\n', 1)[1]
		# Parse the cleaned content
		result_dict = json.loads(content)

		# some models occasionally respond with a list containing one dict: https://github.com/browser-use/browser-use/issues/1458
		if isinstance(result_dict, list) and len(result_dict) == 1 and isinstance(result_dict[0], dict):
			result_dict = result_dict[0]

		assert isinstance(result_dict, dict), f'Expected JSON dictionary in response, got JSON {type(result_dict)} instead'
		return result_dict
	except json.JSONDecodeError as e:
		logger.warning(f'Failed to parse model output: {content} {str(e)}')
		raise ValueError('Could not parse response.')


def convert_input_messages(input_messages: list[BaseMessage], model_name: str | None) -> list[BaseMessage]:
	"""Convert input messages to a format that is compatible with the planner model"""
	if model_name is None:
		return input_messages

	if is_model_without_tool_support(model_name):
		converted_input_messages = _convert_messages_for_non_function_calling_models(input_messages)
		merged_input_messages = _merge_successive_messages(converted_input_messages, HumanMessage)
		merged_input_messages = _merge_successive_messages(merged_input_messages, AIMessage)
		return merged_input_messages
	return input_messages


def _convert_messages_for_non_function_calling_models(input_messages: list[BaseMessage]) -> list[BaseMessage]:
	"""Convert messages for non-function-calling models"""
	output_messages = []
	for message in input_messages:
		if isinstance(message, HumanMessage):
			output_messages.append(message)
		elif isinstance(message, SystemMessage):
			output_messages.append(message)
		elif isinstance(message, ToolMessage):
			output_messages.append(HumanMessage(content=message.content))
		elif isinstance(message, AIMessage):
			# check if tool_calls is a valid JSON object
			if message.tool_calls:
				tool_calls = json.dumps(message.tool_calls)
				output_messages.append(AIMessage(content=tool_calls))
			else:
				output_messages.append(message)
		else:
			raise ValueError(f'Unknown message type: {type(message)}')
	return output_messages


def _merge_successive_messages(messages: list[BaseMessage], class_to_merge: type[BaseMessage]) -> list[BaseMessage]:
	"""Some models like deepseek-reasoner dont allow multiple human messages in a row. This function merges them into one."""
	merged_messages = []
	streak = 0
	for message in messages:
		if isinstance(message, class_to_merge):
			streak += 1
			if streak > 1:
				if isinstance(message.content, list):
					merged_messages[-1].content += message.content[0]['text']  # type:ignore
				else:
					merged_messages[-1].content += message.content
			else:
				merged_messages.append(message)
		else:
			merged_messages.append(message)
			streak = 0
	return merged_messages


def save_conversation(input_messages: list[BaseMessage], response: Any, target: str, encoding: str | None = None) -> None:
	"""Save conversation history to file."""

	# create folders if not exists
	if dirname := os.path.dirname(target):
		os.makedirs(dirname, exist_ok=True)

	with open(
		target,
		'w',
		encoding=encoding,
	) as f:
		_write_messages_to_file(f, input_messages)
		_write_response_to_file(f, response)


def _write_messages_to_file(f: Any, messages: list[BaseMessage]) -> None:
	"""Write messages to conversation file"""
	for message in messages:
		f.write(f' {message.__class__.__name__} \n')

		if isinstance(message.content, list):
			for item in message.content:
				if isinstance(item, dict) and item.get('type') == 'text':
					f.write(item['text'].strip() + '\n')
		elif isinstance(message.content, str):
			try:
				content = json.loads(message.content)
				f.write(json.dumps(content, indent=2) + '\n')
			except json.JSONDecodeError:
				f.write(message.content.strip() + '\n')

		f.write('\n')


def _write_response_to_file(f: Any, response: Any) -> None:
	"""Write model response to conversation file"""
	f.write(' RESPONSE\n')
	f.write(json.dumps(json.loads(response.model_dump_json(exclude_unset=True)), indent=2))

```

## /browser_use/agent/message_manager/views.py

```py path="/browser_use/agent/message_manager/views.py" 
from __future__ import annotations

from typing import TYPE_CHECKING, Any
from warnings import filterwarnings

from langchain_core._api import LangChainBetaWarning
from langchain_core.load import dumpd, load
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage, ToolMessage
from pydantic import BaseModel, ConfigDict, Field, model_serializer, model_validator

filterwarnings('ignore', category=LangChainBetaWarning)

if TYPE_CHECKING:
	from browser_use.agent.views import AgentOutput


class MessageMetadata(BaseModel):
	"""Metadata for a message"""

	tokens: int = 0
	message_type: str | None = None


class ManagedMessage(BaseModel):
	"""A message with its metadata"""

	message: BaseMessage
	metadata: MessageMetadata = Field(default_factory=MessageMetadata)

	model_config = ConfigDict(arbitrary_types_allowed=True)

	# https://github.com/pydantic/pydantic/discussions/7558
	@model_serializer(mode='wrap')
	def to_json(self, original_dump):
		"""
		Returns the JSON representation of the model.

		It uses langchain's `dumps` function to serialize the `message`
		property before encoding the overall dict with json.dumps.
		"""
		data = original_dump(self)

		# NOTE: We override the message field to use langchain JSON serialization.
		data['message'] = dumpd(self.message)

		return data

	@model_validator(mode='before')
	@classmethod
	def validate(
		cls,
		value: Any,
		*,
		strict: bool | None = None,
		from_attributes: bool | None = None,
		context: Any | None = None,
	) -> Any:
		"""
		Custom validator that uses langchain's `loads` function
		to parse the message if it is provided as a JSON string.
		"""
		if isinstance(value, dict) and 'message' in value:
			# NOTE: We use langchain's load to convert the JSON string back into a BaseMessage object.
			filterwarnings('ignore', category=LangChainBetaWarning)
			value['message'] = load(value['message'])
		return value


class MessageHistory(BaseModel):
	"""History of messages with metadata"""

	messages: list[ManagedMessage] = Field(default_factory=list)
	current_tokens: int = 0

	model_config = ConfigDict(arbitrary_types_allowed=True)

	def add_message(self, message: BaseMessage, metadata: MessageMetadata, position: int | None = None) -> None:
		"""Add message with metadata to history"""
		if position is None:
			self.messages.append(ManagedMessage(message=message, metadata=metadata))
		else:
			self.messages.insert(position, ManagedMessage(message=message, metadata=metadata))
		self.current_tokens += metadata.tokens

	def add_model_output(self, output: AgentOutput) -> None:
		"""Add model output as AI message"""
		tool_calls = [
			{
				'name': 'AgentOutput',
				'args': output.model_dump(mode='json', exclude_unset=True),
				'id': '1',
				'type': 'tool_call',
			}
		]

		msg = AIMessage(
			content='',
			tool_calls=tool_calls,
		)
		self.add_message(msg, MessageMetadata(tokens=100))  # Estimate tokens for tool calls

		# Empty tool response
		tool_message = ToolMessage(content='', tool_call_id='1')
		self.add_message(tool_message, MessageMetadata(tokens=10))  # Estimate tokens for empty response

	def get_messages(self) -> list[BaseMessage]:
		"""Get all messages"""
		return [m.message for m in self.messages]

	def get_total_tokens(self) -> int:
		"""Get total tokens in history"""
		return self.current_tokens

	def remove_oldest_message(self) -> None:
		"""Remove oldest non-system message"""
		for i, msg in enumerate(self.messages):
			if not isinstance(msg.message, SystemMessage):
				self.current_tokens -= msg.metadata.tokens
				self.messages.pop(i)
				break

	def remove_last_state_message(self) -> None:
		"""Remove last state message from history"""
		if len(self.messages) > 2 and isinstance(self.messages[-1].message, HumanMessage):
			self.current_tokens -= self.messages[-1].metadata.tokens
			self.messages.pop()


class MessageManagerState(BaseModel):
	"""Holds the state for MessageManager"""

	history: MessageHistory = Field(default_factory=MessageHistory)
	tool_id: int = 1

	model_config = ConfigDict(arbitrary_types_allowed=True)

```

## /browser_use/agent/playwright_script_generator.py

```py path="/browser_use/agent/playwright_script_generator.py" 
import json
import logging
from pathlib import Path
from typing import Any

from browser_use.browser.browser import BrowserConfig
from browser_use.browser.context import BrowserContextConfig

logger = logging.getLogger(__name__)


class PlaywrightScriptGenerator:
	"""Generates a Playwright script from AgentHistoryList."""

	def __init__(
		self,
		history_list: list[dict[str, Any]],
		sensitive_data_keys: list[str] | None = None,
		browser_config: BrowserConfig | None = None,
		context_config: BrowserContextConfig | None = None,
	):
		"""
		Initializes the script generator.

		Args:
		    history_list: A list of dictionaries, where each dictionary represents an AgentHistory item.
		                 Expected to be raw dictionaries from `AgentHistoryList.model_dump()`.
		    sensitive_data_keys: A list of keys used as placeholders for sensitive data.
		    browser_config: Configuration from the original Browser instance.
		    context_config: Configuration from the original BrowserContext instance.
		"""
		self.history = history_list
		self.sensitive_data_keys = sensitive_data_keys or []
		self.browser_config = browser_config
		self.context_config = context_config
		self._imports_helpers_added = False
		self._page_counter = 0  # Track pages for tab management

		# Dictionary mapping action types to handler methods
		self._action_handlers = {
			'go_to_url': self._map_go_to_url,
			'wait': self._map_wait,
			'input_text': self._map_input_text,
			'click_element': self._map_click_element,
			'click_element_by_index': self._map_click_element,  # Map legacy action
			'scroll_down': self._map_scroll_down,
			'scroll_up': self._map_scroll_up,
			'send_keys': self._map_send_keys,
			'go_back': self._map_go_back,
			'open_tab': self._map_open_tab,
			'close_tab': self._map_close_tab,
			'switch_tab': self._map_switch_tab,
			'search_google': self._map_search_google,
			'drag_drop': self._map_drag_drop,
			'extract_content': self._map_extract_content,
			'click_download_button': self._map_click_download_button,
			'done': self._map_done,
		}

	def _generate_browser_launch_args(self) -> str:
		"""Generates the arguments string for browser launch based on BrowserConfig."""
		if not self.browser_config:
			# Default launch if no config provided
			return 'headless=False'

		args_dict = {
			'headless': self.browser_config.headless,
			# Add other relevant launch options here based on self.browser_config
			# Example: 'proxy': self.browser_config.proxy.model_dump() if self.browser_config.proxy else None
			# Example: 'args': self.browser_config.extra_browser_args # Be careful inheriting args
		}
		if self.browser_config.proxy:
			args_dict['proxy'] = self.browser_config.proxy.model_dump()

		# Filter out None values
		args_dict = {k: v for k, v in args_dict.items() if v is not None}

		# Format as keyword arguments string
		args_str = ', '.join(f'{key}={repr(value)}' for key, value in args_dict.items())
		return args_str

	def _generate_context_options(self) -> str:
		"""Generates the options string for context creation based on BrowserContextConfig."""
		if not self.context_config:
			return ''  # Default context

		options_dict = {}

		# Map relevant BrowserContextConfig fields to Playwright context options
		if self.context_config.user_agent:
			options_dict['user_agent'] = self.context_config.user_agent
		if self.context_config.locale:
			options_dict['locale'] = self.context_config.locale
		if self.context_config.permissions:
			options_dict['permissions'] = self.context_config.permissions
		if self.context_config.geolocation:
			options_dict['geolocation'] = self.context_config.geolocation
		if self.context_config.timezone_id:
			options_dict['timezone_id'] = self.context_config.timezone_id
		if self.context_config.http_credentials:
			options_dict['http_credentials'] = self.context_config.http_credentials
		if self.context_config.is_mobile is not None:
			options_dict['is_mobile'] = self.context_config.is_mobile
		if self.context_config.has_touch is not None:
			options_dict['has_touch'] = self.context_config.has_touch
		if self.context_config.save_recording_path:
			options_dict['record_video_dir'] = self.context_config.save_recording_path
		if self.context_config.save_har_path:
			options_dict['record_har_path'] = self.context_config.save_har_path

		# Handle viewport/window size
		if self.context_config.no_viewport:
			options_dict['no_viewport'] = True
		elif hasattr(self.context_config, 'window_width') and hasattr(self.context_config, 'window_height'):
			options_dict['viewport'] = {
				'width': self.context_config.window_width,
				'height': self.context_config.window_height,
			}

		# Note: cookies_file and save_downloads_path are handled separately

		# Filter out None values
		options_dict = {k: v for k, v in options_dict.items() if v is not None}

		# Format as keyword arguments string
		options_str = ', '.join(f'{key}={repr(value)}' for key, value in options_dict.items())
		return options_str

	def _get_imports_and_helpers(self) -> list[str]:
		"""Generates necessary import statements (excluding helper functions)."""
		# Return only the standard imports needed by the main script body
		return [
			'import asyncio',
			'import json',
			'import os',
			'import sys',
			'from pathlib import Path',  # Added Path import
			'import urllib.parse',  # Needed for search_google
			'from playwright.async_api import async_playwright, Page, BrowserContext',  # Added BrowserContext
			'from dotenv import load_dotenv',
			'',
			'# Load environment variables',
			'load_dotenv(override=True)',
			'',
			# Helper function definitions are no longer here
		]

	def _get_sensitive_data_definitions(self) -> list[str]:
		"""Generates the SENSITIVE_DATA dictionary definition."""
		if not self.sensitive_data_keys:
			return ['SENSITIVE_DATA = {}', '']

		lines = ['# Sensitive data placeholders mapped to environment variables']
		lines.append('SENSITIVE_DATA = {')
		for key in self.sensitive_data_keys:
			env_var_name = key.upper()
			default_value_placeholder = f'YOUR_{env_var_name}'
			lines.append(f'    "{key}": os.getenv("{env_var_name}", {json.dumps(default_value_placeholder)}),')
		lines.append('}')
		lines.append('')
		return lines

	def _get_selector_for_action(self, history_item: dict, action_index_in_step: int) -> str | None:
		"""
		Gets the selector (preferring XPath) for a given action index within a history step.
		Formats the XPath correctly for Playwright.
		"""
		state = history_item.get('state')
		if not isinstance(state, dict):
			return None
		interacted_elements = state.get('interacted_element')
		if not isinstance(interacted_elements, list):
			return None
		if action_index_in_step >= len(interacted_elements):
			return None
		element_data = interacted_elements[action_index_in_step]
		if not isinstance(element_data, dict):
			return None

		# Prioritize XPath
		xpath = element_data.get('xpath')
		if isinstance(xpath, str) and xpath.strip():
			if not xpath.startswith('xpath=') and not xpath.startswith('/') and not xpath.startswith('//'):
				xpath_selector = f'xpath=//{xpath}'  # Make relative if not already
			elif not xpath.startswith('xpath='):
				xpath_selector = f'xpath={xpath}'  # Add prefix if missing
			else:
				xpath_selector = xpath
			return xpath_selector

		# Fallback to CSS selector if XPath is missing
		css_selector = element_data.get('css_selector')
		if isinstance(css_selector, str) and css_selector.strip():
			return css_selector  # Use CSS selector as is

		logger.warning(
			f'Could not find a usable XPath or CSS selector for action index {action_index_in_step} (element index {element_data.get("highlight_index", "N/A")}).'
		)
		return None

	def _get_goto_timeout(self) -> int:
		"""Gets the page navigation timeout in milliseconds."""
		default_timeout = 90000  # Default 90 seconds
		if self.context_config and self.context_config.maximum_wait_page_load_time:
			# Convert seconds to milliseconds
			return int(self.context_config.maximum_wait_page_load_time * 1000)
		return default_timeout

	# --- Action Mapping Methods ---
	def _map_go_to_url(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		url = params.get('url')
		goto_timeout = self._get_goto_timeout()
		script_lines = []
		if url and isinstance(url, str):
			escaped_url = json.dumps(url)
			script_lines.append(f'            print(f"Navigating to: {url} ({step_info_str})")')
			script_lines.append(f'            await page.goto({escaped_url}, timeout={goto_timeout})')
			script_lines.append(f"            await page.wait_for_load_state('load', timeout={goto_timeout})")
			script_lines.append('            await page.wait_for_timeout(1000)')  # Short pause
		else:
			script_lines.append(f'            # Skipping go_to_url ({step_info_str}): missing or invalid url')
		return script_lines

	def _map_wait(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		seconds = params.get('seconds', 3)
		try:
			wait_seconds = int(seconds)
		except (ValueError, TypeError):
			wait_seconds = 3
		return [
			f'            print(f"Waiting for {wait_seconds} seconds... ({step_info_str})")',
			f'            await asyncio.sleep({wait_seconds})',
		]

	def _map_input_text(
		self, params: dict, history_item: dict, action_index_in_step: int, step_info_str: str, **kwargs
	) -> list[str]:
		index = params.get('index')
		text = params.get('text', '')
		selector = self._get_selector_for_action(history_item, action_index_in_step)
		script_lines = []
		if selector and index is not None:
			clean_text_expression = f'replace_sensitive_data({json.dumps(str(text))}, SENSITIVE_DATA)'
			escaped_selector = json.dumps(selector)
			escaped_step_info = json.dumps(step_info_str)
			script_lines.append(
				f'            await _try_locate_and_act(page, {escaped_selector}, "fill", text={clean_text_expression}, step_info={escaped_step_info})'
			)
		else:
			script_lines.append(
				f'            # Skipping input_text ({step_info_str}): missing index ({index}) or selector ({selector})'
			)
		return script_lines

	def _map_click_element(
		self, params: dict, history_item: dict, action_index_in_step: int, step_info_str: str, action_type: str, **kwargs
	) -> list[str]:
		if action_type == 'click_element_by_index':
			logger.warning(f"Mapping legacy 'click_element_by_index' to 'click_element' ({step_info_str})")
		index = params.get('index')
		selector = self._get_selector_for_action(history_item, action_index_in_step)
		script_lines = []
		if selector and index is not None:
			escaped_selector = json.dumps(selector)
			escaped_step_info = json.dumps(step_info_str)
			script_lines.append(
				f'            await _try_locate_and_act(page, {escaped_selector}, "click", step_info={escaped_step_info})'
			)
		else:
			script_lines.append(
				f'            # Skipping {action_type} ({step_info_str}): missing index ({index}) or selector ({selector})'
			)
		return script_lines

	def _map_scroll_down(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		amount = params.get('amount')
		script_lines = []
		if amount and isinstance(amount, int):
			script_lines.append(f'            print(f"Scrolling down by {amount} pixels ({step_info_str})")')
			script_lines.append(f"            await page.evaluate('window.scrollBy(0, {amount})')")
		else:
			script_lines.append(f'            print(f"Scrolling down by one page height ({step_info_str})")')
			script_lines.append("            await page.evaluate('window.scrollBy(0, window.innerHeight)')")
		script_lines.append('            await page.wait_for_timeout(500)')
		return script_lines

	def _map_scroll_up(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		amount = params.get('amount')
		script_lines = []
		if amount and isinstance(amount, int):
			script_lines.append(f'            print(f"Scrolling up by {amount} pixels ({step_info_str})")')
			script_lines.append(f"            await page.evaluate('window.scrollBy(0, -{amount})')")
		else:
			script_lines.append(f'            print(f"Scrolling up by one page height ({step_info_str})")')
			script_lines.append("            await page.evaluate('window.scrollBy(0, -window.innerHeight)')")
		script_lines.append('            await page.wait_for_timeout(500)')
		return script_lines

	def _map_send_keys(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		keys = params.get('keys')
		script_lines = []
		if keys and isinstance(keys, str):
			escaped_keys = json.dumps(keys)
			script_lines.append(f'            print(f"Sending keys: {keys} ({step_info_str})")')
			script_lines.append(f'            await page.keyboard.press({escaped_keys})')
			script_lines.append('            await page.wait_for_timeout(500)')
		else:
			script_lines.append(f'            # Skipping send_keys ({step_info_str}): missing or invalid keys')
		return script_lines

	def _map_go_back(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		goto_timeout = self._get_goto_timeout()
		return [
			'            await asyncio.sleep(60)  # Wait 1 minute (important) before going back',
			f'            print(f"Navigating back using browser history ({step_info_str})")',
			f'            await page.go_back(timeout={goto_timeout})',
			f"            await page.wait_for_load_state('load', timeout={goto_timeout})",
			'            await page.wait_for_timeout(1000)',
		]

	def _map_open_tab(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		url = params.get('url')
		goto_timeout = self._get_goto_timeout()
		script_lines = []
		if url and isinstance(url, str):
			escaped_url = json.dumps(url)
			script_lines.append(f'            print(f"Opening new tab and navigating to: {url} ({step_info_str})")')
			script_lines.append('            page = await context.new_page()')
			script_lines.append(f'            await page.goto({escaped_url}, timeout={goto_timeout})')
			script_lines.append(f"            await page.wait_for_load_state('load', timeout={goto_timeout})")
			script_lines.append('            await page.wait_for_timeout(1000)')
			self._page_counter += 1  # Increment page counter
		else:
			script_lines.append(f'            # Skipping open_tab ({step_info_str}): missing or invalid url')
		return script_lines

	def _map_close_tab(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		page_id = params.get('page_id')
		script_lines = []
		if page_id is not None:
			script_lines.extend(
				[
					f'            print(f"Attempting to close tab with page_id {page_id} ({step_info_str})")',
					f'            if {page_id} < len(context.pages):',
					f'                target_page = context.pages[{page_id}]',
					'                await target_page.close()',
					'                await page.wait_for_timeout(500)',
					'                if context.pages: page = context.pages[-1]',  # Switch to last page
					'                else:',
					"                    print('  Warning: No pages left after closing tab. Cannot switch.', file=sys.stderr)",
					'                    # Optionally, create a new page here if needed: page = await context.new_page()',
					'                if page: await page.bring_to_front()',  # Bring to front if page exists
					'            else:',
					f'                print(f"  Warning: Tab with page_id {page_id} not found to close ({step_info_str})", file=sys.stderr)',
				]
			)
		else:
			script_lines.append(f'            # Skipping close_tab ({step_info_str}): missing page_id')
		return script_lines

	def _map_switch_tab(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		page_id = params.get('page_id')
		script_lines = []
		if page_id is not None:
			script_lines.extend(
				[
					f'            print(f"Switching to tab with page_id {page_id} ({step_info_str})")',
					f'            if {page_id} < len(context.pages):',
					f'                page = context.pages[{page_id}]',
					'                await page.bring_to_front()',
					"                await page.wait_for_load_state('load', timeout=15000)",
					'                await page.wait_for_timeout(500)',
					'            else:',
					f'                print(f"  Warning: Tab with page_id {page_id} not found to switch ({step_info_str})", file=sys.stderr)',
				]
			)
		else:
			script_lines.append(f'            # Skipping switch_tab ({step_info_str}): missing page_id')
		return script_lines

	def _map_search_google(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		query = params.get('query')
		goto_timeout = self._get_goto_timeout()
		script_lines = []
		if query and isinstance(query, str):
			clean_query = f'replace_sensitive_data({json.dumps(query)}, SENSITIVE_DATA)'
			search_url_expression = f'f"https://www.google.com/search?q={{ urllib.parse.quote_plus({clean_query}) }}&udm=14"'
			script_lines.extend(
				[
					f'            search_url = {search_url_expression}',
					f'            print(f"Searching Google for query related to: {{ {clean_query} }} ({step_info_str})")',
					f'            await page.goto(search_url, timeout={goto_timeout})',
					f"            await page.wait_for_load_state('load', timeout={goto_timeout})",
					'            await page.wait_for_timeout(1000)',
				]
			)
		else:
			script_lines.append(f'            # Skipping search_google ({step_info_str}): missing or invalid query')
		return script_lines

	def _map_drag_drop(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		source_sel = params.get('element_source')
		target_sel = params.get('element_target')
		source_coords = (params.get('coord_source_x'), params.get('coord_source_y'))
		target_coords = (params.get('coord_target_x'), params.get('coord_target_y'))
		script_lines = [f'            print(f"Attempting drag and drop ({step_info_str})")']
		if source_sel and target_sel:
			escaped_source = json.dumps(source_sel)
			escaped_target = json.dumps(target_sel)
			script_lines.append(f'            await page.drag_and_drop({escaped_source}, {escaped_target})')
			script_lines.append(f"            print(f'  Dragged element {escaped_source} to {escaped_target}')")
		elif all(c is not None for c in source_coords) and all(c is not None for c in target_coords):
			sx, sy = source_coords
			tx, ty = target_coords
			script_lines.extend(
				[
					f'            await page.mouse.move({sx}, {sy})',
					'            await page.mouse.down()',
					f'            await page.mouse.move({tx}, {ty})',
					'            await page.mouse.up()',
					f"            print(f'  Dragged from ({sx},{sy}) to ({tx},{ty})')",
				]
			)
		else:
			script_lines.append(
				f'            # Skipping drag_drop ({step_info_str}): requires either element selectors or full coordinates'
			)
		script_lines.append('            await page.wait_for_timeout(500)')
		return script_lines

	def _map_extract_content(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		goal = params.get('goal', 'content')
		logger.warning(f"Action 'extract_content' ({step_info_str}) cannot be directly translated to Playwright script.")
		return [f'            # Action: extract_content (Goal: {goal}) - Skipped in Playwright script ({step_info_str})']

	def _map_click_download_button(
		self, params: dict, history_item: dict, action_index_in_step: int, step_info_str: str, **kwargs
	) -> list[str]:
		index = params.get('index')
		selector = self._get_selector_for_action(history_item, action_index_in_step)
		download_dir_in_script = "'./files'"  # Default
		if self.context_config and self.context_config.save_downloads_path:
			download_dir_in_script = repr(self.context_config.save_downloads_path)

		script_lines = []
		if selector and index is not None:
			script_lines.append(
				f'            print(f"Attempting to download file by clicking element ({selector}) ({step_info_str})")'
			)
			script_lines.append('            try:')
			script_lines.append(
				'                async with page.expect_download(timeout=120000) as download_info:'
			)  # 2 min timeout
			step_info_for_download = f'{step_info_str} (triggering download)'
			script_lines.append(
				f'                    await _try_locate_and_act(page, {json.dumps(selector)}, "click", step_info={json.dumps(step_info_for_download)})'
			)
			script_lines.append('                download = await download_info.value')
			script_lines.append(f'                configured_download_dir = {download_dir_in_script}')
			script_lines.append('                download_dir_path = Path(configured_download_dir).resolve()')
			script_lines.append('                download_dir_path.mkdir(parents=True, exist_ok=True)')
			script_lines.append(
				"                base, ext = os.path.splitext(download.suggested_filename or f'download_{{len(list(download_dir_path.iterdir())) + 1}}.tmp')"
			)
			script_lines.append('                counter = 1')
			script_lines.append("                download_path_obj = download_dir_path / f'{base}{ext}'")
			script_lines.append('                while download_path_obj.exists():')
			script_lines.append("                    download_path_obj = download_dir_path / f'{base}({{counter}}){ext}'")
			script_lines.append('                    counter += 1')
			script_lines.append('                await download.save_as(str(download_path_obj))')
			script_lines.append("                print(f'  File downloaded successfully to: {str(download_path_obj)}')")
			script_lines.append('            except PlaywrightActionError as pae:')
			script_lines.append('                raise pae')  # Re-raise to stop script
			script_lines.append('            except Exception as download_err:')
			script_lines.append(
				f"                raise PlaywrightActionError(f'Download failed for {step_info_str}: {{download_err}}') from download_err"
			)
		else:
			script_lines.append(
				f'            # Skipping click_download_button ({step_info_str}): missing index ({index}) or selector ({selector})'
			)
		return script_lines

	def _map_done(self, params: dict, step_info_str: str, **kwargs) -> list[str]:
		script_lines = []
		if isinstance(params, dict):
			final_text = params.get('text', '')
			success_status = params.get('success', False)
			escaped_final_text_with_placeholders = json.dumps(str(final_text))
			script_lines.append(f'            print("\\n--- Task marked as Done by agent ({step_info_str}) ---")')
			script_lines.append(f'            print(f"Agent reported success: {success_status}")')
			script_lines.append('            # Final Message from agent (may contain placeholders):')
			script_lines.append(
				f'            final_message = replace_sensitive_data({escaped_final_text_with_placeholders}, SENSITIVE_DATA)'
			)
			script_lines.append('            print(final_message)')
		else:
			script_lines.append(f'            print("\\n--- Task marked as Done by agent ({step_info_str}) ---")')
			script_lines.append('            print("Success: N/A (invalid params)")')
			script_lines.append('            print("Final Message: N/A (invalid params)")')
		return script_lines

	def _map_action_to_playwright(
		self,
		action_dict: dict,
		history_item: dict,
		previous_history_item: dict | None,
		action_index_in_step: int,
		step_info_str: str,
	) -> list[str]:
		"""
		Translates a single action dictionary into Playwright script lines using dictionary dispatch.
		"""
		if not isinstance(action_dict, dict) or not action_dict:
			return [f'            # Invalid action format: {action_dict} ({step_info_str})']

		action_type = next(iter(action_dict.keys()), None)
		params = action_dict.get(action_type)

		if not action_type or params is None:
			if action_dict == {}:
				return [f'            # Empty action dictionary found ({step_info_str})']
			return [f'            # Could not determine action type or params: {action_dict} ({step_info_str})']

		# Get the handler function from the dictionary
		handler = self._action_handlers.get(action_type)

		if handler:
			# Call the specific handler method
			return handler(
				params=params,
				history_item=history_item,
				action_index_in_step=action_index_in_step,
				step_info_str=step_info_str,
				action_type=action_type,  # Pass action_type for legacy handling etc.
				previous_history_item=previous_history_item,
			)
		else:
			# Handle unsupported actions
			logger.warning(f'Unsupported action type encountered: {action_type} ({step_info_str})')
			return [f'            # Unsupported action type: {action_type} ({step_info_str})']

	def generate_script_content(self) -> str:
		"""Generates the full Playwright script content as a string."""
		script_lines = []
		self._page_counter = 0  # Reset page counter for new script generation

		if not self._imports_helpers_added:
			script_lines.extend(self._get_imports_and_helpers())
			self._imports_helpers_added = True

		# Read helper script content
		helper_script_path = Path(__file__).parent / 'playwright_script_helpers.py'
		try:
			with open(helper_script_path, encoding='utf-8') as f_helper:
				helper_script_content = f_helper.read()
		except FileNotFoundError:
			logger.error(f'Helper script not found at {helper_script_path}. Cannot generate script.')
			return '# Error: Helper script file missing.'
		except Exception as e:
			logger.error(f'Error reading helper script {helper_script_path}: {e}')
			return f'# Error: Could not read helper script: {e}'

		script_lines.extend(self._get_sensitive_data_definitions())

		# Add the helper script content after imports and sensitive data
		script_lines.append('\n# --- Helper Functions (from playwright_script_helpers.py) ---')
		script_lines.append(helper_script_content)
		script_lines.append('# --- End Helper Functions ---')

		# Generate browser launch and context creation code
		browser_launch_args = self._generate_browser_launch_args()
		context_options = self._generate_context_options()
		# Determine browser type (defaulting to chromium)
		browser_type = 'chromium'
		if self.browser_config and self.browser_config.browser_class in ['firefox', 'webkit']:
			browser_type = self.browser_config.browser_class

		script_lines.extend(
			[
				'async def run_generated_script():',
				'    global SENSITIVE_DATA',  # Ensure sensitive data is accessible
				'    async with async_playwright() as p:',
				'        browser = None',
				'        context = None',
				'        page = None',
				'        exit_code = 0 # Default success exit code',
				'        try:',
				f"            print('Launching {browser_type} browser...')",
				# Use generated launch args, remove slow_mo
				f'            browser = await p.{browser_type}.launch({browser_launch_args})',
				# Use generated context options
				f'            context = await browser.new_context({context_options})',
				"            print('Browser context created.')",
			]
		)

		# Add cookie loading logic if cookies_file is specified
		if self.context_config and self.context_config.cookies_file:
			cookies_file_path = repr(self.context_config.cookies_file)
			script_lines.extend(
				[
					'            # Load cookies if specified',
					f'            cookies_path = {cookies_file_path}',
					'            if cookies_path and os.path.exists(cookies_path):',
					'                try:',
					"                    with open(cookies_path, 'r', encoding='utf-8') as f_cookies:",
					'                        cookies = json.load(f_cookies)',
					'                        # Validate sameSite attribute',
					"                        valid_same_site = ['Strict', 'Lax', 'None']",
					'                        for cookie in cookies:',
					"                            if 'sameSite' in cookie and cookie['sameSite'] not in valid_same_site:",
					'                                print(f\'  Warning: Fixing invalid sameSite value "{{cookie["sameSite"]}}" to None for cookie {{cookie.get("name")}}\', file=sys.stderr)',
					"                                cookie['sameSite'] = 'None'",
					'                        await context.add_cookies(cookies)',
					"                        print(f'  Successfully loaded {{len(cookies)}} cookies from {{cookies_path}}')",
					'                except Exception as cookie_err:',
					"                    print(f'  Warning: Failed to load or add cookies from {{cookies_path}}: {{cookie_err}}', file=sys.stderr)",
					'            else:',
					'                if cookies_path:',  # Only print if a path was specified but not found
					"                    print(f'  Cookie file not found at: {cookies_path}')",
					'',
				]
			)

		script_lines.extend(
			[
				'            # Initial page handling',
				'            if context.pages:',
				'                page = context.pages[0]',
				"                print('Using initial page provided by context.')",
				'            else:',
				'                page = await context.new_page()',
				"                print('Created a new page as none existed.')",
				"            print('\\n--- Starting Generated Script Execution ---')",
			]
		)

		action_counter = 0
		stop_processing_steps = False
		previous_item_dict = None

		for step_index, item_dict in enumerate(self.history):
			if stop_processing_steps:
				break

			if not isinstance(item_dict, dict):
				logger.warning(f'Skipping step {step_index + 1}: Item is not a dictionary ({type(item_dict)})')
				script_lines.append(f'\n            # --- Step {step_index + 1}: Skipped (Invalid Format) ---')
				previous_item_dict = item_dict
				continue

			script_lines.append(f'\n            # --- Step {step_index + 1} ---')
			model_output = item_dict.get('model_output')

			if not isinstance(model_output, dict) or 'action' not in model_output:
				script_lines.append('            # No valid model_output or action found for this step')
				previous_item_dict = item_dict
				continue

			actions = model_output.get('action')
			if not isinstance(actions, list):
				script_lines.append(f'            # Actions format is not a list: {type(actions)}')
				previous_item_dict = item_dict
				continue

			for action_index_in_step, action_detail in enumerate(actions):
				action_counter += 1
				script_lines.append(f'            # Action {action_counter}')

				step_info_str = f'Step {step_index + 1}, Action {action_index_in_step + 1}'
				action_lines = self._map_action_to_playwright(
					action_dict=action_detail,
					history_item=item_dict,
					previous_history_item=previous_item_dict,
					action_index_in_step=action_index_in_step,
					step_info_str=step_info_str,
				)
				script_lines.extend(action_lines)

				action_type = next(iter(action_detail.keys()), None) if isinstance(action_detail, dict) else None
				if action_type == 'done':
					stop_processing_steps = True
					break

			previous_item_dict = item_dict

		# Updated final block to include sys.exit
		script_lines.extend(
			[
				'        except PlaywrightActionError as pae:',  # Catch specific action errors
				"            print(f'\\n--- Playwright Action Error: {pae} ---', file=sys.stderr)",
				'            exit_code = 1',  # Set exit code to failure
				'        except Exception as e:',
				"            print(f'\\n--- An unexpected error occurred: {e} ---', file=sys.stderr)",
				'            import traceback',
				'            traceback.print_exc()',
				'            exit_code = 1',  # Set exit code to failure
				'        finally:',
				"            print('\\n--- Generated Script Execution Finished ---')",
				"            print('Closing browser/context...')",
				'            if context:',
				'                 try: await context.close()',
				"                 except Exception as ctx_close_err: print(f'  Warning: could not close context: {ctx_close_err}', file=sys.stderr)",
				'            if browser:',
				'                 try: await browser.close()',
				"                 except Exception as browser_close_err: print(f'  Warning: could not close browser: {browser_close_err}', file=sys.stderr)",
				"            print('Browser/context closed.')",
				'            # Exit with the determined exit code',
				'            if exit_code != 0:',
				"                print(f'Script finished with errors (exit code {exit_code}).', file=sys.stderr)",
				'                sys.exit(exit_code)',  # Exit with non-zero code on error
				'',
				'# --- Script Entry Point ---',
				"if __name__ == '__main__':",
				"    if os.name == 'nt':",
				'        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())',
				'    asyncio.run(run_generated_script())',
			]
		)

		return '\n'.join(script_lines)

```

## /browser_use/agent/playwright_script_helpers.py

```py path="/browser_use/agent/playwright_script_helpers.py" 
from playwright.async_api import Page


# --- Helper Function for Replacing Sensitive Data ---
def replace_sensitive_data(text: str, sensitive_map: dict) -> str:
	"""Replaces sensitive data placeholders in text."""
	if not isinstance(text, str):
		return text
	for placeholder, value in sensitive_map.items():
		replacement_value = str(value) if value is not None else ''
		text = text.replace(f'<secret>{placeholder}</secret>', replacement_value)
	return text


# --- Helper Function for Robust Action Execution ---
class PlaywrightActionError(Exception):
	"""Custom exception for errors during Playwright script action execution."""

	pass


async def _try_locate_and_act(page: Page, selector: str, action_type: str, text: str | None = None, step_info: str = '') -> None:
	"""
	Attempts an action (click/fill) with XPath fallback by trimming prefixes.
	Raises PlaywrightActionError if the action fails after all fallbacks.
	"""
	print(f'Attempting {action_type} ({step_info}) using selector: {repr(selector)}')
	original_selector = selector
	MAX_FALLBACKS = 50  # Increased fallbacks
	# Increased timeouts for potentially slow pages
	INITIAL_TIMEOUT = 10000  # Milliseconds for the first attempt (10 seconds)
	FALLBACK_TIMEOUT = 1000  # Shorter timeout for fallback attempts (1 second)

	try:
		locator = page.locator(selector).first
		if action_type == 'click':
			await locator.click(timeout=INITIAL_TIMEOUT)
		elif action_type == 'fill' and text is not None:
			await locator.fill(text, timeout=INITIAL_TIMEOUT)
		else:
			# This case should ideally not happen if called correctly
			raise PlaywrightActionError(f"Invalid action_type '{action_type}' or missing text for fill. ({step_info})")
		print(f"  Action '{action_type}' successful with original selector.")
		await page.wait_for_timeout(500)  # Wait after successful action
		return  # Successful exit
	except Exception as e:
		print(f"  Warning: Action '{action_type}' failed with original selector ({repr(selector)}): {e}. Starting fallback...")

		# Fallback only works for XPath selectors
		if not selector.startswith('xpath='):
			# Raise error immediately if not XPath, as fallback won't work
			raise PlaywrightActionError(
				f"Action '{action_type}' failed. Fallback not possible for non-XPath selector: {repr(selector)}. ({step_info})"
			)

		xpath_parts = selector.split('=', 1)
		if len(xpath_parts) < 2:
			raise PlaywrightActionError(
				f"Action '{action_type}' failed. Could not extract XPath string from selector: {repr(selector)}. ({step_info})"
			)
		xpath = xpath_parts[1]  # Correctly get the XPath string

		segments = [seg for seg in xpath.split('/') if seg]

		for i in range(1, min(MAX_FALLBACKS + 1, len(segments))):
			trimmed_xpath_raw = '/'.join(segments[i:])
			fallback_xpath = f'xpath=//{trimmed_xpath_raw}'

			print(f'    Fallback attempt {i}/{MAX_FALLBACKS}: Trying selector: {repr(fallback_xpath)}')
			try:
				locator = page.locator(fallback_xpath).first
				if action_type == 'click':
					await locator.click(timeout=FALLBACK_TIMEOUT)
				elif action_type == 'fill' and text is not None:
					try:
						await locator.clear(timeout=FALLBACK_TIMEOUT)
						await page.wait_for_timeout(100)
					except Exception as clear_error:
						print(f'    Warning: Failed to clear field during fallback ({step_info}): {clear_error}')
					await locator.fill(text, timeout=FALLBACK_TIMEOUT)

				print(f"    Action '{action_type}' successful with fallback selector: {repr(fallback_xpath)}")
				await page.wait_for_timeout(500)
				return  # Successful exit after fallback
			except Exception as fallback_e:
				print(f'    Fallback attempt {i} failed: {fallback_e}')
				if i == MAX_FALLBACKS:
					# Raise exception after exhausting fallbacks
					raise PlaywrightActionError(
						f"Action '{action_type}' failed after {MAX_FALLBACKS} fallback attempts. Original selector: {repr(original_selector)}. ({step_info})"
					)

	# This part should not be reachable if logic is correct, but added as safeguard
	raise PlaywrightActionError(f"Action '{action_type}' failed unexpectedly for {repr(original_selector)}. ({step_info})")

```

## /browser_use/agent/prompts.py

```py path="/browser_use/agent/prompts.py" 
import importlib.resources
from datetime import datetime
from typing import TYPE_CHECKING, Optional

from langchain_core.messages import HumanMessage, SystemMessage

if TYPE_CHECKING:
	from browser_use.agent.views import ActionResult, AgentStepInfo
	from browser_use.browser.views import BrowserState


class SystemPrompt:
	def __init__(
		self,
		action_description: str,
		max_actions_per_step: int = 10,
		override_system_message: str | None = None,
		extend_system_message: str | None = None,
	):
		self.default_action_description = action_description
		self.max_actions_per_step = max_actions_per_step
		prompt = ''
		if override_system_message:
			prompt = override_system_message
		else:
			self._load_prompt_template()
			prompt = self.prompt_template.format(max_actions=self.max_actions_per_step)

		if extend_system_message:
			prompt += f'\n{extend_system_message}'

		self.system_message = SystemMessage(content=prompt)

	def _load_prompt_template(self) -> None:
		"""Load the prompt template from the markdown file."""
		try:
			# This works both in development and when installed as a package
			with importlib.resources.files('browser_use.agent').joinpath('system_prompt.md').open('r') as f:
				self.prompt_template = f.read()
		except Exception as e:
			raise RuntimeError(f'Failed to load system prompt template: {e}')

	def get_system_message(self) -> SystemMessage:
		"""
		Get the system prompt for the agent.

		Returns:
		    SystemMessage: Formatted system prompt
		"""
		return self.system_message


# Functions:
# {self.default_action_description}

# Example:
# {self.example_response()}
# Your AVAILABLE ACTIONS:
# {self.default_action_description}


class AgentMessagePrompt:
	def __init__(
		self,
		state: 'BrowserState',
		result: list['ActionResult'] | None = None,
		include_attributes: list[str] | None = None,
		step_info: Optional['AgentStepInfo'] = None,
	):
		self.state = state
		self.result = result
		self.include_attributes = include_attributes or []
		self.step_info = step_info

	def get_user_message(self, use_vision: bool = True) -> HumanMessage:
		elements_text = self.state.element_tree.clickable_elements_to_string(include_attributes=self.include_attributes)

		has_content_above = (self.state.pixels_above or 0) > 0
		has_content_below = (self.state.pixels_below or 0) > 0

		if elements_text != '':
			if has_content_above:
				elements_text = (
					f'... {self.state.pixels_above} pixels above - scroll or extract content to see more ...\n{elements_text}'
				)
			else:
				elements_text = f'[Start of page]\n{elements_text}'
			if has_content_below:
				elements_text = (
					f'{elements_text}\n... {self.state.pixels_below} pixels below - scroll or extract content to see more ...'
				)
			else:
				elements_text = f'{elements_text}\n[End of page]'
		else:
			elements_text = 'empty page'

		if self.step_info:
			step_info_description = f'Current step: {self.step_info.step_number + 1}/{self.step_info.max_steps}'
		else:
			step_info_description = ''
		time_str = datetime.now().strftime('%Y-%m-%d %H:%M')
		step_info_description += f'Current date and time: {time_str}'

		state_description = f"""
[Task history memory ends]
[Current state starts here]
The following is one-time information - if you need to remember it write it to memory:
Current url: {self.state.url}
Available tabs:
{self.state.tabs}
Interactive elements from top layer of the current page inside the viewport:
{elements_text}
{step_info_description}
"""

		if self.result:
			for i, result in enumerate(self.result):
				if result.extracted_content:
					state_description += f'\nAction result {i + 1}/{len(self.result)}: {result.extracted_content}'
				if result.error:
					# only use last line of error
					error = result.error.split('\n')[-1]
					state_description += f'\nAction error {i + 1}/{len(self.result)}: ...{error}'

		if self.state.screenshot and use_vision is True:
			# Format message for vision model
			return HumanMessage(
				content=[
					{'type': 'text', 'text': state_description},
					{
						'type': 'image_url',
						'image_url': {'url': f'data:image/png;base64,{self.state.screenshot}'},  # , 'detail': 'low'
					},
				]
			)

		return HumanMessage(content=state_description)


class PlannerPrompt(SystemPrompt):
	def __init__(self, available_actions: str):
		self.available_actions = available_actions

	def get_system_message(
		self, is_planner_reasoning: bool, extended_planner_system_prompt: str | None = None
	) -> SystemMessage | HumanMessage:
		"""Get the system message for the planner.

		Args:
		    is_planner_reasoning: If True, return as HumanMessage for chain-of-thought
		    extended_planner_system_prompt: Optional text to append to the base prompt

		Returns:
		    SystemMessage or HumanMessage depending on is_planner_reasoning
		"""

		planner_prompt_text = """
You are a planning agent that helps break down tasks into smaller steps and reason about the current state.
Your role is to:
1. Analyze the current state and history
2. Evaluate progress towards the ultimate goal
3. Identify potential challenges or roadblocks
4. Suggest the next high-level steps to take

Inside your messages, there will be AI messages from different agents with different formats.

Your output format should be always a JSON object with the following fields:
{{
    "state_analysis": "Brief analysis of the current state and what has been done so far",
    "progress_evaluation": "Evaluation of progress towards the ultimate goal (as percentage and description)",
    "challenges": "List any potential challenges or roadblocks",
    "next_steps": "List 2-3 concrete next steps to take",
    "reasoning": "Explain your reasoning for the suggested next steps"
}}

Ignore the other AI messages output structures.

Keep your responses concise and focused on actionable insights.
"""

		if extended_planner_system_prompt:
			planner_prompt_text += f'\n{extended_planner_system_prompt}'

		if is_planner_reasoning:
			return HumanMessage(content=planner_prompt_text)
		else:
			return SystemMessage(content=planner_prompt_text)

```

## /browser_use/agent/service.py

```py path="/browser_use/agent/service.py" 
import asyncio
import gc
import inspect
import json
import logging
import os
import re
import sys
import time
from collections.abc import Awaitable, Callable
from pathlib import Path
from typing import Any, Generic, TypeVar

from dotenv import load_dotenv
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import (
	BaseMessage,
	HumanMessage,
	SystemMessage,
)

# from lmnr.sdk.decorators import observe
from pydantic import BaseModel, ValidationError

from browser_use.agent.gif import create_history_gif
from browser_use.agent.memory.service import Memory
from browser_use.agent.memory.views import MemoryConfig
from browser_use.agent.message_manager.service import MessageManager, MessageManagerSettings
from browser_use.agent.message_manager.utils import (
	convert_input_messages,
	extract_json_from_model_output,
	is_model_without_tool_support,
	save_conversation,
)
from browser_use.agent.prompts import AgentMessagePrompt, PlannerPrompt, SystemPrompt
from browser_use.agent.views import (
	REQUIRED_LLM_API_ENV_VARS,
	ActionResult,
	AgentError,
	AgentHistory,
	AgentHistoryList,
	AgentOutput,
	AgentSettings,
	AgentState,
	AgentStepInfo,
	StepMetadata,
	ToolCallingMethod,
)
from browser_use.browser.browser import Browser
from browser_use.browser.context import BrowserContext
from browser_use.browser.views import BrowserState, BrowserStateHistory
from browser_use.controller.registry.views import ActionModel
from browser_use.controller.service import Controller
from browser_use.dom.history_tree_processor.service import (
	DOMHistoryElement,
	HistoryTreeProcessor,
)
from browser_use.exceptions import LLMException
from browser_use.telemetry.service import ProductTelemetry
from browser_use.telemetry.views import (
	AgentTelemetryEvent,
)
from browser_use.utils import check_env_variables, time_execution_async, time_execution_sync

load_dotenv()
logger = logging.getLogger(__name__)

SKIP_LLM_API_KEY_VERIFICATION = os.environ.get('SKIP_LLM_API_KEY_VERIFICATION', 'false').lower()[0] in 'ty1'


def log_response(response: AgentOutput) -> None:
	"""Utility function to log the model's response."""

	if 'Success' in response.current_state.evaluation_previous_goal:
		emoji = '👍'
	elif 'Failed' in response.current_state.evaluation_previous_goal:
		emoji = '⚠'
	else:
		emoji = '🤷'

	logger.info(f'{emoji} Eval: {response.current_state.evaluation_previous_goal}')
	logger.info(f'🧠 Memory: {response.current_state.memory}')
	logger.info(f'🎯 Next goal: {response.current_state.next_goal}')
	for i, action in enumerate(response.action):
		logger.info(f'🛠️  Action {i + 1}/{len(response.action)}: {action.model_dump_json(exclude_unset=True)}')


Context = TypeVar('Context')

AgentHookFunc = Callable[['Agent'], Awaitable[None]]


class Agent(Generic[Context]):
	@time_execution_sync('--init (agent)')
	def __init__(
		self,
		task: str,
		llm: BaseChatModel,
		# Optional parameters
		browser: Browser | None = None,
		browser_context: BrowserContext | None = None,
		controller: Controller[Context] = Controller(),
		# Initial agent run parameters
		sensitive_data: dict[str, str] | None = None,
		initial_actions: list[dict[str, dict[str, Any]]] | None = None,
		# Cloud Callbacks
		register_new_step_callback: (
			Callable[['BrowserState', 'AgentOutput', int], None]  # Sync callback
			| Callable[['BrowserState', 'AgentOutput', int], Awaitable[None]]  # Async callback
			| None
		) = None,
		register_done_callback: (
			Callable[['AgentHistoryList'], Awaitable[None]]  # Async Callback
			| Callable[['AgentHistoryList'], None]  # Sync Callback
			| None
		) = None,
		register_external_agent_status_raise_error_callback: Callable[[], Awaitable[bool]] | None = None,
		# Agent settings
		use_vision: bool = True,
		use_vision_for_planner: bool = False,
		save_conversation_path: str | None = None,
		save_conversation_path_encoding: str | None = 'utf-8',
		max_failures: int = 3,
		retry_delay: int = 10,
		override_system_message: str | None = None,
		extend_system_message: str | None = None,
		max_input_tokens: int = 128000,
		validate_output: bool = False,
		message_context: str | None = None,
		generate_gif: bool | str = False,
		available_file_paths: list[str] | None = None,
		include_attributes: list[str] = [
			'title',
			'type',
			'name',
			'role',
			'aria-label',
			'placeholder',
			'value',
			'alt',
			'aria-expanded',
			'data-date-format',
		],
		max_actions_per_step: int = 10,
		tool_calling_method: ToolCallingMethod | None = 'auto',
		page_extraction_llm: BaseChatModel | None = None,
		planner_llm: BaseChatModel | None = None,
		planner_interval: int = 1,  # Run planner every N steps
		is_planner_reasoning: bool = False,
		extend_planner_system_message: str | None = None,
		injected_agent_state: AgentState | None = None,
		context: Context | None = None,
		save_playwright_script_path: str | None = None,
		enable_memory: bool = True,
		memory_config: MemoryConfig | None = None,
		source: str | None = None,
	):
		if page_extraction_llm is None:
			page_extraction_llm = llm

		# Core components
		self.task = task
		self.llm = llm
		self.controller = controller
		self.sensitive_data = sensitive_data

		self.settings = AgentSettings(
			use_vision=use_vision,
			use_vision_for_planner=use_vision_for_planner,
			save_conversation_path=save_conversation_path,
			save_conversation_path_encoding=save_conversation_path_encoding,
			max_failures=max_failures,
			retry_delay=retry_delay,
			override_system_message=override_system_message,
			extend_system_message=extend_system_message,
			max_input_tokens=max_input_tokens,
			validate_output=validate_output,
			message_context=message_context,
			generate_gif=generate_gif,
			available_file_paths=available_file_paths,
			include_attributes=include_attributes,
			max_actions_per_step=max_actions_per_step,
			tool_calling_method=tool_calling_method,
			page_extraction_llm=page_extraction_llm,
			planner_llm=planner_llm,
			planner_interval=planner_interval,
			is_planner_reasoning=is_planner_reasoning,
			save_playwright_script_path=save_playwright_script_path,
			extend_planner_system_message=extend_planner_system_message,
		)

		# Memory settings
		self.enable_memory = enable_memory
		self.memory_config = memory_config

		# Initialize state
		self.state = injected_agent_state or AgentState()

		# Action setup
		self._setup_action_models()
		self._set_browser_use_version_and_source(source)
		self.initial_actions = self._convert_initial_actions(initial_actions) if initial_actions else None

		# Model setup
		self._set_model_names()
		self.tool_calling_method = self._set_tool_calling_method()

		# Handle users trying to use use_vision=True with DeepSeek models
		if 'deepseek' in self.model_name.lower():
			logger.warning('⚠️ DeepSeek models do not support use_vision=True yet. Setting use_vision=False for now...')
			self.settings.use_vision = False
		if 'deepseek' in (self.planner_model_name or '').lower():
			logger.warning(
				'⚠️ DeepSeek models do not support use_vision=True yet. Setting use_vision_for_planner=False for now...'
			)
			self.settings.use_vision_for_planner = False
		# Handle users trying to use use_vision=True with XAI models
		if 'grok' in self.model_name.lower():
			logger.warning('⚠️ XAI models do not support use_vision=True yet. Setting use_vision=False for now...')
			self.settings.use_vision = False
		if 'grok' in (self.planner_model_name or '').lower():
			logger.warning('⚠️ XAI models do not support use_vision=True yet. Setting use_vision_for_planner=False for now...')
			self.settings.use_vision_for_planner = False

		logger.info(
			f'🧠 Starting an agent with main_model={self.model_name}'
			f'{" +tools" if self.tool_calling_method == "function_calling" else ""}'
			f'{" +rawtools" if self.tool_calling_method == "raw" else ""}'
			f'{" +vision" if self.settings.use_vision else ""}'
			f'{" +memory" if self.enable_memory else ""}, '
			f'planner_model={self.planner_model_name}'
			f'{" +reasoning" if self.settings.is_planner_reasoning else ""}'
			f'{" +vision" if self.settings.use_vision_for_planner else ""}, '
			f'extraction_model={getattr(self.settings.page_extraction_llm, "model_name", None)} '
		)

		# Verify we can connect to the LLM
		self._verify_llm_connection()

		# Initialize available actions for system prompt (only non-filtered actions)
		# These will be used for the system prompt to maintain caching
		self.unfiltered_actions = self.controller.registry.get_prompt_description()

		self.settings.message_context = self._set_message_context()

		# Initialize message manager with state
		# Initial system prompt with all actions - will be updated during each step
		self._message_manager = MessageManager(
			task=task,
			system_message=SystemPrompt(
				action_description=self.unfiltered_actions,
				max_actions_per_step=self.settings.max_actions_per_step,
				override_system_message=override_system_message,
				extend_system_message=extend_system_message,
			).get_system_message(),
			settings=MessageManagerSettings(
				max_input_tokens=self.settings.max_input_tokens,
				include_attributes=self.settings.include_attributes,
				message_context=self.settings.message_context,
				sensitive_data=sensitive_data,
				available_file_paths=self.settings.available_file_paths,
			),
			state=self.state.message_manager_state,
		)

		if self.enable_memory:
			try:
				# Initialize memory
				self.memory = Memory(
					message_manager=self._message_manager,
					llm=self.llm,
					config=self.memory_config,
				)
			except ImportError:
				logger.warning(
					'⚠️ Agent(enable_memory=True) is set but missing some required packages, install and re-run to use memory features: pip install browser-use[memory]'
				)
				self.memory = None
				self.enable_memory = False
		else:
			self.memory = None

		# Browser setup
		self.injected_browser = browser is not None
		self.injected_browser_context = browser_context is not None
		self.browser = browser or Browser()
		self.browser.config.new_context_config.disable_security = self.browser.config.disable_security
		self.browser_context = browser_context or BrowserContext(
			browser=self.browser, config=self.browser.config.new_context_config
		)

		# Huge security warning if sensitive_data is provided but allowed_domains is not set
		if self.sensitive_data and not self.browser_context.config.allowed_domains:
			logger.error(
				'⚠️⚠️⚠️ Agent(sensitive_data=••••••••) was provided but BrowserContextConfig(allowed_domains=[...]) is not locked down! ⚠️⚠️⚠️\n'
				'          ☠️ If the agent visits a malicious website and encounters a prompt-injection attack, your sensitive_data may be exposed!\n\n'
				'             https://docs.browser-use.com/customize/browser-settings#restrict-urls\n'
				'Waiting 10 seconds before continuing... Press [Ctrl+C] to abort.'
			)
			if sys.stdin.isatty():
				try:
					time.sleep(10)
				except KeyboardInterrupt:
					print(
						'\n\n 🛑 Exiting now... set BrowserContextConfig(allowed_domains=["example.com", "example.org"]) to only domains you trust to see your sensitive_data.'
					)
					sys.exit(0)
			else:
				pass  # no point waiting if we're not in an interactive shell
			logger.warning('‼️ Continuing with insecure settings for now... but this will become a hard error in the future!')

		# Callbacks
		self.register_new_step_callback = register_new_step_callback
		self.register_done_callback = register_done_callback
		self.register_external_agent_status_raise_error_callback = register_external_agent_status_raise_error_callback

		# Context
		self.context = context

		# Telemetry
		self.telemetry = ProductTelemetry()

		if self.settings.save_conversation_path:
			logger.info(f'Saving conversation to {self.settings.save_conversation_path}')

	def _set_message_context(self) -> str | None:
		if self.tool_calling_method == 'raw':
			# For raw tool calling, only include actions with no filters initially
			if self.settings.message_context:
				self.settings.message_context += f'\n\nAvailable actions: {self.unfiltered_actions}'
			else:
				self.settings.message_context = f'Available actions: {self.unfiltered_actions}'
		return self.settings.message_context

	def _set_browser_use_version_and_source(self, source_override: str | None = None) -> None:
		"""Get the version and source of the browser-use package (git or pip in a nutshell)"""
		try:
			# First check for repository-specific files
			repo_files = ['.git', 'README.md', 'docs', 'examples']
			package_root = Path(__file__).parent.parent.parent

			# If all of these files/dirs exist, it's likely from git
			if all(Path(package_root / file).exists() for file in repo_files):
				try:
					import subprocess

					version = subprocess.check_output(['git', 'describe', '--tags']).decode('utf-8').strip()
				except Exception:
					version = 'unknown'
				source = 'git'
			else:
				# If no repo files found, try getting version from pip
				from importlib.metadata import version

				version = version('browser-use')
				source = 'pip'
		except Exception:
			version = 'unknown'
			source = 'unknown'
		if source_override is not None:
			source = source_override
		logger.debug(f'Version: {version}, Source: {source}')
		self.version = version
		self.source = source

	def _set_model_names(self) -> None:
		self.chat_model_library = self.llm.__class__.__name__
		self.model_name = 'Unknown'
		if hasattr(self.llm, 'model_name'):
			model = self.llm.model_name  # type: ignore
			self.model_name = model if model is not None else 'Unknown'
		elif hasattr(self.llm, 'model'):
			model = self.llm.model  # type: ignore
			self.model_name = model if model is not None else 'Unknown'

		if self.settings.planner_llm:
			if hasattr(self.settings.planner_llm, 'model_name'):
				self.planner_model_name = self.settings.planner_llm.model_name  # type: ignore
			elif hasattr(self.settings.planner_llm, 'model'):
				self.planner_model_name = self.settings.planner_llm.model  # type: ignore
			else:
				self.planner_model_name = 'Unknown'
		else:
			self.planner_model_name = None

	def _setup_action_models(self) -> None:
		"""Setup dynamic action models from controller's registry"""
		# Initially only include actions with no filters
		self.ActionModel = self.controller.registry.create_action_model()
		# Create output model with the dynamic actions
		self.AgentOutput = AgentOutput.type_with_custom_actions(self.ActionModel)

		# used to force the done action when max_steps is reached
		self.DoneActionModel = self.controller.registry.create_action_model(include_actions=['done'])
		self.DoneAgentOutput = AgentOutput.type_with_custom_actions(self.DoneActionModel)

	def _set_tool_calling_method(self) -> ToolCallingMethod | None:
		tool_calling_method = self.settings.tool_calling_method
		if tool_calling_method == 'auto':
			if is_model_without_tool_support(self.model_name):
				return 'raw'
			elif self.chat_model_library == 'ChatGoogleGenerativeAI':
				return None
			elif self.chat_model_library == 'ChatOpenAI':
				return 'function_calling'
			elif self.chat_model_library == 'AzureChatOpenAI':
				# Azure OpenAI API requires 'tools' parameter for GPT-4
				# The error 'content must be either a string or an array' occurs when
				# the API expects a tools array but gets something else
				if 'gpt-4' in self.model_name.lower():
					return 'tools'
				else:
					return 'function_calling'
			else:
				return None
		else:
			return tool_calling_method

	def add_new_task(self, new_task: str) -> None:
		self._message_manager.add_new_task(new_task)

	async def _raise_if_stopped_or_paused(self) -> None:
		"""Utility function that raises an InterruptedError if the agent is stopped or paused."""

		if self.register_external_agent_status_raise_error_callback:
			if await self.register_external_agent_status_raise_error_callback():
				raise InterruptedError

		if self.state.stopped or self.state.paused:
			# logger.debug('Agent paused after getting state')
			raise InterruptedError

	# @observe(name='agent.step', ignore_output=True, ignore_input=True)
	@time_execution_async('--step (agent)')
	async def step(self, step_info: AgentStepInfo | None = None) -> None:
		"""Execute one step of the task"""
		logger.info(f'📍 Step {self.state.n_steps}')
		state = None
		model_output = None
		result: list[ActionResult] = []
		step_start_time = time.time()
		tokens = 0

		try:
			state = await self.browser_context.get_state(cache_clickable_elements_hashes=True)
			current_page = await self.browser_context.get_current_page()

			# generate procedural memory if needed
			if self.enable_memory and self.memory and self.state.n_steps % self.memory.config.memory_interval == 0:
				self.memory.create_procedural_memory(self.state.n_steps)

			await self._raise_if_stopped_or_paused()

			# Update action models with page-specific actions
			await self._update_action_models_for_page(current_page)

			# Get page-specific filtered actions
			page_filtered_actions = self.controller.registry.get_prompt_description(current_page)

			# If there are page-specific actions, add them as a special message for this step only
			if page_filtered_actions:
				page_action_message = f'For this page, these additional actions are available:\n{page_filtered_actions}'
				self._message_manager._add_message_with_tokens(HumanMessage(content=page_action_message))

			# If using raw tool calling method, we need to update the message context with new actions
			if self.tool_calling_method == 'raw':
				# For raw tool calling, get all non-filtered actions plus the page-filtered ones
				all_unfiltered_actions = self.controller.registry.get_prompt_description()
				all_actions = all_unfiltered_actions
				if page_filtered_actions:
					all_actions += '\n' + page_filtered_actions

				context_lines = (self._message_manager.settings.message_context or '').split('\n')
				non_action_lines = [line for line in context_lines if not line.startswith('Available actions:')]
				updated_context = '\n'.join(non_action_lines)
				if updated_context:
					updated_context += f'\n\nAvailable actions: {all_actions}'
				else:
					updated_context = f'Available actions: {all_actions}'
				self._message_manager.settings.message_context = updated_context

			self._message_manager.add_state_message(state, self.state.last_result, step_info, self.settings.use_vision)

			# Run planner at specified intervals if planner is configured
			if self.settings.planner_llm and self.state.n_steps % self.settings.planner_interval == 0:
				plan = await self._run_planner()
				# add plan before last state message
				self._message_manager.add_plan(plan, position=-1)

			if step_info and step_info.is_last_step():
				# Add last step warning if needed
				msg = 'Now comes your last step. Use only the "done" action now. No other actions - so here your action sequence must have length 1.'
				msg += '\nIf the task is not yet fully finished as requested by the user, set success in "done" to false! E.g. if not all steps are fully completed.'
				msg += '\nIf the task is fully finished, set success in "done" to true.'
				msg += '\nInclude everything you found out for the ultimate task in the done text.'
				logger.info('Last step finishing up')
				self._message_manager._add_message_with_tokens(HumanMessage(content=msg))
				self.AgentOutput = self.DoneAgentOutput

			input_messages = self._message_manager.get_messages()
			tokens = self._message_manager.state.history.current_tokens

			try:
				model_output = await self.get_next_action(input_messages)
				if (
					not model_output.action
					or not isinstance(model_output.action, list)
					or all(action.model_dump() == {} for action in model_output.action)
				):
					logger.warning('Model returned empty action. Retrying...')

					clarification_message = HumanMessage(
						content='You forgot to return an action. Please respond only with a valid JSON action according to the expected format.'
					)

					retry_messages = input_messages + [clarification_message]
					model_output = await self.get_next_action(retry_messages)

					if not model_output.action or all(action.model_dump() == {} for action in model_output.action):
						logger.warning('Model still returned empty after retry. Inserting safe noop action.')
						action_instance = self.ActionModel(
							done={
								'success': False,
								'text': 'No next action returned by LLM!',
							}
						)
						model_output.action = [action_instance]

				# Check again for paused/stopped state after getting model output
				await self._raise_if_stopped_or_paused()

				self.state.n_steps += 1

				if self.register_new_step_callback:
					if inspect.iscoroutinefunction(self.register_new_step_callback):
						await self.register_new_step_callback(state, model_output, self.state.n_steps)
					else:
						self.register_new_step_callback(state, model_output, self.state.n_steps)
				if self.settings.save_conversation_path:
					target = self.settings.save_conversation_path + f'_{self.state.n_steps}.txt'
					save_conversation(input_messages, model_output, target, self.settings.save_conversation_path_encoding)

				self._message_manager._remove_last_state_message()  # we dont want the whole state in the chat history

				# check again if Ctrl+C was pressed before we commit the output to history
				await self._raise_if_stopped_or_paused()

				self._message_manager.add_model_output(model_output)
			except asyncio.CancelledError:
				# Task was cancelled due to Ctrl+C
				self._message_manager._remove_last_state_message()
				raise InterruptedError('Model query cancelled by user')
			except InterruptedError:
				# Agent was paused during get_next_action
				self._message_manager._remove_last_state_message()
				raise  # Re-raise to be caught by the outer try/except
			except Exception as e:
				# model call failed, remove last state message from history
				self._message_manager._remove_last_state_message()
				raise e

			result: list[ActionResult] = await self.multi_act(model_output.action)

			self.state.last_result = result

			if len(result) > 0 and result[-1].is_done:
				logger.info(f'📄 Result: {result[-1].extracted_content}')

			self.state.consecutive_failures = 0

		except InterruptedError:
			# logger.debug('Agent paused')
			self.state.last_result = [
				ActionResult(
					error='The agent was paused mid-step - the last action might need to be repeated', include_in_memory=False
				)
			]
			return
		except asyncio.CancelledError:
			# Directly handle the case where the step is cancelled at a higher level
			# logger.debug('Task cancelled - agent was paused with Ctrl+C')
			self.state.last_result = [ActionResult(error='The agent was paused with Ctrl+C', include_in_memory=False)]
			raise InterruptedError('Step cancelled by user')
		except Exception as e:
			result = await self._handle_step_error(e)
			self.state.last_result = result

		finally:
			step_end_time = time.time()
			if not result:
				return

			if state:
				metadata = StepMetadata(
					step_number=self.state.n_steps,
					step_start_time=step_start_time,
					step_end_time=step_end_time,
					input_tokens=tokens,
				)
				self._make_history_item(model_output, state, result, metadata)

	@time_execution_async('--handle_step_error (agent)')
	async def _handle_step_error(self, error: Exception) -> list[ActionResult]:
		"""Handle all types of errors that can occur during a step"""
		include_trace = logger.isEnabledFor(logging.DEBUG)
		error_msg = AgentError.format_error(error, include_trace=include_trace)
		prefix = f'❌ Result failed {self.state.consecutive_failures + 1}/{self.settings.max_failures} times:\n '
		self.state.consecutive_failures += 1

		if 'Browser closed' in error_msg:
			logger.error('❌  Browser is closed or disconnected, unable to proceed')
			return [ActionResult(error='Browser closed or disconnected, unable to proceed', include_in_memory=False)]

		if isinstance(error, (ValidationError, ValueError)):
			logger.error(f'{prefix}{error_msg}')
			if 'Max token limit reached' in error_msg:
				# cut tokens from history
				self._message_manager.settings.max_input_tokens = self.settings.max_input_tokens - 500
				logger.info(
					f'Cutting tokens from history - new max input tokens: {self._message_manager.settings.max_input_tokens}'
				)
				self._message_manager.cut_messages()
			elif 'Could not parse response' in error_msg:
				# give model a hint how output should look like
				error_msg += '\n\nReturn a valid JSON object with the required fields.'

		else:
			from anthropic import RateLimitError as AnthropicRateLimitError
			from google.api_core.exceptions import ResourceExhausted
			from openai import RateLimitError

			# Define a tuple of rate limit error types for easier maintenance
			RATE_LIMIT_ERRORS = (
				RateLimitError,  # OpenAI
				ResourceExhausted,  # Google
				AnthropicRateLimitError,  # Anthropic
			)

			if isinstance(error, RATE_LIMIT_ERRORS):
				logger.warning(f'{prefix}{error_msg}')
				await asyncio.sleep(self.settings.retry_delay)
			else:
				logger.error(f'{prefix}{error_msg}')

		return [ActionResult(error=error_msg, include_in_memory=True)]

	def _make_history_item(
		self,
		model_output: AgentOutput | None,
		state: BrowserState,
		result: list[ActionResult],
		metadata: StepMetadata | None = None,
	) -> None:
		"""Create and store history item"""

		if model_output:
			interacted_elements = AgentHistory.get_interacted_element(model_output, state.selector_map)
		else:
			interacted_elements = [None]

		state_history = BrowserStateHistory(
			url=state.url,
			title=state.title,
			tabs=state.tabs,
			interacted_element=interacted_elements,
			screenshot=state.screenshot,
		)

		history_item = AgentHistory(model_output=model_output, result=result, state=state_history, metadata=metadata)

		self.state.history.history.append(history_item)

	THINK_TAGS = re.compile(r'<think>.*?</think>', re.DOTALL)
	STRAY_CLOSE_TAG = re.compile(r'.*?</think>', re.DOTALL)

	def _remove_think_tags(self, text: str) -> str:
		# Step 1: Remove well-formed <think>...</think>
		text = re.sub(self.THINK_TAGS, '', text)
		# Step 2: If there's an unmatched closing tag </think>,
		#         remove everything up to and including that.
		text = re.sub(self.STRAY_CLOSE_TAG, '', text)
		return text.strip()

	def _convert_input_messages(self, input_messages: list[BaseMessage]) -> list[BaseMessage]:
		"""Convert input messages to the correct format"""
		if is_model_without_tool_support(self.model_name):
			return convert_input_messages(input_messages, self.model_name)
		else:
			return input_messages

	@time_execution_async('--get_next_action (agent)')
	async def get_next_action(self, input_messages: list[BaseMessage]) -> AgentOutput:
		"""Get next action from LLM based on current state"""
		input_messages = self._convert_input_messages(input_messages)

		if self.tool_calling_method == 'raw':
			logger.debug(f'Using {self.tool_calling_method} for {self.chat_model_library}')
			try:
				output = self.llm.invoke(input_messages)
				response = {'raw': output, 'parsed': None}
			except Exception as e:
				logger.error(f'Failed to invoke model: {str(e)}')
				raise LLMException(401, 'LLM API call failed') from e
			# TODO: currently invoke does not return reasoning_content, we should override invoke
			output.content = self._remove_think_tags(str(output.content))
			try:
				parsed_json = extract_json_from_model_output(output.content)
				parsed = self.AgentOutput(**parsed_json)
				response['parsed'] = parsed
			except (ValueError, ValidationError) as e:
				logger.warning(f'Failed to parse model output: {output} {str(e)}')
				raise ValueError('Could not parse response.')

		elif self.tool_calling_method is None:
			structured_llm = self.llm.with_structured_output(self.AgentOutput, include_raw=True)
			try:
				response: dict[str, Any] = await structured_llm.ainvoke(input_messages)  # type: ignore
				parsed: AgentOutput | None = response['parsed']

			except Exception as e:
				logger.error(f'Failed to invoke model: {str(e)}')
				raise LLMException(401, 'LLM API call failed') from e

		else:
			logger.debug(f'Using {self.tool_calling_method} for {self.chat_model_library}')
			structured_llm = self.llm.with_structured_output(self.AgentOutput, include_raw=True, method=self.tool_calling_method)
			response: dict[str, Any] = await structured_llm.ainvoke(input_messages)  # type: ignore

		# Handle tool call responses
		if response.get('parsing_error') and 'raw' in response:
			raw_msg = response['raw']
			if hasattr(raw_msg, 'tool_calls') and raw_msg.tool_calls:
				# Convert tool calls to AgentOutput format

				tool_call = raw_msg.tool_calls[0]  # Take first tool call

				# Create current state
				tool_call_name = tool_call['name']
				tool_call_args = tool_call['args']

				current_state = {
					'page_summary': 'Processing tool call',
					'evaluation_previous_goal': 'Executing action',
					'memory': 'Using tool call',
					'next_goal': f'Execute {tool_call_name}',
				}

				# Create action from tool call
				action = {tool_call_name: tool_call_args}

				parsed = self.AgentOutput(current_state=current_state, action=[self.ActionModel(**action)])
			else:
				parsed = None
		else:
			parsed = response['parsed']

		if not parsed:
			try:
				parsed_json = extract_json_from_model_output(response['raw'].content)
				parsed = self.AgentOutput(**parsed_json)
			except Exception as e:
				logger.warning(f'Failed to parse model output: {response["raw"].content} {str(e)}')
				raise ValueError('Could not parse response.')

		# cut the number of actions to max_actions_per_step if needed
		if len(parsed.action) > self.settings.max_actions_per_step:
			parsed.action = parsed.action[: self.settings.max_actions_per_step]

		if not (hasattr(self.state, 'paused') and (self.state.paused or self.state.stopped)):
			log_response(parsed)

		return parsed

	def _log_agent_run(self) -> None:
		"""Log the agent run"""
		logger.info(f'🚀 Starting task: {self.task}')

		logger.debug(f'Version: {self.version}, Source: {self.source}')

	def _log_agent_event(self, max_steps: int, agent_run_error: str | None = None) -> None:
		"""Sent the agent event for this run to telemetry"""

		# Prepare action_history data correctly
		action_history_data = []
		for item in self.state.history.history:
			if item.model_output and item.model_output.action:
				# Convert each ActionModel in the step to its dictionary representation
				step_actions = [
					action.model_dump(exclude_unset=True)
					for action in item.model_output.action
					if action  # Ensure action is not None if list allows it
				]
				action_history_data.append(step_actions)
			else:
				# Append None or [] if a step had no actions or no model output
				action_history_data.append(None)

		final_res = self.state.history.final_result()
		final_result_str = json.dumps(final_res) if final_res is not None else None

		self.telemetry.capture(
			AgentTelemetryEvent(
				task=self.task,
				model=self.model_name,
				model_provider=self.chat_model_library,
				planner_llm=self.planner_model_name,
				max_steps=max_steps,
				max_actions_per_step=self.settings.max_actions_per_step,
				use_vision=self.settings.use_vision,
				use_validation=self.settings.validate_output,
				version=self.version,
				source=self.source,
				action_errors=self.state.history.errors(),
				action_history=action_history_data,
				urls_visited=self.state.history.urls(),
				steps=self.state.n_steps,
				total_input_tokens=self.state.history.total_input_tokens(),
				total_duration_seconds=self.state.history.total_duration_seconds(),
				success=self.state.history.is_successful(),
				final_result_response=final_result_str,
				error_message=agent_run_error,
			)
		)

	async def take_step(self) -> tuple[bool, bool]:
		"""Take a step

		Returns:
			Tuple[bool, bool]: (is_done, is_valid)
		"""
		await self.step()

		if self.state.history.is_done():
			if self.settings.validate_output:
				if not await self._validate_output():
					return True, False

			await self.log_completion()
			if self.register_done_callback:
				if inspect.iscoroutinefunction(self.register_done_callback):
					await self.register_done_callback(self.state.history)
				else:
					self.register_done_callback(self.state.history)
			return True, True

		return False, False

	# @observe(name='agent.run', ignore_output=True)
	@time_execution_async('--run (agent)')
	async def run(
		self, max_steps: int = 100, on_step_start: AgentHookFunc | None = None, on_step_end: AgentHookFunc | None = None
	) -> AgentHistoryList:
		"""Execute the task with maximum number of steps"""

		loop = asyncio.get_event_loop()
		agent_run_error: str | None = None  # Initialize error tracking variable
		self._force_exit_telemetry_logged = False  # ADDED: Flag for custom telemetry on force exit

		# Set up the Ctrl+C signal handler with callbacks specific to this agent
		from browser_use.utils import SignalHandler

		# Define the custom exit callback function for second CTRL+C
		def on_force_exit_log_telemetry():
			self._log_agent_event(max_steps=max_steps, agent_run_error='SIGINT: Cancelled by user')
			# NEW: Call the flush method on the telemetry instance
			if hasattr(self, 'telemetry') and self.telemetry:
				self.telemetry.flush()
			self._force_exit_telemetry_logged = True  # Set the flag

		signal_handler = SignalHandler(
			loop=loop,
			pause_callback=self.pause,
			resume_callback=self.resume,
			custom_exit_callback=on_force_exit_log_telemetry,  # Pass the new telemetrycallback
			exit_on_second_int=True,
		)
		signal_handler.register()

		try:
			self._log_agent_run()

			# Execute initial actions if provided
			if self.initial_actions:
				result = await self.multi_act(self.initial_actions, check_for_new_elements=False)
				self.state.last_result = result

			for step in range(max_steps):
				# Check if waiting for user input after Ctrl+C
				if self.state.paused:
					signal_handler.wait_for_resume()
					signal_handler.reset()

				# Check if we should stop due to too many failures
				if self.state.consecutive_failures >= self.settings.max_failures:
					logger.error(f'❌ Stopping due to {self.settings.max_failures} consecutive failures')
					agent_run_error = f'Stopped due to {self.settings.max_failures} consecutive failures'
					break

				# Check control flags before each step
				if self.state.stopped:
					logger.info('Agent stopped')
					agent_run_error = 'Agent stopped programmatically'
					break

				while self.state.paused:
					await asyncio.sleep(0.2)  # Small delay to prevent CPU spinning
					if self.state.stopped:  # Allow stopping while paused
						agent_run_error = 'Agent stopped programmatically while paused'
						break

				if on_step_start is not None:
					await on_step_start(self)

				step_info = AgentStepInfo(step_number=step, max_steps=max_steps)
				await self.step(step_info)

				if on_step_end is not None:
					await on_step_end(self)

				if self.state.history.is_done():
					if self.settings.validate_output and step < max_steps - 1:
						if not await self._validate_output():
							continue

					await self.log_completion()
					break
			else:
				agent_run_error = 'Failed to complete task in maximum steps'

				self.state.history.history.append(
					AgentHistory(
						model_output=None,
						result=[ActionResult(error=agent_run_error, include_in_memory=True)],
						state=BrowserStateHistory(
							url='',
							title='',
							tabs=[],
							interacted_element=[],
							screenshot=None,
						),
						metadata=None,
					)
				)

				logger.info(f'❌ {agent_run_error}')

			return self.state.history

		except KeyboardInterrupt:
			# Already handled by our signal handler, but catch any direct KeyboardInterrupt as well
			logger.info('Got KeyboardInterrupt during execution, returning current history')
			agent_run_error = 'KeyboardInterrupt'
			return self.state.history

		except Exception as e:
			logger.error(f'Agent run failed with exception: {e}', exc_info=True)
			agent_run_error = str(e)
			raise e

		finally:
			# Unregister signal handlers before cleanup
			signal_handler.unregister()

			if not self._force_exit_telemetry_logged:  # MODIFIED: Check the flag
				try:
					self._log_agent_event(max_steps=max_steps, agent_run_error=agent_run_error)
					logger.info('Agent run telemetry logged.')
				except Exception as log_e:  # Catch potential errors during logging itself
					logger.error(f'Failed to log telemetry event: {log_e}', exc_info=True)
			else:
				# ADDED: Info message when custom telemetry for SIGINT was already logged
				logger.info('Telemetry for force exit (SIGINT) was logged by custom exit callback.')

			if self.settings.save_playwright_script_path:
				logger.info(
					f'Agent run finished. Attempting to save Playwright script to: {self.settings.save_playwright_script_path}'
				)
				try:
					# Extract sensitive data keys if sensitive_data is provided
					keys = list(self.sensitive_data.keys()) if self.sensitive_data else None
					# Pass browser and context config to the saving method
					self.state.history.save_as_playwright_script(
						self.settings.save_playwright_script_path,
						sensitive_data_keys=keys,
						browser_config=self.browser.config,
						context_config=self.browser_context.config,
					)
				except Exception as script_gen_err:
					# Log any error during script generation/saving
					logger.error(f'Failed to save Playwright script: {script_gen_err}', exc_info=True)

			await self.close()

			if self.settings.generate_gif:
				output_path: str = 'agent_history.gif'
				if isinstance(self.settings.generate_gif, str):
					output_path = self.settings.generate_gif

				create_history_gif(task=self.task, history=self.state.history, output_path=output_path)

	# @observe(name='controller.multi_act')
	@time_execution_async('--multi-act (agent)')
	async def multi_act(
		self,
		actions: list[ActionModel],
		check_for_new_elements: bool = True,
	) -> list[ActionResult]:
		"""Execute multiple actions"""
		results = []

		cached_selector_map = await self.browser_context.get_selector_map()
		cached_path_hashes = {e.hash.branch_path_hash for e in cached_selector_map.values()}

		await self.browser_context.remove_highlights()

		for i, action in enumerate(actions):
			if action.get_index() is not None and i != 0:
				new_state = await self.browser_context.get_state(cache_clickable_elements_hashes=False)
				new_selector_map = new_state.selector_map

				# Detect index change after previous action
				orig_target = cached_selector_map.get(action.get_index())  # type: ignore
				orig_target_hash = orig_target.hash.branch_path_hash if orig_target else None
				new_target = new_selector_map.get(action.get_index())  # type: ignore
				new_target_hash = new_target.hash.branch_path_hash if new_target else None
				if orig_target_hash != new_target_hash:
					msg = f'Element index changed after action {i} / {len(actions)}, because page changed.'
					logger.info(msg)
					results.append(ActionResult(extracted_content=msg, include_in_memory=True))
					break

				new_path_hashes = {e.hash.branch_path_hash for e in new_selector_map.values()}
				if check_for_new_elements and not new_path_hashes.issubset(cached_path_hashes):
					# next action requires index but there are new elements on the page
					msg = f'Something new appeared after action {i} / {len(actions)}'
					logger.info(msg)
					results.append(ActionResult(extracted_content=msg, include_in_memory=True))
					break

			try:
				await self._raise_if_stopped_or_paused()

				result = await self.controller.act(
					action,
					self.browser_context,
					self.settings.page_extraction_llm,
					self.sensitive_data,
					self.settings.available_file_paths,
					context=self.context,
				)

				results.append(result)

				logger.debug(f'Executed action {i + 1} / {len(actions)}')
				if results[-1].is_done or results[-1].error or i == len(actions) - 1:
					break

				await asyncio.sleep(self.browser_context.config.wait_between_actions)
				# hash all elements. if it is a subset of cached_state its fine - else break (new elements on page)

			except asyncio.CancelledError:
				# Gracefully handle task cancellation
				logger.info(f'Action {i + 1} was cancelled due to Ctrl+C')
				if not results:
					# Add a result for the cancelled action
					results.append(ActionResult(error='The action was cancelled due to Ctrl+C', include_in_memory=True))
				raise InterruptedError('Action cancelled by user')

		return results

	async def _validate_output(self) -> bool:
		"""Validate the output of the last action is what the user wanted"""
		system_msg = (
			f'You are a validator of an agent who interacts with a browser. '
			f'Validate if the output of last action is what the user wanted and if the task is completed. '
			f'If the task is unclear defined, you can let it pass. But if something is missing or the image does not show what was requested dont let it pass. '
			f'Try to understand the page and help the model with suggestions like scroll, do x, ... to get the solution right. '
			f'Task to validate: {self.task}. Return a JSON object with 2 keys: is_valid and reason. '
			f'is_valid is a boolean that indicates if the output is correct. '
			f'reason is a string that explains why it is valid or not.'
			f' example: {{"is_valid": false, "reason": "The user wanted to search for "cat photos", but the agent searched for "dog photos" instead."}}'
		)

		if self.browser_context.session:
			state = await self.browser_context.get_state(cache_clickable_elements_hashes=False)
			content = AgentMessagePrompt(
				state=state,
				result=self.state.last_result,
				include_attributes=self.settings.include_attributes,
			)
			msg = [SystemMessage(content=system_msg), content.get_user_message(self.settings.use_vision)]
		else:
			# if no browser session, we can't validate the output
			return True

		class ValidationResult(BaseModel):
			"""
			Validation results.
			"""

			is_valid: bool
			reason: str

		validator = self.llm.with_structured_output(ValidationResult, include_raw=True)
		response: dict[str, Any] = await validator.ainvoke(msg)  # type: ignore
		parsed: ValidationResult = response['parsed']
		is_valid = parsed.is_valid
		if not is_valid:
			logger.info(f'❌ Validator decision: {parsed.reason}')
			msg = f'The output is not yet correct. {parsed.reason}.'
			self.state.last_result = [ActionResult(extracted_content=msg, include_in_memory=True)]
		else:
			logger.info(f'✅ Validator decision: {parsed.reason}')
		return is_valid

	async def log_completion(self) -> None:
		"""Log the completion of the task"""
		logger.info('✅ Task completed')
		if self.state.history.is_successful():
			logger.info('✅ Successfully')
		else:
			logger.info('❌ Unfinished')

		total_tokens = self.state.history.total_input_tokens()
		logger.info(f'📝 Total input tokens used (approximate): {total_tokens}')

		if self.register_done_callback:
			if inspect.iscoroutinefunction(self.register_done_callback):
				await self.register_done_callback(self.state.history)
			else:
				self.register_done_callback(self.state.history)

	async def rerun_history(
		self,
		history: AgentHistoryList,
		max_retries: int = 3,
		skip_failures: bool = True,
		delay_between_actions: float = 2.0,
	) -> list[ActionResult]:
		"""
		Rerun a saved history of actions with error handling and retry logic.

		Args:
				history: The history to replay
				max_retries: Maximum number of retries per action
				skip_failures: Whether to skip failed actions or stop execution
				delay_between_actions: Delay between actions in seconds

		Returns:
				List of action results
		"""
		# Execute initial actions if provided
		if self.initial_actions:
			result = await self.multi_act(self.initial_actions)
			self.state.last_result = result

		results = []

		for i, history_item in enumerate(history.history):
			goal = history_item.model_output.current_state.next_goal if history_item.model_output else ''
			logger.info(f'Replaying step {i + 1}/{len(history.history)}: goal: {goal}')

			if (
				not history_item.model_output
				or not history_item.model_output.action
				or history_item.model_output.action == [None]
			):
				logger.warning(f'Step {i + 1}: No action to replay, skipping')
				results.append(ActionResult(error='No action to replay'))
				continue

			retry_count = 0
			while retry_count < max_retries:
				try:
					result = await self._execute_history_step(history_item, delay_between_actions)
					results.extend(result)
					break

				except Exception as e:
					retry_count += 1
					if retry_count == max_retries:
						error_msg = f'Step {i + 1} failed after {max_retries} attempts: {str(e)}'
						logger.error(error_msg)
						if not skip_failures:
							results.append(ActionResult(error=error_msg))
							raise RuntimeError(error_msg)
					else:
						logger.warning(f'Step {i + 1} failed (attempt {retry_count}/{max_retries}), retrying...')
						await asyncio.sleep(delay_between_actions)

		return results

	async def _execute_history_step(self, history_item: AgentHistory, delay: float) -> list[ActionResult]:
		"""Execute a single step from history with element validation"""
		state = await self.browser_context.get_state(cache_clickable_elements_hashes=False)
		if not state or not history_item.model_output:
			raise ValueError('Invalid state or model output')
		updated_actions = []
		for i, action in enumerate(history_item.model_output.action):
			updated_action = await self._update_action_indices(
				history_item.state.interacted_element[i],
				action,
				state,
			)
			updated_actions.append(updated_action)

			if updated_action is None:
				raise ValueError(f'Could not find matching element {i} in current page')

		result = await self.multi_act(updated_actions)

		await asyncio.sleep(delay)
		return result

	async def _update_action_indices(
		self,
		historical_element: DOMHistoryElement | None,
		action: ActionModel,  # Type this properly based on your action model
		current_state: BrowserState,
	) -> ActionModel | None:
		"""
		Update action indices based on current page state.
		Returns updated action or None if element cannot be found.
		"""
		if not historical_element or not current_state.element_tree:
			return action

		current_element = HistoryTreeProcessor.find_history_element_in_tree(historical_element, current_state.element_tree)

		if not current_element or current_element.highlight_index is None:
			return None

		old_index = action.get_index()
		if old_index != current_element.highlight_index:
			action.set_index(current_element.highlight_index)
			logger.info(f'Element moved in DOM, updated index from {old_index} to {current_element.highlight_index}')

		return action

	async def load_and_rerun(self, history_file: str | Path | None = None, **kwargs) -> list[ActionResult]:
		"""
		Load history from file and rerun it.

		Args:
				history_file: Path to the history file
				**kwargs: Additional arguments passed to rerun_history
		"""
		if not history_file:
			history_file = 'AgentHistory.json'
		history = AgentHistoryList.load_from_file(history_file, self.AgentOutput)
		return await self.rerun_history(history, **kwargs)

	def save_history(self, file_path: str | Path | None = None) -> None:
		"""Save the history to a file"""
		if not file_path:
			file_path = 'AgentHistory.json'
		self.state.history.save_to_file(file_path)

	def pause(self) -> None:
		"""Pause the agent before the next step"""
		print('\n\n⏸️  Got Ctrl+C, paused the agent and left the browser open.')
		self.state.paused = True

		# The signal handler will handle the asyncio pause logic for us
		# No need to duplicate the code here

	def resume(self) -> None:
		"""Resume the agent"""
		print('----------------------------------------------------------------------')
		print('▶️  Got Enter, resuming agent execution where it left off...\n')
		self.state.paused = False

		# The signal handler should have already reset the flags
		# through its reset() method when called from run()

		# playwright browser is always immediately killed by the first Ctrl+C (no way to stop that)
		# so we need to restart the browser if user wants to continue
		if self.browser:
			logger.info('🌎 Restarting/reconnecting to browser...')
			loop = asyncio.get_event_loop()
			loop.create_task(self.browser._init())
			loop.create_task(asyncio.sleep(5))

	def stop(self) -> None:
		"""Stop the agent"""
		logger.info('⏹️ Agent stopping')
		self.state.stopped = True

	def _convert_initial_actions(self, actions: list[dict[str, dict[str, Any]]]) -> list[ActionModel]:
		"""Convert dictionary-based actions to ActionModel instances"""
		converted_actions = []
		action_model = self.ActionModel
		for action_dict in actions:
			# Each action_dict should have a single key-value pair
			action_name = next(iter(action_dict))
			params = action_dict[action_name]

			# Get the parameter model for this action from registry
			action_info = self.controller.registry.registry.actions[action_name]
			param_model = action_info.param_model

			# Create validated parameters using the appropriate param model
			validated_params = param_model(**params)

			# Create ActionModel instance with the validated parameters
			action_model = self.ActionModel(**{action_name: validated_params})
			converted_actions.append(action_model)

		return converted_actions

	def _verify_llm_connection(self) -> bool:
		"""
		Verify that the LLM API keys are setup and the LLM API is responding properly.
		Helps prevent errors due to running out of API credits, missing env vars, or network issues.
		"""
		logger.debug(f'Verifying the {self.llm.__class__.__name__} LLM knows the capital of France...')

		if getattr(self.llm, '_verified_api_keys', None) is True or SKIP_LLM_API_KEY_VERIFICATION:
			# skip roundtrip connection test for speed in cloud environment
			# If the LLM API keys have already been verified during a previous run, skip the test
			self.llm._verified_api_keys = True
			return True

		# show a warning if it looks like any required environment variables are missing
		required_keys = REQUIRED_LLM_API_ENV_VARS.get(self.llm.__class__.__name__, [])
		if required_keys and not check_env_variables(required_keys, any_or_all=all):
			error = f'Expected LLM API Key environment variables might be missing for {self.llm.__class__.__name__}: {" ".join(required_keys)}'
			logger.warning(f'❌ {error}')

		# send a basic sanity-test question to the LLM and verify the response
		test_prompt = 'What is the capital of France? Respond with a single word.'
		test_answer = 'paris'
		try:
			# dont convert this to async! it *should* block any subsequent llm calls from running
			response = self.llm.invoke([HumanMessage(content=test_prompt)])
			response_text = str(response.content).lower()

			if test_answer in response_text:
				logger.debug(
					f'🪪 LLM API keys {", ".join(required_keys)} work, {self.llm.__class__.__name__} model is connected & responding correctly.'
				)
				self.llm._verified_api_keys = True
				return True
			else:
				logger.warning(
					'❌  Got bad LLM response to basic sanity check question: \n\t  %s\n\t\tEXPECTING: %s\n\t\tGOT: %s',
					test_prompt,
					test_answer,
					response,
				)
				raise Exception('LLM responded to a simple test question incorrectly')
		except Exception as e:
			self.llm._verified_api_keys = False
			if required_keys:
				logger.error(
					f'\n\n❌  LLM {self.llm.__class__.__name__} connection test failed. Check that {", ".join(required_keys)} is set correctly in .env and that the LLM API account has sufficient funding.\n\n{e}\n'
				)
				return False
			else:
				pass

	async def _run_planner(self) -> str | None:
		"""Run the planner to analyze state and suggest next steps"""
		# Skip planning if no planner_llm is set
		if not self.settings.planner_llm:
			return None

		# Get current state to filter actions by page
		page = await self.browser_context.get_current_page()

		# Get all standard actions (no filter) and page-specific actions
		standard_actions = self.controller.registry.get_prompt_description()  # No page = system prompt actions
		page_actions = self.controller.registry.get_prompt_description(page)  # Page-specific actions

		# Combine both for the planner
		all_actions = standard_actions
		if page_actions:
			all_actions += '\n' + page_actions

		# Create planner message history using full message history with all available actions
		planner_messages = [
			PlannerPrompt(all_actions).get_system_message(
				is_planner_reasoning=self.settings.is_planner_reasoning,
				extended_planner_system_prompt=self.settings.extend_planner_system_message,
			),
			*self._message_manager.get_messages()[1:],  # Use full message history except the first
		]

		if not self.settings.use_vision_for_planner and self.settings.use_vision:
			last_state_message: HumanMessage = planner_messages[-1]
			# remove image from last state message
			new_msg = ''
			if isinstance(last_state_message.content, list):
				for msg in last_state_message.content:
					if msg['type'] == 'text':  # type: ignore
						new_msg += msg['text']  # type: ignore
					elif msg['type'] == 'image_url':  # type: ignore
						continue  # type: ignore
			else:
				new_msg = last_state_message.content

			planner_messages[-1] = HumanMessage(content=new_msg)

		planner_messages = convert_input_messages(planner_messages, self.planner_model_name)

		# Get planner output
		try:
			response = await self.settings.planner_llm.ainvoke(planner_messages)
		except Exception as e:
			logger.error(f'Failed to invoke planner: {str(e)}')
			raise LLMException(401, 'LLM API call failed') from e

		plan = str(response.content)
		# if deepseek-reasoner, remove think tags
		if self.planner_model_name and (
			'deepseek-r1' in self.planner_model_name or 'deepseek-reasoner' in self.planner_model_name
		):
			plan = self._remove_think_tags(plan)
		try:
			plan_json = json.loads(plan)
			logger.info(f'Planning Analysis:\n{json.dumps(plan_json, indent=4)}')
		except json.JSONDecodeError:
			logger.info(f'Planning Analysis:\n{plan}')
		except Exception as e:
			logger.debug(f'Error parsing planning analysis: {e}')
			logger.info(f'Plan: {plan}')

		return plan

	@property
	def message_manager(self) -> MessageManager:
		return self._message_manager

	async def close(self):
		"""Close all resources"""
		try:
			# First close browser resources
			if self.browser_context and not self.injected_browser_context:
				await self.browser_context.close()
			if self.browser and not self.injected_browser:
				await self.browser.close()

			# Force garbage collection
			gc.collect()

		except Exception as e:
			logger.error(f'Error during cleanup: {e}')

	async def _update_action_models_for_page(self, page) -> None:
		"""Update action models with page-specific actions"""
		# Create new action model with current page's filtered actions
		self.ActionModel = self.controller.registry.create_action_model(page=page)
		# Update output model with the new actions
		self.AgentOutput = AgentOutput.type_with_custom_actions(self.ActionModel)

		# Update done action model too
		self.DoneActionModel = self.controller.registry.create_action_model(include_actions=['done'], page=page)
		self.DoneAgentOutput = AgentOutput.type_with_custom_actions(self.DoneActionModel)

```

## /browser_use/agent/system_prompt.md

You are an AI agent designed to automate browser tasks. Your goal is to accomplish the ultimate task following the rules.

# Input Format

Task
Previous steps
Current URL
Open Tabs
Interactive Elements
[index]<type>text</type>

- index: Numeric identifier for interaction
- type: HTML element type (button, input, etc.)
- text: Element description
  Example:
  [33]<div>User form</div>
  \t*[35]*<button aria-label='Submit form'>Submit</button>

- Only elements with numeric indexes in [] are interactive
- (stacked) indentation (with \t) is important and means that the element is a (html) child of the element above (with a lower index)
- Elements with \* are new elements that were added after the previous step (if url has not changed)

# Response Rules

1. RESPONSE FORMAT: You must ALWAYS respond with valid JSON in this exact format:
   {{"current_state": {{"evaluation_previous_goal": "Success|Failed|Unknown - Analyze the current elements and the image to check if the previous goals/actions are successful like intended by the task. Mention if something unexpected happened. Shortly state why/why not",
   "memory": "Description of what has been done and what you need to remember. Be very specific. Count here ALWAYS how many times you have done something and how many remain. E.g. 0 out of 10 websites analyzed. Continue with abc and xyz",
   "next_goal": "What needs to be done with the next immediate action"}},
   "action":[{{"one_action_name": {{// action-specific parameter}}}}, // ... more actions in sequence]}}

2. ACTIONS: You can specify multiple actions in the list to be executed in sequence. But always specify only one action name per item. Use maximum {max_actions} actions per sequence.
Common action sequences:

- Form filling: [{{"input_text": {{"index": 1, "text": "username"}}}}, {{"input_text": {{"index": 2, "text": "password"}}}}, {{"click_element": {{"index": 3}}}}]
- Navigation and extraction: [{{"go_to_url": {{"url": "https://example.com"}}}}, {{"extract_content": {{"goal": "extract the names"}}}}]
- Actions are executed in the given order
- If the page changes after an action, the sequence is interrupted and you get the new state.
- Only provide the action sequence until an action which changes the page state significantly.
- Try to be efficient, e.g. fill forms at once, or chain actions where nothing changes on the page
- only use multiple actions if it makes sense.

3. ELEMENT INTERACTION:

- Only use indexes of the interactive elements

4. NAVIGATION & ERROR HANDLING:

- If no suitable elements exist, use other functions to complete the task
- If stuck, try alternative approaches - like going back to a previous page, new search, new tab etc.
- Handle popups/cookies by accepting or closing them
- Use scroll to find elements you are looking for
- If you want to research something, open a new tab instead of using the current tab
- If captcha pops up, try to solve it - else try a different approach
- If the page is not fully loaded, use wait action

5. TASK COMPLETION:

- Use the done action as the last action as soon as the ultimate task is complete
- Dont use "done" before you are done with everything the user asked you, except you reach the last step of max_steps.
- If you reach your last step, use the done action even if the task is not fully finished. Provide all the information you have gathered so far. If the ultimate task is completely finished set success to true. If not everything the user asked for is completed set success in done to false!
- If you have to do something repeatedly for example the task says for "each", or "for all", or "x times", count always inside "memory" how many times you have done it and how many remain. Don't stop until you have completed like the task asked you. Only call done after the last step.
- Don't hallucinate actions
- Make sure you include everything you found out for the ultimate task in the done text parameter. Do not just say you are done, but include the requested information of the task.

6. VISUAL CONTEXT:

- When an image is provided, use it to understand the page layout
- Bounding boxes with labels on their top right corner correspond to element indexes

7. Form filling:

- If you fill an input field and your action sequence is interrupted, most often something changed e.g. suggestions popped up under the field.

8. Long tasks:

- Keep track of the status and subresults in the memory.
- You are provided with procedural memory summaries that condense previous task history (every N steps). Use these summaries to maintain context about completed actions, current progress, and next steps. The summaries appear in chronological order and contain key information about navigation history, findings, errors encountered, and current state. Refer to these summaries to avoid repeating actions and to ensure consistent progress toward the task goal.

9. Extraction:

- If your task is to find information - call extract_content on the specific pages to get and store the information.
  Your responses must be always JSON with the specified format.


## /browser_use/agent/tests.py

```py path="/browser_use/agent/tests.py" 
import pytest

from browser_use.agent.views import (
	ActionResult,
	AgentBrain,
	AgentHistory,
	AgentHistoryList,
	AgentOutput,
)
from browser_use.browser.views import BrowserState, BrowserStateHistory, TabInfo
from browser_use.controller.registry.service import Registry
from browser_use.controller.views import ClickElementAction, DoneAction, ExtractPageContentAction
from browser_use.dom.views import DOMElementNode


@pytest.fixture
def sample_browser_state():
	return BrowserState(
		url='https://example.com',
		title='Example Page',
		tabs=[TabInfo(url='https://example.com', title='Example Page', page_id=1)],
		screenshot='screenshot1.png',
		element_tree=DOMElementNode(
			tag_name='root',
			is_visible=True,
			parent=None,
			xpath='',
			attributes={},
			children=[],
		),
		selector_map={},
	)


@pytest.fixture
def action_registry():
	registry = Registry()

	# Register the actions we need for testing
	@registry.action(description='Click an element', param_model=ClickElementAction)
	def click_element(params: ClickElementAction, browser=None):
		pass

	@registry.action(
		description='Extract page content',
		param_model=ExtractPageContentAction,
	)
	def extract_page_content(params: ExtractPageContentAction, browser=None):
		pass

	@registry.action(description='Mark task as done', param_model=DoneAction)
	def done(params: DoneAction):
		pass

	# Create the dynamic ActionModel with all registered actions
	return registry.create_action_model()


@pytest.fixture
def sample_history(action_registry):
	# Create actions with nested params structure
	click_action = action_registry(click_element={'index': 1})

	extract_action = action_registry(extract_page_content={'value': 'text'})

	done_action = action_registry(done={'text': 'Task completed'})

	histories = [
		AgentHistory(
			model_output=AgentOutput(
				current_state=AgentBrain(
					evaluation_previous_goal='None',
					memory='Started task',
					next_goal='Click button',
				),
				action=[click_action],
			),
			result=[ActionResult(is_done=False)],
			state=BrowserStateHistory(
				url='https://example.com',
				title='Page 1',
				tabs=[TabInfo(url='https://example.com', title='Page 1', page_id=1)],
				screenshot='screenshot1.png',
				interacted_element=[{'xpath': '//button[1]'}],
			),
		),
		AgentHistory(
			model_output=AgentOutput(
				current_state=AgentBrain(
					evaluation_previous_goal='Clicked button',
					memory='Button clicked',
					next_goal='Extract content',
				),
				action=[extract_action],
			),
			result=[
				ActionResult(
					is_done=False,
					extracted_content='Extracted text',
					error='Failed to extract completely',
				)
			],
			state=BrowserStateHistory(
				url='https://example.com/page2',
				title='Page 2',
				tabs=[TabInfo(url='https://example.com/page2', title='Page 2', page_id=2)],
				screenshot='screenshot2.png',
				interacted_element=[{'xpath': '//div[1]'}],
			),
		),
		AgentHistory(
			model_output=AgentOutput(
				current_state=AgentBrain(
					evaluation_previous_goal='Extracted content',
					memory='Content extracted',
					next_goal='Finish task',
				),
				action=[done_action],
			),
			result=[ActionResult(is_done=True, extracted_content='Task completed', error=None)],
			state=BrowserStateHistory(
				url='https://example.com/page2',
				title='Page 2',
				tabs=[TabInfo(url='https://example.com/page2', title='Page 2', page_id=2)],
				screenshot='screenshot3.png',
				interacted_element=[{'xpath': '//div[1]'}],
			),
		),
	]
	return AgentHistoryList(history=histories)


def test_last_model_output(sample_history: AgentHistoryList):
	last_output = sample_history.last_action()
	print(last_output)
	assert last_output == {'done': {'text': 'Task completed'}}


def test_get_errors(sample_history: AgentHistoryList):
	errors = sample_history.errors()
	assert len(errors) == 1
	assert errors[0] == 'Failed to extract completely'


def test_final_result(sample_history: AgentHistoryList):
	assert sample_history.final_result() == 'Task completed'


def test_is_done(sample_history: AgentHistoryList):
	assert sample_history.is_done() is True


def test_urls(sample_history: AgentHistoryList):
	urls = sample_history.urls()
	assert 'https://example.com' in urls
	assert 'https://example.com/page2' in urls


def test_all_screenshots(sample_history: AgentHistoryList):
	screenshots = sample_history.screenshots()
	assert len(screenshots) == 3
	assert screenshots == ['screenshot1.png', 'screenshot2.png', 'screenshot3.png']


def test_all_model_outputs(sample_history: AgentHistoryList):
	outputs = sample_history.model_actions()
	print(f'DEBUG: {outputs[0]}')
	assert len(outputs) == 3
	# get first key value pair
	assert dict([next(iter(outputs[0].items()))]) == {'click_element': {'index': 1}}
	assert dict([next(iter(outputs[1].items()))]) == {'extract_page_content': {'value': 'text'}}
	assert dict([next(iter(outputs[2].items()))]) == {'done': {'text': 'Task completed'}}


def test_all_model_outputs_filtered(sample_history: AgentHistoryList):
	filtered = sample_history.model_actions_filtered(include=['click_element'])
	assert len(filtered) == 1
	assert filtered[0]['click_element']['index'] == 1


def test_empty_history():
	empty_history = AgentHistoryList(history=[])
	assert empty_history.last_action() is None
	assert empty_history.final_result() is None
	assert empty_history.is_done() is False
	assert len(empty_history.urls()) == 0


# Add a test to verify action creation
def test_action_creation(action_registry):
	click_action = action_registry(click_element={'index': 1})

	assert click_action.model_dump(exclude_none=True) == {'click_element': {'index': 1}}


# run this with:
# pytest browser_use/agent/tests.py

```

## /browser_use/agent/views.py

```py path="/browser_use/agent/views.py" 
from __future__ import annotations

import json
import traceback
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal

from langchain_core.language_models.chat_models import BaseChatModel
from openai import RateLimitError
from pydantic import BaseModel, ConfigDict, Field, ValidationError, create_model

from browser_use.agent.message_manager.views import MessageManagerState
from browser_use.agent.playwright_script_generator import PlaywrightScriptGenerator
from browser_use.browser.browser import BrowserConfig
from browser_use.browser.context import BrowserContextConfig
from browser_use.browser.views import BrowserStateHistory
from browser_use.controller.registry.views import ActionModel
from browser_use.dom.history_tree_processor.service import (
	DOMElementNode,
	DOMHistoryElement,
	HistoryTreeProcessor,
)
from browser_use.dom.views import SelectorMap

ToolCallingMethod = Literal['function_calling', 'json_mode', 'raw', 'auto', 'tools']
REQUIRED_LLM_API_ENV_VARS = {
	'ChatOpenAI': ['OPENAI_API_KEY'],
	'AzureChatOpenAI': ['AZURE_OPENAI_ENDPOINT', 'AZURE_OPENAI_KEY'],
	'ChatBedrockConverse': ['ANTHROPIC_API_KEY'],
	'ChatAnthropic': ['ANTHROPIC_API_KEY'],
	'ChatGoogleGenerativeAI': ['GOOGLE_API_KEY'],
	'ChatDeepSeek': ['DEEPSEEK_API_KEY'],
	'ChatOllama': [],
	'ChatGrok': ['GROK_API_KEY'],
}


class AgentSettings(BaseModel):
	"""Options for the agent"""

	use_vision: bool = True
	use_vision_for_planner: bool = False
	save_conversation_path: str | None = None
	save_conversation_path_encoding: str | None = 'utf-8'
	max_failures: int = 3
	retry_delay: int = 10
	max_input_tokens: int = 128000
	validate_output: bool = False
	message_context: str | None = None
	generate_gif: bool | str = False
	available_file_paths: list[str] | None = None
	override_system_message: str | None = None
	extend_system_message: str | None = None
	include_attributes: list[str] = [
		'title',
		'type',
		'name',
		'role',
		'tabindex',
		'aria-label',
		'placeholder',
		'value',
		'alt',
		'aria-expanded',
	]
	max_actions_per_step: int = 10

	tool_calling_method: ToolCallingMethod | None = 'auto'
	page_extraction_llm: BaseChatModel | None = None
	planner_llm: BaseChatModel | None = None
	planner_interval: int = 1  # Run planner every N steps
	is_planner_reasoning: bool = False  # type: ignore
	extend_planner_system_message: str | None = None

	# Playwright script generation setting
	save_playwright_script_path: str | None = None  # Path to save the generated Playwright script


class AgentState(BaseModel):
	"""Holds all state information for an Agent"""

	agent_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
	n_steps: int = 1
	consecutive_failures: int = 0
	last_result: list[ActionResult] | None = None
	history: AgentHistoryList = Field(default_factory=lambda: AgentHistoryList(history=[]))
	last_plan: str | None = None
	paused: bool = False
	stopped: bool = False

	message_manager_state: MessageManagerState = Field(default_factory=MessageManagerState)

	# class Config:
	# 	arbitrary_types_allowed = True


@dataclass
class AgentStepInfo:
	step_number: int
	max_steps: int

	def is_last_step(self) -> bool:
		"""Check if this is the last step"""
		return self.step_number >= self.max_steps - 1


class ActionResult(BaseModel):
	"""Result of executing an action"""

	is_done: bool | None = False
	success: bool | None = None
	extracted_content: str | None = None
	error: str | None = None
	include_in_memory: bool = False  # whether to include in past messages as context or not


class StepMetadata(BaseModel):
	"""Metadata for a single step including timing and token information"""

	step_start_time: float
	step_end_time: float
	input_tokens: int  # Approximate tokens from message manager for this step
	step_number: int

	@property
	def duration_seconds(self) -> float:
		"""Calculate step duration in seconds"""
		return self.step_end_time - self.step_start_time


class AgentBrain(BaseModel):
	"""Current state of the agent"""

	evaluation_previous_goal: str
	memory: str
	next_goal: str


class AgentOutput(BaseModel):
	"""Output model for agent

	@dev note: this model is extended with custom actions in AgentService. You can also use some fields that are not in this model as provided by the linter, as long as they are registered in the DynamicActions model.
	"""

	model_config = ConfigDict(arbitrary_types_allowed=True)

	current_state: AgentBrain
	action: list[ActionModel] = Field(
		...,
		description='List of actions to execute',
		json_schema_extra={'min_items': 1},  # Ensure at least one action is provided
	)

	@staticmethod
	def type_with_custom_actions(custom_actions: type[ActionModel]) -> type[AgentOutput]:
		"""Extend actions with custom actions"""
		model_ = create_model(
			'AgentOutput',
			__base__=AgentOutput,
			action=(
				list[custom_actions],
				Field(..., description='List of actions to execute', json_schema_extra={'min_items': 1}),
			),
			__module__=AgentOutput.__module__,
		)
		model_.__doc__ = 'AgentOutput model with custom actions'
		return model_


class AgentHistory(BaseModel):
	"""History item for agent actions"""

	model_output: AgentOutput | None
	result: list[ActionResult]
	state: BrowserStateHistory
	metadata: StepMetadata | None = None

	model_config = ConfigDict(arbitrary_types_allowed=True, protected_namespaces=())

	@staticmethod
	def get_interacted_element(model_output: AgentOutput, selector_map: SelectorMap) -> list[DOMHistoryElement | None]:
		elements = []
		for action in model_output.action:
			index = action.get_index()
			if index is not None and index in selector_map:
				el: DOMElementNode = selector_map[index]
				elements.append(HistoryTreeProcessor.convert_dom_element_to_history_element(el))
			else:
				elements.append(None)
		return elements

	def model_dump(self, **kwargs) -> dict[str, Any]:
		"""Custom serialization handling circular references"""

		# Handle action serialization
		model_output_dump = None
		if self.model_output:
			action_dump = [action.model_dump(exclude_none=True) for action in self.model_output.action]
			model_output_dump = {
				'current_state': self.model_output.current_state.model_dump(),
				'action': action_dump,  # This preserves the actual action data
			}

		return {
			'model_output': model_output_dump,
			'result': [r.model_dump(exclude_none=True) for r in self.result],
			'state': self.state.to_dict(),
			'metadata': self.metadata.model_dump() if self.metadata else None,
		}


class AgentHistoryList(BaseModel):
	"""List of agent history items"""

	history: list[AgentHistory]

	def total_duration_seconds(self) -> float:
		"""Get total duration of all steps in seconds"""
		total = 0.0
		for h in self.history:
			if h.metadata:
				total += h.metadata.duration_seconds
		return total

	def total_input_tokens(self) -> int:
		"""
		Get total tokens used across all steps.
		Note: These are from the approximate token counting of the message manager.
		For accurate token counting, use tools like LangChain Smith or OpenAI's token counters.
		"""
		total = 0
		for h in self.history:
			if h.metadata:
				total += h.metadata.input_tokens
		return total

	def input_token_usage(self) -> list[int]:
		"""Get token usage for each step"""
		return [h.metadata.input_tokens for h in self.history if h.metadata]

	def __str__(self) -> str:
		"""Representation of the AgentHistoryList object"""
		return f'AgentHistoryList(all_results={self.action_results()}, all_model_outputs={self.model_actions()})'

	def __repr__(self) -> str:
		"""Representation of the AgentHistoryList object"""
		return self.__str__()

	def save_to_file(self, filepath: str | Path) -> None:
		"""Save history to JSON file with proper serialization"""
		try:
			Path(filepath).parent.mkdir(parents=True, exist_ok=True)
			data = self.model_dump()
			with open(filepath, 'w', encoding='utf-8') as f:
				json.dump(data, f, indent=2)
		except Exception as e:
			raise e

	def save_as_playwright_script(
		self,
		output_path: str | Path,
		sensitive_data_keys: list[str] | None = None,
		browser_config: BrowserConfig | None = None,
		context_config: BrowserContextConfig | None = None,
	) -> None:
		"""
		Generates a Playwright script based on the agent's history and saves it to a file.
		Args:
			output_path: The path where the generated Python script will be saved.
			sensitive_data_keys: A list of keys used as placeholders for sensitive data
								 (e.g., ['username_placeholder', 'password_placeholder']).
								 These will be loaded from environment variables in the
								 generated script.
			browser_config: Configuration of the original Browser instance.
			context_config: Configuration of the original BrowserContext instance.
		"""
		try:
			serialized_history = self.model_dump()['history']
			generator = PlaywrightScriptGenerator(serialized_history, sensitive_data_keys, browser_config, context_config)
			script_content = generator.generate_script_content()
			path_obj = Path(output_path)
			path_obj.parent.mkdir(parents=True, exist_ok=True)
			with open(path_obj, 'w', encoding='utf-8') as f:
				f.write(script_content)
		except Exception as e:
			raise e

	def model_dump(self, **kwargs) -> dict[str, Any]:
		"""Custom serialization that properly uses AgentHistory's model_dump"""
		return {
			'history': [h.model_dump(**kwargs) for h in self.history],
		}

	@classmethod
	def load_from_file(cls, filepath: str | Path, output_model: type[AgentOutput]) -> AgentHistoryList:
		"""Load history from JSON file"""
		with open(filepath, encoding='utf-8') as f:
			data = json.load(f)
		# loop through history and validate output_model actions to enrich with custom actions
		for h in data['history']:
			if h['model_output']:
				if isinstance(h['model_output'], dict):
					h['model_output'] = output_model.model_validate(h['model_output'])
				else:
					h['model_output'] = None
			if 'interacted_element' not in h['state']:
				h['state']['interacted_element'] = None
		history = cls.model_validate(data)
		return history

	def last_action(self) -> None | dict:
		"""Last action in history"""
		if self.history and self.history[-1].model_output:
			return self.history[-1].model_output.action[-1].model_dump(exclude_none=True)
		return None

	def errors(self) -> list[str | None]:
		"""Get all errors from history, with None for steps without errors"""
		errors = []
		for h in self.history:
			step_errors = [r.error for r in h.result if r.error]

			# each step can have only one error
			errors.append(step_errors[0] if step_errors else None)
		return errors

	def final_result(self) -> None | str:
		"""Final result from history"""
		if self.history and self.history[-1].result[-1].extracted_content:
			return self.history[-1].result[-1].extracted_content
		return None

	def is_done(self) -> bool:
		"""Check if the agent is done"""
		if self.history and len(self.history[-1].result) > 0:
			last_result = self.history[-1].result[-1]
			return last_result.is_done is True
		return False

	def is_successful(self) -> bool | None:
		"""Check if the agent completed successfully - the agent decides in the last step if it was successful or not. None if not done yet."""
		if self.history and len(self.history[-1].result) > 0:
			last_result = self.history[-1].result[-1]
			if last_result.is_done is True:
				return last_result.success
		return None

	def has_errors(self) -> bool:
		"""Check if the agent has any non-None errors"""
		return any(error is not None for error in self.errors())

	def urls(self) -> list[str | None]:
		"""Get all unique URLs from history"""
		return [h.state.url if h.state.url is not None else None for h in self.history]

	def screenshots(self) -> list[str | None]:
		"""Get all screenshots from history"""
		return [h.state.screenshot if h.state.screenshot is not None else None for h in self.history]

	def action_names(self) -> list[str]:
		"""Get all action names from history"""
		action_names = []
		for action in self.model_actions():
			actions = list(action.keys())
			if actions:
				action_names.append(actions[0])
		return action_names

	def model_thoughts(self) -> list[AgentBrain]:
		"""Get all thoughts from history"""
		return [h.model_output.current_state for h in self.history if h.model_output]

	def model_outputs(self) -> list[AgentOutput]:
		"""Get all model outputs from history"""
		return [h.model_output for h in self.history if h.model_output]

	# get all actions with params
	def model_actions(self) -> list[dict]:
		"""Get all actions from history"""
		outputs = []

		for h in self.history:
			if h.model_output:
				for action, interacted_element in zip(h.model_output.action, h.state.interacted_element):
					output = action.model_dump(exclude_none=True)
					output['interacted_element'] = interacted_element
					outputs.append(output)
		return outputs

	def action_results(self) -> list[ActionResult]:
		"""Get all results from history"""
		results = []
		for h in self.history:
			results.extend([r for r in h.result if r])
		return results

	def extracted_content(self) -> list[str]:
		"""Get all extracted content from history"""
		content = []
		for h in self.history:
			content.extend([r.extracted_content for r in h.result if r.extracted_content])
		return content

	def model_actions_filtered(self, include: list[str] | None = None) -> list[dict]:
		"""Get all model actions from history as JSON"""
		if include is None:
			include = []
		outputs = self.model_actions()
		result = []
		for o in outputs:
			for i in include:
				if i == list(o.keys())[0]:
					result.append(o)
		return result

	def number_of_steps(self) -> int:
		"""Get the number of steps in the history"""
		return len(self.history)


class AgentError:
	"""Container for agent error handling"""

	VALIDATION_ERROR = 'Invalid model output format. Please follow the correct schema.'
	RATE_LIMIT_ERROR = 'Rate limit reached. Waiting before retry.'
	NO_VALID_ACTION = 'No valid action found'

	@staticmethod
	def format_error(error: Exception, include_trace: bool = False) -> str:
		"""Format error message based on error type and optionally include trace"""
		message = ''
		if isinstance(error, ValidationError):
			return f'{AgentError.VALIDATION_ERROR}\nDetails: {str(error)}'
		if isinstance(error, RateLimitError):
			return AgentError.RATE_LIMIT_ERROR
		if include_trace:
			return f'{str(error)}\nStacktrace:\n{traceback.format_exc()}'
		return f'{str(error)}'

```

## /browser_use/browser/browser.py

```py path="/browser_use/browser/browser.py" 
"""
Playwright browser on steroids.
"""

import asyncio
import gc
import logging
import os
import socket
import subprocess
from pathlib import Path
from tempfile import gettempdir
from typing import Literal

import httpx
import psutil
from dotenv import load_dotenv
from playwright.async_api import Browser as PlaywrightBrowser
from playwright.async_api import Playwright, async_playwright
from pydantic import AliasChoices, BaseModel, ConfigDict, Field

load_dotenv()


from browser_use.browser.chrome import (
	CHROME_ARGS,
	CHROME_DEBUG_PORT,
	CHROME_DETERMINISTIC_RENDERING_ARGS,
	CHROME_DISABLE_SECURITY_ARGS,
	CHROME_DOCKER_ARGS,
	CHROME_HEADLESS_ARGS,
)
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from browser_use.browser.utils.screen_resolution import get_screen_resolution, get_window_adjustments
from browser_use.utils import time_execution_async

logger = logging.getLogger(__name__)

IN_DOCKER = os.environ.get('IN_DOCKER', 'false').lower()[0] in 'ty1'


class ProxySettings(BaseModel):
	"""the same as playwright.sync_api.ProxySettings, but now as a Pydantic BaseModel so pydantic can validate it"""

	server: str
	bypass: str | None = None
	username: str | None = None
	password: str | None = None

	model_config = ConfigDict(populate_by_name=True, from_attributes=True)

	# Support dict-like behavior for compatibility with Playwright's ProxySettings
	def __getitem__(self, key):
		return getattr(self, key)

	def get(self, key, default=None):
		return getattr(self, key, default)


class BrowserConfig(BaseModel):
	r"""
	Configuration for the Browser.

	Default values:
		headless: False
			Whether to run browser in headless mode (not recommended)

		disable_security: False
			Disable browser security features (required for cross-origin iframe support)

		extra_browser_args: []
			Extra arguments to pass to the browser

		wss_url: None
			Connect to a browser instance via WebSocket

		cdp_url: None
			Connect to a browser instance via CDP

		browser_binary_path: None
			Path to a Browser instance to use to connect to your normal browser
			e.g. '/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome'

		chrome_remote_debugging_port: 9222
			Chrome remote debugging port to use to when browser_binary_path is supplied.
			This allows running multiple chrome browsers with same browser_binary_path but running on different ports.
			Also, makes it possible to launch new user provided chrome browser without closing already opened chrome instances,
			by providing non-default chrome debugging port.

		keep_alive: False
			Keep the browser alive after the agent has finished running

		deterministic_rendering: False
			Enable deterministic rendering (makes GPU/font rendering consistent across different OS's and docker)
	"""

	model_config = ConfigDict(
		arbitrary_types_allowed=True,
		extra='ignore',
		populate_by_name=True,
		from_attributes=True,
		validate_assignment=True,
		revalidate_instances='subclass-instances',
	)

	wss_url: str | None = None
	cdp_url: str | None = None

	browser_class: Literal['chromium', 'firefox', 'webkit'] = 'chromium'
	browser_binary_path: str | None = Field(
		default=None, validation_alias=AliasChoices('browser_instance_path', 'chrome_instance_path')
	)
	chrome_remote_debugging_port: int | None = CHROME_DEBUG_PORT
	extra_browser_args: list[str] = Field(default_factory=list)

	headless: bool = False
	disable_security: bool = False  # disable_security=True is dangerous as any malicious URL visited could embed an iframe for the user's bank, and use their cookies to steal money
	deterministic_rendering: bool = False
	keep_alive: bool = Field(default=False, alias='_force_keep_browser_alive')  # used to be called _force_keep_browser_alive

	proxy: ProxySettings | None = None
	new_context_config: BrowserContextConfig = Field(default_factory=BrowserContextConfig)


# @singleton: TODO - think about id singleton makes sense here
# @dev By default this is a singleton, but you can create multiple instances if you need to.
class Browser:
	"""
	Playwright browser on steroids.

	This is persistent browser factory that can spawn multiple browser contexts.
	It is recommended to use only one instance of Browser per your application (RAM usage will grow otherwise).
	"""

	def __init__(
		self,
		config: BrowserConfig | None = None,
	):
		logger.debug('🌎  Initializing new browser')
		self.config = config or BrowserConfig()
		self.playwright: Playwright | None = None
		self.playwright_browser: PlaywrightBrowser | None = None

	async def new_context(self, config: BrowserContextConfig | None = None) -> BrowserContext:
		"""Create a browser context"""
		browser_config = self.config.model_dump() if self.config else {}
		context_config = config.model_dump() if config else {}
		merged_config = {**browser_config, **context_config}
		return BrowserContext(config=BrowserContextConfig(**merged_config), browser=self)

	async def get_playwright_browser(self) -> PlaywrightBrowser:
		"""Get a browser context"""
		if self.playwright_browser is None:
			return await self._init()

		return self.playwright_browser

	@time_execution_async('--init (browser)')
	async def _init(self):
		"""Initialize the browser session"""
		playwright = await async_playwright().start()
		self.playwright = playwright

		browser = await self._setup_browser(playwright)
		self.playwright_browser = browser

		return self.playwright_browser

	async def _setup_remote_cdp_browser(self, playwright: Playwright) -> PlaywrightBrowser:
		"""Sets up and returns a Playwright Browser instance with anti-detection measures. Firefox has no longer CDP support."""
		if 'firefox' in (self.config.browser_binary_path or '').lower():
			raise ValueError(
				'CDP has been deprecated for firefox, check: https://fxdx.dev/deprecating-cdp-support-in-firefox-embracing-the-future-with-webdriver-bidi/'
			)
		if not self.config.cdp_url:
			raise ValueError('CDP URL is required')
		logger.info(f'🔌  Connecting to remote browser via CDP {self.config.cdp_url}')
		browser_class = getattr(playwright, self.config.browser_class)
		browser = await browser_class.connect_over_cdp(self.config.cdp_url)
		return browser

	async def _setup_remote_wss_browser(self, playwright: Playwright) -> PlaywrightBrowser:
		"""Sets up and returns a Playwright Browser instance with anti-detection measures."""
		if not self.config.wss_url:
			raise ValueError('WSS URL is required')
		logger.info(f'🔌  Connecting to remote browser via WSS {self.config.wss_url}')
		browser_class = getattr(playwright, self.config.browser_class)
		browser = await browser_class.connect(self.config.wss_url)
		return browser

	async def _setup_user_provided_browser(self, playwright: Playwright) -> PlaywrightBrowser:
		"""Sets up and returns a Playwright Browser instance with anti-detection measures."""
		if not self.config.browser_binary_path:
			raise ValueError('A browser_binary_path is required')

		assert self.config.browser_class == 'chromium', (
			'browser_binary_path only supports chromium browsers (make sure browser_class=chromium)'
		)

		try:
			# Check if browser is already running
			async with httpx.AsyncClient() as client:
				response = await client.get(
					f'http://localhost:{self.config.chrome_remote_debugging_port}/json/version', timeout=2
				)
				if response.status_code == 200:
					logger.info(
						f'🔌  Reusing existing browser found running on http://localhost:{self.config.chrome_remote_debugging_port}'
					)
					browser_class = getattr(playwright, self.config.browser_class)
					browser = await browser_class.connect_over_cdp(
						endpoint_url=f'http://localhost:{self.config.chrome_remote_debugging_port}',
						timeout=20000,  # 20 second timeout for connection
					)
					return browser
		except httpx.RequestError:
			logger.debug('🌎  No existing Chrome instance found, starting a new one')

		provided_user_data_dir = [arg for arg in self.config.extra_browser_args if '--user-data-dir=' in arg]

		if provided_user_data_dir:
			user_data_dir = Path(provided_user_data_dir[0].split('=')[-1])
		else:
			fallback_user_data_dir = Path(gettempdir()) / 'browseruse' / 'profiles' / 'default'  # /tmp/browseruse
			try:
				# ~/.config/browseruse/profiles/default
				user_data_dir = Path('~/.config') / 'browseruse' / 'profiles' / 'default'
				user_data_dir = user_data_dir.expanduser()
				user_data_dir.mkdir(parents=True, exist_ok=True)
			except Exception as e:
				logger.error(f'❌  Failed to create ~/.config/browseruse directory: {type(e).__name__}: {e}')
				user_data_dir = fallback_user_data_dir
				user_data_dir.mkdir(parents=True, exist_ok=True)

		logger.info(f'🌐  Storing Browser Profile user data dir in: {user_data_dir}')
		try:
			# Remove any existing SingletonLock file to allow the browser to start
			(user_data_dir / 'Default' / 'SingletonLock').unlink()
			self.config.extra_browser_args.append('--no-first-run')
		except (FileNotFoundError, PermissionError, OSError):
			pass

		# Start a new Chrome instance
		chrome_launch_args = [
			*{  # remove duplicates (usually preserves the order, but not guaranteed)
				f'--remote-debugging-port={self.config.chrome_remote_debugging_port}',
				*([f'--user-data-dir={user_data_dir.resolve()}'] if not provided_user_data_dir else []),
				*CHROME_ARGS,
				*(CHROME_DOCKER_ARGS if IN_DOCKER else []),
				*(CHROME_HEADLESS_ARGS if self.config.headless else []),
				*(CHROME_DISABLE_SECURITY_ARGS if self.config.disable_security else []),
				*(CHROME_DETERMINISTIC_RENDERING_ARGS if self.config.deterministic_rendering else []),
				*self.config.extra_browser_args,
			},
		]
		chrome_sub_process = await asyncio.create_subprocess_exec(
			self.config.browser_binary_path,
			*chrome_launch_args,
			stdout=subprocess.DEVNULL,
			stderr=subprocess.DEVNULL,
			shell=False,
		)
		self._chrome_subprocess = psutil.Process(chrome_sub_process.pid)

		# Attempt to connect again after starting a new instance
		for _ in range(10):
			try:
				async with httpx.AsyncClient() as client:
					response = await client.get(
						f'http://localhost:{self.config.chrome_remote_debugging_port}/json/version', timeout=2
					)
					if response.status_code == 200:
						break
			except httpx.RequestError:
				pass
			await asyncio.sleep(1)

		# Attempt to connect again after starting a new instance
		try:
			browser_class = getattr(playwright, self.config.browser_class)
			browser = await browser_class.connect_over_cdp(
				endpoint_url=f'http://localhost:{self.config.chrome_remote_debugging_port}',
				timeout=20000,  # 20 second timeout for connection
			)
			return browser
		except Exception as e:
			logger.error(f'❌  Failed to start a new Chrome instance: {str(e)}')
			raise RuntimeError(
				'To start chrome in Debug mode, you need to close all existing Chrome instances and try again otherwise we can not connect to the instance.'
			)

	async def _setup_builtin_browser(self, playwright: Playwright) -> PlaywrightBrowser:
		"""Sets up and returns a Playwright Browser instance with anti-detection measures."""
		assert self.config.browser_binary_path is None, 'browser_binary_path should be None if trying to use the builtin browsers'

		# Use the configured window size from new_context_config if available
		if (
			not self.config.headless
			and hasattr(self.config, 'new_context_config')
			and hasattr(self.config.new_context_config, 'window_width')
			and hasattr(self.config.new_context_config, 'window_height')
			and not self.config.new_context_config.no_viewport
		):
			screen_size = {
				'width': self.config.new_context_config.window_width,
				'height': self.config.new_context_config.window_height,
			}
			offset_x, offset_y = get_window_adjustments()
		elif self.config.headless:
			screen_size = {'width': 1920, 'height': 1080}
			offset_x, offset_y = 0, 0
		else:
			screen_size = get_screen_resolution()
			offset_x, offset_y = get_window_adjustments()

		chrome_args = {
			f'--remote-debugging-port={self.config.chrome_remote_debugging_port}',
			*CHROME_ARGS,
			*(CHROME_DOCKER_ARGS if IN_DOCKER else []),
			*(CHROME_HEADLESS_ARGS if self.config.headless else []),
			*(CHROME_DISABLE_SECURITY_ARGS if self.config.disable_security else []),
			*(CHROME_DETERMINISTIC_RENDERING_ARGS if self.config.deterministic_rendering else []),
			f'--window-position={offset_x},{offset_y}',
			f'--window-size={screen_size["width"]},{screen_size["height"]}',
			*self.config.extra_browser_args,
		}

		# check if chrome remote debugging port is already taken,
		# if so remove the remote-debugging-port arg to prevent conflicts
		with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
			if s.connect_ex(('localhost', self.config.chrome_remote_debugging_port)) == 0:
				chrome_args.remove(f'--remote-debugging-port={self.config.chrome_remote_debugging_port}')

		browser_class = getattr(playwright, self.config.browser_class)
		args = {
			'chromium': list(chrome_args),
			'firefox': [
				*{
					'-no-remote',
					*self.config.extra_browser_args,
				}
			],
			'webkit': [
				*{
					'--no-startup-window',
					*self.config.extra_browser_args,
				}
			],
		}

		browser = await browser_class.launch(
			channel='chromium',  # https://github.com/microsoft/playwright/issues/33566
			headless=self.config.headless,
			args=args[self.config.browser_class],
			proxy=self.config.proxy.model_dump() if self.config.proxy else None,
			handle_sigterm=False,
			handle_sigint=False,
		)
		return browser

	async def _setup_browser(self, playwright: Playwright) -> PlaywrightBrowser:
		"""Sets up and returns a Playwright Browser instance with anti-detection measures."""
		try:
			if self.config.cdp_url:
				return await self._setup_remote_cdp_browser(playwright)
			if self.config.wss_url:
				return await self._setup_remote_wss_browser(playwright)

			if self.config.headless:
				logger.warning('⚠️ Headless mode is not recommended. Many sites will detect and block all headless browsers.')

			if self.config.browser_binary_path:
				return await self._setup_user_provided_browser(playwright)
			else:
				return await self._setup_builtin_browser(playwright)
		except Exception as e:
			logger.error(f'Failed to initialize Playwright browser: {e}')
			raise

	async def close(self):
		"""Close the browser instance"""
		if self.config.keep_alive:
			return

		try:
			if self.playwright_browser:
				await self.playwright_browser.close()
				del self.playwright_browser
			if self.playwright:
				await self.playwright.stop()
				del self.playwright
			if chrome_proc := getattr(self, '_chrome_subprocess', None):
				try:
					# always kill all children processes, otherwise chrome leaves a bunch of zombie processes
					for proc in chrome_proc.children(recursive=True):
						proc.kill()
					chrome_proc.kill()
				except Exception as e:
					logger.debug(f'Failed to terminate chrome subprocess: {e}')

		except Exception as e:
			if 'OpenAI error' not in str(e):
				logger.debug(f'Failed to close browser properly: {e}')

		finally:
			self.playwright_browser = None
			self.playwright = None
			self._chrome_subprocess = None
			gc.collect()

	def __del__(self):
		"""Async cleanup when object is destroyed"""
		try:
			if self.playwright_browser or self.playwright:
				loop = asyncio.get_running_loop()
				if loop.is_running():
					loop.create_task(self.close())
				else:
					asyncio.run(self.close())
		except Exception as e:
			logger.debug(f'Failed to cleanup browser in destructor: {e}')

```

## /browser_use/browser/chrome.py

```py path="/browser_use/browser/chrome.py" 
CHROME_EXTENSIONS = {}  # coming in a separate PR
CHROME_EXTENSIONS_PATH = 'chrome_extensions'
CHROME_PROFILE_PATH = 'chrome_profile'
CHROME_PROFILE_USER = 'Default'
CHROME_DEBUG_PORT = 9242
CHROME_DISABLED_COMPONENTS = [
	'Translate',
	'AcceptCHFrame',
	'OptimizationHints',
	'ProcessPerSiteUpToMainFrameThreshold',
	'InterestFeedContentSuggestions',
	# 'CalculateNativeWinOcclusion',
	'BackForwardCache',
	# 'HeavyAdPrivacyMitigations',
	'LazyFrameLoading',
	# 'ImprovedCookieControls',
	'PrivacySandboxSettings4',
	'AutofillServerCommunication',
	'CertificateTransparencyComponentUpdater',
	'DestroyProfileOnBrowserClose',
	'CrashReporting',
	'OverscrollHistoryNavigation',
	'InfiniteSessionRestore',
	#'LockProfileCookieDatabase',  # disabling allows multiple chrome instances to concurrently modify profile, but might make chrome much slower https://github.com/yt-dlp/yt-dlp/issues/7271  https://issues.chromium.org/issues/40901624
]  # it's always best to give each chrome instance its own exclusive copy of the user profile


CHROME_HEADLESS_ARGS = [
	'--headless=new',
	# '--test-type',
	# '--test-type=gpu',  # https://github.com/puppeteer/puppeteer/issues/10516
	# '--enable-automation',                            # <- DONT USE THIS, it makes you easily detectable / blocked by cloudflare
]

CHROME_DOCKER_ARGS = [
	# Docker-specific options
	# https://github.com/GoogleChrome/lighthouse-ci/tree/main/docs/recipes/docker-client#--no-sandbox-issues-explained
	'--no-sandbox',  # rely on docker sandboxing in docker, otherwise we need cap_add: SYS_ADM to use host sandboxing
	'--disable-gpu-sandbox',
	'--disable-setuid-sandbox',
	'--disable-dev-shm-usage',  # docker 75mb default shm size is not big enough, disabling just uses /tmp instead
	'--no-xshm',
	# dont try to disable (or install) dbus in docker, its not needed, chrome can work without dbus despite the errors
]

CHROME_DISABLE_SECURITY_ARGS = [
	# DANGER: JS isolation security features (to allow easier tampering with pages during automation)
	# chrome://net-internals
	'--disable-web-security',  # <- WARNING, breaks some sites that expect/enforce strict CORS headers (try webflow.com)
	'--disable-site-isolation-trials',
	'--disable-features=IsolateOrigins,site-per-process',
	# '--allow-file-access-from-files',                     # <- WARNING, dangerous, allows JS to read filesystem using file:// URLs
	# DANGER: Disable HTTPS verification
	'--allow-running-insecure-content',  # Breaks CORS/CSRF/HSTS etc., useful sometimes but very easy to detect
	'--ignore-certificate-errors',
	'--ignore-ssl-errors',
	'--ignore-certificate-errors-spki-list',
	# '--allow-insecure-localhost',
]

# flags to make chrome behave more deterministically across different OS's
CHROME_DETERMINISTIC_RENDERING_ARGS = [
	'--deterministic-mode',
	'--js-flags=--random-seed=1157259159',  # make all JS random numbers deterministic by providing a seed
	'--force-device-scale-factor=1',
	# GPU, canvas, text, and pdf rendering config
	# chrome://gpu
	'--enable-webgl',  # enable web-gl graphics support
	'--font-render-hinting=none',  # make rendering more deterministic by ignoring OS font hints, may also need css override, try:    * {text-rendering: geometricprecision !important; -webkit-font-smoothing: antialiased;}
	'--force-color-profile=srgb',  # make rendering more deterministic by using consistent color profile, if browser looks weird, try: generic-rgb
	# '--disable-partial-raster',  # make rendering more deterministic (TODO: verify if still needed)
	'--disable-skia-runtime-opts',  # make rendering more deterministic by avoiding Skia hot path runtime optimizations
	'--disable-2d-canvas-clip-aa',  # make rendering more deterministic by disabling antialiasing on 2d canvas clips
	# '--disable-gpu',                                  # falls back to more consistent software renderer across all OS's, especially helps linux text rendering look less weird
	# // '--use-gl=swiftshader',                        <- DO NOT USE, breaks M1 ARM64. it makes rendering more deterministic by using simpler CPU renderer instead of OS GPU renderer  bug: https://groups.google.com/a/chromium.org/g/chromium-dev/c/8eR2GctzGuw
	# // '--disable-software-rasterizer',               <- DO NOT USE, harmless, used in tandem with --disable-gpu
	# // '--run-all-compositor-stages-before-draw',     <- DO NOT USE, makes headful chrome hang on startup (tested v121 Google Chrome.app on macOS)
	# // '--disable-gl-drawing-for-tests',              <- DO NOT USE, disables gl output (makes tests run faster if you dont care about canvas)
	# // '--blink-settings=imagesEnabled=false',        <- DO NOT USE, disables images entirely (only sometimes useful to speed up loading)
]


CHROME_ARGS = [
	# Process management & performance tuning
	# chrome://process-internals
	# '--disable-lazy-loading',  # make rendering more deterministic by loading all content up-front instead of on-focus
	# '--disable-renderer-backgrounding',  # dont throttle tab rendering based on focus/visibility
	# '--disable-background-networking',  # dont throttle tab networking based on focus/visibility
	# '--disable-background-timer-throttling',  # dont throttle tab timers based on focus/visibility
	# '--disable-backgrounding-occluded-windows',  # dont throttle tab window based on focus/visibility
	# '--disable-ipc-flooding-protection',  # dont throttle ipc traffic or accessing big request/response/buffer/etc. objects will fail
	# '--disable-extensions-http-throttling',  # dont throttle http traffic based on runtime heuristics
	# '--disable-field-trial-config',  # disable shared field trial state between browser processes
	# '--disable-back-forward-cache',  # disable browsing navigation cache
	# Profile data dir setup
	# chrome://profile-internals
	# f'--user-data-dir={CHROME_PROFILE_PATH}',     # managed by playwright arg instead
	# f'--profile-directory={CHROME_PROFILE_USER}',
	# '--password-store=basic',  # use mock keychain instead of OS-provided keychain (we manage auth.json instead)
	# '--use-mock-keychain',
	# '--disable-cookie-encryption',  # we need to be able to write unencrypted cookies to save/load auth.json
	'--disable-sync',  # don't try to use Google account sync features while automation is active
	# Extensions
	# chrome://inspect/#extensions
	# f'--load-extension={CHROME_EXTENSIONS.map(({unpacked_path}) => unpacked_path).join(',')}',  # not needed when using existing profile that already has extensions installed
	# f'--allowlisted-extension-id={",".join(CHROME_EXTENSIONS.keys())}',
	'--allow-legacy-extension-manifests',
	'--allow-pre-commit-input',  # allow JS mutations before page rendering is complete
	'--disable-blink-features=AutomationControlled',  # hide the signatures that announce browser is being remote-controlled
	# f'--proxy-server=https://43.159.28.126:2334:u7ce652b7568805c4-zone-custom-region-us-session-szGWq3FRU-sessTime-60:u7ce652b7568805c4',      # send all network traffic through a proxy https://2captcha.com/proxy
	# f'--proxy-bypass-list=127.0.0.1',
	# Browser window and viewport setup
	# chrome://version
	# f'--user-agent="{DEFAULT_USER_AGENT}"',
	# f'--window-size={DEFAULT_VIEWPORT.width},{DEFAULT_VIEWPORT.height}',
	# '--window-position=0,0',
	# '--start-maximized',
	'--install-autogenerated-theme=0,0,0',  # black border makes it easier to see which chrome window is browser-use's
	'--hide-scrollbars',  # stop scrollbars from affecting screenshot width/height
	#'--virtual-time-budget=60000',  # DONT USE THIS, makes chrome hang forever and doesn't work, used to fast-forward all animations & timers by 60s, dont use this it's unfortunately buggy and breaks screenshot and PDF capture sometimes
	#'--autoplay-policy=no-user-gesture-required',  # auto-start videos so they trigger network requests + show up in outputs
	#'--disable-gesture-requirement-for-media-playback',
	#'--lang=en-US,en;q=0.9',
	# IO: stdin/stdout, debug port config
	# chrome://inspect
	'--log-level=2',  # 1=DEBUG 2=WARNING 3=ERROR
	'--enable-logging=stderr',
	# '--remote-debugging-address=127.0.0.1',         <- DONT USE THIS, no longer supported on chrome >100, never expose to non-localhost, would allow attacker to drive your browser from any machine
	# '--enable-experimental-extension-apis',          # add support for tab groups via chrome.tabs extension API
	'--disable-focus-on-load',  # prevent browser from hijacking focus
	'--disable-window-activation',
	# '--in-process-gpu',                            <- DONT USE THIS, makes headful startup time ~5-10s slower (tested v121 Google Chrome.app on macOS)
	# '--disable-component-extensions-with-background-pages',  # TODO: check this, disables chrome components that only run in background with no visible UI (could lower startup time)
	# uncomment to disable hardware camera/mic/speaker access + present fake devices to websites
	# (faster to disable, but disabling breaks recording browser audio in puppeteer-stream screenrecordings)
	# '--use-fake-device-for-media-stream',
	# '--use-fake-ui-for-media-stream',
	# '--disable-features=GlobalMediaControls,MediaRouter,DialMediaRouteProvider',
	# Output format options (PDF, screenshot, etc.)
	'--export-tagged-pdf',  # include table on contents and tags in printed PDFs
	'--generate-pdf-document-outline',
	# Suppress first-run features, popups, hints, updates, etc.
	# chrome://system
	'--no-pings',
	'--no-default-browser-check',
	'--no-startup-window',
	'--ash-no-nudges',
	'--disable-infobars',
	'--disable-search-engine-choice-screen',
	'--disable-session-crashed-bubble',
	'--simulate-outdated-no-au="Tue, 31 Dec 2099 23:59:59 GMT"',  # disable browser self-update while automation is active
	'--hide-crash-restore-bubble',
	'--suppress-message-center-popups',
	'--disable-client-side-phishing-detection',
	'--disable-domain-reliability',
	'--disable-datasaver-prompt',
	'--disable-hang-monitor',
	'--disable-session-crashed-bubble',
	'--disable-speech-synthesis-api',
	'--disable-speech-api',
	'--disable-print-preview',
	'--safebrowsing-disable-auto-update',
	# '--deny-permission-prompts',
	'--disable-external-intent-requests',
	# '--disable-notifications',
	'--disable-desktop-notifications',
	'--noerrdialogs',
	'--disable-prompt-on-repost',
	'--silent-debugger-extension-api',
	# '--block-new-web-contents',
	'--metrics-recording-only',
	'--disable-breakpad',
	# other feature flags
	# chrome://flags        chrome://components
	f'--disable-features={",".join(CHROME_DISABLED_COMPONENTS)}',
	'--enable-features=NetworkService',
]

```

## /browser_use/browser/tests/httpx_client_test.py

```py path="/browser_use/browser/tests/httpx_client_test.py" 
import httpx
import pytest

from browser_use.browser.browser import Browser, BrowserConfig


@pytest.mark.asyncio
async def test_browser_close_doesnt_affect_external_httpx_clients():
	"""
	Test that Browser.close() doesn't close HTTPX clients created outside the Browser instance.
	This test demonstrates the issue where Browser.close() is closing all HTTPX clients.
	"""
	# Create an external HTTPX client that should remain open
	external_client = httpx.AsyncClient()

	# Create a Browser instance
	browser = Browser(config=BrowserConfig(headless=True))

	# Close the browser (which should trigger cleanup_httpx_clients)
	await browser.close()

	# Check if the external client is still usable
	try:
		# If the client is closed, this will raise RuntimeError
		# Using a simple HEAD request to a reliable URL
		await external_client.head('https://www.example.com', timeout=2.0)
		client_is_closed = False
	except RuntimeError as e:
		# If we get "Cannot send a request, as the client has been closed"
		client_is_closed = 'client has been closed' in str(e)
	except Exception:
		# Any other exception means the client is not closed but request failed
		client_is_closed = False
	finally:
		# Always clean up our test client properly
		await external_client.aclose()

	# Our external client should not be closed by browser.close()
	assert not client_is_closed, 'External HTTPX client was incorrectly closed by Browser.close()'

```

## /browser_use/dom/__init__.py

```py path="/browser_use/dom/__init__.py" 

```

## /browser_use/exceptions.py

```py path="/browser_use/exceptions.py" 
class LLMException(Exception):
	def __init__(self, status_code, message):
		self.status_code = status_code
		self.message = message
		super().__init__(f'Error {status_code}: {message}')

```

## /codebeaver.yml

```yml path="/codebeaver.yml" 
environment:
- OPENAI_API_KEY=empty
- AZURE_OPENAI_KEY=empty
from: pytest

```

## /examples/use-cases/test_cv.txt

123



The content has been capped at 50000 tokens. The user could consider applying other filters to refine the result. The better and more specific the context, the better the LLM can follow instructions. If the context seems verbose, the user can refine the filter using uithub. Thank you for using https://uithub.com - Perfect LLM context for any GitHub repo.
Copied!