```
├── .github/
│   └── workflows/
│       └── publish.yml
├── .gitignore
├── .vscode/
│   └── settings.json
├── LICENSE
├── README.md
├── index/
│   ├── __init__.py
│   ├── agent/
│   │   ├── agent.py
│   │   ├── demo_images/
│   │   │   ├── complex_layout_highlight.png
│   │   │   ├── complex_layout_small_elements.png
│   │   │   ├── loading.png
│   │   │   ├── loading2.png
│   │   │   └── scroll.png
│   │   ├── message_manager.py
│   │   ├── models.py
│   │   ├── prompts.py
│   │   └── utils.py
│   ├── browser/
│   │   ├── browser.py
│   │   ├── detector.py
│   │   ├── findVisibleInteractiveElements.js
│   │   ├── fonts/
│   │   │   └── OpenSans-Medium.ttf
│   │   ├── models.py
│   │   └── utils.py
│   ├── cli.py
│   ├── controller/
│   │   ├── controller.py
│   │   └── default_actions.py
│   └── llm/
│       ├── llm.py
│       └── providers/
│           ├── __init__.py
│           ├── anthropic.py
│           ├── anthropic_bedrock.py
│           ├── gemini.py
│           └── openai.py
├── pyproject.toml
├── static/
│   └── traces.png
└── uv.lock
```
## /.github/workflows/publish.yml
```yml path="/.github/workflows/publish.yml"
name: Publish Python Package

on:
  push:
    tags:
      - 'v*'

permissions:
  contents: read

jobs:
  publish:
    runs-on: ubuntu-latest
    environment:
      name: pypi
      url: https://pypi.org/p/lmnr/
    permissions:
      id-token: write
    steps:
      - uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'

      - name: Install the project
        run: uv sync --all-extras --dev

      - name: Verify tag matches package version
        run: |
          # Extract version from tag (remove 'v' prefix)
          TAG_VERSION=${GITHUB_REF#refs/tags/v}

          # Extract version from pyproject.toml
          PACKAGE_VERSION=$(grep -oP '(?<=version = ")[^"]+' pyproject.toml)

          echo "Tag version: $TAG_VERSION"
          echo "Package version: $PACKAGE_VERSION"

          # Check if versions match
          if [ "$TAG_VERSION" != "$PACKAGE_VERSION" ]; then
            echo "Error: Tag version ($TAG_VERSION) does not match package version ($PACKAGE_VERSION)"
            exit 1
          fi

      - name: Build package
        run: uv build

      - name: Publish package
        uses: pypa/gh-action-pypi-publish@release/v1
```
## /.gitignore
```gitignore path="/.gitignore"
.DS_Store
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
```
## /.vscode/settings.json
```json path="/.vscode/settings.json"
{
  "[python]": {
    "editor.codeActionsOnSave": {
      "source.fixAll": "explicit",
      "source.organizeImports": "explicit"
    },
    "editor.defaultFormatter": "charliermarsh.ruff"
  }
}
```
## /LICENSE
``` path="/LICENSE"
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
```
## /README.md



# Index
Index is the SOTA open-source browser agent for autonomously executing complex tasks on the web.
- [x] Powered by reasoning LLMs with vision capabilities.
    - [x] Gemini 2.5 Pro (really fast and accurate)
    - [x] Claude 3.7 Sonnet with extended thinking (reliable and accurate)
    - [x] OpenAI o4-mini (depending on the reasoning effort, provides good balance between speed, cost and accuracy)
    - [x] Gemini 2.5 Flash (really fast, cheap, and good for less complex tasks)
- [x] `pip install lmnr-index` and use it in your project
- [x] `index run` to run the agent in the interactive CLI
- [x] Index is also available as a [serverless API](https://docs.lmnr.ai/laminar-index/introduction).
- [x] You can also try out Index via [Chat UI](https://lmnr.ai/chat).
- [x] Supports advanced [browser agent observability](https://docs.lmnr.ai/laminar-index/observability) powered by the open-source platform [Laminar](https://github.com/lmnr-ai/lmnr).
prompt: go to ycombinator.com. summarize first 3 companies in the W25 batch and make new spreadsheet in google sheets.
https://github.com/user-attachments/assets/2b46ee20-81b6-4188-92fb-4d97fe0b3d6a
## Documentation
Check out full documentation [here](https://docs.lmnr.ai/index-agent/getting-started)
## Index API
The easiest way to use Index in production is via the [serverless API](https://docs.lmnr.ai/laminar-index/introduction). Index API manages remote browser sessions, agent infrastructure and [browser observability](https://docs.lmnr.ai/laminar-index/tracing). To get started, [sign up](https://lmnr.ai/sign-in) and create a project API key. Read the [docs](https://docs.lmnr.ai/laminar-index/introduction) to learn more.
### Install Laminar
```bash
pip install lmnr
```
### Use Index via API
```python
from lmnr import Laminar, LaminarClient

# you can also set LMNR_PROJECT_API_KEY environment variable

# Initialize tracing
Laminar.initialize(project_api_key="your_api_key")

# Initialize the client
client = LaminarClient(project_api_key="your_api_key")

for chunk in client.agent.run(
    stream=True,
    model_provider="gemini",
    model="gemini-2.5-pro-preview-03-25",
    prompt="Navigate to news.ycombinator.com, find a post about AI, and summarize it"
):
    print(chunk)
```
## Local Quick Start
### Install dependencies
```bash
pip install lmnr-index
# Install playwright
playwright install chromium
```
### Setup model API keys
Setup your model API keys in `.env` file in your project root:
```
ANTHROPIC_API_KEY=
GEMINI_API_KEY=
OPENAI_API_KEY=
```
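Index loads this `.env` file via `python-dotenv` when the agent module is imported. If you want to fail fast when a key is missing, a minimal sketch could look like this (the explicit check is just a suggestion, not part of Index):
```python
import os

from dotenv import load_dotenv

# Load variables from .env into the process environment,
# which is where the model providers read their API keys from.
load_dotenv()

# Optional sanity check before starting the agent (adjust to the provider you use).
if not os.getenv("ANTHROPIC_API_KEY"):
    raise RuntimeError("ANTHROPIC_API_KEY is not set; add it to your .env file")
```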
### Run the agent with CLI
You can run Index via the interactive CLI. It features:
- Browser state persistence between sessions
- Follow-up messages with support for "give human control" action
- Real-time streaming updates
- Beautiful terminal UI using Textual
You can run the agent with the following command. Remember to set the API key for the selected model in the `.env` file.
```bash
index run
```
Output will look like this:
```
Loaded existing browser state
╭───────────────────── Interactive Mode ─────────────────────╮
│ Index Browser Agent Interactive Mode │
│ Type your message and press Enter. The agent will respond. │
│ Press Ctrl+C to exit. │
╰────────────────────────────────────────────────────────────╯
Choose an LLM model:
1. Gemini 2.5 Flash
2. Claude 3.7 Sonnet
3. OpenAI o4-mini
Select model [1/2/3] (1): 3
Using OpenAI model: o4-mini
Loaded existing browser state
Your message: go to lmnr.ai, summarize pricing page
Agent is working...
Step 1: Opening lmnr.ai
Step 2: Opening Pricing page
Step 3: Scrolling for more pricing details
Step 4: Scrolling back up to view pricing tiers
Step 5: Provided concise summary of the three pricing tiers
```
### Running with a personal Chrome instance
You can use Index with your personal Chrome browser instance instead of launching a new browser. The main advantage is that all of your existing logged-in sessions will be available.
```bash
# Basic usage with default Chrome path
index run --local-chrome
# With custom Chrome path and debugging port
index run --local-chrome --chrome-path="/path/to/chrome" --port=9223
```
This will launch Chrome with remote debugging enabled and connect Index to it.
#### OS-specific Chrome paths
Default Chrome executable paths on different operating systems:
**macOS**:
```bash
index run --local-chrome --chrome-path="/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
```
**Windows**:
```bash
index run --local-chrome --chrome-path="C:\Program Files\Google\Chrome\Application\chrome.exe"
```
#### Connecting to an already running Chrome instance
If you already have Chrome running with remote debugging enabled, you can connect to it:
1. Launch Chrome with debugging enabled:
```bash
# macOS
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --remote-debugging-port=9222
# Windows
"C:\Program Files\Google\Chrome\Application\chrome.exe" --remote-debugging-port=9222
```
2. Then run Index with the same port:
```bash
index run --local-chrome --port=9222
```
### Run the agent with code
```python
import asyncio
from index import Agent, AnthropicProvider

async def main():
    llm = AnthropicProvider(
        model="claude-3-7-sonnet-20250219",
        enable_thinking=True,
        thinking_token_budget=2048)

    # llm = OpenAIProvider(model="o4-mini") you can also use OpenAI models
    agent = Agent(llm=llm)

    output = await agent.run(
        prompt="Navigate to news.ycombinator.com, find a post about AI, and summarize it"
    )

    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```
### Stream the agent's output
```python
async for chunk in agent.run_stream(
    prompt="Navigate to news.ycombinator.com, find a post about AI, and summarize it"
):
    print(chunk)
```
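For a self-contained version of the streaming loop, the snippet above can be wrapped in the same setup as the previous example (provider and model are illustrative):
```python
import asyncio

from index import Agent, AnthropicProvider

async def main():
    llm = AnthropicProvider(model="claude-3-7-sonnet-20250219")
    agent = Agent(llm=llm)

    # run_stream yields step chunks as the agent works, followed by a final output chunk.
    async for chunk in agent.run_stream(
        prompt="Navigate to news.ycombinator.com, find a post about AI, and summarize it"
    ):
        print(chunk)

if __name__ == "__main__":
    asyncio.run(main())
```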
### Enable browser agent observability
To trace the Index agent's actions and record the browser session, simply initialize Laminar tracing before running the agent.
```python
from lmnr import Laminar
Laminar.initialize(project_api_key="your_api_key")
```
You will then get full observability into the agent's actions, synced with the browser session, in the Laminar platform.
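Putting the two pieces together, a minimal traced run (reusing the provider setup from the earlier examples) might look like this:
```python
import asyncio

from lmnr import Laminar

from index import Agent, AnthropicProvider

# Initialize Laminar tracing before the agent runs so the browser session is recorded.
Laminar.initialize(project_api_key="your_api_key")

async def main():
    llm = AnthropicProvider(model="claude-3-7-sonnet-20250219")
    agent = Agent(llm=llm)

    output = await agent.run(
        prompt="Navigate to news.ycombinator.com, find a post about AI, and summarize it"
    )
    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```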
### Run with remote CDP url
```python
import asyncio
from index import Agent, AnthropicProvider, BrowserConfig

async def main():
    # Configure browser to connect to an existing Chrome DevTools Protocol endpoint
    browser_config = BrowserConfig(
        cdp_url=""
    )

    llm = AnthropicProvider(model="claude-3-7-sonnet-20250219", enable_thinking=True, thinking_token_budget=2048)

    agent = Agent(llm=llm, browser_config=browser_config)

    output = await agent.run(
        prompt="Navigate to news.ycombinator.com and find the top story"
    )

    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```
### Run with local Chrome instance (programmatically)
```python
import asyncio
from index import Agent, AnthropicProvider, BrowserConfig

async def main():
    # Configure browser to connect to a local Chrome instance
    browser_config = BrowserConfig(
        cdp_url="http://localhost:9222"
    )

    llm = AnthropicProvider(model="claude-3-7-sonnet-20250219", enable_thinking=True, thinking_token_budget=2048)

    agent = Agent(llm=llm, browser_config=browser_config)

    output = await agent.run(
        prompt="Navigate to news.ycombinator.com and find the top story"
    )

    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```
### Customize browser window size
```python
import asyncio
from index import Agent, AnthropicProvider, BrowserConfig

async def main():
    # Configure browser with custom viewport size
    browser_config = BrowserConfig(
        viewport_size={"width": 1200, "height": 900}
    )

    llm = AnthropicProvider(model="claude-3-7-sonnet-20250219")

    agent = Agent(llm=llm, browser_config=browser_config)

    output = await agent.run(
        "Navigate to a responsive website and capture how it looks in full HD resolution"
    )

    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```
---
Made with ❤️ by the [Laminar team](https://lmnr.ai)
## /index/__init__.py
```py path="/index/__init__.py"
from index.agent.agent import Agent
from index.agent.models import ActionModel, ActionResult, AgentOutput
from index.browser.browser import Browser, BrowserConfig
from index.browser.detector import Detector
from index.browser.models import InteractiveElement
from index.llm.providers.anthropic import AnthropicProvider
from index.llm.providers.anthropic_bedrock import AnthropicBedrockProvider
from index.llm.providers.gemini import GeminiProvider
from index.llm.providers.openai import OpenAIProvider
__all__ = [
    'Agent',
    'Browser',
    'BrowserConfig',
    'ActionResult',
    'ActionModel',
    'AnthropicProvider',
    'AnthropicBedrockProvider',
    'OpenAIProvider',
    'GeminiProvider',
    'AgentOutput',
    'Detector',
    'InteractiveElement',
]
```
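These re-exports are the public API used throughout the README. As a quick orientation (not a file in the repository), a Gemini-backed agent could be constructed like this, assuming `GeminiProvider` accepts a `model` argument like the other providers shown above:
```python
import asyncio

from index import Agent, BrowserConfig, GeminiProvider

async def main():
    # GeminiProvider is re-exported from index.llm.providers.gemini (see __all__ above).
    llm = GeminiProvider(model="gemini-2.5-pro-preview-03-25")
    agent = Agent(llm=llm, browser_config=BrowserConfig())

    output = await agent.run(prompt="Navigate to news.ycombinator.com and find the top story")
    print(output.result)

if __name__ == "__main__":
    asyncio.run(main())
```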
## /index/agent/agent.py
```py path="/index/agent/agent.py"
from __future__ import annotations
import json
import logging
import re
import time
import uuid
from typing import AsyncGenerator, Optional
from dotenv import load_dotenv
from lmnr import Laminar, LaminarSpanContext, observe, use_span
from pydantic import ValidationError
from index.agent.message_manager import MessageManager
from index.agent.models import (
    ActionResult,
    AgentLLMOutput,
    AgentOutput,
    AgentState,
    AgentStreamChunk,
    FinalOutputChunk,
    StepChunk,
    StepChunkContent,
    StepChunkError,
    TimeoutChunk,
    TimeoutChunkContent,
)
from index.browser.browser import Browser, BrowserConfig
from index.controller.controller import Controller
from index.llm.llm import BaseLLMProvider, Message
load_dotenv()
logger = logging.getLogger(__name__)
class Agent:
    def __init__(
        self,
        llm: BaseLLMProvider,
        browser_config: BrowserConfig | None = None
    ):
        self.llm = llm
        self.controller = Controller()

        # Initialize browser or use the provided one
        self.browser = Browser(config=browser_config if browser_config is not None else BrowserConfig())

        action_descriptions = self.controller.get_action_descriptions()

        self.message_manager = MessageManager(
            action_descriptions=action_descriptions,
        )

        self.state = AgentState(
            messages=[],
        )

    async def step(self, step: int, previous_result: ActionResult | None = None, step_span_context: Optional[LaminarSpanContext] = None) -> tuple[ActionResult, str]:
        """Execute one step of the task"""
        with Laminar.start_as_current_span(
            name="agent.step",
            parent_span_context=step_span_context,
            input={
                "step": step,
            },
        ):
            state = await self.browser.update_state()

            if previous_result:
                self.message_manager.add_current_state_message(state, previous_result)

            input_messages = self.message_manager.get_messages()

            try:
                model_output = await self._generate_action(input_messages)
            except Exception as e:
                # model call failed, remove last state message from history before retrying
                self.message_manager.remove_last_message()
                raise e

            if previous_result:
                # we're removing the state message that we've just added because we want to append it in a different format
                self.message_manager.remove_last_message()

            self.message_manager.add_message_from_model_output(step, previous_result, model_output, state.screenshot)

            try:
                result: ActionResult = await self.controller.execute_action(
                    model_output.action,
                    self.browser
                )

                if result.is_done:
                    logger.info(f'Result: {result.content}')
                    self.final_output = result.content

                return result, model_output.summary
            except Exception as e:
                raise e

    @observe(name='agent.generate_action', ignore_input=True)
    async def _generate_action(self, input_messages: list[Message]) -> AgentLLMOutput:
        """Get next action from LLM based on current state"""
        response = await self.llm.call(input_messages)

        # Extract content between tags using regex, including variations like
        pattern = r"]*)>(.*?) ]*)>"
        match = re.search(pattern, response.content, re.DOTALL)

        json_str = ""

        if not match:
            # if we couldn't find the tags, it most likely means the tag is not present in the response
            # remove closing and opening tags just in case
            closing_tag_pattern = r" ]*)>"
            json_str = re.sub(closing_tag_pattern, "", response.content).strip()

            open_tag_pattern = r"]*)>"
            json_str = re.sub(open_tag_pattern, "", json_str).strip()

            json_str = json_str.replace("\`\`\`json", "").replace("\`\`\`", "").strip()
        else:
            # Extract just the content between the tags without any additional replacement
            json_str = match.group(1).strip()

        try:
            # First try to parse it directly to catch any obvious JSON issues
            try:
                json.loads(json_str)
            except json.JSONDecodeError:
                # If direct parsing fails, attempt to fix common issues
                # Remove escape characters and control characters (0x00-0x1F) that might cause problems
                json_str = json_str.replace('\\n', '\n').replace('\\r', '\r').replace('\\t', '\t')
                # Clean all control characters (0x00-0x1F) except valid JSON whitespace (\n, \r, \t)
                json_str = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', json_str)

            output = AgentLLMOutput.model_validate_json(json_str.strip())

            logger.info(f'💡 Thought: {output.thought}')
            logger.info(f'💡 Summary: {output.summary}')
            logger.info(f'🛠️ Action: {output.action.model_dump_json(exclude_unset=True)}')

            if response.thinking:
                output.thinking_block = response.thinking

            return output
        except ValidationError as e:
            raise ValueError(f"Could not parse response: {str(e)}\nResponse was: {json_str}")

    async def _setup_messages(self, prompt: str, agent_state: str | None = None, start_url: str | None = None):
        """Set up messages based on state dict or initialize with system message"""
        if agent_state:
            # assuming that the structure of the state.messages is correct
            state = AgentState.model_validate_json(agent_state)
            self.message_manager.set_messages(state.messages)

            # Update browser_context to browser
            browser_state = await self.browser.update_state()
            self.message_manager.add_current_state_message(browser_state, user_follow_up_message=prompt)
        else:
            self.message_manager.add_system_message_and_user_prompt(prompt)

            if start_url:
                await self.browser.goto(start_url)
                browser_state = await self.browser.update_state()
                self.message_manager.add_current_state_message(browser_state)

    async def run(self,
        prompt: str,
        max_steps: int = 100,
        agent_state: str | None = None,
        parent_span_context: Optional[LaminarSpanContext] = None,
        close_context: bool = True,
        session_id: str | None = None,
        return_agent_state: bool = False,
        return_storage_state: bool = False,
        start_url: str | None = None,
    ) -> AgentOutput:
        """Execute the task with maximum number of steps and return the final result

        Args:
            prompt: The prompt to execute the task with
            max_steps: The maximum number of steps to execute the task with. Defaults to 100.
            agent_state: Optional, the state of the agent to execute the task with
            parent_span_context: Optional, parent span context in Laminar format to execute the task with
            close_context: Whether to close the browser context after the task is executed
            session_id: Optional, Agent session id
            return_agent_state: Whether to return the agent state with the final output
            return_storage_state: Whether to return the storage state with the final output
            start_url: Optional, the URL to start the task with
        """
        if prompt is None and agent_state is None:
            raise ValueError("Either prompt or agent_state must be provided")

        with Laminar.start_as_current_span(
            name="agent.run",
            parent_span_context=parent_span_context,
            input={
                "prompt": prompt,
                "max_steps": max_steps,
                "stream": False,
            },
        ) as span:
            if session_id is not None:
                span.set_attribute("lmnr.internal.agent_session_id", session_id)

            await self._setup_messages(prompt, agent_state, start_url)

            step = 0
            result = None
            is_done = False

            trace_id = str(uuid.UUID(int=span.get_span_context().trace_id))

            try:
                while not is_done and step < max_steps:
                    logger.info(f'📍 Step {step}')

                    result, _ = await self.step(step, result)
                    step += 1
                    is_done = result.is_done

                    if is_done:
                        logger.info(f'✅ Task completed successfully in {step} steps')
                        break

                if not is_done:
                    logger.info('❌ Maximum number of steps reached')
            except Exception as e:
                logger.info(f'❌ Error in run: {e}')
                raise e
            finally:
                storage_state = await self.browser.get_storage_state()

                if close_context:
                    # Update to close the browser directly
                    await self.browser.close()

                span.set_attribute("lmnr.span.output", result.model_dump_json())

            return AgentOutput(
                agent_state=self.get_state() if return_agent_state else None,
                result=result,
                storage_state=storage_state if return_storage_state else None,
                step_count=step,
                trace_id=trace_id,
            )

    async def run_stream(self,
        prompt: str,
        max_steps: int = 100,
        agent_state: str | None = None,
        parent_span_context: Optional[LaminarSpanContext] = None,
        close_context: bool = True,
        timeout: Optional[int] = None,
        session_id: str | None = None,
        return_screenshots: bool = False,
        return_agent_state: bool = False,
        return_storage_state: bool = False,
        start_url: str | None = None,
    ) -> AsyncGenerator[AgentStreamChunk, None]:
        """Execute the task with maximum number of steps and stream step chunks as they happen

        Args:
            prompt: The prompt to execute the task with
            max_steps: The maximum number of steps to execute the task with
            agent_state: The state of the agent to execute the task with
            parent_span_context: Parent span context in Laminar format to execute the task with
            close_context: Whether to close the browser context after the task is executed
            timeout: The timeout for the task
            session_id: Agent session id
            return_screenshots: Whether to return screenshots with the step chunks
            return_agent_state: Whether to return the agent state with the final output chunk
            return_storage_state: Whether to return the storage state with the final output chunk
            start_url: Optional, the URL to start the task with
        """
        # Create a span for the streaming execution
        span = Laminar.start_span(
            name="agent.run_stream",
            parent_span_context=parent_span_context,
            input={
                "prompt": prompt,
                "max_steps": max_steps,
                "stream": True,
            },
        )

        trace_id = str(uuid.UUID(int=span.get_span_context().trace_id))
        if session_id is not None:
            span.set_attribute("lmnr.internal.agent_session_id", session_id)

        with use_span(span):
            await self._setup_messages(prompt, agent_state, start_url)

        step = 0
        result = None
        is_done = False

        if timeout is not None:
            start_time = time.time()

        try:
            # Execute steps and yield results
            while not is_done and step < max_steps:
                logger.info(f'📍 Step {step}')

                with use_span(span):
                    result, summary = await self.step(step, result)

                step += 1
                is_done = result.is_done

                screenshot = None
                if return_screenshots:
                    state = self.browser.get_state()
                    screenshot = state.screenshot

                if timeout is not None and time.time() - start_time > timeout:
                    yield TimeoutChunk(
                        content=TimeoutChunkContent(
                            action_result=result,
                            summary=summary,
                            step=step,
                            agent_state=self.get_state() if return_agent_state else None,
                            screenshot=screenshot,
                            trace_id=trace_id
                        )
                    )
                    return

                yield StepChunk(
                    content=StepChunkContent(
                        action_result=result,
                        summary=summary,
                        trace_id=trace_id,
                        screenshot=screenshot
                    )
                )

                if is_done:
                    logger.info(f'✅ Task completed successfully in {step} steps')

                    storage_state = await self.browser.get_storage_state()

                    # Yield the final output as a chunk
                    final_output = AgentOutput(
                        agent_state=self.get_state() if return_agent_state else None,
                        result=result,
                        storage_state=storage_state if return_storage_state else None,
                        step_count=step,
                        trace_id=trace_id,
                    )

                    span.set_attribute("lmnr.span.output", result.model_dump_json())

                    yield FinalOutputChunk(content=final_output)

                    break

            if not is_done:
                logger.info('❌ Maximum number of steps reached')
                yield StepChunkError(content=f'Maximum number of steps reached: {max_steps}')
        except Exception as e:
            logger.info(f'❌ Error in run: {e}')
            span.record_exception(e)
            yield StepChunkError(content=f'Error in run stream: {e}')
        finally:
            # Clean up resources
            if close_context:
                # Update to close the browser directly
                await self.browser.close()

            span.end()
            logger.info('Stream complete, span closed')

    def get_state(self) -> AgentState:
        self.state.messages = self.message_manager.get_messages()

        return self.state
```
## /index/agent/demo_images/complex_layout_highlight.png
Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/complex_layout_highlight.png
## /index/agent/demo_images/complex_layout_small_elements.png
Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/complex_layout_small_elements.png
## /index/agent/demo_images/loading.png
Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/loading.png
## /index/agent/demo_images/loading2.png
Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/loading2.png
## /index/agent/demo_images/scroll.png
Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/agent/demo_images/scroll.png
## /index/agent/message_manager.py
```py path="/index/agent/message_manager.py"
from __future__ import annotations
import logging
from datetime import datetime
from typing import List, Optional
from index.agent.models import ActionResult, AgentLLMOutput
from index.agent.prompts import system_message
from index.agent.utils import load_demo_image_as_b64
from index.browser.models import BrowserState
from index.browser.utils import scale_b64_image
from index.llm.llm import ImageContent, Message, TextContent
logger = logging.getLogger(__name__)
class MessageManager:
    def __init__(
        self,
        action_descriptions: str,
    ):
        self._messages: List[Message] = []
        self.action_descriptions = action_descriptions

    def add_system_message_and_user_prompt(self, prompt: str) -> None:
        complex_layout_highlight = load_demo_image_as_b64('complex_layout_highlight.png')
        complex_layout_small_elements = load_demo_image_as_b64('complex_layout_small_elements.png')
        still_loading = load_demo_image_as_b64('loading.png')
        still_loading_2 = load_demo_image_as_b64('loading2.png')
        scroll_over_element_example = load_demo_image_as_b64('scroll.png')

        system_msg = Message(
            role="system",
            content=[
                TextContent(text=system_message(self.action_descriptions), cache_control=True),
            ],
        )

        self._messages.append(system_msg)

        self._messages.append(Message(
            role="user",
            content=[
                TextContent(text=''),
                TextContent(text="Here's an example of a complex layout. As an example, if you want to select a 'Roster' section for Colorado Rockies. Then you need to click on element with index 121."),
                ImageContent(image_b64=complex_layout_highlight),
                TextContent(text=' '),
                TextContent(text=''),
                TextContent(text="Here's an example of small elements on the page and their functions. Element 7, represented by 'x' icon, is a 'clear text' button. Element 8 is a 'submit' button, represented by '=' icon. This clarification should help you better understand similar layouts."),
                ImageContent(image_b64=complex_layout_small_elements),
                TextContent(text=' '),
                TextContent(text=''),
                TextContent(text="Here are some examples of loading pages. If the main content on the page is empty or if there are loading elements, such as skeleton screens, page is still loading. Then, you HAVE to perform `wait_for_page_to_load` action."),
                ImageContent(image_b64=still_loading),
                ImageContent(image_b64=still_loading_2),
                TextContent(text=' '),
                TextContent(text=''),
                TextContent(text="In some cases, to reveal more content, you need to scroll in scrollable areas of the webpage. Scrollable areas have VERTICAL scrollbars very clearly visible on their right side. In the screenshot below, you can clearly see a scrollbar on the right side of the list of search items. This indicates that the list is scrollable. To scroll over this area, you need to identify any element within the scrollable area and use its index with `scroll_down_over_element` action to scroll over it. In this example, approriate element is with index 15."),
                ImageContent(image_b64=scroll_over_element_example),
                TextContent(text=' ', cache_control=True),
                TextContent(text=f"""Here is the task you need to complete:\n\n\n{prompt}\n
Today's date and time is: {datetime.now().strftime('%B %d, %Y, %I:%M%p')} - keep this date and time in mind when planning your actions."""),
            ]
        ))

    def get_messages_as_state(self) -> List[Message]:
        """Get messages as state messages"""
        return [msg for msg in self._messages if msg.is_state_message]

    def remove_last_message(self) -> None:
        """Remove last message from history"""
        if len(self._messages) > 1:
            self._messages.pop()

    def add_current_state_message(
        self,
        state: BrowserState,
        previous_result: ActionResult | None = None,
        user_follow_up_message: str | None = None,
    ) -> None:
        """Add browser state as a user message"""
        if state.interactive_elements:
            highlighted_elements = ''
            for element in state.interactive_elements.values():
                # exclude sheets elements
                if element.browser_agent_id.startswith("row_") or element.browser_agent_id.startswith("column_"):
                    continue

                start_tag = f"[{element.index}]<{element.tag_name}"

                if element.input_type:
                    start_tag += f" type=\"{element.input_type}\""

                start_tag += ">"
                element_text = element.text.replace('\n', ' ')
                highlighted_elements += f"{start_tag}{element_text}{element.tag_name}>\n"
        else:
            highlighted_elements = ''

        scroll_distance_above_viewport = state.viewport.scroll_distance_above_viewport or 0
        scroll_distance_below_viewport = state.viewport.scroll_distance_below_viewport or 0

        if scroll_distance_above_viewport > 0:
            elements_text = f'{scroll_distance_above_viewport}px scroll distance above current viewport\n'
        else:
            elements_text = '[Start of page]\n'

        if highlighted_elements != '':
            elements_text += f'\nHighlighted elements:\n{highlighted_elements}'

        if scroll_distance_below_viewport > 0:
            elements_text += f'\n{scroll_distance_below_viewport}px scroll distance below current viewport\n'
        else:
            elements_text += '\n[End of page]'

        previous_action_output = ''
        if previous_result:
            previous_action_output = f'\n{previous_result.content}\n \n\n' if previous_result.content else ''

            if previous_result.error:
                previous_action_output += f'\n{previous_result.error}\n \n\n'

        if user_follow_up_message:
            user_follow_up_message = f'\n{user_follow_up_message}\n \n\n'
        else:
            user_follow_up_message = ''

        state_description = f"""{previous_action_output}{user_follow_up_message}
Current URL: {state.url}
Open tabs:
{state.tabs}
Current viewport information:
{elements_text}
"""

        state_msg = Message(
            role='user',
            content=[
                TextContent(text=state_description),
                TextContent(text=''),
                ImageContent(image_b64=state.screenshot),
                TextContent(text=' '),
                TextContent(text=''),
                ImageContent(image_b64=state.screenshot_with_highlights),
                TextContent(text=' '),
            ]
        )

        self._messages.append(state_msg)

    def add_message_from_model_output(self, step: int, previous_result: ActionResult | None, model_output: AgentLLMOutput, screenshot: Optional[str] = None) -> None:
        """Add model output as AI message"""
        previous_action_output = ''

        for msg in self._messages:
            if msg.is_state_message:
                msg.content = [msg.content[0]]

        if previous_result and screenshot:
            previous_action_output = f'\n{previous_result.content}\n ' if previous_result.content else ''

            if previous_result.error:
                previous_action_output += f'\n{previous_result.error}\n '

            usr_msg = Message(
                role='user',
                content=[
                    TextContent(text=previous_action_output, cache_control=True),
                    TextContent(text=f""),
                    ImageContent(image_b64=scale_b64_image(screenshot, 0.75)),
                    TextContent(text=f" "),
                ],
                is_state_message=True,
            )

            self._messages.append(usr_msg)

        assistant_content = [
            TextContent(text=f"""
{model_output.model_dump_json(indent=2, include={"thought", "action", "summary"}).strip()}
"""),
        ]

        if model_output.thinking_block:
            assistant_content = [
                model_output.thinking_block,
            ] + assistant_content

        msg = Message(
            role='assistant',
            content=assistant_content,
        )

        self._messages.append(msg)

    def get_messages(self) -> List[Message]:
        found_first_cache_control = False

        # clear all past cache control except the latest one
        for msg in self._messages[::-1]:
            # ignore system messages
            if msg.role == 'system':
                continue

            if found_first_cache_control:
                msg.remove_cache_control()

            if msg.has_cache_control():
                found_first_cache_control = True

        return self._messages

    def set_messages(self, messages: List[Message]) -> None:
        """Set messages"""
        self._messages = messages
```
## /index/agent/models.py
```py path="/index/agent/models.py"
from __future__ import annotations
from typing import Any, Dict, Literal, Optional
from playwright.async_api import StorageState
from pydantic import BaseModel
from index.llm.llm import Message, ThinkingBlock
class AgentState(BaseModel):
    """State of the agent"""
    messages: list[Message]

class ActionResult(BaseModel):
    """Result of executing an action"""
    is_done: Optional[bool] = False
    content: Optional[str] = None
    error: Optional[str] = None
    give_control: Optional[bool] = False

class ActionModel(BaseModel):
    """Model for an action"""
    name: str
    params: Dict[str, Any]

class AgentLLMOutput(BaseModel):
    """Output model for agent"""
    action: ActionModel
    thought: Optional[str] = None
    summary: Optional[str] = None
    thinking_block: Optional[ThinkingBlock] = None

class AgentOutput(BaseModel):
    """Output model for agent"""
    agent_state: Optional[AgentState] = None
    result: ActionResult
    step_count: int = 0
    storage_state: Optional[StorageState] = None
    trace_id: str | None = None

class AgentStreamChunk(BaseModel):
    """Base class for chunks in the agent stream"""
    type: str

class StepChunkContent(BaseModel):
    action_result: ActionResult
    summary: str
    trace_id: str | None = None
    screenshot: Optional[str] = None

class StepChunk(AgentStreamChunk):
    """Chunk containing a step result"""
    type: Literal["step"] = "step"
    content: StepChunkContent

class TimeoutChunkContent(BaseModel):
    action_result: ActionResult
    summary: str
    step: int
    agent_state: AgentState | None = None
    trace_id: str | None = None
    screenshot: Optional[str] = None

class TimeoutChunk(AgentStreamChunk):
    """Chunk containing a timeout"""
    type: Literal["step_timeout"] = "step_timeout"
    content: TimeoutChunkContent

class StepChunkError(AgentStreamChunk):
    """Chunk containing an error"""
    type: Literal["step_error"] = "step_error"
    content: str

class FinalOutputChunk(AgentStreamChunk):
    """Chunk containing the final output"""
    type: Literal["final_output"] = "final_output"
    content: AgentOutput
```
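For reference, here is a small consumer-side sketch (not part of the repository) showing how the stream chunk models defined above might be discriminated by their concrete type:
```python
from index.agent.models import (
    AgentStreamChunk,
    FinalOutputChunk,
    StepChunk,
    StepChunkError,
    TimeoutChunk,
)

def handle_chunk(chunk: AgentStreamChunk) -> None:
    # Each chunk subclass carries a distinct `type` literal and content payload.
    if isinstance(chunk, StepChunk):
        print(f"step: {chunk.content.summary}")
    elif isinstance(chunk, TimeoutChunk):
        print(f"timed out at step {chunk.content.step}")
    elif isinstance(chunk, StepChunkError):
        print(f"error: {chunk.content}")
    elif isinstance(chunk, FinalOutputChunk):
        print(f"done in {chunk.content.step_count} steps: {chunk.content.result.content}")
```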
## /index/agent/prompts.py
```py path="/index/agent/prompts.py"
def system_message(action_descriptions: str) -> str:
return f"""You are an advanced AI assistant designed to interact with a web browser and complete user tasks. Your capabilities include analyzing web page screenshots, interacting with page elements, and navigating through websites to accomplish various objectives.
First, let's review the available actions you can perform:
{action_descriptions}
Your goal is to complete the user's task by carefully analyzing the current state of the web page, planning your actions, reflecting on the outcomes of the previous actions, and avoiding repetition of unsuccessful approaches. Follow the guidelines below:
1. Element Identification:
- Interactable elements on the page are enclosed in uniquely colored bounding boxes with numbered labels.
- Label corresponding to its bounding box is placed at the top right corner of the bounding box, and has exact same color as the bounding box. If the label is larger than the bounding box, the label is placed right outside and tangent to the bounding box.
- Carefully match labels to their corresponding bounding boxes based on the color and position of the label, as labels might slightly overlap with unrelated bounding boxes.
- If bounding box doesn't enclose any element, simply ignore it (most likely the bounding box was incorrectly detected).
- Screenshot enclosed in tag contains clean screenshot of a current browser window.
- Screenshot enclosed in tag has bounding boxes with labels drawn around interactable elements.
- Carefully analyze both screenshots to understand the layout of the page and accurately map bounding boxes to their corresponding elements.
- Remember: each bounding box and corresponding label have the same unique color.
2. Element Interaction:
- Infer role and function of elements based on their appearance, text/icon inside the element, and location on the page.
- Interact only with visible elements on the screen.
- Before entering a text into an input area, make sure that you have clicked on the target input area first.
- Scroll or interact with elements to reveal more content if necessary information is not visible.
- To scroll within areas with scrollbars, first identify any element inside the scrollable area and use its index with `scroll_down_over_element` or `scroll_up_over_element` actions instead of scrolling the entire page. Pay attention to the scrollbar position and direction to identify the correct element.
- Some pages have navigation menu on the left, which might contain useful information, such as filters, categories, navigation, etc. Pay close attention to whether the side menu has scrollbars. If it does, scroll over it using an element within the side menu.
- For clicking on a cell in a spreadsheet, first identify the correct column and row that corresponds to the cell you want to click on. Then, strictly use the `click_on_spreadsheet_cell` action to click on the cell. Don't use `click_element` action for interacting with a spreadsheet cells.
3. Task Execution:
- After you perform an action, analyze the state screenshot to verify that the intended result was achieved (filter was applied, correct date range was selected, text was entered, etc.). If the result was not achieved, identify the problem and fix it. Be creative and persistent in your approach and don't repeat the same actions that failed.
- Break down multi-step tasks into sub-tasks and complete each sub-task one by one.
- Thoroughly explore all possible approaches before declaring the task complete.
- If you encounter obstacles, consider alternative approaches such as returning to a previous page, initiating a new search, or opening a new tab.
- Understand elements on the page and infer the most relevant ones for the current step of the task.
- Ensure that your final output fully addresses all aspects of the user's request.
- Include ALL requested information in the "done" action. Include markdown-formatted links where relevant and useful.
- Important: For research tasks, be persistent and explore multiple results (at least 5-10) before giving up.
- Be persistent and creative in your approach, e.g., using site-specific Google searches to find precise information.
4. Special Situations:
- Cookie popups: Click "I accept" if present. If it persists after clicking, ignore it.
- CAPTCHA: Attempt to solve logically. If unsuccessful, open a new tab and continue the task.
5. Returning control to human:
- For steps that require user information to proceed, such as providing first name, last name, email, phone number, booking information, login, password, credit card information, credentials, etc., unless this information was provided in the initial prompt, you must use `give_human_control` action to give human control of the browser.
- If you can't solve the CAPTCHA, use the `give_human_control` action to give human control of the browser to aid you in solving the CAPTCHA.
- Control is guaranteed to be returned to you after the human has entered the information or solved the CAPTCHA, so you should plan your next actions accordingly.
6. Source citations:
- When you perform research tasks, include links to the websites that you found the information in your final output.
- In general, include links to the websites that you found the information in your final output.
- Strictly use markdown format for the links, because the final output will be rendered as markdown.
7. Spreadsheet interaction:
- To click on a cell in a spreadsheet, use the `click_on_spreadsheet_cell` action to click on a specific cell. DON'T use `click_element` action for interacting with a spreadsheet cells or other elements when the goal is to click on a specific cell.
- To input text into a spreadsheet cell, first click on the cell using the `click_on_spreadsheet_cell` action, then use the `enter_text` action to input text.
Your response must always be in the following JSON format, enclosed in tags:
{{
"thought": "EITHER a very short summary of your thinking process with key points OR exact information that you need to remember for the future (in case of research tasks).",
"action": {{
"name": "action_name",
"params": {{
"param1": "value1",
"param2": "value2"
}}
}},
"summary": "Extremely brief summary of what you are doing to display to the user to help them understand what you are doing"
}}
Remember:
- Think concisely.
- Output only a single action per response.
- You will be prompted again after each action.
- Always provide an output in the specified JSON format, enclosed in tags.
- Reflect on the outcomes of the past actions to avoid repeating unsuccessful approaches.
- Be creative and persistent in trying different strategies within the boundaries of the website.
- Break down multi-step tasks into sub-tasks and complete each sub-task one by one.
- For research tasks, be thorough and explore multiple results before concluding that the desired information is unavailable.
Continue this process until you are absolutely certain that you have completed the user's task fully and accurately. Be thorough, creative, and persistent in your approach.
Your final output should consist only of the correctly formatted JSON object enclosed in tags and should not duplicate or rehash any of the work you did in the thinking block."""
```
## /index/agent/utils.py
```py path="/index/agent/utils.py"
import base64
import importlib.resources
import logging
from index.browser.utils import scale_b64_image
logger = logging.getLogger(__name__)
def load_demo_image_as_b64(image_name: str) -> str:
    """
    Load an image from the demo_images directory and return it as a base64 string.
    Works reliably whether the package is used directly or as a library.

    Args:
        image_name: Name of the image file (including extension)

    Returns:
        Base64 encoded string of the image
    """
    try:
        # Using importlib.resources to reliably find package data
        with importlib.resources.path('index.agent.demo_images', image_name) as img_path:
            with open(img_path, 'rb') as img_file:
                b64 = base64.b64encode(img_file.read()).decode('utf-8')
                return scale_b64_image(b64, 0.75)
    except Exception as e:
        logger.error(f"Error loading demo image {image_name}: {e}")
        raise
```
## /index/browser/browser.py
```py path="/index/browser/browser.py"
"""
Streamlined Playwright browser implementation.
"""
import asyncio
import base64
import io
import logging
from dataclasses import dataclass, field
from importlib import resources
from typing import Any, Optional
from lmnr import observe
from PIL import Image
from playwright.async_api import (
    Browser as PlaywrightBrowser,
)
from playwright.async_api import (
    BrowserContext as PlaywrightBrowserContext,
)
from playwright.async_api import (
    Page,
    Playwright,
    StorageState,
    async_playwright,
)
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)
from typing_extensions import TypedDict  # to account for older python versions

# Import detector class
from index.browser.detector import Detector
from index.browser.models import (
    BrowserError,
    BrowserState,
    InteractiveElementsData,
    TabInfo,
)
from index.browser.utils import (
    filter_elements,
    put_highlight_elements_on_screenshot,
    scale_b64_image,
)
logger = logging.getLogger(__name__)
INTERACTIVE_ELEMENTS_JS_CODE = resources.read_text('index.browser', 'findVisibleInteractiveElements.js')
class ViewportSize(TypedDict):
    width: int
    height: int

@dataclass
class BrowserConfig:
    """
    Simplified configuration for the Browser.

    Parameters:
        cdp_url: Optional[str] = None
            Connect to a browser instance via CDP
        viewport_size: ViewportSize = {"width": 1024, "height": 768}
            Default browser window size
        storage_state: Optional[StorageState] = None
            Storage state to set
        detector: Optional[Detector] = None
            Detector instance for CV element detection. If None, CV detection is disabled.
    """
    cdp_url: Optional[str] = None
    viewport_size: ViewportSize = field(default_factory=lambda: {"width": 1024, "height": 768})
    storage_state: Optional[StorageState] = None
    detector: Optional[Detector] = None

class Browser:
    """
    Unified Browser responsible for interacting with the browser via Playwright.
    """
    def __init__(self, config: BrowserConfig = BrowserConfig(), close_context: bool = True):
        logger.debug('Initializing browser')
        self.config = config
        self.close_context = close_context

        # Playwright-related attributes
        self.playwright: Optional[Playwright] = None
        self.playwright_browser: Optional[PlaywrightBrowser] = None
        self.context: Optional[PlaywrightBrowserContext] = None

        # Page and state management
        self.current_page: Optional[Page] = None
        self._state: Optional[BrowserState] = None
        self._cdp_session = None

        # CV detection-related attributes
        self.detector: Optional[Detector] = config.detector
        self.screenshot_scale_factor = None

        # Initialize state
        self._init_state()

    async def __aenter__(self):
        """Async context manager entry"""
        await self._init_browser()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.close_context:
            await self.close()

    def _init_state(self, url: str = '') -> None:
        """Initialize browser state"""
        self._state = BrowserState(
            url=url,
            screenshot_with_highlights=None,
            tabs=[],
            interactive_elements={},
        )

    async def _init_browser(self):
        """Initialize the browser and context"""
        logger.debug('Initializing browser context')
        # Start playwright if needed
        if self.playwright is None:
            self.playwright = await async_playwright().start()

        # Initialize browser if needed
        if self.playwright_browser is None:
            if self.config.cdp_url:
                logger.info(f'Connecting to remote browser via CDP {self.config.cdp_url}')
                attempts = 0
                while True:
                    try:
                        self.playwright_browser = await self.playwright.chromium.connect_over_cdp(
                            self.config.cdp_url,
                            timeout=2500,
                        )
                        break
                    except Exception as e:
                        logger.error(f'Failed to connect to remote browser via CDP {self.config.cdp_url}: {e}. Retrying...')
                        await asyncio.sleep(1)
                        attempts += 1
                        if attempts > 3:
                            raise e

                logger.info(f'Connected to remote browser via CDP {self.config.cdp_url}')
            else:
                logger.info('Launching new browser instance')
                self.playwright_browser = await self.playwright.chromium.launch(
                    headless=False,
                    args=[
                        '--no-sandbox',
                        '--disable-blink-features=AutomationControlled',
                        '--disable-web-security',
                        '--disable-site-isolation-trials',
                        '--disable-features=IsolateOrigins,site-per-process',
                        f'--window-size={self.config.viewport_size["width"]},{self.config.viewport_size["height"]}',
                    ]
                )

        # Create context if needed
        if self.context is None:
            if len(self.playwright_browser.contexts) > 0:
                self.context = self.playwright_browser.contexts[0]
            else:
                self.context = await self.playwright_browser.new_context(
                    viewport=self.config.viewport_size,
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.102 Safari/537.36',
                    java_script_enabled=True,
                    bypass_csp=True,
                    ignore_https_errors=True
                )

            # Apply anti-detection scripts
            await self._apply_anti_detection_scripts()

            self.context.on('page', self._on_page_change)

            if self.config.storage_state and 'cookies' in self.config.storage_state:
                await self.context.add_cookies(self.config.storage_state['cookies'])

        # Create page if needed
        if self.current_page is None:
            if len(self.context.pages) > 0:
                self.current_page = self.context.pages[-1]
            else:
                self.current_page = await self.context.new_page()

        return self

    async def _on_page_change(self, page: Page):
        """Handle page change events"""
        logger.info(f'Current page changed to {page.url}')

        self._cdp_session = await self.context.new_cdp_session(page)
        self.current_page = page

    async def _apply_anti_detection_scripts(self):
        """Apply scripts to avoid detection as automation"""
        await self.context.add_init_script(
            """
            // Webdriver property
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            // Languages
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US']
            });
            // Plugins
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
            // Chrome runtime
            window.chrome = { runtime: {} };
            // Permissions
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
            (function () {
                const originalAttachShadow = Element.prototype.attachShadow;
                Element.prototype.attachShadow = function attachShadow(options) {
                    return originalAttachShadow.call(this, { ...options, mode: "open" });
                };
            })();
            """
        )

    async def close(self):
        """Close the browser instance and cleanup resources"""
        logger.debug('Closing browser')
        try:
            # Close CDP session if exists
            self._cdp_session = None

            # Close context
            if self.context:
                try:
                    await self.context.close()
                except Exception as e:
                    logger.debug(f'Failed to close context: {e}')
                self.context = None

            # Close browser
            if self.playwright_browser:
                try:
                    await self.playwright_browser.close()
                except Exception as e:
                    logger.debug(f'Failed to close browser: {e}')
                self.playwright_browser = None

            # Stop playwright
            if self.playwright:
await self.playwright.stop()
self.playwright = None
except Exception as e:
logger.error(f'Error during browser cleanup: {e}')
finally:
self.context = None
self.current_page = None
self._state = None
self.playwright_browser = None
self.playwright = None
async def goto(self, url: str):
"""Navigate to a URL"""
page = await self.get_current_page()
await page.goto(url, wait_until='domcontentloaded')
await asyncio.sleep(2)
async def get_tabs_info(self) -> list[TabInfo]:
"""Get information about all tabs"""
tabs_info = []
for page_id, page in enumerate(self.context.pages):
tab_info = TabInfo(page_id=page_id, url=page.url, title=await page.title())
tabs_info.append(tab_info)
return tabs_info
async def switch_to_tab(self, page_id: int) -> None:
"""Switch to a specific tab by its page_id"""
if self.context is None:
await self._init_browser()
pages = self.context.pages
if page_id >= len(pages):
raise BrowserError(f'No tab found with page_id: {page_id}')
page = pages[page_id]
self.current_page = page
await page.bring_to_front()
await page.wait_for_load_state()
async def create_new_tab(self, url: str | None = None) -> None:
"""Create a new tab and optionally navigate to a URL"""
if self.context is None:
await self._init_browser()
new_page = await self.context.new_page()
self.current_page = new_page
await new_page.wait_for_load_state()
if url:
await new_page.goto(url, wait_until='domcontentloaded')
async def close_current_tab(self):
"""Close the current tab"""
if self.current_page is None:
return
await self.current_page.close()
# Switch to the first available tab if any exist
if self.context and self.context.pages:
await self.switch_to_tab(0)
async def get_current_page(self) -> Page:
"""Get the current page"""
if self.current_page is None:
await self._init_browser()
return self.current_page
def get_state(self) -> BrowserState:
"""Get the current browser state"""
return self._state
@observe(name='browser.update_state', ignore_output=True)
async def update_state(self) -> BrowserState:
"""Update the browser state with current page information and return it"""
self._state = await self._update_state()
return self._state
@observe(name='browser._update_state', ignore_output=True)
async def _update_state(self) -> BrowserState:
"""Update and return state."""
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=0.5, min=0.5, max=2),
retry=retry_if_exception_type((Exception)),
reraise=True
)
async def get_stable_state():
if self.current_page is None:
await self._init_browser()
url = self.current_page.url
detect_sheets = 'docs.google.com/spreadsheets/d' in url
screenshot_b64 = await self.fast_screenshot()
interactive_elements_data = await self.get_interactive_elements(screenshot_b64, detect_sheets)
interactive_elements = {element.index: element for element in interactive_elements_data.elements}
# Create highlighted version of the screenshot
screenshot_with_highlights = put_highlight_elements_on_screenshot(
interactive_elements,
screenshot_b64
)
tabs = await self.get_tabs_info()
return BrowserState(
url=url,
tabs=tabs,
screenshot_with_highlights=screenshot_with_highlights,
screenshot=screenshot_b64,
viewport=interactive_elements_data.viewport,
interactive_elements=interactive_elements,
)
try:
self._state = await get_stable_state()
return self._state
except Exception as e:
logger.error(f'Failed to update state after multiple attempts: {str(e)}')
# Return last known good state if available
if hasattr(self, '_state'):
return self._state
raise
@observe(name='browser.detect_browser_elements')
async def detect_browser_elements(self) -> InteractiveElementsData:
"""Get all interactive elements on the page"""
page = await self.get_current_page()
result = await page.evaluate(INTERACTIVE_ELEMENTS_JS_CODE)
interactive_elements_data = InteractiveElementsData(**result)
return interactive_elements_data
@observe(name='browser.get_interactive_elements', ignore_output=True)
async def get_interactive_elements(self, screenshot_b64: str, detect_sheets: bool = False) -> InteractiveElementsData:
"""
Get interactive elements using combined browser and CV detection.
Args:
screenshot_b64: Base64 encoded screenshot of the current page, used for CV detection.
detect_sheets: Whether to detect sheets elements
Returns:
Combined detection results
"""
elements = []
if self.detector is not None:
browser_elements_data = await self.detect_browser_elements()
scale_factor = browser_elements_data.viewport.width / 1024
cv_elements = await self.detector.detect_from_image(screenshot_b64, scale_factor, detect_sheets)
# Combine and filter detections
elements = filter_elements(browser_elements_data.elements + cv_elements)
else:
browser_elements_data = await self.detect_browser_elements()
elements = browser_elements_data.elements
# Create new InteractiveElementsData with combined elements
return InteractiveElementsData(
viewport=browser_elements_data.viewport,
elements=elements
)
async def get_cdp_session(self):
"""Get or create a CDP session for the current page"""
# Create a new session if we don't have one or the page has changed
if (self._cdp_session is None or
not hasattr(self._cdp_session, '_page') or
self._cdp_session._page != self.current_page):
self._cdp_session = await self.context.new_cdp_session(self.current_page)
# Store reference to the page this session belongs to
self._cdp_session._page = self.current_page
return self._cdp_session
@observe(name='browser.take_screenshot', ignore_output=True)
async def fast_screenshot(self) -> str:
"""
Returns a base64 encoded screenshot of the current page.
Returns:
Base64 encoded screenshot
"""
# Use cached CDP session instead of creating a new one each time
cdp_session = await self.get_cdp_session()
screenshot_params = {
"format": "png",
"fromSurface": False,
"captureBeyondViewport": False,
}
# Capture screenshot using CDP Session
screenshot_data = await cdp_session.send("Page.captureScreenshot", screenshot_params)
screenshot_b64 = screenshot_data["data"]
if self.screenshot_scale_factor is None:
test_img_data = base64.b64decode(screenshot_b64)
test_img = Image.open(io.BytesIO(test_img_data))
logger.info(f'Test image size: {test_img.size}')
self.screenshot_scale_factor = 1024 / test_img.size[0]
logger.info(f'Screenshot scale factor: {self.screenshot_scale_factor}')
screenshot_b64 = scale_b64_image(screenshot_b64, self.screenshot_scale_factor)
return screenshot_b64
async def get_cookies(self) -> list[dict[str, Any]]:
"""Get cookies from the browser"""
if self.context:
cookies = await self.context.cookies()
return cookies
return []
async def get_storage_state(self) -> dict[str, Any]:
"""Get local storage from the browser"""
if self.context:
cookies = await self.context.cookies()
return {
'cookies': cookies,
}
return {}
```
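`Browser` is meant to be used as an async context manager: entering it lazily starts Playwright, launches or connects to Chromium, and creates a context and page, while exiting closes everything when `close_context` is true. A minimal usage sketch (the URL is illustrative; a local launch is non-headless, so it assumes a display, or pass `cdp_url` to attach to a running browser):
```py
import asyncio

from index.browser.browser import Browser, BrowserConfig


async def main():
    config = BrowserConfig(viewport_size={"width": 1024, "height": 768})
    async with Browser(config=config) as browser:
        await browser.goto("https://example.com")  # illustrative URL
        # Takes a screenshot, detects interactive elements and returns a BrowserState
        state = await browser.update_state()
        print(state.url, len(state.interactive_elements))


asyncio.run(main())
```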
## /index/browser/detector.py
```py path="/index/browser/detector.py"
"""
Computer vision detector module.
"""
from abc import ABC, abstractmethod
from typing import List
from index.browser.models import InteractiveElement
class Detector(ABC):
"""Abstract interface for object detection in browser screenshots."""
@abstractmethod
async def detect_from_image(self, image_b64: str, scale_factor: float, detect_sheets: bool = False) -> List[InteractiveElement]:
"""
Detect interactive elements from a base64 encoded image.
Args:
image_b64: Base64 encoded image screenshot.
scale_factor: Scale factor to scale the coordinates of screenshot to browser viewport coordinates.
detect_sheets: Flag to indicate if specialized sheet detection should be used.
Returns:
List of detected InteractiveElement objects.
"""
pass
```
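`Detector` is the optional computer-vision hook: `BrowserConfig.detector` defaults to `None`, and when it is set, `Browser.get_interactive_elements` merges CV detections with the DOM-based ones before filtering. A minimal sketch of the contract, using a hypothetical no-op subclass with no real model behind it:
```py
from typing import List

from index.browser.detector import Detector
from index.browser.models import InteractiveElement


class NoOpDetector(Detector):
    """Hypothetical detector that adds no CV-detected elements.

    A real implementation would run an object-detection model on the
    screenshot and convert its boxes into InteractiveElement objects,
    scaling coordinates by scale_factor to land in viewport space.
    """

    async def detect_from_image(
        self, image_b64: str, scale_factor: float, detect_sheets: bool = False
    ) -> List[InteractiveElement]:
        return []
```
Such a detector would be wired in with `BrowserConfig(detector=NoOpDetector())`; with the default `detector=None`, only the in-page DOM detection runs.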
## /index/browser/findVisibleInteractiveElements.js
```js path="/index/browser/findVisibleInteractiveElements.js"
() => {
console.time('totalExecutionTime');
// Define element weights for interactive likelihood - moved to higher scope
const elementWeights = {
'button': 10,
'a': 10,
'input': 10,
'select': 10,
'textarea': 10,
'summary': 8,
'details': 7,
'label': 5, // Labels are clickable but not always interactive
'option': 7,
'tr': 4,
'th': 3,
'td': 3,
'li': 8,
'div': 2,
'span': 1,
'img': 2,
'svg': 3,
'path': 3
};
function generateUniqueId() {
const rand = Math.random().toString(36);
return `ba-${rand}`;
}
// Helper function to check how much of the viewport an element covers
function isElementTooBig(rect) {
const viewportWidth = window.innerWidth || document.documentElement.clientWidth;
const viewportHeight = window.innerHeight || document.documentElement.clientHeight;
const viewportArea = viewportWidth * viewportHeight;
// Calculate visible area of the element
const visibleWidth = Math.min(rect.right, viewportWidth) - Math.max(rect.left, 0);
const visibleHeight = Math.min(rect.bottom, viewportHeight) - Math.max(rect.top, 0);
const visibleArea = visibleWidth * visibleHeight;
// Check if element covers more than 50% of viewport
return (visibleArea / viewportArea) > 0.5;
}
// Helper function to check if element is in the visible viewport
function isInViewport(rect) {
// Get viewport dimensions
const viewportWidth = window.innerWidth || document.documentElement.clientWidth;
const viewportHeight = window.innerHeight || document.documentElement.clientHeight;
// Element must have meaningful size
if (rect.width < 2 || rect.height < 2) {
return false;
}
// Check if substantial part of the element is in viewport (at least 30%)
const visibleWidth = Math.min(rect.right, viewportWidth) - Math.max(rect.left, 0);
const visibleHeight = Math.min(rect.bottom, viewportHeight) - Math.max(rect.top, 0);
if (visibleWidth <= 0 || visibleHeight <= 0) {
return false; // Not in viewport at all
}
const visibleArea = visibleWidth * visibleHeight;
const totalArea = rect.width * rect.height;
const visiblePercent = visibleArea / totalArea;
return visiblePercent >= 0.3; // At least 30% visible
}
// Helper function to get correct bounding rectangle, accounting for iframes
function getAdjustedBoundingClientRect(element, contextInfo = null) {
const rect = element.getBoundingClientRect();
// If element is in an iframe, adjust coordinates
if (contextInfo && contextInfo.iframe) {
const iframeRect = contextInfo.iframe.getBoundingClientRect();
return {
top: rect.top + iframeRect.top,
right: rect.right + iframeRect.left,
bottom: rect.bottom + iframeRect.top,
left: rect.left + iframeRect.left,
width: rect.width,
height: rect.height
};
}
return rect;
}
// Helper function to check if element is the top element at its position
function isTopElement(element) {
try {
const rect = getAdjustedBoundingClientRect(element, element._contextInfo);
const centerX = rect.left + rect.width / 2;
const centerY = rect.top + rect.height / 2;
// Check if the element is visible at its center point
const elementsAtPoint = document.elementsFromPoint(centerX, centerY);
// Nothing at this point (might be covered by an overlay)
if (!elementsAtPoint || elementsAtPoint.length === 0) {
return false;
}
// Handle iframe cases
if (element._contextInfo && element._contextInfo.iframe) {
// For elements in iframes, check if the iframe itself is the top-level element
// then check if the element is topmost within that iframe
const iframe = element._contextInfo.iframe;
// First check if iframe is visible at the adjusted center point
const iframeVisibleAtPoint = elementsAtPoint.includes(iframe);
if (!iframeVisibleAtPoint) {
return false;
}
// Then check if element is topmost within the iframe
try {
const iframeDoc = iframe.contentDocument || iframe.contentWindow.document;
// Convert coordinates to iframe's local coordinate system
const iframeRect = iframe.getBoundingClientRect();
const localX = centerX - iframeRect.left;
const localY = centerY - iframeRect.top;
const elementAtPointInIframe = iframeDoc.elementFromPoint(localX, localY);
if (!elementAtPointInIframe) return false;
return elementAtPointInIframe === element || element.contains(elementAtPointInIframe) || elementAtPointInIframe.contains(element);
} catch (e) {
console.warn('Error checking element position in iframe:', e);
return false;
}
}
// Handle shadow DOM cases
if (element._contextInfo && element._contextInfo.shadowHost) {
// For shadow DOM elements, first check if its shadow host is visible
const shadowHost = element._contextInfo.shadowHost;
const shadowHostVisible = elementsAtPoint.includes(shadowHost);
if (!shadowHostVisible) {
return false;
}
// Shadow DOM elements aren't directly accessible via elementsFromPoint
// So we're simplifying and assuming visibility based on the host visibility
return true;
}
const elementAtPoint = document.elementFromPoint(centerX, centerY);
if (!elementAtPoint) return false;
// Check if the element at this point is our element or a descendant/ancestor of our element
return element === elementAtPoint ||
element.contains(elementAtPoint) ||
elementAtPoint.contains(element);
} catch (e) {
console.warn('Error in isTopElement check:', e);
return false;
}
}
// Add helper function to get effective z-index
function getEffectiveZIndex(element) {
let current = element;
let zIndex = 'auto';
while (current && current !== document) {
const style = window.getComputedStyle(current);
if (style.position !== 'static' && style.zIndex !== 'auto') {
zIndex = parseInt(style.zIndex, 10);
break;
}
current = current.parentElement;
}
return zIndex === 'auto' ? 0 : zIndex;
}
// Function to find all interactive elements
function findInteractiveElements() {
console.time('findInteractiveElements');
// Batch selectors for better performance
const selectors = {
highPriority: 'button, a[href], input:not([type="hidden"]), select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="menuitem"], [role="tab"], li[role="option"], [role="switch"]',
mediumPriority: 'details, summary, svg, path, td, [role="option"], [role="radio"], [role="switch"], [tabindex]:not([tabindex="-1"]), [aria-label], [aria-labelledby]',
lowPriority: '[onclick], .clickable, .btn, .button, .nav-item, .menu-item'
};
// Process only elements in viewport for better performance
const allElements = [];
const processedElements = new Set();
const viewportElements = [];
// Function to query elements within a document or shadow root
function queryElementsInContext(context, selector) {
try {
return context.querySelectorAll(selector);
} catch (e) {
console.warn('Error querying for elements:', e);
return [];
}
}
// Function to process a document or shadow root
function processContext(context, contextInfo = { iframe: null, shadowHost: null }) {
// Process elements in priority order
Object.keys(selectors).forEach(priority => {
try {
const elements = queryElementsInContext(context, selectors[priority]);
for (let i = 0; i < elements.length; i++) {
const element = elements[i];
// Skip already processed
if (processedElements.has(element)) {
continue;
}
processedElements.add(element);
// Add context information to the element
element._contextInfo = contextInfo;
allElements.push(element);
}
} catch (e) {
console.warn(`Error processing ${priority} elements:`, e);
}
});
// Process shadow DOM
const shadowHosts = queryElementsInContext(context, '*');
for (let i = 0; i < shadowHosts.length; i++) {
const host = shadowHosts[i];
if (host.shadowRoot) {
processContext(
host.shadowRoot,
{
iframe: contextInfo.iframe,
shadowHost: host
}
);
}
}
}
// Process main document
processContext(document);
// Process iframes
try {
const iframes = document.querySelectorAll('iframe');
for (let i = 0; i < iframes.length; i++) {
const iframe = iframes[i];
// Skip iframes from different origins
try {
// This will throw if cross-origin
const iframeDoc = iframe.contentDocument || iframe.contentWindow.document;
processContext(iframeDoc, { iframe: iframe, shadowHost: null });
} catch (e) {
console.warn('Could not access iframe content (likely cross-origin):', e);
}
}
} catch (e) {
console.warn('Error processing iframes:', e);
}
// Process cursor:pointer elements in all contexts
function processCursorPointerElements(context, contextInfo = { iframe: null, shadowHost: null }) {
try {
const allElementsInContext = queryElementsInContext(context, '*');
for (let i = 0; i < allElementsInContext.length; i++) {
const element = allElementsInContext[i];
// Skip already processed
if (processedElements.has(element)) {
continue;
}
// Quick check before expensive operations
const rect = getAdjustedBoundingClientRect(element, contextInfo);
if (!isInViewport(rect)) {
continue;
}
// Check style
if (isTopElement(element) && window.getComputedStyle(element).cursor === 'pointer') {
// Add context information to the element
element._contextInfo = contextInfo;
processedElements.add(element);
allElements.push(element);
viewportElements.push({
element: element,
rect: rect,
weight: 1,
zIndex: getEffectiveZIndex(element)
});
}
// Process shadow DOM of this element
if (element.shadowRoot) {
processCursorPointerElements(
element.shadowRoot,
{
iframe: contextInfo.iframe,
shadowHost: element
}
);
}
}
} catch (e) {
console.warn('Error processing cursor:pointer elements:', e);
}
}
// Process cursor:pointer elements in the main document
processCursorPointerElements(document);
// Process cursor:pointer elements in iframes
try {
const iframes = document.querySelectorAll('iframe');
for (let i = 0; i < iframes.length; i++) {
const iframe = iframes[i];
try {
const iframeDoc = iframe.contentDocument || iframe.contentWindow.document;
processCursorPointerElements(iframeDoc, { iframe: iframe, shadowHost: null });
} catch (e) {
// Already logged in previous iframe processing
}
}
} catch (e) {
// Already logged in previous iframe processing
}
// Filter for visible elements
for (let i = 0; i < allElements.length; i++) {
const element = allElements[i];
// Skip detailed processing if not in viewport
const rect = getAdjustedBoundingClientRect(element, element._contextInfo);
if (!isInViewport(rect)) {
continue;
}
// Skip disabled elements
if (element.hasAttribute('disabled') ||
element.getAttribute('aria-disabled') === 'true') {
continue;
}
// Add check for too-large elements
if (isElementTooBig(rect)) {
continue; // Skip elements that cover more than 50% of viewport
}
// Check if the element is the top element at its position
if (!isTopElement(element)) {
continue;
}
// Calculate element weight
let weight = elementWeights[element.tagName.toLowerCase()] || 1;
// Boost weight for elements with specific attributes
if (element.getAttribute('role') === 'button') weight = Math.max(weight, 8);
if (element.hasAttribute('onclick')) weight = Math.max(weight, 7);
if (element.hasAttribute('href')) weight = Math.max(weight, 8);
if (window.getComputedStyle(element).cursor === 'pointer') weight = Math.max(weight, 4);
// Add to viewport elements
viewportElements.push({
element: element,
rect: rect,
weight: weight,
zIndex: getEffectiveZIndex(element)
});
// Record the element's index for later reference
element.setAttribute('data-element-index', i);
// Add a unique identifier attribute to the element
const uniqueId = generateUniqueId();
element.setAttribute('data-browser-agent-id', uniqueId);
}
console.timeEnd('findInteractiveElements');
console.log(`Found ${viewportElements.length} interactive elements in viewport (out of ${allElements.length} total)`);
return viewportElements;
}
// Calculate Intersection over Union (IoU) between two rectangles
function calculateIoU(rect1, rect2) {
// Calculate area of each rectangle
const area1 = (rect1.right - rect1.left) * (rect1.bottom - rect1.top);
const area2 = (rect2.right - rect2.left) * (rect2.bottom - rect2.top);
// Calculate intersection
const intersectLeft = Math.max(rect1.left, rect2.left);
const intersectTop = Math.max(rect1.top, rect2.top);
const intersectRight = Math.min(rect1.right, rect2.right);
const intersectBottom = Math.min(rect1.bottom, rect2.bottom);
// Check if intersection exists
if (intersectRight < intersectLeft || intersectBottom < intersectTop) {
return 0; // No intersection
}
// Calculate area of intersection
const intersectionArea = (intersectRight - intersectLeft) * (intersectBottom - intersectTop);
// Calculate union area
const unionArea = area1 + area2 - intersectionArea;
// Calculate IoU
return intersectionArea / unionArea;
}
// Check if rect1 is fully contained within rect2
function isFullyContained(rect1, rect2) {
return rect1.left >= rect2.left &&
rect1.right <= rect2.right &&
rect1.top >= rect2.top &&
rect1.bottom <= rect2.bottom;
}
// Filter overlapping elements using weight and IoU
function filterOverlappingElements(elements) {
console.time('filterOverlappingElements');
// Sort by area (descending - larger first), then by weight (descending) for same area
elements.sort((a, b) => {
// Calculate areas
const areaA = a.rect.width * a.rect.height;
const areaB = b.rect.width * b.rect.height;
// Sort by area first (larger area first)
if (areaB !== areaA) {
return areaB - areaA; // Larger area first
}
// For same area, sort by weight (higher weight first)
return b.weight - a.weight;
});
const filteredElements = [];
const iouThreshold = 0.7; // Threshold for considering elements as overlapping
// Add elements one by one, checking against already added elements
for (let i = 0; i < elements.length; i++) {
const current = elements[i];
let shouldAdd = true;
// For each element already in our filtered list
for (let j = 0; j < filteredElements.length; j++) {
const existing = filteredElements[j];
// Convert DOMRect to plain object for IoU calculation
const currentRect = {
left: current.rect.left,
top: current.rect.top,
right: current.rect.right,
bottom: current.rect.bottom
};
const existingRect = {
left: existing.rect.left,
top: existing.rect.top,
right: existing.rect.right,
bottom: existing.rect.bottom
};
// Check for high overlap
const iou = calculateIoU(currentRect, existingRect);
if (iou > iouThreshold) {
shouldAdd = false;
break;
}
// Check if current element is fully contained within an existing element with higher weight
if (existing.weight >= current.weight &&
isFullyContained(currentRect, existingRect) &&
existing.zIndex === current.zIndex) {
shouldAdd = false;
break;
}
}
if (shouldAdd) {
filteredElements.push(current);
}
}
console.timeEnd('filterOverlappingElements');
return filteredElements;
}
// Main function to get interactive elements with coordinates
function getInteractiveElementsData() {
// Find all potential interactive elements
const potentialElements = findInteractiveElements();
// Filter out overlapping elements
const filteredElements = filterOverlappingElements(potentialElements);
console.log(`Filtered to ${filteredElements.length} non-overlapping elements`);
// Sort elements by position (top-to-bottom, left-to-right)
const sortedElements = sortElementsByPosition(filteredElements);
// Prepare result with viewport metadata
const result = {
viewport: {
width: window.innerWidth,
height: window.innerHeight,
scrollX: Math.round(window.scrollX),
scrollY: Math.round(window.scrollY),
devicePixelRatio: window.devicePixelRatio || 1,
scrollDistanceAboveViewport: Math.round(window.scrollY),
scrollDistanceBelowViewport: Math.round(document.documentElement.scrollHeight - window.scrollY - window.innerHeight)
},
elements: []
};
// Process each interactive element (now sorted by position)
sortedElements.forEach((item, index) => {
const element = item.element;
const rect = item.rect;
// Ensure each element has a browser agent ID
let browserId = element.getAttribute('data-browser-agent-id');
if (!browserId) {
const uniqueId = generateUniqueId();
element.setAttribute('data-browser-agent-id', uniqueId);
browserId = uniqueId;
}
// Get element text (direct or from children)
let text = element.innerText || '';
if (!text) {
const textNodes = Array.from(element.childNodes)
.filter(node => node.nodeType === Node.TEXT_NODE)
.map(node => node.textContent.trim())
.filter(content => content.length > 0);
text = textNodes.join(' ');
}
// Extract important attributes
const attributes = {};
['id', 'class', 'href', 'type', 'name', 'value', 'placeholder', 'aria-label', 'title', 'role'].forEach(attr => {
if (element.hasAttribute(attr)) {
attributes[attr] = element.getAttribute(attr);
}
});
// Determine input type and element role more clearly
let elementType = element.tagName.toLowerCase();
let inputType = null;
// Handle input elements specifically
if (elementType === 'input' && element.hasAttribute('type')) {
inputType = element.getAttribute('type').toLowerCase();
}
// scaledRect is for coordinates scaled to 1024 width
const scaleFactor = 1024 / window.innerWidth
const scaledRect = {
left: Math.round(rect.left * scaleFactor),
top: Math.round(rect.top * scaleFactor),
right: Math.round(rect.right * scaleFactor),
bottom: Math.round(rect.bottom * scaleFactor),
width: Math.round(rect.width * scaleFactor),
height: Math.round(rect.height * scaleFactor),
}
// Create element data object
const elementData = {
tagName: elementType,
text: text.trim(),
attributes,
index,
weight: item.weight,
browserAgentId: browserId, // Use the guaranteed ID
inputType: inputType, // Add specific input type
viewport: {
x: Math.round(rect.left),
y: Math.round(rect.top),
width: Math.round(rect.width),
height: Math.round(rect.height)
},
page: {
x: Math.round(rect.left + window.scrollX),
y: Math.round(rect.top + window.scrollY),
width: Math.round(rect.width),
height: Math.round(rect.height)
},
center: {
x: Math.round(rect.left + rect.width/2),
y: Math.round(rect.top + rect.height/2)
},
rect: scaledRect,
zIndex: item.zIndex
};
// Add context information for iframe or shadow DOM if applicable
if (element._contextInfo) {
elementData.context = {};
// Add iframe information if element is within an iframe
if (element._contextInfo.iframe) {
const iframeRect = element._contextInfo.iframe.getBoundingClientRect();
elementData.context.iframe = {
id: element._contextInfo.iframe.id || null,
name: element._contextInfo.iframe.name || null,
src: element._contextInfo.iframe.src || null,
rect: {
x: Math.round(iframeRect.left),
y: Math.round(iframeRect.top),
width: Math.round(iframeRect.width),
height: Math.round(iframeRect.height)
}
};
}
// Add shadow DOM information if element is within a shadow DOM
if (element._contextInfo.shadowHost) {
const shadowHost = element._contextInfo.shadowHost;
const shadowHostRect = shadowHost.getBoundingClientRect();
elementData.context.shadowDOM = {
hostTagName: shadowHost.tagName.toLowerCase(),
hostId: shadowHost.id || null,
hostRect: {
x: Math.round(shadowHostRect.left),
y: Math.round(shadowHostRect.top),
width: Math.round(shadowHostRect.width),
height: Math.round(shadowHostRect.height)
}
};
}
}
result.elements.push(elementData);
});
return result;
}
// Add new function to sort elements by position
function sortElementsByPosition(elements) {
// Define what "same row" means (elements within this Y-distance are considered in the same row)
const ROW_THRESHOLD = 20; // pixels
// First, group elements into rows based on their Y position
const rows = [];
let currentRow = [];
// Copy elements to avoid modifying the original array
const sortedByY = [...elements].sort((a, b) => {
return a.rect.top - b.rect.top;
});
// Group into rows
sortedByY.forEach(element => {
if (currentRow.length === 0) {
// Start a new row
currentRow.push(element);
} else {
// Check if this element is in the same row as the previous ones
const lastElement = currentRow[currentRow.length - 1];
if (Math.abs(element.rect.top - lastElement.rect.top) <= ROW_THRESHOLD) {
// Same row
currentRow.push(element);
} else {
// New row
rows.push([...currentRow]);
currentRow = [element];
}
}
});
// Add the last row if not empty
if (currentRow.length > 0) {
rows.push(currentRow);
}
// Sort each row by X position (left to right)
rows.forEach(row => {
row.sort((a, b) => a.rect.left - b.rect.left);
});
// Flatten the rows back into a single array
return rows.flat();
}
// Execute and measure performance
console.time('getInteractiveElements');
const result = getInteractiveElementsData();
console.timeEnd('getInteractiveElements');
console.timeEnd('totalExecutionTime');
return result;
};
```
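On the Python side this script is read once via `resources.read_text('index.browser', 'findVisibleInteractiveElements.js')` and passed to `page.evaluate`, which invokes the function expression in the page and returns its result as a plain dict. A standalone sketch of that call with Playwright directly (the URL is illustrative):
```py
import asyncio
from importlib import resources

from playwright.async_api import async_playwright

JS_CODE = resources.read_text('index.browser', 'findVisibleInteractiveElements.js')


async def main():
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        await page.goto("https://example.com")  # illustrative URL
        # Playwright invokes the function expression and returns the result dict
        result = await page.evaluate(JS_CODE)
        print(result["viewport"]["width"], len(result["elements"]))
        await browser.close()


asyncio.run(main())
```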
## /index/browser/fonts/OpenSans-Medium.ttf
Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/index/browser/fonts/OpenSans-Medium.ttf
## /index/browser/models.py
```py path="/index/browser/models.py"
from dataclasses import dataclass, field
from typing import Optional
from pydantic import BaseModel, ConfigDict
from pydantic.alias_generators import to_camel
# Pydantic
class TabInfo(BaseModel):
"""Represents information about a browser tab"""
page_id: int
url: str
title: str
class Coordinates(BaseModel):
x: int
y: int
width: Optional[int] = None
height: Optional[int] = None
class Rect(BaseModel):
left: int
top: int
right: int
bottom: int
width: int
height: int
class InteractiveElement(BaseModel):
"""Represents an interactive element on the page"""
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
from_attributes=True,
)
index: int
tag_name: str
text: str
attributes: dict[str, str]
viewport: Coordinates
page: Coordinates
center: Coordinates
weight: float
browser_agent_id: str
input_type: Optional[str] = field(default=None)
rect: Rect
z_index: int
class BrowserError(Exception):
"""Base class for all browser errors"""
class URLNotAllowedError(BrowserError):
"""Error raised when a URL is not allowed"""
class Viewport(BaseModel):
"""Represents the viewport of the browser"""
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
from_attributes=True,
)
width: int = field(default_factory=lambda: 1024)
height: int = field(default_factory=lambda: 768)
scroll_x: int = field(default_factory=lambda: 0)
scroll_y: int = field(default_factory=lambda: 0)
device_pixel_ratio: float = field(default_factory=lambda: 1)
scroll_distance_above_viewport: int = field(default_factory=lambda: 0)
scroll_distance_below_viewport: int = field(default_factory=lambda: 0)
class InteractiveElementsData(BaseModel):
"""Represents the data returned by the interactive elements script"""
viewport: Viewport
elements: list[InteractiveElement]
@dataclass
class BrowserState:
url: str
tabs: list[TabInfo]
viewport: Viewport = field(default_factory=Viewport)
screenshot_with_highlights: Optional[str] = None
screenshot: Optional[str] = None
interactive_elements: dict[int, InteractiveElement] = field(default_factory=dict)
```
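The interactive-element models use `to_camel` as the alias generator with `populate_by_name=True`, so they accept both the camelCase payload produced by `findVisibleInteractiveElements.js` and snake_case keyword arguments, while attribute access stays snake_case. A small sketch with made-up values:
```py
from index.browser.models import InteractiveElement

# camelCase keys, in the shape emitted by the in-page script (values are made up)
element = InteractiveElement.model_validate({
    "index": 0,
    "tagName": "button",
    "text": "Submit",
    "attributes": {"type": "submit"},
    "viewport": {"x": 100, "y": 200, "width": 80, "height": 32},
    "page": {"x": 100, "y": 200, "width": 80, "height": 32},
    "center": {"x": 140, "y": 216},
    "weight": 10,
    "browserAgentId": "ba-example",
    "inputType": None,
    "rect": {"left": 100, "top": 200, "right": 180, "bottom": 232, "width": 80, "height": 32},
    "zIndex": 0,
})

print(element.browser_agent_id, element.rect.width)  # ba-example 80
```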
## /index/browser/utils.py
```py path="/index/browser/utils.py"
import base64
import logging
from io import BytesIO
from pathlib import Path
from typing import List
from PIL import Image, ImageDraw, ImageFont
from index.browser.models import InteractiveElement, Rect
logger = logging.getLogger(__name__)
def put_highlight_elements_on_screenshot(elements: dict[int, InteractiveElement], screenshot_b64: str) -> str:
"""Highlight elements using Pillow instead of OpenCV"""
try:
# Decode base64 to PIL Image
image_data = base64.b64decode(screenshot_b64)
image = Image.open(BytesIO(image_data))
draw = ImageDraw.Draw(image)
# Colors (RGB format for PIL)
base_colors = [
(204, 0, 0),
(0, 136, 0),
(0, 0, 204),
(204, 112, 0),
(102, 0, 102),
(0, 102, 102),
(204, 51, 153),
(44, 0, 102),
(204, 35, 0),
(28, 102, 66),
(170, 0, 0),
(36, 82, 123)
]
placed_labels = []
def generate_unique_color(base_color, element_idx):
"""Generate a unique color variation based on element index"""
r, g, b = base_color
# Use prime numbers to create deterministic but non-repeating patterns
offset_r = (element_idx * 17) % 31 - 15 # Range: -15 to 15
offset_g = (element_idx * 23) % 29 - 14 # Range: -14 to 14
offset_b = (element_idx * 13) % 27 - 13 # Range: -13 to 13
# Ensure RGB values stay within 0-255 range
r = max(0, min(255, r + offset_r))
g = max(0, min(255, g + offset_g))
b = max(0, min(255, b + offset_b))
return (r, g, b)
# Load custom font from the package
try:
# Path to your packaged font
font_path = Path(__file__).parent / "fonts" / "OpenSans-Medium.ttf"
font = ImageFont.truetype(str(font_path), 11)
except Exception as e:
logger.warning(f"Could not load custom font: {e}, falling back to default")
font = ImageFont.load_default()
for idx, element in elements.items():
# don't draw sheets elements
if element.browser_agent_id.startswith("row_") or element.browser_agent_id.startswith("column_"):
continue
base_color = base_colors[idx % len(base_colors)]
color = generate_unique_color(base_color, idx)
rect = element.rect
# Draw rectangle
draw.rectangle(
[(rect.left, rect.top), (rect.right, rect.bottom)],
outline=color,
width=2
)
# Prepare label
text = str(idx)
# Get precise text dimensions for proper centering
text_bbox = draw.textbbox((0, 0), text, font=font)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
# Make label size exactly proportional for better aesthetics
label_width = text_width + 4
label_height = text_height + 4
# Positioning logic
if label_width > rect.width or label_height > rect.height:
label_x = rect.left + rect.width
label_y = rect.top
else:
label_x = rect.left + rect.width - label_width
label_y = rect.top
# Check for overlaps with existing labels
label_rect = {
'left': label_x, 'top': label_y,
'right': label_x + label_width, 'bottom': label_y + label_height
}
for existing in placed_labels:
if not (label_rect['right'] < existing['left'] or
label_rect['left'] > existing['right'] or
label_rect['bottom'] < existing['top'] or
label_rect['top'] > existing['bottom']):
label_y = existing['bottom'] + 2
label_rect['top'] = label_y
label_rect['bottom'] = label_y + label_height
break
# Ensure label is visible within image boundaries
img_width, img_height = image.size
if label_x < 0:
label_x = 0
elif label_x + label_width >= img_width:
label_x = img_width - label_width - 1
if label_y < 0:
label_y = 0
elif label_y + label_height >= img_height:
label_y = img_height - label_height - 1
# Draw label background
draw.rectangle(
[(label_x, label_y), (label_x + label_width, label_y + label_height)],
fill=color
)
# magic numbers to center the text
text_x = label_x + 3
text_y = label_y - 1
# Draw text
draw.text(
(text_x, text_y),
text,
fill=(255, 255, 255),
font=font
)
placed_labels.append(label_rect)
# Convert back to base64
buffer = BytesIO()
image.save(buffer, format="PNG")
new_image_base64 = base64.b64encode(buffer.getvalue()).decode()
return new_image_base64
except Exception as e:
logger.error(f"Failed to add highlights to screenshot: {str(e)}")
return screenshot_b64
def scale_b64_image(image_b64: str, scale_factor: float) -> str:
"""
Scale down a base64 encoded image using Pillow.
Args:
image_b64: Base64 encoded image string
scale_factor: Factor to scale the image by (0.5 = half size)
Returns:
Base64 encoded scaled image
"""
try:
# Decode base64 to PIL Image
image_data = base64.b64decode(image_b64)
image = Image.open(BytesIO(image_data))
if image is None:
return image_b64
# Get original dimensions
width, height = image.size
# Calculate new dimensions
new_width = int(width * scale_factor)
new_height = int(height * scale_factor)
# Resize the image using high quality resampling
resized_image = image.resize(
(new_width, new_height),
Image.LANCZOS
)
# Convert back to base64
buffer = BytesIO()
resized_image.save(buffer, format="PNG")
resized_image_b64 = base64.b64encode(buffer.getvalue()).decode()
return resized_image_b64
except Exception:
return image_b64
def calculate_iou(rect1: Rect, rect2: Rect) -> float:
"""
Calculate Intersection over Union between two rectangles.
Args:
rect1: First rectangle with left, top, right, bottom attributes
rect2: Second rectangle with left, top, right, bottom attributes
Returns:
IoU value
"""
# Calculate intersection
intersect_left = max(rect1.left, rect2.left)
intersect_top = max(rect1.top, rect2.top)
intersect_right = min(rect1.right, rect2.right)
intersect_bottom = min(rect1.bottom, rect2.bottom)
# Check if intersection exists
if intersect_right < intersect_left or intersect_bottom < intersect_top:
return 0.0 # No intersection
# Calculate area of each rectangle
area1 = (rect1.right - rect1.left) * (rect1.bottom - rect1.top)
area2 = (rect2.right - rect2.left) * (rect2.bottom - rect2.top)
# Calculate area of intersection
intersection_area = (intersect_right - intersect_left) * (intersect_bottom - intersect_top)
# Calculate union area
union_area = area1 + area2 - intersection_area
# Calculate IoU
return intersection_area / union_area if union_area > 0 else 0.0
def is_fully_contained(rect1: Rect, rect2: Rect) -> bool:
"""
Check if rect1 is fully contained within rect2.
Args:
rect1: First rectangle with left, top, right, bottom attributes
rect2: Second rectangle with left, top, right, bottom attributes
Returns:
True if rect1 is fully contained within rect2
"""
return (rect1.left >= rect2.left and
rect1.right <= rect2.right and
rect1.top >= rect2.top and
rect1.bottom <= rect2.bottom)
def filter_overlapping_elements(elements: List[InteractiveElement], iou_threshold: float = 0.7) -> List[InteractiveElement]:
"""
Filter overlapping elements using weight and IoU.
Args:
elements: Elements to filter
iou_threshold: Threshold for considering elements as overlapping
Returns:
Filtered elements
"""
if not elements:
return []
# Sort by area (descending), then by weight (descending)
elements.sort(key=lambda e: (
-(e.rect.width * e.rect.height), # Negative area for descending sort
-e.weight # Negative weight for descending sort
))
filtered_elements: List[InteractiveElement] = []
# Add elements one by one, checking against already added elements
for current in elements:
should_add = True
# For each element already in our filtered list
for existing in filtered_elements:
# Check overlap with IoU
iou = calculate_iou(current.rect, existing.rect)
if iou > iou_threshold:
should_add = False
break
# Check if current element is fully contained within an existing element with higher weight
if is_fully_contained(current.rect, existing.rect):
if existing.weight >= current.weight and existing.z_index == current.z_index:
should_add = False
break
else:
# If current element has higher weight and is more than 50% of the size of the existing element, remove the existing element
if current.rect.width * current.rect.height >= existing.rect.width * existing.rect.height * 0.5:
filtered_elements.remove(existing)
break
if should_add:
filtered_elements.append(current)
return filtered_elements
def sort_elements_by_position(elements: List[InteractiveElement]) -> List[InteractiveElement]:
"""
Sort elements by position (top to bottom, left to right).
Args:
elements: Elements to sort
Returns:
Sorted elements
"""
if not elements:
return []
# Define what "same row" means
ROW_THRESHOLD = 20 # pixels
# First, group elements into rows based on Y position
rows = []
current_row = []
# Copy and sort elements by Y position
sorted_by_y = sorted(elements, key=lambda e: e.rect.top)
# Group into rows
for element in sorted_by_y:
if not current_row:
# Start a new row
current_row.append(element)
else:
# Check if this element is in the same row as the previous ones
last_element = current_row[-1]
if abs(element.rect.top - last_element.rect.top) <= ROW_THRESHOLD:
# Same row
current_row.append(element)
else:
# New row
rows.append(list(current_row))
current_row = [element]
# Add the last row if not empty
if current_row:
rows.append(current_row)
# Sort each row by X position (left to right)
for row in rows:
row.sort(key=lambda e: e.rect.left)
# Flatten the rows back into a single array
elements = [element for row in rows for element in row]
for i, element in enumerate(elements):
element.index = i
return elements
def filter_elements(
elements: List[InteractiveElement],
iou_threshold: float = 0.7
) -> List[InteractiveElement]:
"""
Combine interactive elements from multiple detection methods and filter duplicates.
Args:
elements: Interactive elements from multiple detection methods
iou_threshold: Threshold for considering elements as overlapping
Returns:
Combined and filtered elements
"""
# Filter overlapping elements
filtered = filter_overlapping_elements(elements, iou_threshold)
# Sort elements by position
sorted_elements = sort_elements_by_position(filtered)
return sorted_elements
```
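`filter_elements` is what `Browser.get_interactive_elements` calls to merge DOM- and CV-detected elements: it drops near-duplicates by IoU, removes lower-weight fully contained boxes, then re-indexes elements in reading order. The two geometric primitives it relies on can be exercised directly; a small sketch with hand-made rectangles (values are made up):
```py
from index.browser.models import Rect
from index.browser.utils import calculate_iou, is_fully_contained

a = Rect(left=0, top=0, right=100, bottom=100, width=100, height=100)
b = Rect(left=50, top=0, right=150, bottom=100, width=100, height=100)
c = Rect(left=10, top=10, right=40, bottom=40, width=30, height=30)

# a and b: intersection 50*100 = 5000, union 15000 -> IoU ~= 0.333
print(round(calculate_iou(a, b), 3))
# c sits entirely inside a
print(is_fully_contained(c, a))  # True
```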
## /index/cli.py
```py path="/index/cli.py"
#!/usr/bin/env python
import asyncio
import json
import logging
import os
import subprocess
import time
from typing import Dict, List, Optional
import requests
import typer
from dotenv import load_dotenv
from rich.console import Console
from rich.logging import RichHandler
from rich.markdown import Markdown
from rich.panel import Panel
from rich.prompt import Prompt
from textual.app import App
from textual.containers import Container, Horizontal, Vertical
from textual.reactive import reactive
from textual.widgets import Button, Footer, Header, Input, Static
from index.agent.agent import Agent
from index.agent.models import AgentOutput, AgentState
from index.browser.browser import BrowserConfig
from index.llm.llm import BaseLLMProvider
from index.llm.providers.anthropic import AnthropicProvider
from index.llm.providers.gemini import GeminiProvider
from index.llm.providers.openai import OpenAIProvider
load_dotenv()
# Create Typer app
app = typer.Typer(help="Index - Browser AI agent CLI")
# Configuration constants
BROWSER_STATE_FILE = "browser_state.json"
DEFAULT_CHROME_PATH = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"
DEFAULT_DEBUGGING_PORT = 9222
console = Console()
def setup_logging(debug: bool = False):
"""Configure logging based on debug flag"""
log_level = logging.INFO if debug else logging.WARNING
# Configure root logger
logging.basicConfig(
level=log_level,
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler(rich_tracebacks=True, console=console)]
)
# Set specific logger levels
logging.getLogger("index").setLevel(log_level)
logging.getLogger("playwright").setLevel(logging.WARNING) # Always keep playwright at WARNING
if debug:
console.print("[yellow]Debug mode enabled - logging set to INFO level[/]")
class AgentSession:
"""Manages an agent session with state persistence"""
def __init__(self, llm: Optional[BaseLLMProvider] = None, use_local_chrome: bool = False, chrome_path: str = DEFAULT_CHROME_PATH, debugging_port: int = DEFAULT_DEBUGGING_PORT, debug: bool = False):
self.llm = llm
self.chrome_process = None
self.use_local_chrome = use_local_chrome
self.chrome_path = chrome_path
self.debugging_port = debugging_port
self.logger = logging.getLogger("index.agent_session")
self.storage_state: Optional[Dict] = None
browser_config = None
if os.path.exists(BROWSER_STATE_FILE) and not use_local_chrome:
with open(BROWSER_STATE_FILE, "r") as f:
self.storage_state = json.load(f)
console.print("[green]Loaded existing browser state[/green]")
browser_config = BrowserConfig(
storage_state=self.storage_state,
viewport_size={
"width": 1200,
"height": 800
}
)
else:
if use_local_chrome:
# Launch Chrome and connect to it
self._launch_local_chrome()
browser_config = BrowserConfig(
cdp_url="http://localhost:" + str(self.debugging_port),
)
else:
browser_config = BrowserConfig(
viewport_size={
"width": 1200,
"height": 800
}
)
self.agent = Agent(llm=self.llm, browser_config=browser_config)
self.agent_state: Optional[str] = None
self.step_count: int = 0
self.action_results: List[Dict] = []
self.is_running: bool = False
def _launch_local_chrome(self):
"""Launch a local Chrome instance with remote debugging enabled"""
# Check if Chrome is already running with the specified debugging port
try:
response = requests.get(f"http://localhost:{self.debugging_port}/json/version", timeout=2)
if response.status_code == 200:
console.print(f"[green]Connected to already running Chrome instance on port {self.debugging_port}[/green]")
self.logger.info(f"Connected to existing Chrome instance on port {self.debugging_port}")
return
except requests.RequestException:
# No running Chrome instance found on the specified port, proceed with launching a new one
pass
console.print(f"[blue]Launching Chrome from {self.chrome_path} with debugging port {self.debugging_port}[/blue]")
try:
self.chrome_process = subprocess.Popen(
[self.chrome_path, f"--remote-debugging-port={self.debugging_port}", "--no-first-run", "--no-default-browser-check"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
console.print("[green]Chrome launched successfully[/green]")
self.logger.info(f"Chrome process started with PID {self.chrome_process.pid}")
# Give Chrome time to start up
time.sleep(2)
except Exception as e:
self.logger.error(f"Failed to launch Chrome: {str(e)}")
console.print(f"[red]Failed to launch Chrome: {str(e)}[/red]")
raise
def save_state(self, agent_output: AgentOutput):
"""Save agent state to file"""
if agent_output.storage_state:
with open(BROWSER_STATE_FILE, "w") as f:
json.dump(agent_output.storage_state, f)
self.logger.info("Agent state saved to file")
console.print("[green]Saved agent state[/green]")
async def run_agent(self, prompt: str) -> AgentOutput:
"""Run the agent with the given prompt"""
self.is_running = True
self.logger.info(f"Running agent with prompt: {prompt}")
try:
# Run the agent
if self.agent_state:
result = await self.agent.run(
prompt=prompt,
agent_state=self.agent_state,
close_context=False,
return_storage_state=True,
return_agent_state=True
)
else:
result = await self.agent.run(
prompt=prompt,
close_context=False,
return_storage_state=True,
return_agent_state=True
)
self.step_count = result.step_count
self.agent_state = result.agent_state.model_dump_json()
self.save_state(result)
return result
finally:
self.is_running = False
async def stream_run(self, prompt: str):
"""Run the agent with streaming output"""
self.is_running = True
self.logger.info(f"Running agent with streaming and prompt: {prompt}")
try:
# Run the agent with streaming
if self.agent_state:
stream = self.agent.run_stream(
prompt=prompt,
agent_state=self.agent_state,
close_context=False,
max_steps=500, # large number to allow the agent to run for a long time
return_agent_state=True,
return_storage_state=True
)
else:
stream = self.agent.run_stream(
prompt=prompt,
close_context=False,
max_steps=500, # large number to allow the agent to run for a long time
return_agent_state=True,
return_storage_state=True
)
final_output = None
async for chunk in stream:
# Directly yield the raw chunk without any modifications
yield chunk
# Store final output for state saving
if chunk.type == "final_output":
final_output = chunk.content
if final_output:
self.step_count = final_output.step_count
self.agent_state = final_output.agent_state.model_dump_json()
self.save_state(final_output)
finally:
self.is_running = False
def reset(self):
"""Reset agent state"""
if os.path.exists(BROWSER_STATE_FILE):
os.remove(BROWSER_STATE_FILE)
self.agent_state = None
self.step_count = 0
self.action_results = []
self.logger.info("Agent state reset")
console.print("[yellow]Agent state reset[/yellow]")
async def close(self):
"""Close the agent and any associated resources"""
# Close the browser instance
if self.agent and self.agent.browser:
self.logger.info("Closing browser instance")
await self.agent.browser.close()
# Terminate Chrome process if launched locally
if self.chrome_process:
self.logger.info(f"Terminating Chrome process with PID {self.chrome_process.pid}")
console.print("[yellow]Terminating local Chrome instance...[/yellow]")
self.chrome_process.terminate()
self.chrome_process = None
class AgentUI(App):
"""Textual-based UI for interacting with the agent"""
CSS = """
Header {
background: #3b82f6;
color: white;
text-align: center;
padding: 1;
}
Footer {
background: #1e3a8a;
color: white;
text-align: center;
padding: 1;
}
#prompt-input {
padding: 1 2;
border: tall $accent;
margin: 1 1;
height: 3;
}
#output-container {
height: 1fr;
border: solid #ccc;
background: #f8fafc;
padding: 1;
margin: 0 1;
overflow-y: auto;
}
#action-results {
height: 15;
border: solid #ccc;
background: #f8fafc;
margin: 0 1 1 1;
overflow-y: auto;
}
.action-result {
border: solid #e5e7eb;
margin: 1 0;
padding: 1;
}
.action-title {
color: #3b82f6;
text-style: bold;
}
.action-content {
margin-top: 1;
}
Button {
margin: 1 1;
}
#buttons-container {
height: auto;
align: center middle;
}
.running {
color: #f97316;
text-style: bold;
}
.completed {
color: #22c55e;
text-style: bold;
}
.error {
color: #ef4444;
text-style: bold;
}
"""
TITLE = "Index Browser Agent CLI"
BINDINGS = [
("q", "quit", "Quit"),
("r", "reset", "Reset Agent"),
("ctrl+s", "send", "Send Message"),
]
agent_session = None
status = reactive("Ready")
def compose(self):
yield Header()
with Vertical():
with Container(id="output-container"):
yield Static(id="output", expand=True)
with Container(id="action-results"):
yield Static(id="results", expand=True)
with Horizontal(id="buttons-container"):
yield Button("Send", id="send-btn", variant="primary")
yield Button("Reset", id="reset-btn", variant="error")
yield Input(placeholder="Enter your task or follow-up message...", id="prompt-input")
yield Footer()
def update_output(self):
"""Update the output display"""
output = ""
if self.agent_session.agent_state:
state = AgentState.model_validate_json(self.agent_session.agent_state)
# Get the latest user and assistant messages
user_msgs = [m for m in state.messages if m.role == "user"]
assistant_msgs = [m for m in state.messages if m.role == "assistant"]
if user_msgs:
latest_user = user_msgs[-1]
output += f"[bold blue]User:[/] {latest_user.content}\n\n"
if assistant_msgs:
latest_assistant = assistant_msgs[-1]
output += f"[bold green]Assistant:[/] {latest_assistant.content}\n\n"
output += f"[dim]Steps completed: {self.agent_session.step_count}[/]\n"
output += f"[dim]Status: {self.status}[/]\n"
else:
output = "[italic]No previous session. Start by sending a task.[/]"
self.query_one("#output", Static).update(Markdown(output))
# Update action results
if self.agent_session.action_results:
results_output = ""
for i, result in enumerate(reversed(self.agent_session.action_results[-5:])):
action_type = result.get("type", "unknown")
content = result.get("content", {})
if action_type == "step":
action_result = content.get("action_result", {})
summary = content.get("summary", "No summary available")
results_output += f"[bold]Step {i+1}[/]\n"
results_output += f"Summary: {summary}\n"
if action_result.get("is_done"):
results_output += "[green]Task completed[/]\n"
if action_result.get("give_control"):
results_output += "[yellow]Agent requested human control[/]\n"
results_output += f"Message: {action_result.get('content', '')}\n"
results_output += "\n"
elif action_type == "error":
results_output += "[bold red]Error[/]\n"
results_output += f"{content}\n\n"
self.query_one("#results", Static).update(Markdown(results_output))
async def on_button_pressed(self, event: Button.Pressed):
"""Handle button presses"""
if event.button.id == "send-btn":
await self.action_send()
elif event.button.id == "reset-btn":
self.action_reset()
def action_reset(self):
"""Reset the agent state"""
self.agent_session.reset()
self.agent_session.action_results = []
self.update_output()
async def action_send(self):
"""Send the current prompt to the agent"""
prompt = self.query_one("#prompt-input", Input).value
if not prompt.strip():
return
self.status = "Running..."
self.query_one("#prompt-input", Input).value = ""
self.update_output()
try:
# Stream the results to provide real-time feedback
async for chunk in self.agent_session.stream_run(prompt):
self.agent_session.action_results.append(chunk)
self.update_output()
await asyncio.sleep(0.1) # Small delay to ensure UI updates
self.status = "Ready"
except Exception as e:
self.status = f"Error: {str(e)}"
finally:
self.update_output()
async def on_mount(self):
"""Called when the app is mounted"""
# Register cleanup handler
self.set_interval(0.1, self._check_exit)
async def _check_exit(self):
"""Check if app is exiting and clean up resources"""
if self.exiting:
if self.agent_session:
await self.agent_session.close()
def action_quit(self):
"""Quit the application"""
self.exit()
@app.command()
def run(
prompt: str = typer.Option(None, "--prompt", "-p", help="Initial prompt to send to the agent"),
use_local_chrome: bool = typer.Option(False, "--local-chrome", help="Use local Chrome instance instead of launching a new browser"),
chrome_path: str = typer.Option(DEFAULT_CHROME_PATH, "--chrome-path", help="Path to Chrome executable"),
debugging_port: int = typer.Option(DEFAULT_DEBUGGING_PORT, "--port", help="Remote debugging port for Chrome"),
debug: bool = typer.Option(False, "--debug", help="Enable debug logging")
):
"""
Launch the interactive loop for the Index browser agent
"""
# Set up logging if debug mode is enabled
setup_logging(debug)
asyncio.run(_interactive_loop(
initial_prompt=prompt,
use_local_chrome=use_local_chrome,
chrome_path=chrome_path,
debugging_port=debugging_port,
debug=debug
))
@app.command(name="ui")
def run_ui(
prompt: str = typer.Option(None, "--prompt", "-p", help="Initial prompt to send to the agent"),
use_local_chrome: bool = typer.Option(False, "--local-chrome", help="Use local Chrome instance instead of launching a new browser"),
chrome_path: str = typer.Option(DEFAULT_CHROME_PATH, "--chrome-path", help="Path to Chrome executable"),
debugging_port: int = typer.Option(DEFAULT_DEBUGGING_PORT, "--port", help="Remote debugging port for Chrome"),
debug: bool = typer.Option(False, "--debug", help="Enable debug logging")
):
"""
Launch the graphical UI for the Index browser agent
"""
# Set up logging if debug mode is enabled
setup_logging(debug)
# Select model and check API key
llm_provider = select_model_and_check_key()
# Initialize UI with the selected LLM provider
agent_ui = AgentUI()
agent_ui.agent_session = AgentSession(
llm=llm_provider,
use_local_chrome=use_local_chrome,
chrome_path=chrome_path,
debugging_port=debugging_port,
debug=debug
)
if prompt:
# If a prompt is provided, we'll send it once the UI is ready
async def send_initial_prompt():
await asyncio.sleep(0.5) # Give UI time to initialize
agent_ui.query_one("#prompt-input", Input).value = prompt
await agent_ui.action_send()
agent_ui.set_interval(0.1, lambda: asyncio.create_task(send_initial_prompt()))
agent_ui.run()
def create_llm_provider(provider: str, model: str) -> BaseLLMProvider:
"""Create an LLM provider based on model choice"""
if provider == "openai":
# OpenAI model
console.print(f"[cyan]Using OpenAI model: {model}[/]")
return OpenAIProvider(model=model, reasoning_effort="low")
elif provider == "gemini":
# Gemini model
if model == "gemini-2.5-pro-preview-03-25":
console.print(f"[cyan]Using Gemini model: {model}[/]")
return GeminiProvider(
model=model,
thinking_token_budget=8192
)
elif model == "gemini-2.5-flash-preview-04-17":
console.print(f"[cyan]Using Gemini model: {model}[/]")
return GeminiProvider(
model=model,
thinking_token_budget=8192
)
else:
raise ValueError(f"Unsupported Gemini model: {model}")
elif provider == "anthropic":
# Anthropic model
console.print(f"[cyan]Using Anthropic model: {model}[/]")
return AnthropicProvider(
model=model,
enable_thinking=True,
thinking_token_budget=2048
)
else:
raise ValueError(f"Unsupported provider: {provider}")
def check_and_save_api_key(required_key: str):
"""Check if API key exists, prompt for it if missing, and save to .env file"""
if not os.environ.get(required_key):
console.print(f"\n[yellow]API key {required_key} not found in environment.[/]")
api_key = Prompt.ask(f"Enter your {required_key}", password=True)
# Save to .env file
env_path = ".env"
if os.path.exists(env_path):
# Read existing content
with open(env_path, "r") as f:
env_content = f.read()
env_content += f"\n{required_key}={api_key}"
with open(env_path, "w") as f:
f.write(env_content)
console.print(f"[green]Saved {required_key} to .env file[/]")
else:
# Create new .env file
with open(env_path, "w") as f:
f.write(f"{required_key}={api_key}")
console.print("[green]Created .env file[/]")
# Update environment variable for current session
os.environ[required_key] = api_key
# Reload dotenv to ensure changes are applied
load_dotenv(override=True)
def select_model_and_check_key():
"""Select a model and check for required API key"""
console.print("\n[bold green]Choose an LLM model:[/]")
console.print("1. [bold]Gemini 2.5 Pro[/]")
console.print("2. [bold]Gemini 2.5 Flash[/]")
console.print("3. [bold]Claude 3.7 Sonnet[/]")
console.print("4. [bold]OpenAI o4-mini[/]")
choice = Prompt.ask(
"[bold]Select model[/]",
choices=["1", "2", "3", "4"],
default="1"
)
provider = ""
model = ""
required_key = ""
# Create LLM provider based on selection
if choice == "1":
provider = "gemini"
model = "gemini-2.5-pro-preview-03-25"
required_key = "GEMINI_API_KEY"
elif choice == "2":
provider = "gemini"
model = "gemini-2.5-flash-preview-04-17"
required_key = "GEMINI_API_KEY"
elif choice == "3":
provider = "anthropic"
model = "claude-3-7-sonnet-20250219"
required_key = "ANTHROPIC_API_KEY"
elif choice == "4":
provider = "openai"
model = "o4-mini"
required_key = "OPENAI_API_KEY"
else:
raise ValueError(f"Invalid choice: {choice}")
# Check and save API key if needed
check_and_save_api_key(required_key)
return create_llm_provider(provider, model)
async def _interactive_loop(initial_prompt: str = None, use_local_chrome: bool = False, chrome_path: str = DEFAULT_CHROME_PATH, debugging_port: int = DEFAULT_DEBUGGING_PORT, debug: bool = False):
"""Implementation of the interactive loop mode"""
# Display welcome panel
console.print(Panel.fit(
"Index Browser Agent Interactive Mode\n"
"Type your message and press Enter. The agent will respond.\n"
"Press Ctrl+C to exit.",
title="Interactive Mode",
border_style="blue"
))
# Select model and check API key
llm_provider = select_model_and_check_key()
# Create agent session with selected provider
session = AgentSession(
llm=llm_provider,
use_local_chrome=use_local_chrome,
chrome_path=chrome_path,
debugging_port=debugging_port,
debug=debug
)
try:
first_message = True
awaiting_human_input = False
while True:
# Check if we're waiting for the user to return control to the agent
if awaiting_human_input:
console.print("\n[yellow]Agent is waiting for control to be returned.[/]")
console.print("[yellow]Press Enter to return control to the agent...[/]", end="")
input() # Wait for Enter key
user_message = "Returning control back, continue your task"
console.print(f"\n[bold blue]Your message:[/] {user_message}")
awaiting_human_input = False
# Normal message input flow
elif first_message and initial_prompt:
user_message = initial_prompt
console.print(f"\n[bold blue]Your message:[/] {user_message}")
first_message = False
else:
console.print("\n[bold blue]Your message:[/] ", end="")
user_message = input()
first_message = False
if not user_message.strip():
continue
console.print("\n[bold cyan]Agent is working...[/]")
step_num = 1
human_control_requested = False
# Run the agent with streaming output
try:
async for chunk in session.stream_run(user_message):
if chunk.type == "step":
action_result = chunk.content.action_result
summary = chunk.content.summary
# Simple single-line output for steps
console.print(f"[bold blue]Step {step_num}:[/] {summary}")
# Display additional info for special actions as separate lines
if action_result and action_result.is_done and not action_result.give_control:
console.print(" [green bold]✓ Task completed successfully![/]")
if action_result and action_result.give_control:
human_control_requested = True
message = action_result.content or "No message provided"
console.print(" [yellow bold]⚠ Human control requested:[/]")
console.print(f" [yellow]{message}[/]")
# Increment step counter for next step
step_num += 1
elif chunk.type == "step_error":
console.print(f"[bold red]Error:[/] {chunk.content}")
elif chunk.type == "final_output":
# Keep panel for final output
result_content = chunk.content.result.content if chunk.content.result else "No result content"
console.print(Panel(
f"{result_content}",
title="Final Output",
border_style="green",
expand=False
))
except Exception as e:
console.print(f"[bold red]Error:[/] {str(e)}")
console.print(f"[dim]Type: {type(e)}[/]")
console.print_exception()
# After agent completes
if human_control_requested:
console.print("\n[yellow]Agent has requested human control.[/]")
awaiting_human_input = True
else:
console.print("\n[green]Agent has completed the task.[/]")
console.print("[dim]Waiting for your next message...[/]")
except KeyboardInterrupt:
console.print("\n[yellow]Exiting interactive mode...[/]")
# Close the browser before exiting
await session.close()
def main():
"""Entry point for the CLI"""
app()
if __name__ == "__main__":
main()
```
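The CLI above wires model selection, API-key handling, and the interactive/TUI entry points together. Below is a hedged programmatic sketch of the same flow; it assumes `AgentSession` and `create_llm_provider` are importable from `index.cli` as defined in this file, that `GEMINI_API_KEY` is set, and that `AgentSession`'s constructor arguments beyond `llm` fall back to reasonable defaults.
```py
# Hedged sketch: drive the agent without the Typer/Textual front ends.
# Assumes index.cli exposes AgentSession and create_llm_provider as above.
import asyncio

from index.cli import AgentSession, create_llm_provider


async def main():
    llm = create_llm_provider("gemini", "gemini-2.5-pro-preview-03-25")
    session = AgentSession(llm=llm, use_local_chrome=False)
    try:
        async for chunk in session.stream_run("Open example.com and summarize the page"):
            if chunk.type == "step":
                print(f"Step: {chunk.content.summary}")
            elif chunk.type == "step_error":
                print(f"Error: {chunk.content}")
            elif chunk.type == "final_output":
                result = chunk.content.result
                print(result.content if result else "No result content")
    finally:
        # Close the browser just like the interactive loop does on exit.
        await session.close()


if __name__ == "__main__":
    asyncio.run(main())
```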
## /index/controller/controller.py
```py path="/index/controller/controller.py"
import inspect
import json
import logging
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Type, get_type_hints
from lmnr import Laminar
from pydantic import BaseModel
from index.agent.models import ActionModel, ActionResult
from index.browser.browser import Browser
from index.controller.default_actions import register_default_actions
logger = logging.getLogger(__name__)
@dataclass
class Action:
"""Represents a registered action"""
name: str
description: str
function: Callable
browser_context: bool = False
class Controller:
"""Controller for browser actions with integrated registry functionality"""
def __init__(
self,
exclude_actions: List[str] = None,
output_model: Optional[Type[BaseModel]] = None,
):
self.exclude_actions = exclude_actions or []
self.output_model = output_model
self._actions: Dict[str, Action] = {}
# Register default actions
register_default_actions(self, self.output_model)
def action(self, description: str = None):
"""
Decorator for registering actions
Args:
description: Optional description of what the action does.
If not provided, uses the function's docstring.
"""
def decorator(func: Callable) -> Callable:
if func.__name__ in self.exclude_actions:
return func
# Use provided description or function docstring
action_description = description
if action_description is None:
action_description = inspect.getdoc(func) or "No description provided"
# Clean up docstring (remove indentation)
action_description = inspect.cleandoc(action_description)
browser_context = False
if 'browser' in inspect.signature(func).parameters:
browser_context = True
@wraps(func)
async def async_wrapper(*args, **kwargs):
return await func(*args, **kwargs)
# Register the action
self._actions[func.__name__] = Action(
name=func.__name__,
description=action_description,
function=async_wrapper,
browser_context=browser_context,
)
return func
return decorator
async def execute_action(
self,
action: ActionModel,
browser: Browser,
) -> ActionResult:
"""Execute an action from an ActionModel"""
action_name = action.name
params = action.params
if params is not None:
with Laminar.start_as_current_span(
name=action_name,
input={
'action': action_name,
'params': params,
},
span_type='TOOL',
):
logger.info(f'Executing action: {action_name} with params: {params}')
action = self._actions.get(action_name)
if action is None:
raise ValueError(f'Action {action_name} not found')
try:
kwargs = params.copy() if params else {}
# Add browser to kwargs if it's provided
if action.browser_context and browser is not None:
kwargs['browser'] = browser
result = await action.function(**kwargs)
Laminar.set_span_output(result)
return result
except Exception as e:
raise RuntimeError(f'Error executing action {action_name}: {str(e)}') from e
else:
raise ValueError(f'Params are not provided for action: {action_name}')
def get_action_descriptions(self) -> str:
"""Return a dictionary of all registered actions and their metadata"""
action_info = []
for name, action in self._actions.items():
sig = inspect.signature(action.function)
type_hints = get_type_hints(action.function)
# Build parameter info
params = {}
for param_name in sig.parameters.keys():
if param_name == 'browser': # Skip browser parameter in descriptions
continue
param_type = type_hints.get(param_name, Any).__name__
params[param_name] = {
'type': param_type,
}
action_info.append(json.dumps({
'name': name,
'description': action.description,
'parameters': params
}, indent=2))
return '\n\n'.join(action_info)
```
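For reference, a small sketch of extending the controller with a custom action. The import paths mirror the modules above; the page-title action and its wording are illustrative, not part of the library.
```py
# Hedged sketch: register a custom action on top of the defaults.
from index.agent.models import ActionResult
from index.browser.browser import Browser
from index.controller.controller import Controller

controller = Controller()


@controller.action("Read the title of the current page")
async def read_page_title(browser: Browser):
    # The decorator detects the `browser` parameter and injects it at execution time.
    page = await browser.get_current_page()
    title = await page.title()
    return ActionResult(content=f"Page title: {title}")


# The custom action now shows up next to the defaults in the prompt description.
print(controller.get_action_descriptions())
```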
## /index/controller/default_actions.py
```py path="/index/controller/default_actions.py"
import asyncio
import json
import logging
import platform
import re
from tenacity import retry, stop_after_attempt, wait_exponential
from index.agent.models import ActionResult
from index.browser.browser import Browser
logger = logging.getLogger(__name__)
def register_default_actions(controller, output_model=None):
"""Register all default browser actions to the provided controller"""
@controller.action('Complete task')
async def done(text: str):
return ActionResult(is_done=True, content=text)
@controller.action()
async def give_human_control(message: str, browser: Browser):
"""Give human control of the browser. Use this action when you need to use user information, such as first name, last name, email, phone number, booking information, login/password, etc. to proceed with the task. Also, if you can't solve the CAPTCHA, use this action.
Args:
message: Message to give to the human, explaining why you need human intervention.
"""
return ActionResult(give_control=True, content=message, is_done=True)
@controller.action()
async def search_google(query: str, browser: Browser):
"""
Open google search in new tab and search for the query.
"""
page = await browser.get_current_page()
await page.goto(f'https://www.google.com/search?q={query}&udm=14')
await page.wait_for_load_state()
msg = f"Searched for '{query}' in Google"
logger.info(msg)
return ActionResult(content=msg)
@controller.action()
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
reraise=True,
before_sleep=lambda retry_state: logger.warning(
f"Retrying step after error: {retry_state.outcome.exception()}. Attempt {retry_state.attempt_number}"
)
)
async def go_to_url(url: str, browser: Browser):
"""Navigate to URL in the current tab"""
page = await browser.get_current_page()
await page.goto(url, wait_until='domcontentloaded')
await asyncio.sleep(1.5)
msg = f"Navigated to {url}"
logger.info(msg)
return ActionResult(content=msg)
@controller.action()
async def go_back_to_previous_page(browser: Browser):
"""Go back to the previous page"""
try:
page = await browser.get_current_page()
await page.go_back(wait_until='domcontentloaded')
await asyncio.sleep(2)
msg = 'Navigated back to the previous page'
logger.info(msg)
return ActionResult(content=msg)
except Exception as e:
logger.debug(f'During go_back: {e}')
return ActionResult(error=str(e))
@controller.action()
async def click_on_spreadsheet_cell(row: str, column: str, browser: Browser) -> ActionResult:
"""Click on a spreadsheet cell at a specific row and column. You HAVE to use this action when you need to click on a cell in a spreadsheet. DON'T try to use click_element action, it will not work.
Args:
row: Row of the cell to click on, it should be a number formatted as a string. e.g. "1"
column: Column of the cell to click on, it should be a letter formatted as a string. e.g. "A"
"""
page = await browser.get_current_page()
state = browser.get_state()
elements = state.interactive_elements.values()
row_element = next((e for e in elements if e.browser_agent_id == f"row_{row}"), None)
column_element = next((e for e in elements if e.browser_agent_id == f"column_{column}"), None)
if not row_element or not column_element:
return ActionResult(error='Row or column element not found - pay close attention to the row and column numbers.')
# reset focus with a click in the center of the viewport, just in case
await page.mouse.click(state.viewport.width / 2, state.viewport.height / 2)
await asyncio.sleep(0.05)
await page.mouse.click(column_element.center.x, row_element.center.y, click_count=2)
await asyncio.sleep(0.05)
return ActionResult(content=f'Clicked on spreadsheet cell with row {row} and column {column}')
@controller.action()
async def click_element(index: int, browser: Browser, wait_after_click: bool = False):
"""
Click on the element with index.
Args:
index: Index of the element to click on.
wait_after_click: If True, wait for 2 seconds after clicking the element. Only set it to True when you think that clicking will trigger a loading state, for instance navigation to a new page, a search, loading of content, etc.
"""
# clean index if it contains any non-numeric characters
cleaned_index_str = re.sub(r'\D', '', str(index))
if cleaned_index_str == '':
logger.error(f'Index is not a number. Index: {index}')
return ActionResult(error="`index` should be a valid number.")
index = int(cleaned_index_str)
state = browser.get_state()
if index not in state.interactive_elements:
return ActionResult(error=f"Element with index {index} does not exist - retry or use alternative actions.")
element = state.interactive_elements[index]
initial_pages = len(browser.context.pages) if browser.context else 0
try:
page = await browser.get_current_page()
await page.mouse.click(element.center.x, element.center.y)
msg = f'Clicked element with index {index}: <{element.tag_name}></{element.tag_name}>'
logger.info(msg)
if browser.context and len(browser.context.pages) > initial_pages:
new_tab_msg = 'New tab opened - switching to it'
msg += f' - {new_tab_msg}'
logger.info(new_tab_msg)
await browser.switch_to_tab(-1)
if wait_after_click:
await asyncio.sleep(2)
return ActionResult(content=msg)
except Exception as e:
return ActionResult(error=str(e))
@controller.action(
description='Use this action to wait for the page to load if you see that the content on the clean screenshot is empty or shows loading UI elements such as skeleton screens. This action will wait for the page to load. Then you can continue with your actions.',
)
async def wait_for_page_to_load() -> ActionResult:
return ActionResult(content='Waited for page to load')
@controller.action()
async def enter_text(text: str, press_enter: bool, browser: Browser):
"""Enter text with a keyboard. Use it AFTER you have clicked on an input element. This action will override the current text in the element.
Args:
text: Text to enter with a keyboard.
press_enter: If True, `Enter` button will be pressed after entering the text. Use this when you think it would make sense to press `Enter` after entering the text, such as when you're submitting a form, performing a search, etc.
"""
try:
page = await browser.get_current_page()
# clear the element
await page.keyboard.press("ControlOrMeta+a")
await asyncio.sleep(0.1)
await page.keyboard.press("Backspace")
await asyncio.sleep(0.1)
# input text into the element
await page.keyboard.type(text)
if press_enter:
await page.keyboard.press("Enter")
await asyncio.sleep(2)
msg = f'Entered "{text}" on the keyboard. Make sure to double check that the text was entered to where you intended.'
logger.info(msg)
return ActionResult(content=msg)
except Exception as e:
return ActionResult(error=f'Failed to enter text. Error: {str(e)}')
# Tab Management Actions
@controller.action('Switch tab')
async def switch_tab(page_id: int, browser: Browser):
await browser.switch_to_tab(page_id)
await asyncio.sleep(0.5)
msg = f'Switched to tab {page_id}'
logger.info(msg)
return ActionResult(content=msg)
@controller.action('Open url in new tab')
async def open_tab(url: str, browser: Browser):
await browser.create_new_tab(url)
msg = f'Opened new tab with {url}'
logger.info(msg)
return ActionResult(content=msg)
@controller.action(
"Scrolls entire page down. Use this action when you want to scroll the entire page down. Don't use this action if you want to scroll over a specific scrollable area on a page."
)
async def scroll_page_down(browser: Browser):
page = await browser.get_current_page()
state = browser.get_state()
# move mouse to the center of the page
await page.mouse.move(state.viewport.width / 2, state.viewport.height / 2)
await asyncio.sleep(0.1)
# scroll down by one page
await page.mouse.wheel(0, state.viewport.height * 0.8)
return ActionResult(content="Scrolled mouse wheel down (it doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")
@controller.action(
"Scrolls entire page up. Use this action when you want to scroll the entire page up. Don't use this action if you want to scroll over a specific scrollable area on a page."
)
async def scroll_page_up(browser: Browser):
page = await browser.get_current_page()
state = browser.get_state()
# move mouse to the center of the page
await page.mouse.move(state.viewport.width / 2, state.viewport.height / 2)
await asyncio.sleep(0.1)
# scroll up by one page
await page.mouse.wheel(0, -state.viewport.height * 0.8)
return ActionResult(content="Scrolled mouse wheel up (it doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")
@controller.action(
"Moves mouse to the element with index `index`, located inside scrollable area of the webpage, identified by scrollbars. Then scrolls mouse wheel down."
)
async def scroll_down_over_element(index: int, browser: Browser):
page = await browser.get_current_page()
state = browser.get_state()
if index not in state.interactive_elements:
return ActionResult(error=f'Element index {index} does not exist - retry or use alternative actions')
element = state.interactive_elements[index]
await page.mouse.move(element.center.x, element.center.y)
await asyncio.sleep(0.1)
await page.mouse.wheel(0, state.viewport.height / 3)
return ActionResult(content=f"Move mouse to element with index {index} and scroll mouse wheel down. (It doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")
@controller.action(
"Moves mouse to the element with index `index`, located inside scrollable area of the webpage, identified by scrollbars. Then scrolls mouse wheel up."
)
async def scroll_up_over_element(index: int, browser: Browser):
page = await browser.get_current_page()
state = browser.get_state()
if index not in state.interactive_elements:
return ActionResult(error=f'Element index {index} does not exist - retry or use alternative actions')
element = state.interactive_elements[index]
await page.mouse.move(element.center.x, element.center.y)
await asyncio.sleep(0.1)
await page.mouse.wheel(0, -state.viewport.height / 3)
return ActionResult(content=f"Move mouse to element with index {index} and scroll mouse wheel up. (It doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")
@controller.action(
"Moves mouse at the location of the element with index `index`, which should be inside scrollable area of the webpage, identified by scrollbars. Then scrolls mouse wheel horizontally to the right."
)
async def scroll_right_over_element(index: int, browser: Browser):
page = await browser.get_current_page()
state = browser.get_state()
if index not in state.interactive_elements:
return ActionResult(error=f'Element index {index} does not exist - retry or use an alternative action')
element = state.interactive_elements[index]
await page.mouse.move(element.center.x, element.center.y)
await asyncio.sleep(0.1)
await page.mouse.wheel(state.viewport.width / 3, 0)
return ActionResult(content=f"Moved mouse to element with index {index} and scroll mouse wheel horizontally to the right. (It doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")
@controller.action(
"Moves mouse at the location of the element with index `index`, which should be inside scrollable area of the webpage, identified by scrollbars. Then scrolls mouse wheel horizontally to the left."
)
async def scroll_left_over_element(index: int, browser: Browser):
page = await browser.get_current_page()
state = browser.get_state()
if index not in state.interactive_elements:
return ActionResult(error=f'Element index {index} does not exist - retry or use an alternative action')
element = state.interactive_elements[index]
await page.mouse.move(element.center.x, element.center.y)
await asyncio.sleep(0.1)
await page.mouse.wheel(-state.viewport.width / 3, 0)
return ActionResult(content=f"Moved mouse to element with index {index} and scroll mouse wheel horizontally to the left. (It doesn't guarantee that something has scrolled, you need to check new state screenshot to confirm)")
@controller.action(
'Press enter key. Use this action when you need to submit a form or perform an action that requires pressing enter.'
)
async def press_enter(browser: Browser):
page = await browser.get_current_page()
await page.keyboard.press('Enter')
return ActionResult(content='Pressed enter key')
@controller.action(
'Remove all text in the element with the given index.'
)
async def clear_text_in_element(index: int, browser: Browser):
page = await browser.get_current_page()
state = browser.get_state()
if index not in state.interactive_elements:
return ActionResult(error=f'Element index {index} does not exist - retry or use alternative actions')
element = state.interactive_elements[index]
await page.mouse.move(element.center.x, element.center.y)
await page.mouse.click(element.center.x, element.center.y)
await asyncio.sleep(0.1)
if platform.system() == "Darwin":
await page.keyboard.press('Meta+A')
else:
await page.keyboard.press('Control+A')
await asyncio.sleep(0.1)
await page.keyboard.press('Backspace')
return ActionResult(content=f'Removed all text in the element with index {index}')
@controller.action()
async def get_select_options(index: int, browser: Browser) -> ActionResult:
"""Get all options from a element. Use this action when you need to get all options from a dropdown."""
try:
# Get the page and element information
page = await browser.get_current_page()
interactive_elements = browser.get_state().interactive_elements
# Verify the element exists and is a select
if index not in interactive_elements:
return ActionResult(error=f"No element found with index {index}")
element = interactive_elements[index]
# Check if it's a select element
if element.tag_name.lower() != 'select':
return ActionResult(error=f"Element {index} is not a select element, it's a {element.tag_name}")
# Use the unique ID to find the element
options_data = await page.evaluate("""
(args) => {
// Find the select element using the unique ID
const select = document.querySelector(`[data-browser-agent-id="${args.browserAgentId}"]`);
if (!select) return null;
// Get all options
return {
options: Array.from(select.options).map(opt => ({
text: opt.text,
value: opt.value,
index: opt.index
})),
id: select.id,
name: select.name
};
}
""", {"browserAgentId": element.browser_agent_id})
# Process options from direct approach
formatted_options = []
for opt in options_data['options']:
encoded_text = json.dumps(opt['text'])
formatted_options.append(f'{opt["index"]}: option={encoded_text}')
msg = '\n'.join(formatted_options)
msg += '\nIf you decide to use this select element, use the exact option name in select_dropdown_option'
logger.info(f'Found dropdown with ID: {options_data["id"]}, Name: {options_data["name"]}')
return ActionResult(content=msg)
except Exception as e:
logger.error(f'Failed to get dropdown options: {str(e)}')
return ActionResult(error=f'Error getting dropdown options: {str(e)}')
@controller.action(
description='Select an option from a <select> element by the text (name) of the option. Use this after get_select_options and when you need to select an option from a dropdown.',
)
async def select_dropdown_option(
index: int,
option: str,
browser: Browser,
) -> ActionResult:
"""Select dropdown option by the text of the option you want to select"""
try:
# Get the interactive element
page = await browser.get_current_page()
interactive_elements = browser.get_state().interactive_elements
# Verify the element exists and is a select
if index not in interactive_elements:
return ActionResult(error=f"No element found with index {index}")
element = interactive_elements[index]
# Check if it's a select element
if element.tag_name.lower() != 'select':
return ActionResult(error=f"Element {index} is not a select element, it's a {element.tag_name}")
logger.debug(f"Attempting to select '{option}' using browser_agent_id: {element.browser_agent_id}")
# Use JavaScript to select the option using the unique ID
result = await page.evaluate("""
(args) => {
const uniqueId = args.uniqueId;
const optionText = args.optionText;
try {
// Find the select element by unique ID - works across frames too
function findElementByUniqueId(root, id) {
// Check in main document first
let element = document.querySelector(`[data-browser-agent-id="${id}"]`);
if (element) return element;
}
const select = findElementByUniqueId(window, uniqueId);
if (!select) {
return {
success: false,
error: "Select element not found with ID: " + uniqueId
};
}
// Find the option with matching text
let found = false;
let selectedValue = null;
let selectedIndex = -1;
for (let i = 0; i < select.options.length; i++) {
const opt = select.options[i];
if (opt.text === optionText) {
// Select this option
opt.selected = true;
found = true;
selectedValue = opt.value;
selectedIndex = i;
// Trigger change event
const event = new Event('change', { bubbles: true });
select.dispatchEvent(event);
break;
}
}
if (found) {
return {
success: true,
value: selectedValue,
index: selectedIndex
};
} else {
return {
success: false,
error: "Option not found: " + optionText,
availableOptions: Array.from(select.options).map(o => o.text)
};
}
} catch (e) {
return {
success: false,
error: e.toString()
};
}
}
""", {"uniqueId": element.browser_agent_id, "optionText": option})
if result.get('success'):
msg = f"Selected option '{option}' with value '{result.get('value')}' at index {result.get('index')}"
logger.info(msg)
return ActionResult(content=msg)
else:
error_msg = result.get('error', 'Unknown error')
if 'availableOptions' in result:
available = result.get('availableOptions', [])
error_msg += f". Available options: {', '.join(available)}"
logger.error(f"Selection failed: {error_msg}")
return ActionResult(error=error_msg)
except Exception as e:
msg = f'Selection failed: {str(e)}'
logger.error(msg)
return ActionResult(error=msg)
```
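Because `register_default_actions` runs inside `Controller.__init__` and the `action` decorator skips any function whose name appears in `exclude_actions`, trimming the default set is just a constructor argument, as in this small sketch.
```py
# Hedged sketch: drop default actions the agent should never use.
from index.controller.controller import Controller

controller = Controller(exclude_actions=["search_google", "open_tab"])

# The excluded names never reach the registry, so the action descriptions
# handed to the LLM omit them entirely.
print(controller.get_action_descriptions())
```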
## /index/llm/llm.py
```py path="/index/llm/llm.py"
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
class MessageRole(Enum):
SYSTEM = "system"
USER = "user"
ASSISTANT = "assistant"
TOOL = "tool" # For OpenAI function calling responses
@dataclass
class MessageContent:
"""Base class for message content"""
cache_control: Optional[bool] = None
@dataclass
class TextContent(MessageContent):
"""Text content in a message"""
text: str = ""
type: str = "text"
@dataclass
class ImageContent(MessageContent):
"""Image content in a message"""
image_b64: Optional[str] = None
image_url: Optional[str] = None
type: str = "image"
@dataclass
class ThinkingBlock(MessageContent):
"""Thinking block in a message"""
thinking: str = ""
signature: str = ""
type: str = "thinking"
@dataclass
class Message:
"""A message in a conversation"""
role: Union[str, MessageRole]
content: Union[str, List[Union[TextContent, ImageContent, ThinkingBlock]]]
name: Optional[str] = None # For tool/function messages
tool_call_id: Optional[str] = None # For tool/function responses
is_state_message: Optional[bool] = False
def __post_init__(self):
# Convert role enum to string if needed
if isinstance(self.role, MessageRole):
self.role = self.role.value
# Convert string content to TextContent if needed
if isinstance(self.content, str):
self.content = [TextContent(text=self.content)]
elif isinstance(self.content, (TextContent, ImageContent)):
self.content = [self.content]
def to_openai_format(self) -> Dict:
"""Convert to OpenAI message format"""
message = {"role": self.role}
if isinstance(self.content, str):
message["content"] = self.content
elif isinstance(self.content, list):
content_blocks = []
for content_block in self.content:
block = {}
if isinstance(content_block, TextContent):
block["type"] = "text"
block["text"] = content_block.text
elif isinstance(content_block, ImageContent):
block["type"] = "image_url"
block["image_url"] = {
"url": "data:image/png;base64," + content_block.image_b64
}
content_blocks.append(block)
message["content"] = content_blocks
return message
def to_anthropic_format(self, enable_cache_control: bool = True) -> Dict:
"""Convert to Anthropic message format"""
message = {"role": self.role}
if isinstance(self.content, str):
message["content"] = self.content
elif isinstance(self.content, list):
content_blocks = []
for content_block in self.content:
block = {}
if isinstance(content_block, TextContent):
block["type"] = "text"
block["text"] = content_block.text
elif isinstance(content_block, ImageContent):
block["type"] = "image"
block["source"] = {
"type": "base64",
"media_type": "image/png", # This should be configurable based on image type
"data": content_block.image_b64 if content_block.image_b64 else content_block.image_url
}
elif isinstance(content_block, ThinkingBlock):
block["type"] = "thinking"
block["thinking"] = content_block.thinking
block["signature"] = content_block.signature
if content_block.cache_control and enable_cache_control:
block["cache_control"] = {"type": "ephemeral"}
content_blocks.append(block)
message["content"] = content_blocks
return message
def to_gemini_format(self) -> Dict:
"""Convert to Gemini message format"""
parts = []
if isinstance(self.content, str):
parts = [{"text": self.content}]
elif isinstance(self.content, list):
for content_block in self.content:
if isinstance(content_block, TextContent):
parts.append({"text": content_block.text})
elif isinstance(content_block, ImageContent):
if content_block.image_b64:
parts.append({"inline_data": {
"mime_type": "image/png",
"data": content_block.image_b64
}})
elif content_block.image_url:
parts.append({"file_data": {
"mime_type": "image/png",
"file_uri": content_block.image_url
}})
return {
"role": 'model' if self.role == 'assistant' else 'user',
"parts": parts
}
def remove_cache_control(self):
if isinstance(self.content, list):
for content_block in self.content:
if isinstance(content_block, TextContent):
content_block.cache_control = None
elif isinstance(content_block, ImageContent):
content_block.cache_control = None
def add_cache_control_to_state_message(self):
if not self.is_state_message or not isinstance(self.content, list) or len(self.content) < 3:
return
if len(self.content) == 3:
self.content[-1].cache_control = True
def has_cache_control(self):
if not isinstance(self.content, list):
return False
return any(content.cache_control for content in self.content)
class LLMResponse(BaseModel):
content: str
raw_response: Any
usage: Dict[str, int]
thinking: Optional[ThinkingBlock] = None
class BaseLLMProvider(ABC):
def __init__(self, model: str):
self.model = model
@abstractmethod
async def call(
self,
messages: List[Message],
temperature: float = 1,
max_tokens: Optional[int] = None,
**kwargs
) -> LLMResponse:
pass
```
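The `Message` dataclass normalizes string content into `TextContent` blocks and can serialize itself for each provider. A quick sketch of those conversions (the base64 string below is a placeholder, not real image data):
```py
# Hedged sketch of Message construction and per-provider serialization.
from index.llm.llm import ImageContent, Message, TextContent

msg = Message(
    role="user",
    content=[
        TextContent(text="Describe this screenshot."),
        ImageContent(image_b64="<base64-encoded PNG>"),  # placeholder value
    ],
)

print(msg.to_openai_format())     # image becomes an image_url block with a data: URL
print(msg.to_anthropic_format())  # image becomes a base64 "source" block
print(msg.to_gemini_format())     # role maps to "user"/"model", content to "parts"
```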
## /index/llm/providers/__init__.py
```py path="/index/llm/providers/__init__.py"
from .anthropic import AnthropicProvider
from .anthropic_bedrock import AnthropicBedrockProvider
from .gemini import GeminiProvider
from .openai import OpenAIProvider
__all__ = [
"OpenAIProvider",
"AnthropicProvider",
"AnthropicBedrockProvider",
"GeminiProvider",
]
```
## /index/llm/providers/anthropic.py
```py path="/index/llm/providers/anthropic.py"
import logging
from typing import List, Optional
import backoff
from anthropic import AsyncAnthropic
from ..llm import BaseLLMProvider, LLMResponse, Message, ThinkingBlock
from ..providers.anthropic_bedrock import AnthropicBedrockProvider
logger = logging.getLogger(__name__)
class AnthropicProvider(BaseLLMProvider):
def __init__(self, model: str, enable_thinking: bool = True, thinking_token_budget: Optional[int] = 2048):
super().__init__(model=model)
self.client = AsyncAnthropic()
self.thinking_token_budget = thinking_token_budget
self.anthropic_bedrock = AnthropicBedrockProvider(model=f"us.anthropic.{model}-v1:0", enable_thinking=enable_thinking, thinking_token_budget=thinking_token_budget)
self.enable_thinking = enable_thinking
@backoff.on_exception(
backoff.constant, # constant backoff
Exception, # retry on any exception
max_tries=3, # stop after 3 attempts
interval=10,
on_backoff=lambda details: logger.info(
f"API error, retrying in {details['wait']:.2f} seconds... (attempt {details['tries']})"
)
)
async def call(
self,
messages: List[Message],
temperature: float = -1,
max_tokens: Optional[int] = 16000,
**kwargs
) -> LLMResponse:
# Make a copy of messages to prevent modifying the original list during retries
messages_copy = messages.copy()
if len(messages_copy) < 2 or messages_copy[0].role != "system":
raise ValueError("System message is required for Anthropic and length of messages must be at least 2")
system_message = messages_copy[0]
if self.enable_thinking:
try:
response = await self.client.messages.create(
model=self.model,
system=system_message.to_anthropic_format()["content"],
messages=[msg.to_anthropic_format() for msg in messages_copy[1:]],
temperature=1,
thinking={
"type": "enabled",
"budget_tokens": self.thinking_token_budget,
},
max_tokens=max(self.thinking_token_budget + 1, max_tokens),
**kwargs
)
except Exception as e:
logger.error(f"Error calling Anthropic: {str(e)}")
response = await self.anthropic_bedrock.call(
messages_copy,
**kwargs
)
return LLMResponse(
content=response.content[1].text,
raw_response=response,
usage=response.usage.model_dump(),
thinking=ThinkingBlock(thinking=response.content[0].thinking, signature=response.content[0].signature)
)
else:
response = await self.client.messages.create(
model=self.model,
messages=[msg.to_anthropic_format() for msg in messages_copy[1:]],
temperature=temperature,
max_tokens=max_tokens,
system=system_message.to_anthropic_format()["content"],
**kwargs
)
return LLMResponse(
content=response.content[0].text,
raw_response=response,
usage=response.usage.model_dump()
)
```
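A hedged usage sketch for `AnthropicProvider`: it assumes `ANTHROPIC_API_KEY` is set (the AWS credentials are only needed if the call falls back to Bedrock) and that a system message comes first, as the validation in `call` requires.
```py
# Hedged sketch: call Claude with extended thinking enabled.
import asyncio

from index.llm.llm import Message
from index.llm.providers import AnthropicProvider


async def main():
    provider = AnthropicProvider(
        model="claude-3-7-sonnet-20250219",
        enable_thinking=True,
        thinking_token_budget=2048,
    )
    messages = [
        Message(role="system", content="You are a browser agent."),
        Message(role="user", content="Summarize the current page state."),
    ]
    response = await provider.call(messages, max_tokens=16000)
    if response.thinking:
        print(response.thinking.thinking)
    print(response.content)


asyncio.run(main())
```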
## /index/llm/providers/anthropic_bedrock.py
```py path="/index/llm/providers/anthropic_bedrock.py"
import logging
import os
from typing import List, Optional
import backoff
from anthropic import AsyncAnthropicBedrock
from dotenv import load_dotenv
from ..llm import BaseLLMProvider, LLMResponse, Message
load_dotenv()
logger = logging.getLogger(__name__)
class AnthropicBedrockProvider(BaseLLMProvider):
def __init__(self, model: str, enable_thinking: bool = True, thinking_token_budget: Optional[int] = 8192):
super().__init__(model=model)
self.client = AsyncAnthropicBedrock(
aws_access_key=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
aws_region=os.getenv('AWS_REGION'),
)
self.enable_thinking = enable_thinking
self.thinking_token_budget = thinking_token_budget
@backoff.on_exception( # noqa: F821
backoff.constant, # constant backoff
Exception, # retry on any exception
max_tries=3, # stop after 3 attempts
interval=10,
)
async def call(
self,
messages: List[Message],
temperature: float = 1,
max_tokens: Optional[int] = 2048,
**kwargs
) -> LLMResponse:
messages_copy = messages.copy()
if len(messages_copy) < 2 or messages_copy[0].role != "system":
raise ValueError("System message is required for Anthropic Bedrock and length of messages must be at least 2")
system_message = messages_copy[0]
try:
if self.enable_thinking:
response = await self.client.messages.create(
model=self.model,
system=system_message.to_anthropic_format(enable_cache_control=False)["content"],
messages=[msg.to_anthropic_format(enable_cache_control=False) for msg in messages_copy[1:]],
temperature=1,
thinking={
"type": "enabled",
"budget_tokens": self.thinking_token_budget,
},
max_tokens=max(self.thinking_token_budget + 1, max_tokens),
**kwargs
)
return LLMResponse(
content=response.content[1].text,
raw_response=response,
usage=response.usage
)
else:
response = await self.client.messages.create(
model=self.model,
messages=[msg.to_anthropic_format(enable_cache_control=False) for msg in messages_copy[1:]],
temperature=temperature,
max_tokens=max_tokens,
system=system_message.to_anthropic_format(enable_cache_control=False)["content"],
**kwargs
)
return LLMResponse(
content=response.content[0].text,
raw_response=response,
usage=response.usage
)
except Exception as e:
logger.error(f"Error calling Anthropic Bedrock: {str(e)}")
raise e
```
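The Bedrock provider reads its AWS credentials straight from the environment (or a `.env` file via `load_dotenv`), so configuration is just three variables; the values below are placeholders. The `us.anthropic.<model>-v1:0` model ID matches how `AnthropicProvider` constructs its Bedrock fallback.
```py
# Hedged sketch: configure and construct the Bedrock fallback provider.
import os

from index.llm.providers import AnthropicBedrockProvider

os.environ.setdefault("AWS_ACCESS_KEY_ID", "<your-access-key-id>")   # placeholder
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "<your-secret-key>")  # placeholder
os.environ.setdefault("AWS_REGION", "us-east-1")

provider = AnthropicBedrockProvider(
    model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    enable_thinking=True,
    thinking_token_budget=8192,
)
```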
## /index/llm/providers/gemini.py
```py path="/index/llm/providers/gemini.py"
import logging
import os
from typing import List, Optional
import backoff
from google import genai
from ..llm import BaseLLMProvider, LLMResponse, Message
logger = logging.getLogger(__name__)
class GeminiProvider(BaseLLMProvider):
def __init__(self, model: str, thinking_token_budget: int = 8192):
super().__init__(model=model)
self.client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
self.thinking_token_budget = thinking_token_budget
@backoff.on_exception(
backoff.constant, # constant backoff
Exception, # retry on any exception
max_tries=3, # stop after 3 attempts
interval=0.5,
on_backoff=lambda details: logger.info(
f"API error, retrying in {details['wait']:.2f} seconds... (attempt {details['tries']})"
),
)
async def call(
self,
messages: List[Message],
temperature: float = 1.0,
max_tokens: Optional[int] = None,
**kwargs
) -> LLMResponse:
if len(messages) < 2 or messages[0].role != "system":
raise ValueError("System message is required and length of messages must be at least 2")
system = messages[0].content[0].text
gemini_messages = [msg.to_gemini_format() for msg in messages[1:]]
config = {
"temperature": temperature,
"thinking_config": {
"thinking_budget": self.thinking_token_budget
},
"system_instruction": {
"text": system
}
}
if max_tokens:
config["max_output_tokens"] = max_tokens
response = await self.client.aio.models.generate_content(
model=self.model,
contents=gemini_messages,
config=config,
)
# Extract usage information if available
usage = {}
if hasattr(response, "usage_metadata"):
usage = {
"prompt_tokens": getattr(response.usage_metadata, "prompt_token_count", 0),
"completion_tokens": getattr(response.usage_metadata, "candidates_token_count", 0),
"total_tokens": getattr(response.usage_metadata, "total_token_count", 0)
}
return LLMResponse(
content=response.text,
raw_response=response,
usage=usage
)
```
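A minimal sketch of calling `GeminiProvider` directly, assuming `GEMINI_API_KEY` is exported; the system-message-first ordering follows the validation inside `call`.
```py
# Hedged sketch: one-shot Gemini call with a thinking budget.
import asyncio

from index.llm.llm import Message
from index.llm.providers import GeminiProvider


async def main():
    provider = GeminiProvider(model="gemini-2.5-flash-preview-04-17", thinking_token_budget=8192)
    messages = [
        Message(role="system", content="You are a browser agent."),
        Message(role="user", content="What should the next action be?"),
    ]
    response = await provider.call(messages, temperature=1.0, max_tokens=4096)
    print(response.usage)    # token counts when usage_metadata is available
    print(response.content)


asyncio.run(main())
```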
## /index/llm/providers/openai.py
```py path="/index/llm/providers/openai.py"
from typing import List, Optional
from openai import AsyncOpenAI
from ..llm import BaseLLMProvider, LLMResponse, Message
class OpenAIProvider(BaseLLMProvider):
def __init__(self, model: str, reasoning_effort: Optional[str] = "low"):
super().__init__(model=model)
self.client = AsyncOpenAI()
self.reasoning_effort = reasoning_effort
async def call(
self,
messages: List[Message],
temperature: float = 1.0,
) -> LLMResponse:
args = {
"temperature": temperature,
}
if self.model.startswith("o") and self.reasoning_effort:
args["reasoning_effort"] = self.reasoning_effort
args["temperature"] = 1
response = await self.client.chat.completions.create(
model=self.model,
messages=[msg.to_openai_format() for msg in messages],
**args
)
return LLMResponse(
content=response.choices[0].message.content,
raw_response=response,
usage={
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
}
)
```
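And the equivalent sketch for `OpenAIProvider`, assuming `OPENAI_API_KEY` is set; the `reasoning_effort` argument only takes effect for model names starting with "o", such as `o4-mini`.
```py
# Hedged sketch: call an OpenAI reasoning model through the provider wrapper.
import asyncio

from index.llm.llm import Message
from index.llm.providers import OpenAIProvider


async def main():
    provider = OpenAIProvider(model="o4-mini", reasoning_effort="low")
    messages = [
        Message(role="system", content="You are a browser agent."),
        Message(role="user", content="Plan the next browser action."),
    ]
    response = await provider.call(messages)
    print(response.usage)
    print(response.content)


asyncio.run(main())
```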
## /pyproject.toml
```toml path="/pyproject.toml"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.metadata]
allow-direct-references = true
[tool.hatch.build.targets.wheel]
packages = ["index"]
[project]
name = "lmnr-index"
version = "0.1.9"
description = "Index - SOTA browser AI agent for autonomous task execution on the web"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"anthropic[bedrock]>=0.49.0",
"backoff>=2.2.1",
"lmnr[anthropic,openai]>=0.5.1",
"openai>=1.65.2",
"playwright>=1.50.0",
"tenacity>=9.0.0",
"pillow>=11.1.0",
"rich>=13.5.0",
"textual>=0.50.1",
"typer>=0.9.0",
"google-genai>=1.11.0",
]
[project.scripts]
index = "index.cli:main"
[tool.uv]
dev-dependencies = [
"pytest>=8.3.3"
]
[project.license]
file = "LICENSE"
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
addopts = "-v -ra -q"
```
## /static/traces.png
Binary file available at https://raw.githubusercontent.com/lmnr-ai/index/refs/heads/main/static/traces.png